// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include #include ZEN_THIRD_PARTY_INCLUDES_END #include #include namespace zen::cache { const RecordedRequestInfo RecordedRequestInfo::NullRequest = {.ContentType = ZenContentType::kUnknownContentType, .AcceptType = ZenContentType::kUnknownContentType, .SessionId = Oid::Zero}; } namespace zen::cache::v1 { struct RecordedRequest { uint64_t Offset; uint64_t Length; ZenContentType ContentType; ZenContentType AcceptType; }; const uint64_t RecordedRequestBlockSize = 1ull << 31u; const uint64_t StandaloneFileSizeThreshold = 1 * 1024 * 1024ull; struct RecordedRequestsWriter { void BeginWrite(const std::filesystem::path& BasePath) { m_BasePath = BasePath; std::filesystem::create_directories(m_BasePath); } void EndWrite() { RwLock::ExclusiveLockScope _(m_Lock); m_BlockFiles.clear(); try { // Emit some metadata alongside the recording DateTime EndTime = DateTime::Now(); TimeSpan Duration{EndTime.GetTicks() - m_StartTime.GetTicks()}; CbObjectWriter Cbo; Cbo << "time_start" << m_StartTime << "time_end" << EndTime << "duration" << Duration; Cbo << "entry_count" << m_Entries.size() << "entry_size" << sizeof(RecordedRequest); Cbo << "block_size" << RecordedRequestBlockSize << "standalone_threshold" << StandaloneFileSizeThreshold; Cbo.BeginObject("system_info"); Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName(); Describe(GetSystemMetrics(), Cbo); Cbo.EndObject(); CbObject Metadata = Cbo.Save(); WriteFile(m_BasePath / "rpc_recording_metadata.zcb", Metadata.GetBuffer().AsIoBuffer()); } catch (const std::exception& Ex) { ZEN_WARN("caught exception while generating metadata for RPC recording: {}", Ex.what()); } IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest)); BasicFile IndexFile; IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate); std::error_code Ec; IndexFile.WriteAll(IndexBuffer, Ec); IndexFile.Close(); m_Entries.clear(); } void WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) { RwLock::ExclusiveLockScope Lock(m_Lock); uint64_t RequestIndex = m_Entries.size(); RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0ull, .Length = RequestBuffer.Size(), .ContentType = RequestInfo.ContentType, .AcceptType = RequestInfo.AcceptType}); if (Entry.Length < StandaloneFileSizeThreshold) { const uint32_t BlockIndex = gsl::narrow((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize); if (BlockIndex == m_BlockFiles.size()) { std::unique_ptr& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique()); NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate); m_ChunkOffset = BlockIndex * RecordedRequestBlockSize; } ZEN_ASSERT(BlockIndex < m_BlockFiles.size()); BasicFile* BlockFile = m_BlockFiles[BlockIndex].get(); ZEN_ASSERT(BlockFile != nullptr); Entry.Offset = m_ChunkOffset; m_ChunkOffset = RoundUp(m_ChunkOffset + Entry.Length, 1u << 4u); Lock.ReleaseNow(); std::error_code Ec; BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), Entry.Offset - BlockIndex * RecordedRequestBlockSize, Ec); if (Ec) { Entry.Length = 0; } } else { Lock.ReleaseNow(); BasicFile RequestFile; RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate); std::error_code Ec; RequestFile.WriteAll(RequestBuffer, Ec); if (Ec) { Entry.Length = 0; } } } std::filesystem::path m_BasePath; mutable RwLock m_Lock; std::vector m_Entries; std::vector> m_BlockFiles; uint64_t m_ChunkOffset; zen::DateTime m_StartTime = DateTime::Now(); }; struct RecordedRequestsReader { uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory) { m_BasePath = BasePath; BasicFile IndexFile; IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kRead); m_Entries.resize(IndexFile.FileSize() / sizeof(RecordedRequest)); IndexFile.Read(m_Entries.data(), IndexFile.FileSize(), 0); uint64_t MaxChunkPosition = 0; for (const RecordedRequest& R : m_Entries) { if (R.Offset != ~0ull) { MaxChunkPosition = Max(MaxChunkPosition, R.Offset + R.Length); } } uint32_t BlockCount = gsl::narrow(MaxChunkPosition / RecordedRequestBlockSize) + 1; m_BlockFiles.resize(BlockCount); for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; ++BlockIndex) { if (InMemory) { BasicFile Chunk; Chunk.Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kRead); m_BlockFiles[BlockIndex] = Chunk.ReadAll(); continue; } m_BlockFiles[BlockIndex] = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("chunks{}.bin", BlockIndex)); } return m_Entries.size(); } void EndRead() { m_BlockFiles.clear(); } RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const { if (RequestIndex >= m_Entries.size()) { return RecordedRequestInfo::NullRequest; } const RecordedRequest& Entry = m_Entries[RequestIndex]; if (Entry.Length == 0) { return RecordedRequestInfo::NullRequest; } if (Entry.Offset != ~0ull) { uint32_t BlockIndex = gsl::narrow((Entry.Offset + Entry.Length) / RecordedRequestBlockSize); uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize); OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length); } else { OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex)); } return {.ContentType = Entry.ContentType, .AcceptType = Entry.AcceptType, .SessionId = Oid::Zero}; } std::filesystem::path m_BasePath; std::vector m_Entries; std::vector m_BlockFiles; }; class DiskRequestRecorder : public IRpcRequestRecorder { public: DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); } virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); } private: virtual void RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override { m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer); } RecordedRequestsWriter m_RecordedRequests; }; class DiskRequestReplayer : public IRpcRequestReplayer { public: DiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory) { m_RequestCount = m_RequestBuffer.BeginRead(BasePath, InMemory); } virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); } virtual uint64_t GetRequestCount() const override { return m_RequestCount; } virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override { return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer); } private: std::uint64_t m_RequestCount; RecordedRequestsReader m_RequestBuffer; }; } // namespace zen::cache::v1 ////////////////////////////////////////////////////////////////////////// namespace zen::cache::v2 { using namespace std::literals; struct RecordedRequest { uint32_t Offset; // 4 bytes uint32_t Length; // 4 bytes ZenContentType ContentType; // 1 byte ZenContentType AcceptType; // 1 byte uint8_t OffsetHigh; // 1 byte uint8_t Padding2; // 1 byte Oid SessionId; // 12 bytes inline uint64_t GetOffset() const { return uint64_t(Offset) + (uint64_t(OffsetHigh) << 32); } inline void SetOffset(uint64_t NewOffset) { Offset = gsl::narrow_cast(NewOffset & 0xffff'ffff); OffsetHigh = gsl::narrow_cast(NewOffset >> 32); } }; static_assert(sizeof(RecordedRequest) == 24); const uint64_t RecordedRequestBlockSize = 1 * 1024 * 1024 * 1024; // 1GiB const uint64_t StandaloneFileSizeThreshold = 16 * 1024 * 1024ull; // 16MiB const uint64_t SegmentRequestCount = 10 * 1000 * 1000; const uint64_t LooseFileThreshold = 5000; // Somewhat arbitrary, but we try to keep the // number of files in a directory below this level // for performance const uint64_t SegmentByteThreshold = 16ull * 1024 * 1024 * 1024; const TimeSpan SegmentTimeThreshold{/* hours */ 1, /* minutes */ 0, /* seconds */ 0}; const int64_t MaximumBacklogCount = 2000; std::string MakeSegmentPath(uint64_t SegmentIndex) { return fmt::format("segment_{:06}", SegmentIndex); } struct RecordedRequestsSegmentWriter { RecordedRequestsSegmentWriter() = default; ~RecordedRequestsSegmentWriter() = default; RecordedRequestsSegmentWriter(const RecordedRequestsSegmentWriter&) = delete; RecordedRequestsSegmentWriter& operator=(const RecordedRequestsSegmentWriter&) = delete; void BeginWrite(const std::filesystem::path& BasePath, uint64_t SegmentIndex, uint64_t RequestBaseIndex); void WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer); void EndWrite(); inline uint64_t GetRequestCount() const { if (m_RequestCount) { return m_RequestCount; } RwLock::SharedLockScope _(m_Lock); return m_Entries.size(); } inline uint64_t GetBaseRequestIndex() const { return m_RequestBaseIndex; } inline uint64_t GetSegmentIndex() const { return m_SegmentIndex; } inline uint64_t GetFileCount() const { return m_FileCount; } inline uint64_t GetRequestsByteCount() const { return m_RequestsByteCount; } inline DateTime GetStartTime() const { return m_StartTime; } inline DateTime GetEndTime() const { return m_EndTime; } private: std::filesystem::path m_BasePath; uint64_t m_SegmentIndex = 0; uint64_t m_RequestBaseIndex = 0; uint64_t m_RequestCount = 0; std::atomic_uint64_t m_FileCount{}; std::atomic_uint64_t m_RequestsByteCount{}; mutable RwLock m_Lock; std::vector m_Entries; std::vector> m_BlockFiles; uint64_t m_ChunkOffset; DateTime m_StartTime = DateTime::Now(); DateTime m_EndTime = DateTime::Now(); }; struct RecordedRequestsWriter { RecordedRequestsWriter(); ~RecordedRequestsWriter(); void BeginWrite(const std::filesystem::path& BasePath); void EndWrite(); void WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer); private: std::filesystem::path m_BasePath; zen::DateTime m_StartTime = DateTime::Now(); mutable RwLock m_Lock; std::unique_ptr m_CurrentWriter; std::vector> m_FinishedSegments; std::vector m_SegmentBaseIndex; uint64_t m_NextSegmentBaseIndex = 0; RecordedRequestsSegmentWriter& EnsureCurrentSegment(); void CommitCurrentSegment(RwLock::ExclusiveLockScope&); void WriteRecordingMetadata(); // I/O thread state struct QueuedRequest { RecordedRequestInfo RequestInfo; IoBuffer RequestBuffer; }; std::unique_ptr m_WriterThread; std::atomic_bool m_IsActive{false}; std::atomic_int64_t m_PendingRequests{0}; RwLock m_RequestQueueLock; std::deque m_RequestQueue; void WriterThreadMain(); }; ////////////////////////////////////////////////////////////////////////// struct RecordedRequestsSegmentReader { RecordedRequestsSegmentReader() = default; RecordedRequestsSegmentReader(const RecordedRequestsSegmentReader&) = delete; RecordedRequestsSegmentReader& operator=(const RecordedRequestsSegmentReader&) = delete; uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory); void EndRead(); RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const; private: std::filesystem::path m_BasePath; std::vector m_Entries; std::vector m_BlockFiles; }; struct RecordedRequestsReader { uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory); void EndRead(); RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const; private: struct SegmentInfo { uint64_t SegmentIndex; uint64_t BaseRequestIndex; uint64_t RequestCount; uint64_t RequestBytes; DateTime StartTime{0}; DateTime EndTime{0}; }; bool m_InMemory = false; std::filesystem::path m_BasePath; std::vector m_KnownSegments; mutable RwLock m_SegmentLock; mutable std::vector> m_SegmentReaders; }; ////////////////////////////////////////////////////////////////////////// void RecordedRequestsSegmentWriter::BeginWrite(const std::filesystem::path& BasePath, uint64_t SegmentIndex, uint64_t RequestBaseIndex) { m_BasePath = BasePath; m_SegmentIndex = SegmentIndex; m_RequestBaseIndex = RequestBaseIndex; std::filesystem::create_directories(m_BasePath); } void RecordedRequestsSegmentWriter::EndWrite() { RwLock::ExclusiveLockScope _(m_Lock); m_RequestCount = m_Entries.size(); m_BlockFiles.clear(); // Emit some metadata alongside the recording try { m_EndTime = DateTime::Now(); TimeSpan Duration{m_EndTime.GetTicks() - m_StartTime.GetTicks()}; CbObjectWriter Cbo; Cbo << "time_start" << m_StartTime << "time_end" << m_EndTime << "duration" << Duration; Cbo << "segment_index" << m_SegmentIndex; Cbo << "entry_count" << m_RequestCount << "entry_size" << sizeof(RecordedRequest); Cbo << "block_size" << RecordedRequestBlockSize << "standalone_threshold" << StandaloneFileSizeThreshold; Cbo.BeginObject("system_info"); Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName(); Cbo.EndObject(); CbObject Metadata = Cbo.Save(); WriteFile(m_BasePath / "rpc_segment_info.zcb", Metadata.GetBuffer().AsIoBuffer()); } catch (const std::exception& Ex) { ZEN_WARN("caught exception while writing segment metadata for RPC recording: {}", Ex.what()); } IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest)); BasicFile IndexFile; IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate); std::error_code Ec; IndexFile.WriteAll(IndexBuffer, Ec); IndexFile.Close(); // note that simply calling `.reset()` here will *not* release backing memory // and it's important that we do because on high traffic servers this will use // a lot of memory otherwise std::vector EmptyEntries; swap(m_Entries, EmptyEntries); } void RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) { const uint64_t RequestBufferSize = RequestBuffer.GetSize(); uint64_t RequestIndex = ~0ull; { RwLock::ExclusiveLockScope Lock(m_Lock); RequestIndex = m_Entries.size(); RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0u, .Length = uint32_t(RequestBufferSize & 0xffffFFFFu), .ContentType = RequestInfo.ContentType, .AcceptType = RequestInfo.AcceptType, .OffsetHigh = 0, .Padding2 = 0, .SessionId = RequestInfo.SessionId}); if (Entry.Length < StandaloneFileSizeThreshold) { const uint32_t BlockIndex = gsl::narrow((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize); if (BlockIndex == m_BlockFiles.size()) { std::unique_ptr& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique()); NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate); m_ChunkOffset = BlockIndex * RecordedRequestBlockSize; ++m_FileCount; } ZEN_ASSERT(BlockIndex < m_BlockFiles.size()); BasicFile* BlockFile = m_BlockFiles[BlockIndex].get(); ZEN_ASSERT(BlockFile != nullptr); // Note that this is the overall logical offset, not the offset within a single file const uint64_t ChunkWriteOffset = m_ChunkOffset; m_ChunkOffset = RoundUp(ChunkWriteOffset + Entry.Length, 1u << 4u); Entry.SetOffset(ChunkWriteOffset); Lock.ReleaseNow(); std::error_code Ec; BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), ChunkWriteOffset - (BlockIndex * RecordedRequestBlockSize), Ec); if (Ec) { // We cannot simply use `Entry` here because the vector may // have been reallocated causing the entry to be in a different // location RwLock::ExclusiveLockScope _(m_Lock); m_Entries[RequestIndex].Length = 0; return; } m_RequestsByteCount.fetch_add(RequestBuffer.GetSize()); return; } } // Write request data to standalone file try { BasicFile RequestFile; RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate); if (RequestBufferSize > std::numeric_limits::max()) { // The exact value of the entry is not important, we will use // the size of the standalone file regardless when performing // the read RwLock::ExclusiveLockScope _(m_Lock); m_Entries[RequestIndex].Length = std::numeric_limits::max(); } ++m_FileCount; std::error_code Ec; RequestFile.WriteAll(RequestBuffer, Ec); if (Ec) { RwLock::ExclusiveLockScope _(m_Lock); m_Entries[RequestIndex].Length = 0; } else { m_RequestsByteCount.fetch_add(RequestBuffer.GetSize()); } } catch (const std::exception&) { RwLock::ExclusiveLockScope _(m_Lock); m_Entries[RequestIndex].Length = 0; } } uint64_t RecordedRequestsSegmentReader::BeginRead(const std::filesystem::path& BasePath, bool InMemory) { m_BasePath = BasePath; BasicFile IndexFile; IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kRead); m_Entries.resize(IndexFile.FileSize() / sizeof(RecordedRequest)); IndexFile.Read(m_Entries.data(), IndexFile.FileSize(), 0); uint64_t MaxChunkPosition = 0; for (const RecordedRequest& R : m_Entries) { if (R.Offset != ~0u) { MaxChunkPosition = Max(MaxChunkPosition, R.GetOffset() + R.Length); } } uint32_t BlockCount = gsl::narrow(MaxChunkPosition / RecordedRequestBlockSize) + 1; m_BlockFiles.resize(BlockCount); for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; ++BlockIndex) { if (InMemory) { BasicFile Chunk; Chunk.Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kRead); m_BlockFiles[BlockIndex] = Chunk.ReadAll(); continue; } m_BlockFiles[BlockIndex] = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("chunks{}.bin", BlockIndex)); } return m_Entries.size(); } void RecordedRequestsSegmentReader::EndRead() { m_BlockFiles.clear(); } RecordedRequestInfo RecordedRequestsSegmentReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const { if (RequestIndex >= m_Entries.size()) { return RecordedRequestInfo::NullRequest; } const RecordedRequest& Entry = m_Entries[RequestIndex]; if (Entry.Length == 0) { return RecordedRequestInfo::NullRequest; } RecordedRequestInfo RequestInfo = {.ContentType = Entry.ContentType, .AcceptType = Entry.AcceptType, .SessionId = Entry.SessionId}; if (Entry.Offset != ~0u) { // Inline in block file const uint64_t EntryOffset = Entry.GetOffset(); const uint32_t BlockIndex = gsl::narrow((EntryOffset + Entry.Length) / RecordedRequestBlockSize); const uint64_t ChunkOffset = EntryOffset - (BlockIndex * RecordedRequestBlockSize); OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length); return RequestInfo; } // Standalone file OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex)); return RequestInfo; } ////////////////////////////////////////////////////////////////////////// RecordedRequestsWriter::RecordedRequestsWriter() { } RecordedRequestsWriter::~RecordedRequestsWriter() { EndWrite(); } void RecordedRequestsWriter::BeginWrite(const std::filesystem::path& BasePath) { m_BasePath = BasePath; m_IsActive = true; m_WriterThread.reset(new std::thread(&RecordedRequestsWriter::WriterThreadMain, this)); } void RecordedRequestsWriter::EndWrite() { if (m_WriterThread) { m_IsActive = false; const int64_t PendingCount = m_PendingRequests.fetch_add(1); m_PendingRequests.notify_all(); if (PendingCount) { ZEN_INFO("waiting for RPC recorder writing thread to drain {} pending items", PendingCount); } if (m_WriterThread->joinable()) { m_WriterThread->join(); } m_WriterThread.reset(); } } void RecordedRequestsWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) { if (m_IsActive) { IoBuffer OwnedRequest = RequestBuffer; OwnedRequest.MakeOwned(); { RwLock::ExclusiveLockScope _(m_RequestQueueLock); m_RequestQueue.push_back(QueuedRequest{RequestInfo, std::move(OwnedRequest)}); m_PendingRequests.fetch_add(1); } m_PendingRequests.notify_all(); } } void RecordedRequestsWriter::WriterThreadMain() { SetCurrentThreadName("rpc_writer"); EnsureCurrentSegment(); while (m_IsActive) { m_PendingRequests.wait(0); while (m_PendingRequests) { RwLock::ExclusiveLockScope _(m_RequestQueueLock); if (!m_RequestQueue.empty()) { bool DrainBacklog = false; do { QueuedRequest Request = m_RequestQueue.front(); m_RequestQueue.pop_front(); m_PendingRequests.fetch_sub(1); // For a sufficiently large backlog, keep blocking queueing operations // until we get below the threshold DrainBacklog = m_RequestQueue.size() >= MaximumBacklogCount; if (!DrainBacklog) { _.ReleaseNow(); } try { RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment(); Writer.WriteRequest(Request.RequestInfo, Request.RequestBuffer); } catch (const std::exception&) { // TODO: what's the right behaviour here? The most likely cause would // be some I/O error and we probably ought to just shut down recording // at that point } } while (DrainBacklog); } else { // shutdown increments this counter so we need to decrement it // here even though we didn't process any request m_PendingRequests.fetch_sub(1); } } } RwLock::ExclusiveLockScope _(m_Lock); CommitCurrentSegment(_); WriteRecordingMetadata(); } RecordedRequestsSegmentWriter& RecordedRequestsWriter::EnsureCurrentSegment() { RwLock::ExclusiveLockScope _(m_Lock); if (m_CurrentWriter) { bool StartNewSegment = false; TimeSpan SegmentAge(DateTime::NowTicks() - m_CurrentWriter->GetStartTime().GetTicks()); if (m_CurrentWriter->GetRequestCount() >= SegmentRequestCount) { ZEN_DEBUG("starting new RPC recording segment due to request count >= {}", SegmentRequestCount); StartNewSegment = true; } else if (m_CurrentWriter->GetFileCount() >= LooseFileThreshold) { ZEN_DEBUG("starting new RPC recording segment due to file count >= {}", LooseFileThreshold); StartNewSegment = true; } else if (m_CurrentWriter->GetRequestsByteCount() >= SegmentByteThreshold) { ZEN_DEBUG("starting new RPC recording segment due to footprint >= {} bytes", SegmentByteThreshold); StartNewSegment = true; } else if (SegmentAge >= SegmentTimeThreshold) { ZEN_DEBUG("starting new RPC recording segment due to age >= {}", NiceTimeSpanMs(SegmentTimeThreshold.GetTicks() / TimeSpan::TicksPerMillisecond)); StartNewSegment = true; } if (StartNewSegment) { CommitCurrentSegment(_); } } if (!m_CurrentWriter) { const uint64_t SegmentIndex = m_FinishedSegments.size(); m_CurrentWriter = std::make_unique(); m_CurrentWriter->BeginWrite(m_BasePath / MakeSegmentPath(SegmentIndex), SegmentIndex, m_NextSegmentBaseIndex); } return *m_CurrentWriter; } void RecordedRequestsWriter::CommitCurrentSegment(RwLock::ExclusiveLockScope&) { if (m_CurrentWriter) { RecordedRequestsSegmentWriter& Writer = *m_CurrentWriter; m_FinishedSegments.push_back(std::move(m_CurrentWriter)); m_SegmentBaseIndex.push_back(m_NextSegmentBaseIndex); m_NextSegmentBaseIndex += Writer.GetRequestCount(); Writer.EndWrite(); } } void RecordedRequestsWriter::WriteRecordingMetadata() { try { DateTime EndTime = DateTime::Now(); TimeSpan Duration{EndTime.GetTicks() - m_StartTime.GetTicks()}; CbObjectWriter Cbo; Cbo << "time_start" << m_StartTime << "time_end" << EndTime << "duration" << Duration << "format_version" << 2; Cbo.BeginObject("system_info"); Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName(); Cbo << "segment_count" << m_FinishedSegments.size(); Cbo << "block_size" << RecordedRequestBlockSize; Cbo << "standalone_threshold" << StandaloneFileSizeThreshold; Cbo << "segment_request_count" << SegmentRequestCount; Cbo << "segment_file_threshold" << LooseFileThreshold; Cbo << "segment_byte_threshold" << SegmentByteThreshold; Describe(GetSystemMetrics(), Cbo); Cbo.EndObject(); Cbo.BeginArray("segments"); for (auto& Segment : m_FinishedSegments) { Cbo.BeginObject(); Cbo << "segment" << Segment->GetSegmentIndex(); Cbo << "base_index" << Segment->GetBaseRequestIndex(); Cbo << "request_count" << Segment->GetRequestCount(); Cbo << "request_bytes" << Segment->GetRequestsByteCount(); Cbo << "start_time" << Segment->GetStartTime(); Cbo << "end_time" << Segment->GetEndTime(); Cbo.EndObject(); } Cbo.EndArray(); CbObject Metadata = Cbo.Save(); WriteFile(m_BasePath / "rpc_recording_info.zcb", Metadata.GetBuffer().AsIoBuffer()); } catch (const std::exception& Ex) { ZEN_WARN("caught exception while writing metadata for RPC recording: {}", Ex.what()); } } ////////////////////////////////////////////////////////////////////////// uint64_t RecordedRequestsReader::BeginRead(const std::filesystem::path& BasePath, bool InMemory) { m_InMemory = InMemory; m_BasePath = BasePath; std::error_code Ec; BasicFile InfoFile; InfoFile.Open(m_BasePath / "rpc_recording_info.zcb", BasicFile::Mode::kRead, Ec); if (!Ec) { try { CbObject CbInfo = LoadCompactBinaryObject(InfoFile.ReadAll()); uint64_t TotalRequestCount = 0; uint64_t MaxSegmentIndex = 0; for (auto SegmentElement : CbInfo["segments"]) { CbObjectView Segment = SegmentElement.AsObjectView(); const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment"sv].AsUInt64(), .BaseRequestIndex = Segment["base_index"sv].AsUInt64(), .RequestCount = Segment["request_count"sv].AsUInt64(), .RequestBytes = Segment["request_bytes"sv].AsUInt64(), .StartTime = Segment["start_time"sv].AsDateTime(), .EndTime = Segment["end_time"sv].AsDateTime()}); TotalRequestCount += Info.RequestCount; MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex); } m_SegmentReaders.resize(MaxSegmentIndex + 1); return TotalRequestCount; } catch (const std::exception& Ex) { ZEN_WARN("could not read metadata file: {}", Ex.what()); } } ZEN_INFO("recovering segment info for '{}'", BasePath); uint64_t TotalRequestCount = 0; uint64_t MaxSegmentIndex = 0; try { for (int SegmentIndex = 0;; ++SegmentIndex) { const std::filesystem::path ZcbPath = BasePath / MakeSegmentPath(SegmentIndex) / "rpc_segment_info.zcb"; FileContents Fc = ReadFile(ZcbPath); if (Fc.ErrorCode) break; if (IoBuffer SegmentInfoBuffer = Fc.Flatten()) { CbObject Segment = LoadCompactBinaryObject(SegmentInfoBuffer); const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment_index"sv].AsUInt64(), .BaseRequestIndex = 0, .RequestCount = Segment["entry_count"sv].AsUInt64(), .RequestBytes = 0, .StartTime = Segment["time_start"sv].AsDateTime(), .EndTime = Segment["time_end"sv].AsDateTime()}); TotalRequestCount += Info.RequestCount; MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex); } } } catch (const std::exception&) { } std::sort(begin(m_KnownSegments), end(m_KnownSegments), [](const auto& Lhs, const auto& Rhs) { return Lhs.SegmentIndex < Rhs.SegmentIndex; }); uint64_t SegmentRequestOffset = 0; for (SegmentInfo& Info : m_KnownSegments) { Info.BaseRequestIndex = SegmentRequestOffset; SegmentRequestOffset += Info.RequestCount; } m_SegmentReaders.resize(MaxSegmentIndex + 1); return TotalRequestCount; } void RecordedRequestsReader::EndRead() { RwLock::ExclusiveLockScope _(m_SegmentLock); m_KnownSegments.clear(); m_SegmentReaders.clear(); m_BasePath.clear(); } RecordedRequestInfo RecordedRequestsReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const { auto EnsureSegment = [&](uint64_t SegmentIndex) -> const RecordedRequestsSegmentReader& { { RwLock::SharedLockScope _(m_SegmentLock); if (auto SegmentReaderPtr = m_SegmentReaders[SegmentIndex].get()) { return *SegmentReaderPtr; } } RwLock::ExclusiveLockScope _(m_SegmentLock); auto& SegmentReaderPtr = m_SegmentReaders[SegmentIndex]; if (!SegmentReaderPtr) { RecordedRequestsSegmentReader* NewSegment = new RecordedRequestsSegmentReader; NewSegment->BeginRead(m_BasePath / MakeSegmentPath(SegmentIndex), m_InMemory); SegmentReaderPtr.reset(NewSegment); } return *(SegmentReaderPtr.get()); }; for (auto& Segment : m_KnownSegments) { if (Segment.RequestCount > RequestIndex) { return EnsureSegment(Segment.SegmentIndex).ReadRequest(RequestIndex, OutBuffer); } else { RequestIndex -= Segment.RequestCount; } } return RecordedRequestInfo::NullRequest; } ////////////////////////////////////////////////////////////////////////// class DiskRequestRecorder : public IRpcRequestRecorder { public: DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); } virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); } virtual void RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override { m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer); } private: RecordedRequestsWriter m_RecordedRequests; }; class DiskRequestReplayer : public IRpcRequestReplayer { public: DiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory) { m_RequestCount = m_RequestReader.BeginRead(BasePath, InMemory); } virtual ~DiskRequestReplayer() { m_RequestReader.EndRead(); } static bool IsCompatible(const std::filesystem::path& BasePath) { if (std::filesystem::exists(BasePath / "rpc_recording_info.zcb")) { return true; } const std::filesystem::path SegmentZero = BasePath / MakeSegmentPath(0); if (std::filesystem::exists(SegmentZero / "rpc_segment_info.zcb") && std::filesystem::exists(SegmentZero / "index.bin")) { // top-level metadata is missing, possibly because of premature exit // on the recording side return true; } return false; } virtual uint64_t GetRequestCount() const override { return m_RequestCount; } virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override { return m_RequestReader.ReadRequest(RequestIndex, OutBuffer); } private: std::uint64_t m_RequestCount; RecordedRequestsReader m_RequestReader; }; } // namespace zen::cache::v2 ////////////////////////////////////////////////////////////////////////// namespace zen::cache { std::unique_ptr MakeDiskRequestRecorder(const std::filesystem::path& BasePath) { return std::make_unique(BasePath); } std::unique_ptr MakeDiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory) { if (v2::DiskRequestReplayer::IsCompatible(BasePath)) { return std::make_unique(BasePath, InMemory); } else { return std::make_unique(BasePath, InMemory); } } #if ZEN_WITH_TESTS void rpcrecord_forcelink() { } TEST_SUITE_BEGIN("rpc.recording"); TEST_CASE("rpc.record") { ScopedTemporaryDirectory TempDir; auto Path = TempDir.Path(); const Oid SessionId = GetSessionId(); using namespace std::literals; { cache::v2::DiskRequestRecorder Recorder{Path}; for (int i = 0; i < 1000; ++i) { RecordedRequestInfo RequestInfo{.ContentType = ZenContentType::kCbObject, .AcceptType = ZenContentType::kCbObject, .SessionId = SessionId}; CbObjectWriter RequestPayload; RequestPayload << "test"sv << true; RequestPayload << "index"sv << i; CbObject Req = RequestPayload.Save(); IoBuffer RequestBuffer = Req.GetBuffer().AsIoBuffer(); Recorder.RecordRequest(RequestInfo, RequestBuffer); } } { cache::v2::DiskRequestReplayer Replayer{Path, false}; for (int i = 0; i < 1000; ++i) { IoBuffer RequestBuffer; RecordedRequestInfo RequestInfo = Replayer.GetRequest(i, RequestBuffer); CHECK(RequestInfo.AcceptType == ZenContentType::kCbObject); CHECK(RequestInfo.ContentType == ZenContentType::kCbObject); CHECK(RequestInfo.SessionId == SessionId); CbObject Req = LoadCompactBinaryObject(RequestBuffer); CHECK_EQ(Req["index"sv].AsInt32(), i); CHECK_EQ(Req["test"sv].AsBool(), true); } } } TEST_SUITE_END(); #endif } // namespace zen::cache