aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-02-17 08:20:46 +0000
committerGitHub <[email protected]>2023-02-17 00:20:46 -0800
commit2ec65fd3e171145450416ae76be1a7dd8c646704 (patch)
tree13bdb7e5deb60d4e58be2714cfdcf7f70272bc1e
parentExperimental ObjectStore/CDN like endpoint (diff)
downloadzen-2ec65fd3e171145450416ae76be1a7dd8c646704.tar.xz
zen-2ec65fd3e171145450416ae76be1a7dd8c646704.zip
Enhanced rpc request recording (#229)
* rpc replay zen command * fix replay sessions for thread * recording start/stop as zen commands * move rpcrecording code to zenutil to remove code duplication * simplify recording http request threading * added more data logging to rpc replay * NotFound is an acceptable response for an rpc request * fix rpc replay command line parsing * rpc replay stats * Allow spawning of sub-process workers when replaying rpc recording * changelog
-rw-r--r--CHANGELOG.md10
-rw-r--r--zen/cmds/rpcreplay.cpp258
-rw-r--r--zen/cmds/rpcreplay.h58
-rw-r--r--zen/zen.cpp49
-rw-r--r--zenserver/cache/structuredcache.cpp285
-rw-r--r--zenserver/cache/structuredcache.h38
-rw-r--r--zenserver/projectstore/projectstore.cpp1
-rw-r--r--zenutil/cache/rpcrecording.cpp210
-rw-r--r--zenutil/include/zenutil/cache/rpcrecording.h29
9 files changed, 655 insertions, 283 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7ac2e6642..1598d4496 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,14 @@
##
+- Feature: Zen command line tool `rpc-record-start` to record all RPC requests to the structured cache
+ - `--path` Recording file path where the rpc requests will be stored
+- Feature: Zen command line tool `rpc-record-stop` stop the currently active RPC request recording started with `rpc-record-start`
+- Feature: Zen command line tool `rpc-record-replay` replacy a RPC request recording created with `rpc-record-start`
+ - `--path` Recording file path where the rpc requests are stored
+ - `--numthreads` Number of worker threads to use while replaying the RPC requests
+ - `--numproc` Number of worker processes to run, if more than one new processes will be spawn with `<numthreads>` workers each
+ - `--offset` Offset into request playback to start at
+ - `--stride` The stride to use when selecting requests to playback
+ - `--onhost` Replay the recording inside the zenserver bypassing http overhead
- Improvement: FileCas now keeps an up to date index of all the entries improving performance when getting cache misses on large payloads
- Changed: Exit with failure code on port conflict rather than reporting crash to Sentry
diff --git a/zen/cmds/rpcreplay.cpp b/zen/cmds/rpcreplay.cpp
new file mode 100644
index 000000000..228a4daa9
--- /dev/null
+++ b/zen/cmds/rpcreplay.cpp
@@ -0,0 +1,258 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "rpcreplay.h"
+
+#include <zencore/filesystem.h>
+#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
+#include <zencore/timer.h>
+#include <zencore/workthreadpool.h>
+#include <zenhttp/httpcommon.h>
+#include <zenutil/cache/rpcrecording.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <cpr/cpr.h>
+#include <fmt/format.h>
+#include <gsl/gsl-lite.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+#include <memory>
+
+namespace zen {
+
+RpcStartRecordingCommand::RpcStartRecordingCommand()
+{
+ m_Options.add_options()("h,help", "Print help");
+ m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value("http://localhost:1337"), "<hosturl>");
+ m_Options.add_option("", "p", "path", "Recording file path", cxxopts::value(m_RecordingPath), "<path>");
+
+ m_Options.parse_positional("path");
+}
+
+RpcStartRecordingCommand::~RpcStartRecordingCommand() = default;
+
+int
+RpcStartRecordingCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
+{
+ ZEN_UNUSED(GlobalOptions, argc, argv);
+ if (!ParseOptions(argc, argv))
+ {
+ return 0;
+ }
+
+ if (m_RecordingPath.empty())
+ {
+ throw cxxopts::OptionParseException("Rpc start recording command requires a path");
+ }
+
+ cpr::Session Session;
+ Session.SetUrl(fmt::format("{}/z$/exec$/start-recording", m_HostName));
+ Session.SetParameters({{"path", m_RecordingPath}});
+ cpr::Response Response = Session.Post();
+ ZEN_CONSOLE("{}", FormatResponse(Response));
+ return GetReturnCode(Response);
+}
+
+////////////////////////////////////////////////////
+
+RpcStopRecordingCommand::RpcStopRecordingCommand()
+{
+ m_Options.add_options()("h,help", "Print help");
+ m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value("http://localhost:1337"), "<hosturl>");
+}
+
+RpcStopRecordingCommand::~RpcStopRecordingCommand() = default;
+
+int
+RpcStopRecordingCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
+{
+ ZEN_UNUSED(GlobalOptions, argc, argv);
+
+ if (!ParseOptions(argc, argv))
+ {
+ return 0;
+ }
+
+ cpr::Session Session;
+ Session.SetUrl(fmt::format("{}/z$/exec$/stop-recording", m_HostName));
+ cpr::Response Response = Session.Post();
+ ZEN_CONSOLE("{}", FormatResponse(Response));
+ return GetReturnCode(Response);
+}
+
+////////////////////////////////////////////////////
+
+RpcReplayCommand::RpcReplayCommand()
+{
+ m_Options.add_options()("h,help", "Print help");
+ m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value("http://localhost:1337"), "<hosturl>");
+ m_Options.add_option("", "p", "path", "Recording file path", cxxopts::value(m_RecordingPath), "<path>");
+ m_Options.add_option("",
+ "w",
+ "numthreads",
+ "Number of worker threads per process",
+ cxxopts::value(m_ThreadCount)->default_value(fmt::format("{}", std::thread::hardware_concurrency())),
+ "<count>");
+ m_Options.add_option("", "", "onhost", "Replay on host, bypassing http/network layer", cxxopts::value(m_OnHost), "<onhost>");
+ m_Options.add_option("",
+ "",
+ "offset",
+ "Offset into request recording to start replay",
+ cxxopts::value(m_Offset)->default_value("0"),
+ "<offset>");
+ m_Options.add_option("",
+ "",
+ "stride",
+ "Stride for request recording when replaying requests",
+ cxxopts::value(m_Stride)->default_value("1"),
+ "<stride>");
+ m_Options.add_option("", "", "numproc", "Number of worker processes", cxxopts::value(m_ProcessCount)->default_value("1"), "<count>");
+
+ m_Options.parse_positional("path");
+}
+
+RpcReplayCommand::~RpcReplayCommand() = default;
+
+int
+RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
+{
+ ZEN_UNUSED(GlobalOptions, argc, argv);
+
+ if (!ParseOptions(argc, argv))
+ {
+ return 0;
+ }
+
+ if (m_RecordingPath.empty())
+ {
+ throw cxxopts::OptionParseException("Rpc replay command requires a path");
+ }
+
+ if (m_OnHost)
+ {
+ cpr::Session Session;
+ Session.SetUrl(fmt::format("{}/z$/exec$/replay-recording", m_HostName));
+ Session.SetParameters({{"path", m_RecordingPath}, {"thread-count", fmt::format("{}", m_ThreadCount)}});
+ cpr::Response Response = Session.Post();
+ ZEN_CONSOLE("{}", FormatResponse(Response));
+ return GetReturnCode(Response);
+ }
+
+ std::unique_ptr<cache::IRpcRequestReplayer> Replayer = cache::MakeDiskRequestReplayer(m_RecordingPath, true);
+ uint64_t EntryCount = Replayer->GetRequestCount();
+
+ std::atomic_uint64_t EntryOffset = m_Offset;
+ std::atomic_uint64_t BytesSent = 0;
+ std::atomic_uint64_t BytesReceived = 0;
+
+ Stopwatch Timer;
+
+ if (m_ProcessCount > 1)
+ {
+ std::vector<std::unique_ptr<ProcessHandle>> WorkerProcesses;
+ WorkerProcesses.resize(m_ProcessCount);
+
+ ProcessMonitor Monitor;
+ for (int ProcessIndex = 0; ProcessIndex < m_ProcessCount; ++ProcessIndex)
+ {
+ std::string CommandLine =
+ fmt::format("{} rpc-record-replay --hosturl {} --path \"{}\" --offset {} --stride {} --numthreads {} --numproc {}",
+ argv[0],
+ m_HostName,
+ m_RecordingPath,
+ m_Stride == 1 ? 0 : m_Offset + ProcessIndex,
+ m_Stride,
+ m_ThreadCount,
+ 1);
+ CreateProcResult Result(CreateProc(std::filesystem::path(std::string(argv[0])), CommandLine));
+ WorkerProcesses[ProcessIndex] = std::make_unique<ProcessHandle>();
+ WorkerProcesses[ProcessIndex]->Initialize(Result);
+ Monitor.AddPid(WorkerProcesses[ProcessIndex]->Pid());
+ }
+ while (Monitor.IsRunning())
+ {
+ ZEN_CONSOLE("Waiting for worker processes...");
+ Sleep(1000);
+ }
+ return 0;
+ }
+ else
+ {
+ WorkerThreadPool WorkerPool(m_ThreadCount);
+
+ Latch WorkLatch(m_ThreadCount);
+ for (int WorkerIndex = 0; WorkerIndex < m_ThreadCount; ++WorkerIndex)
+ {
+ WorkerPool.ScheduleWork(
+ [HostName = m_HostName, &WorkLatch, EntryCount, &EntryOffset, &Replayer, &BytesSent, &BytesReceived, Stride = m_Stride]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+
+ cpr::Session Session;
+ Session.SetUrl(fmt::format("{}/z$/$rpc", HostName));
+
+ uint64_t EntryIndex = EntryOffset.fetch_add(Stride) - Stride;
+ while (EntryIndex < EntryCount)
+ {
+ IoBuffer Payload;
+ std::pair<ZenContentType, ZenContentType> Types = Replayer->GetRequest(EntryIndex, Payload);
+ ZenContentType RequestContentType = Types.first;
+ ZenContentType AcceptContentType = Types.second;
+
+ Session.SetHeader({{"Content-Type", std::string(MapContentTypeToString(RequestContentType))},
+ {"Accept", std::string(MapContentTypeToString(AcceptContentType))}});
+ uint64_t Offset = 0;
+ auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) {
+ size = Min<size_t>(size, Payload.GetSize() - Offset);
+ IoBuffer PayloadRange = IoBuffer(Payload, Offset, size);
+ MutableMemoryView Data(buffer, size);
+ Data.CopyFrom(PayloadRange.GetView());
+ Offset += size;
+ return true;
+ };
+ Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
+ cpr::Response Response = Session.Post();
+ BytesSent.fetch_add(Payload.GetSize());
+ if (Response.error || !(IsHttpSuccessCode(Response.status_code) ||
+ Response.status_code == gsl::narrow<long>(HttpResponseCode::NotFound)))
+ {
+ ZEN_CONSOLE("{}", FormatResponse(Response));
+ break;
+ }
+ BytesReceived.fetch_add(Response.downloaded_bytes);
+ EntryIndex = EntryOffset.fetch_add(Stride) - Stride;
+ }
+ });
+ }
+
+ while (!WorkLatch.Wait(1000))
+ {
+ ZEN_CONSOLE("Processing {} requests, {} remaining (sent {}, recevied {})...",
+ (EntryCount - m_Offset) / m_Stride,
+ (EntryCount - EntryOffset.load()) / m_Stride,
+ NiceBytes(BytesSent.load()),
+ NiceBytes(BytesReceived.load()));
+ }
+ }
+
+ const uint64_t RequestsSent = (EntryOffset.load() - m_Offset) / m_Stride;
+ const uint64_t ElapsedMS = Timer.GetElapsedTimeMs();
+ const double ElapsedS = ElapsedMS / 1000.500;
+ const uint64_t Sent = BytesSent.load();
+ const uint64_t Received = BytesReceived.load();
+ const uint64_t RequestsPerS = static_cast<uint64_t>(RequestsSent / ElapsedS);
+ const uint64_t SentPerS = static_cast<uint64_t>(Sent / ElapsedS);
+ const uint64_t ReceivedPerS = static_cast<uint64_t>(Received / ElapsedS);
+
+ ZEN_CONSOLE("Requests sent {} ({}/s), payloads sent {}B ({}B/s), payloads recevied {}B ({}B/s) in {}",
+ RequestsSent,
+ RequestsPerS,
+ NiceBytes(Sent),
+ NiceBytes(SentPerS),
+ NiceBytes(Received),
+ NiceBytes(ReceivedPerS),
+ NiceTimeSpanMs(ElapsedMS));
+
+ return 0;
+}
+
+} // namespace zen
diff --git a/zen/cmds/rpcreplay.h b/zen/cmds/rpcreplay.h
new file mode 100644
index 000000000..6f476f01e
--- /dev/null
+++ b/zen/cmds/rpcreplay.h
@@ -0,0 +1,58 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "../zen.h"
+
+namespace zen {
+
+class RpcStartRecordingCommand : public ZenCmdBase
+{
+public:
+ RpcStartRecordingCommand();
+ ~RpcStartRecordingCommand();
+
+ virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override;
+ virtual cxxopts::Options& Options() override { return m_Options; }
+
+private:
+ cxxopts::Options m_Options{"rpc-record-start", "Starts recording of cache rpc requests on a host"};
+ std::string m_HostName;
+ std::string m_RecordingPath;
+};
+
+class RpcStopRecordingCommand : public ZenCmdBase
+{
+public:
+ RpcStopRecordingCommand();
+ ~RpcStopRecordingCommand();
+
+ virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override;
+ virtual cxxopts::Options& Options() override { return m_Options; }
+
+private:
+ cxxopts::Options m_Options{"rpc-record-stop", "Stops recording of cache rpc requests on a host"};
+ std::string m_HostName;
+};
+
+class RpcReplayCommand : public ZenCmdBase
+{
+public:
+ RpcReplayCommand();
+ ~RpcReplayCommand();
+
+ virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override;
+ virtual cxxopts::Options& Options() override { return m_Options; }
+
+private:
+ cxxopts::Options m_Options{"rpc-record-replay", "Replays a previously recorded session of cache rpc requests to a target host"};
+ std::string m_HostName;
+ std::string m_RecordingPath;
+ bool m_OnHost = false;
+ int m_ProcessCount;
+ int m_ThreadCount;
+ uint64_t m_Offset;
+ uint64_t m_Stride;
+};
+
+} // namespace zen
diff --git a/zen/zen.cpp b/zen/zen.cpp
index 9d85680d1..e8ed08963 100644
--- a/zen/zen.cpp
+++ b/zen/zen.cpp
@@ -12,6 +12,7 @@
#include "cmds/hash.h"
#include "cmds/print.h"
#include "cmds/projectstore.h"
+#include "cmds/rpcreplay.h"
#include "cmds/scrub.h"
#include "cmds/status.h"
#include "cmds/top.h"
@@ -203,27 +204,30 @@ main(int argc, char** argv)
auto _ = zen::MakeGuard([] { spdlog::shutdown(); });
- HashCommand HashCmd;
- CopyCommand CopyCmd;
- DedupCommand DedupCmd;
- DropCommand DropCmd;
- StatusCommand StatusCmd;
- TopCommand TopCmd;
- PrintCommand PrintCmd;
- PrintPackageCommand PrintPkgCmd;
- PsCommand PsCmd;
- UpCommand UpCmd;
- DownCommand DownCmd;
- ExportOplogCommand ExportOplogCmd;
- ImportOplogCommand ImportOplogCmd;
- VersionCommand VersionCmd;
- CacheInfoCommand CacheInfoCmd;
- DropProjectCommand ProjectDropCmd;
- ProjectInfoCommand ProjectInfoCmd;
- CreateProjectCommand CreateProjectCmd;
- CreateOplogCommand CreateOplogCmd;
- GcCommand GcCmd;
- GcStatusCommand GcStatusCmd;
+ HashCommand HashCmd;
+ CopyCommand CopyCmd;
+ DedupCommand DedupCmd;
+ DropCommand DropCmd;
+ StatusCommand StatusCmd;
+ TopCommand TopCmd;
+ PrintCommand PrintCmd;
+ PrintPackageCommand PrintPkgCmd;
+ PsCommand PsCmd;
+ UpCommand UpCmd;
+ DownCommand DownCmd;
+ ExportOplogCommand ExportOplogCmd;
+ ImportOplogCommand ImportOplogCmd;
+ VersionCommand VersionCmd;
+ CacheInfoCommand CacheInfoCmd;
+ DropProjectCommand ProjectDropCmd;
+ ProjectInfoCommand ProjectInfoCmd;
+ CreateProjectCommand CreateProjectCmd;
+ CreateOplogCommand CreateOplogCmd;
+ GcCommand GcCmd;
+ GcStatusCommand GcStatusCmd;
+ RpcReplayCommand RpcReplayCmd;
+ RpcStartRecordingCommand RpcStartRecordingCmd;
+ RpcStopRecordingCommand RpcStopRecordingCmd;
#if ZEN_WITH_TESTS
RunTestsCommand RunTestsCmd;
@@ -258,6 +262,9 @@ main(int argc, char** argv)
{"oplog-import", &ImportOplogCmd, "Import project store oplog"},
{"gc", &GcCmd, "Garbage collect zen storage"},
{"gc-status", &GcStatusCmd, "Garbage collect zen storage status check"},
+ {"rpc-record-start", &RpcStartRecordingCmd, "Replays a previously recorded session of rpc requests"},
+ {"rpc-record-stop", &RpcStopRecordingCmd, "Starts recording of cache rpc requests on a host"},
+ {"rpc-record-replay", &RpcReplayCmd, "Stops recording of cache rpc requests on a host"},
#if ZEN_WITH_TESTS
{"runtests", &RunTestsCmd, "Run zen tests"},
#endif
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 7550ce111..1ef880396 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -17,10 +17,9 @@
#include <zencore/workthreadpool.h>
#include <zenhttp/httpserver.h>
#include <zenhttp/httpshared.h>
-#include <zenutil/basicfile.h>
#include <zenutil/cache/cache.h>
+#include <zenutil/cache/rpcrecording.h>
-//#include "cachekey.h"
#include "monitoring/httpstats.h"
#include "structuredcachestore.h"
#include "upstream/jupiter.h"
@@ -35,6 +34,7 @@
#include <queue>
#include <thread>
+#include <cpr/cpr.h>
#include <gsl/gsl-lite.hpp>
#if ZEN_WITH_TESTS
@@ -46,187 +46,6 @@ namespace zen {
using namespace std::literals;
-namespace cache::detail {
- struct RecordedRequest
- {
- uint64_t Offset;
- uint64_t Length;
- ZenContentType ContentType;
- };
-
- const uint64_t RecordedRequestBlockSize = 1ull << 31u;
-
- struct RecordedRequestsWriter
- {
- void BeginWrite(const std::filesystem::path& BasePath)
- {
- m_BasePath = BasePath;
- std::filesystem::create_directories(m_BasePath);
- }
-
- void EndWrite()
- {
- RwLock::ExclusiveLockScope _(m_Lock);
- m_BlockFiles.clear();
-
- IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest));
- BasicFile IndexFile;
- IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate);
- std::error_code Ec;
- IndexFile.WriteAll(IndexBuffer, Ec);
- IndexFile.Close();
- m_Entries.clear();
- }
-
- uint64_t WriteRequest(ZenContentType ContentType, const IoBuffer& RequestBuffer)
- {
- RwLock::ExclusiveLockScope Lock(m_Lock);
- uint64_t RequestIndex = m_Entries.size();
- RecordedRequest& Entry =
- m_Entries.emplace_back(RecordedRequest{.Offset = ~0ull, .Length = RequestBuffer.Size(), .ContentType = ContentType});
- if (Entry.Length < 1 * 1024 * 1024)
- {
- uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize);
- if (BlockIndex == m_BlockFiles.size())
- {
- std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>());
- NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate);
- m_ChunkOffset = BlockIndex * RecordedRequestBlockSize;
- }
- ZEN_ASSERT(BlockIndex < m_BlockFiles.size());
- BasicFile* BlockFile = m_BlockFiles[BlockIndex].get();
- ZEN_ASSERT(BlockFile != nullptr);
-
- Entry.Offset = m_ChunkOffset;
- m_ChunkOffset = RoundUp(m_ChunkOffset + Entry.Length, 1u << 4u);
- Lock.ReleaseNow();
-
- std::error_code Ec;
- BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), Entry.Offset - BlockIndex * RecordedRequestBlockSize, Ec);
- if (Ec)
- {
- Entry.Length = 0;
- return ~0ull;
- }
- return RequestIndex;
- }
- Lock.ReleaseNow();
-
- BasicFile RequestFile;
- RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate);
- std::error_code Ec;
- RequestFile.WriteAll(RequestBuffer, Ec);
- if (Ec)
- {
- Entry.Length = 0;
- return ~0ull;
- }
- return RequestIndex;
- }
-
- std::filesystem::path m_BasePath;
- mutable RwLock m_Lock;
- std::vector<RecordedRequest> m_Entries;
- std::vector<std::unique_ptr<BasicFile>> m_BlockFiles;
- uint64_t m_ChunkOffset;
- };
-
- struct RecordedRequestsReader
- {
- uint64_t BeginRead(const std::filesystem::path& BasePath)
- {
- m_BasePath = BasePath;
- BasicFile IndexFile;
- IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kRead);
- m_Entries.resize(IndexFile.FileSize() / sizeof(RecordedRequest));
- IndexFile.Read(m_Entries.data(), IndexFile.FileSize(), 0);
- uint64_t MaxChunkPosition = 0;
- for (const RecordedRequest& R : m_Entries)
- {
- if (R.Offset != ~0ull)
- {
- MaxChunkPosition = Max(MaxChunkPosition, R.Offset + R.Length);
- }
- }
- uint32_t BlockCount = gsl::narrow<uint32_t>(MaxChunkPosition / RecordedRequestBlockSize) + 1;
- m_BlockFiles.resize(BlockCount);
- for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; ++BlockIndex)
- {
- m_BlockFiles[BlockIndex] = std::make_unique<BasicFile>();
- m_BlockFiles[BlockIndex]->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kRead);
- }
- return m_Entries.size();
- }
- void EndRead() { m_BlockFiles.clear(); }
-
- ZenContentType ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const
- {
- if (RequestIndex >= m_Entries.size())
- {
- return ZenContentType::kUnknownContentType;
- }
- const RecordedRequest& Entry = m_Entries[RequestIndex];
- if (Entry.Length == 0)
- {
- return ZenContentType::kUnknownContentType;
- }
- if (Entry.Offset != ~0ull)
- {
- uint32_t BlockIndex = gsl::narrow<uint32_t>((Entry.Offset + Entry.Length) / RecordedRequestBlockSize);
- uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize);
- OutBuffer = IoBuffer(Entry.Length);
- MutableMemoryView OutView = OutBuffer.GetMutableView();
- m_BlockFiles[BlockIndex]->Read(OutView.GetData(), OutView.GetSize(), ChunkOffset);
- return Entry.ContentType;
- }
- BasicFile ChunkFile;
- ChunkFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kRead);
- OutBuffer = ChunkFile.ReadAll();
- return Entry.ContentType;
- }
-
- std::filesystem::path m_BasePath;
- std::vector<RecordedRequest> m_Entries;
- std::vector<std::unique_ptr<BasicFile>> m_BlockFiles;
- };
-
- class DiskRequestRecorder : public IRequestRecorder
- {
- public:
- DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); }
- virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); }
-
- private:
- virtual uint64_t IRequestRecorder_RecordRequest(ZenContentType ContentType, const IoBuffer& RequestBuffer) override
- {
- return m_RecordedRequests.WriteRequest(ContentType, RequestBuffer);
- }
- virtual void IRequestRecorder_RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {}
- virtual void IRequestRecorder_RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {}
- RecordedRequestsWriter m_RecordedRequests;
- };
-
- class DiskRequestReplayer : public IRequestReplayer
- {
- public:
- DiskRequestReplayer(const std::filesystem::path& BasePath) { m_RequestCount = m_RequestBuffer.BeginRead(BasePath); }
- virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); }
-
- private:
- virtual uint64_t IRequestReplayer_GetRequestCount() const override { return m_RequestCount; }
-
- virtual ZenContentType IRequestReplayer_GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override
- {
- return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer);
- }
- virtual ZenContentType IRequestRecorder_GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; }
-
- std::uint64_t m_RequestCount;
- RecordedRequestsReader m_RequestBuffer;
- };
-
-} // namespace cache::detail
-
//////////////////////////////////////////////////////////////////////////
CachePolicy
@@ -561,8 +380,8 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
{
m_RequestRecorder.reset();
HttpServerRequest::QueryParams Params = Request.GetQueryParams();
- std::string_view RecordPath = Params.GetValue("path");
- m_RequestRecorder = std::make_unique<cache::detail::DiskRequestRecorder>(RecordPath);
+ std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path")));
+ m_RequestRecorder = cache::MakeDiskRequestRecorder(RecordPath);
Request.WriteResponse(HttpResponseCode::OK);
return;
}
@@ -576,7 +395,7 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
{
m_RequestRecorder.reset();
HttpServerRequest::QueryParams Params = Request.GetQueryParams();
- std::string_view RecordPath = Params.GetValue("path");
+ std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path")));
uint32_t ThreadCount = std::thread::hardware_concurrency();
if (auto Param = Params.GetValue("thread_count"); Param.empty() == false)
{
@@ -585,8 +404,8 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
ThreadCount = gsl::narrow<uint32_t>(Value.value());
}
}
- cache::detail::DiskRequestReplayer Replayer(RecordPath);
- ReplayRequestRecorder(Replayer, ThreadCount < 1 ? 1 : ThreadCount);
+ std::unique_ptr<cache::IRpcRequestReplayer> Replayer(cache::MakeDiskRequestReplayer(RecordPath, false));
+ ReplayRequestRecorder(*Replayer, ThreadCount < 1 ? 1 : ThreadCount);
Request.WriteResponse(HttpResponseCode::OK);
return;
}
@@ -1516,10 +1335,10 @@ HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType,
}
void
-HttpStructuredCacheService::ReplayRequestRecorder(cache::detail::IRequestReplayer& Replayer, uint32_t ThreadCount)
+HttpStructuredCacheService::ReplayRequestRecorder(cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount)
{
WorkerThreadPool WorkerPool(ThreadCount);
- uint64_t RequestCount = Replayer.IRequestReplayer_GetRequestCount();
+ uint64_t RequestCount = Replayer.GetRequestCount();
Stopwatch Timer;
auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); });
Latch JobLatch(RequestCount);
@@ -1527,13 +1346,13 @@ HttpStructuredCacheService::ReplayRequestRecorder(cache::detail::IRequestReplaye
for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex)
{
WorkerPool.ScheduleWork([this, &JobLatch, &Replayer, RequestIndex]() {
- IoBuffer Body;
- ZenContentType ContentType = Replayer.IRequestReplayer_GetRequest(RequestIndex, Body);
+ IoBuffer Body;
+ std::pair<ZenContentType, ZenContentType> ContentType = Replayer.GetRequest(RequestIndex, Body);
if (Body)
{
uint32_t AcceptMagic = 0;
RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
- CbPackage RpcResult = HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags);
+ CbPackage RpcResult = HandleRpcRequest(ContentType.first, std::move(Body), AcceptMagic, AcceptFlags);
if (AcceptMagic == kCbPkgMagic)
{
bool AllowFileReferences = EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences);
@@ -1579,51 +1398,49 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
return Request.WriteResponse(HttpResponseCode::BadRequest);
}
- Request.WriteResponseAsync([this, Body = Request.ReadPayload(), ContentType](HttpServerRequest& AsyncRequest) mutable {
- std::uint64_t RequestIndex =
- m_RequestRecorder ? m_RequestRecorder->IRequestRecorder_RecordRequest(ContentType, Body) : ~0ull;
- uint32_t AcceptMagic = 0;
- RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
- CbPackage RpcResult = HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags);
- if (RpcResult.IsNull())
- {
- AsyncRequest.WriteResponse(HttpResponseCode::BadRequest);
- return;
- }
- if (AcceptMagic == kCbPkgMagic)
- {
- bool AllowFileReferences = EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences);
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(
- RpcResult,
- AllowFileReferences ? FormatFlags::kAllowLocalReferences | FormatFlags::kDenyPartialLocalReferences
- : FormatFlags::kDefault);
- if (RequestIndex != ~0ull)
+ Request.WriteResponseAsync(
+ [this, Body = Request.ReadPayload(), ContentType, AcceptType](HttpServerRequest& AsyncRequest) mutable {
+ std::uint64_t RequestIndex =
+ m_RequestRecorder ? m_RequestRecorder->RecordRequest(ContentType, AcceptType, Body) : ~0ull;
+ uint32_t AcceptMagic = 0;
+ RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
+ CbPackage RpcResult = HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags);
+ if (RpcResult.IsNull())
{
- ZEN_ASSERT(m_RequestRecorder);
- m_RequestRecorder->IRequestRecorder_RecordResponse(RequestIndex,
- HttpContentType::kCbPackage,
- RpcResponseBuffer);
+ AsyncRequest.WriteResponse(HttpResponseCode::BadRequest);
+ return;
}
- AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
- }
- else
- {
- BinaryWriter MemStream;
- RpcResult.Save(MemStream);
-
- if (RequestIndex != ~0ull)
+ if (AcceptMagic == kCbPkgMagic)
{
- ZEN_ASSERT(m_RequestRecorder);
- m_RequestRecorder->IRequestRecorder_RecordResponse(
- RequestIndex,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ bool AllowFileReferences = EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences);
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(
+ RpcResult,
+ AllowFileReferences ? FormatFlags::kAllowLocalReferences | FormatFlags::kDenyPartialLocalReferences
+ : FormatFlags::kDefault);
+ if (RequestIndex != ~0ull)
+ {
+ ZEN_ASSERT(m_RequestRecorder);
+ m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer);
+ }
+ AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
}
- AsyncRequest.WriteResponse(HttpResponseCode::OK,
- HttpContentType::kCbPackage,
- IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
- }
- });
+ else
+ {
+ BinaryWriter MemStream;
+ RpcResult.Save(MemStream);
+
+ if (RequestIndex != ~0ull)
+ {
+ ZEN_ASSERT(m_RequestRecorder);
+ m_RequestRecorder->RecordResponse(RequestIndex,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ }
+ AsyncRequest.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+ }
+ });
}
break;
default:
diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h
index 6a4cedef0..f606a1cf5 100644
--- a/zenserver/cache/structuredcache.h
+++ b/zenserver/cache/structuredcache.h
@@ -28,32 +28,14 @@ class ZenCacheStore;
enum class CachePolicy : uint32_t;
enum class RpcAcceptOptions : uint16_t;
-namespace cache::detail {
- struct RecordBody;
- struct ChunkRequest;
- struct RecordedRequests;
-
- class IRequestRecorder
- {
- public:
- virtual ~IRequestRecorder() {}
- virtual uint64_t IRequestRecorder_RecordRequest(const ZenContentType ContentType, const IoBuffer& RequestBuffer) = 0;
- virtual void IRequestRecorder_RecordResponse(uint64_t RequestIndex,
- const ZenContentType ContentType,
- const IoBuffer& ResponseBuffer) = 0;
- virtual void IRequestRecorder_RecordResponse(uint64_t RequestIndex,
- const ZenContentType ContentType,
- const CompositeBuffer& ResponseBuffer) = 0;
- };
- class IRequestReplayer
- {
- public:
- virtual ~IRequestReplayer() {}
- virtual uint64_t IRequestReplayer_GetRequestCount() const = 0;
- virtual ZenContentType IRequestReplayer_GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) = 0;
- virtual ZenContentType IRequestRecorder_GetResponse(uint64_t RequestIndex, IoBuffer& OutBuffer) = 0;
- };
-} // namespace cache::detail
+namespace cache {
+ class IRpcRequestReplayer;
+ class IRpcRequestRecorder;
+ namespace detail {
+ struct RecordBody;
+ struct ChunkRequest;
+ } // namespace detail
+} // namespace cache
/**
* Structured cache service. Imposes constraints on keys, supports blobs and
@@ -185,9 +167,9 @@ private:
metrics::OperationTiming m_UpstreamGetRequestTiming;
CacheStats m_CacheStats;
- void ReplayRequestRecorder(cache::detail::IRequestReplayer& Replayer, uint32_t ThreadCount);
+ void ReplayRequestRecorder(cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount);
- std::unique_ptr<cache::detail::IRequestRecorder> m_RequestRecorder;
+ std::unique_ptr<cache::IRpcRequestRecorder> m_RequestRecorder;
};
/** Recognize both kBinary and kCompressedBinary as kCompressedBinary for structured cache value keys.
diff --git a/zenserver/projectstore/projectstore.cpp b/zenserver/projectstore/projectstore.cpp
index 57b24c63d..c3fba7dff 100644
--- a/zenserver/projectstore/projectstore.cpp
+++ b/zenserver/projectstore/projectstore.cpp
@@ -16,6 +16,7 @@
#include <zenstore/caslog.h>
#include <zenstore/cidstore.h>
#include <zenstore/scrubcontext.h>
+#include <zenutil/cache/rpcrecording.h>
#include "fileremoteprojectstore.h"
#include "jupiterremoteprojectstore.h"
diff --git a/zenutil/cache/rpcrecording.cpp b/zenutil/cache/rpcrecording.cpp
new file mode 100644
index 000000000..4958a27f6
--- /dev/null
+++ b/zenutil/cache/rpcrecording.cpp
@@ -0,0 +1,210 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenutil/basicfile.h>
+#include <zenutil/cache/rpcrecording.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <fmt/format.h>
+#include <gsl/gsl-lite.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen::cache {
+struct RecordedRequest
+{
+ uint64_t Offset;
+ uint64_t Length;
+ ZenContentType ContentType;
+ ZenContentType AcceptType;
+};
+
+const uint64_t RecordedRequestBlockSize = 1ull << 31u;
+
+struct RecordedRequestsWriter
+{
+ void BeginWrite(const std::filesystem::path& BasePath)
+ {
+ m_BasePath = BasePath;
+ std::filesystem::create_directories(m_BasePath);
+ }
+
+ void EndWrite()
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_BlockFiles.clear();
+
+ IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest));
+ BasicFile IndexFile;
+ IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate);
+ std::error_code Ec;
+ IndexFile.WriteAll(IndexBuffer, Ec);
+ IndexFile.Close();
+ m_Entries.clear();
+ }
+
+ uint64_t WriteRequest(ZenContentType ContentType, ZenContentType AcceptType, const IoBuffer& RequestBuffer)
+ {
+ RwLock::ExclusiveLockScope Lock(m_Lock);
+ uint64_t RequestIndex = m_Entries.size();
+ RecordedRequest& Entry = m_Entries.emplace_back(
+ RecordedRequest{.Offset = ~0ull, .Length = RequestBuffer.Size(), .ContentType = ContentType, .AcceptType = AcceptType});
+ if (Entry.Length < 1 * 1024 * 1024)
+ {
+ uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize);
+ if (BlockIndex == m_BlockFiles.size())
+ {
+ std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>());
+ NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate);
+ m_ChunkOffset = BlockIndex * RecordedRequestBlockSize;
+ }
+ ZEN_ASSERT(BlockIndex < m_BlockFiles.size());
+ BasicFile* BlockFile = m_BlockFiles[BlockIndex].get();
+ ZEN_ASSERT(BlockFile != nullptr);
+
+ Entry.Offset = m_ChunkOffset;
+ m_ChunkOffset = RoundUp(m_ChunkOffset + Entry.Length, 1u << 4u);
+ Lock.ReleaseNow();
+
+ std::error_code Ec;
+ BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), Entry.Offset - BlockIndex * RecordedRequestBlockSize, Ec);
+ if (Ec)
+ {
+ Entry.Length = 0;
+ return ~0ull;
+ }
+ return RequestIndex;
+ }
+ Lock.ReleaseNow();
+
+ BasicFile RequestFile;
+ RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate);
+ std::error_code Ec;
+ RequestFile.WriteAll(RequestBuffer, Ec);
+ if (Ec)
+ {
+ Entry.Length = 0;
+ return ~0ull;
+ }
+ return RequestIndex;
+ }
+
+ std::filesystem::path m_BasePath;
+ mutable RwLock m_Lock;
+ std::vector<RecordedRequest> m_Entries;
+ std::vector<std::unique_ptr<BasicFile>> m_BlockFiles;
+ uint64_t m_ChunkOffset;
+};
+
+struct RecordedRequestsReader
+{
+ uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory)
+ {
+ m_BasePath = BasePath;
+ BasicFile IndexFile;
+ IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kRead);
+ m_Entries.resize(IndexFile.FileSize() / sizeof(RecordedRequest));
+ IndexFile.Read(m_Entries.data(), IndexFile.FileSize(), 0);
+ uint64_t MaxChunkPosition = 0;
+ for (const RecordedRequest& R : m_Entries)
+ {
+ if (R.Offset != ~0ull)
+ {
+ MaxChunkPosition = Max(MaxChunkPosition, R.Offset + R.Length);
+ }
+ }
+ uint32_t BlockCount = gsl::narrow<uint32_t>(MaxChunkPosition / RecordedRequestBlockSize) + 1;
+ m_BlockFiles.resize(BlockCount);
+ for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; ++BlockIndex)
+ {
+ if (InMemory)
+ {
+ BasicFile Chunk;
+ Chunk.Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kRead);
+ m_BlockFiles[BlockIndex] = Chunk.ReadAll();
+ continue;
+ }
+ m_BlockFiles[BlockIndex] = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("chunks{}.bin", BlockIndex));
+ }
+ return m_Entries.size();
+ }
+ void EndRead() { m_BlockFiles.clear(); }
+
+ std::pair<ZenContentType, ZenContentType> ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const
+ {
+ if (RequestIndex >= m_Entries.size())
+ {
+ return {ZenContentType::kUnknownContentType, ZenContentType::kUnknownContentType};
+ }
+ const RecordedRequest& Entry = m_Entries[RequestIndex];
+ if (Entry.Length == 0)
+ {
+ return {ZenContentType::kUnknownContentType, ZenContentType::kUnknownContentType};
+ }
+ if (Entry.Offset != ~0ull)
+ {
+ uint32_t BlockIndex = gsl::narrow<uint32_t>((Entry.Offset + Entry.Length) / RecordedRequestBlockSize);
+ uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize);
+ OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length);
+ return {Entry.ContentType, Entry.AcceptType};
+ }
+ OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex));
+ return {Entry.ContentType, Entry.AcceptType};
+ }
+
+ std::filesystem::path m_BasePath;
+ std::vector<RecordedRequest> m_Entries;
+ std::vector<IoBuffer> m_BlockFiles;
+};
+
+class DiskRequestRecorder : public IRpcRequestRecorder
+{
+public:
+ DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); }
+ virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); }
+
+private:
+ virtual uint64_t RecordRequest(const ZenContentType ContentType,
+ const ZenContentType AcceptType,
+ const IoBuffer& RequestBuffer) override
+ {
+ return m_RecordedRequests.WriteRequest(ContentType, AcceptType, RequestBuffer);
+ }
+ virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {}
+ virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {}
+ RecordedRequestsWriter m_RecordedRequests;
+};
+
+class DiskRequestReplayer : public IRpcRequestReplayer
+{
+public:
+ DiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory)
+ {
+ m_RequestCount = m_RequestBuffer.BeginRead(BasePath, InMemory);
+ }
+ virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); }
+
+private:
+ virtual uint64_t GetRequestCount() const override { return m_RequestCount; }
+
+ virtual std::pair<ZenContentType, ZenContentType> GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override
+ {
+ return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer);
+ }
+ virtual ZenContentType GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; }
+
+ std::uint64_t m_RequestCount;
+ RecordedRequestsReader m_RequestBuffer;
+};
+
+std::unique_ptr<cache::IRpcRequestRecorder>
+MakeDiskRequestRecorder(const std::filesystem::path& BasePath)
+{
+ return std::make_unique<DiskRequestRecorder>(BasePath);
+}
+
+std::unique_ptr<cache::IRpcRequestReplayer>
+MakeDiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory)
+{
+ return std::make_unique<DiskRequestReplayer>(BasePath, InMemory);
+}
+
+} // namespace zen::cache
diff --git a/zenutil/include/zenutil/cache/rpcrecording.h b/zenutil/include/zenutil/cache/rpcrecording.h
new file mode 100644
index 000000000..6d65a532a
--- /dev/null
+++ b/zenutil/include/zenutil/cache/rpcrecording.h
@@ -0,0 +1,29 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/compositebuffer.h>
+#include <zencore/iobuffer.h>
+
+namespace zen::cache {
+class IRpcRequestRecorder
+{
+public:
+ virtual ~IRpcRequestRecorder() {}
+ virtual uint64_t RecordRequest(const ZenContentType ContentType, const ZenContentType AcceptType, const IoBuffer& RequestBuffer) = 0;
+ virtual void RecordResponse(uint64_t RequestIndex, const ZenContentType ContentType, const IoBuffer& ResponseBuffer) = 0;
+ virtual void RecordResponse(uint64_t RequestIndex, const ZenContentType ContentType, const CompositeBuffer& ResponseBuffer) = 0;
+};
+class IRpcRequestReplayer
+{
+public:
+ virtual ~IRpcRequestReplayer() {}
+ virtual uint64_t GetRequestCount() const = 0;
+ virtual std::pair<ZenContentType, ZenContentType> GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) = 0;
+ virtual ZenContentType GetResponse(uint64_t RequestIndex, IoBuffer& OutBuffer) = 0;
+};
+
+std::unique_ptr<cache::IRpcRequestRecorder> MakeDiskRequestRecorder(const std::filesystem::path& BasePath);
+std::unique_ptr<cache::IRpcRequestReplayer> MakeDiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory);
+
+} // namespace zen::cache