diff options
| author | Dan Engelbrecht <[email protected]> | 2025-10-03 17:17:44 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-03 17:17:44 +0200 |
| commit | f8dae0f66d17a904dfa4a54771df031b62bce10e (patch) | |
| tree | 5f268e0f3189d8f0be9230dd67b217ca021ca54a /src/zenutil/cache | |
| parent | cacherequests helpers test only (#551) (diff) | |
| download | zen-f8dae0f66d17a904dfa4a54771df031b62bce10e.tar.xz zen-f8dae0f66d17a904dfa4a54771df031b62bce10e.zip | |
move rpcrecorder out from cache subfolder (#552)
Diffstat (limited to 'src/zenutil/cache')
| -rw-r--r-- | src/zenutil/cache/rpcrecording.cpp | 1175 |
1 files changed, 0 insertions, 1175 deletions
diff --git a/src/zenutil/cache/rpcrecording.cpp b/src/zenutil/cache/rpcrecording.cpp deleted file mode 100644 index 46e80f6b7..000000000 --- a/src/zenutil/cache/rpcrecording.cpp +++ /dev/null @@ -1,1175 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include <zenutil/cache/rpcrecording.h> - -#include <zencore/basicfile.h> -#include <zencore/compactbinarybuilder.h> -#include <zencore/filesystem.h> -#include <zencore/fmtutils.h> -#include <zencore/logging.h> -#include <zencore/session.h> -#include <zencore/system.h> -#include <zencore/testing.h> -#include <zencore/testutils.h> - -ZEN_THIRD_PARTY_INCLUDES_START -#include <fmt/format.h> -#include <gsl/gsl-lite.hpp> -ZEN_THIRD_PARTY_INCLUDES_END - -#include <deque> -#include <thread> - -namespace zen::cache { - -const RecordedRequestInfo RecordedRequestInfo::NullRequest = {.ContentType = ZenContentType::kUnknownContentType, - .AcceptType = ZenContentType::kUnknownContentType, - .SessionId = Oid::Zero}; - -} - -namespace zen::cache::v1 { - -struct RecordedRequest -{ - uint64_t Offset; - uint64_t Length; - ZenContentType ContentType; - ZenContentType AcceptType; -}; - -const uint64_t RecordedRequestBlockSize = 1ull << 31u; -const uint64_t StandaloneFileSizeThreshold = 1 * 1024 * 1024ull; - -struct RecordedRequestsWriter -{ - void BeginWrite(const std::filesystem::path& BasePath) - { - m_BasePath = BasePath; - CreateDirectories(m_BasePath); - } - - void EndWrite() - { - RwLock::ExclusiveLockScope _(m_Lock); - m_BlockFiles.clear(); - - try - { - // Emit some metadata alongside the recording - - DateTime EndTime = DateTime::Now(); - TimeSpan Duration{EndTime.GetTicks() - m_StartTime.GetTicks()}; - - CbObjectWriter Cbo; - Cbo << "time_start" << m_StartTime << "time_end" << EndTime << "duration" << Duration; - Cbo << "entry_count" << m_Entries.size() << "entry_size" << sizeof(RecordedRequest); - Cbo << "block_size" << RecordedRequestBlockSize << "standalone_threshold" << StandaloneFileSizeThreshold; - - Cbo.BeginObject("system_info"); - Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName(); - Describe(GetSystemMetrics(), Cbo); - Cbo.EndObject(); - CbObject Metadata = Cbo.Save(); - - WriteFile(m_BasePath / "rpc_recording_metadata.zcb", Metadata.GetBuffer().AsIoBuffer()); - } - catch (const std::exception& Ex) - { - ZEN_WARN("caught exception while generating metadata for RPC recording: {}", Ex.what()); - } - - IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest)); - BasicFile IndexFile; - IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate); - std::error_code Ec; - IndexFile.WriteAll(IndexBuffer, Ec); - IndexFile.Close(); - m_Entries.clear(); - } - - void WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) - { - RwLock::ExclusiveLockScope Lock(m_Lock); - uint64_t RequestIndex = m_Entries.size(); - RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0ull, - .Length = RequestBuffer.Size(), - .ContentType = RequestInfo.ContentType, - .AcceptType = RequestInfo.AcceptType}); - - if (Entry.Length < StandaloneFileSizeThreshold) - { - const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize); - if (BlockIndex == m_BlockFiles.size()) - { - std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>()); - NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate); - m_ChunkOffset = BlockIndex * RecordedRequestBlockSize; - } - ZEN_ASSERT(BlockIndex < m_BlockFiles.size()); - BasicFile* BlockFile = m_BlockFiles[BlockIndex].get(); - ZEN_ASSERT(BlockFile != nullptr); - - Entry.Offset = m_ChunkOffset; - m_ChunkOffset = RoundUp(m_ChunkOffset + Entry.Length, 1u << 4u); - Lock.ReleaseNow(); - - std::error_code Ec; - BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), Entry.Offset - BlockIndex * RecordedRequestBlockSize, Ec); - if (Ec) - { - Entry.Length = 0; - } - } - else - { - Lock.ReleaseNow(); - - BasicFile RequestFile; - RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate); - std::error_code Ec; - RequestFile.WriteAll(RequestBuffer, Ec); - if (Ec) - { - Entry.Length = 0; - } - } - } - - std::filesystem::path m_BasePath; - mutable RwLock m_Lock; - std::vector<RecordedRequest> m_Entries; - std::vector<std::unique_ptr<BasicFile>> m_BlockFiles; - uint64_t m_ChunkOffset; - zen::DateTime m_StartTime = DateTime::Now(); -}; - -struct RecordedRequestsReader -{ - uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory) - { - m_BasePath = BasePath; - BasicFile IndexFile; - IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kRead); - m_Entries.resize(IndexFile.FileSize() / sizeof(RecordedRequest)); - IndexFile.Read(m_Entries.data(), IndexFile.FileSize(), 0); - uint64_t MaxChunkPosition = 0; - for (const RecordedRequest& R : m_Entries) - { - if (R.Offset != ~0ull) - { - MaxChunkPosition = Max(MaxChunkPosition, R.Offset + R.Length); - } - } - uint32_t BlockCount = gsl::narrow<uint32_t>(MaxChunkPosition / RecordedRequestBlockSize) + 1; - m_BlockFiles.resize(BlockCount); - for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; ++BlockIndex) - { - if (InMemory) - { - BasicFile Chunk; - Chunk.Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kRead); - m_BlockFiles[BlockIndex] = Chunk.ReadAll(); - continue; - } - m_BlockFiles[BlockIndex] = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("chunks{}.bin", BlockIndex)); - } - return m_Entries.size(); - } - void EndRead() { m_BlockFiles.clear(); } - - RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const - { - if (RequestIndex >= m_Entries.size()) - { - return RecordedRequestInfo::NullRequest; - } - - const RecordedRequest& Entry = m_Entries[RequestIndex]; - if (Entry.Length == 0) - { - return RecordedRequestInfo::NullRequest; - } - - if (Entry.Offset != ~0ull) - { - uint32_t BlockIndex = gsl::narrow<uint32_t>((Entry.Offset + Entry.Length) / RecordedRequestBlockSize); - uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize); - OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length); - } - else - { - OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex)); - } - - return {.ContentType = Entry.ContentType, .AcceptType = Entry.AcceptType, .SessionId = Oid::Zero}; - } - - std::filesystem::path m_BasePath; - std::vector<RecordedRequest> m_Entries; - std::vector<IoBuffer> m_BlockFiles; -}; - -class DiskRequestRecorder : public IRpcRequestRecorder -{ -public: - DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); } - virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); } - -private: - virtual void RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override - { - m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer); - } - - RecordedRequestsWriter m_RecordedRequests; -}; - -class DiskRequestReplayer : public IRpcRequestReplayer -{ -public: - DiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory) - { - m_RequestCount = m_RequestBuffer.BeginRead(BasePath, InMemory); - } - virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); } - - virtual uint64_t GetRequestCount() const override { return m_RequestCount; } - - virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override - { - return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer); - } - -private: - std::uint64_t m_RequestCount; - RecordedRequestsReader m_RequestBuffer; -}; - -} // namespace zen::cache::v1 - -////////////////////////////////////////////////////////////////////////// - -namespace zen::cache::v2 { - -using namespace std::literals; - -struct RecordedRequest -{ - uint32_t Offset; // 4 bytes - uint32_t Length; // 4 bytes - ZenContentType ContentType; // 1 byte - ZenContentType AcceptType; // 1 byte - uint8_t OffsetHigh; // 1 byte - uint8_t Padding2; // 1 byte - Oid SessionId; // 12 bytes - - inline uint64_t GetOffset() const { return uint64_t(Offset) + (uint64_t(OffsetHigh) << 32); } - inline void SetOffset(uint64_t NewOffset) - { - Offset = gsl::narrow_cast<uint32_t>(NewOffset & 0xffff'ffff); - OffsetHigh = gsl::narrow_cast<uint8_t>(NewOffset >> 32); - } -}; - -static_assert(sizeof(RecordedRequest) == 24); - -const uint64_t RecordedRequestBlockSize = 1 * 1024 * 1024 * 1024; // 1GiB -const uint64_t StandaloneFileSizeThreshold = 16 * 1024 * 1024ull; // 16MiB -const uint64_t SegmentRequestCount = 10 * 1000 * 1000; -const uint64_t LooseFileThreshold = 5000; // Somewhat arbitrary, but we try to keep the - // number of files in a directory below this level - // for performance -const uint64_t SegmentByteThreshold = 16ull * 1024 * 1024 * 1024; -const TimeSpan SegmentTimeThreshold{/* hours */ 1, /* minutes */ 0, /* seconds */ 0}; -const int64_t MaximumBacklogCount = 2000; - -std::string -MakeSegmentPath(uint64_t SegmentIndex) -{ - return fmt::format("segment_{:06}", SegmentIndex); -} - -struct RecordedRequestsSegmentWriter -{ - RecordedRequestsSegmentWriter() = default; - ~RecordedRequestsSegmentWriter() = default; - - RecordedRequestsSegmentWriter(const RecordedRequestsSegmentWriter&) = delete; - RecordedRequestsSegmentWriter& operator=(const RecordedRequestsSegmentWriter&) = delete; - - void BeginWrite(const std::filesystem::path& BasePath, uint64_t SegmentIndex, uint64_t RequestBaseIndex); - void WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer); - void EndWrite(); - - inline uint64_t GetRequestCount() const - { - if (m_RequestCount) - { - return m_RequestCount; - } - - RwLock::SharedLockScope _(m_Lock); - return m_Entries.size(); - } - inline uint64_t GetBaseRequestIndex() const { return m_RequestBaseIndex; } - inline uint64_t GetSegmentIndex() const { return m_SegmentIndex; } - inline uint64_t GetFileCount() const { return m_FileCount; } - inline uint64_t GetRequestsByteCount() const { return m_RequestsByteCount; } - inline DateTime GetStartTime() const { return m_StartTime; } - inline DateTime GetEndTime() const { return m_EndTime; } - -private: - std::filesystem::path m_BasePath; - uint64_t m_SegmentIndex = 0; - uint64_t m_RequestBaseIndex = 0; - uint64_t m_RequestCount = 0; - std::atomic_uint64_t m_FileCount{}; - std::atomic_uint64_t m_RequestsByteCount{}; - mutable RwLock m_Lock; - std::vector<RecordedRequest> m_Entries; - std::vector<std::unique_ptr<BasicFile>> m_BlockFiles; - uint64_t m_ChunkOffset; - DateTime m_StartTime = DateTime::Now(); - DateTime m_EndTime = DateTime::Now(); -}; - -struct RecordedRequestsWriter -{ - RecordedRequestsWriter(); - ~RecordedRequestsWriter(); - - void BeginWrite(const std::filesystem::path& BasePath); - void EndWrite(); - void WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer); - -private: - std::filesystem::path m_BasePath; - zen::DateTime m_StartTime = DateTime::Now(); - - mutable RwLock m_Lock; - std::unique_ptr<RecordedRequestsSegmentWriter> m_CurrentWriter; - std::vector<std::unique_ptr<RecordedRequestsSegmentWriter>> m_FinishedSegments; - std::vector<uint64_t> m_SegmentBaseIndex; - uint64_t m_NextSegmentBaseIndex = 0; - - RecordedRequestsSegmentWriter& EnsureCurrentSegment(); - void CommitCurrentSegment(RwLock::ExclusiveLockScope&); - void WriteRecordingMetadata(); - - // I/O thread state - - struct QueuedRequest - { - RecordedRequestInfo RequestInfo; - IoBuffer RequestBuffer; - }; - - std::unique_ptr<std::thread> m_WriterThread; - std::atomic_bool m_IsWriterReady{false}; - std::atomic_bool m_IsActive{false}; - std::atomic_int64_t m_PendingRequests{0}; - RwLock m_RequestQueueLock; - std::deque<QueuedRequest> m_RequestQueue; - - void WriterThreadMain(); -}; - -////////////////////////////////////////////////////////////////////////// - -struct RecordedRequestsSegmentReader -{ - RecordedRequestsSegmentReader() = default; - - RecordedRequestsSegmentReader(const RecordedRequestsSegmentReader&) = delete; - RecordedRequestsSegmentReader& operator=(const RecordedRequestsSegmentReader&) = delete; - - uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory); - void EndRead(); - RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const; - -private: - std::filesystem::path m_BasePath; - std::vector<RecordedRequest> m_Entries; - std::vector<IoBuffer> m_BlockFiles; -}; - -struct RecordedRequestsReader -{ - uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory); - void EndRead(); - RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const; - -private: - struct SegmentInfo - { - uint64_t SegmentIndex; - uint64_t BaseRequestIndex; - uint64_t RequestCount; - uint64_t RequestBytes; - DateTime StartTime{0}; - DateTime EndTime{0}; - }; - - bool m_InMemory = false; - std::filesystem::path m_BasePath; - std::vector<SegmentInfo> m_KnownSegments; - - mutable RwLock m_SegmentLock; - mutable std::vector<std::unique_ptr<RecordedRequestsSegmentReader>> m_SegmentReaders; -}; - -////////////////////////////////////////////////////////////////////////// - -void -RecordedRequestsSegmentWriter::BeginWrite(const std::filesystem::path& BasePath, uint64_t SegmentIndex, uint64_t RequestBaseIndex) -{ - m_BasePath = BasePath; - m_SegmentIndex = SegmentIndex; - m_RequestBaseIndex = RequestBaseIndex; - CreateDirectories(m_BasePath); -} - -void -RecordedRequestsSegmentWriter::EndWrite() -{ - RwLock::ExclusiveLockScope _(m_Lock); - m_RequestCount = m_Entries.size(); - m_BlockFiles.clear(); - - // Emit some metadata alongside the recording - - try - { - m_EndTime = DateTime::Now(); - TimeSpan Duration{m_EndTime.GetTicks() - m_StartTime.GetTicks()}; - - CbObjectWriter Cbo; - Cbo << "time_start" << m_StartTime << "time_end" << m_EndTime << "duration" << Duration; - Cbo << "segment_index" << m_SegmentIndex; - Cbo << "entry_count" << m_RequestCount << "entry_size" << sizeof(RecordedRequest); - Cbo << "block_size" << RecordedRequestBlockSize << "standalone_threshold" << StandaloneFileSizeThreshold; - - Cbo.BeginObject("system_info"); - Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName(); - Cbo.EndObject(); - CbObject Metadata = Cbo.Save(); - - WriteFile(m_BasePath / "rpc_segment_info.zcb", Metadata.GetBuffer().AsIoBuffer()); - } - catch (const std::exception& Ex) - { - ZEN_WARN("caught exception while writing segment metadata for RPC recording: {}", Ex.what()); - } - - IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest)); - BasicFile IndexFile; - IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate); - std::error_code Ec; - IndexFile.WriteAll(IndexBuffer, Ec); - IndexFile.Close(); - - // note that simply calling `.reset()` here will *not* release backing memory - // and it's important that we do because on high traffic servers this will use - // a lot of memory otherwise - std::vector<RecordedRequest> EmptyEntries; - swap(m_Entries, EmptyEntries); -} - -void -RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) -{ - const uint64_t RequestBufferSize = RequestBuffer.GetSize(); - uint64_t RequestIndex = ~0ull; - - { - RwLock::ExclusiveLockScope Lock(m_Lock); - RequestIndex = m_Entries.size(); - RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0u, - .Length = uint32_t(RequestBufferSize & 0xffffFFFFu), - .ContentType = RequestInfo.ContentType, - .AcceptType = RequestInfo.AcceptType, - .OffsetHigh = 0, - .Padding2 = 0, - .SessionId = RequestInfo.SessionId}); - - if (Entry.Length < StandaloneFileSizeThreshold) - { - const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize); - - if (BlockIndex == m_BlockFiles.size()) - { - std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>()); - NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate); - m_ChunkOffset = BlockIndex * RecordedRequestBlockSize; - ++m_FileCount; - } - - ZEN_ASSERT(BlockIndex < m_BlockFiles.size()); - BasicFile* BlockFile = m_BlockFiles[BlockIndex].get(); - ZEN_ASSERT(BlockFile != nullptr); - - // Note that this is the overall logical offset, not the offset within a single file - const uint64_t ChunkWriteOffset = m_ChunkOffset; - m_ChunkOffset = RoundUp(ChunkWriteOffset + Entry.Length, 1u << 4u); - Entry.SetOffset(ChunkWriteOffset); - Lock.ReleaseNow(); - - std::error_code Ec; - BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), ChunkWriteOffset - (BlockIndex * RecordedRequestBlockSize), Ec); - if (Ec) - { - // We cannot simply use `Entry` here because the vector may - // have been reallocated causing the entry to be in a different - // location - RwLock::ExclusiveLockScope _(m_Lock); - m_Entries[RequestIndex].Length = 0; - return; - } - - m_RequestsByteCount.fetch_add(RequestBuffer.GetSize()); - - return; - } - } - - // Write request data to standalone file - - try - { - BasicFile RequestFile; - RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate); - - if (RequestBufferSize > std::numeric_limits<uint32_t>::max()) - { - // The exact value of the entry is not important, we will use - // the size of the standalone file regardless when performing - // the read - RwLock::ExclusiveLockScope _(m_Lock); - m_Entries[RequestIndex].Length = std::numeric_limits<uint32_t>::max(); - } - - ++m_FileCount; - - std::error_code Ec; - RequestFile.WriteAll(RequestBuffer, Ec); - - if (Ec) - { - RwLock::ExclusiveLockScope _(m_Lock); - m_Entries[RequestIndex].Length = 0; - } - else - { - m_RequestsByteCount.fetch_add(RequestBuffer.GetSize()); - } - } - catch (const std::exception&) - { - RwLock::ExclusiveLockScope _(m_Lock); - m_Entries[RequestIndex].Length = 0; - } -} - -uint64_t -RecordedRequestsSegmentReader::BeginRead(const std::filesystem::path& BasePath, bool InMemory) -{ - m_BasePath = BasePath; - BasicFile IndexFile; - IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kRead); - m_Entries.resize(IndexFile.FileSize() / sizeof(RecordedRequest)); - IndexFile.Read(m_Entries.data(), IndexFile.FileSize(), 0); - uint64_t MaxChunkPosition = 0; - for (const RecordedRequest& R : m_Entries) - { - if (R.Offset != ~0u) - { - MaxChunkPosition = Max(MaxChunkPosition, R.GetOffset() + R.Length); - } - } - uint32_t BlockCount = gsl::narrow<uint32_t>(MaxChunkPosition / RecordedRequestBlockSize) + 1; - m_BlockFiles.resize(BlockCount); - for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; ++BlockIndex) - { - if (InMemory) - { - BasicFile Chunk; - Chunk.Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kRead); - m_BlockFiles[BlockIndex] = Chunk.ReadAll(); - continue; - } - m_BlockFiles[BlockIndex] = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("chunks{}.bin", BlockIndex)); - } - return m_Entries.size(); -} - -void -RecordedRequestsSegmentReader::EndRead() -{ - m_BlockFiles.clear(); -} - -RecordedRequestInfo -RecordedRequestsSegmentReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const -{ - if (RequestIndex >= m_Entries.size()) - { - return RecordedRequestInfo::NullRequest; - } - const RecordedRequest& Entry = m_Entries[RequestIndex]; - if (Entry.Length == 0) - { - return RecordedRequestInfo::NullRequest; - } - - RecordedRequestInfo RequestInfo = {.ContentType = Entry.ContentType, .AcceptType = Entry.AcceptType, .SessionId = Entry.SessionId}; - - if (Entry.Offset != ~0u) - { - // Inline in block file - const uint64_t EntryOffset = Entry.GetOffset(); - const uint32_t BlockIndex = gsl::narrow<uint32_t>((EntryOffset + Entry.Length) / RecordedRequestBlockSize); - const uint64_t ChunkOffset = EntryOffset - (BlockIndex * RecordedRequestBlockSize); - OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length); - - return RequestInfo; - } - - // Standalone file - OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex)); - - return RequestInfo; -} - -////////////////////////////////////////////////////////////////////////// - -RecordedRequestsWriter::RecordedRequestsWriter() -{ -} - -RecordedRequestsWriter::~RecordedRequestsWriter() -{ - EndWrite(); -} - -void -RecordedRequestsWriter::BeginWrite(const std::filesystem::path& BasePath) -{ - m_BasePath = BasePath; - m_IsActive = true; - - m_WriterThread.reset(new std::thread(&RecordedRequestsWriter::WriterThreadMain, this)); - - m_IsWriterReady.wait(false); -} - -void -RecordedRequestsWriter::EndWrite() -{ - if (m_WriterThread) - { - m_IsActive = false; - const int64_t PendingCount = m_PendingRequests.fetch_add(1); - m_PendingRequests.notify_all(); - - if (PendingCount) - { - ZEN_INFO("waiting for RPC recorder writing thread to drain {} pending items", PendingCount); - } - - if (m_WriterThread->joinable()) - { - m_WriterThread->join(); - } - - m_WriterThread.reset(); - } -} - -void -RecordedRequestsWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) -{ - if (m_IsActive) - { - IoBuffer OwnedRequest = RequestBuffer; - OwnedRequest.MakeOwned(); - - { - RwLock::ExclusiveLockScope _(m_RequestQueueLock); - m_RequestQueue.push_back(QueuedRequest{RequestInfo, std::move(OwnedRequest)}); - m_PendingRequests.fetch_add(1); - } - - m_PendingRequests.notify_all(); - } -} - -void -RecordedRequestsWriter::WriterThreadMain() -{ - SetCurrentThreadName("rpc_writer"); - EnsureCurrentSegment(); - - m_IsWriterReady.store(true); - m_IsWriterReady.notify_all(); - - while (m_IsActive) - { - m_PendingRequests.wait(0); - - while (m_PendingRequests) - { - RwLock::ExclusiveLockScope _(m_RequestQueueLock); - if (!m_RequestQueue.empty()) - { - bool DrainBacklog = false; - - do - { - QueuedRequest Request = m_RequestQueue.front(); - - m_RequestQueue.pop_front(); - m_PendingRequests.fetch_sub(1); - - // For a sufficiently large backlog, keep blocking queueing operations - // until we get below the threshold - DrainBacklog = m_RequestQueue.size() >= MaximumBacklogCount; - - if (!DrainBacklog) - { - _.ReleaseNow(); - } - - try - { - RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment(); - Writer.WriteRequest(Request.RequestInfo, Request.RequestBuffer); - } - catch (const std::exception&) - { - // TODO: what's the right behaviour here? The most likely cause would - // be some I/O error and we probably ought to just shut down recording - // at that point - } - } while (DrainBacklog); - } - else - { - // shutdown increments this counter so we need to decrement it - // here even though we didn't process any request - m_PendingRequests.fetch_sub(1); - } - } - } - - RwLock::ExclusiveLockScope _(m_Lock); - CommitCurrentSegment(_); - WriteRecordingMetadata(); -} - -RecordedRequestsSegmentWriter& -RecordedRequestsWriter::EnsureCurrentSegment() -{ - RwLock::ExclusiveLockScope _(m_Lock); - - if (m_CurrentWriter) - { - bool StartNewSegment = false; - - TimeSpan SegmentAge(DateTime::NowTicks() - m_CurrentWriter->GetStartTime().GetTicks()); - - if (m_CurrentWriter->GetRequestCount() >= SegmentRequestCount) - { - ZEN_DEBUG("starting new RPC recording segment due to request count >= {}", SegmentRequestCount); - StartNewSegment = true; - } - else if (m_CurrentWriter->GetFileCount() >= LooseFileThreshold) - { - ZEN_DEBUG("starting new RPC recording segment due to file count >= {}", LooseFileThreshold); - StartNewSegment = true; - } - else if (m_CurrentWriter->GetRequestsByteCount() >= SegmentByteThreshold) - { - ZEN_DEBUG("starting new RPC recording segment due to footprint >= {} bytes", SegmentByteThreshold); - StartNewSegment = true; - } - else if (SegmentAge >= SegmentTimeThreshold) - { - ZEN_DEBUG("starting new RPC recording segment due to age >= {}", - NiceTimeSpanMs(SegmentTimeThreshold.GetTicks() / TimeSpan::TicksPerMillisecond)); - StartNewSegment = true; - } - - if (StartNewSegment) - { - CommitCurrentSegment(_); - } - } - - if (!m_CurrentWriter) - { - const uint64_t SegmentIndex = m_FinishedSegments.size(); - m_CurrentWriter = std::make_unique<RecordedRequestsSegmentWriter>(); - - m_CurrentWriter->BeginWrite(m_BasePath / MakeSegmentPath(SegmentIndex), SegmentIndex, m_NextSegmentBaseIndex); - } - - return *m_CurrentWriter; -} - -void -RecordedRequestsWriter::CommitCurrentSegment(RwLock::ExclusiveLockScope&) -{ - if (m_CurrentWriter) - { - RecordedRequestsSegmentWriter& Writer = *m_CurrentWriter; - m_FinishedSegments.push_back(std::move(m_CurrentWriter)); - m_SegmentBaseIndex.push_back(m_NextSegmentBaseIndex); - m_NextSegmentBaseIndex += Writer.GetRequestCount(); - - Writer.EndWrite(); - } -} - -void -RecordedRequestsWriter::WriteRecordingMetadata() -{ - try - { - DateTime EndTime = DateTime::Now(); - TimeSpan Duration{EndTime.GetTicks() - m_StartTime.GetTicks()}; - - CbObjectWriter Cbo; - Cbo << "time_start" << m_StartTime << "time_end" << EndTime << "duration" << Duration << "format_version" << 2; - - Cbo.BeginObject("system_info"); - Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName(); - Cbo << "segment_count" << m_FinishedSegments.size(); - Cbo << "block_size" << RecordedRequestBlockSize; - Cbo << "standalone_threshold" << StandaloneFileSizeThreshold; - Cbo << "segment_request_count" << SegmentRequestCount; - Cbo << "segment_file_threshold" << LooseFileThreshold; - Cbo << "segment_byte_threshold" << SegmentByteThreshold; - - Describe(GetSystemMetrics(), Cbo); - Cbo.EndObject(); - - Cbo.BeginArray("segments"); - - for (auto& Segment : m_FinishedSegments) - { - Cbo.BeginObject(); - Cbo << "segment" << Segment->GetSegmentIndex(); - Cbo << "base_index" << Segment->GetBaseRequestIndex(); - Cbo << "request_count" << Segment->GetRequestCount(); - Cbo << "request_bytes" << Segment->GetRequestsByteCount(); - Cbo << "start_time" << Segment->GetStartTime(); - Cbo << "end_time" << Segment->GetEndTime(); - Cbo.EndObject(); - } - - Cbo.EndArray(); - - CbObject Metadata = Cbo.Save(); - - WriteFile(m_BasePath / "rpc_recording_info.zcb", Metadata.GetBuffer().AsIoBuffer()); - } - catch (const std::exception& Ex) - { - ZEN_WARN("caught exception while writing metadata for RPC recording: {}", Ex.what()); - } -} - -////////////////////////////////////////////////////////////////////////// - -uint64_t -RecordedRequestsReader::BeginRead(const std::filesystem::path& BasePath, bool InMemory) -{ - m_InMemory = InMemory; - m_BasePath = BasePath; - - std::error_code Ec; - BasicFile InfoFile; - InfoFile.Open(m_BasePath / "rpc_recording_info.zcb", BasicFile::Mode::kRead, Ec); - - if (!Ec) - { - try - { - CbObject CbInfo = LoadCompactBinaryObject(InfoFile.ReadAll()); - - uint64_t TotalRequestCount = 0; - uint64_t MaxSegmentIndex = 0; - - for (auto SegmentElement : CbInfo["segments"]) - { - CbObjectView Segment = SegmentElement.AsObjectView(); - - const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment"sv].AsUInt64(), - .BaseRequestIndex = Segment["base_index"sv].AsUInt64(), - .RequestCount = Segment["request_count"sv].AsUInt64(), - .RequestBytes = Segment["request_bytes"sv].AsUInt64(), - .StartTime = Segment["start_time"sv].AsDateTime(), - .EndTime = Segment["end_time"sv].AsDateTime()}); - - TotalRequestCount += Info.RequestCount; - MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex); - } - - m_SegmentReaders.resize(MaxSegmentIndex + 1); - - return TotalRequestCount; - } - catch (const std::exception& Ex) - { - ZEN_WARN("could not read metadata file: {}", Ex.what()); - } - } - - ZEN_INFO("recovering segment info for '{}'", BasePath); - - uint64_t TotalRequestCount = 0; - uint64_t MaxSegmentIndex = 0; - - try - { - for (int SegmentIndex = 0;; ++SegmentIndex) - { - const std::filesystem::path ZcbPath = BasePath / MakeSegmentPath(SegmentIndex) / "rpc_segment_info.zcb"; - FileContents Fc = ReadFile(ZcbPath); - - if (Fc.ErrorCode) - break; - - if (IoBuffer SegmentInfoBuffer = Fc.Flatten()) - { - CbObject Segment = LoadCompactBinaryObject(SegmentInfoBuffer); - - const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment_index"sv].AsUInt64(), - .BaseRequestIndex = 0, - .RequestCount = Segment["entry_count"sv].AsUInt64(), - .RequestBytes = 0, - .StartTime = Segment["time_start"sv].AsDateTime(), - .EndTime = Segment["time_end"sv].AsDateTime()}); - - TotalRequestCount += Info.RequestCount; - MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex); - } - } - } - catch (const std::exception&) - { - } - - std::sort(begin(m_KnownSegments), end(m_KnownSegments), [](const auto& Lhs, const auto& Rhs) { - return Lhs.SegmentIndex < Rhs.SegmentIndex; - }); - - uint64_t SegmentRequestOffset = 0; - - for (SegmentInfo& Info : m_KnownSegments) - { - Info.BaseRequestIndex = SegmentRequestOffset; - SegmentRequestOffset += Info.RequestCount; - } - - m_SegmentReaders.resize(MaxSegmentIndex + 1); - - return TotalRequestCount; -} - -void -RecordedRequestsReader::EndRead() -{ - RwLock::ExclusiveLockScope _(m_SegmentLock); - m_KnownSegments.clear(); - m_SegmentReaders.clear(); - m_BasePath.clear(); -} - -RecordedRequestInfo -RecordedRequestsReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const -{ - auto EnsureSegment = [&](uint64_t SegmentIndex) -> const RecordedRequestsSegmentReader& { - { - RwLock::SharedLockScope _(m_SegmentLock); - - if (auto SegmentReaderPtr = m_SegmentReaders[SegmentIndex].get()) - { - return *SegmentReaderPtr; - } - } - - RwLock::ExclusiveLockScope _(m_SegmentLock); - - auto& SegmentReaderPtr = m_SegmentReaders[SegmentIndex]; - - if (!SegmentReaderPtr) - { - RecordedRequestsSegmentReader* NewSegment = new RecordedRequestsSegmentReader; - NewSegment->BeginRead(m_BasePath / MakeSegmentPath(SegmentIndex), m_InMemory); - SegmentReaderPtr.reset(NewSegment); - } - - return *(SegmentReaderPtr.get()); - }; - - for (auto& Segment : m_KnownSegments) - { - if (Segment.RequestCount > RequestIndex) - { - return EnsureSegment(Segment.SegmentIndex).ReadRequest(RequestIndex, OutBuffer); - } - else - { - RequestIndex -= Segment.RequestCount; - } - } - - return RecordedRequestInfo::NullRequest; -} - -////////////////////////////////////////////////////////////////////////// - -class DiskRequestRecorder : public IRpcRequestRecorder -{ -public: - DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); } - virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); } - - virtual void RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override - { - m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer); - } - -private: - RecordedRequestsWriter m_RecordedRequests; -}; - -class DiskRequestReplayer : public IRpcRequestReplayer -{ -public: - DiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory) - { - m_RequestCount = m_RequestReader.BeginRead(BasePath, InMemory); - } - virtual ~DiskRequestReplayer() { m_RequestReader.EndRead(); } - - static bool IsCompatible(const std::filesystem::path& BasePath) - { - if (IsFile(BasePath / "rpc_recording_info.zcb")) - { - return true; - } - - const std::filesystem::path SegmentZero = BasePath / MakeSegmentPath(0); - - if (IsFile(SegmentZero / "rpc_segment_info.zcb") && IsFile(SegmentZero / "index.bin")) - { - // top-level metadata is missing, possibly because of premature exit - // on the recording side - - return true; - } - - return false; - } - - virtual uint64_t GetRequestCount() const override { return m_RequestCount; } - - virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override - { - return m_RequestReader.ReadRequest(RequestIndex, OutBuffer); - } - -private: - std::uint64_t m_RequestCount; - RecordedRequestsReader m_RequestReader; -}; - -} // namespace zen::cache::v2 - -////////////////////////////////////////////////////////////////////////// - -namespace zen::cache { - -std::unique_ptr<cache::IRpcRequestRecorder> -MakeDiskRequestRecorder(const std::filesystem::path& BasePath) -{ - return std::make_unique<v2::DiskRequestRecorder>(BasePath); -} - -std::unique_ptr<cache::IRpcRequestReplayer> -MakeDiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory) -{ - if (v2::DiskRequestReplayer::IsCompatible(BasePath)) - { - return std::make_unique<v2::DiskRequestReplayer>(BasePath, InMemory); - } - else - { - return std::make_unique<v1::DiskRequestReplayer>(BasePath, InMemory); - } -} - -#if ZEN_WITH_TESTS - -void -rpcrecord_forcelink() -{ -} - -TEST_SUITE_BEGIN("rpc.recording"); - -TEST_CASE("rpc.record") -{ - ScopedTemporaryDirectory TempDir; - auto Path = TempDir.Path(); - - const Oid SessionId = GetSessionId(); - - using namespace std::literals; - - { - cache::v2::DiskRequestRecorder Recorder{Path}; - - for (int i = 0; i < 1000; ++i) - { - RecordedRequestInfo RequestInfo{.ContentType = ZenContentType::kCbObject, - .AcceptType = ZenContentType::kCbObject, - .SessionId = SessionId}; - - CbObjectWriter RequestPayload; - RequestPayload << "test"sv << true; - RequestPayload << "index"sv << i; - CbObject Req = RequestPayload.Save(); - IoBuffer RequestBuffer = Req.GetBuffer().AsIoBuffer(); - - Recorder.RecordRequest(RequestInfo, RequestBuffer); - } - } - - { - cache::v2::DiskRequestReplayer Replayer{Path, false}; - - for (int i = 0; i < 1000; ++i) - { - IoBuffer RequestBuffer; - RecordedRequestInfo RequestInfo = Replayer.GetRequest(i, RequestBuffer); - - CHECK(RequestInfo.AcceptType == ZenContentType::kCbObject); - CHECK(RequestInfo.ContentType == ZenContentType::kCbObject); - CHECK(RequestInfo.SessionId == SessionId); - - CbObject Req = LoadCompactBinaryObject(RequestBuffer); - CHECK_EQ(Req["index"sv].AsInt32(), i); - CHECK_EQ(Req["test"sv].AsBool(), true); - } - } -} - -TEST_SUITE_END(); - -#endif - -} // namespace zen::cache |