diff options
| author | Dan Engelbrecht <[email protected]> | 2023-02-17 08:20:46 +0000 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-02-17 00:20:46 -0800 |
| commit | 2ec65fd3e171145450416ae76be1a7dd8c646704 (patch) | |
| tree | 13bdb7e5deb60d4e58be2714cfdcf7f70272bc1e /zenutil/cache | |
| parent | Experimental ObjectStore/CDN like endpoint (diff) | |
| download | zen-2ec65fd3e171145450416ae76be1a7dd8c646704.tar.xz zen-2ec65fd3e171145450416ae76be1a7dd8c646704.zip | |
Enhanced rpc request recording (#229)
* rpc replay zen command
* fix replay sessions for thread
* recording start/stop as zen commands
* move rpcrecording code to zenutil to remove code duplication
* simplify recording http request threading
* added more data logging to rpc replay
* NotFound is an acceptable response for an rpc request
* fix rpc replay command line parsing
* rpc replay stats
* Allow spawning of sub-process workers when replaying rpc recording
* changelog
Diffstat (limited to 'zenutil/cache')
| -rw-r--r-- | zenutil/cache/rpcrecording.cpp | 210 |
1 files changed, 210 insertions, 0 deletions
diff --git a/zenutil/cache/rpcrecording.cpp b/zenutil/cache/rpcrecording.cpp new file mode 100644 index 000000000..4958a27f6 --- /dev/null +++ b/zenutil/cache/rpcrecording.cpp @@ -0,0 +1,210 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenutil/basicfile.h> +#include <zenutil/cache/rpcrecording.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <fmt/format.h> +#include <gsl/gsl-lite.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen::cache { +struct RecordedRequest +{ + uint64_t Offset; + uint64_t Length; + ZenContentType ContentType; + ZenContentType AcceptType; +}; + +const uint64_t RecordedRequestBlockSize = 1ull << 31u; + +struct RecordedRequestsWriter +{ + void BeginWrite(const std::filesystem::path& BasePath) + { + m_BasePath = BasePath; + std::filesystem::create_directories(m_BasePath); + } + + void EndWrite() + { + RwLock::ExclusiveLockScope _(m_Lock); + m_BlockFiles.clear(); + + IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest)); + BasicFile IndexFile; + IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate); + std::error_code Ec; + IndexFile.WriteAll(IndexBuffer, Ec); + IndexFile.Close(); + m_Entries.clear(); + } + + uint64_t WriteRequest(ZenContentType ContentType, ZenContentType AcceptType, const IoBuffer& RequestBuffer) + { + RwLock::ExclusiveLockScope Lock(m_Lock); + uint64_t RequestIndex = m_Entries.size(); + RecordedRequest& Entry = m_Entries.emplace_back( + RecordedRequest{.Offset = ~0ull, .Length = RequestBuffer.Size(), .ContentType = ContentType, .AcceptType = AcceptType}); + if (Entry.Length < 1 * 1024 * 1024) + { + uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize); + if (BlockIndex == m_BlockFiles.size()) + { + std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>()); + NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate); + m_ChunkOffset = BlockIndex * RecordedRequestBlockSize; + } + ZEN_ASSERT(BlockIndex < m_BlockFiles.size()); + BasicFile* BlockFile = m_BlockFiles[BlockIndex].get(); + ZEN_ASSERT(BlockFile != nullptr); + + Entry.Offset = m_ChunkOffset; + m_ChunkOffset = RoundUp(m_ChunkOffset + Entry.Length, 1u << 4u); + Lock.ReleaseNow(); + + std::error_code Ec; + BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), Entry.Offset - BlockIndex * RecordedRequestBlockSize, Ec); + if (Ec) + { + Entry.Length = 0; + return ~0ull; + } + return RequestIndex; + } + Lock.ReleaseNow(); + + BasicFile RequestFile; + RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate); + std::error_code Ec; + RequestFile.WriteAll(RequestBuffer, Ec); + if (Ec) + { + Entry.Length = 0; + return ~0ull; + } + return RequestIndex; + } + + std::filesystem::path m_BasePath; + mutable RwLock m_Lock; + std::vector<RecordedRequest> m_Entries; + std::vector<std::unique_ptr<BasicFile>> m_BlockFiles; + uint64_t m_ChunkOffset; +}; + +struct RecordedRequestsReader +{ + uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory) + { + m_BasePath = BasePath; + BasicFile IndexFile; + IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kRead); + m_Entries.resize(IndexFile.FileSize() / sizeof(RecordedRequest)); + IndexFile.Read(m_Entries.data(), IndexFile.FileSize(), 0); + uint64_t MaxChunkPosition = 0; + for (const RecordedRequest& R : m_Entries) + { + if (R.Offset != ~0ull) + { + MaxChunkPosition = Max(MaxChunkPosition, R.Offset + R.Length); + } + } + uint32_t BlockCount = gsl::narrow<uint32_t>(MaxChunkPosition / RecordedRequestBlockSize) + 1; + m_BlockFiles.resize(BlockCount); + for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; ++BlockIndex) + { + if (InMemory) + { + BasicFile Chunk; + Chunk.Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kRead); + m_BlockFiles[BlockIndex] = Chunk.ReadAll(); + continue; + } + m_BlockFiles[BlockIndex] = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("chunks{}.bin", BlockIndex)); + } + return m_Entries.size(); + } + void EndRead() { m_BlockFiles.clear(); } + + std::pair<ZenContentType, ZenContentType> ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const + { + if (RequestIndex >= m_Entries.size()) + { + return {ZenContentType::kUnknownContentType, ZenContentType::kUnknownContentType}; + } + const RecordedRequest& Entry = m_Entries[RequestIndex]; + if (Entry.Length == 0) + { + return {ZenContentType::kUnknownContentType, ZenContentType::kUnknownContentType}; + } + if (Entry.Offset != ~0ull) + { + uint32_t BlockIndex = gsl::narrow<uint32_t>((Entry.Offset + Entry.Length) / RecordedRequestBlockSize); + uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize); + OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length); + return {Entry.ContentType, Entry.AcceptType}; + } + OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex)); + return {Entry.ContentType, Entry.AcceptType}; + } + + std::filesystem::path m_BasePath; + std::vector<RecordedRequest> m_Entries; + std::vector<IoBuffer> m_BlockFiles; +}; + +class DiskRequestRecorder : public IRpcRequestRecorder +{ +public: + DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); } + virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); } + +private: + virtual uint64_t RecordRequest(const ZenContentType ContentType, + const ZenContentType AcceptType, + const IoBuffer& RequestBuffer) override + { + return m_RecordedRequests.WriteRequest(ContentType, AcceptType, RequestBuffer); + } + virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {} + virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {} + RecordedRequestsWriter m_RecordedRequests; +}; + +class DiskRequestReplayer : public IRpcRequestReplayer +{ +public: + DiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory) + { + m_RequestCount = m_RequestBuffer.BeginRead(BasePath, InMemory); + } + virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); } + +private: + virtual uint64_t GetRequestCount() const override { return m_RequestCount; } + + virtual std::pair<ZenContentType, ZenContentType> GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override + { + return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer); + } + virtual ZenContentType GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; } + + std::uint64_t m_RequestCount; + RecordedRequestsReader m_RequestBuffer; +}; + +std::unique_ptr<cache::IRpcRequestRecorder> +MakeDiskRequestRecorder(const std::filesystem::path& BasePath) +{ + return std::make_unique<DiskRequestRecorder>(BasePath); +} + +std::unique_ptr<cache::IRpcRequestReplayer> +MakeDiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory) +{ + return std::make_unique<DiskRequestReplayer>(BasePath, InMemory); +} + +} // namespace zen::cache |