// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include #include ZEN_THIRD_PARTY_INCLUDES_END namespace zen::cache { const RecordedRequestInfo RecordedRequestInfo::NullRequest = {.ContentType = ZenContentType::kUnknownContentType, .AcceptType = ZenContentType::kUnknownContentType, .SessionId = Oid::Zero}; } namespace zen::cache::v1 { struct RecordedRequest { uint64_t Offset; uint64_t Length; ZenContentType ContentType; ZenContentType AcceptType; }; const uint64_t RecordedRequestBlockSize = 1ull << 31u; const uint64_t StandaloneFileSizeThreshold = 1 * 1024 * 1024ull; struct RecordedRequestsWriter { void BeginWrite(const std::filesystem::path& BasePath) { m_BasePath = BasePath; std::filesystem::create_directories(m_BasePath); } void EndWrite() { RwLock::ExclusiveLockScope _(m_Lock); m_BlockFiles.clear(); try { // Emit some metadata alongside the recording DateTime EndTime = DateTime::Now(); TimeSpan Duration{EndTime.GetTicks() - m_StartTime.GetTicks()}; CbObjectWriter Cbo; Cbo << "time_start" << m_StartTime << "time_end" << EndTime << "duration" << Duration; Cbo << "entry_count" << m_Entries.size() << "entry_size" << sizeof(RecordedRequest); Cbo << "block_size" << RecordedRequestBlockSize << "standalone_threshold" << StandaloneFileSizeThreshold; Cbo.BeginObject("system_info"); Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName(); Describe(GetSystemMetrics(), Cbo); Cbo.EndObject(); CbObject Metadata = Cbo.Save(); WriteFile(m_BasePath / "rpc_recording_metadata.zcb", Metadata.GetBuffer().AsIoBuffer()); } catch (std::exception& Ex) { ZEN_WARN("caught exception while generating metadata for RPC recording: {}", Ex.what()); } IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest)); BasicFile IndexFile; IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate); std::error_code Ec; IndexFile.WriteAll(IndexBuffer, Ec); IndexFile.Close(); m_Entries.clear(); } uint64_t WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) { RwLock::ExclusiveLockScope Lock(m_Lock); uint64_t RequestIndex = m_Entries.size(); RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0ull, .Length = RequestBuffer.Size(), .ContentType = RequestInfo.ContentType, .AcceptType = RequestInfo.AcceptType}); if (Entry.Length < StandaloneFileSizeThreshold) { const uint32_t BlockIndex = gsl::narrow((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize); if (BlockIndex == m_BlockFiles.size()) { std::unique_ptr& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique()); NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate); m_ChunkOffset = BlockIndex * RecordedRequestBlockSize; } ZEN_ASSERT(BlockIndex < m_BlockFiles.size()); BasicFile* BlockFile = m_BlockFiles[BlockIndex].get(); ZEN_ASSERT(BlockFile != nullptr); Entry.Offset = m_ChunkOffset; m_ChunkOffset = RoundUp(m_ChunkOffset + Entry.Length, 1u << 4u); Lock.ReleaseNow(); std::error_code Ec; BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), Entry.Offset - BlockIndex * RecordedRequestBlockSize, Ec); if (Ec) { Entry.Length = 0; return ~0ull; } return RequestIndex; } Lock.ReleaseNow(); BasicFile RequestFile; RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate); std::error_code Ec; RequestFile.WriteAll(RequestBuffer, Ec); if (Ec) { Entry.Length = 0; return ~0ull; } return RequestIndex; } std::filesystem::path m_BasePath; mutable RwLock m_Lock; std::vector m_Entries; std::vector> m_BlockFiles; uint64_t m_ChunkOffset; zen::DateTime m_StartTime = DateTime::Now(); }; struct RecordedRequestsReader { uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory) { m_BasePath = BasePath; BasicFile IndexFile; IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kRead); m_Entries.resize(IndexFile.FileSize() / sizeof(RecordedRequest)); IndexFile.Read(m_Entries.data(), IndexFile.FileSize(), 0); uint64_t MaxChunkPosition = 0; for (const RecordedRequest& R : m_Entries) { if (R.Offset != ~0ull) { MaxChunkPosition = Max(MaxChunkPosition, R.Offset + R.Length); } } uint32_t BlockCount = gsl::narrow(MaxChunkPosition / RecordedRequestBlockSize) + 1; m_BlockFiles.resize(BlockCount); for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; ++BlockIndex) { if (InMemory) { BasicFile Chunk; Chunk.Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kRead); m_BlockFiles[BlockIndex] = Chunk.ReadAll(); continue; } m_BlockFiles[BlockIndex] = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("chunks{}.bin", BlockIndex)); } return m_Entries.size(); } void EndRead() { m_BlockFiles.clear(); } RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const { if (RequestIndex >= m_Entries.size()) { return RecordedRequestInfo::NullRequest; } const RecordedRequest& Entry = m_Entries[RequestIndex]; if (Entry.Length == 0) { return RecordedRequestInfo::NullRequest; } if (Entry.Offset != ~0ull) { uint32_t BlockIndex = gsl::narrow((Entry.Offset + Entry.Length) / RecordedRequestBlockSize); uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize); OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length); } else { OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex)); } return {.ContentType = Entry.ContentType, .AcceptType = Entry.AcceptType, .SessionId = Oid::Zero}; } std::filesystem::path m_BasePath; std::vector m_Entries; std::vector m_BlockFiles; }; class DiskRequestRecorder : public IRpcRequestRecorder { public: DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); } virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); } private: virtual uint64_t RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override { return m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer); } virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {} virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {} RecordedRequestsWriter m_RecordedRequests; }; class DiskRequestReplayer : public IRpcRequestReplayer { public: DiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory) { m_RequestCount = m_RequestBuffer.BeginRead(BasePath, InMemory); } virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); } private: virtual uint64_t GetRequestCount() const override { return m_RequestCount; } virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override { return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer); } virtual ZenContentType GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; } std::uint64_t m_RequestCount; RecordedRequestsReader m_RequestBuffer; }; } // namespace zen::cache::v1 ////////////////////////////////////////////////////////////////////////// namespace zen::cache::v2 { using namespace std::literals; struct RecordedRequest { uint32_t Offset; // 4 bytes uint32_t Length; // 4 bytes ZenContentType ContentType; // 1 byte ZenContentType AcceptType; // 1 byte uint8_t Padding; // 1 byte uint8_t Padding2; // 1 byte Oid SessionId; // 12 bytes }; static_assert(sizeof(RecordedRequest) == 24); const uint64_t RecordedRequestBlockSize = 1 * 1024 * 1024 * 1024; // 1GiB const uint64_t StandaloneFileSizeThreshold = 1 * 1024 * 1024ull; // 1MiB const uint64_t SegmentRequestCount = 10 * 1000 * 1000; const uint64_t LooseFileThreshold = 5000; // Somewhat arbitrary, but we try to keep the // number of files in a directory below this level // for performance const uint64_t SegmentByteThreshold = 16ull * 1024 * 1024 * 1024; const TimeSpan SegmentTimeThreshold{/* hours */ 1, /* minutes */ 0, /* seconds */ 0}; struct RecordedRequestsSegmentWriter { RecordedRequestsSegmentWriter() = default; ~RecordedRequestsSegmentWriter() = default; RecordedRequestsSegmentWriter(const RecordedRequestsSegmentWriter&) = delete; RecordedRequestsSegmentWriter& operator=(const RecordedRequestsSegmentWriter&) = delete; void BeginWrite(const std::filesystem::path& BasePath, uint64_t SegmentIndex, uint64_t RequestBaseIndex); uint64_t WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer); void EndWrite(); inline uint64_t GetRequestCount() const { if (m_RequestCount) { return m_RequestCount; } return m_Entries.size(); } inline uint64_t GetBaseRequestIndex() const { return m_RequestBaseIndex; } inline uint64_t GetSegmentIndex() const { return m_SegmentIndex; } inline uint64_t GetFileCount() const { return m_FileCount; } inline uint64_t GetRequestsByteCount() const { return m_RequestsByteCount; } inline DateTime GetStartTime() const { return m_StartTime; } inline DateTime GetEndTime() const { return m_EndTime; } private: std::filesystem::path m_BasePath; uint64_t m_SegmentIndex = 0; uint64_t m_RequestBaseIndex = 0; uint64_t m_RequestCount = 0; std::atomic_uint64_t m_FileCount{}; std::atomic_uint64_t m_RequestsByteCount{}; mutable RwLock m_Lock; std::vector m_Entries; std::vector> m_BlockFiles; uint64_t m_ChunkOffset; DateTime m_StartTime = DateTime::Now(); DateTime m_EndTime = DateTime::Now(); }; struct RecordedRequestsWriter { void BeginWrite(const std::filesystem::path& BasePath); RecordedRequestsSegmentWriter& EnsureCurrentSegment(); void CommitCurrentSegment(RwLock::ExclusiveLockScope&); void EndWrite(); uint64_t WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer); private: std::filesystem::path m_BasePath; zen::DateTime m_StartTime = DateTime::Now(); mutable RwLock m_Lock; std::unique_ptr m_CurrentWriter; std::vector> m_FinishedSegments; std::vector m_SegmentBaseIndex; uint64_t m_NextSegmentBaseIndex = 0; }; ////////////////////////////////////////////////////////////////////////// struct RecordedRequestsSegmentReader { RecordedRequestsSegmentReader() = default; RecordedRequestsSegmentReader(const RecordedRequestsSegmentReader&) = delete; RecordedRequestsSegmentReader& operator=(const RecordedRequestsSegmentReader&) = delete; uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory); void EndRead(); RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const; private: std::filesystem::path m_BasePath; std::vector m_Entries; std::vector m_BlockFiles; }; struct RecordedRequestsReader { uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory); void EndRead(); RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const; private: struct SegmentInfo { uint64_t SegmentIndex; uint64_t BaseRequestIndex; uint64_t RequestCount; uint64_t RequestBytes; DateTime StartTime{0}; DateTime EndTime{0}; }; bool m_InMemory = false; std::filesystem::path m_BasePath; std::vector m_KnownSegments; mutable RwLock m_SegmentLock; mutable std::vector> m_SegmentReaders; }; ////////////////////////////////////////////////////////////////////////// void RecordedRequestsSegmentWriter::BeginWrite(const std::filesystem::path& BasePath, uint64_t SegmentIndex, uint64_t RequestBaseIndex) { m_BasePath = BasePath; m_SegmentIndex = SegmentIndex; m_RequestBaseIndex = RequestBaseIndex; std::filesystem::create_directories(m_BasePath); } void RecordedRequestsSegmentWriter::EndWrite() { RwLock::ExclusiveLockScope _(m_Lock); m_RequestCount = m_Entries.size(); m_BlockFiles.clear(); // Emit some metadata alongside the recording try { m_EndTime = DateTime::Now(); TimeSpan Duration{m_EndTime.GetTicks() - m_StartTime.GetTicks()}; CbObjectWriter Cbo; Cbo << "time_start" << m_StartTime << "time_end" << m_EndTime << "duration" << Duration; Cbo << "segment_index" << m_SegmentIndex; Cbo << "entry_count" << m_Entries.size() << "entry_size" << sizeof(RecordedRequest); Cbo << "block_size" << RecordedRequestBlockSize << "standalone_threshold" << StandaloneFileSizeThreshold; Cbo.BeginObject("system_info"); Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName(); Cbo.EndObject(); CbObject Metadata = Cbo.Save(); WriteFile(m_BasePath / "rpc_segment_info.zcb", Metadata.GetBuffer().AsIoBuffer()); } catch (std::exception& Ex) { ZEN_WARN("caught exception while writing segment metadata for RPC recording: {}", Ex.what()); } IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest)); BasicFile IndexFile; IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate); std::error_code Ec; IndexFile.WriteAll(IndexBuffer, Ec); IndexFile.Close(); m_Entries.clear(); } uint64_t RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) { const uint64_t RequestBufferSize = RequestBuffer.GetSize(); RwLock::ExclusiveLockScope Lock(m_Lock); uint64_t RequestIndex = m_Entries.size(); RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0u, .Length = uint32_t(RequestBufferSize & 0xffffFFFFu), .ContentType = RequestInfo.ContentType, .AcceptType = RequestInfo.AcceptType, .Padding = 0, .Padding2 = 0, .SessionId = RequestInfo.SessionId}); if (Entry.Length < StandaloneFileSizeThreshold) { const uint32_t BlockIndex = gsl::narrow((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize); if (BlockIndex == m_BlockFiles.size()) { std::unique_ptr& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique()); NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate); m_ChunkOffset = BlockIndex * RecordedRequestBlockSize; ++m_FileCount; } ZEN_ASSERT(BlockIndex < m_BlockFiles.size()); BasicFile* BlockFile = m_BlockFiles[BlockIndex].get(); ZEN_ASSERT(BlockFile != nullptr); Entry.Offset = uint32_t(m_ChunkOffset & 0xffffFFFF); m_ChunkOffset = RoundUp(m_ChunkOffset + Entry.Length, 1u << 4u); Lock.ReleaseNow(); std::error_code Ec; BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), Entry.Offset - BlockIndex * RecordedRequestBlockSize, Ec); if (Ec) { Entry.Length = 0; return ~0ull; } m_RequestsByteCount.fetch_add(RequestBuffer.GetSize()); return RequestIndex; } Lock.ReleaseNow(); // Write request data to standalone file try { BasicFile RequestFile; RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate); if (RequestBufferSize > std::numeric_limits::max()) { // The exact value of the entry is not important, we will use // the size of the standalone file regardless when performing // the read Entry.Length = std::numeric_limits::max(); } ++m_FileCount; std::error_code Ec; RequestFile.WriteAll(RequestBuffer, Ec); if (Ec) { Entry.Length = 0; return ~0ull; } m_RequestsByteCount.fetch_add(RequestBuffer.GetSize()); return RequestIndex; } catch (std::exception&) { Entry.Length = 0; return ~0ull; } } uint64_t RecordedRequestsSegmentReader::BeginRead(const std::filesystem::path& BasePath, bool InMemory) { m_BasePath = BasePath; BasicFile IndexFile; IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kRead); m_Entries.resize(IndexFile.FileSize() / sizeof(RecordedRequest)); IndexFile.Read(m_Entries.data(), IndexFile.FileSize(), 0); uint64_t MaxChunkPosition = 0; for (const RecordedRequest& R : m_Entries) { if (R.Offset != ~0u) { MaxChunkPosition = Max(MaxChunkPosition, R.Offset + R.Length); } } uint32_t BlockCount = gsl::narrow(MaxChunkPosition / RecordedRequestBlockSize) + 1; m_BlockFiles.resize(BlockCount); for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; ++BlockIndex) { if (InMemory) { BasicFile Chunk; Chunk.Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kRead); m_BlockFiles[BlockIndex] = Chunk.ReadAll(); continue; } m_BlockFiles[BlockIndex] = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("chunks{}.bin", BlockIndex)); } return m_Entries.size(); } void RecordedRequestsSegmentReader::EndRead() { m_BlockFiles.clear(); } RecordedRequestInfo RecordedRequestsSegmentReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const { if (RequestIndex >= m_Entries.size()) { return RecordedRequestInfo::NullRequest; } const RecordedRequest& Entry = m_Entries[RequestIndex]; if (Entry.Length == 0) { return RecordedRequestInfo::NullRequest; } RecordedRequestInfo RequestInfo = {.ContentType = Entry.ContentType, .AcceptType = Entry.AcceptType, .SessionId = Entry.SessionId}; if (Entry.Offset != ~0u) { // Inline in block file uint32_t BlockIndex = gsl::narrow((Entry.Offset + Entry.Length) / RecordedRequestBlockSize); uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize); OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length); return RequestInfo; } // Standalone file OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex)); return RequestInfo; } void RecordedRequestsWriter::BeginWrite(const std::filesystem::path& BasePath) { m_BasePath = BasePath; EnsureCurrentSegment(); } RecordedRequestsSegmentWriter& RecordedRequestsWriter::EnsureCurrentSegment() { RwLock::ExclusiveLockScope _(m_Lock); if (m_CurrentWriter) { bool StartNewSegment = false; if (m_CurrentWriter->GetRequestCount() >= SegmentRequestCount) { ZEN_DEBUG("starting new RPC recording segment due to request count >= {}", SegmentRequestCount); StartNewSegment = true; } else if (m_CurrentWriter->GetFileCount() >= LooseFileThreshold) { ZEN_DEBUG("starting new RPC recording segment due to file count >= {}", LooseFileThreshold); StartNewSegment = true; } else if (m_CurrentWriter->GetRequestsByteCount() >= SegmentByteThreshold) { ZEN_DEBUG("starting new RPC recording segment due to footprint >= {} bytes", SegmentByteThreshold); StartNewSegment = true; } if (StartNewSegment) { CommitCurrentSegment(_); } } if (!m_CurrentWriter) { const uint64_t SegmentIndex = m_FinishedSegments.size(); m_CurrentWriter = std::make_unique(); m_CurrentWriter->BeginWrite(m_BasePath / fmt::format("segment_{:06}", SegmentIndex), SegmentIndex, m_NextSegmentBaseIndex); } return *m_CurrentWriter; } void RecordedRequestsWriter::CommitCurrentSegment(RwLock::ExclusiveLockScope&) { if (m_CurrentWriter) { RecordedRequestsSegmentWriter& Writer = *m_CurrentWriter; m_FinishedSegments.push_back(std::move(m_CurrentWriter)); m_SegmentBaseIndex.push_back(m_NextSegmentBaseIndex); m_NextSegmentBaseIndex += Writer.GetRequestCount(); Writer.EndWrite(); } } void RecordedRequestsWriter::EndWrite() { RwLock::ExclusiveLockScope _(m_Lock); CommitCurrentSegment(_); // Emit some metadata alongside the recording try { DateTime EndTime = DateTime::Now(); TimeSpan Duration{EndTime.GetTicks() - m_StartTime.GetTicks()}; CbObjectWriter Cbo; Cbo << "time_start" << m_StartTime << "time_end" << EndTime << "duration" << Duration << "format_version" << 2; Cbo.BeginObject("system_info"); Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName(); Cbo << "segment_count" << m_FinishedSegments.size(); Cbo << "block_size" << RecordedRequestBlockSize; Cbo << "standalone_threshold" << StandaloneFileSizeThreshold; Cbo << "segment_request_count" << SegmentRequestCount; Cbo << "segment_file_threshold" << LooseFileThreshold; Cbo << "segment_byte_threshold" << SegmentByteThreshold; Describe(GetSystemMetrics(), Cbo); Cbo.EndObject(); Cbo.BeginArray("segments"); for (auto& Segment : m_FinishedSegments) { Cbo.BeginObject(); Cbo << "segment" << Segment->GetSegmentIndex(); Cbo << "base_index" << Segment->GetBaseRequestIndex(); Cbo << "request_count" << Segment->GetRequestCount(); Cbo << "request_bytes" << Segment->GetRequestsByteCount(); Cbo << "start_time" << Segment->GetStartTime(); Cbo << "end_time" << Segment->GetEndTime(); Cbo.EndObject(); } Cbo.EndArray(); CbObject Metadata = Cbo.Save(); WriteFile(m_BasePath / "rpc_recording_info.zcb", Metadata.GetBuffer().AsIoBuffer()); } catch (std::exception& Ex) { ZEN_WARN("caught exception while writing metadata for RPC recording: {}", Ex.what()); } } uint64_t RecordedRequestsWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) { RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment(); const uint64_t SegmentLocalIndex = Writer.WriteRequest(RequestInfo, RequestBuffer); return Writer.GetBaseRequestIndex() + SegmentLocalIndex; } ////////////////////////////////////////////////////////////////////////// uint64_t RecordedRequestsReader::BeginRead(const std::filesystem::path& BasePath, bool InMemory) { m_InMemory = InMemory; m_BasePath = BasePath; BasicFile InfoFile; InfoFile.Open(m_BasePath / "rpc_recording_info.zcb", BasicFile::Mode::kRead); CbObject CbInfo = LoadCompactBinaryObject(InfoFile.ReadAll()); uint64_t TotalRequestCount = 0; uint64_t MaxSegmentIndex = 0; for (auto SegmentElement : CbInfo["segments"]) { CbObjectView Segment = SegmentElement.AsObjectView(); const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment"sv].AsUInt64(), .BaseRequestIndex = Segment["base_index"sv].AsUInt64(), .RequestCount = Segment["request_count"sv].AsUInt64(), .RequestBytes = Segment["request_bytes"sv].AsUInt64(), .StartTime = Segment["start_time"sv].AsDateTime(), .EndTime = Segment["end_time"sv].AsDateTime()}); TotalRequestCount += Info.RequestCount; MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex); } m_SegmentReaders.resize(MaxSegmentIndex + 1); return TotalRequestCount; } void RecordedRequestsReader::EndRead() { RwLock::ExclusiveLockScope _(m_SegmentLock); m_KnownSegments.clear(); m_SegmentReaders.clear(); m_BasePath.clear(); } RecordedRequestInfo RecordedRequestsReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const { auto EnsureSegment = [&](uint64_t SegmentIndex) -> const RecordedRequestsSegmentReader& { { RwLock::SharedLockScope _(m_SegmentLock); if (auto SegmentReaderPtr = m_SegmentReaders[SegmentIndex].get()) { return *SegmentReaderPtr; } } RwLock::ExclusiveLockScope _(m_SegmentLock); auto& SegmentReaderPtr = m_SegmentReaders[SegmentIndex]; if (!SegmentReaderPtr) { RecordedRequestsSegmentReader* NewSegment = new RecordedRequestsSegmentReader; NewSegment->BeginRead(m_BasePath / fmt::format("segment_{:06}", SegmentIndex), m_InMemory); SegmentReaderPtr.reset(NewSegment); } return *(SegmentReaderPtr.get()); }; for (auto& Segment : m_KnownSegments) { if (Segment.RequestCount > RequestIndex) { return EnsureSegment(Segment.SegmentIndex).ReadRequest(RequestIndex, OutBuffer); } else { RequestIndex -= Segment.RequestCount; } } return RecordedRequestInfo::NullRequest; } ////////////////////////////////////////////////////////////////////////// class DiskRequestRecorder : public IRpcRequestRecorder { public: DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); } virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); } private: virtual uint64_t RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override { return m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer); } virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {} virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {} RecordedRequestsWriter m_RecordedRequests; }; class DiskRequestReplayer : public IRpcRequestReplayer { public: DiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory) { m_RequestCount = m_RequestBuffer.BeginRead(BasePath, InMemory); } virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); } static bool IsCompatible(const std::filesystem::path& BasePath) { return std::filesystem::exists(BasePath / "rpc_recording_info.zcb"); } private: virtual uint64_t GetRequestCount() const override { return m_RequestCount; } virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override { return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer); } virtual ZenContentType GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; } std::uint64_t m_RequestCount; RecordedRequestsReader m_RequestBuffer; }; } // namespace zen::cache::v2 ////////////////////////////////////////////////////////////////////////// namespace zen::cache { std::unique_ptr MakeDiskRequestRecorder(const std::filesystem::path& BasePath) { return std::make_unique(BasePath); } std::unique_ptr MakeDiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory) { if (v2::DiskRequestReplayer::IsCompatible(BasePath)) { return std::make_unique(BasePath, InMemory); } else { return std::make_unique(BasePath, InMemory); } } } // namespace zen::cache