// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include #include ZEN_THIRD_PARTY_INCLUDES_END #include #include namespace zen { template void ClearContainer(T& Container) { T Empty; swap(Container, Empty); } } // namespace zen namespace zen::cache { const RecordedRequestInfo RecordedRequestInfo::NullRequest = {.ContentType = ZenContentType::kUnknownContentType, .AcceptType = ZenContentType::kUnknownContentType, .SessionId = Oid::Zero}; } ////////////////////////////////////////////////////////////////////////// namespace zen::cache::v2 { using namespace std::literals; struct RecordedRequest { uint32_t Offset; // 4 bytes uint32_t Length; // 4 bytes ZenContentType ContentType; // 1 byte ZenContentType AcceptType; // 1 byte uint8_t OffsetHigh; // 1 byte uint8_t Padding2; // 1 byte Oid SessionId; // 12 bytes inline uint64_t GetOffset() const { return uint64_t(Offset) + (uint64_t(OffsetHigh) << 32); } inline void SetOffset(uint64_t NewOffset) { Offset = gsl::narrow_cast(NewOffset & 0xffff'ffff); OffsetHigh = gsl::narrow_cast(NewOffset >> 32); } }; static_assert(sizeof(RecordedRequest) == 24); const uint64_t RecordedRequestBlockSize = 1 * 1024 * 1024 * 1024; // 1GiB const uint64_t StandaloneFileSizeThreshold = 16 * 1024 * 1024ull; // 16MiB const uint64_t SegmentRequestCount = 10 * 1000 * 1000; const uint64_t LooseFileThreshold = 5000; // Somewhat arbitrary, but we try to keep the // number of files in a directory below this level // for performance const uint64_t SegmentByteThreshold = 16ull * 1024 * 1024 * 1024; const TimeSpan SegmentTimeThreshold{/* hours */ 1, /* minutes */ 0, /* seconds */ 0}; const int64_t MaximumBacklogCount = 2000; std::string MakeSegmentPath(uint64_t SegmentIndex) { return fmt::format("segment_{:06}", SegmentIndex); } struct RecordedRequestsSegmentWriter { RecordedRequestsSegmentWriter() = default; ~RecordedRequestsSegmentWriter() = default; RecordedRequestsSegmentWriter(const RecordedRequestsSegmentWriter&) = delete; RecordedRequestsSegmentWriter& operator=(const RecordedRequestsSegmentWriter&) = delete; void BeginWrite(const Oid& RecordingId, const std::filesystem::path& BasePath, uint64_t SegmentIndex, uint64_t RequestBaseIndex); void WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer); void EndWrite(); inline uint64_t GetRequestCount() const { if (m_RequestCount) { return m_RequestCount; } RwLock::SharedLockScope _(m_Lock); return m_Entries.size(); } inline uint64_t GetBaseRequestIndex() const { return m_RequestBaseIndex; } inline uint64_t GetSegmentIndex() const { return m_SegmentIndex; } inline uint64_t GetFileCount() const { return m_FileCount; } inline uint64_t GetRequestsByteCount() const { return m_RequestsByteCount; } inline DateTime GetStartTime() const { return m_StartTime; } inline DateTime GetEndTime() const { return m_EndTime; } private: Oid m_RecordingId; std::filesystem::path m_BasePath; uint64_t m_SegmentIndex = 0; uint64_t m_RequestBaseIndex = 0; uint64_t m_RequestCount = 0; std::atomic_uint64_t m_FileCount{}; std::atomic_uint64_t m_RequestsByteCount{}; mutable RwLock m_Lock; std::vector m_Entries; std::vector> m_BlockFiles; uint64_t m_ChunkOffset; DateTime m_StartTime = DateTime::Now(); DateTime m_EndTime = DateTime::Now(); }; struct RecordedRequestsWriter { RecordedRequestsWriter(); ~RecordedRequestsWriter(); void BeginWrite(const std::filesystem::path& BasePath); void EndWrite(); void WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer); private: std::filesystem::path m_BasePath; zen::DateTime m_StartTime = DateTime::Now(); mutable RwLock m_Lock; std::unique_ptr m_CurrentWriter; std::vector> m_FinishedSegments; std::vector m_SegmentBaseIndex; uint64_t m_NextSegmentBaseIndex = 0; RecordedRequestsSegmentWriter& EnsureCurrentSegment(); void CommitCurrentSegment(RwLock::ExclusiveLockScope&); void WriteRecordingMetadata(); // I/O thread state struct QueuedRequest { RecordedRequestInfo RequestInfo; IoBuffer RequestBuffer; }; Oid m_RecordingId = Oid::NewOid(); std::unique_ptr m_WriterThread; std::atomic_bool m_IsWriterReady{false}; std::atomic_bool m_IsActive{false}; std::atomic_int64_t m_PendingRequests{0}; RwLock m_RequestQueueLock; std::deque m_RequestQueue; void WriterThreadMain(); }; ////////////////////////////////////////////////////////////////////////// struct RecordedRequestsSegmentReader { RecordedRequestsSegmentReader() = default; RecordedRequestsSegmentReader(const RecordedRequestsSegmentReader&) = delete; RecordedRequestsSegmentReader& operator=(const RecordedRequestsSegmentReader&) = delete; uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory); void EndRead(); RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const; private: std::filesystem::path m_BasePath; std::vector m_Entries; std::vector m_BlockFiles; }; struct RecordedRequestsReader { uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory); void EndRead(); RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const; private: static std::vector IterateSegments(std::filesystem::path RootPath); struct SegmentInfo { uint64_t SegmentIndex; uint64_t BaseRequestIndex; uint64_t RequestCount; uint64_t RequestBytes; DateTime StartTime{0}; DateTime EndTime{0}; std::filesystem::path BasePath; }; bool m_InMemory = false; std::filesystem::path m_BasePath; std::vector m_KnownSegments; mutable RwLock m_SegmentLock; mutable std::vector> m_SegmentReaders; }; ////////////////////////////////////////////////////////////////////////// void RecordedRequestsSegmentWriter::BeginWrite(const Oid& RecordingId, const std::filesystem::path& BasePath, uint64_t SegmentIndex, uint64_t RequestBaseIndex) { m_RecordingId = RecordingId; m_BasePath = BasePath; m_SegmentIndex = SegmentIndex; m_RequestBaseIndex = RequestBaseIndex; CreateDirectories(m_BasePath); } void RecordedRequestsSegmentWriter::EndWrite() { RwLock::ExclusiveLockScope _(m_Lock); m_RequestCount = m_Entries.size(); m_BlockFiles.clear(); // Emit some metadata alongside the recorded segment try { m_EndTime = DateTime::Now(); TimeSpan Duration{m_EndTime.GetTicks() - m_StartTime.GetTicks()}; CbObjectWriter Cbo; Cbo << "recording_id" << m_RecordingId; Cbo << "time_start" << m_StartTime << "time_end" << m_EndTime << "duration" << Duration; Cbo << "segment_index" << m_SegmentIndex; Cbo << "entry_count" << m_RequestCount << "entry_size" << sizeof(RecordedRequest); Cbo << "block_size" << RecordedRequestBlockSize << "standalone_threshold" << StandaloneFileSizeThreshold; Cbo.BeginObject("system_info"); Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName(); Cbo.EndObject(); CbObject Metadata = Cbo.Save(); WriteFile(m_BasePath / "rpc_segment_info.zcb", Metadata.GetBuffer().AsIoBuffer()); } catch (const std::exception& Ex) { ZEN_WARN("caught exception while writing segment metadata for RPC recording: {}", Ex.what()); } IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest)); BasicFile IndexFile; IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate); std::error_code Ec; IndexFile.WriteAll(IndexBuffer, Ec); IndexFile.Close(); // note that simply calling `.reset()` here will *not* release backing memory // and it's important that we do because on high traffic servers this will use // a lot of memory otherwise ClearContainer(m_Entries); } void RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) { const uint64_t RequestBufferSize = RequestBuffer.GetSize(); uint64_t RequestIndex = ~0ull; { RwLock::ExclusiveLockScope Lock(m_Lock); RequestIndex = m_Entries.size(); RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0u, .Length = uint32_t(RequestBufferSize & 0xffffFFFFu), .ContentType = RequestInfo.ContentType, .AcceptType = RequestInfo.AcceptType, .OffsetHigh = 0, .Padding2 = 0, .SessionId = RequestInfo.SessionId}); if (Entry.Length < StandaloneFileSizeThreshold) { const uint32_t BlockIndex = gsl::narrow((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize); if (BlockIndex == m_BlockFiles.size()) { std::unique_ptr& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique()); NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate); m_ChunkOffset = BlockIndex * RecordedRequestBlockSize; ++m_FileCount; } ZEN_ASSERT(BlockIndex < m_BlockFiles.size()); BasicFile* BlockFile = m_BlockFiles[BlockIndex].get(); ZEN_ASSERT(BlockFile != nullptr); // Note that this is the overall logical offset, not the offset within a single file const uint64_t ChunkWriteOffset = m_ChunkOffset; m_ChunkOffset = RoundUp(ChunkWriteOffset + Entry.Length, 1u << 4u); Entry.SetOffset(ChunkWriteOffset); Lock.ReleaseNow(); std::error_code Ec; BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), ChunkWriteOffset - (BlockIndex * RecordedRequestBlockSize), Ec); if (Ec) { // We cannot simply use `Entry` here because the vector may // have been reallocated causing the entry to be in a different // location RwLock::ExclusiveLockScope _(m_Lock); m_Entries[RequestIndex].Length = 0; return; } m_RequestsByteCount.fetch_add(RequestBuffer.GetSize()); return; } } // Write request data to standalone file try { BasicFile RequestFile; RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate); if (RequestBufferSize > std::numeric_limits::max()) { // The exact value of the entry is not important, we will use // the size of the standalone file regardless when performing // the read RwLock::ExclusiveLockScope _(m_Lock); m_Entries[RequestIndex].Length = std::numeric_limits::max(); } ++m_FileCount; std::error_code Ec; RequestFile.WriteAll(RequestBuffer, Ec); if (Ec) { RwLock::ExclusiveLockScope _(m_Lock); m_Entries[RequestIndex].Length = 0; } else { m_RequestsByteCount.fetch_add(RequestBuffer.GetSize()); } } catch (const std::exception&) { RwLock::ExclusiveLockScope _(m_Lock); m_Entries[RequestIndex].Length = 0; } } uint64_t RecordedRequestsSegmentReader::BeginRead(const std::filesystem::path& BasePath, bool InMemory) { m_BasePath = BasePath; BasicFile IndexFile; IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kRead); m_Entries.resize(IndexFile.FileSize() / sizeof(RecordedRequest)); IndexFile.Read(m_Entries.data(), IndexFile.FileSize(), 0); uint64_t MaxChunkPosition = 0; for (const RecordedRequest& R : m_Entries) { if (R.Offset != ~0u) { MaxChunkPosition = Max(MaxChunkPosition, R.GetOffset() + R.Length); } } uint32_t BlockCount = gsl::narrow(MaxChunkPosition / RecordedRequestBlockSize) + 1; m_BlockFiles.resize(BlockCount); for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; ++BlockIndex) { if (InMemory) { BasicFile Chunk; Chunk.Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kRead); m_BlockFiles[BlockIndex] = Chunk.ReadAll(); continue; } m_BlockFiles[BlockIndex] = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("chunks{}.bin", BlockIndex)); } return m_Entries.size(); } void RecordedRequestsSegmentReader::EndRead() { // Ensure memory is released fully once we're done ClearContainer(m_BlockFiles); ClearContainer(m_Entries); } RecordedRequestInfo RecordedRequestsSegmentReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const { if (RequestIndex >= m_Entries.size()) { return RecordedRequestInfo::NullRequest; } const RecordedRequest& Entry = m_Entries[RequestIndex]; if (Entry.Length == 0) { return RecordedRequestInfo::NullRequest; } RecordedRequestInfo RequestInfo = {.ContentType = Entry.ContentType, .AcceptType = Entry.AcceptType, .SessionId = Entry.SessionId}; if (Entry.Offset != ~0u) { // Inline in block file const uint64_t EntryOffset = Entry.GetOffset(); const uint32_t BlockIndex = gsl::narrow((EntryOffset + Entry.Length) / RecordedRequestBlockSize); const uint64_t ChunkOffset = EntryOffset - (BlockIndex * RecordedRequestBlockSize); OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length); return RequestInfo; } // Standalone file OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex)); return RequestInfo; } ////////////////////////////////////////////////////////////////////////// RecordedRequestsWriter::RecordedRequestsWriter() { } RecordedRequestsWriter::~RecordedRequestsWriter() { EndWrite(); } void RecordedRequestsWriter::BeginWrite(const std::filesystem::path& BasePath) { m_BasePath = BasePath; m_IsActive = true; m_WriterThread.reset(new std::thread(&RecordedRequestsWriter::WriterThreadMain, this)); m_IsWriterReady.wait(false); } void RecordedRequestsWriter::EndWrite() { if (m_WriterThread) { m_IsActive = false; const int64_t PendingCount = m_PendingRequests.fetch_add(1); m_PendingRequests.notify_all(); if (PendingCount) { ZEN_INFO("waiting for RPC recorder writing thread to drain {} pending items", PendingCount); } if (m_WriterThread->joinable()) { m_WriterThread->join(); } m_WriterThread.reset(); } } void RecordedRequestsWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) { if (m_IsActive) { IoBuffer OwnedRequest = RequestBuffer; OwnedRequest.MakeOwned(); { RwLock::ExclusiveLockScope _(m_RequestQueueLock); m_RequestQueue.push_back(QueuedRequest{RequestInfo, std::move(OwnedRequest)}); m_PendingRequests.fetch_add(1); } m_PendingRequests.notify_all(); } } void RecordedRequestsWriter::WriterThreadMain() { SetCurrentThreadName("rpc_writer"); EnsureCurrentSegment(); m_IsWriterReady.store(true); m_IsWriterReady.notify_all(); while (m_IsActive) { m_PendingRequests.wait(0); while (m_PendingRequests) { RwLock::ExclusiveLockScope _(m_RequestQueueLock); if (!m_RequestQueue.empty()) { bool DrainBacklog = false; do { QueuedRequest Request = m_RequestQueue.front(); m_RequestQueue.pop_front(); m_PendingRequests.fetch_sub(1); // For a sufficiently large backlog, keep blocking queueing operations // until we get below the threshold DrainBacklog = m_RequestQueue.size() >= MaximumBacklogCount; if (!DrainBacklog) { _.ReleaseNow(); } try { RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment(); Writer.WriteRequest(Request.RequestInfo, Request.RequestBuffer); } catch (const std::exception&) { // TODO: what's the right behaviour here? The most likely cause would // be some I/O error and we probably ought to just shut down recording // at that point } } while (DrainBacklog); } else { // shutdown increments this counter so we need to decrement it // here even though we didn't process any request m_PendingRequests.fetch_sub(1); } } } RwLock::ExclusiveLockScope _(m_Lock); CommitCurrentSegment(_); WriteRecordingMetadata(); } RecordedRequestsSegmentWriter& RecordedRequestsWriter::EnsureCurrentSegment() { RwLock::ExclusiveLockScope _(m_Lock); if (m_CurrentWriter) { bool StartNewSegment = false; TimeSpan SegmentAge(DateTime::NowTicks() - m_CurrentWriter->GetStartTime().GetTicks()); if (m_CurrentWriter->GetRequestCount() >= SegmentRequestCount) { ZEN_DEBUG("starting new RPC recording segment due to request count >= {}", SegmentRequestCount); StartNewSegment = true; } else if (m_CurrentWriter->GetFileCount() >= LooseFileThreshold) { ZEN_DEBUG("starting new RPC recording segment due to file count >= {}", LooseFileThreshold); StartNewSegment = true; } else if (m_CurrentWriter->GetRequestsByteCount() >= SegmentByteThreshold) { ZEN_DEBUG("starting new RPC recording segment due to footprint >= {} bytes", SegmentByteThreshold); StartNewSegment = true; } else if (SegmentAge >= SegmentTimeThreshold) { ZEN_DEBUG("starting new RPC recording segment due to age >= {}", NiceTimeSpanMs(SegmentTimeThreshold.GetTicks() / TimeSpan::TicksPerMillisecond)); StartNewSegment = true; } if (StartNewSegment) { CommitCurrentSegment(_); } } if (!m_CurrentWriter) { const uint64_t SegmentIndex = m_FinishedSegments.size(); m_CurrentWriter = std::make_unique(); m_CurrentWriter->BeginWrite(m_RecordingId, m_BasePath / MakeSegmentPath(SegmentIndex), SegmentIndex, m_NextSegmentBaseIndex); } return *m_CurrentWriter; } void RecordedRequestsWriter::CommitCurrentSegment(RwLock::ExclusiveLockScope&) { if (m_CurrentWriter) { RecordedRequestsSegmentWriter& Writer = *m_CurrentWriter; m_FinishedSegments.push_back(std::move(m_CurrentWriter)); m_SegmentBaseIndex.push_back(m_NextSegmentBaseIndex); m_NextSegmentBaseIndex += Writer.GetRequestCount(); Writer.EndWrite(); } } void RecordedRequestsWriter::WriteRecordingMetadata() { try { DateTime EndTime = DateTime::Now(); TimeSpan Duration{EndTime.GetTicks() - m_StartTime.GetTicks()}; CbObjectWriter Cbo; Cbo << "recording_id" << m_RecordingId; Cbo << "time_start" << m_StartTime << "time_end" << EndTime << "duration" << Duration << "format_version" << 2; Cbo << "segment_count" << m_FinishedSegments.size(); Cbo << "block_size" << RecordedRequestBlockSize; Cbo << "standalone_threshold" << StandaloneFileSizeThreshold; Cbo << "segment_request_count" << SegmentRequestCount; Cbo << "segment_file_threshold" << LooseFileThreshold; Cbo << "segment_byte_threshold" << SegmentByteThreshold; Cbo.BeginObject("system_info"); Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName(); Describe(GetSystemMetrics(), Cbo); Cbo.EndObject(); Cbo.BeginArray("segments"); for (auto& Segment : m_FinishedSegments) { Cbo.BeginObject(); Cbo << "segment" << Segment->GetSegmentIndex(); Cbo << "base_index" << Segment->GetBaseRequestIndex(); Cbo << "request_count" << Segment->GetRequestCount(); Cbo << "request_bytes" << Segment->GetRequestsByteCount(); Cbo << "start_time" << Segment->GetStartTime(); Cbo << "end_time" << Segment->GetEndTime(); Cbo.EndObject(); } Cbo.EndArray(); CbObject Metadata = Cbo.Save(); WriteFile(m_BasePath / "rpc_recording_info.zcb", Metadata.GetBuffer().AsIoBuffer()); } catch (const std::exception& Ex) { ZEN_WARN("caught exception while writing metadata for RPC recording: {}", Ex.what()); } } ////////////////////////////////////////////////////////////////////////// std::vector RecordedRequestsReader::IterateSegments(std::filesystem::path RootPath) { std::vector Paths; std::error_code Ec; std::filesystem::recursive_directory_iterator DirIt(RootPath, Ec); if (!Ec) { for (auto& Dir : DirIt) { if (Dir.is_regular_file() && Dir.path().filename().string().ends_with("rpc_segment_info.zcb")) { Paths.emplace_back(Dir.path()); } } } std::sort(begin(Paths), end(Paths)); return Paths; } uint64_t RecordedRequestsReader::BeginRead(const std::filesystem::path& BasePath, bool InMemory) { m_InMemory = InMemory; m_BasePath = BasePath; // Note: we currently ignore the root recording manifest to better handle partial // or merged recordings, and recordings with deeper directory structure created as // part of ad hoc organisation of larger recordings which may introduce another // directory level to keep directories to a more reasonable size // // I'm not yet sure if we should just get rid of this forever though, so leaving // the code in a disabled state for now. #if 0 std::error_code Ec; BasicFile InfoFile; InfoFile.Open(m_BasePath / "rpc_recording_info.zcb", BasicFile::Mode::kRead, Ec); if (!Ec) { try { CbObject CbInfo = LoadCompactBinaryObject(InfoFile.ReadAll()); uint64_t TotalRequestCount = 0; uint64_t MaxSegmentIndex = 0; for (auto SegmentElement : CbInfo["segments"]) { CbObjectView Segment = SegmentElement.AsObjectView(); const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment"sv].AsUInt64(), .BaseRequestIndex = Segment["base_index"sv].AsUInt64(), .RequestCount = Segment["request_count"sv].AsUInt64(), .RequestBytes = Segment["request_bytes"sv].AsUInt64(), .StartTime = Segment["start_time"sv].AsDateTime(), .EndTime = Segment["end_time"sv].AsDateTime()}); TotalRequestCount += Info.RequestCount; MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex); } m_SegmentReaders.resize(MaxSegmentIndex + 1); return TotalRequestCount; } catch (const std::exception& Ex) { ZEN_WARN("could not read metadata file: {}", Ex.what()); } } #endif ZEN_INFO("recovering segment info for '{}'", BasePath); uint64_t TotalRequestCount = 0; uint64_t MaxSegmentIndex = 0; try { auto SegmentPaths = IterateSegments(BasePath); for (const std::filesystem::path& ZcbPath : SegmentPaths) { FileContents Fc = ReadFile(ZcbPath); if (Fc.ErrorCode) { ZEN_WARN("Error opening '{}': {}", ZcbPath, Fc.ErrorCode.message()); } else { if (IoBuffer SegmentInfoBuffer = Fc.Flatten()) { if (ValidateCompactBinaryRange(SegmentInfoBuffer.GetView(), CbValidateMode::Default) == CbValidateError::None) { CbObject Segment = LoadCompactBinaryObject(SegmentInfoBuffer); const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment_index"sv].AsUInt64(), .BaseRequestIndex = 0, // computed below .RequestCount = Segment["entry_count"sv].AsUInt64(), .RequestBytes = 0, .StartTime = Segment["time_start"sv].AsDateTime(), .EndTime = Segment["time_end"sv].AsDateTime(), .BasePath = ZcbPath.parent_path()}); TotalRequestCount += Info.RequestCount; MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex); } else { ZEN_WARN("invalid segment metadata found in '{}' - ignoring", ZcbPath); } } } } } catch (const std::exception&) { } std::sort(begin(m_KnownSegments), end(m_KnownSegments), [](const auto& Lhs, const auto& Rhs) { return Lhs.SegmentIndex < Rhs.SegmentIndex; }); uint64_t SegmentRequestOffset = 0; for (SegmentInfo& Info : m_KnownSegments) { Info.BaseRequestIndex = SegmentRequestOffset; SegmentRequestOffset += Info.RequestCount; } m_SegmentReaders.resize(MaxSegmentIndex + 1); return TotalRequestCount; } void RecordedRequestsReader::EndRead() { RwLock::ExclusiveLockScope _(m_SegmentLock); m_KnownSegments.clear(); m_SegmentReaders.clear(); m_BasePath.clear(); } RecordedRequestInfo RecordedRequestsReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const { auto EnsureSegment = [&](const SegmentInfo& Segment) -> const RecordedRequestsSegmentReader& { { RwLock::SharedLockScope _(m_SegmentLock); if (auto SegmentReaderPtr = m_SegmentReaders[Segment.SegmentIndex].get()) { return *SegmentReaderPtr; } } RwLock::ExclusiveLockScope _(m_SegmentLock); auto& SegmentReaderPtr = m_SegmentReaders[Segment.SegmentIndex]; if (!SegmentReaderPtr) { RecordedRequestsSegmentReader* NewSegment = new RecordedRequestsSegmentReader; NewSegment->BeginRead(Segment.BasePath, m_InMemory); SegmentReaderPtr.reset(NewSegment); } return *(SegmentReaderPtr.get()); }; // This is pretty slow for large replays, should have an index lookup for this instead for (const SegmentInfo& Segment : m_KnownSegments) { if (Segment.RequestCount > RequestIndex) { return EnsureSegment(Segment).ReadRequest(RequestIndex, OutBuffer); } else { RequestIndex -= Segment.RequestCount; } } return RecordedRequestInfo::NullRequest; } ////////////////////////////////////////////////////////////////////////// class DiskRequestRecorder : public IRpcRequestRecorder { public: DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); } virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); } virtual void RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override { m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer); } private: RecordedRequestsWriter m_RecordedRequests; }; class DiskRequestAnalyzer : public IRpcRequestAnalyzer { public: DiskRequestAnalyzer(const std::filesystem::path& BasePath) { m_RequestCount = m_RequestReader.BeginRead(BasePath, false); } virtual ~DiskRequestAnalyzer() { m_RequestReader.EndRead(); } virtual uint64_t GetRequestCount() const override { return m_RequestCount; } virtual void IterateRange(uint64_t RequestIndex, uint64_t EndRequestIndex, std::function&& Func) override { while (RequestIndex < EndRequestIndex) { IoBuffer Buffer; RecordedRequestInfo Info = m_RequestReader.ReadRequest(RequestIndex, /* out */ Buffer); Func(Info, Buffer); ++RequestIndex; }; } private: std::uint64_t m_RequestCount; RecordedRequestsReader m_RequestReader; }; class DiskRequestReplayer : public IRpcRequestReplayer { public: DiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory) { m_RequestCount = m_RequestReader.BeginRead(BasePath, InMemory); } virtual ~DiskRequestReplayer() { m_RequestReader.EndRead(); } virtual uint64_t GetRequestCount() const override { return m_RequestCount; } virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override { return m_RequestReader.ReadRequest(RequestIndex, OutBuffer); } static bool IsCompatible(const std::filesystem::path& BasePath) { if (IsFile(BasePath / "rpc_recording_info.zcb")) { return true; } return HasSegments(BasePath); } private: std::uint64_t m_RequestCount; RecordedRequestsReader m_RequestReader; static bool HasSegments(const std::filesystem::path& RootPath) { std::error_code Ec; std::filesystem::recursive_directory_iterator DirIt(RootPath, Ec); if (Ec) { return false; } for (auto& Dir : DirIt) { if (Dir.is_regular_file() && Dir.path().filename().string().ends_with("rpc_segment_info.zcb")) { return true; } } return false; }; }; } // namespace zen::cache::v2 ////////////////////////////////////////////////////////////////////////// namespace zen::cache { std::unique_ptr MakeDiskRequestRecorder(const std::filesystem::path& BasePath) { return std::make_unique(BasePath); } std::unique_ptr MakeDiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory) { if (v2::DiskRequestReplayer::IsCompatible(BasePath)) { return std::make_unique(BasePath, InMemory); } else { return nullptr; } } std::unique_ptr MakeDiskRequestAnalyzer(const std::filesystem::path& BasePath) { if (v2::DiskRequestReplayer::IsCompatible(BasePath)) { return std::make_unique(BasePath); } else { return nullptr; } } #if ZEN_WITH_TESTS void rpcrecord_forcelink() { } TEST_SUITE_BEGIN("rpc.recording"); TEST_CASE("rpc.record") { ScopedTemporaryDirectory TempDir; auto Path = TempDir.Path(); const Oid SessionId = GetSessionId(); using namespace std::literals; { cache::v2::DiskRequestRecorder Recorder{Path}; for (int i = 0; i < 1000; ++i) { RecordedRequestInfo RequestInfo{.ContentType = ZenContentType::kCbObject, .AcceptType = ZenContentType::kCbObject, .SessionId = SessionId}; CbObjectWriter RequestPayload; RequestPayload << "test"sv << true; RequestPayload << "index"sv << i; CbObject Req = RequestPayload.Save(); IoBuffer RequestBuffer = Req.GetBuffer().AsIoBuffer(); Recorder.RecordRequest(RequestInfo, RequestBuffer); } } { cache::v2::DiskRequestReplayer Replayer{Path, false}; for (int i = 0; i < 1000; ++i) { IoBuffer RequestBuffer; RecordedRequestInfo RequestInfo = Replayer.GetRequest(i, RequestBuffer); CHECK(RequestInfo.AcceptType == ZenContentType::kCbObject); CHECK(RequestInfo.ContentType == ZenContentType::kCbObject); CHECK(RequestInfo.SessionId == SessionId); CbObject Req = LoadCompactBinaryObject(RequestBuffer); CHECK_EQ(Req["index"sv].AsInt32(), i); CHECK_EQ(Req["test"sv].AsBool(), true); } } } TEST_SUITE_END(); #endif } // namespace zen::cache