diff options
| author | Dan Engelbrecht <[email protected]> | 2023-02-17 08:20:46 +0000 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-02-17 00:20:46 -0800 |
| commit | 2ec65fd3e171145450416ae76be1a7dd8c646704 (patch) | |
| tree | 13bdb7e5deb60d4e58be2714cfdcf7f70272bc1e | |
| parent | Experimental ObjectStore/CDN like endpoint (diff) | |
| download | zen-2ec65fd3e171145450416ae76be1a7dd8c646704.tar.xz zen-2ec65fd3e171145450416ae76be1a7dd8c646704.zip | |
Enhanced rpc request recording (#229)
* rpc replay zen command
* fix replay sessions for thread
* recording start/stop as zen commands
* move rpcrecording code to zenutil to remove code duplication
* simplify recording http request threading
* added more data logging to rpc replay
* NotFound is an acceptable response for an rpc request
* fix rpc replay command line parsing
* rpc replay stats
* Allow spawning of sub-process workers when replaying rpc recording
* changelog
| -rw-r--r-- | CHANGELOG.md | 10 | ||||
| -rw-r--r-- | zen/cmds/rpcreplay.cpp | 258 | ||||
| -rw-r--r-- | zen/cmds/rpcreplay.h | 58 | ||||
| -rw-r--r-- | zen/zen.cpp | 49 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 285 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 38 | ||||
| -rw-r--r-- | zenserver/projectstore/projectstore.cpp | 1 | ||||
| -rw-r--r-- | zenutil/cache/rpcrecording.cpp | 210 | ||||
| -rw-r--r-- | zenutil/include/zenutil/cache/rpcrecording.h | 29 |
9 files changed, 655 insertions, 283 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 7ac2e6642..1598d4496 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,14 @@ ## +- Feature: Zen command line tool `rpc-record-start` to record all RPC requests to the structured cache + - `--path` Recording file path where the rpc requests will be stored +- Feature: Zen command line tool `rpc-record-stop` stop the currently active RPC request recording started with `rpc-record-start` +- Feature: Zen command line tool `rpc-record-replay` replacy a RPC request recording created with `rpc-record-start` + - `--path` Recording file path where the rpc requests are stored + - `--numthreads` Number of worker threads to use while replaying the RPC requests + - `--numproc` Number of worker processes to run, if more than one new processes will be spawn with `<numthreads>` workers each + - `--offset` Offset into request playback to start at + - `--stride` The stride to use when selecting requests to playback + - `--onhost` Replay the recording inside the zenserver bypassing http overhead - Improvement: FileCas now keeps an up to date index of all the entries improving performance when getting cache misses on large payloads - Changed: Exit with failure code on port conflict rather than reporting crash to Sentry diff --git a/zen/cmds/rpcreplay.cpp b/zen/cmds/rpcreplay.cpp new file mode 100644 index 000000000..228a4daa9 --- /dev/null +++ b/zen/cmds/rpcreplay.cpp @@ -0,0 +1,258 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "rpcreplay.h" + +#include <zencore/filesystem.h> +#include <zencore/logging.h> +#include <zencore/scopeguard.h> +#include <zencore/timer.h> +#include <zencore/workthreadpool.h> +#include <zenhttp/httpcommon.h> +#include <zenutil/cache/rpcrecording.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <cpr/cpr.h> +#include <fmt/format.h> +#include <gsl/gsl-lite.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +#include <memory> + +namespace zen { + +RpcStartRecordingCommand::RpcStartRecordingCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value("http://localhost:1337"), "<hosturl>"); + m_Options.add_option("", "p", "path", "Recording file path", cxxopts::value(m_RecordingPath), "<path>"); + + m_Options.parse_positional("path"); +} + +RpcStartRecordingCommand::~RpcStartRecordingCommand() = default; + +int +RpcStartRecordingCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + ZEN_UNUSED(GlobalOptions, argc, argv); + if (!ParseOptions(argc, argv)) + { + return 0; + } + + if (m_RecordingPath.empty()) + { + throw cxxopts::OptionParseException("Rpc start recording command requires a path"); + } + + cpr::Session Session; + Session.SetUrl(fmt::format("{}/z$/exec$/start-recording", m_HostName)); + Session.SetParameters({{"path", m_RecordingPath}}); + cpr::Response Response = Session.Post(); + ZEN_CONSOLE("{}", FormatResponse(Response)); + return GetReturnCode(Response); +} + +//////////////////////////////////////////////////// + +RpcStopRecordingCommand::RpcStopRecordingCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value("http://localhost:1337"), "<hosturl>"); +} + +RpcStopRecordingCommand::~RpcStopRecordingCommand() = default; + +int +RpcStopRecordingCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + ZEN_UNUSED(GlobalOptions, argc, argv); + + if (!ParseOptions(argc, argv)) + { + return 0; + } + + cpr::Session Session; + Session.SetUrl(fmt::format("{}/z$/exec$/stop-recording", m_HostName)); + cpr::Response Response = Session.Post(); + ZEN_CONSOLE("{}", FormatResponse(Response)); + return GetReturnCode(Response); +} + +//////////////////////////////////////////////////// + +RpcReplayCommand::RpcReplayCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value("http://localhost:1337"), "<hosturl>"); + m_Options.add_option("", "p", "path", "Recording file path", cxxopts::value(m_RecordingPath), "<path>"); + m_Options.add_option("", + "w", + "numthreads", + "Number of worker threads per process", + cxxopts::value(m_ThreadCount)->default_value(fmt::format("{}", std::thread::hardware_concurrency())), + "<count>"); + m_Options.add_option("", "", "onhost", "Replay on host, bypassing http/network layer", cxxopts::value(m_OnHost), "<onhost>"); + m_Options.add_option("", + "", + "offset", + "Offset into request recording to start replay", + cxxopts::value(m_Offset)->default_value("0"), + "<offset>"); + m_Options.add_option("", + "", + "stride", + "Stride for request recording when replaying requests", + cxxopts::value(m_Stride)->default_value("1"), + "<stride>"); + m_Options.add_option("", "", "numproc", "Number of worker processes", cxxopts::value(m_ProcessCount)->default_value("1"), "<count>"); + + m_Options.parse_positional("path"); +} + +RpcReplayCommand::~RpcReplayCommand() = default; + +int +RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + ZEN_UNUSED(GlobalOptions, argc, argv); + + if (!ParseOptions(argc, argv)) + { + return 0; + } + + if (m_RecordingPath.empty()) + { + throw cxxopts::OptionParseException("Rpc replay command requires a path"); + } + + if (m_OnHost) + { + cpr::Session Session; + Session.SetUrl(fmt::format("{}/z$/exec$/replay-recording", m_HostName)); + Session.SetParameters({{"path", m_RecordingPath}, {"thread-count", fmt::format("{}", m_ThreadCount)}}); + cpr::Response Response = Session.Post(); + ZEN_CONSOLE("{}", FormatResponse(Response)); + return GetReturnCode(Response); + } + + std::unique_ptr<cache::IRpcRequestReplayer> Replayer = cache::MakeDiskRequestReplayer(m_RecordingPath, true); + uint64_t EntryCount = Replayer->GetRequestCount(); + + std::atomic_uint64_t EntryOffset = m_Offset; + std::atomic_uint64_t BytesSent = 0; + std::atomic_uint64_t BytesReceived = 0; + + Stopwatch Timer; + + if (m_ProcessCount > 1) + { + std::vector<std::unique_ptr<ProcessHandle>> WorkerProcesses; + WorkerProcesses.resize(m_ProcessCount); + + ProcessMonitor Monitor; + for (int ProcessIndex = 0; ProcessIndex < m_ProcessCount; ++ProcessIndex) + { + std::string CommandLine = + fmt::format("{} rpc-record-replay --hosturl {} --path \"{}\" --offset {} --stride {} --numthreads {} --numproc {}", + argv[0], + m_HostName, + m_RecordingPath, + m_Stride == 1 ? 0 : m_Offset + ProcessIndex, + m_Stride, + m_ThreadCount, + 1); + CreateProcResult Result(CreateProc(std::filesystem::path(std::string(argv[0])), CommandLine)); + WorkerProcesses[ProcessIndex] = std::make_unique<ProcessHandle>(); + WorkerProcesses[ProcessIndex]->Initialize(Result); + Monitor.AddPid(WorkerProcesses[ProcessIndex]->Pid()); + } + while (Monitor.IsRunning()) + { + ZEN_CONSOLE("Waiting for worker processes..."); + Sleep(1000); + } + return 0; + } + else + { + WorkerThreadPool WorkerPool(m_ThreadCount); + + Latch WorkLatch(m_ThreadCount); + for (int WorkerIndex = 0; WorkerIndex < m_ThreadCount; ++WorkerIndex) + { + WorkerPool.ScheduleWork( + [HostName = m_HostName, &WorkLatch, EntryCount, &EntryOffset, &Replayer, &BytesSent, &BytesReceived, Stride = m_Stride]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + + cpr::Session Session; + Session.SetUrl(fmt::format("{}/z$/$rpc", HostName)); + + uint64_t EntryIndex = EntryOffset.fetch_add(Stride) - Stride; + while (EntryIndex < EntryCount) + { + IoBuffer Payload; + std::pair<ZenContentType, ZenContentType> Types = Replayer->GetRequest(EntryIndex, Payload); + ZenContentType RequestContentType = Types.first; + ZenContentType AcceptContentType = Types.second; + + Session.SetHeader({{"Content-Type", std::string(MapContentTypeToString(RequestContentType))}, + {"Accept", std::string(MapContentTypeToString(AcceptContentType))}}); + uint64_t Offset = 0; + auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) { + size = Min<size_t>(size, Payload.GetSize() - Offset); + IoBuffer PayloadRange = IoBuffer(Payload, Offset, size); + MutableMemoryView Data(buffer, size); + Data.CopyFrom(PayloadRange.GetView()); + Offset += size; + return true; + }; + Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); + cpr::Response Response = Session.Post(); + BytesSent.fetch_add(Payload.GetSize()); + if (Response.error || !(IsHttpSuccessCode(Response.status_code) || + Response.status_code == gsl::narrow<long>(HttpResponseCode::NotFound))) + { + ZEN_CONSOLE("{}", FormatResponse(Response)); + break; + } + BytesReceived.fetch_add(Response.downloaded_bytes); + EntryIndex = EntryOffset.fetch_add(Stride) - Stride; + } + }); + } + + while (!WorkLatch.Wait(1000)) + { + ZEN_CONSOLE("Processing {} requests, {} remaining (sent {}, recevied {})...", + (EntryCount - m_Offset) / m_Stride, + (EntryCount - EntryOffset.load()) / m_Stride, + NiceBytes(BytesSent.load()), + NiceBytes(BytesReceived.load())); + } + } + + const uint64_t RequestsSent = (EntryOffset.load() - m_Offset) / m_Stride; + const uint64_t ElapsedMS = Timer.GetElapsedTimeMs(); + const double ElapsedS = ElapsedMS / 1000.500; + const uint64_t Sent = BytesSent.load(); + const uint64_t Received = BytesReceived.load(); + const uint64_t RequestsPerS = static_cast<uint64_t>(RequestsSent / ElapsedS); + const uint64_t SentPerS = static_cast<uint64_t>(Sent / ElapsedS); + const uint64_t ReceivedPerS = static_cast<uint64_t>(Received / ElapsedS); + + ZEN_CONSOLE("Requests sent {} ({}/s), payloads sent {}B ({}B/s), payloads recevied {}B ({}B/s) in {}", + RequestsSent, + RequestsPerS, + NiceBytes(Sent), + NiceBytes(SentPerS), + NiceBytes(Received), + NiceBytes(ReceivedPerS), + NiceTimeSpanMs(ElapsedMS)); + + return 0; +} + +} // namespace zen diff --git a/zen/cmds/rpcreplay.h b/zen/cmds/rpcreplay.h new file mode 100644 index 000000000..6f476f01e --- /dev/null +++ b/zen/cmds/rpcreplay.h @@ -0,0 +1,58 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "../zen.h" + +namespace zen { + +class RpcStartRecordingCommand : public ZenCmdBase +{ +public: + RpcStartRecordingCommand(); + ~RpcStartRecordingCommand(); + + virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override; + virtual cxxopts::Options& Options() override { return m_Options; } + +private: + cxxopts::Options m_Options{"rpc-record-start", "Starts recording of cache rpc requests on a host"}; + std::string m_HostName; + std::string m_RecordingPath; +}; + +class RpcStopRecordingCommand : public ZenCmdBase +{ +public: + RpcStopRecordingCommand(); + ~RpcStopRecordingCommand(); + + virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override; + virtual cxxopts::Options& Options() override { return m_Options; } + +private: + cxxopts::Options m_Options{"rpc-record-stop", "Stops recording of cache rpc requests on a host"}; + std::string m_HostName; +}; + +class RpcReplayCommand : public ZenCmdBase +{ +public: + RpcReplayCommand(); + ~RpcReplayCommand(); + + virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override; + virtual cxxopts::Options& Options() override { return m_Options; } + +private: + cxxopts::Options m_Options{"rpc-record-replay", "Replays a previously recorded session of cache rpc requests to a target host"}; + std::string m_HostName; + std::string m_RecordingPath; + bool m_OnHost = false; + int m_ProcessCount; + int m_ThreadCount; + uint64_t m_Offset; + uint64_t m_Stride; +}; + +} // namespace zen diff --git a/zen/zen.cpp b/zen/zen.cpp index 9d85680d1..e8ed08963 100644 --- a/zen/zen.cpp +++ b/zen/zen.cpp @@ -12,6 +12,7 @@ #include "cmds/hash.h" #include "cmds/print.h" #include "cmds/projectstore.h" +#include "cmds/rpcreplay.h" #include "cmds/scrub.h" #include "cmds/status.h" #include "cmds/top.h" @@ -203,27 +204,30 @@ main(int argc, char** argv) auto _ = zen::MakeGuard([] { spdlog::shutdown(); }); - HashCommand HashCmd; - CopyCommand CopyCmd; - DedupCommand DedupCmd; - DropCommand DropCmd; - StatusCommand StatusCmd; - TopCommand TopCmd; - PrintCommand PrintCmd; - PrintPackageCommand PrintPkgCmd; - PsCommand PsCmd; - UpCommand UpCmd; - DownCommand DownCmd; - ExportOplogCommand ExportOplogCmd; - ImportOplogCommand ImportOplogCmd; - VersionCommand VersionCmd; - CacheInfoCommand CacheInfoCmd; - DropProjectCommand ProjectDropCmd; - ProjectInfoCommand ProjectInfoCmd; - CreateProjectCommand CreateProjectCmd; - CreateOplogCommand CreateOplogCmd; - GcCommand GcCmd; - GcStatusCommand GcStatusCmd; + HashCommand HashCmd; + CopyCommand CopyCmd; + DedupCommand DedupCmd; + DropCommand DropCmd; + StatusCommand StatusCmd; + TopCommand TopCmd; + PrintCommand PrintCmd; + PrintPackageCommand PrintPkgCmd; + PsCommand PsCmd; + UpCommand UpCmd; + DownCommand DownCmd; + ExportOplogCommand ExportOplogCmd; + ImportOplogCommand ImportOplogCmd; + VersionCommand VersionCmd; + CacheInfoCommand CacheInfoCmd; + DropProjectCommand ProjectDropCmd; + ProjectInfoCommand ProjectInfoCmd; + CreateProjectCommand CreateProjectCmd; + CreateOplogCommand CreateOplogCmd; + GcCommand GcCmd; + GcStatusCommand GcStatusCmd; + RpcReplayCommand RpcReplayCmd; + RpcStartRecordingCommand RpcStartRecordingCmd; + RpcStopRecordingCommand RpcStopRecordingCmd; #if ZEN_WITH_TESTS RunTestsCommand RunTestsCmd; @@ -258,6 +262,9 @@ main(int argc, char** argv) {"oplog-import", &ImportOplogCmd, "Import project store oplog"}, {"gc", &GcCmd, "Garbage collect zen storage"}, {"gc-status", &GcStatusCmd, "Garbage collect zen storage status check"}, + {"rpc-record-start", &RpcStartRecordingCmd, "Replays a previously recorded session of rpc requests"}, + {"rpc-record-stop", &RpcStopRecordingCmd, "Starts recording of cache rpc requests on a host"}, + {"rpc-record-replay", &RpcReplayCmd, "Stops recording of cache rpc requests on a host"}, #if ZEN_WITH_TESTS {"runtests", &RunTestsCmd, "Run zen tests"}, #endif diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 7550ce111..1ef880396 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -17,10 +17,9 @@ #include <zencore/workthreadpool.h> #include <zenhttp/httpserver.h> #include <zenhttp/httpshared.h> -#include <zenutil/basicfile.h> #include <zenutil/cache/cache.h> +#include <zenutil/cache/rpcrecording.h> -//#include "cachekey.h" #include "monitoring/httpstats.h" #include "structuredcachestore.h" #include "upstream/jupiter.h" @@ -35,6 +34,7 @@ #include <queue> #include <thread> +#include <cpr/cpr.h> #include <gsl/gsl-lite.hpp> #if ZEN_WITH_TESTS @@ -46,187 +46,6 @@ namespace zen { using namespace std::literals; -namespace cache::detail { - struct RecordedRequest - { - uint64_t Offset; - uint64_t Length; - ZenContentType ContentType; - }; - - const uint64_t RecordedRequestBlockSize = 1ull << 31u; - - struct RecordedRequestsWriter - { - void BeginWrite(const std::filesystem::path& BasePath) - { - m_BasePath = BasePath; - std::filesystem::create_directories(m_BasePath); - } - - void EndWrite() - { - RwLock::ExclusiveLockScope _(m_Lock); - m_BlockFiles.clear(); - - IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest)); - BasicFile IndexFile; - IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate); - std::error_code Ec; - IndexFile.WriteAll(IndexBuffer, Ec); - IndexFile.Close(); - m_Entries.clear(); - } - - uint64_t WriteRequest(ZenContentType ContentType, const IoBuffer& RequestBuffer) - { - RwLock::ExclusiveLockScope Lock(m_Lock); - uint64_t RequestIndex = m_Entries.size(); - RecordedRequest& Entry = - m_Entries.emplace_back(RecordedRequest{.Offset = ~0ull, .Length = RequestBuffer.Size(), .ContentType = ContentType}); - if (Entry.Length < 1 * 1024 * 1024) - { - uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize); - if (BlockIndex == m_BlockFiles.size()) - { - std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>()); - NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate); - m_ChunkOffset = BlockIndex * RecordedRequestBlockSize; - } - ZEN_ASSERT(BlockIndex < m_BlockFiles.size()); - BasicFile* BlockFile = m_BlockFiles[BlockIndex].get(); - ZEN_ASSERT(BlockFile != nullptr); - - Entry.Offset = m_ChunkOffset; - m_ChunkOffset = RoundUp(m_ChunkOffset + Entry.Length, 1u << 4u); - Lock.ReleaseNow(); - - std::error_code Ec; - BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), Entry.Offset - BlockIndex * RecordedRequestBlockSize, Ec); - if (Ec) - { - Entry.Length = 0; - return ~0ull; - } - return RequestIndex; - } - Lock.ReleaseNow(); - - BasicFile RequestFile; - RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate); - std::error_code Ec; - RequestFile.WriteAll(RequestBuffer, Ec); - if (Ec) - { - Entry.Length = 0; - return ~0ull; - } - return RequestIndex; - } - - std::filesystem::path m_BasePath; - mutable RwLock m_Lock; - std::vector<RecordedRequest> m_Entries; - std::vector<std::unique_ptr<BasicFile>> m_BlockFiles; - uint64_t m_ChunkOffset; - }; - - struct RecordedRequestsReader - { - uint64_t BeginRead(const std::filesystem::path& BasePath) - { - m_BasePath = BasePath; - BasicFile IndexFile; - IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kRead); - m_Entries.resize(IndexFile.FileSize() / sizeof(RecordedRequest)); - IndexFile.Read(m_Entries.data(), IndexFile.FileSize(), 0); - uint64_t MaxChunkPosition = 0; - for (const RecordedRequest& R : m_Entries) - { - if (R.Offset != ~0ull) - { - MaxChunkPosition = Max(MaxChunkPosition, R.Offset + R.Length); - } - } - uint32_t BlockCount = gsl::narrow<uint32_t>(MaxChunkPosition / RecordedRequestBlockSize) + 1; - m_BlockFiles.resize(BlockCount); - for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; ++BlockIndex) - { - m_BlockFiles[BlockIndex] = std::make_unique<BasicFile>(); - m_BlockFiles[BlockIndex]->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kRead); - } - return m_Entries.size(); - } - void EndRead() { m_BlockFiles.clear(); } - - ZenContentType ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const - { - if (RequestIndex >= m_Entries.size()) - { - return ZenContentType::kUnknownContentType; - } - const RecordedRequest& Entry = m_Entries[RequestIndex]; - if (Entry.Length == 0) - { - return ZenContentType::kUnknownContentType; - } - if (Entry.Offset != ~0ull) - { - uint32_t BlockIndex = gsl::narrow<uint32_t>((Entry.Offset + Entry.Length) / RecordedRequestBlockSize); - uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize); - OutBuffer = IoBuffer(Entry.Length); - MutableMemoryView OutView = OutBuffer.GetMutableView(); - m_BlockFiles[BlockIndex]->Read(OutView.GetData(), OutView.GetSize(), ChunkOffset); - return Entry.ContentType; - } - BasicFile ChunkFile; - ChunkFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kRead); - OutBuffer = ChunkFile.ReadAll(); - return Entry.ContentType; - } - - std::filesystem::path m_BasePath; - std::vector<RecordedRequest> m_Entries; - std::vector<std::unique_ptr<BasicFile>> m_BlockFiles; - }; - - class DiskRequestRecorder : public IRequestRecorder - { - public: - DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); } - virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); } - - private: - virtual uint64_t IRequestRecorder_RecordRequest(ZenContentType ContentType, const IoBuffer& RequestBuffer) override - { - return m_RecordedRequests.WriteRequest(ContentType, RequestBuffer); - } - virtual void IRequestRecorder_RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {} - virtual void IRequestRecorder_RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {} - RecordedRequestsWriter m_RecordedRequests; - }; - - class DiskRequestReplayer : public IRequestReplayer - { - public: - DiskRequestReplayer(const std::filesystem::path& BasePath) { m_RequestCount = m_RequestBuffer.BeginRead(BasePath); } - virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); } - - private: - virtual uint64_t IRequestReplayer_GetRequestCount() const override { return m_RequestCount; } - - virtual ZenContentType IRequestReplayer_GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override - { - return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer); - } - virtual ZenContentType IRequestRecorder_GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; } - - std::uint64_t m_RequestCount; - RecordedRequestsReader m_RequestBuffer; - }; - -} // namespace cache::detail - ////////////////////////////////////////////////////////////////////////// CachePolicy @@ -561,8 +380,8 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) { m_RequestRecorder.reset(); HttpServerRequest::QueryParams Params = Request.GetQueryParams(); - std::string_view RecordPath = Params.GetValue("path"); - m_RequestRecorder = std::make_unique<cache::detail::DiskRequestRecorder>(RecordPath); + std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path"))); + m_RequestRecorder = cache::MakeDiskRequestRecorder(RecordPath); Request.WriteResponse(HttpResponseCode::OK); return; } @@ -576,7 +395,7 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) { m_RequestRecorder.reset(); HttpServerRequest::QueryParams Params = Request.GetQueryParams(); - std::string_view RecordPath = Params.GetValue("path"); + std::string RecordPath = cpr::util::urlDecode(std::string(Params.GetValue("path"))); uint32_t ThreadCount = std::thread::hardware_concurrency(); if (auto Param = Params.GetValue("thread_count"); Param.empty() == false) { @@ -585,8 +404,8 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) ThreadCount = gsl::narrow<uint32_t>(Value.value()); } } - cache::detail::DiskRequestReplayer Replayer(RecordPath); - ReplayRequestRecorder(Replayer, ThreadCount < 1 ? 1 : ThreadCount); + std::unique_ptr<cache::IRpcRequestReplayer> Replayer(cache::MakeDiskRequestReplayer(RecordPath, false)); + ReplayRequestRecorder(*Replayer, ThreadCount < 1 ? 1 : ThreadCount); Request.WriteResponse(HttpResponseCode::OK); return; } @@ -1516,10 +1335,10 @@ HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType, } void -HttpStructuredCacheService::ReplayRequestRecorder(cache::detail::IRequestReplayer& Replayer, uint32_t ThreadCount) +HttpStructuredCacheService::ReplayRequestRecorder(cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount) { WorkerThreadPool WorkerPool(ThreadCount); - uint64_t RequestCount = Replayer.IRequestReplayer_GetRequestCount(); + uint64_t RequestCount = Replayer.GetRequestCount(); Stopwatch Timer; auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); }); Latch JobLatch(RequestCount); @@ -1527,13 +1346,13 @@ HttpStructuredCacheService::ReplayRequestRecorder(cache::detail::IRequestReplaye for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex) { WorkerPool.ScheduleWork([this, &JobLatch, &Replayer, RequestIndex]() { - IoBuffer Body; - ZenContentType ContentType = Replayer.IRequestReplayer_GetRequest(RequestIndex, Body); + IoBuffer Body; + std::pair<ZenContentType, ZenContentType> ContentType = Replayer.GetRequest(RequestIndex, Body); if (Body) { uint32_t AcceptMagic = 0; RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; - CbPackage RpcResult = HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags); + CbPackage RpcResult = HandleRpcRequest(ContentType.first, std::move(Body), AcceptMagic, AcceptFlags); if (AcceptMagic == kCbPkgMagic) { bool AllowFileReferences = EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences); @@ -1579,51 +1398,49 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) return Request.WriteResponse(HttpResponseCode::BadRequest); } - Request.WriteResponseAsync([this, Body = Request.ReadPayload(), ContentType](HttpServerRequest& AsyncRequest) mutable { - std::uint64_t RequestIndex = - m_RequestRecorder ? m_RequestRecorder->IRequestRecorder_RecordRequest(ContentType, Body) : ~0ull; - uint32_t AcceptMagic = 0; - RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; - CbPackage RpcResult = HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags); - if (RpcResult.IsNull()) - { - AsyncRequest.WriteResponse(HttpResponseCode::BadRequest); - return; - } - if (AcceptMagic == kCbPkgMagic) - { - bool AllowFileReferences = EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences); - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer( - RpcResult, - AllowFileReferences ? FormatFlags::kAllowLocalReferences | FormatFlags::kDenyPartialLocalReferences - : FormatFlags::kDefault); - if (RequestIndex != ~0ull) + Request.WriteResponseAsync( + [this, Body = Request.ReadPayload(), ContentType, AcceptType](HttpServerRequest& AsyncRequest) mutable { + std::uint64_t RequestIndex = + m_RequestRecorder ? m_RequestRecorder->RecordRequest(ContentType, AcceptType, Body) : ~0ull; + uint32_t AcceptMagic = 0; + RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; + CbPackage RpcResult = HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags); + if (RpcResult.IsNull()) { - ZEN_ASSERT(m_RequestRecorder); - m_RequestRecorder->IRequestRecorder_RecordResponse(RequestIndex, - HttpContentType::kCbPackage, - RpcResponseBuffer); + AsyncRequest.WriteResponse(HttpResponseCode::BadRequest); + return; } - AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); - } - else - { - BinaryWriter MemStream; - RpcResult.Save(MemStream); - - if (RequestIndex != ~0ull) + if (AcceptMagic == kCbPkgMagic) { - ZEN_ASSERT(m_RequestRecorder); - m_RequestRecorder->IRequestRecorder_RecordResponse( - RequestIndex, - HttpContentType::kCbPackage, - IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + bool AllowFileReferences = EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences); + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer( + RpcResult, + AllowFileReferences ? FormatFlags::kAllowLocalReferences | FormatFlags::kDenyPartialLocalReferences + : FormatFlags::kDefault); + if (RequestIndex != ~0ull) + { + ZEN_ASSERT(m_RequestRecorder); + m_RequestRecorder->RecordResponse(RequestIndex, HttpContentType::kCbPackage, RpcResponseBuffer); + } + AsyncRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); } - AsyncRequest.WriteResponse(HttpResponseCode::OK, - HttpContentType::kCbPackage, - IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); - } - }); + else + { + BinaryWriter MemStream; + RpcResult.Save(MemStream); + + if (RequestIndex != ~0ull) + { + ZEN_ASSERT(m_RequestRecorder); + m_RequestRecorder->RecordResponse(RequestIndex, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + } + AsyncRequest.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); + } + }); } break; default: diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index 6a4cedef0..f606a1cf5 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -28,32 +28,14 @@ class ZenCacheStore; enum class CachePolicy : uint32_t; enum class RpcAcceptOptions : uint16_t; -namespace cache::detail { - struct RecordBody; - struct ChunkRequest; - struct RecordedRequests; - - class IRequestRecorder - { - public: - virtual ~IRequestRecorder() {} - virtual uint64_t IRequestRecorder_RecordRequest(const ZenContentType ContentType, const IoBuffer& RequestBuffer) = 0; - virtual void IRequestRecorder_RecordResponse(uint64_t RequestIndex, - const ZenContentType ContentType, - const IoBuffer& ResponseBuffer) = 0; - virtual void IRequestRecorder_RecordResponse(uint64_t RequestIndex, - const ZenContentType ContentType, - const CompositeBuffer& ResponseBuffer) = 0; - }; - class IRequestReplayer - { - public: - virtual ~IRequestReplayer() {} - virtual uint64_t IRequestReplayer_GetRequestCount() const = 0; - virtual ZenContentType IRequestReplayer_GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) = 0; - virtual ZenContentType IRequestRecorder_GetResponse(uint64_t RequestIndex, IoBuffer& OutBuffer) = 0; - }; -} // namespace cache::detail +namespace cache { + class IRpcRequestReplayer; + class IRpcRequestRecorder; + namespace detail { + struct RecordBody; + struct ChunkRequest; + } // namespace detail +} // namespace cache /** * Structured cache service. Imposes constraints on keys, supports blobs and @@ -185,9 +167,9 @@ private: metrics::OperationTiming m_UpstreamGetRequestTiming; CacheStats m_CacheStats; - void ReplayRequestRecorder(cache::detail::IRequestReplayer& Replayer, uint32_t ThreadCount); + void ReplayRequestRecorder(cache::IRpcRequestReplayer& Replayer, uint32_t ThreadCount); - std::unique_ptr<cache::detail::IRequestRecorder> m_RequestRecorder; + std::unique_ptr<cache::IRpcRequestRecorder> m_RequestRecorder; }; /** Recognize both kBinary and kCompressedBinary as kCompressedBinary for structured cache value keys. diff --git a/zenserver/projectstore/projectstore.cpp b/zenserver/projectstore/projectstore.cpp index 57b24c63d..c3fba7dff 100644 --- a/zenserver/projectstore/projectstore.cpp +++ b/zenserver/projectstore/projectstore.cpp @@ -16,6 +16,7 @@ #include <zenstore/caslog.h> #include <zenstore/cidstore.h> #include <zenstore/scrubcontext.h> +#include <zenutil/cache/rpcrecording.h> #include "fileremoteprojectstore.h" #include "jupiterremoteprojectstore.h" diff --git a/zenutil/cache/rpcrecording.cpp b/zenutil/cache/rpcrecording.cpp new file mode 100644 index 000000000..4958a27f6 --- /dev/null +++ b/zenutil/cache/rpcrecording.cpp @@ -0,0 +1,210 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenutil/basicfile.h> +#include <zenutil/cache/rpcrecording.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <fmt/format.h> +#include <gsl/gsl-lite.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen::cache { +struct RecordedRequest +{ + uint64_t Offset; + uint64_t Length; + ZenContentType ContentType; + ZenContentType AcceptType; +}; + +const uint64_t RecordedRequestBlockSize = 1ull << 31u; + +struct RecordedRequestsWriter +{ + void BeginWrite(const std::filesystem::path& BasePath) + { + m_BasePath = BasePath; + std::filesystem::create_directories(m_BasePath); + } + + void EndWrite() + { + RwLock::ExclusiveLockScope _(m_Lock); + m_BlockFiles.clear(); + + IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest)); + BasicFile IndexFile; + IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate); + std::error_code Ec; + IndexFile.WriteAll(IndexBuffer, Ec); + IndexFile.Close(); + m_Entries.clear(); + } + + uint64_t WriteRequest(ZenContentType ContentType, ZenContentType AcceptType, const IoBuffer& RequestBuffer) + { + RwLock::ExclusiveLockScope Lock(m_Lock); + uint64_t RequestIndex = m_Entries.size(); + RecordedRequest& Entry = m_Entries.emplace_back( + RecordedRequest{.Offset = ~0ull, .Length = RequestBuffer.Size(), .ContentType = ContentType, .AcceptType = AcceptType}); + if (Entry.Length < 1 * 1024 * 1024) + { + uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize); + if (BlockIndex == m_BlockFiles.size()) + { + std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>()); + NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate); + m_ChunkOffset = BlockIndex * RecordedRequestBlockSize; + } + ZEN_ASSERT(BlockIndex < m_BlockFiles.size()); + BasicFile* BlockFile = m_BlockFiles[BlockIndex].get(); + ZEN_ASSERT(BlockFile != nullptr); + + Entry.Offset = m_ChunkOffset; + m_ChunkOffset = RoundUp(m_ChunkOffset + Entry.Length, 1u << 4u); + Lock.ReleaseNow(); + + std::error_code Ec; + BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), Entry.Offset - BlockIndex * RecordedRequestBlockSize, Ec); + if (Ec) + { + Entry.Length = 0; + return ~0ull; + } + return RequestIndex; + } + Lock.ReleaseNow(); + + BasicFile RequestFile; + RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate); + std::error_code Ec; + RequestFile.WriteAll(RequestBuffer, Ec); + if (Ec) + { + Entry.Length = 0; + return ~0ull; + } + return RequestIndex; + } + + std::filesystem::path m_BasePath; + mutable RwLock m_Lock; + std::vector<RecordedRequest> m_Entries; + std::vector<std::unique_ptr<BasicFile>> m_BlockFiles; + uint64_t m_ChunkOffset; +}; + +struct RecordedRequestsReader +{ + uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory) + { + m_BasePath = BasePath; + BasicFile IndexFile; + IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kRead); + m_Entries.resize(IndexFile.FileSize() / sizeof(RecordedRequest)); + IndexFile.Read(m_Entries.data(), IndexFile.FileSize(), 0); + uint64_t MaxChunkPosition = 0; + for (const RecordedRequest& R : m_Entries) + { + if (R.Offset != ~0ull) + { + MaxChunkPosition = Max(MaxChunkPosition, R.Offset + R.Length); + } + } + uint32_t BlockCount = gsl::narrow<uint32_t>(MaxChunkPosition / RecordedRequestBlockSize) + 1; + m_BlockFiles.resize(BlockCount); + for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; ++BlockIndex) + { + if (InMemory) + { + BasicFile Chunk; + Chunk.Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kRead); + m_BlockFiles[BlockIndex] = Chunk.ReadAll(); + continue; + } + m_BlockFiles[BlockIndex] = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("chunks{}.bin", BlockIndex)); + } + return m_Entries.size(); + } + void EndRead() { m_BlockFiles.clear(); } + + std::pair<ZenContentType, ZenContentType> ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const + { + if (RequestIndex >= m_Entries.size()) + { + return {ZenContentType::kUnknownContentType, ZenContentType::kUnknownContentType}; + } + const RecordedRequest& Entry = m_Entries[RequestIndex]; + if (Entry.Length == 0) + { + return {ZenContentType::kUnknownContentType, ZenContentType::kUnknownContentType}; + } + if (Entry.Offset != ~0ull) + { + uint32_t BlockIndex = gsl::narrow<uint32_t>((Entry.Offset + Entry.Length) / RecordedRequestBlockSize); + uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize); + OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length); + return {Entry.ContentType, Entry.AcceptType}; + } + OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex)); + return {Entry.ContentType, Entry.AcceptType}; + } + + std::filesystem::path m_BasePath; + std::vector<RecordedRequest> m_Entries; + std::vector<IoBuffer> m_BlockFiles; +}; + +class DiskRequestRecorder : public IRpcRequestRecorder +{ +public: + DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); } + virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); } + +private: + virtual uint64_t RecordRequest(const ZenContentType ContentType, + const ZenContentType AcceptType, + const IoBuffer& RequestBuffer) override + { + return m_RecordedRequests.WriteRequest(ContentType, AcceptType, RequestBuffer); + } + virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {} + virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {} + RecordedRequestsWriter m_RecordedRequests; +}; + +class DiskRequestReplayer : public IRpcRequestReplayer +{ +public: + DiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory) + { + m_RequestCount = m_RequestBuffer.BeginRead(BasePath, InMemory); + } + virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); } + +private: + virtual uint64_t GetRequestCount() const override { return m_RequestCount; } + + virtual std::pair<ZenContentType, ZenContentType> GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override + { + return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer); + } + virtual ZenContentType GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; } + + std::uint64_t m_RequestCount; + RecordedRequestsReader m_RequestBuffer; +}; + +std::unique_ptr<cache::IRpcRequestRecorder> +MakeDiskRequestRecorder(const std::filesystem::path& BasePath) +{ + return std::make_unique<DiskRequestRecorder>(BasePath); +} + +std::unique_ptr<cache::IRpcRequestReplayer> +MakeDiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory) +{ + return std::make_unique<DiskRequestReplayer>(BasePath, InMemory); +} + +} // namespace zen::cache diff --git a/zenutil/include/zenutil/cache/rpcrecording.h b/zenutil/include/zenutil/cache/rpcrecording.h new file mode 100644 index 000000000..6d65a532a --- /dev/null +++ b/zenutil/include/zenutil/cache/rpcrecording.h @@ -0,0 +1,29 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/compositebuffer.h> +#include <zencore/iobuffer.h> + +namespace zen::cache { +class IRpcRequestRecorder +{ +public: + virtual ~IRpcRequestRecorder() {} + virtual uint64_t RecordRequest(const ZenContentType ContentType, const ZenContentType AcceptType, const IoBuffer& RequestBuffer) = 0; + virtual void RecordResponse(uint64_t RequestIndex, const ZenContentType ContentType, const IoBuffer& ResponseBuffer) = 0; + virtual void RecordResponse(uint64_t RequestIndex, const ZenContentType ContentType, const CompositeBuffer& ResponseBuffer) = 0; +}; +class IRpcRequestReplayer +{ +public: + virtual ~IRpcRequestReplayer() {} + virtual uint64_t GetRequestCount() const = 0; + virtual std::pair<ZenContentType, ZenContentType> GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) = 0; + virtual ZenContentType GetResponse(uint64_t RequestIndex, IoBuffer& OutBuffer) = 0; +}; + +std::unique_ptr<cache::IRpcRequestRecorder> MakeDiskRequestRecorder(const std::filesystem::path& BasePath); +std::unique_ptr<cache::IRpcRequestReplayer> MakeDiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory); + +} // namespace zen::cache |