aboutsummaryrefslogtreecommitdiff
path: root/zenutil/cache
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-02-17 08:20:46 +0000
committerGitHub <[email protected]>2023-02-17 00:20:46 -0800
commit2ec65fd3e171145450416ae76be1a7dd8c646704 (patch)
tree13bdb7e5deb60d4e58be2714cfdcf7f70272bc1e /zenutil/cache
parentExperimental ObjectStore/CDN like endpoint (diff)
downloadzen-2ec65fd3e171145450416ae76be1a7dd8c646704.tar.xz
zen-2ec65fd3e171145450416ae76be1a7dd8c646704.zip
Enhanced rpc request recording (#229)
* rpc replay zen command * fix replay sessions for thread * recording start/stop as zen commands * move rpcrecording code to zenutil to remove code duplication * simplify recording http request threading * added more data logging to rpc replay * NotFound is an acceptable response for an rpc request * fix rpc replay command line parsing * rpc replay stats * Allow spawning of sub-process workers when replaying rpc recording * changelog
Diffstat (limited to 'zenutil/cache')
-rw-r--r--zenutil/cache/rpcrecording.cpp210
1 files changed, 210 insertions, 0 deletions
diff --git a/zenutil/cache/rpcrecording.cpp b/zenutil/cache/rpcrecording.cpp
new file mode 100644
index 000000000..4958a27f6
--- /dev/null
+++ b/zenutil/cache/rpcrecording.cpp
@@ -0,0 +1,210 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenutil/basicfile.h>
+#include <zenutil/cache/rpcrecording.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <fmt/format.h>
+#include <gsl/gsl-lite.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen::cache {
+struct RecordedRequest
+{
+ uint64_t Offset;
+ uint64_t Length;
+ ZenContentType ContentType;
+ ZenContentType AcceptType;
+};
+
+const uint64_t RecordedRequestBlockSize = 1ull << 31u;
+
+struct RecordedRequestsWriter
+{
+ void BeginWrite(const std::filesystem::path& BasePath)
+ {
+ m_BasePath = BasePath;
+ std::filesystem::create_directories(m_BasePath);
+ }
+
+ void EndWrite()
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_BlockFiles.clear();
+
+ IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest));
+ BasicFile IndexFile;
+ IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate);
+ std::error_code Ec;
+ IndexFile.WriteAll(IndexBuffer, Ec);
+ IndexFile.Close();
+ m_Entries.clear();
+ }
+
+ uint64_t WriteRequest(ZenContentType ContentType, ZenContentType AcceptType, const IoBuffer& RequestBuffer)
+ {
+ RwLock::ExclusiveLockScope Lock(m_Lock);
+ uint64_t RequestIndex = m_Entries.size();
+ RecordedRequest& Entry = m_Entries.emplace_back(
+ RecordedRequest{.Offset = ~0ull, .Length = RequestBuffer.Size(), .ContentType = ContentType, .AcceptType = AcceptType});
+ if (Entry.Length < 1 * 1024 * 1024)
+ {
+ uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize);
+ if (BlockIndex == m_BlockFiles.size())
+ {
+ std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>());
+ NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate);
+ m_ChunkOffset = BlockIndex * RecordedRequestBlockSize;
+ }
+ ZEN_ASSERT(BlockIndex < m_BlockFiles.size());
+ BasicFile* BlockFile = m_BlockFiles[BlockIndex].get();
+ ZEN_ASSERT(BlockFile != nullptr);
+
+ Entry.Offset = m_ChunkOffset;
+ m_ChunkOffset = RoundUp(m_ChunkOffset + Entry.Length, 1u << 4u);
+ Lock.ReleaseNow();
+
+ std::error_code Ec;
+ BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), Entry.Offset - BlockIndex * RecordedRequestBlockSize, Ec);
+ if (Ec)
+ {
+ Entry.Length = 0;
+ return ~0ull;
+ }
+ return RequestIndex;
+ }
+ Lock.ReleaseNow();
+
+ BasicFile RequestFile;
+ RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate);
+ std::error_code Ec;
+ RequestFile.WriteAll(RequestBuffer, Ec);
+ if (Ec)
+ {
+ Entry.Length = 0;
+ return ~0ull;
+ }
+ return RequestIndex;
+ }
+
+ std::filesystem::path m_BasePath;
+ mutable RwLock m_Lock;
+ std::vector<RecordedRequest> m_Entries;
+ std::vector<std::unique_ptr<BasicFile>> m_BlockFiles;
+ uint64_t m_ChunkOffset;
+};
+
+struct RecordedRequestsReader
+{
+ uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory)
+ {
+ m_BasePath = BasePath;
+ BasicFile IndexFile;
+ IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kRead);
+ m_Entries.resize(IndexFile.FileSize() / sizeof(RecordedRequest));
+ IndexFile.Read(m_Entries.data(), IndexFile.FileSize(), 0);
+ uint64_t MaxChunkPosition = 0;
+ for (const RecordedRequest& R : m_Entries)
+ {
+ if (R.Offset != ~0ull)
+ {
+ MaxChunkPosition = Max(MaxChunkPosition, R.Offset + R.Length);
+ }
+ }
+ uint32_t BlockCount = gsl::narrow<uint32_t>(MaxChunkPosition / RecordedRequestBlockSize) + 1;
+ m_BlockFiles.resize(BlockCount);
+ for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; ++BlockIndex)
+ {
+ if (InMemory)
+ {
+ BasicFile Chunk;
+ Chunk.Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kRead);
+ m_BlockFiles[BlockIndex] = Chunk.ReadAll();
+ continue;
+ }
+ m_BlockFiles[BlockIndex] = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("chunks{}.bin", BlockIndex));
+ }
+ return m_Entries.size();
+ }
+ void EndRead() { m_BlockFiles.clear(); }
+
+ std::pair<ZenContentType, ZenContentType> ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const
+ {
+ if (RequestIndex >= m_Entries.size())
+ {
+ return {ZenContentType::kUnknownContentType, ZenContentType::kUnknownContentType};
+ }
+ const RecordedRequest& Entry = m_Entries[RequestIndex];
+ if (Entry.Length == 0)
+ {
+ return {ZenContentType::kUnknownContentType, ZenContentType::kUnknownContentType};
+ }
+ if (Entry.Offset != ~0ull)
+ {
+ uint32_t BlockIndex = gsl::narrow<uint32_t>((Entry.Offset + Entry.Length) / RecordedRequestBlockSize);
+ uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize);
+ OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length);
+ return {Entry.ContentType, Entry.AcceptType};
+ }
+ OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex));
+ return {Entry.ContentType, Entry.AcceptType};
+ }
+
+ std::filesystem::path m_BasePath;
+ std::vector<RecordedRequest> m_Entries;
+ std::vector<IoBuffer> m_BlockFiles;
+};
+
+class DiskRequestRecorder : public IRpcRequestRecorder
+{
+public:
+ DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); }
+ virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); }
+
+private:
+ virtual uint64_t RecordRequest(const ZenContentType ContentType,
+ const ZenContentType AcceptType,
+ const IoBuffer& RequestBuffer) override
+ {
+ return m_RecordedRequests.WriteRequest(ContentType, AcceptType, RequestBuffer);
+ }
+ virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {}
+ virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {}
+ RecordedRequestsWriter m_RecordedRequests;
+};
+
+class DiskRequestReplayer : public IRpcRequestReplayer
+{
+public:
+ DiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory)
+ {
+ m_RequestCount = m_RequestBuffer.BeginRead(BasePath, InMemory);
+ }
+ virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); }
+
+private:
+ virtual uint64_t GetRequestCount() const override { return m_RequestCount; }
+
+ virtual std::pair<ZenContentType, ZenContentType> GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override
+ {
+ return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer);
+ }
+ virtual ZenContentType GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; }
+
+ std::uint64_t m_RequestCount;
+ RecordedRequestsReader m_RequestBuffer;
+};
+
+std::unique_ptr<cache::IRpcRequestRecorder>
+MakeDiskRequestRecorder(const std::filesystem::path& BasePath)
+{
+ return std::make_unique<DiskRequestRecorder>(BasePath);
+}
+
+std::unique_ptr<cache::IRpcRequestReplayer>
+MakeDiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory)
+{
+ return std::make_unique<DiskRequestReplayer>(BasePath, InMemory);
+}
+
+} // namespace zen::cache