diff options
Diffstat (limited to 'src/zenutil/rpcrecording.cpp')
| -rw-r--r-- | src/zenutil/rpcrecording.cpp | 1175 |
1 files changed, 1175 insertions, 0 deletions
diff --git a/src/zenutil/rpcrecording.cpp b/src/zenutil/rpcrecording.cpp new file mode 100644 index 000000000..54f27dee7 --- /dev/null +++ b/src/zenutil/rpcrecording.cpp @@ -0,0 +1,1175 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenutil/rpcrecording.h> + +#include <zencore/basicfile.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/session.h> +#include <zencore/system.h> +#include <zencore/testing.h> +#include <zencore/testutils.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <fmt/format.h> +#include <gsl/gsl-lite.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +#include <deque> +#include <thread> + +namespace zen::cache { + +const RecordedRequestInfo RecordedRequestInfo::NullRequest = {.ContentType = ZenContentType::kUnknownContentType, + .AcceptType = ZenContentType::kUnknownContentType, + .SessionId = Oid::Zero}; + +} + +namespace zen::cache::v1 { + +struct RecordedRequest +{ + uint64_t Offset; + uint64_t Length; + ZenContentType ContentType; + ZenContentType AcceptType; +}; + +const uint64_t RecordedRequestBlockSize = 1ull << 31u; +const uint64_t StandaloneFileSizeThreshold = 1 * 1024 * 1024ull; + +struct RecordedRequestsWriter +{ + void BeginWrite(const std::filesystem::path& BasePath) + { + m_BasePath = BasePath; + CreateDirectories(m_BasePath); + } + + void EndWrite() + { + RwLock::ExclusiveLockScope _(m_Lock); + m_BlockFiles.clear(); + + try + { + // Emit some metadata alongside the recording + + DateTime EndTime = DateTime::Now(); + TimeSpan Duration{EndTime.GetTicks() - m_StartTime.GetTicks()}; + + CbObjectWriter Cbo; + Cbo << "time_start" << m_StartTime << "time_end" << EndTime << "duration" << Duration; + Cbo << "entry_count" << m_Entries.size() << "entry_size" << sizeof(RecordedRequest); + Cbo << "block_size" << RecordedRequestBlockSize << "standalone_threshold" << StandaloneFileSizeThreshold; + + Cbo.BeginObject("system_info"); + Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName(); + Describe(GetSystemMetrics(), Cbo); + Cbo.EndObject(); + CbObject Metadata = Cbo.Save(); + + WriteFile(m_BasePath / "rpc_recording_metadata.zcb", Metadata.GetBuffer().AsIoBuffer()); + } + catch (const std::exception& Ex) + { + ZEN_WARN("caught exception while generating metadata for RPC recording: {}", Ex.what()); + } + + IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest)); + BasicFile IndexFile; + IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate); + std::error_code Ec; + IndexFile.WriteAll(IndexBuffer, Ec); + IndexFile.Close(); + m_Entries.clear(); + } + + void WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) + { + RwLock::ExclusiveLockScope Lock(m_Lock); + uint64_t RequestIndex = m_Entries.size(); + RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0ull, + .Length = RequestBuffer.Size(), + .ContentType = RequestInfo.ContentType, + .AcceptType = RequestInfo.AcceptType}); + + if (Entry.Length < StandaloneFileSizeThreshold) + { + const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize); + if (BlockIndex == m_BlockFiles.size()) + { + std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>()); + NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate); + m_ChunkOffset = BlockIndex * RecordedRequestBlockSize; + } + ZEN_ASSERT(BlockIndex < m_BlockFiles.size()); + BasicFile* BlockFile = m_BlockFiles[BlockIndex].get(); + ZEN_ASSERT(BlockFile != nullptr); + + Entry.Offset = m_ChunkOffset; + m_ChunkOffset = RoundUp(m_ChunkOffset + Entry.Length, 1u << 4u); + Lock.ReleaseNow(); + + std::error_code Ec; + BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), Entry.Offset - BlockIndex * RecordedRequestBlockSize, Ec); + if (Ec) + { + Entry.Length = 0; + } + } + else + { + Lock.ReleaseNow(); + + BasicFile RequestFile; + RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate); + std::error_code Ec; + RequestFile.WriteAll(RequestBuffer, Ec); + if (Ec) + { + Entry.Length = 0; + } + } + } + + std::filesystem::path m_BasePath; + mutable RwLock m_Lock; + std::vector<RecordedRequest> m_Entries; + std::vector<std::unique_ptr<BasicFile>> m_BlockFiles; + uint64_t m_ChunkOffset; + zen::DateTime m_StartTime = DateTime::Now(); +}; + +struct RecordedRequestsReader +{ + uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory) + { + m_BasePath = BasePath; + BasicFile IndexFile; + IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kRead); + m_Entries.resize(IndexFile.FileSize() / sizeof(RecordedRequest)); + IndexFile.Read(m_Entries.data(), IndexFile.FileSize(), 0); + uint64_t MaxChunkPosition = 0; + for (const RecordedRequest& R : m_Entries) + { + if (R.Offset != ~0ull) + { + MaxChunkPosition = Max(MaxChunkPosition, R.Offset + R.Length); + } + } + uint32_t BlockCount = gsl::narrow<uint32_t>(MaxChunkPosition / RecordedRequestBlockSize) + 1; + m_BlockFiles.resize(BlockCount); + for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; ++BlockIndex) + { + if (InMemory) + { + BasicFile Chunk; + Chunk.Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kRead); + m_BlockFiles[BlockIndex] = Chunk.ReadAll(); + continue; + } + m_BlockFiles[BlockIndex] = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("chunks{}.bin", BlockIndex)); + } + return m_Entries.size(); + } + void EndRead() { m_BlockFiles.clear(); } + + RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const + { + if (RequestIndex >= m_Entries.size()) + { + return RecordedRequestInfo::NullRequest; + } + + const RecordedRequest& Entry = m_Entries[RequestIndex]; + if (Entry.Length == 0) + { + return RecordedRequestInfo::NullRequest; + } + + if (Entry.Offset != ~0ull) + { + uint32_t BlockIndex = gsl::narrow<uint32_t>((Entry.Offset + Entry.Length) / RecordedRequestBlockSize); + uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize); + OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length); + } + else + { + OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex)); + } + + return {.ContentType = Entry.ContentType, .AcceptType = Entry.AcceptType, .SessionId = Oid::Zero}; + } + + std::filesystem::path m_BasePath; + std::vector<RecordedRequest> m_Entries; + std::vector<IoBuffer> m_BlockFiles; +}; + +class DiskRequestRecorder : public IRpcRequestRecorder +{ +public: + DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); } + virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); } + +private: + virtual void RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override + { + m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer); + } + + RecordedRequestsWriter m_RecordedRequests; +}; + +class DiskRequestReplayer : public IRpcRequestReplayer +{ +public: + DiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory) + { + m_RequestCount = m_RequestBuffer.BeginRead(BasePath, InMemory); + } + virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); } + + virtual uint64_t GetRequestCount() const override { return m_RequestCount; } + + virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override + { + return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer); + } + +private: + std::uint64_t m_RequestCount; + RecordedRequestsReader m_RequestBuffer; +}; + +} // namespace zen::cache::v1 + +////////////////////////////////////////////////////////////////////////// + +namespace zen::cache::v2 { + +using namespace std::literals; + +struct RecordedRequest +{ + uint32_t Offset; // 4 bytes + uint32_t Length; // 4 bytes + ZenContentType ContentType; // 1 byte + ZenContentType AcceptType; // 1 byte + uint8_t OffsetHigh; // 1 byte + uint8_t Padding2; // 1 byte + Oid SessionId; // 12 bytes + + inline uint64_t GetOffset() const { return uint64_t(Offset) + (uint64_t(OffsetHigh) << 32); } + inline void SetOffset(uint64_t NewOffset) + { + Offset = gsl::narrow_cast<uint32_t>(NewOffset & 0xffff'ffff); + OffsetHigh = gsl::narrow_cast<uint8_t>(NewOffset >> 32); + } +}; + +static_assert(sizeof(RecordedRequest) == 24); + +const uint64_t RecordedRequestBlockSize = 1 * 1024 * 1024 * 1024; // 1GiB +const uint64_t StandaloneFileSizeThreshold = 16 * 1024 * 1024ull; // 16MiB +const uint64_t SegmentRequestCount = 10 * 1000 * 1000; +const uint64_t LooseFileThreshold = 5000; // Somewhat arbitrary, but we try to keep the + // number of files in a directory below this level + // for performance +const uint64_t SegmentByteThreshold = 16ull * 1024 * 1024 * 1024; +const TimeSpan SegmentTimeThreshold{/* hours */ 1, /* minutes */ 0, /* seconds */ 0}; +const int64_t MaximumBacklogCount = 2000; + +std::string +MakeSegmentPath(uint64_t SegmentIndex) +{ + return fmt::format("segment_{:06}", SegmentIndex); +} + +struct RecordedRequestsSegmentWriter +{ + RecordedRequestsSegmentWriter() = default; + ~RecordedRequestsSegmentWriter() = default; + + RecordedRequestsSegmentWriter(const RecordedRequestsSegmentWriter&) = delete; + RecordedRequestsSegmentWriter& operator=(const RecordedRequestsSegmentWriter&) = delete; + + void BeginWrite(const std::filesystem::path& BasePath, uint64_t SegmentIndex, uint64_t RequestBaseIndex); + void WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer); + void EndWrite(); + + inline uint64_t GetRequestCount() const + { + if (m_RequestCount) + { + return m_RequestCount; + } + + RwLock::SharedLockScope _(m_Lock); + return m_Entries.size(); + } + inline uint64_t GetBaseRequestIndex() const { return m_RequestBaseIndex; } + inline uint64_t GetSegmentIndex() const { return m_SegmentIndex; } + inline uint64_t GetFileCount() const { return m_FileCount; } + inline uint64_t GetRequestsByteCount() const { return m_RequestsByteCount; } + inline DateTime GetStartTime() const { return m_StartTime; } + inline DateTime GetEndTime() const { return m_EndTime; } + +private: + std::filesystem::path m_BasePath; + uint64_t m_SegmentIndex = 0; + uint64_t m_RequestBaseIndex = 0; + uint64_t m_RequestCount = 0; + std::atomic_uint64_t m_FileCount{}; + std::atomic_uint64_t m_RequestsByteCount{}; + mutable RwLock m_Lock; + std::vector<RecordedRequest> m_Entries; + std::vector<std::unique_ptr<BasicFile>> m_BlockFiles; + uint64_t m_ChunkOffset; + DateTime m_StartTime = DateTime::Now(); + DateTime m_EndTime = DateTime::Now(); +}; + +struct RecordedRequestsWriter +{ + RecordedRequestsWriter(); + ~RecordedRequestsWriter(); + + void BeginWrite(const std::filesystem::path& BasePath); + void EndWrite(); + void WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer); + +private: + std::filesystem::path m_BasePath; + zen::DateTime m_StartTime = DateTime::Now(); + + mutable RwLock m_Lock; + std::unique_ptr<RecordedRequestsSegmentWriter> m_CurrentWriter; + std::vector<std::unique_ptr<RecordedRequestsSegmentWriter>> m_FinishedSegments; + std::vector<uint64_t> m_SegmentBaseIndex; + uint64_t m_NextSegmentBaseIndex = 0; + + RecordedRequestsSegmentWriter& EnsureCurrentSegment(); + void CommitCurrentSegment(RwLock::ExclusiveLockScope&); + void WriteRecordingMetadata(); + + // I/O thread state + + struct QueuedRequest + { + RecordedRequestInfo RequestInfo; + IoBuffer RequestBuffer; + }; + + std::unique_ptr<std::thread> m_WriterThread; + std::atomic_bool m_IsWriterReady{false}; + std::atomic_bool m_IsActive{false}; + std::atomic_int64_t m_PendingRequests{0}; + RwLock m_RequestQueueLock; + std::deque<QueuedRequest> m_RequestQueue; + + void WriterThreadMain(); +}; + +////////////////////////////////////////////////////////////////////////// + +struct RecordedRequestsSegmentReader +{ + RecordedRequestsSegmentReader() = default; + + RecordedRequestsSegmentReader(const RecordedRequestsSegmentReader&) = delete; + RecordedRequestsSegmentReader& operator=(const RecordedRequestsSegmentReader&) = delete; + + uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory); + void EndRead(); + RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const; + +private: + std::filesystem::path m_BasePath; + std::vector<RecordedRequest> m_Entries; + std::vector<IoBuffer> m_BlockFiles; +}; + +struct RecordedRequestsReader +{ + uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory); + void EndRead(); + RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const; + +private: + struct SegmentInfo + { + uint64_t SegmentIndex; + uint64_t BaseRequestIndex; + uint64_t RequestCount; + uint64_t RequestBytes; + DateTime StartTime{0}; + DateTime EndTime{0}; + }; + + bool m_InMemory = false; + std::filesystem::path m_BasePath; + std::vector<SegmentInfo> m_KnownSegments; + + mutable RwLock m_SegmentLock; + mutable std::vector<std::unique_ptr<RecordedRequestsSegmentReader>> m_SegmentReaders; +}; + +////////////////////////////////////////////////////////////////////////// + +void +RecordedRequestsSegmentWriter::BeginWrite(const std::filesystem::path& BasePath, uint64_t SegmentIndex, uint64_t RequestBaseIndex) +{ + m_BasePath = BasePath; + m_SegmentIndex = SegmentIndex; + m_RequestBaseIndex = RequestBaseIndex; + CreateDirectories(m_BasePath); +} + +void +RecordedRequestsSegmentWriter::EndWrite() +{ + RwLock::ExclusiveLockScope _(m_Lock); + m_RequestCount = m_Entries.size(); + m_BlockFiles.clear(); + + // Emit some metadata alongside the recording + + try + { + m_EndTime = DateTime::Now(); + TimeSpan Duration{m_EndTime.GetTicks() - m_StartTime.GetTicks()}; + + CbObjectWriter Cbo; + Cbo << "time_start" << m_StartTime << "time_end" << m_EndTime << "duration" << Duration; + Cbo << "segment_index" << m_SegmentIndex; + Cbo << "entry_count" << m_RequestCount << "entry_size" << sizeof(RecordedRequest); + Cbo << "block_size" << RecordedRequestBlockSize << "standalone_threshold" << StandaloneFileSizeThreshold; + + Cbo.BeginObject("system_info"); + Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName(); + Cbo.EndObject(); + CbObject Metadata = Cbo.Save(); + + WriteFile(m_BasePath / "rpc_segment_info.zcb", Metadata.GetBuffer().AsIoBuffer()); + } + catch (const std::exception& Ex) + { + ZEN_WARN("caught exception while writing segment metadata for RPC recording: {}", Ex.what()); + } + + IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest)); + BasicFile IndexFile; + IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate); + std::error_code Ec; + IndexFile.WriteAll(IndexBuffer, Ec); + IndexFile.Close(); + + // note that simply calling `.reset()` here will *not* release backing memory + // and it's important that we do because on high traffic servers this will use + // a lot of memory otherwise + std::vector<RecordedRequest> EmptyEntries; + swap(m_Entries, EmptyEntries); +} + +void +RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) +{ + const uint64_t RequestBufferSize = RequestBuffer.GetSize(); + uint64_t RequestIndex = ~0ull; + + { + RwLock::ExclusiveLockScope Lock(m_Lock); + RequestIndex = m_Entries.size(); + RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0u, + .Length = uint32_t(RequestBufferSize & 0xffffFFFFu), + .ContentType = RequestInfo.ContentType, + .AcceptType = RequestInfo.AcceptType, + .OffsetHigh = 0, + .Padding2 = 0, + .SessionId = RequestInfo.SessionId}); + + if (Entry.Length < StandaloneFileSizeThreshold) + { + const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize); + + if (BlockIndex == m_BlockFiles.size()) + { + std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>()); + NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate); + m_ChunkOffset = BlockIndex * RecordedRequestBlockSize; + ++m_FileCount; + } + + ZEN_ASSERT(BlockIndex < m_BlockFiles.size()); + BasicFile* BlockFile = m_BlockFiles[BlockIndex].get(); + ZEN_ASSERT(BlockFile != nullptr); + + // Note that this is the overall logical offset, not the offset within a single file + const uint64_t ChunkWriteOffset = m_ChunkOffset; + m_ChunkOffset = RoundUp(ChunkWriteOffset + Entry.Length, 1u << 4u); + Entry.SetOffset(ChunkWriteOffset); + Lock.ReleaseNow(); + + std::error_code Ec; + BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), ChunkWriteOffset - (BlockIndex * RecordedRequestBlockSize), Ec); + if (Ec) + { + // We cannot simply use `Entry` here because the vector may + // have been reallocated causing the entry to be in a different + // location + RwLock::ExclusiveLockScope _(m_Lock); + m_Entries[RequestIndex].Length = 0; + return; + } + + m_RequestsByteCount.fetch_add(RequestBuffer.GetSize()); + + return; + } + } + + // Write request data to standalone file + + try + { + BasicFile RequestFile; + RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate); + + if (RequestBufferSize > std::numeric_limits<uint32_t>::max()) + { + // The exact value of the entry is not important, we will use + // the size of the standalone file regardless when performing + // the read + RwLock::ExclusiveLockScope _(m_Lock); + m_Entries[RequestIndex].Length = std::numeric_limits<uint32_t>::max(); + } + + ++m_FileCount; + + std::error_code Ec; + RequestFile.WriteAll(RequestBuffer, Ec); + + if (Ec) + { + RwLock::ExclusiveLockScope _(m_Lock); + m_Entries[RequestIndex].Length = 0; + } + else + { + m_RequestsByteCount.fetch_add(RequestBuffer.GetSize()); + } + } + catch (const std::exception&) + { + RwLock::ExclusiveLockScope _(m_Lock); + m_Entries[RequestIndex].Length = 0; + } +} + +uint64_t +RecordedRequestsSegmentReader::BeginRead(const std::filesystem::path& BasePath, bool InMemory) +{ + m_BasePath = BasePath; + BasicFile IndexFile; + IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kRead); + m_Entries.resize(IndexFile.FileSize() / sizeof(RecordedRequest)); + IndexFile.Read(m_Entries.data(), IndexFile.FileSize(), 0); + uint64_t MaxChunkPosition = 0; + for (const RecordedRequest& R : m_Entries) + { + if (R.Offset != ~0u) + { + MaxChunkPosition = Max(MaxChunkPosition, R.GetOffset() + R.Length); + } + } + uint32_t BlockCount = gsl::narrow<uint32_t>(MaxChunkPosition / RecordedRequestBlockSize) + 1; + m_BlockFiles.resize(BlockCount); + for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; ++BlockIndex) + { + if (InMemory) + { + BasicFile Chunk; + Chunk.Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kRead); + m_BlockFiles[BlockIndex] = Chunk.ReadAll(); + continue; + } + m_BlockFiles[BlockIndex] = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("chunks{}.bin", BlockIndex)); + } + return m_Entries.size(); +} + +void +RecordedRequestsSegmentReader::EndRead() +{ + m_BlockFiles.clear(); +} + +RecordedRequestInfo +RecordedRequestsSegmentReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const +{ + if (RequestIndex >= m_Entries.size()) + { + return RecordedRequestInfo::NullRequest; + } + const RecordedRequest& Entry = m_Entries[RequestIndex]; + if (Entry.Length == 0) + { + return RecordedRequestInfo::NullRequest; + } + + RecordedRequestInfo RequestInfo = {.ContentType = Entry.ContentType, .AcceptType = Entry.AcceptType, .SessionId = Entry.SessionId}; + + if (Entry.Offset != ~0u) + { + // Inline in block file + const uint64_t EntryOffset = Entry.GetOffset(); + const uint32_t BlockIndex = gsl::narrow<uint32_t>((EntryOffset + Entry.Length) / RecordedRequestBlockSize); + const uint64_t ChunkOffset = EntryOffset - (BlockIndex * RecordedRequestBlockSize); + OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length); + + return RequestInfo; + } + + // Standalone file + OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex)); + + return RequestInfo; +} + +////////////////////////////////////////////////////////////////////////// + +RecordedRequestsWriter::RecordedRequestsWriter() +{ +} + +RecordedRequestsWriter::~RecordedRequestsWriter() +{ + EndWrite(); +} + +void +RecordedRequestsWriter::BeginWrite(const std::filesystem::path& BasePath) +{ + m_BasePath = BasePath; + m_IsActive = true; + + m_WriterThread.reset(new std::thread(&RecordedRequestsWriter::WriterThreadMain, this)); + + m_IsWriterReady.wait(false); +} + +void +RecordedRequestsWriter::EndWrite() +{ + if (m_WriterThread) + { + m_IsActive = false; + const int64_t PendingCount = m_PendingRequests.fetch_add(1); + m_PendingRequests.notify_all(); + + if (PendingCount) + { + ZEN_INFO("waiting for RPC recorder writing thread to drain {} pending items", PendingCount); + } + + if (m_WriterThread->joinable()) + { + m_WriterThread->join(); + } + + m_WriterThread.reset(); + } +} + +void +RecordedRequestsWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) +{ + if (m_IsActive) + { + IoBuffer OwnedRequest = RequestBuffer; + OwnedRequest.MakeOwned(); + + { + RwLock::ExclusiveLockScope _(m_RequestQueueLock); + m_RequestQueue.push_back(QueuedRequest{RequestInfo, std::move(OwnedRequest)}); + m_PendingRequests.fetch_add(1); + } + + m_PendingRequests.notify_all(); + } +} + +void +RecordedRequestsWriter::WriterThreadMain() +{ + SetCurrentThreadName("rpc_writer"); + EnsureCurrentSegment(); + + m_IsWriterReady.store(true); + m_IsWriterReady.notify_all(); + + while (m_IsActive) + { + m_PendingRequests.wait(0); + + while (m_PendingRequests) + { + RwLock::ExclusiveLockScope _(m_RequestQueueLock); + if (!m_RequestQueue.empty()) + { + bool DrainBacklog = false; + + do + { + QueuedRequest Request = m_RequestQueue.front(); + + m_RequestQueue.pop_front(); + m_PendingRequests.fetch_sub(1); + + // For a sufficiently large backlog, keep blocking queueing operations + // until we get below the threshold + DrainBacklog = m_RequestQueue.size() >= MaximumBacklogCount; + + if (!DrainBacklog) + { + _.ReleaseNow(); + } + + try + { + RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment(); + Writer.WriteRequest(Request.RequestInfo, Request.RequestBuffer); + } + catch (const std::exception&) + { + // TODO: what's the right behaviour here? The most likely cause would + // be some I/O error and we probably ought to just shut down recording + // at that point + } + } while (DrainBacklog); + } + else + { + // shutdown increments this counter so we need to decrement it + // here even though we didn't process any request + m_PendingRequests.fetch_sub(1); + } + } + } + + RwLock::ExclusiveLockScope _(m_Lock); + CommitCurrentSegment(_); + WriteRecordingMetadata(); +} + +RecordedRequestsSegmentWriter& +RecordedRequestsWriter::EnsureCurrentSegment() +{ + RwLock::ExclusiveLockScope _(m_Lock); + + if (m_CurrentWriter) + { + bool StartNewSegment = false; + + TimeSpan SegmentAge(DateTime::NowTicks() - m_CurrentWriter->GetStartTime().GetTicks()); + + if (m_CurrentWriter->GetRequestCount() >= SegmentRequestCount) + { + ZEN_DEBUG("starting new RPC recording segment due to request count >= {}", SegmentRequestCount); + StartNewSegment = true; + } + else if (m_CurrentWriter->GetFileCount() >= LooseFileThreshold) + { + ZEN_DEBUG("starting new RPC recording segment due to file count >= {}", LooseFileThreshold); + StartNewSegment = true; + } + else if (m_CurrentWriter->GetRequestsByteCount() >= SegmentByteThreshold) + { + ZEN_DEBUG("starting new RPC recording segment due to footprint >= {} bytes", SegmentByteThreshold); + StartNewSegment = true; + } + else if (SegmentAge >= SegmentTimeThreshold) + { + ZEN_DEBUG("starting new RPC recording segment due to age >= {}", + NiceTimeSpanMs(SegmentTimeThreshold.GetTicks() / TimeSpan::TicksPerMillisecond)); + StartNewSegment = true; + } + + if (StartNewSegment) + { + CommitCurrentSegment(_); + } + } + + if (!m_CurrentWriter) + { + const uint64_t SegmentIndex = m_FinishedSegments.size(); + m_CurrentWriter = std::make_unique<RecordedRequestsSegmentWriter>(); + + m_CurrentWriter->BeginWrite(m_BasePath / MakeSegmentPath(SegmentIndex), SegmentIndex, m_NextSegmentBaseIndex); + } + + return *m_CurrentWriter; +} + +void +RecordedRequestsWriter::CommitCurrentSegment(RwLock::ExclusiveLockScope&) +{ + if (m_CurrentWriter) + { + RecordedRequestsSegmentWriter& Writer = *m_CurrentWriter; + m_FinishedSegments.push_back(std::move(m_CurrentWriter)); + m_SegmentBaseIndex.push_back(m_NextSegmentBaseIndex); + m_NextSegmentBaseIndex += Writer.GetRequestCount(); + + Writer.EndWrite(); + } +} + +void +RecordedRequestsWriter::WriteRecordingMetadata() +{ + try + { + DateTime EndTime = DateTime::Now(); + TimeSpan Duration{EndTime.GetTicks() - m_StartTime.GetTicks()}; + + CbObjectWriter Cbo; + Cbo << "time_start" << m_StartTime << "time_end" << EndTime << "duration" << Duration << "format_version" << 2; + + Cbo.BeginObject("system_info"); + Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName(); + Cbo << "segment_count" << m_FinishedSegments.size(); + Cbo << "block_size" << RecordedRequestBlockSize; + Cbo << "standalone_threshold" << StandaloneFileSizeThreshold; + Cbo << "segment_request_count" << SegmentRequestCount; + Cbo << "segment_file_threshold" << LooseFileThreshold; + Cbo << "segment_byte_threshold" << SegmentByteThreshold; + + Describe(GetSystemMetrics(), Cbo); + Cbo.EndObject(); + + Cbo.BeginArray("segments"); + + for (auto& Segment : m_FinishedSegments) + { + Cbo.BeginObject(); + Cbo << "segment" << Segment->GetSegmentIndex(); + Cbo << "base_index" << Segment->GetBaseRequestIndex(); + Cbo << "request_count" << Segment->GetRequestCount(); + Cbo << "request_bytes" << Segment->GetRequestsByteCount(); + Cbo << "start_time" << Segment->GetStartTime(); + Cbo << "end_time" << Segment->GetEndTime(); + Cbo.EndObject(); + } + + Cbo.EndArray(); + + CbObject Metadata = Cbo.Save(); + + WriteFile(m_BasePath / "rpc_recording_info.zcb", Metadata.GetBuffer().AsIoBuffer()); + } + catch (const std::exception& Ex) + { + ZEN_WARN("caught exception while writing metadata for RPC recording: {}", Ex.what()); + } +} + +////////////////////////////////////////////////////////////////////////// + +uint64_t +RecordedRequestsReader::BeginRead(const std::filesystem::path& BasePath, bool InMemory) +{ + m_InMemory = InMemory; + m_BasePath = BasePath; + + std::error_code Ec; + BasicFile InfoFile; + InfoFile.Open(m_BasePath / "rpc_recording_info.zcb", BasicFile::Mode::kRead, Ec); + + if (!Ec) + { + try + { + CbObject CbInfo = LoadCompactBinaryObject(InfoFile.ReadAll()); + + uint64_t TotalRequestCount = 0; + uint64_t MaxSegmentIndex = 0; + + for (auto SegmentElement : CbInfo["segments"]) + { + CbObjectView Segment = SegmentElement.AsObjectView(); + + const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment"sv].AsUInt64(), + .BaseRequestIndex = Segment["base_index"sv].AsUInt64(), + .RequestCount = Segment["request_count"sv].AsUInt64(), + .RequestBytes = Segment["request_bytes"sv].AsUInt64(), + .StartTime = Segment["start_time"sv].AsDateTime(), + .EndTime = Segment["end_time"sv].AsDateTime()}); + + TotalRequestCount += Info.RequestCount; + MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex); + } + + m_SegmentReaders.resize(MaxSegmentIndex + 1); + + return TotalRequestCount; + } + catch (const std::exception& Ex) + { + ZEN_WARN("could not read metadata file: {}", Ex.what()); + } + } + + ZEN_INFO("recovering segment info for '{}'", BasePath); + + uint64_t TotalRequestCount = 0; + uint64_t MaxSegmentIndex = 0; + + try + { + for (int SegmentIndex = 0;; ++SegmentIndex) + { + const std::filesystem::path ZcbPath = BasePath / MakeSegmentPath(SegmentIndex) / "rpc_segment_info.zcb"; + FileContents Fc = ReadFile(ZcbPath); + + if (Fc.ErrorCode) + break; + + if (IoBuffer SegmentInfoBuffer = Fc.Flatten()) + { + CbObject Segment = LoadCompactBinaryObject(SegmentInfoBuffer); + + const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment_index"sv].AsUInt64(), + .BaseRequestIndex = 0, + .RequestCount = Segment["entry_count"sv].AsUInt64(), + .RequestBytes = 0, + .StartTime = Segment["time_start"sv].AsDateTime(), + .EndTime = Segment["time_end"sv].AsDateTime()}); + + TotalRequestCount += Info.RequestCount; + MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex); + } + } + } + catch (const std::exception&) + { + } + + std::sort(begin(m_KnownSegments), end(m_KnownSegments), [](const auto& Lhs, const auto& Rhs) { + return Lhs.SegmentIndex < Rhs.SegmentIndex; + }); + + uint64_t SegmentRequestOffset = 0; + + for (SegmentInfo& Info : m_KnownSegments) + { + Info.BaseRequestIndex = SegmentRequestOffset; + SegmentRequestOffset += Info.RequestCount; + } + + m_SegmentReaders.resize(MaxSegmentIndex + 1); + + return TotalRequestCount; +} + +void +RecordedRequestsReader::EndRead() +{ + RwLock::ExclusiveLockScope _(m_SegmentLock); + m_KnownSegments.clear(); + m_SegmentReaders.clear(); + m_BasePath.clear(); +} + +RecordedRequestInfo +RecordedRequestsReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const +{ + auto EnsureSegment = [&](uint64_t SegmentIndex) -> const RecordedRequestsSegmentReader& { + { + RwLock::SharedLockScope _(m_SegmentLock); + + if (auto SegmentReaderPtr = m_SegmentReaders[SegmentIndex].get()) + { + return *SegmentReaderPtr; + } + } + + RwLock::ExclusiveLockScope _(m_SegmentLock); + + auto& SegmentReaderPtr = m_SegmentReaders[SegmentIndex]; + + if (!SegmentReaderPtr) + { + RecordedRequestsSegmentReader* NewSegment = new RecordedRequestsSegmentReader; + NewSegment->BeginRead(m_BasePath / MakeSegmentPath(SegmentIndex), m_InMemory); + SegmentReaderPtr.reset(NewSegment); + } + + return *(SegmentReaderPtr.get()); + }; + + for (auto& Segment : m_KnownSegments) + { + if (Segment.RequestCount > RequestIndex) + { + return EnsureSegment(Segment.SegmentIndex).ReadRequest(RequestIndex, OutBuffer); + } + else + { + RequestIndex -= Segment.RequestCount; + } + } + + return RecordedRequestInfo::NullRequest; +} + +////////////////////////////////////////////////////////////////////////// + +class DiskRequestRecorder : public IRpcRequestRecorder +{ +public: + DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); } + virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); } + + virtual void RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override + { + m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer); + } + +private: + RecordedRequestsWriter m_RecordedRequests; +}; + +class DiskRequestReplayer : public IRpcRequestReplayer +{ +public: + DiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory) + { + m_RequestCount = m_RequestReader.BeginRead(BasePath, InMemory); + } + virtual ~DiskRequestReplayer() { m_RequestReader.EndRead(); } + + static bool IsCompatible(const std::filesystem::path& BasePath) + { + if (IsFile(BasePath / "rpc_recording_info.zcb")) + { + return true; + } + + const std::filesystem::path SegmentZero = BasePath / MakeSegmentPath(0); + + if (IsFile(SegmentZero / "rpc_segment_info.zcb") && IsFile(SegmentZero / "index.bin")) + { + // top-level metadata is missing, possibly because of premature exit + // on the recording side + + return true; + } + + return false; + } + + virtual uint64_t GetRequestCount() const override { return m_RequestCount; } + + virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override + { + return m_RequestReader.ReadRequest(RequestIndex, OutBuffer); + } + +private: + std::uint64_t m_RequestCount; + RecordedRequestsReader m_RequestReader; +}; + +} // namespace zen::cache::v2 + +////////////////////////////////////////////////////////////////////////// + +namespace zen::cache { + +std::unique_ptr<cache::IRpcRequestRecorder> +MakeDiskRequestRecorder(const std::filesystem::path& BasePath) +{ + return std::make_unique<v2::DiskRequestRecorder>(BasePath); +} + +std::unique_ptr<cache::IRpcRequestReplayer> +MakeDiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory) +{ + if (v2::DiskRequestReplayer::IsCompatible(BasePath)) + { + return std::make_unique<v2::DiskRequestReplayer>(BasePath, InMemory); + } + else + { + return std::make_unique<v1::DiskRequestReplayer>(BasePath, InMemory); + } +} + +#if ZEN_WITH_TESTS + +void +rpcrecord_forcelink() +{ +} + +TEST_SUITE_BEGIN("rpc.recording"); + +TEST_CASE("rpc.record") +{ + ScopedTemporaryDirectory TempDir; + auto Path = TempDir.Path(); + + const Oid SessionId = GetSessionId(); + + using namespace std::literals; + + { + cache::v2::DiskRequestRecorder Recorder{Path}; + + for (int i = 0; i < 1000; ++i) + { + RecordedRequestInfo RequestInfo{.ContentType = ZenContentType::kCbObject, + .AcceptType = ZenContentType::kCbObject, + .SessionId = SessionId}; + + CbObjectWriter RequestPayload; + RequestPayload << "test"sv << true; + RequestPayload << "index"sv << i; + CbObject Req = RequestPayload.Save(); + IoBuffer RequestBuffer = Req.GetBuffer().AsIoBuffer(); + + Recorder.RecordRequest(RequestInfo, RequestBuffer); + } + } + + { + cache::v2::DiskRequestReplayer Replayer{Path, false}; + + for (int i = 0; i < 1000; ++i) + { + IoBuffer RequestBuffer; + RecordedRequestInfo RequestInfo = Replayer.GetRequest(i, RequestBuffer); + + CHECK(RequestInfo.AcceptType == ZenContentType::kCbObject); + CHECK(RequestInfo.ContentType == ZenContentType::kCbObject); + CHECK(RequestInfo.SessionId == SessionId); + + CbObject Req = LoadCompactBinaryObject(RequestBuffer); + CHECK_EQ(Req["index"sv].AsInt32(), i); + CHECK_EQ(Req["test"sv].AsBool(), true); + } + } +} + +TEST_SUITE_END(); + +#endif + +} // namespace zen::cache |