aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/rpcrecording.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenutil/rpcrecording.cpp')
-rw-r--r--src/zenutil/rpcrecording.cpp1175
1 files changed, 1175 insertions, 0 deletions
diff --git a/src/zenutil/rpcrecording.cpp b/src/zenutil/rpcrecording.cpp
new file mode 100644
index 000000000..54f27dee7
--- /dev/null
+++ b/src/zenutil/rpcrecording.cpp
@@ -0,0 +1,1175 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenutil/rpcrecording.h>
+
+#include <zencore/basicfile.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/session.h>
+#include <zencore/system.h>
+#include <zencore/testing.h>
+#include <zencore/testutils.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <fmt/format.h>
+#include <gsl/gsl-lite.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+#include <deque>
+#include <thread>
+
+namespace zen::cache {
+
+const RecordedRequestInfo RecordedRequestInfo::NullRequest = {.ContentType = ZenContentType::kUnknownContentType,
+ .AcceptType = ZenContentType::kUnknownContentType,
+ .SessionId = Oid::Zero};
+
+}
+
+namespace zen::cache::v1 {
+
+struct RecordedRequest
+{
+ uint64_t Offset;
+ uint64_t Length;
+ ZenContentType ContentType;
+ ZenContentType AcceptType;
+};
+
+const uint64_t RecordedRequestBlockSize = 1ull << 31u;
+const uint64_t StandaloneFileSizeThreshold = 1 * 1024 * 1024ull;
+
+struct RecordedRequestsWriter
+{
+ void BeginWrite(const std::filesystem::path& BasePath)
+ {
+ m_BasePath = BasePath;
+ CreateDirectories(m_BasePath);
+ }
+
+ void EndWrite()
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_BlockFiles.clear();
+
+ try
+ {
+ // Emit some metadata alongside the recording
+
+ DateTime EndTime = DateTime::Now();
+ TimeSpan Duration{EndTime.GetTicks() - m_StartTime.GetTicks()};
+
+ CbObjectWriter Cbo;
+ Cbo << "time_start" << m_StartTime << "time_end" << EndTime << "duration" << Duration;
+ Cbo << "entry_count" << m_Entries.size() << "entry_size" << sizeof(RecordedRequest);
+ Cbo << "block_size" << RecordedRequestBlockSize << "standalone_threshold" << StandaloneFileSizeThreshold;
+
+ Cbo.BeginObject("system_info");
+ Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName();
+ Describe(GetSystemMetrics(), Cbo);
+ Cbo.EndObject();
+ CbObject Metadata = Cbo.Save();
+
+ WriteFile(m_BasePath / "rpc_recording_metadata.zcb", Metadata.GetBuffer().AsIoBuffer());
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("caught exception while generating metadata for RPC recording: {}", Ex.what());
+ }
+
+ IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest));
+ BasicFile IndexFile;
+ IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate);
+ std::error_code Ec;
+ IndexFile.WriteAll(IndexBuffer, Ec);
+ IndexFile.Close();
+ m_Entries.clear();
+ }
+
+ void WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer)
+ {
+ RwLock::ExclusiveLockScope Lock(m_Lock);
+ uint64_t RequestIndex = m_Entries.size();
+ RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0ull,
+ .Length = RequestBuffer.Size(),
+ .ContentType = RequestInfo.ContentType,
+ .AcceptType = RequestInfo.AcceptType});
+
+ if (Entry.Length < StandaloneFileSizeThreshold)
+ {
+ const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize);
+ if (BlockIndex == m_BlockFiles.size())
+ {
+ std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>());
+ NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate);
+ m_ChunkOffset = BlockIndex * RecordedRequestBlockSize;
+ }
+ ZEN_ASSERT(BlockIndex < m_BlockFiles.size());
+ BasicFile* BlockFile = m_BlockFiles[BlockIndex].get();
+ ZEN_ASSERT(BlockFile != nullptr);
+
+ Entry.Offset = m_ChunkOffset;
+ m_ChunkOffset = RoundUp(m_ChunkOffset + Entry.Length, 1u << 4u);
+ Lock.ReleaseNow();
+
+ std::error_code Ec;
+ BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), Entry.Offset - BlockIndex * RecordedRequestBlockSize, Ec);
+ if (Ec)
+ {
+ Entry.Length = 0;
+ }
+ }
+ else
+ {
+ Lock.ReleaseNow();
+
+ BasicFile RequestFile;
+ RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate);
+ std::error_code Ec;
+ RequestFile.WriteAll(RequestBuffer, Ec);
+ if (Ec)
+ {
+ Entry.Length = 0;
+ }
+ }
+ }
+
+ std::filesystem::path m_BasePath;
+ mutable RwLock m_Lock;
+ std::vector<RecordedRequest> m_Entries;
+ std::vector<std::unique_ptr<BasicFile>> m_BlockFiles;
+ uint64_t m_ChunkOffset;
+ zen::DateTime m_StartTime = DateTime::Now();
+};
+
+struct RecordedRequestsReader
+{
+ uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory)
+ {
+ m_BasePath = BasePath;
+ BasicFile IndexFile;
+ IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kRead);
+ m_Entries.resize(IndexFile.FileSize() / sizeof(RecordedRequest));
+ IndexFile.Read(m_Entries.data(), IndexFile.FileSize(), 0);
+ uint64_t MaxChunkPosition = 0;
+ for (const RecordedRequest& R : m_Entries)
+ {
+ if (R.Offset != ~0ull)
+ {
+ MaxChunkPosition = Max(MaxChunkPosition, R.Offset + R.Length);
+ }
+ }
+ uint32_t BlockCount = gsl::narrow<uint32_t>(MaxChunkPosition / RecordedRequestBlockSize) + 1;
+ m_BlockFiles.resize(BlockCount);
+ for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; ++BlockIndex)
+ {
+ if (InMemory)
+ {
+ BasicFile Chunk;
+ Chunk.Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kRead);
+ m_BlockFiles[BlockIndex] = Chunk.ReadAll();
+ continue;
+ }
+ m_BlockFiles[BlockIndex] = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("chunks{}.bin", BlockIndex));
+ }
+ return m_Entries.size();
+ }
+ void EndRead() { m_BlockFiles.clear(); }
+
+ RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const
+ {
+ if (RequestIndex >= m_Entries.size())
+ {
+ return RecordedRequestInfo::NullRequest;
+ }
+
+ const RecordedRequest& Entry = m_Entries[RequestIndex];
+ if (Entry.Length == 0)
+ {
+ return RecordedRequestInfo::NullRequest;
+ }
+
+ if (Entry.Offset != ~0ull)
+ {
+ uint32_t BlockIndex = gsl::narrow<uint32_t>((Entry.Offset + Entry.Length) / RecordedRequestBlockSize);
+ uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize);
+ OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length);
+ }
+ else
+ {
+ OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex));
+ }
+
+ return {.ContentType = Entry.ContentType, .AcceptType = Entry.AcceptType, .SessionId = Oid::Zero};
+ }
+
+ std::filesystem::path m_BasePath;
+ std::vector<RecordedRequest> m_Entries;
+ std::vector<IoBuffer> m_BlockFiles;
+};
+
+class DiskRequestRecorder : public IRpcRequestRecorder
+{
+public:
+ DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); }
+ virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); }
+
+private:
+ virtual void RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override
+ {
+ m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer);
+ }
+
+ RecordedRequestsWriter m_RecordedRequests;
+};
+
+class DiskRequestReplayer : public IRpcRequestReplayer
+{
+public:
+ DiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory)
+ {
+ m_RequestCount = m_RequestBuffer.BeginRead(BasePath, InMemory);
+ }
+ virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); }
+
+ virtual uint64_t GetRequestCount() const override { return m_RequestCount; }
+
+ virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override
+ {
+ return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer);
+ }
+
+private:
+ std::uint64_t m_RequestCount;
+ RecordedRequestsReader m_RequestBuffer;
+};
+
+} // namespace zen::cache::v1
+
+//////////////////////////////////////////////////////////////////////////
+
+namespace zen::cache::v2 {
+
+using namespace std::literals;
+
+struct RecordedRequest
+{
+ uint32_t Offset; // 4 bytes
+ uint32_t Length; // 4 bytes
+ ZenContentType ContentType; // 1 byte
+ ZenContentType AcceptType; // 1 byte
+ uint8_t OffsetHigh; // 1 byte
+ uint8_t Padding2; // 1 byte
+ Oid SessionId; // 12 bytes
+
+ inline uint64_t GetOffset() const { return uint64_t(Offset) + (uint64_t(OffsetHigh) << 32); }
+ inline void SetOffset(uint64_t NewOffset)
+ {
+ Offset = gsl::narrow_cast<uint32_t>(NewOffset & 0xffff'ffff);
+ OffsetHigh = gsl::narrow_cast<uint8_t>(NewOffset >> 32);
+ }
+};
+
+static_assert(sizeof(RecordedRequest) == 24);
+
+const uint64_t RecordedRequestBlockSize = 1 * 1024 * 1024 * 1024; // 1GiB
+const uint64_t StandaloneFileSizeThreshold = 16 * 1024 * 1024ull; // 16MiB
+const uint64_t SegmentRequestCount = 10 * 1000 * 1000;
+const uint64_t LooseFileThreshold = 5000; // Somewhat arbitrary, but we try to keep the
+ // number of files in a directory below this level
+ // for performance
+const uint64_t SegmentByteThreshold = 16ull * 1024 * 1024 * 1024;
+const TimeSpan SegmentTimeThreshold{/* hours */ 1, /* minutes */ 0, /* seconds */ 0};
+const int64_t MaximumBacklogCount = 2000;
+
+std::string
+MakeSegmentPath(uint64_t SegmentIndex)
+{
+ return fmt::format("segment_{:06}", SegmentIndex);
+}
+
+struct RecordedRequestsSegmentWriter
+{
+ RecordedRequestsSegmentWriter() = default;
+ ~RecordedRequestsSegmentWriter() = default;
+
+ RecordedRequestsSegmentWriter(const RecordedRequestsSegmentWriter&) = delete;
+ RecordedRequestsSegmentWriter& operator=(const RecordedRequestsSegmentWriter&) = delete;
+
+ void BeginWrite(const std::filesystem::path& BasePath, uint64_t SegmentIndex, uint64_t RequestBaseIndex);
+ void WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer);
+ void EndWrite();
+
+ inline uint64_t GetRequestCount() const
+ {
+ if (m_RequestCount)
+ {
+ return m_RequestCount;
+ }
+
+ RwLock::SharedLockScope _(m_Lock);
+ return m_Entries.size();
+ }
+ inline uint64_t GetBaseRequestIndex() const { return m_RequestBaseIndex; }
+ inline uint64_t GetSegmentIndex() const { return m_SegmentIndex; }
+ inline uint64_t GetFileCount() const { return m_FileCount; }
+ inline uint64_t GetRequestsByteCount() const { return m_RequestsByteCount; }
+ inline DateTime GetStartTime() const { return m_StartTime; }
+ inline DateTime GetEndTime() const { return m_EndTime; }
+
+private:
+ std::filesystem::path m_BasePath;
+ uint64_t m_SegmentIndex = 0;
+ uint64_t m_RequestBaseIndex = 0;
+ uint64_t m_RequestCount = 0;
+ std::atomic_uint64_t m_FileCount{};
+ std::atomic_uint64_t m_RequestsByteCount{};
+ mutable RwLock m_Lock;
+ std::vector<RecordedRequest> m_Entries;
+ std::vector<std::unique_ptr<BasicFile>> m_BlockFiles;
+ uint64_t m_ChunkOffset;
+ DateTime m_StartTime = DateTime::Now();
+ DateTime m_EndTime = DateTime::Now();
+};
+
+struct RecordedRequestsWriter
+{
+ RecordedRequestsWriter();
+ ~RecordedRequestsWriter();
+
+ void BeginWrite(const std::filesystem::path& BasePath);
+ void EndWrite();
+ void WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer);
+
+private:
+ std::filesystem::path m_BasePath;
+ zen::DateTime m_StartTime = DateTime::Now();
+
+ mutable RwLock m_Lock;
+ std::unique_ptr<RecordedRequestsSegmentWriter> m_CurrentWriter;
+ std::vector<std::unique_ptr<RecordedRequestsSegmentWriter>> m_FinishedSegments;
+ std::vector<uint64_t> m_SegmentBaseIndex;
+ uint64_t m_NextSegmentBaseIndex = 0;
+
+ RecordedRequestsSegmentWriter& EnsureCurrentSegment();
+ void CommitCurrentSegment(RwLock::ExclusiveLockScope&);
+ void WriteRecordingMetadata();
+
+ // I/O thread state
+
+ struct QueuedRequest
+ {
+ RecordedRequestInfo RequestInfo;
+ IoBuffer RequestBuffer;
+ };
+
+ std::unique_ptr<std::thread> m_WriterThread;
+ std::atomic_bool m_IsWriterReady{false};
+ std::atomic_bool m_IsActive{false};
+ std::atomic_int64_t m_PendingRequests{0};
+ RwLock m_RequestQueueLock;
+ std::deque<QueuedRequest> m_RequestQueue;
+
+ void WriterThreadMain();
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+struct RecordedRequestsSegmentReader
+{
+ RecordedRequestsSegmentReader() = default;
+
+ RecordedRequestsSegmentReader(const RecordedRequestsSegmentReader&) = delete;
+ RecordedRequestsSegmentReader& operator=(const RecordedRequestsSegmentReader&) = delete;
+
+ uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory);
+ void EndRead();
+ RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const;
+
+private:
+ std::filesystem::path m_BasePath;
+ std::vector<RecordedRequest> m_Entries;
+ std::vector<IoBuffer> m_BlockFiles;
+};
+
+struct RecordedRequestsReader
+{
+ uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory);
+ void EndRead();
+ RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const;
+
+private:
+ struct SegmentInfo
+ {
+ uint64_t SegmentIndex;
+ uint64_t BaseRequestIndex;
+ uint64_t RequestCount;
+ uint64_t RequestBytes;
+ DateTime StartTime{0};
+ DateTime EndTime{0};
+ };
+
+ bool m_InMemory = false;
+ std::filesystem::path m_BasePath;
+ std::vector<SegmentInfo> m_KnownSegments;
+
+ mutable RwLock m_SegmentLock;
+ mutable std::vector<std::unique_ptr<RecordedRequestsSegmentReader>> m_SegmentReaders;
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+void
+RecordedRequestsSegmentWriter::BeginWrite(const std::filesystem::path& BasePath, uint64_t SegmentIndex, uint64_t RequestBaseIndex)
+{
+ m_BasePath = BasePath;
+ m_SegmentIndex = SegmentIndex;
+ m_RequestBaseIndex = RequestBaseIndex;
+ CreateDirectories(m_BasePath);
+}
+
+void
+RecordedRequestsSegmentWriter::EndWrite()
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_RequestCount = m_Entries.size();
+ m_BlockFiles.clear();
+
+ // Emit some metadata alongside the recording
+
+ try
+ {
+ m_EndTime = DateTime::Now();
+ TimeSpan Duration{m_EndTime.GetTicks() - m_StartTime.GetTicks()};
+
+ CbObjectWriter Cbo;
+ Cbo << "time_start" << m_StartTime << "time_end" << m_EndTime << "duration" << Duration;
+ Cbo << "segment_index" << m_SegmentIndex;
+ Cbo << "entry_count" << m_RequestCount << "entry_size" << sizeof(RecordedRequest);
+ Cbo << "block_size" << RecordedRequestBlockSize << "standalone_threshold" << StandaloneFileSizeThreshold;
+
+ Cbo.BeginObject("system_info");
+ Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName();
+ Cbo.EndObject();
+ CbObject Metadata = Cbo.Save();
+
+ WriteFile(m_BasePath / "rpc_segment_info.zcb", Metadata.GetBuffer().AsIoBuffer());
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("caught exception while writing segment metadata for RPC recording: {}", Ex.what());
+ }
+
+ IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest));
+ BasicFile IndexFile;
+ IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate);
+ std::error_code Ec;
+ IndexFile.WriteAll(IndexBuffer, Ec);
+ IndexFile.Close();
+
+ // note that simply calling `.reset()` here will *not* release backing memory
+ // and it's important that we do because on high traffic servers this will use
+ // a lot of memory otherwise
+ std::vector<RecordedRequest> EmptyEntries;
+ swap(m_Entries, EmptyEntries);
+}
+
+void
+RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer)
+{
+ const uint64_t RequestBufferSize = RequestBuffer.GetSize();
+ uint64_t RequestIndex = ~0ull;
+
+ {
+ RwLock::ExclusiveLockScope Lock(m_Lock);
+ RequestIndex = m_Entries.size();
+ RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0u,
+ .Length = uint32_t(RequestBufferSize & 0xffffFFFFu),
+ .ContentType = RequestInfo.ContentType,
+ .AcceptType = RequestInfo.AcceptType,
+ .OffsetHigh = 0,
+ .Padding2 = 0,
+ .SessionId = RequestInfo.SessionId});
+
+ if (Entry.Length < StandaloneFileSizeThreshold)
+ {
+ const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize);
+
+ if (BlockIndex == m_BlockFiles.size())
+ {
+ std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>());
+ NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate);
+ m_ChunkOffset = BlockIndex * RecordedRequestBlockSize;
+ ++m_FileCount;
+ }
+
+ ZEN_ASSERT(BlockIndex < m_BlockFiles.size());
+ BasicFile* BlockFile = m_BlockFiles[BlockIndex].get();
+ ZEN_ASSERT(BlockFile != nullptr);
+
+ // Note that this is the overall logical offset, not the offset within a single file
+ const uint64_t ChunkWriteOffset = m_ChunkOffset;
+ m_ChunkOffset = RoundUp(ChunkWriteOffset + Entry.Length, 1u << 4u);
+ Entry.SetOffset(ChunkWriteOffset);
+ Lock.ReleaseNow();
+
+ std::error_code Ec;
+ BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), ChunkWriteOffset - (BlockIndex * RecordedRequestBlockSize), Ec);
+ if (Ec)
+ {
+ // We cannot simply use `Entry` here because the vector may
+ // have been reallocated causing the entry to be in a different
+ // location
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Entries[RequestIndex].Length = 0;
+ return;
+ }
+
+ m_RequestsByteCount.fetch_add(RequestBuffer.GetSize());
+
+ return;
+ }
+ }
+
+ // Write request data to standalone file
+
+ try
+ {
+ BasicFile RequestFile;
+ RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate);
+
+ if (RequestBufferSize > std::numeric_limits<uint32_t>::max())
+ {
+ // The exact value of the entry is not important, we will use
+ // the size of the standalone file regardless when performing
+ // the read
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Entries[RequestIndex].Length = std::numeric_limits<uint32_t>::max();
+ }
+
+ ++m_FileCount;
+
+ std::error_code Ec;
+ RequestFile.WriteAll(RequestBuffer, Ec);
+
+ if (Ec)
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Entries[RequestIndex].Length = 0;
+ }
+ else
+ {
+ m_RequestsByteCount.fetch_add(RequestBuffer.GetSize());
+ }
+ }
+ catch (const std::exception&)
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Entries[RequestIndex].Length = 0;
+ }
+}
+
+uint64_t
+RecordedRequestsSegmentReader::BeginRead(const std::filesystem::path& BasePath, bool InMemory)
+{
+ m_BasePath = BasePath;
+ BasicFile IndexFile;
+ IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kRead);
+ m_Entries.resize(IndexFile.FileSize() / sizeof(RecordedRequest));
+ IndexFile.Read(m_Entries.data(), IndexFile.FileSize(), 0);
+ uint64_t MaxChunkPosition = 0;
+ for (const RecordedRequest& R : m_Entries)
+ {
+ if (R.Offset != ~0u)
+ {
+ MaxChunkPosition = Max(MaxChunkPosition, R.GetOffset() + R.Length);
+ }
+ }
+ uint32_t BlockCount = gsl::narrow<uint32_t>(MaxChunkPosition / RecordedRequestBlockSize) + 1;
+ m_BlockFiles.resize(BlockCount);
+ for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; ++BlockIndex)
+ {
+ if (InMemory)
+ {
+ BasicFile Chunk;
+ Chunk.Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kRead);
+ m_BlockFiles[BlockIndex] = Chunk.ReadAll();
+ continue;
+ }
+ m_BlockFiles[BlockIndex] = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("chunks{}.bin", BlockIndex));
+ }
+ return m_Entries.size();
+}
+
+void
+RecordedRequestsSegmentReader::EndRead()
+{
+ m_BlockFiles.clear();
+}
+
+RecordedRequestInfo
+RecordedRequestsSegmentReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const
+{
+ if (RequestIndex >= m_Entries.size())
+ {
+ return RecordedRequestInfo::NullRequest;
+ }
+ const RecordedRequest& Entry = m_Entries[RequestIndex];
+ if (Entry.Length == 0)
+ {
+ return RecordedRequestInfo::NullRequest;
+ }
+
+ RecordedRequestInfo RequestInfo = {.ContentType = Entry.ContentType, .AcceptType = Entry.AcceptType, .SessionId = Entry.SessionId};
+
+ if (Entry.Offset != ~0u)
+ {
+ // Inline in block file
+ const uint64_t EntryOffset = Entry.GetOffset();
+ const uint32_t BlockIndex = gsl::narrow<uint32_t>((EntryOffset + Entry.Length) / RecordedRequestBlockSize);
+ const uint64_t ChunkOffset = EntryOffset - (BlockIndex * RecordedRequestBlockSize);
+ OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length);
+
+ return RequestInfo;
+ }
+
+ // Standalone file
+ OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex));
+
+ return RequestInfo;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+RecordedRequestsWriter::RecordedRequestsWriter()
+{
+}
+
+RecordedRequestsWriter::~RecordedRequestsWriter()
+{
+ EndWrite();
+}
+
+void
+RecordedRequestsWriter::BeginWrite(const std::filesystem::path& BasePath)
+{
+ m_BasePath = BasePath;
+ m_IsActive = true;
+
+ m_WriterThread.reset(new std::thread(&RecordedRequestsWriter::WriterThreadMain, this));
+
+ m_IsWriterReady.wait(false);
+}
+
+void
+RecordedRequestsWriter::EndWrite()
+{
+ if (m_WriterThread)
+ {
+ m_IsActive = false;
+ const int64_t PendingCount = m_PendingRequests.fetch_add(1);
+ m_PendingRequests.notify_all();
+
+ if (PendingCount)
+ {
+ ZEN_INFO("waiting for RPC recorder writing thread to drain {} pending items", PendingCount);
+ }
+
+ if (m_WriterThread->joinable())
+ {
+ m_WriterThread->join();
+ }
+
+ m_WriterThread.reset();
+ }
+}
+
+void
+RecordedRequestsWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer)
+{
+ if (m_IsActive)
+ {
+ IoBuffer OwnedRequest = RequestBuffer;
+ OwnedRequest.MakeOwned();
+
+ {
+ RwLock::ExclusiveLockScope _(m_RequestQueueLock);
+ m_RequestQueue.push_back(QueuedRequest{RequestInfo, std::move(OwnedRequest)});
+ m_PendingRequests.fetch_add(1);
+ }
+
+ m_PendingRequests.notify_all();
+ }
+}
+
+void
+RecordedRequestsWriter::WriterThreadMain()
+{
+ SetCurrentThreadName("rpc_writer");
+ EnsureCurrentSegment();
+
+ m_IsWriterReady.store(true);
+ m_IsWriterReady.notify_all();
+
+ while (m_IsActive)
+ {
+ m_PendingRequests.wait(0);
+
+ while (m_PendingRequests)
+ {
+ RwLock::ExclusiveLockScope _(m_RequestQueueLock);
+ if (!m_RequestQueue.empty())
+ {
+ bool DrainBacklog = false;
+
+ do
+ {
+ QueuedRequest Request = m_RequestQueue.front();
+
+ m_RequestQueue.pop_front();
+ m_PendingRequests.fetch_sub(1);
+
+ // For a sufficiently large backlog, keep blocking queueing operations
+ // until we get below the threshold
+ DrainBacklog = m_RequestQueue.size() >= MaximumBacklogCount;
+
+ if (!DrainBacklog)
+ {
+ _.ReleaseNow();
+ }
+
+ try
+ {
+ RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment();
+ Writer.WriteRequest(Request.RequestInfo, Request.RequestBuffer);
+ }
+ catch (const std::exception&)
+ {
+ // TODO: what's the right behaviour here? The most likely cause would
+ // be some I/O error and we probably ought to just shut down recording
+ // at that point
+ }
+ } while (DrainBacklog);
+ }
+ else
+ {
+ // shutdown increments this counter so we need to decrement it
+ // here even though we didn't process any request
+ m_PendingRequests.fetch_sub(1);
+ }
+ }
+ }
+
+ RwLock::ExclusiveLockScope _(m_Lock);
+ CommitCurrentSegment(_);
+ WriteRecordingMetadata();
+}
+
+RecordedRequestsSegmentWriter&
+RecordedRequestsWriter::EnsureCurrentSegment()
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ if (m_CurrentWriter)
+ {
+ bool StartNewSegment = false;
+
+ TimeSpan SegmentAge(DateTime::NowTicks() - m_CurrentWriter->GetStartTime().GetTicks());
+
+ if (m_CurrentWriter->GetRequestCount() >= SegmentRequestCount)
+ {
+ ZEN_DEBUG("starting new RPC recording segment due to request count >= {}", SegmentRequestCount);
+ StartNewSegment = true;
+ }
+ else if (m_CurrentWriter->GetFileCount() >= LooseFileThreshold)
+ {
+ ZEN_DEBUG("starting new RPC recording segment due to file count >= {}", LooseFileThreshold);
+ StartNewSegment = true;
+ }
+ else if (m_CurrentWriter->GetRequestsByteCount() >= SegmentByteThreshold)
+ {
+ ZEN_DEBUG("starting new RPC recording segment due to footprint >= {} bytes", SegmentByteThreshold);
+ StartNewSegment = true;
+ }
+ else if (SegmentAge >= SegmentTimeThreshold)
+ {
+ ZEN_DEBUG("starting new RPC recording segment due to age >= {}",
+ NiceTimeSpanMs(SegmentTimeThreshold.GetTicks() / TimeSpan::TicksPerMillisecond));
+ StartNewSegment = true;
+ }
+
+ if (StartNewSegment)
+ {
+ CommitCurrentSegment(_);
+ }
+ }
+
+ if (!m_CurrentWriter)
+ {
+ const uint64_t SegmentIndex = m_FinishedSegments.size();
+ m_CurrentWriter = std::make_unique<RecordedRequestsSegmentWriter>();
+
+ m_CurrentWriter->BeginWrite(m_BasePath / MakeSegmentPath(SegmentIndex), SegmentIndex, m_NextSegmentBaseIndex);
+ }
+
+ return *m_CurrentWriter;
+}
+
+void
+RecordedRequestsWriter::CommitCurrentSegment(RwLock::ExclusiveLockScope&)
+{
+ if (m_CurrentWriter)
+ {
+ RecordedRequestsSegmentWriter& Writer = *m_CurrentWriter;
+ m_FinishedSegments.push_back(std::move(m_CurrentWriter));
+ m_SegmentBaseIndex.push_back(m_NextSegmentBaseIndex);
+ m_NextSegmentBaseIndex += Writer.GetRequestCount();
+
+ Writer.EndWrite();
+ }
+}
+
+void
+RecordedRequestsWriter::WriteRecordingMetadata()
+{
+ try
+ {
+ DateTime EndTime = DateTime::Now();
+ TimeSpan Duration{EndTime.GetTicks() - m_StartTime.GetTicks()};
+
+ CbObjectWriter Cbo;
+ Cbo << "time_start" << m_StartTime << "time_end" << EndTime << "duration" << Duration << "format_version" << 2;
+
+ Cbo.BeginObject("system_info");
+ Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName();
+ Cbo << "segment_count" << m_FinishedSegments.size();
+ Cbo << "block_size" << RecordedRequestBlockSize;
+ Cbo << "standalone_threshold" << StandaloneFileSizeThreshold;
+ Cbo << "segment_request_count" << SegmentRequestCount;
+ Cbo << "segment_file_threshold" << LooseFileThreshold;
+ Cbo << "segment_byte_threshold" << SegmentByteThreshold;
+
+ Describe(GetSystemMetrics(), Cbo);
+ Cbo.EndObject();
+
+ Cbo.BeginArray("segments");
+
+ for (auto& Segment : m_FinishedSegments)
+ {
+ Cbo.BeginObject();
+ Cbo << "segment" << Segment->GetSegmentIndex();
+ Cbo << "base_index" << Segment->GetBaseRequestIndex();
+ Cbo << "request_count" << Segment->GetRequestCount();
+ Cbo << "request_bytes" << Segment->GetRequestsByteCount();
+ Cbo << "start_time" << Segment->GetStartTime();
+ Cbo << "end_time" << Segment->GetEndTime();
+ Cbo.EndObject();
+ }
+
+ Cbo.EndArray();
+
+ CbObject Metadata = Cbo.Save();
+
+ WriteFile(m_BasePath / "rpc_recording_info.zcb", Metadata.GetBuffer().AsIoBuffer());
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("caught exception while writing metadata for RPC recording: {}", Ex.what());
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+uint64_t
+RecordedRequestsReader::BeginRead(const std::filesystem::path& BasePath, bool InMemory)
+{
+ m_InMemory = InMemory;
+ m_BasePath = BasePath;
+
+ std::error_code Ec;
+ BasicFile InfoFile;
+ InfoFile.Open(m_BasePath / "rpc_recording_info.zcb", BasicFile::Mode::kRead, Ec);
+
+ if (!Ec)
+ {
+ try
+ {
+ CbObject CbInfo = LoadCompactBinaryObject(InfoFile.ReadAll());
+
+ uint64_t TotalRequestCount = 0;
+ uint64_t MaxSegmentIndex = 0;
+
+ for (auto SegmentElement : CbInfo["segments"])
+ {
+ CbObjectView Segment = SegmentElement.AsObjectView();
+
+ const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment"sv].AsUInt64(),
+ .BaseRequestIndex = Segment["base_index"sv].AsUInt64(),
+ .RequestCount = Segment["request_count"sv].AsUInt64(),
+ .RequestBytes = Segment["request_bytes"sv].AsUInt64(),
+ .StartTime = Segment["start_time"sv].AsDateTime(),
+ .EndTime = Segment["end_time"sv].AsDateTime()});
+
+ TotalRequestCount += Info.RequestCount;
+ MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex);
+ }
+
+ m_SegmentReaders.resize(MaxSegmentIndex + 1);
+
+ return TotalRequestCount;
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("could not read metadata file: {}", Ex.what());
+ }
+ }
+
+ ZEN_INFO("recovering segment info for '{}'", BasePath);
+
+ uint64_t TotalRequestCount = 0;
+ uint64_t MaxSegmentIndex = 0;
+
+ try
+ {
+ for (int SegmentIndex = 0;; ++SegmentIndex)
+ {
+ const std::filesystem::path ZcbPath = BasePath / MakeSegmentPath(SegmentIndex) / "rpc_segment_info.zcb";
+ FileContents Fc = ReadFile(ZcbPath);
+
+ if (Fc.ErrorCode)
+ break;
+
+ if (IoBuffer SegmentInfoBuffer = Fc.Flatten())
+ {
+ CbObject Segment = LoadCompactBinaryObject(SegmentInfoBuffer);
+
+ const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment_index"sv].AsUInt64(),
+ .BaseRequestIndex = 0,
+ .RequestCount = Segment["entry_count"sv].AsUInt64(),
+ .RequestBytes = 0,
+ .StartTime = Segment["time_start"sv].AsDateTime(),
+ .EndTime = Segment["time_end"sv].AsDateTime()});
+
+ TotalRequestCount += Info.RequestCount;
+ MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex);
+ }
+ }
+ }
+ catch (const std::exception&)
+ {
+ }
+
+ std::sort(begin(m_KnownSegments), end(m_KnownSegments), [](const auto& Lhs, const auto& Rhs) {
+ return Lhs.SegmentIndex < Rhs.SegmentIndex;
+ });
+
+ uint64_t SegmentRequestOffset = 0;
+
+ for (SegmentInfo& Info : m_KnownSegments)
+ {
+ Info.BaseRequestIndex = SegmentRequestOffset;
+ SegmentRequestOffset += Info.RequestCount;
+ }
+
+ m_SegmentReaders.resize(MaxSegmentIndex + 1);
+
+ return TotalRequestCount;
+}
+
+void
+RecordedRequestsReader::EndRead()
+{
+ RwLock::ExclusiveLockScope _(m_SegmentLock);
+ m_KnownSegments.clear();
+ m_SegmentReaders.clear();
+ m_BasePath.clear();
+}
+
+RecordedRequestInfo
+RecordedRequestsReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const
+{
+ auto EnsureSegment = [&](uint64_t SegmentIndex) -> const RecordedRequestsSegmentReader& {
+ {
+ RwLock::SharedLockScope _(m_SegmentLock);
+
+ if (auto SegmentReaderPtr = m_SegmentReaders[SegmentIndex].get())
+ {
+ return *SegmentReaderPtr;
+ }
+ }
+
+ RwLock::ExclusiveLockScope _(m_SegmentLock);
+
+ auto& SegmentReaderPtr = m_SegmentReaders[SegmentIndex];
+
+ if (!SegmentReaderPtr)
+ {
+ RecordedRequestsSegmentReader* NewSegment = new RecordedRequestsSegmentReader;
+ NewSegment->BeginRead(m_BasePath / MakeSegmentPath(SegmentIndex), m_InMemory);
+ SegmentReaderPtr.reset(NewSegment);
+ }
+
+ return *(SegmentReaderPtr.get());
+ };
+
+ for (auto& Segment : m_KnownSegments)
+ {
+ if (Segment.RequestCount > RequestIndex)
+ {
+ return EnsureSegment(Segment.SegmentIndex).ReadRequest(RequestIndex, OutBuffer);
+ }
+ else
+ {
+ RequestIndex -= Segment.RequestCount;
+ }
+ }
+
+ return RecordedRequestInfo::NullRequest;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+class DiskRequestRecorder : public IRpcRequestRecorder
+{
+public:
+ DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); }
+ virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); }
+
+ virtual void RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override
+ {
+ m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer);
+ }
+
+private:
+ RecordedRequestsWriter m_RecordedRequests;
+};
+
+class DiskRequestReplayer : public IRpcRequestReplayer
+{
+public:
+ DiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory)
+ {
+ m_RequestCount = m_RequestReader.BeginRead(BasePath, InMemory);
+ }
+ virtual ~DiskRequestReplayer() { m_RequestReader.EndRead(); }
+
+ static bool IsCompatible(const std::filesystem::path& BasePath)
+ {
+ if (IsFile(BasePath / "rpc_recording_info.zcb"))
+ {
+ return true;
+ }
+
+ const std::filesystem::path SegmentZero = BasePath / MakeSegmentPath(0);
+
+ if (IsFile(SegmentZero / "rpc_segment_info.zcb") && IsFile(SegmentZero / "index.bin"))
+ {
+ // top-level metadata is missing, possibly because of premature exit
+ // on the recording side
+
+ return true;
+ }
+
+ return false;
+ }
+
+ virtual uint64_t GetRequestCount() const override { return m_RequestCount; }
+
+ virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override
+ {
+ return m_RequestReader.ReadRequest(RequestIndex, OutBuffer);
+ }
+
+private:
+ std::uint64_t m_RequestCount;
+ RecordedRequestsReader m_RequestReader;
+};
+
+} // namespace zen::cache::v2
+
+//////////////////////////////////////////////////////////////////////////
+
+namespace zen::cache {
+
+std::unique_ptr<cache::IRpcRequestRecorder>
+MakeDiskRequestRecorder(const std::filesystem::path& BasePath)
+{
+ return std::make_unique<v2::DiskRequestRecorder>(BasePath);
+}
+
+std::unique_ptr<cache::IRpcRequestReplayer>
+MakeDiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory)
+{
+ if (v2::DiskRequestReplayer::IsCompatible(BasePath))
+ {
+ return std::make_unique<v2::DiskRequestReplayer>(BasePath, InMemory);
+ }
+ else
+ {
+ return std::make_unique<v1::DiskRequestReplayer>(BasePath, InMemory);
+ }
+}
+
+#if ZEN_WITH_TESTS
+
+void
+rpcrecord_forcelink()
+{
+}
+
+TEST_SUITE_BEGIN("rpc.recording");
+
+TEST_CASE("rpc.record")
+{
+ ScopedTemporaryDirectory TempDir;
+ auto Path = TempDir.Path();
+
+ const Oid SessionId = GetSessionId();
+
+ using namespace std::literals;
+
+ {
+ cache::v2::DiskRequestRecorder Recorder{Path};
+
+ for (int i = 0; i < 1000; ++i)
+ {
+ RecordedRequestInfo RequestInfo{.ContentType = ZenContentType::kCbObject,
+ .AcceptType = ZenContentType::kCbObject,
+ .SessionId = SessionId};
+
+ CbObjectWriter RequestPayload;
+ RequestPayload << "test"sv << true;
+ RequestPayload << "index"sv << i;
+ CbObject Req = RequestPayload.Save();
+ IoBuffer RequestBuffer = Req.GetBuffer().AsIoBuffer();
+
+ Recorder.RecordRequest(RequestInfo, RequestBuffer);
+ }
+ }
+
+ {
+ cache::v2::DiskRequestReplayer Replayer{Path, false};
+
+ for (int i = 0; i < 1000; ++i)
+ {
+ IoBuffer RequestBuffer;
+ RecordedRequestInfo RequestInfo = Replayer.GetRequest(i, RequestBuffer);
+
+ CHECK(RequestInfo.AcceptType == ZenContentType::kCbObject);
+ CHECK(RequestInfo.ContentType == ZenContentType::kCbObject);
+ CHECK(RequestInfo.SessionId == SessionId);
+
+ CbObject Req = LoadCompactBinaryObject(RequestBuffer);
+ CHECK_EQ(Req["index"sv].AsInt32(), i);
+ CHECK_EQ(Req["test"sv].AsBool(), true);
+ }
+ }
+}
+
+TEST_SUITE_END();
+
+#endif
+
+} // namespace zen::cache