aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-12-07 10:55:57 +0100
committerGitHub <[email protected]>2022-12-07 01:55:57 -0800
commit10c141fece26f9946595028afb069cbee1502067 (patch)
tree1513b9e704db223803f8181400946e70fd7a9241
parentUse Iso8601 format for logging start and end message (#202) (diff)
downloadzen-10c141fece26f9946595028afb069cbee1502067.tar.xz
zen-10c141fece26f9946595028afb069cbee1502067.zip
Cache request record/replay (#198)
This adds recording and playback of cache request with full data - both get and put operations can be replayed. Invoke via web request. `<host>/z$/exec$/start-recording?<disk-storage-path>` `<host>/z$/exec$/stop-recording` `<host>/z$/exec$/replay-recording?<thread-count>&<disk-storage-path>`
-rw-r--r--CHANGELOG.md4
-rw-r--r--zencore/filesystem.cpp26
-rw-r--r--zencore/include/zencore/thread.h9
-rw-r--r--zenserver/cache/structuredcache.cpp570
-rw-r--r--zenserver/cache/structuredcache.h67
-rw-r--r--zenserver/cache/structuredcachestore.cpp33
6 files changed, 490 insertions, 219 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 66cae5602..cb083345c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,8 @@
##
+- Feature: Recording and playback of cache request with full data - both get and put operations can be replayed. Invoke via web request.
+ - `<host>/z$/exec$/start-recording?<disk-storage-path>`
+ - `<host>/z$/exec$/stop-recording`
+ - `<host>/z$/exec$/replay-recording?<thread-count>&<disk-storage-path>`
- Feature: Disk size triggered GC, a soft disk usage limit for cache data.
- Feature: New option `--gc-disk-size-soft-limit` (command line), `gc.cache.disksizesoftlimit` (lua config) controlling limit for soft disk usage limit. Defaults to zero which disables soft disk usage limit.
- Improvement: Disk write pressure in GC log and cleaned up clutter in GC logging.
diff --git a/zencore/filesystem.cpp b/zencore/filesystem.cpp
index 5e376ffda..0aa478404 100644
--- a/zencore/filesystem.cpp
+++ b/zencore/filesystem.cpp
@@ -947,7 +947,28 @@ PathFromHandle(void* NativeHandle)
return std::filesystem::path();
}
- const DWORD RequiredLengthIncludingNul = GetFinalPathNameByHandleW(NativeHandle, nullptr, 0, FILE_NAME_OPENED);
+ auto GetFinalPathNameByHandleWRetry = [](HANDLE hFile, LPWSTR lpszFilePath, DWORD cchFilePath, DWORD dwFlags) -> DWORD {
+ while (true)
+ {
+ DWORD Res = GetFinalPathNameByHandleW(hFile, lpszFilePath, cchFilePath, dwFlags);
+ if (Res == 0)
+ {
+ DWORD LastError = zen::GetLastError();
+ // Under heavy concurrent loads we might get access denied on a file handle while trying to get path name.
+ // Retry if that is the case.
+ if (LastError != ERROR_ACCESS_DENIED)
+ {
+ ThrowSystemError(LastError, fmt::format("failed to get path from file handle {}", hFile));
+ }
+ // Retry
+ continue;
+ }
+ ZEN_ASSERT(Res != 1); // We don't accept empty path names
+ return Res;
+ }
+ };
+
+ DWORD RequiredLengthIncludingNul = GetFinalPathNameByHandleWRetry(NativeHandle, nullptr, 0, FILE_NAME_OPENED);
if (RequiredLengthIncludingNul == 0)
{
ThrowLastError(fmt::format("failed to get path from file handle {}", NativeHandle));
@@ -956,8 +977,7 @@ PathFromHandle(void* NativeHandle)
std::wstring FullPath;
FullPath.resize(RequiredLengthIncludingNul - 1);
- const DWORD FinalLength = GetFinalPathNameByHandleW(NativeHandle, FullPath.data(), RequiredLengthIncludingNul, FILE_NAME_OPENED);
-
+ const DWORD FinalLength = GetFinalPathNameByHandleWRetry(NativeHandle, FullPath.data(), RequiredLengthIncludingNul, FILE_NAME_OPENED);
ZEN_UNUSED(FinalLength);
return FullPath;
diff --git a/zencore/include/zencore/thread.h b/zencore/include/zencore/thread.h
index 3c1821a62..2aad22061 100644
--- a/zencore/include/zencore/thread.h
+++ b/zencore/include/zencore/thread.h
@@ -161,13 +161,16 @@ public:
}
}
- void Wait()
+ std::ptrdiff_t Remaining() const { return Counter.load(); }
+
+ bool Wait(int TimeoutMs = -1)
{
std::ptrdiff_t Old = Counter.load();
- if (Old != 0)
+ if (Old == 0)
{
- Complete.Wait();
+ return true;
}
+ return Complete.Wait(TimeoutMs);
}
private:
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 0df7472ac..baaf94dd0 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -14,8 +14,10 @@
#include <zencore/stream.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
+#include <zencore/workthreadpool.h>
#include <zenhttp/httpserver.h>
#include <zenhttp/httpshared.h>
+#include <zenutil/basicfile.h>
#include <zenutil/cache/cache.h>
//#include "cachekey.h"
@@ -44,6 +46,183 @@ namespace zen {
using namespace std::literals;
+namespace cache::detail {
+ struct RecordedRequest
+ {
+ uint64_t Offset;
+ uint64_t Length;
+ ZenContentType ContentType;
+ };
+
+ const uint64_t RecordedRequestBlockSize = 1ull << 31u;
+
+ struct RecordedRequestsWriter
+ {
+ void BeginWrite(const std::filesystem::path& BasePath)
+ {
+ m_BasePath = BasePath;
+ std::filesystem::create_directories(m_BasePath);
+ }
+
+ void EndWrite()
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_BlockFiles.clear();
+
+ IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest));
+ BasicFile IndexFile;
+ IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate);
+ std::error_code Ec;
+ IndexFile.WriteAll(IndexBuffer, Ec);
+ IndexFile.Close();
+ m_Entries.clear();
+ }
+
+ uint64_t WriteRequest(ZenContentType ContentType, const IoBuffer& RequestBuffer)
+ {
+ RwLock::ExclusiveLockScope Lock(m_Lock);
+ uint64_t RequestIndex = m_Entries.size();
+ RecordedRequest& Entry =
+ m_Entries.emplace_back(RecordedRequest{.Offset = ~0ull, .Length = RequestBuffer.Size(), .ContentType = ContentType});
+ if (Entry.Length < 1 * 1024 * 1024)
+ {
+ uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize);
+ if (BlockIndex == m_BlockFiles.size())
+ {
+ std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>());
+ NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate);
+ m_ChunkOffset = BlockIndex * RecordedRequestBlockSize;
+ }
+ ZEN_ASSERT(BlockIndex < m_BlockFiles.size());
+ BasicFile* BlockFile = m_BlockFiles[BlockIndex].get();
+ ZEN_ASSERT(BlockFile != nullptr);
+
+ Entry.Offset = m_ChunkOffset;
+ m_ChunkOffset = RoundUp(m_ChunkOffset + Entry.Length, 1u << 4u);
+ Lock.ReleaseNow();
+
+ std::error_code Ec;
+ BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), Entry.Offset - BlockIndex * RecordedRequestBlockSize, Ec);
+ if (Ec)
+ {
+ Entry.Length = 0;
+ return ~0ull;
+ }
+ return RequestIndex;
+ }
+ Lock.ReleaseNow();
+
+ BasicFile RequestFile;
+ RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate);
+ std::error_code Ec;
+ RequestFile.WriteAll(RequestBuffer, Ec);
+ if (Ec)
+ {
+ Entry.Length = 0;
+ return ~0ull;
+ }
+ return RequestIndex;
+ }
+
+ std::filesystem::path m_BasePath;
+ mutable RwLock m_Lock;
+ std::vector<RecordedRequest> m_Entries;
+ std::vector<std::unique_ptr<BasicFile>> m_BlockFiles;
+ uint64_t m_ChunkOffset;
+ };
+
+ struct RecordedRequestsReader
+ {
+ uint64_t BeginRead(const std::filesystem::path& BasePath)
+ {
+ m_BasePath = BasePath;
+ BasicFile IndexFile;
+ IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kRead);
+ m_Entries.resize(IndexFile.FileSize() / sizeof(RecordedRequest));
+ IndexFile.Read(m_Entries.data(), IndexFile.FileSize(), 0);
+ uint64_t MaxChunkPosition = 0;
+ for (const RecordedRequest& R : m_Entries)
+ {
+ if (R.Offset != ~0ull)
+ {
+ MaxChunkPosition = Max(MaxChunkPosition, R.Offset + R.Length);
+ }
+ }
+ uint32_t BlockCount = gsl::narrow<uint32_t>(MaxChunkPosition / RecordedRequestBlockSize) + 1;
+ m_IoBuffers.resize(BlockCount);
+ for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; ++BlockIndex)
+ {
+ m_IoBuffers[BlockIndex] = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("chunks{}.bin", BlockIndex));
+ }
+ return m_Entries.size();
+ }
+ void EndRead() { m_IoBuffers.clear(); }
+
+ ZenContentType ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const
+ {
+ if (RequestIndex >= m_Entries.size())
+ {
+ return ZenContentType::kUnknownContentType;
+ }
+ const RecordedRequest& Entry = m_Entries[RequestIndex];
+ if (Entry.Length == 0)
+ {
+ return ZenContentType::kUnknownContentType;
+ }
+ if (Entry.Offset != ~0ull)
+ {
+ uint32_t BlockIndex = gsl::narrow<uint32_t>((Entry.Offset + Entry.Length) / RecordedRequestBlockSize);
+ uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize);
+ OutBuffer = IoBuffer(m_IoBuffers[BlockIndex], ChunkOffset, Entry.Length);
+ return Entry.ContentType;
+ }
+ OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex));
+ return Entry.ContentType;
+ }
+
+ std::filesystem::path m_BasePath;
+ std::vector<RecordedRequest> m_Entries;
+ std::vector<std::unique_ptr<BasicFile>> m_BlockFiles;
+ std::vector<IoBuffer> m_IoBuffers;
+ };
+
+ class DiskRequestRecorder : public IRequestRecorder
+ {
+ public:
+ DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); }
+ virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); }
+
+ private:
+ virtual uint64_t IRequestRecorder_RecordRequest(ZenContentType ContentType, const IoBuffer& RequestBuffer) override
+ {
+ return m_RecordedRequests.WriteRequest(ContentType, RequestBuffer);
+ }
+ virtual void IRequestRecorder_RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {}
+ virtual void IRequestRecorder_RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {}
+ RecordedRequestsWriter m_RecordedRequests;
+ };
+
+ class DiskRequestReplayer : public IRequestReplayer
+ {
+ public:
+ DiskRequestReplayer(const std::filesystem::path& BasePath) { m_RequestCount = m_RequestBuffer.BeginRead(BasePath); }
+ virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); }
+
+ private:
+ virtual uint64_t IRequestReplayer_GetRequestCount() const override { return m_RequestCount; }
+
+ virtual ZenContentType IRequestReplayer_GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override
+ {
+ return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer);
+ }
+ virtual ZenContentType IRequestRecorder_GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; }
+
+ std::uint64_t m_RequestCount;
+ RecordedRequestsReader m_RequestBuffer;
+ };
+
+} // namespace cache::detail
+
//////////////////////////////////////////////////////////////////////////
CachePolicy
@@ -77,7 +256,10 @@ struct PutRequestData
};
namespace {
- static constinit std::string_view HttpZCacheRPCPrefix = "$rpc"sv;
+ static constinit std::string_view HttpZCacheRPCPrefix = "$rpc"sv;
+ static constinit std::string_view HttpZCacheUtilStartRecording = "exec$/start-recording"sv;
+ static constinit std::string_view HttpZCacheUtilStopRecording = "exec$/stop-recording"sv;
+ static constinit std::string_view HttpZCacheUtilReplayRecording = "exec$/replay-recording"sv;
struct HttpRequestData
{
@@ -326,6 +508,7 @@ HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheS
HttpStructuredCacheService::~HttpStructuredCacheService()
{
ZEN_INFO("closing structured cache");
+ m_RequestRecorder.reset();
m_StatsService.UnregisterHandler("z$", *this);
m_StatusService.UnregisterHandler("z$", *this);
@@ -367,6 +550,40 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
return HandleRpcRequest(Request);
}
+ if (Key == HttpZCacheUtilStartRecording)
+ {
+ m_RequestRecorder.reset();
+ HttpServerRequest::QueryParams Params = Request.GetQueryParams();
+ std::string_view RecordPath = Params.GetValue("path");
+ m_RequestRecorder = std::make_unique<cache::detail::DiskRequestRecorder>(RecordPath);
+ Request.WriteResponse(HttpResponseCode::OK);
+ return;
+ }
+ if (Key == HttpZCacheUtilStopRecording)
+ {
+ m_RequestRecorder.reset();
+ Request.WriteResponse(HttpResponseCode::OK);
+ return;
+ }
+ if (Key == HttpZCacheUtilReplayRecording)
+ {
+ m_RequestRecorder.reset();
+ HttpServerRequest::QueryParams Params = Request.GetQueryParams();
+ std::string_view RecordPath = Params.GetValue("path");
+ uint32_t ThreadCount = std::thread::hardware_concurrency();
+ if (auto Param = Params.GetValue("thread_count"); Param.empty() == false)
+ {
+ if (auto Value = ParseInt<uint64_t>(Param))
+ {
+ ThreadCount = gsl::narrow<uint32_t>(Value.value());
+ }
+ }
+ cache::detail::DiskRequestReplayer Replayer(RecordPath);
+ ReplayRequestRecorder(Replayer, ThreadCount < 1 ? 1 : ThreadCount);
+ Request.WriteResponse(HttpResponseCode::OK);
+ return;
+ }
+
HttpRequestData RequestData;
if (!HttpRequestParseRelativeUri(Key, RequestData))
{
@@ -1126,6 +1343,101 @@ HttpStructuredCacheService::HandlePutCacheChunk(zen::HttpServerRequest& Request,
Request.WriteResponse(ResponseCode);
}
+CbPackage
+HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType,
+ IoBuffer&& Body,
+ uint32_t& OutAcceptMagic,
+ RpcAcceptOptions& OutAcceptFlags)
+{
+ CbPackage Package;
+ CbObjectView Object;
+ CbObject ObjectBuffer;
+ if (ContentType == ZenContentType::kCbObject)
+ {
+ ObjectBuffer = zen::LoadCompactBinaryObject(std::move(Body));
+ Object = ObjectBuffer;
+ }
+ else
+ {
+ Package = ParsePackageMessage(Body);
+ Object = Package.GetObject();
+ }
+ OutAcceptMagic = Object["Accept"sv].AsUInt32();
+ OutAcceptFlags = static_cast<RpcAcceptOptions>(Object["AcceptFlags"sv].AsUInt16(0u));
+
+ const std::string_view Method = Object["Method"sv].AsString();
+
+ if (Method == "PutCacheRecords"sv)
+ {
+ return HandleRpcPutCacheRecords(Package);
+ }
+ else if (Method == "GetCacheRecords"sv)
+ {
+ return HandleRpcGetCacheRecords(Object);
+ }
+ else if (Method == "PutCacheValues"sv)
+ {
+ return HandleRpcPutCacheValues(Package);
+ }
+ else if (Method == "GetCacheValues"sv)
+ {
+ return HandleRpcGetCacheValues(Object);
+ }
+ else if (Method == "GetCacheChunks"sv)
+ {
+ return HandleRpcGetCacheChunks(Object);
+ }
+ return CbPackage{};
+}
+
+void
+HttpStructuredCacheService::ReplayRequestRecorder(cache::detail::IRequestReplayer& Replayer, uint32_t ThreadCount)
+{
+ WorkerThreadPool WorkerPool(ThreadCount);
+ uint64_t RequestCount = Replayer.IRequestReplayer_GetRequestCount();
+ Stopwatch Timer;
+ auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); });
+ Latch JobLatch(RequestCount);
+ ZEN_INFO("Replaying {} requests", RequestCount);
+ for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex)
+ {
+ WorkerPool.ScheduleWork([this, &JobLatch, &Replayer, RequestIndex]() {
+ IoBuffer Body;
+ ZenContentType ContentType = Replayer.IRequestReplayer_GetRequest(RequestIndex, Body);
+ if (Body)
+ {
+ uint32_t AcceptMagic = 0;
+ RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
+ CbPackage RpcResult = HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags);
+ if (AcceptMagic == kCbPkgMagic)
+ {
+ bool AllowFileReferences = EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences);
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(
+ RpcResult,
+ AllowFileReferences ? FormatFlags::kAllowLocalReferences | FormatFlags::kDenyPartialLocalReferences
+ : FormatFlags::kDefault);
+ ZEN_ASSERT(RpcResponseBuffer.GetSize() > 0);
+ }
+ else
+ {
+ BinaryWriter MemStream;
+ RpcResult.Save(MemStream);
+ IoBuffer RpcResponseBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize());
+ ZEN_ASSERT(RpcResponseBuffer.Size() > 0);
+ }
+ }
+ JobLatch.CountDown();
+ });
+ }
+ while (!JobLatch.Wait(10000))
+ {
+ ZEN_INFO("Replayed {} of {} requests, elapsed {}",
+ RequestCount - JobLatch.Remaining(),
+ RequestCount,
+ NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
+ }
+}
+
void
HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request)
{
@@ -1143,43 +1455,48 @@ HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request)
}
Request.WriteResponseAsync([this, Body = Request.ReadPayload(), ContentType](HttpServerRequest& AsyncRequest) mutable {
- CbPackage Package;
- CbObjectView Object;
- CbObject ObjectBuffer;
- if (ContentType == HttpContentType::kCbObject)
- {
- ObjectBuffer = zen::LoadCompactBinaryObject(std::move(Body));
- Object = ObjectBuffer;
- }
- else
- {
- Package = ParsePackageMessage(Body);
- Object = Package.GetObject();
- }
- const std::string_view Method = Object["Method"sv].AsString();
- if (Method == "PutCacheRecords"sv)
- {
- HandleRpcPutCacheRecords(AsyncRequest, Package);
- }
- else if (Method == "GetCacheRecords"sv)
- {
- HandleRpcGetCacheRecords(AsyncRequest, Object);
- }
- else if (Method == "PutCacheValues"sv)
- {
- HandleRpcPutCacheValues(AsyncRequest, Package);
- }
- else if (Method == "GetCacheValues"sv)
+ std::uint64_t RequestIndex =
+ m_RequestRecorder ? m_RequestRecorder->IRequestRecorder_RecordRequest(ContentType, Body) : ~0ull;
+ uint32_t AcceptMagic = 0;
+ RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
+ CbPackage RpcResult = HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags);
+ if (RpcResult.IsNull())
{
- HandleRpcGetCacheValues(AsyncRequest, Object);
+ AsyncRequest.WriteResponse(HttpResponseCode::BadRequest);
+ return;
}
- else if (Method == "GetCacheChunks"sv)
+ if (AcceptMagic == kCbPkgMagic)
{
- HandleRpcGetCacheChunks(AsyncRequest, Object);
+ bool AllowFileReferences = EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences);
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(
+ RpcResult,
+ AllowFileReferences ? FormatFlags::kAllowLocalReferences | FormatFlags::kDenyPartialLocalReferences
+ : FormatFlags::kDefault);
+ if (RequestIndex != ~0ull)
+ {
+ ZEN_ASSERT(m_RequestRecorder);
+ m_RequestRecorder->IRequestRecorder_RecordResponse(RequestIndex,
+ HttpContentType::kCbPackage,
+ RpcResponseBuffer);
+ }
+ AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
}
else
{
- AsyncRequest.WriteResponse(HttpResponseCode::BadRequest);
+ BinaryWriter MemStream;
+ RpcResult.Save(MemStream);
+
+ if (RequestIndex != ~0ull)
+ {
+ ZEN_ASSERT(m_RequestRecorder);
+ m_RequestRecorder->IRequestRecorder_RecordResponse(
+ RequestIndex,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ }
+ AsyncRequest.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
}
});
}
@@ -1190,15 +1507,13 @@ HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request)
}
}
-void
-HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Request, const CbPackage& BatchRequest)
+CbPackage
+HttpStructuredCacheService::HandleRpcPutCacheRecords(const CbPackage& BatchRequest)
{
ZEN_TRACE_CPU("Z$::RpcPutCacheRecords");
CbObjectView BatchObject = BatchRequest.GetObject();
ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheRecords"sv);
- uint32_t AcceptMagic = BatchObject["Accept"sv].AsUInt32();
-
CbObjectView Params = BatchObject["Params"sv].AsObjectView();
CachePolicy DefaultPolicy;
@@ -1206,7 +1521,7 @@ HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Req
std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
if (!Namespace)
{
- return Request.WriteResponse(HttpResponseCode::BadRequest);
+ return CbPackage{};
}
DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
std::vector<bool> Results;
@@ -1219,7 +1534,7 @@ HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Req
CacheKey Key;
if (!GetRpcRequestCacheKey(KeyView, Key))
{
- return Request.WriteResponse(HttpResponseCode::BadRequest);
+ return CbPackage{};
}
CacheRecordPolicy Policy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy);
PutRequestData PutRequest{*Namespace, std::move(Key), RecordObject, std::move(Policy)};
@@ -1228,13 +1543,13 @@ HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Req
if (Result == PutResult::Invalid)
{
- return Request.WriteResponse(HttpResponseCode::BadRequest);
+ return CbPackage{};
}
Results.push_back(Result == PutResult::Success);
}
if (Results.empty())
{
- return Request.WriteResponse(HttpResponseCode::BadRequest);
+ return CbPackage{};
}
CbObjectWriter ResponseObject;
@@ -1247,21 +1562,7 @@ HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Req
CbPackage RpcResponse;
RpcResponse.SetObject(ResponseObject.Save());
-
- if (AcceptMagic == kCbPkgMagic)
- {
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse, FormatFlags::kDefault);
- Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
- }
- else
- {
- BinaryWriter MemStream;
- RpcResponse.Save(MemStream);
-
- Request.WriteResponse(HttpResponseCode::OK,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
- }
+ return RpcResponse;
}
HttpStructuredCacheService::PutResult
@@ -1354,17 +1655,13 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack
return PutResult::Success;
}
-void
-HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest)
+CbPackage
+HttpStructuredCacheService::HandleRpcGetCacheRecords(CbObjectView RpcRequest)
{
ZEN_TRACE_CPU("Z$::RpcGetCacheRecords");
ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv);
- uint32_t AcceptMagic = RpcRequest["Accept"sv].AsUInt32();
- RpcAcceptOptions AcceptFlags = static_cast<RpcAcceptOptions>(RpcRequest["AcceptFlags"sv].AsUInt16(0u));
- bool AllowFileReferences = EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences);
-
CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
struct ValueRequestData
@@ -1393,7 +1690,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt
std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
if (!Namespace)
{
- return HttpRequest.WriteResponse(HttpResponseCode::BadRequest);
+ return CbPackage{};
}
std::vector<RecordRequestData> Requests;
std::vector<size_t> UpstreamIndexes;
@@ -1427,7 +1724,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt
CacheKey& Key = Request.Upstream.Key;
if (!GetRpcRequestCacheKey(KeyObject, Key))
{
- return HttpRequest.WriteResponse(HttpResponseCode::BadRequest);
+ return CbPackage{};
}
Request.DownstreamPolicy = LoadCacheRecordPolicy(RequestObject["Policy"sv].AsObjectView(), DefaultPolicy);
@@ -1520,7 +1817,7 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt
}
if (Requests.empty())
{
- return HttpRequest.WriteResponse(HttpResponseCode::BadRequest);
+ return CbPackage{};
}
if (!UpstreamIndexes.empty())
@@ -1693,42 +1990,21 @@ HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Htt
}
ResponseObject.EndArray();
ResponsePackage.SetObject(ResponseObject.Save());
-
- if (AcceptMagic == kCbPkgMagic)
- {
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(
- ResponsePackage,
- AllowFileReferences ? FormatFlags::kAllowLocalReferences | FormatFlags::kDenyPartialLocalReferences : FormatFlags::kDefault);
- HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
- }
- else
- {
- BinaryWriter MemStream;
- ResponsePackage.Save(MemStream);
-
- HttpRequest.WriteResponse(HttpResponseCode::OK,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
- }
+ return ResponsePackage;
}
-void
-HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Request, const CbPackage& BatchRequest)
+CbPackage
+HttpStructuredCacheService::HandleRpcPutCacheValues(const CbPackage& BatchRequest)
{
- ZEN_TRACE_CPU("Z$::RpcPutCacheValues");
CbObjectView BatchObject = BatchRequest.GetObject();
- ZEN_ASSERT(BatchObject["Method"sv].AsString() == "PutCacheValues"sv);
-
- uint32_t AcceptMagic = BatchObject["Accept"sv].AsUInt32();
-
- CbObjectView Params = BatchObject["Params"sv].AsObjectView();
+ CbObjectView Params = BatchObject["Params"sv].AsObjectView();
std::string_view PolicyText = Params["DefaultPolicy"].AsString();
CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
if (!Namespace)
{
- return Request.WriteResponse(HttpResponseCode::BadRequest);
+ return CbPackage{};
}
std::vector<bool> Results;
for (CbFieldView RequestField : Params["Requests"sv])
@@ -1741,7 +2017,7 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ
CacheKey Key;
if (!GetRpcRequestCacheKey(KeyView, Key))
{
- return Request.WriteResponse(HttpResponseCode::BadRequest);
+ return CbPackage{};
}
PolicyText = RequestObject["Policy"sv].AsString();
@@ -1774,7 +2050,7 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ
else
{
ZEN_WARN("PUTCACHEVALUES - '{}/{}/{}/{}' FAILED, value is not compressed", *Namespace, Key.Bucket, Key.Hash, RawHash);
- return Request.WriteResponse(HttpResponseCode::BadRequest);
+ return CbPackage{};
}
}
else if (EnumHasAllFlags(Policy, CachePolicy::QueryLocal))
@@ -1804,7 +2080,7 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ
}
if (Results.empty())
{
- return Request.WriteResponse(HttpResponseCode::BadRequest);
+ return CbPackage{};
}
CbObjectWriter ResponseObject;
@@ -1818,40 +2094,21 @@ HttpStructuredCacheService::HandleRpcPutCacheValues(zen::HttpServerRequest& Requ
CbPackage RpcResponse;
RpcResponse.SetObject(ResponseObject.Save());
- if (AcceptMagic == kCbPkgMagic)
- {
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResponse, FormatFlags::kDefault);
- Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
- }
- else
- {
- BinaryWriter MemStream;
- RpcResponse.Save(MemStream);
-
- Request.WriteResponse(HttpResponseCode::OK,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
- }
+ return RpcResponse;
}
-void
-HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest)
+CbPackage
+HttpStructuredCacheService::HandleRpcGetCacheValues(CbObjectView RpcRequest)
{
- ZEN_TRACE_CPU("Z$::RpcGetCacheValues");
-
ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheValues"sv);
- uint32_t AcceptMagic = RpcRequest["Accept"sv].AsUInt32();
- RpcAcceptOptions AcceptFlags = static_cast<RpcAcceptOptions>(RpcRequest["AcceptFlags"sv].AsUInt16(0u));
- bool AllowFileReferences = EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences);
-
CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
std::string_view PolicyText = Params["DefaultPolicy"sv].AsString();
CachePolicy DefaultPolicy = !PolicyText.empty() ? ParseCachePolicy(PolicyText) : CachePolicy::Default;
std::optional<std::string> Namespace = GetRpcRequestNamespace(Params);
if (!Namespace)
{
- return HttpRequest.WriteResponse(HttpResponseCode::BadRequest);
+ return CbPackage{};
}
struct RequestData
@@ -1876,7 +2133,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
if (!GetRpcRequestCacheKey(KeyObject, Request.Key))
{
- return HttpRequest.WriteResponse(HttpResponseCode::BadRequest);
+ return CbPackage{};
}
PolicyText = RequestObject["Policy"sv].AsString();
@@ -1991,7 +2248,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
if (Requests.empty())
{
- return HttpRequest.WriteResponse(HttpResponseCode::BadRequest);
+ return CbPackage{};
}
CbPackage RpcResponse;
@@ -2026,23 +2283,7 @@ HttpStructuredCacheService::HandleRpcGetCacheValues(zen::HttpServerRequest& Http
ResponseObject.EndArray();
RpcResponse.SetObject(ResponseObject.Save());
-
- if (AcceptMagic == kCbPkgMagic)
- {
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(
- RpcResponse,
- AllowFileReferences ? FormatFlags::kAllowLocalReferences | FormatFlags::kDenyPartialLocalReferences : FormatFlags::kDefault);
- HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
- }
- else
- {
- BinaryWriter MemStream;
- RpcResponse.Save(MemStream);
-
- HttpRequest.WriteResponse(HttpResponseCode::OK,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
- }
+ return RpcResponse;
}
namespace cache::detail {
@@ -2081,13 +2322,11 @@ namespace cache::detail {
} // namespace cache::detail
-void
-HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& HttpRequest, CbObjectView RpcRequest)
+CbPackage
+HttpStructuredCacheService::HandleRpcGetCacheChunks(CbObjectView RpcRequest)
{
using namespace cache::detail;
- ZEN_TRACE_CPU("Z$::RpcGetCacheChunks");
-
std::string Namespace;
std::vector<CacheKeyRequest> RecordKeys; // Data about a Record necessary to identify it to the upstream
std::vector<RecordBody> Records; // Scratch-space data about a Record when fulfilling RecordRequests
@@ -2096,22 +2335,11 @@ HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& Http
std::vector<ChunkRequest*> RecordRequests; // The ChunkRequests that are requesting a subvalue from a Record Key
std::vector<ChunkRequest*> ValueRequests; // The ChunkRequests that are requesting a Value Key
std::vector<CacheChunkRequest*> UpstreamChunks; // ChunkRequests that we need to send to the upstream
- uint32_t AcceptMagic = 0;
- uint16_t AcceptFlags = 0;
// Parse requests from the CompactBinary body of the RpcRequest and divide it into RecordRequests and ValueRequests
- if (!ParseGetCacheChunksRequest(AcceptMagic,
- AcceptFlags,
- Namespace,
- RecordKeys,
- Records,
- RequestKeys,
- Requests,
- RecordRequests,
- ValueRequests,
- RpcRequest))
+ if (!ParseGetCacheChunksRequest(Namespace, RecordKeys, Records, RequestKeys, Requests, RecordRequests, ValueRequests, RpcRequest))
{
- return HttpRequest.WriteResponse(HttpResponseCode::BadRequest);
+ return CbPackage{};
}
// For each Record request, load the Record if necessary to find the Chunk's ContentId, load its Payloads if we
@@ -2125,13 +2353,11 @@ HttpStructuredCacheService::HandleRpcGetCacheChunks(zen::HttpServerRequest& Http
GetUpstreamCacheChunks(Namespace, UpstreamChunks, RequestKeys, Requests);
// Send the payload and descriptive data about each chunk to the client
- WriteGetCacheChunksResponse(AcceptMagic, AcceptFlags, Namespace, Requests, HttpRequest);
+ return WriteGetCacheChunksResponse(Namespace, Requests);
}
bool
-HttpStructuredCacheService::ParseGetCacheChunksRequest(uint32_t& AcceptMagic,
- uint16_t& AcceptFlags,
- std::string& Namespace,
+HttpStructuredCacheService::ParseGetCacheChunksRequest(std::string& Namespace,
std::vector<CacheKeyRequest>& RecordKeys,
std::vector<cache::detail::RecordBody>& Records,
std::vector<CacheChunkRequest>& RequestKeys,
@@ -2144,9 +2370,6 @@ HttpStructuredCacheService::ParseGetCacheChunksRequest(uint32_t& Accept
ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheChunks"sv);
- AcceptMagic = RpcRequest["Accept"sv].AsUInt32();
- AcceptFlags = RpcRequest["AcceptFlags"sv].AsUInt16(0u);
-
CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
std::string_view DefaultPolicyText = Params["DefaultPolicy"sv].AsString();
CachePolicy DefaultPolicy = !DefaultPolicyText.empty() ? ParseCachePolicy(DefaultPolicyText) : CachePolicy::Default;
@@ -2496,18 +2719,11 @@ HttpStructuredCacheService::GetUpstreamCacheChunks(std::string_view Names
}
}
-void
-HttpStructuredCacheService::WriteGetCacheChunksResponse(uint32_t AcceptMagic,
- uint16_t AcceptFlags,
- std::string_view Namespace,
- std::vector<cache::detail::ChunkRequest>& Requests,
- zen::HttpServerRequest& HttpRequest)
+CbPackage
+HttpStructuredCacheService::WriteGetCacheChunksResponse(std::string_view Namespace, std::vector<cache::detail::ChunkRequest>& Requests)
{
using namespace cache::detail;
- RpcAcceptOptions AcceptOptions = static_cast<RpcAcceptOptions>(AcceptFlags);
- bool AllowFileReferences = EnumHasAllFlags(AcceptOptions, RpcAcceptOptions::kAllowLocalReferences);
-
CbPackage RpcResponse;
CbObjectWriter Writer;
@@ -2564,23 +2780,7 @@ HttpStructuredCacheService::WriteGetCacheChunksResponse(uint32_t Accept
Writer.EndArray();
RpcResponse.SetObject(Writer.Save());
-
- if (AcceptMagic == kCbPkgMagic)
- {
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(
- RpcResponse,
- AllowFileReferences ? FormatFlags::kAllowLocalReferences | FormatFlags::kDenyPartialLocalReferences : FormatFlags::kDefault);
- HttpRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
- }
- else
- {
- BinaryWriter MemStream;
- RpcResponse.Save(MemStream);
-
- HttpRequest.WriteResponse(HttpResponseCode::OK,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
- }
+ return RpcResponse;
}
void
diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h
index a74d6b7a6..a2a4a940c 100644
--- a/zenserver/cache/structuredcache.h
+++ b/zenserver/cache/structuredcache.h
@@ -26,10 +26,33 @@ class ScrubContext;
class UpstreamCache;
class ZenCacheStore;
enum class CachePolicy : uint32_t;
+enum class RpcAcceptOptions : uint16_t;
namespace cache::detail {
struct RecordBody;
struct ChunkRequest;
+ struct RecordedRequests;
+
+ class IRequestRecorder
+ {
+ public:
+ virtual ~IRequestRecorder() {}
+ virtual uint64_t IRequestRecorder_RecordRequest(const ZenContentType ContentType, const IoBuffer& RequestBuffer) = 0;
+ virtual void IRequestRecorder_RecordResponse(uint64_t RequestIndex,
+ const ZenContentType ContentType,
+ const IoBuffer& ResponseBuffer) = 0;
+ virtual void IRequestRecorder_RecordResponse(uint64_t RequestIndex,
+ const ZenContentType ContentType,
+ const CompositeBuffer& ResponseBuffer) = 0;
+ };
+ class IRequestReplayer
+ {
+ public:
+ virtual ~IRequestReplayer() {}
+ virtual uint64_t IRequestReplayer_GetRequestCount() const = 0;
+ virtual ZenContentType IRequestReplayer_GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) = 0;
+ virtual ZenContentType IRequestRecorder_GetResponse(uint64_t RequestIndex, IoBuffer& OutBuffer) = 0;
+ };
} // namespace cache::detail
/**
@@ -98,18 +121,24 @@ private:
Invalid,
};
- void HandleCacheRecordRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
- void HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
- void HandlePutCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
- void HandleCacheChunkRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
- void HandleGetCacheChunk(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
- void HandlePutCacheChunk(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
- void HandleRpcRequest(zen::HttpServerRequest& Request);
- void HandleRpcPutCacheRecords(zen::HttpServerRequest& Request, const CbPackage& BatchRequest);
- void HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
- void HandleRpcPutCacheValues(zen::HttpServerRequest& Request, const CbPackage& BatchRequest);
- void HandleRpcGetCacheValues(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
- void HandleRpcGetCacheChunks(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
+ void HandleCacheRecordRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
+ void HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
+ void HandlePutCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
+ void HandleCacheChunkRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
+ void HandleGetCacheChunk(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
+ void HandlePutCacheChunk(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy PolicyFromUrl);
+ void HandleRpcRequest(zen::HttpServerRequest& Request);
+
+ CbPackage HandleRpcPutCacheRecords(const CbPackage& BatchRequest);
+ CbPackage HandleRpcGetCacheRecords(CbObjectView BatchRequest);
+ CbPackage HandleRpcPutCacheValues(const CbPackage& BatchRequest);
+ CbPackage HandleRpcGetCacheValues(CbObjectView BatchRequest);
+ CbPackage HandleRpcGetCacheChunks(CbObjectView BatchRequest);
+ CbPackage HandleRpcRequest(const ZenContentType ContentType,
+ IoBuffer&& Body,
+ uint32_t& OutAcceptMagic,
+ RpcAcceptOptions& OutAcceptFlags);
+
void HandleCacheNamespaceRequest(zen::HttpServerRequest& Request, std::string_view Namespace);
void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Namespace, std::string_view Bucket);
virtual void HandleStatsRequest(zen::HttpServerRequest& Request) override;
@@ -117,9 +146,7 @@ private:
PutResult PutCacheRecord(PutRequestData& Request, const CbPackage* Package);
/** HandleRpcGetCacheChunks Helper: Parse the Body object into RecordValue Requests and Value Requests. */
- bool ParseGetCacheChunksRequest(uint32_t& AcceptMagic,
- uint16_t& AcceptFlags,
- std::string& Namespace,
+ bool ParseGetCacheChunksRequest(std::string& Namespace,
std::vector<CacheKeyRequest>& RecordKeys,
std::vector<cache::detail::RecordBody>& Records,
std::vector<CacheChunkRequest>& RequestKeys,
@@ -143,11 +170,7 @@ private:
std::vector<CacheChunkRequest>& RequestKeys,
std::vector<cache::detail::ChunkRequest>& Requests);
/** HandleRpcGetCacheChunks Helper: Send response message containing all chunk results. */
- void WriteGetCacheChunksResponse(uint32_t AcceptMagic,
- uint16_t AcceptFlags,
- std::string_view Namespace,
- std::vector<cache::detail::ChunkRequest>& Requests,
- zen::HttpServerRequest& HttpRequest);
+ CbPackage WriteGetCacheChunksResponse(std::string_view Namespace, std::vector<cache::detail::ChunkRequest>& Requests);
spdlog::logger& Log() { return m_Log; }
spdlog::logger& m_Log;
@@ -160,6 +183,10 @@ private:
metrics::OperationTiming m_HttpRequests;
metrics::OperationTiming m_UpstreamGetRequestTiming;
CacheStats m_CacheStats;
+
+ void ReplayRequestRecorder(cache::detail::IRequestReplayer& Replayer, uint32_t ThreadCount);
+
+ std::unique_ptr<cache::detail::IRequestRecorder> m_RequestRecorder;
};
/** Recognize both kBinary and kCompressedBinary as kCompressedBinary for structured cache value keys.
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index c20e40655..1f48aaebe 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -1827,7 +1827,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach
if (!Bucket->OpenOrCreate(BucketPath))
{
- m_Buckets.erase(BucketName);
+ m_Buckets.erase(InsertResult.first);
return false;
}
}
@@ -1872,9 +1872,18 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z
std::filesystem::path BucketPath = m_RootDir;
BucketPath /= BucketName;
- if (!Bucket->OpenOrCreate(BucketPath))
+ try
+ {
+ if (!Bucket->OpenOrCreate(BucketPath))
+ {
+ ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
+ m_Buckets.erase(InsertResult.first);
+ return;
+ }
+ }
+ catch (const std::exception& Err)
{
- m_Buckets.erase(BucketName);
+ ZEN_ERROR("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what());
return;
}
}
@@ -1897,7 +1906,7 @@ ZenCacheDiskLayer::DiscoverBuckets()
for (const std::filesystem::path& BucketPath : DirContent.Directories)
{
- std::string BucketName = PathToUtf8(BucketPath.stem());
+ const std::string BucketName = PathToUtf8(BucketPath.stem());
// New bucket needs to be created
if (auto It = m_Buckets.find(BucketName); It != m_Buckets.end())
{
@@ -1907,12 +1916,20 @@ ZenCacheDiskLayer::DiscoverBuckets()
auto InsertResult = m_Buckets.emplace(BucketName, std::make_unique<CacheBucket>(BucketName));
CacheBucket& Bucket = *InsertResult.first->second;
- if (!Bucket.OpenOrCreate(BucketPath, /* AllowCreate */ false))
+ try
{
- ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
+ if (!Bucket.OpenOrCreate(BucketPath, /* AllowCreate */ false))
+ {
+ ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName, m_RootDir);
- m_Buckets.erase(InsertResult.first);
- continue;
+ m_Buckets.erase(InsertResult.first);
+ continue;
+ }
+ }
+ catch (const std::exception& Err)
+ {
+ ZEN_ERROR("creating bucket '{}' in '{}' FAILED, reason: '{}'", BucketName, BucketPath, Err.what());
+ return;
}
ZEN_INFO("Discovered bucket '{}'", BucketName);
}