diff options
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 285 |
1 files changed, 51 insertions, 234 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 7550ce111..1ef880396 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -17,10 +17,9 @@ #include <zencore/workthreadpool.h> #include <zenhttp/httpserver.h> #include <zenhttp/httpshared.h> -#include <zenutil/basicfile.h> #include <zenutil/cache/cache.h> +#include <zenutil/cache/rpcrecording.h> -//#include "cachekey.h" #include "monitoring/httpstats.h" #include "structuredcachestore.h" #include "upstream/jupiter.h" @@ -35,6 +34,7 @@ #include <queue> #include <thread> +#include <cpr/cpr.h> #include <gsl/gsl-lite.hpp> #if ZEN_WITH_TESTS @@ -46,187 +46,6 @@ namespace zen { using namespace std::literals; -namespace cache::detail { - struct RecordedRequest - { - uint64_t Offset; - uint64_t Length; - ZenContentType ContentType; - }; - - const uint64_t RecordedRequestBlockSize = 1ull << 31u; - - struct RecordedRequestsWriter - { - void BeginWrite(const std::filesystem::path& BasePath) - { - m_BasePath = BasePath; - std::filesystem::create_directories(m_BasePath); - } - - void EndWrite() - { - RwLock::ExclusiveLockScope _(m_Lock); - m_BlockFiles.clear(); - - IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest)); - BasicFile IndexFile; - IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate); - std::error_code Ec; - IndexFile.WriteAll(IndexBuffer, Ec); - IndexFile.Close(); - m_Entries.clear(); - } - - uint64_t WriteRequest(ZenContentType ContentType, const IoBuffer& RequestBuffer) - { - RwLock::ExclusiveLockScope Lock(m_Lock); - uint64_t RequestIndex = m_Entries.size(); - RecordedRequest& Entry = - m_Entries.emplace_back(RecordedRequest{.Offset = ~0ull, .Length = RequestBuffer.Size(), .ContentType = ContentType}); - if (Entry.Length < 1 * 1024 * 1024) - { - uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize); - if (BlockIndex == m_BlockFiles.size()) - { - std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>()); - NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate); - m_ChunkOffset = BlockIndex * RecordedRequestBlockSize; - } - ZEN_ASSERT(BlockIndex < m_BlockFiles.size()); - BasicFile* BlockFile = m_BlockFiles[BlockIndex].get(); - ZEN_ASSERT(BlockFile != nullptr); - - Entry.Offset = m_ChunkOffset; - m_ChunkOffset = RoundUp(m_ChunkOffset + Entry.Length, 1u << 4u); - Lock.ReleaseNow(); - - std::error_code Ec; - BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), Entry.Offset - BlockIndex * RecordedRequestBlockSize, Ec); - if (Ec) - { - Entry.Length = 0; - return ~0ull; - } - return RequestIndex; - } - Lock.ReleaseNow(); - - BasicFile RequestFile; - RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate); - std::error_code Ec; - RequestFile.WriteAll(RequestBuffer, Ec); - if (Ec) - { - Entry.Length = 0; - return ~0ull; - } - return RequestIndex; - } - - std::filesystem::path m_BasePath; - mutable RwLock m_Lock; - std::vector<RecordedRequest> m_Entries; - std::vector<std::unique_ptr<BasicFile>> m_BlockFiles; - uint64_t m_ChunkOffset; - }; - - struct RecordedRequestsReader - { - uint64_t BeginRead(const std::filesystem::path& BasePath) - { - m_BasePath = BasePath; - BasicFile IndexFile; - IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kRead); - m_Entries.resize(IndexFile.FileSize() / sizeof(RecordedRequest)); - IndexFile.Read(m_Entries.data(), IndexFile.FileSize(), 0); - uint64_t MaxChunkPosition = 0; - for (const RecordedRequest& R : m_Entries) - { - if (R.Offset != ~0ull) - { - MaxChunkPosition = Max(MaxChunkPosition, R.Offset + R.Length); - } - } - uint32_t BlockCount = gsl::narrow<uint32_t>(MaxChunkPosition / RecordedRequestBlockSize) + 1; - m_BlockFiles.resize(BlockCount); - for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; ++BlockIndex) - { - m_BlockFiles[BlockIndex] = std::make_unique<BasicFile>(); - m_BlockFiles[BlockIndex]->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kRead); - } - return m_Entries.size(); - } - void EndRead() { m_BlockFiles.clear(); } - - ZenContentType ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const - { - if (RequestIndex >= m_Entries.size()) - { - return ZenContentType::kUnknownContentType; - } - const RecordedRequest& Entry = m_Entries[RequestIndex]; - if (Entry.Length == 0) - { - return ZenContentType::kUnknownContentType; - } - if (Entry.Offset != ~0ull) - { - uint32_t BlockIndex = gsl::narrow<uint32_t>((Entry.Offset + Entry.Length) / RecordedRequestBlockSize); - uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize); - OutBuffer = IoBuffer(Entry.Length); - MutableMemoryView OutView = OutBuffer.GetMutableView(); - m_BlockFiles[BlockIndex]->Read(OutView.GetData(), OutView.GetSize(), ChunkOffset); - return Entry.ContentType; - } - BasicFile ChunkFile; - ChunkFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kRead); - OutBuffer = ChunkFile.ReadAll(); - return Entry.ContentType; - } - - std::filesystem::path m_BasePath; - std::vector<RecordedRequest> m_Entries; - std::vector<std::unique_ptr<BasicFile>> m_BlockFiles; - }; - - class DiskRequestRecorder : public IRequestRecorder - { - public: - DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); } - virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); } - - private: - virtual uint64_t IRequestRecorder_RecordRequest(ZenContentType ContentType, const IoBuffer& RequestBuffer) override - { - return m_RecordedRequests.WriteRequest(ContentType, RequestBuffer); - } - virtual void IRequestRecorder_RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {} - virtual void IRequestRecorder_RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {} - RecordedRequestsWriter m_RecordedRequests; - }; - - class DiskRequestReplayer : public IRequestReplayer - { - public: - DiskRequestReplayer(const std::filesystem::path& BasePath) { m_RequestCount = m_RequestBuffer.BeginRead(BasePath); } - virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); } - - private: - virtual uint64_t IRequestReplayer_GetRequestCount() const override { return m_RequestCount; } - - virtual ZenContentType IRequestReplayer_GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override - { - return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer); - } - virtual ZenContentType IRequestRecorder_GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; } - - std::uint64_t m_RequestCount; - RecordedRequestsReader m_RequestBuffer; - }; - -} // namespace cache::detail - ////////////////////////////////////////////////////////////////////////// CachePolicy @@ -561,8 +380,8 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) { m_RequestRecorder.reset(); HttpServerRequest::QueryParams Params = Request.GetQueryParams(); - std::string_view RecordPath = Params.GetValue("path"); - m_RequestRecorder = std::make_unique<cache::detail::DiskRequestRecorder>(RecordPath); + std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path"))); + m_RequestRecorder = cache::MakeDiskRequestRecorder(RecordPath); Request.WriteResponse(HttpResponseCode::OK); return; } @@ -576,7 +395,7 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) { m_RequestRecorder.reset(); HttpServerRequest::QueryParams Params = Request.GetQueryParams(); - std::string_view RecordPath = Params.GetValue("path"); + std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path"))); uint32_t ThreadCount = std::thread::hardware_concurrency(); if (auto Param = Params.GetValue("thread_count"); Param.empty() == false) { @@ -585,8 +404,8 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) ThreadCount = gsl::narrow<uint32_t>(Value.value()); } } - cache::detail::DiskRequestReplayer Replayer(RecordPath); - ReplayRequestRecorder(Replayer, ThreadCount < 1 ? 1 : ThreadCount); + std::unique_ptr<cache::IRpcRequestReplayer> Replayer(cache::MakeDiskRequestReplayer(RecordPath, false)); + ReplayRequestRecorder(*Replayer, ThreadCount < 1 ? 1 : ThreadCount); Request.WriteResponse(HttpResponseCode::OK); return; } @@ -1516,10 +1335,10 @@ HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType, } void -HttpStructuredCacheService::ReplayRequestRecorder(cache::detail::IRequestReplayer& Replayer, uint32_t ThreadCount) +HttpStructuredCacheService::ReplayRequestRecorder(cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount) { WorkerThreadPool WorkerPool(ThreadCount); - uint64_t RequestCount = Replayer.IRequestReplayer_GetRequestCount(); + uint64_t RequestCount = Replayer.GetRequestCount(); Stopwatch Timer; auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); }); Latch JobLatch(RequestCount); @@ -1527,13 +1346,13 @@ HttpStructuredCacheService::ReplayRequestRecorder(cache::detail::IRequestReplaye for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex) { WorkerPool.ScheduleWork([this, &JobLatch, &Replayer, RequestIndex]() { - IoBuffer Body; - ZenContentType ContentType = Replayer.IRequestReplayer_GetRequest(RequestIndex, Body); + IoBuffer Body; + std::pair<ZenContentType, ZenContentType> ContentType = Replayer.GetRequest(RequestIndex, Body); if (Body) { uint32_t AcceptMagic = 0; RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; - CbPackage RpcResult = HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags); + CbPackage RpcResult = HandleRpcRequest(ContentType.first, std::move(Body), AcceptMagic, AcceptFlags); if (AcceptMagic == kCbPkgMagic) { bool AllowFileReferences = EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences); @@ -1579,51 +1398,49 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) return Request.WriteResponse(HttpResponseCode::BadRequest); } - Request.WriteResponseAsync([this, Body = Request.ReadPayload(), ContentType](HttpServerRequest& AsyncRequest) mutable { - std::uint64_t RequestIndex = - m_RequestRecorder ? m_RequestRecorder->IRequestRecorder_RecordRequest(ContentType, Body) : ~0ull; - uint32_t AcceptMagic = 0; - RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; - CbPackage RpcResult = HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags); - if (RpcResult.IsNull()) - { - AsyncRequest.WriteResponse(HttpResponseCode::BadRequest); - return; - } - if (AcceptMagic == kCbPkgMagic) - { - bool AllowFileReferences = EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences); - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer( - RpcResult, - AllowFileReferences ? FormatFlags::kAllowLocalReferences | FormatFlags::kDenyPartialLocalReferences - : FormatFlags::kDefault); - if (RequestIndex != ~0ull) + Request.WriteResponseAsync( + [this, Body = Request.ReadPayload(), ContentType, AcceptType](HttpServerRequest& AsyncRequest) mutable { + std::uint64_t RequestIndex = + m_RequestRecorder ? m_RequestRecorder->RecordRequest(ContentType, AcceptType, Body) : ~0ull; + uint32_t AcceptMagic = 0; + RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; + CbPackage RpcResult = HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags); + if (RpcResult.IsNull()) { - ZEN_ASSERT(m_RequestRecorder); - m_RequestRecorder->IRequestRecorder_RecordResponse(RequestIndex, - HttpContentType::kCbPackage, - RpcResponseBuffer); + AsyncRequest.WriteResponse(HttpResponseCode::BadRequest); + return; } - AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); - } - else - { - BinaryWriter MemStream; - RpcResult.Save(MemStream); - - if (RequestIndex != ~0ull) + if (AcceptMagic == kCbPkgMagic) { - ZEN_ASSERT(m_RequestRecorder); - m_RequestRecorder->IRequestRecorder_RecordResponse( - RequestIndex, - HttpContentType::kCbPackage, - IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + bool AllowFileReferences = EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences); + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer( + RpcResult, + AllowFileReferences ? FormatFlags::kAllowLocalReferences | FormatFlags::kDenyPartialLocalReferences + : FormatFlags::kDefault); + if (RequestIndex != ~0ull) + { + ZEN_ASSERT(m_RequestRecorder); + m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer); + } + AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); } - AsyncRequest.WriteResponse(HttpResponseCode::OK, - HttpContentType::kCbPackage, - IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); - } - }); + else + { + BinaryWriter MemStream; + RpcResult.Save(MemStream); + + if (RequestIndex != ~0ull) + { + ZEN_ASSERT(m_RequestRecorder); + m_RequestRecorder->RecordResponse(RequestIndex, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + } + AsyncRequest.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + } + }); } break; default: |