aboutsummaryrefslogtreecommitdiff
path: root/zenserver/cache/structuredcache.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-02-17 08:20:46 +0000
committerGitHub <[email protected]>2023-02-17 00:20:46 -0800
commit2ec65fd3e171145450416ae76be1a7dd8c646704 (patch)
tree13bdb7e5deb60d4e58be2714cfdcf7f70272bc1e /zenserver/cache/structuredcache.cpp
parentExperimental ObjectStore/CDN like endpoint (diff)
downloadzen-2ec65fd3e171145450416ae76be1a7dd8c646704.tar.xz
zen-2ec65fd3e171145450416ae76be1a7dd8c646704.zip
Enhanced rpc request recording (#229)
* rpc replay zen command * fix replay sessions for thread * recording start/stop as zen commands * move rpcrecording code to zenutil to remove code duplication * simplify recording http request threading * added more data logging to rpc replay * NotFound is an acceptable response for an rpc request * fix rpc replay command line parsing * rpc replay stats * Allow spawning of sub-process workers when replaying rpc recording * changelog
Diffstat (limited to 'zenserver/cache/structuredcache.cpp')
-rw-r--r--zenserver/cache/structuredcache.cpp285
1 files changed, 51 insertions, 234 deletions
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 7550ce111..1ef880396 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -17,10 +17,9 @@
#include <zencore/workthreadpool.h>
#include <zenhttp/httpserver.h>
#include <zenhttp/httpshared.h>
-#include <zenutil/basicfile.h>
#include <zenutil/cache/cache.h>
+#include <zenutil/cache/rpcrecording.h>
-//#include "cachekey.h"
#include "monitoring/httpstats.h"
#include "structuredcachestore.h"
#include "upstream/jupiter.h"
@@ -35,6 +34,7 @@
#include <queue>
#include <thread>
+#include <cpr/cpr.h>
#include <gsl/gsl-lite.hpp>
#if ZEN_WITH_TESTS
@@ -46,187 +46,6 @@ namespace zen {
using namespace std::literals;
-namespace cache::detail {
- struct RecordedRequest
- {
- uint64_t Offset;
- uint64_t Length;
- ZenContentType ContentType;
- };
-
- const uint64_t RecordedRequestBlockSize = 1ull << 31u;
-
- struct RecordedRequestsWriter
- {
- void BeginWrite(const std::filesystem::path& BasePath)
- {
- m_BasePath = BasePath;
- std::filesystem::create_directories(m_BasePath);
- }
-
- void EndWrite()
- {
- RwLock::ExclusiveLockScope _(m_Lock);
- m_BlockFiles.clear();
-
- IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest));
- BasicFile IndexFile;
- IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate);
- std::error_code Ec;
- IndexFile.WriteAll(IndexBuffer, Ec);
- IndexFile.Close();
- m_Entries.clear();
- }
-
- uint64_t WriteRequest(ZenContentType ContentType, const IoBuffer& RequestBuffer)
- {
- RwLock::ExclusiveLockScope Lock(m_Lock);
- uint64_t RequestIndex = m_Entries.size();
- RecordedRequest& Entry =
- m_Entries.emplace_back(RecordedRequest{.Offset = ~0ull, .Length = RequestBuffer.Size(), .ContentType = ContentType});
- if (Entry.Length < 1 * 1024 * 1024)
- {
- uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize);
- if (BlockIndex == m_BlockFiles.size())
- {
- std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>());
- NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate);
- m_ChunkOffset = BlockIndex * RecordedRequestBlockSize;
- }
- ZEN_ASSERT(BlockIndex < m_BlockFiles.size());
- BasicFile* BlockFile = m_BlockFiles[BlockIndex].get();
- ZEN_ASSERT(BlockFile != nullptr);
-
- Entry.Offset = m_ChunkOffset;
- m_ChunkOffset = RoundUp(m_ChunkOffset + Entry.Length, 1u << 4u);
- Lock.ReleaseNow();
-
- std::error_code Ec;
- BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), Entry.Offset - BlockIndex * RecordedRequestBlockSize, Ec);
- if (Ec)
- {
- Entry.Length = 0;
- return ~0ull;
- }
- return RequestIndex;
- }
- Lock.ReleaseNow();
-
- BasicFile RequestFile;
- RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate);
- std::error_code Ec;
- RequestFile.WriteAll(RequestBuffer, Ec);
- if (Ec)
- {
- Entry.Length = 0;
- return ~0ull;
- }
- return RequestIndex;
- }
-
- std::filesystem::path m_BasePath;
- mutable RwLock m_Lock;
- std::vector<RecordedRequest> m_Entries;
- std::vector<std::unique_ptr<BasicFile>> m_BlockFiles;
- uint64_t m_ChunkOffset;
- };
-
- struct RecordedRequestsReader
- {
- uint64_t BeginRead(const std::filesystem::path& BasePath)
- {
- m_BasePath = BasePath;
- BasicFile IndexFile;
- IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kRead);
- m_Entries.resize(IndexFile.FileSize() / sizeof(RecordedRequest));
- IndexFile.Read(m_Entries.data(), IndexFile.FileSize(), 0);
- uint64_t MaxChunkPosition = 0;
- for (const RecordedRequest& R : m_Entries)
- {
- if (R.Offset != ~0ull)
- {
- MaxChunkPosition = Max(MaxChunkPosition, R.Offset + R.Length);
- }
- }
- uint32_t BlockCount = gsl::narrow<uint32_t>(MaxChunkPosition / RecordedRequestBlockSize) + 1;
- m_BlockFiles.resize(BlockCount);
- for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; ++BlockIndex)
- {
- m_BlockFiles[BlockIndex] = std::make_unique<BasicFile>();
- m_BlockFiles[BlockIndex]->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kRead);
- }
- return m_Entries.size();
- }
- void EndRead() { m_BlockFiles.clear(); }
-
- ZenContentType ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const
- {
- if (RequestIndex >= m_Entries.size())
- {
- return ZenContentType::kUnknownContentType;
- }
- const RecordedRequest& Entry = m_Entries[RequestIndex];
- if (Entry.Length == 0)
- {
- return ZenContentType::kUnknownContentType;
- }
- if (Entry.Offset != ~0ull)
- {
- uint32_t BlockIndex = gsl::narrow<uint32_t>((Entry.Offset + Entry.Length) / RecordedRequestBlockSize);
- uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize);
- OutBuffer = IoBuffer(Entry.Length);
- MutableMemoryView OutView = OutBuffer.GetMutableView();
- m_BlockFiles[BlockIndex]->Read(OutView.GetData(), OutView.GetSize(), ChunkOffset);
- return Entry.ContentType;
- }
- BasicFile ChunkFile;
- ChunkFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kRead);
- OutBuffer = ChunkFile.ReadAll();
- return Entry.ContentType;
- }
-
- std::filesystem::path m_BasePath;
- std::vector<RecordedRequest> m_Entries;
- std::vector<std::unique_ptr<BasicFile>> m_BlockFiles;
- };
-
- class DiskRequestRecorder : public IRequestRecorder
- {
- public:
- DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); }
- virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); }
-
- private:
- virtual uint64_t IRequestRecorder_RecordRequest(ZenContentType ContentType, const IoBuffer& RequestBuffer) override
- {
- return m_RecordedRequests.WriteRequest(ContentType, RequestBuffer);
- }
- virtual void IRequestRecorder_RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {}
- virtual void IRequestRecorder_RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {}
- RecordedRequestsWriter m_RecordedRequests;
- };
-
- class DiskRequestReplayer : public IRequestReplayer
- {
- public:
- DiskRequestReplayer(const std::filesystem::path& BasePath) { m_RequestCount = m_RequestBuffer.BeginRead(BasePath); }
- virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); }
-
- private:
- virtual uint64_t IRequestReplayer_GetRequestCount() const override { return m_RequestCount; }
-
- virtual ZenContentType IRequestReplayer_GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override
- {
- return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer);
- }
- virtual ZenContentType IRequestRecorder_GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; }
-
- std::uint64_t m_RequestCount;
- RecordedRequestsReader m_RequestBuffer;
- };
-
-} // namespace cache::detail
-
//////////////////////////////////////////////////////////////////////////
CachePolicy
@@ -561,8 +380,8 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
{
m_RequestRecorder.reset();
HttpServerRequest::QueryParams Params = Request.GetQueryParams();
- std::string_view RecordPath = Params.GetValue("path");
- m_RequestRecorder = std::make_unique<cache::detail::DiskRequestRecorder>(RecordPath);
+ std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path")));
+ m_RequestRecorder = cache::MakeDiskRequestRecorder(RecordPath);
Request.WriteResponse(HttpResponseCode::OK);
return;
}
@@ -576,7 +395,7 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
{
m_RequestRecorder.reset();
HttpServerRequest::QueryParams Params = Request.GetQueryParams();
- std::string_view RecordPath = Params.GetValue("path");
+ std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path")));
uint32_t ThreadCount = std::thread::hardware_concurrency();
if (auto Param = Params.GetValue("thread_count"); Param.empty() == false)
{
@@ -585,8 +404,8 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
ThreadCount = gsl::narrow<uint32_t>(Value.value());
}
}
- cache::detail::DiskRequestReplayer Replayer(RecordPath);
- ReplayRequestRecorder(Replayer, ThreadCount < 1 ? 1 : ThreadCount);
+ std::unique_ptr<cache::IRpcRequestReplayer> Replayer(cache::MakeDiskRequestReplayer(RecordPath, false));
+ ReplayRequestRecorder(*Replayer, ThreadCount < 1 ? 1 : ThreadCount);
Request.WriteResponse(HttpResponseCode::OK);
return;
}
@@ -1516,10 +1335,10 @@ HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType,
}
void
-HttpStructuredCacheService::ReplayRequestRecorder(cache::detail::IRequestReplayer& Replayer, uint32_t ThreadCount)
+HttpStructuredCacheService::ReplayRequestRecorder(cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount)
{
WorkerThreadPool WorkerPool(ThreadCount);
- uint64_t RequestCount = Replayer.IRequestReplayer_GetRequestCount();
+ uint64_t RequestCount = Replayer.GetRequestCount();
Stopwatch Timer;
auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); });
Latch JobLatch(RequestCount);
@@ -1527,13 +1346,13 @@ HttpStructuredCacheService::ReplayRequestRecorder(cache::detail::IRequestReplaye
for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex)
{
WorkerPool.ScheduleWork([this, &JobLatch, &Replayer, RequestIndex]() {
- IoBuffer Body;
- ZenContentType ContentType = Replayer.IRequestReplayer_GetRequest(RequestIndex, Body);
+ IoBuffer Body;
+ std::pair<ZenContentType, ZenContentType> ContentType = Replayer.GetRequest(RequestIndex, Body);
if (Body)
{
uint32_t AcceptMagic = 0;
RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
- CbPackage RpcResult = HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags);
+ CbPackage RpcResult = HandleRpcRequest(ContentType.first, std::move(Body), AcceptMagic, AcceptFlags);
if (AcceptMagic == kCbPkgMagic)
{
bool AllowFileReferences = EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences);
@@ -1579,51 +1398,49 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
return Request.WriteResponse(HttpResponseCode::BadRequest);
}
- Request.WriteResponseAsync([this, Body = Request.ReadPayload(), ContentType](HttpServerRequest& AsyncRequest) mutable {
- std::uint64_t RequestIndex =
- m_RequestRecorder ? m_RequestRecorder->IRequestRecorder_RecordRequest(ContentType, Body) : ~0ull;
- uint32_t AcceptMagic = 0;
- RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
- CbPackage RpcResult = HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags);
- if (RpcResult.IsNull())
- {
- AsyncRequest.WriteResponse(HttpResponseCode::BadRequest);
- return;
- }
- if (AcceptMagic == kCbPkgMagic)
- {
- bool AllowFileReferences = EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences);
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(
- RpcResult,
- AllowFileReferences ? FormatFlags::kAllowLocalReferences | FormatFlags::kDenyPartialLocalReferences
- : FormatFlags::kDefault);
- if (RequestIndex != ~0ull)
+ Request.WriteResponseAsync(
+ [this, Body = Request.ReadPayload(), ContentType, AcceptType](HttpServerRequest& AsyncRequest) mutable {
+ std::uint64_t RequestIndex =
+ m_RequestRecorder ? m_RequestRecorder->RecordRequest(ContentType, AcceptType, Body) : ~0ull;
+ uint32_t AcceptMagic = 0;
+ RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
+ CbPackage RpcResult = HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags);
+ if (RpcResult.IsNull())
{
- ZEN_ASSERT(m_RequestRecorder);
- m_RequestRecorder->IRequestRecorder_RecordResponse(RequestIndex,
- HttpContentType::kCbPackage,
- RpcResponseBuffer);
+ AsyncRequest.WriteResponse(HttpResponseCode::BadRequest);
+ return;
}
- AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
- }
- else
- {
- BinaryWriter MemStream;
- RpcResult.Save(MemStream);
-
- if (RequestIndex != ~0ull)
+ if (AcceptMagic == kCbPkgMagic)
{
- ZEN_ASSERT(m_RequestRecorder);
- m_RequestRecorder->IRequestRecorder_RecordResponse(
- RequestIndex,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ bool AllowFileReferences = EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences);
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(
+ RpcResult,
+ AllowFileReferences ? FormatFlags::kAllowLocalReferences | FormatFlags::kDenyPartialLocalReferences
+ : FormatFlags::kDefault);
+ if (RequestIndex != ~0ull)
+ {
+ ZEN_ASSERT(m_RequestRecorder);
+ m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer);
+ }
+ AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
}
- AsyncRequest.WriteResponse(HttpResponseCode::OK,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
- }
- });
+ else
+ {
+ BinaryWriter MemStream;
+ RpcResult.Save(MemStream);
+
+ if (RequestIndex != ~0ull)
+ {
+ ZEN_ASSERT(m_RequestRecorder);
+ m_RequestRecorder->RecordResponse(RequestIndex,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ }
+ AsyncRequest.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ }
+ });
}
break;
default: