aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/cache
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-10-03 17:17:44 +0200
committerGitHub Enterprise <[email protected]>2025-10-03 17:17:44 +0200
commitf8dae0f66d17a904dfa4a54771df031b62bce10e (patch)
tree5f268e0f3189d8f0be9230dd67b217ca021ca54a /src/zenutil/cache
parentcacherequests helpers test only (#551) (diff)
downloadzen-f8dae0f66d17a904dfa4a54771df031b62bce10e.tar.xz
zen-f8dae0f66d17a904dfa4a54771df031b62bce10e.zip
move rpcrecorder out from cache subfolder (#552)
Diffstat (limited to 'src/zenutil/cache')
-rw-r--r--src/zenutil/cache/rpcrecording.cpp1175
1 files changed, 0 insertions, 1175 deletions
diff --git a/src/zenutil/cache/rpcrecording.cpp b/src/zenutil/cache/rpcrecording.cpp
deleted file mode 100644
index 46e80f6b7..000000000
--- a/src/zenutil/cache/rpcrecording.cpp
+++ /dev/null
@@ -1,1175 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include <zenutil/cache/rpcrecording.h>
-
-#include <zencore/basicfile.h>
-#include <zencore/compactbinarybuilder.h>
-#include <zencore/filesystem.h>
-#include <zencore/fmtutils.h>
-#include <zencore/logging.h>
-#include <zencore/session.h>
-#include <zencore/system.h>
-#include <zencore/testing.h>
-#include <zencore/testutils.h>
-
-ZEN_THIRD_PARTY_INCLUDES_START
-#include <fmt/format.h>
-#include <gsl/gsl-lite.hpp>
-ZEN_THIRD_PARTY_INCLUDES_END
-
-#include <deque>
-#include <thread>
-
-namespace zen::cache {
-
-const RecordedRequestInfo RecordedRequestInfo::NullRequest = {.ContentType = ZenContentType::kUnknownContentType,
- .AcceptType = ZenContentType::kUnknownContentType,
- .SessionId = Oid::Zero};
-
-}
-
-namespace zen::cache::v1 {
-
-struct RecordedRequest
-{
- uint64_t Offset;
- uint64_t Length;
- ZenContentType ContentType;
- ZenContentType AcceptType;
-};
-
-const uint64_t RecordedRequestBlockSize = 1ull << 31u;
-const uint64_t StandaloneFileSizeThreshold = 1 * 1024 * 1024ull;
-
-struct RecordedRequestsWriter
-{
- void BeginWrite(const std::filesystem::path& BasePath)
- {
- m_BasePath = BasePath;
- CreateDirectories(m_BasePath);
- }
-
- void EndWrite()
- {
- RwLock::ExclusiveLockScope _(m_Lock);
- m_BlockFiles.clear();
-
- try
- {
- // Emit some metadata alongside the recording
-
- DateTime EndTime = DateTime::Now();
- TimeSpan Duration{EndTime.GetTicks() - m_StartTime.GetTicks()};
-
- CbObjectWriter Cbo;
- Cbo << "time_start" << m_StartTime << "time_end" << EndTime << "duration" << Duration;
- Cbo << "entry_count" << m_Entries.size() << "entry_size" << sizeof(RecordedRequest);
- Cbo << "block_size" << RecordedRequestBlockSize << "standalone_threshold" << StandaloneFileSizeThreshold;
-
- Cbo.BeginObject("system_info");
- Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName();
- Describe(GetSystemMetrics(), Cbo);
- Cbo.EndObject();
- CbObject Metadata = Cbo.Save();
-
- WriteFile(m_BasePath / "rpc_recording_metadata.zcb", Metadata.GetBuffer().AsIoBuffer());
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("caught exception while generating metadata for RPC recording: {}", Ex.what());
- }
-
- IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest));
- BasicFile IndexFile;
- IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate);
- std::error_code Ec;
- IndexFile.WriteAll(IndexBuffer, Ec);
- IndexFile.Close();
- m_Entries.clear();
- }
-
- void WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer)
- {
- RwLock::ExclusiveLockScope Lock(m_Lock);
- uint64_t RequestIndex = m_Entries.size();
- RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0ull,
- .Length = RequestBuffer.Size(),
- .ContentType = RequestInfo.ContentType,
- .AcceptType = RequestInfo.AcceptType});
-
- if (Entry.Length < StandaloneFileSizeThreshold)
- {
- const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize);
- if (BlockIndex == m_BlockFiles.size())
- {
- std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>());
- NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate);
- m_ChunkOffset = BlockIndex * RecordedRequestBlockSize;
- }
- ZEN_ASSERT(BlockIndex < m_BlockFiles.size());
- BasicFile* BlockFile = m_BlockFiles[BlockIndex].get();
- ZEN_ASSERT(BlockFile != nullptr);
-
- Entry.Offset = m_ChunkOffset;
- m_ChunkOffset = RoundUp(m_ChunkOffset + Entry.Length, 1u << 4u);
- Lock.ReleaseNow();
-
- std::error_code Ec;
- BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), Entry.Offset - BlockIndex * RecordedRequestBlockSize, Ec);
- if (Ec)
- {
- Entry.Length = 0;
- }
- }
- else
- {
- Lock.ReleaseNow();
-
- BasicFile RequestFile;
- RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate);
- std::error_code Ec;
- RequestFile.WriteAll(RequestBuffer, Ec);
- if (Ec)
- {
- Entry.Length = 0;
- }
- }
- }
-
- std::filesystem::path m_BasePath;
- mutable RwLock m_Lock;
- std::vector<RecordedRequest> m_Entries;
- std::vector<std::unique_ptr<BasicFile>> m_BlockFiles;
- uint64_t m_ChunkOffset;
- zen::DateTime m_StartTime = DateTime::Now();
-};
-
-struct RecordedRequestsReader
-{
- uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory)
- {
- m_BasePath = BasePath;
- BasicFile IndexFile;
- IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kRead);
- m_Entries.resize(IndexFile.FileSize() / sizeof(RecordedRequest));
- IndexFile.Read(m_Entries.data(), IndexFile.FileSize(), 0);
- uint64_t MaxChunkPosition = 0;
- for (const RecordedRequest& R : m_Entries)
- {
- if (R.Offset != ~0ull)
- {
- MaxChunkPosition = Max(MaxChunkPosition, R.Offset + R.Length);
- }
- }
- uint32_t BlockCount = gsl::narrow<uint32_t>(MaxChunkPosition / RecordedRequestBlockSize) + 1;
- m_BlockFiles.resize(BlockCount);
- for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; ++BlockIndex)
- {
- if (InMemory)
- {
- BasicFile Chunk;
- Chunk.Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kRead);
- m_BlockFiles[BlockIndex] = Chunk.ReadAll();
- continue;
- }
- m_BlockFiles[BlockIndex] = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("chunks{}.bin", BlockIndex));
- }
- return m_Entries.size();
- }
- void EndRead() { m_BlockFiles.clear(); }
-
- RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const
- {
- if (RequestIndex >= m_Entries.size())
- {
- return RecordedRequestInfo::NullRequest;
- }
-
- const RecordedRequest& Entry = m_Entries[RequestIndex];
- if (Entry.Length == 0)
- {
- return RecordedRequestInfo::NullRequest;
- }
-
- if (Entry.Offset != ~0ull)
- {
- uint32_t BlockIndex = gsl::narrow<uint32_t>((Entry.Offset + Entry.Length) / RecordedRequestBlockSize);
- uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize);
- OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length);
- }
- else
- {
- OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex));
- }
-
- return {.ContentType = Entry.ContentType, .AcceptType = Entry.AcceptType, .SessionId = Oid::Zero};
- }
-
- std::filesystem::path m_BasePath;
- std::vector<RecordedRequest> m_Entries;
- std::vector<IoBuffer> m_BlockFiles;
-};
-
-class DiskRequestRecorder : public IRpcRequestRecorder
-{
-public:
- DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); }
- virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); }
-
-private:
- virtual void RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override
- {
- m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer);
- }
-
- RecordedRequestsWriter m_RecordedRequests;
-};
-
-class DiskRequestReplayer : public IRpcRequestReplayer
-{
-public:
- DiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory)
- {
- m_RequestCount = m_RequestBuffer.BeginRead(BasePath, InMemory);
- }
- virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); }
-
- virtual uint64_t GetRequestCount() const override { return m_RequestCount; }
-
- virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override
- {
- return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer);
- }
-
-private:
- std::uint64_t m_RequestCount;
- RecordedRequestsReader m_RequestBuffer;
-};
-
-} // namespace zen::cache::v1
-
-//////////////////////////////////////////////////////////////////////////
-
-namespace zen::cache::v2 {
-
-using namespace std::literals;
-
-struct RecordedRequest
-{
- uint32_t Offset; // 4 bytes
- uint32_t Length; // 4 bytes
- ZenContentType ContentType; // 1 byte
- ZenContentType AcceptType; // 1 byte
- uint8_t OffsetHigh; // 1 byte
- uint8_t Padding2; // 1 byte
- Oid SessionId; // 12 bytes
-
- inline uint64_t GetOffset() const { return uint64_t(Offset) + (uint64_t(OffsetHigh) << 32); }
- inline void SetOffset(uint64_t NewOffset)
- {
- Offset = gsl::narrow_cast<uint32_t>(NewOffset & 0xffff'ffff);
- OffsetHigh = gsl::narrow_cast<uint8_t>(NewOffset >> 32);
- }
-};
-
-static_assert(sizeof(RecordedRequest) == 24);
-
-const uint64_t RecordedRequestBlockSize = 1 * 1024 * 1024 * 1024; // 1GiB
-const uint64_t StandaloneFileSizeThreshold = 16 * 1024 * 1024ull; // 16MiB
-const uint64_t SegmentRequestCount = 10 * 1000 * 1000;
-const uint64_t LooseFileThreshold = 5000; // Somewhat arbitrary, but we try to keep the
- // number of files in a directory below this level
- // for performance
-const uint64_t SegmentByteThreshold = 16ull * 1024 * 1024 * 1024;
-const TimeSpan SegmentTimeThreshold{/* hours */ 1, /* minutes */ 0, /* seconds */ 0};
-const int64_t MaximumBacklogCount = 2000;
-
-std::string
-MakeSegmentPath(uint64_t SegmentIndex)
-{
- return fmt::format("segment_{:06}", SegmentIndex);
-}
-
-struct RecordedRequestsSegmentWriter
-{
- RecordedRequestsSegmentWriter() = default;
- ~RecordedRequestsSegmentWriter() = default;
-
- RecordedRequestsSegmentWriter(const RecordedRequestsSegmentWriter&) = delete;
- RecordedRequestsSegmentWriter& operator=(const RecordedRequestsSegmentWriter&) = delete;
-
- void BeginWrite(const std::filesystem::path& BasePath, uint64_t SegmentIndex, uint64_t RequestBaseIndex);
- void WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer);
- void EndWrite();
-
- inline uint64_t GetRequestCount() const
- {
- if (m_RequestCount)
- {
- return m_RequestCount;
- }
-
- RwLock::SharedLockScope _(m_Lock);
- return m_Entries.size();
- }
- inline uint64_t GetBaseRequestIndex() const { return m_RequestBaseIndex; }
- inline uint64_t GetSegmentIndex() const { return m_SegmentIndex; }
- inline uint64_t GetFileCount() const { return m_FileCount; }
- inline uint64_t GetRequestsByteCount() const { return m_RequestsByteCount; }
- inline DateTime GetStartTime() const { return m_StartTime; }
- inline DateTime GetEndTime() const { return m_EndTime; }
-
-private:
- std::filesystem::path m_BasePath;
- uint64_t m_SegmentIndex = 0;
- uint64_t m_RequestBaseIndex = 0;
- uint64_t m_RequestCount = 0;
- std::atomic_uint64_t m_FileCount{};
- std::atomic_uint64_t m_RequestsByteCount{};
- mutable RwLock m_Lock;
- std::vector<RecordedRequest> m_Entries;
- std::vector<std::unique_ptr<BasicFile>> m_BlockFiles;
- uint64_t m_ChunkOffset;
- DateTime m_StartTime = DateTime::Now();
- DateTime m_EndTime = DateTime::Now();
-};
-
-struct RecordedRequestsWriter
-{
- RecordedRequestsWriter();
- ~RecordedRequestsWriter();
-
- void BeginWrite(const std::filesystem::path& BasePath);
- void EndWrite();
- void WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer);
-
-private:
- std::filesystem::path m_BasePath;
- zen::DateTime m_StartTime = DateTime::Now();
-
- mutable RwLock m_Lock;
- std::unique_ptr<RecordedRequestsSegmentWriter> m_CurrentWriter;
- std::vector<std::unique_ptr<RecordedRequestsSegmentWriter>> m_FinishedSegments;
- std::vector<uint64_t> m_SegmentBaseIndex;
- uint64_t m_NextSegmentBaseIndex = 0;
-
- RecordedRequestsSegmentWriter& EnsureCurrentSegment();
- void CommitCurrentSegment(RwLock::ExclusiveLockScope&);
- void WriteRecordingMetadata();
-
- // I/O thread state
-
- struct QueuedRequest
- {
- RecordedRequestInfo RequestInfo;
- IoBuffer RequestBuffer;
- };
-
- std::unique_ptr<std::thread> m_WriterThread;
- std::atomic_bool m_IsWriterReady{false};
- std::atomic_bool m_IsActive{false};
- std::atomic_int64_t m_PendingRequests{0};
- RwLock m_RequestQueueLock;
- std::deque<QueuedRequest> m_RequestQueue;
-
- void WriterThreadMain();
-};
-
-//////////////////////////////////////////////////////////////////////////
-
-struct RecordedRequestsSegmentReader
-{
- RecordedRequestsSegmentReader() = default;
-
- RecordedRequestsSegmentReader(const RecordedRequestsSegmentReader&) = delete;
- RecordedRequestsSegmentReader& operator=(const RecordedRequestsSegmentReader&) = delete;
-
- uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory);
- void EndRead();
- RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const;
-
-private:
- std::filesystem::path m_BasePath;
- std::vector<RecordedRequest> m_Entries;
- std::vector<IoBuffer> m_BlockFiles;
-};
-
-struct RecordedRequestsReader
-{
- uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory);
- void EndRead();
- RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const;
-
-private:
- struct SegmentInfo
- {
- uint64_t SegmentIndex;
- uint64_t BaseRequestIndex;
- uint64_t RequestCount;
- uint64_t RequestBytes;
- DateTime StartTime{0};
- DateTime EndTime{0};
- };
-
- bool m_InMemory = false;
- std::filesystem::path m_BasePath;
- std::vector<SegmentInfo> m_KnownSegments;
-
- mutable RwLock m_SegmentLock;
- mutable std::vector<std::unique_ptr<RecordedRequestsSegmentReader>> m_SegmentReaders;
-};
-
-//////////////////////////////////////////////////////////////////////////
-
-void
-RecordedRequestsSegmentWriter::BeginWrite(const std::filesystem::path& BasePath, uint64_t SegmentIndex, uint64_t RequestBaseIndex)
-{
- m_BasePath = BasePath;
- m_SegmentIndex = SegmentIndex;
- m_RequestBaseIndex = RequestBaseIndex;
- CreateDirectories(m_BasePath);
-}
-
-void
-RecordedRequestsSegmentWriter::EndWrite()
-{
- RwLock::ExclusiveLockScope _(m_Lock);
- m_RequestCount = m_Entries.size();
- m_BlockFiles.clear();
-
- // Emit some metadata alongside the recording
-
- try
- {
- m_EndTime = DateTime::Now();
- TimeSpan Duration{m_EndTime.GetTicks() - m_StartTime.GetTicks()};
-
- CbObjectWriter Cbo;
- Cbo << "time_start" << m_StartTime << "time_end" << m_EndTime << "duration" << Duration;
- Cbo << "segment_index" << m_SegmentIndex;
- Cbo << "entry_count" << m_RequestCount << "entry_size" << sizeof(RecordedRequest);
- Cbo << "block_size" << RecordedRequestBlockSize << "standalone_threshold" << StandaloneFileSizeThreshold;
-
- Cbo.BeginObject("system_info");
- Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName();
- Cbo.EndObject();
- CbObject Metadata = Cbo.Save();
-
- WriteFile(m_BasePath / "rpc_segment_info.zcb", Metadata.GetBuffer().AsIoBuffer());
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("caught exception while writing segment metadata for RPC recording: {}", Ex.what());
- }
-
- IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest));
- BasicFile IndexFile;
- IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate);
- std::error_code Ec;
- IndexFile.WriteAll(IndexBuffer, Ec);
- IndexFile.Close();
-
- // note that simply calling `.reset()` here will *not* release backing memory
- // and it's important that we do because on high traffic servers this will use
- // a lot of memory otherwise
- std::vector<RecordedRequest> EmptyEntries;
- swap(m_Entries, EmptyEntries);
-}
-
-void
-RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer)
-{
- const uint64_t RequestBufferSize = RequestBuffer.GetSize();
- uint64_t RequestIndex = ~0ull;
-
- {
- RwLock::ExclusiveLockScope Lock(m_Lock);
- RequestIndex = m_Entries.size();
- RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0u,
- .Length = uint32_t(RequestBufferSize & 0xffffFFFFu),
- .ContentType = RequestInfo.ContentType,
- .AcceptType = RequestInfo.AcceptType,
- .OffsetHigh = 0,
- .Padding2 = 0,
- .SessionId = RequestInfo.SessionId});
-
- if (Entry.Length < StandaloneFileSizeThreshold)
- {
- const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize);
-
- if (BlockIndex == m_BlockFiles.size())
- {
- std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>());
- NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate);
- m_ChunkOffset = BlockIndex * RecordedRequestBlockSize;
- ++m_FileCount;
- }
-
- ZEN_ASSERT(BlockIndex < m_BlockFiles.size());
- BasicFile* BlockFile = m_BlockFiles[BlockIndex].get();
- ZEN_ASSERT(BlockFile != nullptr);
-
- // Note that this is the overall logical offset, not the offset within a single file
- const uint64_t ChunkWriteOffset = m_ChunkOffset;
- m_ChunkOffset = RoundUp(ChunkWriteOffset + Entry.Length, 1u << 4u);
- Entry.SetOffset(ChunkWriteOffset);
- Lock.ReleaseNow();
-
- std::error_code Ec;
- BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), ChunkWriteOffset - (BlockIndex * RecordedRequestBlockSize), Ec);
- if (Ec)
- {
- // We cannot simply use `Entry` here because the vector may
- // have been reallocated causing the entry to be in a different
- // location
- RwLock::ExclusiveLockScope _(m_Lock);
- m_Entries[RequestIndex].Length = 0;
- return;
- }
-
- m_RequestsByteCount.fetch_add(RequestBuffer.GetSize());
-
- return;
- }
- }
-
- // Write request data to standalone file
-
- try
- {
- BasicFile RequestFile;
- RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate);
-
- if (RequestBufferSize > std::numeric_limits<uint32_t>::max())
- {
- // The exact value of the entry is not important, we will use
- // the size of the standalone file regardless when performing
- // the read
- RwLock::ExclusiveLockScope _(m_Lock);
- m_Entries[RequestIndex].Length = std::numeric_limits<uint32_t>::max();
- }
-
- ++m_FileCount;
-
- std::error_code Ec;
- RequestFile.WriteAll(RequestBuffer, Ec);
-
- if (Ec)
- {
- RwLock::ExclusiveLockScope _(m_Lock);
- m_Entries[RequestIndex].Length = 0;
- }
- else
- {
- m_RequestsByteCount.fetch_add(RequestBuffer.GetSize());
- }
- }
- catch (const std::exception&)
- {
- RwLock::ExclusiveLockScope _(m_Lock);
- m_Entries[RequestIndex].Length = 0;
- }
-}
-
-uint64_t
-RecordedRequestsSegmentReader::BeginRead(const std::filesystem::path& BasePath, bool InMemory)
-{
- m_BasePath = BasePath;
- BasicFile IndexFile;
- IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kRead);
- m_Entries.resize(IndexFile.FileSize() / sizeof(RecordedRequest));
- IndexFile.Read(m_Entries.data(), IndexFile.FileSize(), 0);
- uint64_t MaxChunkPosition = 0;
- for (const RecordedRequest& R : m_Entries)
- {
- if (R.Offset != ~0u)
- {
- MaxChunkPosition = Max(MaxChunkPosition, R.GetOffset() + R.Length);
- }
- }
- uint32_t BlockCount = gsl::narrow<uint32_t>(MaxChunkPosition / RecordedRequestBlockSize) + 1;
- m_BlockFiles.resize(BlockCount);
- for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; ++BlockIndex)
- {
- if (InMemory)
- {
- BasicFile Chunk;
- Chunk.Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kRead);
- m_BlockFiles[BlockIndex] = Chunk.ReadAll();
- continue;
- }
- m_BlockFiles[BlockIndex] = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("chunks{}.bin", BlockIndex));
- }
- return m_Entries.size();
-}
-
-void
-RecordedRequestsSegmentReader::EndRead()
-{
- m_BlockFiles.clear();
-}
-
-RecordedRequestInfo
-RecordedRequestsSegmentReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const
-{
- if (RequestIndex >= m_Entries.size())
- {
- return RecordedRequestInfo::NullRequest;
- }
- const RecordedRequest& Entry = m_Entries[RequestIndex];
- if (Entry.Length == 0)
- {
- return RecordedRequestInfo::NullRequest;
- }
-
- RecordedRequestInfo RequestInfo = {.ContentType = Entry.ContentType, .AcceptType = Entry.AcceptType, .SessionId = Entry.SessionId};
-
- if (Entry.Offset != ~0u)
- {
- // Inline in block file
- const uint64_t EntryOffset = Entry.GetOffset();
- const uint32_t BlockIndex = gsl::narrow<uint32_t>((EntryOffset + Entry.Length) / RecordedRequestBlockSize);
- const uint64_t ChunkOffset = EntryOffset - (BlockIndex * RecordedRequestBlockSize);
- OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length);
-
- return RequestInfo;
- }
-
- // Standalone file
- OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex));
-
- return RequestInfo;
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-RecordedRequestsWriter::RecordedRequestsWriter()
-{
-}
-
-RecordedRequestsWriter::~RecordedRequestsWriter()
-{
- EndWrite();
-}
-
-void
-RecordedRequestsWriter::BeginWrite(const std::filesystem::path& BasePath)
-{
- m_BasePath = BasePath;
- m_IsActive = true;
-
- m_WriterThread.reset(new std::thread(&RecordedRequestsWriter::WriterThreadMain, this));
-
- m_IsWriterReady.wait(false);
-}
-
-void
-RecordedRequestsWriter::EndWrite()
-{
- if (m_WriterThread)
- {
- m_IsActive = false;
- const int64_t PendingCount = m_PendingRequests.fetch_add(1);
- m_PendingRequests.notify_all();
-
- if (PendingCount)
- {
- ZEN_INFO("waiting for RPC recorder writing thread to drain {} pending items", PendingCount);
- }
-
- if (m_WriterThread->joinable())
- {
- m_WriterThread->join();
- }
-
- m_WriterThread.reset();
- }
-}
-
-void
-RecordedRequestsWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer)
-{
- if (m_IsActive)
- {
- IoBuffer OwnedRequest = RequestBuffer;
- OwnedRequest.MakeOwned();
-
- {
- RwLock::ExclusiveLockScope _(m_RequestQueueLock);
- m_RequestQueue.push_back(QueuedRequest{RequestInfo, std::move(OwnedRequest)});
- m_PendingRequests.fetch_add(1);
- }
-
- m_PendingRequests.notify_all();
- }
-}
-
-void
-RecordedRequestsWriter::WriterThreadMain()
-{
- SetCurrentThreadName("rpc_writer");
- EnsureCurrentSegment();
-
- m_IsWriterReady.store(true);
- m_IsWriterReady.notify_all();
-
- while (m_IsActive)
- {
- m_PendingRequests.wait(0);
-
- while (m_PendingRequests)
- {
- RwLock::ExclusiveLockScope _(m_RequestQueueLock);
- if (!m_RequestQueue.empty())
- {
- bool DrainBacklog = false;
-
- do
- {
- QueuedRequest Request = m_RequestQueue.front();
-
- m_RequestQueue.pop_front();
- m_PendingRequests.fetch_sub(1);
-
- // For a sufficiently large backlog, keep blocking queueing operations
- // until we get below the threshold
- DrainBacklog = m_RequestQueue.size() >= MaximumBacklogCount;
-
- if (!DrainBacklog)
- {
- _.ReleaseNow();
- }
-
- try
- {
- RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment();
- Writer.WriteRequest(Request.RequestInfo, Request.RequestBuffer);
- }
- catch (const std::exception&)
- {
- // TODO: what's the right behaviour here? The most likely cause would
- // be some I/O error and we probably ought to just shut down recording
- // at that point
- }
- } while (DrainBacklog);
- }
- else
- {
- // shutdown increments this counter so we need to decrement it
- // here even though we didn't process any request
- m_PendingRequests.fetch_sub(1);
- }
- }
- }
-
- RwLock::ExclusiveLockScope _(m_Lock);
- CommitCurrentSegment(_);
- WriteRecordingMetadata();
-}
-
-RecordedRequestsSegmentWriter&
-RecordedRequestsWriter::EnsureCurrentSegment()
-{
- RwLock::ExclusiveLockScope _(m_Lock);
-
- if (m_CurrentWriter)
- {
- bool StartNewSegment = false;
-
- TimeSpan SegmentAge(DateTime::NowTicks() - m_CurrentWriter->GetStartTime().GetTicks());
-
- if (m_CurrentWriter->GetRequestCount() >= SegmentRequestCount)
- {
- ZEN_DEBUG("starting new RPC recording segment due to request count >= {}", SegmentRequestCount);
- StartNewSegment = true;
- }
- else if (m_CurrentWriter->GetFileCount() >= LooseFileThreshold)
- {
- ZEN_DEBUG("starting new RPC recording segment due to file count >= {}", LooseFileThreshold);
- StartNewSegment = true;
- }
- else if (m_CurrentWriter->GetRequestsByteCount() >= SegmentByteThreshold)
- {
- ZEN_DEBUG("starting new RPC recording segment due to footprint >= {} bytes", SegmentByteThreshold);
- StartNewSegment = true;
- }
- else if (SegmentAge >= SegmentTimeThreshold)
- {
- ZEN_DEBUG("starting new RPC recording segment due to age >= {}",
- NiceTimeSpanMs(SegmentTimeThreshold.GetTicks() / TimeSpan::TicksPerMillisecond));
- StartNewSegment = true;
- }
-
- if (StartNewSegment)
- {
- CommitCurrentSegment(_);
- }
- }
-
- if (!m_CurrentWriter)
- {
- const uint64_t SegmentIndex = m_FinishedSegments.size();
- m_CurrentWriter = std::make_unique<RecordedRequestsSegmentWriter>();
-
- m_CurrentWriter->BeginWrite(m_BasePath / MakeSegmentPath(SegmentIndex), SegmentIndex, m_NextSegmentBaseIndex);
- }
-
- return *m_CurrentWriter;
-}
-
-void
-RecordedRequestsWriter::CommitCurrentSegment(RwLock::ExclusiveLockScope&)
-{
- if (m_CurrentWriter)
- {
- RecordedRequestsSegmentWriter& Writer = *m_CurrentWriter;
- m_FinishedSegments.push_back(std::move(m_CurrentWriter));
- m_SegmentBaseIndex.push_back(m_NextSegmentBaseIndex);
- m_NextSegmentBaseIndex += Writer.GetRequestCount();
-
- Writer.EndWrite();
- }
-}
-
-void
-RecordedRequestsWriter::WriteRecordingMetadata()
-{
- try
- {
- DateTime EndTime = DateTime::Now();
- TimeSpan Duration{EndTime.GetTicks() - m_StartTime.GetTicks()};
-
- CbObjectWriter Cbo;
- Cbo << "time_start" << m_StartTime << "time_end" << EndTime << "duration" << Duration << "format_version" << 2;
-
- Cbo.BeginObject("system_info");
- Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName();
- Cbo << "segment_count" << m_FinishedSegments.size();
- Cbo << "block_size" << RecordedRequestBlockSize;
- Cbo << "standalone_threshold" << StandaloneFileSizeThreshold;
- Cbo << "segment_request_count" << SegmentRequestCount;
- Cbo << "segment_file_threshold" << LooseFileThreshold;
- Cbo << "segment_byte_threshold" << SegmentByteThreshold;
-
- Describe(GetSystemMetrics(), Cbo);
- Cbo.EndObject();
-
- Cbo.BeginArray("segments");
-
- for (auto& Segment : m_FinishedSegments)
- {
- Cbo.BeginObject();
- Cbo << "segment" << Segment->GetSegmentIndex();
- Cbo << "base_index" << Segment->GetBaseRequestIndex();
- Cbo << "request_count" << Segment->GetRequestCount();
- Cbo << "request_bytes" << Segment->GetRequestsByteCount();
- Cbo << "start_time" << Segment->GetStartTime();
- Cbo << "end_time" << Segment->GetEndTime();
- Cbo.EndObject();
- }
-
- Cbo.EndArray();
-
- CbObject Metadata = Cbo.Save();
-
- WriteFile(m_BasePath / "rpc_recording_info.zcb", Metadata.GetBuffer().AsIoBuffer());
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("caught exception while writing metadata for RPC recording: {}", Ex.what());
- }
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-uint64_t
-RecordedRequestsReader::BeginRead(const std::filesystem::path& BasePath, bool InMemory)
-{
- m_InMemory = InMemory;
- m_BasePath = BasePath;
-
- std::error_code Ec;
- BasicFile InfoFile;
- InfoFile.Open(m_BasePath / "rpc_recording_info.zcb", BasicFile::Mode::kRead, Ec);
-
- if (!Ec)
- {
- try
- {
- CbObject CbInfo = LoadCompactBinaryObject(InfoFile.ReadAll());
-
- uint64_t TotalRequestCount = 0;
- uint64_t MaxSegmentIndex = 0;
-
- for (auto SegmentElement : CbInfo["segments"])
- {
- CbObjectView Segment = SegmentElement.AsObjectView();
-
- const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment"sv].AsUInt64(),
- .BaseRequestIndex = Segment["base_index"sv].AsUInt64(),
- .RequestCount = Segment["request_count"sv].AsUInt64(),
- .RequestBytes = Segment["request_bytes"sv].AsUInt64(),
- .StartTime = Segment["start_time"sv].AsDateTime(),
- .EndTime = Segment["end_time"sv].AsDateTime()});
-
- TotalRequestCount += Info.RequestCount;
- MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex);
- }
-
- m_SegmentReaders.resize(MaxSegmentIndex + 1);
-
- return TotalRequestCount;
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("could not read metadata file: {}", Ex.what());
- }
- }
-
- ZEN_INFO("recovering segment info for '{}'", BasePath);
-
- uint64_t TotalRequestCount = 0;
- uint64_t MaxSegmentIndex = 0;
-
- try
- {
- for (int SegmentIndex = 0;; ++SegmentIndex)
- {
- const std::filesystem::path ZcbPath = BasePath / MakeSegmentPath(SegmentIndex) / "rpc_segment_info.zcb";
- FileContents Fc = ReadFile(ZcbPath);
-
- if (Fc.ErrorCode)
- break;
-
- if (IoBuffer SegmentInfoBuffer = Fc.Flatten())
- {
- CbObject Segment = LoadCompactBinaryObject(SegmentInfoBuffer);
-
- const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment_index"sv].AsUInt64(),
- .BaseRequestIndex = 0,
- .RequestCount = Segment["entry_count"sv].AsUInt64(),
- .RequestBytes = 0,
- .StartTime = Segment["time_start"sv].AsDateTime(),
- .EndTime = Segment["time_end"sv].AsDateTime()});
-
- TotalRequestCount += Info.RequestCount;
- MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex);
- }
- }
- }
- catch (const std::exception&)
- {
- }
-
- std::sort(begin(m_KnownSegments), end(m_KnownSegments), [](const auto& Lhs, const auto& Rhs) {
- return Lhs.SegmentIndex < Rhs.SegmentIndex;
- });
-
- uint64_t SegmentRequestOffset = 0;
-
- for (SegmentInfo& Info : m_KnownSegments)
- {
- Info.BaseRequestIndex = SegmentRequestOffset;
- SegmentRequestOffset += Info.RequestCount;
- }
-
- m_SegmentReaders.resize(MaxSegmentIndex + 1);
-
- return TotalRequestCount;
-}
-
-void
-RecordedRequestsReader::EndRead()
-{
- RwLock::ExclusiveLockScope _(m_SegmentLock);
- m_KnownSegments.clear();
- m_SegmentReaders.clear();
- m_BasePath.clear();
-}
-
-RecordedRequestInfo
-RecordedRequestsReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const
-{
- auto EnsureSegment = [&](uint64_t SegmentIndex) -> const RecordedRequestsSegmentReader& {
- {
- RwLock::SharedLockScope _(m_SegmentLock);
-
- if (auto SegmentReaderPtr = m_SegmentReaders[SegmentIndex].get())
- {
- return *SegmentReaderPtr;
- }
- }
-
- RwLock::ExclusiveLockScope _(m_SegmentLock);
-
- auto& SegmentReaderPtr = m_SegmentReaders[SegmentIndex];
-
- if (!SegmentReaderPtr)
- {
- RecordedRequestsSegmentReader* NewSegment = new RecordedRequestsSegmentReader;
- NewSegment->BeginRead(m_BasePath / MakeSegmentPath(SegmentIndex), m_InMemory);
- SegmentReaderPtr.reset(NewSegment);
- }
-
- return *(SegmentReaderPtr.get());
- };
-
- for (auto& Segment : m_KnownSegments)
- {
- if (Segment.RequestCount > RequestIndex)
- {
- return EnsureSegment(Segment.SegmentIndex).ReadRequest(RequestIndex, OutBuffer);
- }
- else
- {
- RequestIndex -= Segment.RequestCount;
- }
- }
-
- return RecordedRequestInfo::NullRequest;
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-class DiskRequestRecorder : public IRpcRequestRecorder
-{
-public:
- DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); }
- virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); }
-
- virtual void RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override
- {
- m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer);
- }
-
-private:
- RecordedRequestsWriter m_RecordedRequests;
-};
-
-class DiskRequestReplayer : public IRpcRequestReplayer
-{
-public:
- DiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory)
- {
- m_RequestCount = m_RequestReader.BeginRead(BasePath, InMemory);
- }
- virtual ~DiskRequestReplayer() { m_RequestReader.EndRead(); }
-
- static bool IsCompatible(const std::filesystem::path& BasePath)
- {
- if (IsFile(BasePath / "rpc_recording_info.zcb"))
- {
- return true;
- }
-
- const std::filesystem::path SegmentZero = BasePath / MakeSegmentPath(0);
-
- if (IsFile(SegmentZero / "rpc_segment_info.zcb") && IsFile(SegmentZero / "index.bin"))
- {
- // top-level metadata is missing, possibly because of premature exit
- // on the recording side
-
- return true;
- }
-
- return false;
- }
-
- virtual uint64_t GetRequestCount() const override { return m_RequestCount; }
-
- virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override
- {
- return m_RequestReader.ReadRequest(RequestIndex, OutBuffer);
- }
-
-private:
- std::uint64_t m_RequestCount;
- RecordedRequestsReader m_RequestReader;
-};
-
-} // namespace zen::cache::v2
-
-//////////////////////////////////////////////////////////////////////////
-
-namespace zen::cache {
-
-std::unique_ptr<cache::IRpcRequestRecorder>
-MakeDiskRequestRecorder(const std::filesystem::path& BasePath)
-{
- return std::make_unique<v2::DiskRequestRecorder>(BasePath);
-}
-
-std::unique_ptr<cache::IRpcRequestReplayer>
-MakeDiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory)
-{
- if (v2::DiskRequestReplayer::IsCompatible(BasePath))
- {
- return std::make_unique<v2::DiskRequestReplayer>(BasePath, InMemory);
- }
- else
- {
- return std::make_unique<v1::DiskRequestReplayer>(BasePath, InMemory);
- }
-}
-
-#if ZEN_WITH_TESTS
-
-void
-rpcrecord_forcelink()
-{
-}
-
-TEST_SUITE_BEGIN("rpc.recording");
-
-TEST_CASE("rpc.record")
-{
- ScopedTemporaryDirectory TempDir;
- auto Path = TempDir.Path();
-
- const Oid SessionId = GetSessionId();
-
- using namespace std::literals;
-
- {
- cache::v2::DiskRequestRecorder Recorder{Path};
-
- for (int i = 0; i < 1000; ++i)
- {
- RecordedRequestInfo RequestInfo{.ContentType = ZenContentType::kCbObject,
- .AcceptType = ZenContentType::kCbObject,
- .SessionId = SessionId};
-
- CbObjectWriter RequestPayload;
- RequestPayload << "test"sv << true;
- RequestPayload << "index"sv << i;
- CbObject Req = RequestPayload.Save();
- IoBuffer RequestBuffer = Req.GetBuffer().AsIoBuffer();
-
- Recorder.RecordRequest(RequestInfo, RequestBuffer);
- }
- }
-
- {
- cache::v2::DiskRequestReplayer Replayer{Path, false};
-
- for (int i = 0; i < 1000; ++i)
- {
- IoBuffer RequestBuffer;
- RecordedRequestInfo RequestInfo = Replayer.GetRequest(i, RequestBuffer);
-
- CHECK(RequestInfo.AcceptType == ZenContentType::kCbObject);
- CHECK(RequestInfo.ContentType == ZenContentType::kCbObject);
- CHECK(RequestInfo.SessionId == SessionId);
-
- CbObject Req = LoadCompactBinaryObject(RequestBuffer);
- CHECK_EQ(Req["index"sv].AsInt32(), i);
- CHECK_EQ(Req["test"sv].AsBool(), true);
- }
- }
-}
-
-TEST_SUITE_END();
-
-#endif
-
-} // namespace zen::cache