diff options
Diffstat (limited to 'src/zenutil/cache')
| -rw-r--r-- | src/zenutil/cache/rpcrecording.cpp | 701 |
1 files changed, 680 insertions, 21 deletions
diff --git a/src/zenutil/cache/rpcrecording.cpp b/src/zenutil/cache/rpcrecording.cpp index 4958a27f6..054ac0e56 100644 --- a/src/zenutil/cache/rpcrecording.cpp +++ b/src/zenutil/cache/rpcrecording.cpp @@ -1,5 +1,9 @@ // Copyright Epic Games, Inc. All Rights Reserved. +#include <zencore/compactbinarybuilder.h> +#include <zencore/filesystem.h> +#include <zencore/logging.h> +#include <zencore/system.h> #include <zenutil/basicfile.h> #include <zenutil/cache/rpcrecording.h> @@ -9,6 +13,15 @@ ZEN_THIRD_PARTY_INCLUDES_START ZEN_THIRD_PARTY_INCLUDES_END namespace zen::cache { + +const RecordedRequestInfo RecordedRequestInfo::NullRequest = {.ContentType = ZenContentType::kUnknownContentType, + .AcceptType = ZenContentType::kUnknownContentType, + .SessionId = Oid::Zero}; + +} + +namespace zen::cache::v1 { + struct RecordedRequest { uint64_t Offset; @@ -17,7 +30,8 @@ struct RecordedRequest ZenContentType AcceptType; }; -const uint64_t RecordedRequestBlockSize = 1ull << 31u; +const uint64_t RecordedRequestBlockSize = 1ull << 31u; +const uint64_t StandaloneFileSizeThreshold = 1 * 1024 * 1024ull; struct RecordedRequestsWriter { @@ -32,6 +46,31 @@ struct RecordedRequestsWriter RwLock::ExclusiveLockScope _(m_Lock); m_BlockFiles.clear(); + try + { + // Emit some metadata alongside the recording + + DateTime EndTime = DateTime::Now(); + TimeSpan Duration{EndTime.GetTicks() - m_StartTime.GetTicks()}; + + CbObjectWriter Cbo; + Cbo << "time_start" << m_StartTime << "time_end" << EndTime << "duration" << Duration; + Cbo << "entry_count" << m_Entries.size() << "entry_size" << sizeof(RecordedRequest); + Cbo << "block_size" << RecordedRequestBlockSize << "standalone_threshold" << StandaloneFileSizeThreshold; + + Cbo.BeginObject("system_info"); + Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName(); + Describe(GetSystemMetrics(), Cbo); + Cbo.EndObject(); + CbObject Metadata = Cbo.Save(); + + WriteFile(m_BasePath / "rpc_recording_metadata.zcb", Metadata.GetBuffer().AsIoBuffer()); + } + catch (std::exception& Ex) + { + ZEN_WARN("caught exception while generating metadata for RPC recording: {}", Ex.what()); + } + IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest)); BasicFile IndexFile; IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate); @@ -41,15 +80,18 @@ struct RecordedRequestsWriter m_Entries.clear(); } - uint64_t WriteRequest(ZenContentType ContentType, ZenContentType AcceptType, const IoBuffer& RequestBuffer) + uint64_t WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) { RwLock::ExclusiveLockScope Lock(m_Lock); uint64_t RequestIndex = m_Entries.size(); - RecordedRequest& Entry = m_Entries.emplace_back( - RecordedRequest{.Offset = ~0ull, .Length = RequestBuffer.Size(), .ContentType = ContentType, .AcceptType = AcceptType}); - if (Entry.Length < 1 * 1024 * 1024) + RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0ull, + .Length = RequestBuffer.Size(), + .ContentType = RequestInfo.ContentType, + .AcceptType = RequestInfo.AcceptType}); + + if (Entry.Length < StandaloneFileSizeThreshold) { - uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize); + const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize); if (BlockIndex == m_BlockFiles.size()) { std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>()); @@ -92,6 +134,7 @@ struct RecordedRequestsWriter std::vector<RecordedRequest> m_Entries; std::vector<std::unique_ptr<BasicFile>> m_BlockFiles; uint64_t m_ChunkOffset; + zen::DateTime m_StartTime = DateTime::Now(); }; struct RecordedRequestsReader @@ -128,26 +171,31 @@ struct RecordedRequestsReader } void EndRead() { m_BlockFiles.clear(); } - std::pair<ZenContentType, ZenContentType> ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const + RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const { if (RequestIndex >= m_Entries.size()) { - return {ZenContentType::kUnknownContentType, ZenContentType::kUnknownContentType}; + return RecordedRequestInfo::NullRequest; } + const RecordedRequest& Entry = m_Entries[RequestIndex]; if (Entry.Length == 0) { - return {ZenContentType::kUnknownContentType, ZenContentType::kUnknownContentType}; + return RecordedRequestInfo::NullRequest; } + if (Entry.Offset != ~0ull) { uint32_t BlockIndex = gsl::narrow<uint32_t>((Entry.Offset + Entry.Length) / RecordedRequestBlockSize); uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize); OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length); - return {Entry.ContentType, Entry.AcceptType}; } - OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex)); - return {Entry.ContentType, Entry.AcceptType}; + else + { + OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex)); + } + + return {.ContentType = Entry.ContentType, .AcceptType = Entry.AcceptType, .SessionId = Oid::Zero}; } std::filesystem::path m_BasePath; @@ -162,14 +210,13 @@ public: virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); } private: - virtual uint64_t RecordRequest(const ZenContentType ContentType, - const ZenContentType AcceptType, - const IoBuffer& RequestBuffer) override + virtual uint64_t RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override { - return m_RecordedRequests.WriteRequest(ContentType, AcceptType, RequestBuffer); + return m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer); } - virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {} - virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {} + virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {} + virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {} + RecordedRequestsWriter m_RecordedRequests; }; @@ -185,7 +232,7 @@ public: private: virtual uint64_t GetRequestCount() const override { return m_RequestCount; } - virtual std::pair<ZenContentType, ZenContentType> GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override + virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override { return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer); } @@ -195,16 +242,628 @@ private: RecordedRequestsReader m_RequestBuffer; }; +} // namespace zen::cache::v1 + +////////////////////////////////////////////////////////////////////////// + +namespace zen::cache::v2 { + +using namespace std::literals; + +struct RecordedRequest +{ + uint32_t Offset; // 4 bytes + uint32_t Length; // 4 bytes + ZenContentType ContentType; // 1 byte + ZenContentType AcceptType; // 1 byte + uint8_t Padding; // 1 byte + uint8_t Padding2; // 1 byte + Oid SessionId; // 12 bytes +}; + +static_assert(sizeof(RecordedRequest) == 24); + +const uint64_t RecordedRequestBlockSize = 1 * 1024 * 1024 * 1024; // 1GiB +const uint64_t StandaloneFileSizeThreshold = 1 * 1024 * 1024ull; // 1MiB +const uint64_t SegmentRequestCount = 10 * 1000 * 1000; +const uint64_t LooseFileThreshold = 5000; // Somewhat arbitrary, but we try to keep the + // number of files in a directory below this level + // for performance +const uint64_t SegmentByteThreshold = 16ull * 1024 * 1024 * 1024; +const TimeSpan SegmentTimeThreshold{/* hours */ 1, /* minutes */ 0, /* seconds */ 0}; + +struct RecordedRequestsSegmentWriter +{ + RecordedRequestsSegmentWriter() = default; + ~RecordedRequestsSegmentWriter() = default; + + RecordedRequestsSegmentWriter(const RecordedRequestsSegmentWriter&) = delete; + RecordedRequestsSegmentWriter& operator=(const RecordedRequestsSegmentWriter&) = delete; + + void BeginWrite(const std::filesystem::path& BasePath, uint64_t SegmentIndex, uint64_t RequestBaseIndex); + uint64_t WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer); + void EndWrite(); + + inline uint64_t GetRequestCount() const + { + if (m_RequestCount) + { + return m_RequestCount; + } + + return m_Entries.size(); + } + inline uint64_t GetBaseRequestIndex() const { return m_RequestBaseIndex; } + inline uint64_t GetSegmentIndex() const { return m_SegmentIndex; } + inline uint64_t GetFileCount() const { return m_FileCount; } + inline uint64_t GetRequestsByteCount() const { return m_RequestsByteCount; } + inline DateTime GetStartTime() const { return m_StartTime; } + inline DateTime GetEndTime() const { return m_EndTime; } + +private: + std::filesystem::path m_BasePath; + uint64_t m_SegmentIndex = 0; + uint64_t m_RequestBaseIndex = 0; + uint64_t m_RequestCount = 0; + std::atomic_uint64_t m_FileCount{}; + std::atomic_uint64_t m_RequestsByteCount{}; + mutable RwLock m_Lock; + std::vector<RecordedRequest> m_Entries; + std::vector<std::unique_ptr<BasicFile>> m_BlockFiles; + uint64_t m_ChunkOffset; + DateTime m_StartTime = DateTime::Now(); + DateTime m_EndTime = DateTime::Now(); +}; + +struct RecordedRequestsWriter +{ + void BeginWrite(const std::filesystem::path& BasePath); + RecordedRequestsSegmentWriter& EnsureCurrentSegment(); + void CommitCurrentSegment(RwLock::ExclusiveLockScope&); + void EndWrite(); + uint64_t WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer); + +private: + std::filesystem::path m_BasePath; + zen::DateTime m_StartTime = DateTime::Now(); + + mutable RwLock m_Lock; + std::unique_ptr<RecordedRequestsSegmentWriter> m_CurrentWriter; + std::vector<std::unique_ptr<RecordedRequestsSegmentWriter>> m_FinishedSegments; + std::vector<uint64_t> m_SegmentBaseIndex; + uint64_t m_NextSegmentBaseIndex = 0; +}; + +////////////////////////////////////////////////////////////////////////// + +struct RecordedRequestsSegmentReader +{ + RecordedRequestsSegmentReader() = default; + + RecordedRequestsSegmentReader(const RecordedRequestsSegmentReader&) = delete; + RecordedRequestsSegmentReader& operator=(const RecordedRequestsSegmentReader&) = delete; + + uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory); + void EndRead(); + RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const; + +private: + std::filesystem::path m_BasePath; + std::vector<RecordedRequest> m_Entries; + std::vector<IoBuffer> m_BlockFiles; +}; + +struct RecordedRequestsReader +{ + uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory); + void EndRead(); + RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const; + +private: + struct SegmentInfo + { + uint64_t SegmentIndex; + uint64_t BaseRequestIndex; + uint64_t RequestCount; + uint64_t RequestBytes; + DateTime StartTime{0}; + DateTime EndTime{0}; + }; + + bool m_InMemory = false; + std::filesystem::path m_BasePath; + std::vector<SegmentInfo> m_KnownSegments; + + mutable RwLock m_SegmentLock; + mutable std::vector<std::unique_ptr<RecordedRequestsSegmentReader>> m_SegmentReaders; +}; + +////////////////////////////////////////////////////////////////////////// + +void +RecordedRequestsSegmentWriter::BeginWrite(const std::filesystem::path& BasePath, uint64_t SegmentIndex, uint64_t RequestBaseIndex) +{ + m_BasePath = BasePath; + m_SegmentIndex = SegmentIndex; + m_RequestBaseIndex = RequestBaseIndex; + std::filesystem::create_directories(m_BasePath); +} + +void +RecordedRequestsSegmentWriter::EndWrite() +{ + RwLock::ExclusiveLockScope _(m_Lock); + m_RequestCount = m_Entries.size(); + m_BlockFiles.clear(); + + // Emit some metadata alongside the recording + + try + { + m_EndTime = DateTime::Now(); + TimeSpan Duration{m_EndTime.GetTicks() - m_StartTime.GetTicks()}; + + CbObjectWriter Cbo; + Cbo << "time_start" << m_StartTime << "time_end" << m_EndTime << "duration" << Duration; + Cbo << "segment_index" << m_SegmentIndex; + Cbo << "entry_count" << m_Entries.size() << "entry_size" << sizeof(RecordedRequest); + Cbo << "block_size" << RecordedRequestBlockSize << "standalone_threshold" << StandaloneFileSizeThreshold; + + Cbo.BeginObject("system_info"); + Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName(); + Cbo.EndObject(); + CbObject Metadata = Cbo.Save(); + + WriteFile(m_BasePath / "rpc_segment_info.zcb", Metadata.GetBuffer().AsIoBuffer()); + } + catch (std::exception& Ex) + { + ZEN_WARN("caught exception while writing segment metadata for RPC recording: {}", Ex.what()); + } + + IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest)); + BasicFile IndexFile; + IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate); + std::error_code Ec; + IndexFile.WriteAll(IndexBuffer, Ec); + IndexFile.Close(); + m_Entries.clear(); +} + +uint64_t +RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) +{ + const uint64_t RequestBufferSize = RequestBuffer.GetSize(); + + RwLock::ExclusiveLockScope Lock(m_Lock); + uint64_t RequestIndex = m_Entries.size(); + RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0u, + .Length = uint32_t(RequestBufferSize & 0xffffFFFFu), + .ContentType = RequestInfo.ContentType, + .AcceptType = RequestInfo.AcceptType, + .Padding = 0, + .Padding2 = 0, + .SessionId = RequestInfo.SessionId}); + + if (Entry.Length < StandaloneFileSizeThreshold) + { + const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize); + + if (BlockIndex == m_BlockFiles.size()) + { + std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>()); + NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate); + m_ChunkOffset = BlockIndex * RecordedRequestBlockSize; + ++m_FileCount; + } + + ZEN_ASSERT(BlockIndex < m_BlockFiles.size()); + BasicFile* BlockFile = m_BlockFiles[BlockIndex].get(); + ZEN_ASSERT(BlockFile != nullptr); + + Entry.Offset = uint32_t(m_ChunkOffset & 0xffffFFFF); + m_ChunkOffset = RoundUp(m_ChunkOffset + Entry.Length, 1u << 4u); + Lock.ReleaseNow(); + + std::error_code Ec; + BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), Entry.Offset - BlockIndex * RecordedRequestBlockSize, Ec); + if (Ec) + { + Entry.Length = 0; + return ~0ull; + } + + m_RequestsByteCount.fetch_add(RequestBuffer.GetSize()); + + return RequestIndex; + } + Lock.ReleaseNow(); + + // Write request data to standalone file + + try + { + BasicFile RequestFile; + RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate); + + if (RequestBufferSize > std::numeric_limits<uint32_t>::max()) + { + // The exact value of the entry is not important, we will use + // the size of the standalone file regardless when performing + // the read + Entry.Length = std::numeric_limits<uint32_t>::max(); + } + + ++m_FileCount; + + std::error_code Ec; + RequestFile.WriteAll(RequestBuffer, Ec); + + if (Ec) + { + Entry.Length = 0; + return ~0ull; + } + + m_RequestsByteCount.fetch_add(RequestBuffer.GetSize()); + + return RequestIndex; + } + catch (std::exception&) + { + Entry.Length = 0; + return ~0ull; + } +} + +uint64_t +RecordedRequestsSegmentReader::BeginRead(const std::filesystem::path& BasePath, bool InMemory) +{ + m_BasePath = BasePath; + BasicFile IndexFile; + IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kRead); + m_Entries.resize(IndexFile.FileSize() / sizeof(RecordedRequest)); + IndexFile.Read(m_Entries.data(), IndexFile.FileSize(), 0); + uint64_t MaxChunkPosition = 0; + for (const RecordedRequest& R : m_Entries) + { + if (R.Offset != ~0u) + { + MaxChunkPosition = Max(MaxChunkPosition, R.Offset + R.Length); + } + } + uint32_t BlockCount = gsl::narrow<uint32_t>(MaxChunkPosition / RecordedRequestBlockSize) + 1; + m_BlockFiles.resize(BlockCount); + for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; ++BlockIndex) + { + if (InMemory) + { + BasicFile Chunk; + Chunk.Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kRead); + m_BlockFiles[BlockIndex] = Chunk.ReadAll(); + continue; + } + m_BlockFiles[BlockIndex] = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("chunks{}.bin", BlockIndex)); + } + return m_Entries.size(); +} +void +RecordedRequestsSegmentReader::EndRead() +{ + m_BlockFiles.clear(); +} + +RecordedRequestInfo +RecordedRequestsSegmentReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const +{ + if (RequestIndex >= m_Entries.size()) + { + return RecordedRequestInfo::NullRequest; + } + const RecordedRequest& Entry = m_Entries[RequestIndex]; + if (Entry.Length == 0) + { + return RecordedRequestInfo::NullRequest; + } + + RecordedRequestInfo RequestInfo = {.ContentType = Entry.ContentType, .AcceptType = Entry.AcceptType, .SessionId = Entry.SessionId}; + + if (Entry.Offset != ~0u) + { + // Inline in block file + uint32_t BlockIndex = gsl::narrow<uint32_t>((Entry.Offset + Entry.Length) / RecordedRequestBlockSize); + uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize); + OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length); + + return RequestInfo; + } + + // Standalone file + OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex)); + + return RequestInfo; +} + +void +RecordedRequestsWriter::BeginWrite(const std::filesystem::path& BasePath) +{ + m_BasePath = BasePath; + EnsureCurrentSegment(); +} + +RecordedRequestsSegmentWriter& +RecordedRequestsWriter::EnsureCurrentSegment() +{ + RwLock::ExclusiveLockScope _(m_Lock); + + if (m_CurrentWriter) + { + bool StartNewSegment = false; + + if (m_CurrentWriter->GetRequestCount() >= SegmentRequestCount) + { + ZEN_DEBUG("starting new RPC recording segment due to request count >= {}", SegmentRequestCount); + StartNewSegment = true; + } + else if (m_CurrentWriter->GetFileCount() >= LooseFileThreshold) + { + ZEN_DEBUG("starting new RPC recording segment due to file count >= {}", LooseFileThreshold); + StartNewSegment = true; + } + else if (m_CurrentWriter->GetRequestsByteCount() >= SegmentByteThreshold) + { + ZEN_DEBUG("starting new RPC recording segment due to footprint >= {} bytes", SegmentByteThreshold); + StartNewSegment = true; + } + + if (StartNewSegment) + { + CommitCurrentSegment(_); + } + } + + if (!m_CurrentWriter) + { + const uint64_t SegmentIndex = m_FinishedSegments.size(); + m_CurrentWriter = std::make_unique<RecordedRequestsSegmentWriter>(); + + m_CurrentWriter->BeginWrite(m_BasePath / fmt::format("segment_{:06}", SegmentIndex), SegmentIndex, m_NextSegmentBaseIndex); + } + + return *m_CurrentWriter; +} + +void +RecordedRequestsWriter::CommitCurrentSegment(RwLock::ExclusiveLockScope&) +{ + if (m_CurrentWriter) + { + RecordedRequestsSegmentWriter& Writer = *m_CurrentWriter; + m_FinishedSegments.push_back(std::move(m_CurrentWriter)); + m_SegmentBaseIndex.push_back(m_NextSegmentBaseIndex); + m_NextSegmentBaseIndex += Writer.GetRequestCount(); + + Writer.EndWrite(); + } +} + +void +RecordedRequestsWriter::EndWrite() +{ + RwLock::ExclusiveLockScope _(m_Lock); + + CommitCurrentSegment(_); + + // Emit some metadata alongside the recording + + try + { + DateTime EndTime = DateTime::Now(); + TimeSpan Duration{EndTime.GetTicks() - m_StartTime.GetTicks()}; + + CbObjectWriter Cbo; + Cbo << "time_start" << m_StartTime << "time_end" << EndTime << "duration" << Duration << "format_version" << 2; + + Cbo.BeginObject("system_info"); + Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName(); + Cbo << "segment_count" << m_FinishedSegments.size(); + Cbo << "block_size" << RecordedRequestBlockSize; + Cbo << "standalone_threshold" << StandaloneFileSizeThreshold; + Cbo << "segment_request_count" << SegmentRequestCount; + Cbo << "segment_file_threshold" << LooseFileThreshold; + Cbo << "segment_byte_threshold" << SegmentByteThreshold; + + Describe(GetSystemMetrics(), Cbo); + Cbo.EndObject(); + + Cbo.BeginArray("segments"); + + for (auto& Segment : m_FinishedSegments) + { + Cbo.BeginObject(); + Cbo << "segment" << Segment->GetSegmentIndex(); + Cbo << "base_index" << Segment->GetBaseRequestIndex(); + Cbo << "request_count" << Segment->GetRequestCount(); + Cbo << "request_bytes" << Segment->GetRequestsByteCount(); + Cbo << "start_time" << Segment->GetStartTime(); + Cbo << "end_time" << Segment->GetEndTime(); + Cbo.EndObject(); + } + + Cbo.EndArray(); + + CbObject Metadata = Cbo.Save(); + + WriteFile(m_BasePath / "rpc_recording_info.zcb", Metadata.GetBuffer().AsIoBuffer()); + } + catch (std::exception& Ex) + { + ZEN_WARN("caught exception while writing metadata for RPC recording: {}", Ex.what()); + } +} + +uint64_t +RecordedRequestsWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) +{ + RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment(); + + const uint64_t SegmentLocalIndex = Writer.WriteRequest(RequestInfo, RequestBuffer); + + return Writer.GetBaseRequestIndex() + SegmentLocalIndex; +} + +////////////////////////////////////////////////////////////////////////// + +uint64_t +RecordedRequestsReader::BeginRead(const std::filesystem::path& BasePath, bool InMemory) +{ + m_InMemory = InMemory; + m_BasePath = BasePath; + + BasicFile InfoFile; + InfoFile.Open(m_BasePath / "rpc_recording_info.zcb", BasicFile::Mode::kRead); + CbObject CbInfo = LoadCompactBinaryObject(InfoFile.ReadAll()); + + uint64_t TotalRequestCount = 0; + uint64_t MaxSegmentIndex = 0; + + for (auto SegmentElement : CbInfo["segments"]) + { + CbObjectView Segment = SegmentElement.AsObjectView(); + + const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment"sv].AsUInt64(), + .BaseRequestIndex = Segment["base_index"sv].AsUInt64(), + .RequestCount = Segment["request_count"sv].AsUInt64(), + .RequestBytes = Segment["request_bytes"sv].AsUInt64(), + .StartTime = Segment["start_time"sv].AsDateTime(), + .EndTime = Segment["end_time"sv].AsDateTime()}); + + TotalRequestCount += Info.RequestCount; + MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex); + } + + m_SegmentReaders.resize(MaxSegmentIndex + 1); + + return TotalRequestCount; +} + +void +RecordedRequestsReader::EndRead() +{ + RwLock::ExclusiveLockScope _(m_SegmentLock); + m_KnownSegments.clear(); + m_SegmentReaders.clear(); + m_BasePath.clear(); +} + +RecordedRequestInfo +RecordedRequestsReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const +{ + auto EnsureSegment = [&](uint64_t SegmentIndex) -> const RecordedRequestsSegmentReader& { + { + RwLock::SharedLockScope _(m_SegmentLock); + + if (auto SegmentReaderPtr = m_SegmentReaders[SegmentIndex].get()) + { + return *SegmentReaderPtr; + } + } + + RwLock::ExclusiveLockScope _(m_SegmentLock); + + auto& SegmentReaderPtr = m_SegmentReaders[SegmentIndex]; + + if (!SegmentReaderPtr) + { + RecordedRequestsSegmentReader* NewSegment = new RecordedRequestsSegmentReader; + NewSegment->BeginRead(m_BasePath / fmt::format("segment_{:06}", SegmentIndex), m_InMemory); + SegmentReaderPtr.reset(NewSegment); + } + + return *(SegmentReaderPtr.get()); + }; + + for (auto& Segment : m_KnownSegments) + { + if (Segment.RequestCount > RequestIndex) + { + return EnsureSegment(Segment.SegmentIndex).ReadRequest(RequestIndex, OutBuffer); + } + else + { + RequestIndex -= Segment.RequestCount; + } + } + + return RecordedRequestInfo::NullRequest; +} + +////////////////////////////////////////////////////////////////////////// + +class DiskRequestRecorder : public IRpcRequestRecorder +{ +public: + DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); } + virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); } + +private: + virtual uint64_t RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override + { + return m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer); + } + virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {} + virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {} + + RecordedRequestsWriter m_RecordedRequests; +}; + +class DiskRequestReplayer : public IRpcRequestReplayer +{ +public: + DiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory) + { + m_RequestCount = m_RequestBuffer.BeginRead(BasePath, InMemory); + } + virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); } + + static bool IsCompatible(const std::filesystem::path& BasePath) { return std::filesystem::exists(BasePath / "rpc_recording_info.zcb"); } + +private: + virtual uint64_t GetRequestCount() const override { return m_RequestCount; } + + virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override + { + return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer); + } + virtual ZenContentType GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; } + + std::uint64_t m_RequestCount; + RecordedRequestsReader m_RequestBuffer; +}; + +} // namespace zen::cache::v2 + +////////////////////////////////////////////////////////////////////////// + +namespace zen::cache { + std::unique_ptr<cache::IRpcRequestRecorder> MakeDiskRequestRecorder(const std::filesystem::path& BasePath) { - return std::make_unique<DiskRequestRecorder>(BasePath); + return std::make_unique<v2::DiskRequestRecorder>(BasePath); } std::unique_ptr<cache::IRpcRequestReplayer> MakeDiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory) { - return std::make_unique<DiskRequestReplayer>(BasePath, InMemory); + if (v2::DiskRequestReplayer::IsCompatible(BasePath)) + { + return std::make_unique<v2::DiskRequestReplayer>(BasePath, InMemory); + } + else + { + return std::make_unique<v1::DiskRequestReplayer>(BasePath, InMemory); + } } } // namespace zen::cache |