aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/cache
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-10-20 13:25:58 +0200
committerGitHub <[email protected]>2023-10-20 13:25:58 +0200
commit733dce4c722f101635d60709613d7c97c3bd30df (patch)
tree548cfddd0329a2be1e11ddb0b08ce4ec7b238950 /src/zenutil/cache
parentclean up GcContributor and GcStorage to be pure interfaces (#485) (diff)
downloadzen-733dce4c722f101635d60709613d7c97c3bd30df.tar.xz
zen-733dce4c722f101635d60709613d7c97c3bd30df.zip
Cache (rpc) activitity recording improvements (#482)
this adds a new RPC recording path aimed at more continuous recording and analysis of recorded sessions the new strategy is implemented alongside the original in order to retain the ability to read the older format the main difference between v2 and v1 is that the new strategy splits the recording into segments which are independent from each other. This is done to enable long running sessions with automatic disk cleanup (not implemented yet), appending to an existing recording (not implemented) and/or partial analysis and processing. The recorder will start a new segment when some criteria is fulfilled, including the number of files in the segment directory, disk footprint etc
Diffstat (limited to 'src/zenutil/cache')
-rw-r--r--src/zenutil/cache/rpcrecording.cpp701
1 files changed, 680 insertions, 21 deletions
diff --git a/src/zenutil/cache/rpcrecording.cpp b/src/zenutil/cache/rpcrecording.cpp
index 4958a27f6..054ac0e56 100644
--- a/src/zenutil/cache/rpcrecording.cpp
+++ b/src/zenutil/cache/rpcrecording.cpp
@@ -1,5 +1,9 @@
// Copyright Epic Games, Inc. All Rights Reserved.
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/filesystem.h>
+#include <zencore/logging.h>
+#include <zencore/system.h>
#include <zenutil/basicfile.h>
#include <zenutil/cache/rpcrecording.h>
@@ -9,6 +13,15 @@ ZEN_THIRD_PARTY_INCLUDES_START
ZEN_THIRD_PARTY_INCLUDES_END
namespace zen::cache {
+
+const RecordedRequestInfo RecordedRequestInfo::NullRequest = {.ContentType = ZenContentType::kUnknownContentType,
+ .AcceptType = ZenContentType::kUnknownContentType,
+ .SessionId = Oid::Zero};
+
+}
+
+namespace zen::cache::v1 {
+
struct RecordedRequest
{
uint64_t Offset;
@@ -17,7 +30,8 @@ struct RecordedRequest
ZenContentType AcceptType;
};
-const uint64_t RecordedRequestBlockSize = 1ull << 31u;
+const uint64_t RecordedRequestBlockSize = 1ull << 31u;
+const uint64_t StandaloneFileSizeThreshold = 1 * 1024 * 1024ull;
struct RecordedRequestsWriter
{
@@ -32,6 +46,31 @@ struct RecordedRequestsWriter
RwLock::ExclusiveLockScope _(m_Lock);
m_BlockFiles.clear();
+ try
+ {
+ // Emit some metadata alongside the recording
+
+ DateTime EndTime = DateTime::Now();
+ TimeSpan Duration{EndTime.GetTicks() - m_StartTime.GetTicks()};
+
+ CbObjectWriter Cbo;
+ Cbo << "time_start" << m_StartTime << "time_end" << EndTime << "duration" << Duration;
+ Cbo << "entry_count" << m_Entries.size() << "entry_size" << sizeof(RecordedRequest);
+ Cbo << "block_size" << RecordedRequestBlockSize << "standalone_threshold" << StandaloneFileSizeThreshold;
+
+ Cbo.BeginObject("system_info");
+ Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName();
+ Describe(GetSystemMetrics(), Cbo);
+ Cbo.EndObject();
+ CbObject Metadata = Cbo.Save();
+
+ WriteFile(m_BasePath / "rpc_recording_metadata.zcb", Metadata.GetBuffer().AsIoBuffer());
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_WARN("caught exception while generating metadata for RPC recording: {}", Ex.what());
+ }
+
IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest));
BasicFile IndexFile;
IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate);
@@ -41,15 +80,18 @@ struct RecordedRequestsWriter
m_Entries.clear();
}
- uint64_t WriteRequest(ZenContentType ContentType, ZenContentType AcceptType, const IoBuffer& RequestBuffer)
+ uint64_t WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer)
{
RwLock::ExclusiveLockScope Lock(m_Lock);
uint64_t RequestIndex = m_Entries.size();
- RecordedRequest& Entry = m_Entries.emplace_back(
- RecordedRequest{.Offset = ~0ull, .Length = RequestBuffer.Size(), .ContentType = ContentType, .AcceptType = AcceptType});
- if (Entry.Length < 1 * 1024 * 1024)
+ RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0ull,
+ .Length = RequestBuffer.Size(),
+ .ContentType = RequestInfo.ContentType,
+ .AcceptType = RequestInfo.AcceptType});
+
+ if (Entry.Length < StandaloneFileSizeThreshold)
{
- uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize);
+ const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize);
if (BlockIndex == m_BlockFiles.size())
{
std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>());
@@ -92,6 +134,7 @@ struct RecordedRequestsWriter
std::vector<RecordedRequest> m_Entries;
std::vector<std::unique_ptr<BasicFile>> m_BlockFiles;
uint64_t m_ChunkOffset;
+ zen::DateTime m_StartTime = DateTime::Now();
};
struct RecordedRequestsReader
@@ -128,26 +171,31 @@ struct RecordedRequestsReader
}
void EndRead() { m_BlockFiles.clear(); }
- std::pair<ZenContentType, ZenContentType> ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const
+ RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const
{
if (RequestIndex >= m_Entries.size())
{
- return {ZenContentType::kUnknownContentType, ZenContentType::kUnknownContentType};
+ return RecordedRequestInfo::NullRequest;
}
+
const RecordedRequest& Entry = m_Entries[RequestIndex];
if (Entry.Length == 0)
{
- return {ZenContentType::kUnknownContentType, ZenContentType::kUnknownContentType};
+ return RecordedRequestInfo::NullRequest;
}
+
if (Entry.Offset != ~0ull)
{
uint32_t BlockIndex = gsl::narrow<uint32_t>((Entry.Offset + Entry.Length) / RecordedRequestBlockSize);
uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize);
OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length);
- return {Entry.ContentType, Entry.AcceptType};
}
- OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex));
- return {Entry.ContentType, Entry.AcceptType};
+ else
+ {
+ OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex));
+ }
+
+ return {.ContentType = Entry.ContentType, .AcceptType = Entry.AcceptType, .SessionId = Oid::Zero};
}
std::filesystem::path m_BasePath;
@@ -162,14 +210,13 @@ public:
virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); }
private:
- virtual uint64_t RecordRequest(const ZenContentType ContentType,
- const ZenContentType AcceptType,
- const IoBuffer& RequestBuffer) override
+ virtual uint64_t RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override
{
- return m_RecordedRequests.WriteRequest(ContentType, AcceptType, RequestBuffer);
+ return m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer);
}
- virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {}
- virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {}
+ virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {}
+ virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {}
+
RecordedRequestsWriter m_RecordedRequests;
};
@@ -185,7 +232,7 @@ public:
private:
virtual uint64_t GetRequestCount() const override { return m_RequestCount; }
- virtual std::pair<ZenContentType, ZenContentType> GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override
+ virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override
{
return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer);
}
@@ -195,16 +242,628 @@ private:
RecordedRequestsReader m_RequestBuffer;
};
+} // namespace zen::cache::v1
+
+//////////////////////////////////////////////////////////////////////////
+
+namespace zen::cache::v2 {
+
+using namespace std::literals;
+
+struct RecordedRequest
+{
+ uint32_t Offset; // 4 bytes
+ uint32_t Length; // 4 bytes
+ ZenContentType ContentType; // 1 byte
+ ZenContentType AcceptType; // 1 byte
+ uint8_t Padding; // 1 byte
+ uint8_t Padding2; // 1 byte
+ Oid SessionId; // 12 bytes
+};
+
+static_assert(sizeof(RecordedRequest) == 24);
+
+const uint64_t RecordedRequestBlockSize = 1 * 1024 * 1024 * 1024; // 1GiB
+const uint64_t StandaloneFileSizeThreshold = 1 * 1024 * 1024ull; // 1MiB
+const uint64_t SegmentRequestCount = 10 * 1000 * 1000;
+const uint64_t LooseFileThreshold = 5000; // Somewhat arbitrary, but we try to keep the
+ // number of files in a directory below this level
+ // for performance
+const uint64_t SegmentByteThreshold = 16ull * 1024 * 1024 * 1024;
+const TimeSpan SegmentTimeThreshold{/* hours */ 1, /* minutes */ 0, /* seconds */ 0};
+
+struct RecordedRequestsSegmentWriter
+{
+ RecordedRequestsSegmentWriter() = default;
+ ~RecordedRequestsSegmentWriter() = default;
+
+ RecordedRequestsSegmentWriter(const RecordedRequestsSegmentWriter&) = delete;
+ RecordedRequestsSegmentWriter& operator=(const RecordedRequestsSegmentWriter&) = delete;
+
+ void BeginWrite(const std::filesystem::path& BasePath, uint64_t SegmentIndex, uint64_t RequestBaseIndex);
+ uint64_t WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer);
+ void EndWrite();
+
+ inline uint64_t GetRequestCount() const
+ {
+ if (m_RequestCount)
+ {
+ return m_RequestCount;
+ }
+
+ return m_Entries.size();
+ }
+ inline uint64_t GetBaseRequestIndex() const { return m_RequestBaseIndex; }
+ inline uint64_t GetSegmentIndex() const { return m_SegmentIndex; }
+ inline uint64_t GetFileCount() const { return m_FileCount; }
+ inline uint64_t GetRequestsByteCount() const { return m_RequestsByteCount; }
+ inline DateTime GetStartTime() const { return m_StartTime; }
+ inline DateTime GetEndTime() const { return m_EndTime; }
+
+private:
+ std::filesystem::path m_BasePath;
+ uint64_t m_SegmentIndex = 0;
+ uint64_t m_RequestBaseIndex = 0;
+ uint64_t m_RequestCount = 0;
+ std::atomic_uint64_t m_FileCount{};
+ std::atomic_uint64_t m_RequestsByteCount{};
+ mutable RwLock m_Lock;
+ std::vector<RecordedRequest> m_Entries;
+ std::vector<std::unique_ptr<BasicFile>> m_BlockFiles;
+ uint64_t m_ChunkOffset;
+ DateTime m_StartTime = DateTime::Now();
+ DateTime m_EndTime = DateTime::Now();
+};
+
+struct RecordedRequestsWriter
+{
+ void BeginWrite(const std::filesystem::path& BasePath);
+ RecordedRequestsSegmentWriter& EnsureCurrentSegment();
+ void CommitCurrentSegment(RwLock::ExclusiveLockScope&);
+ void EndWrite();
+ uint64_t WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer);
+
+private:
+ std::filesystem::path m_BasePath;
+ zen::DateTime m_StartTime = DateTime::Now();
+
+ mutable RwLock m_Lock;
+ std::unique_ptr<RecordedRequestsSegmentWriter> m_CurrentWriter;
+ std::vector<std::unique_ptr<RecordedRequestsSegmentWriter>> m_FinishedSegments;
+ std::vector<uint64_t> m_SegmentBaseIndex;
+ uint64_t m_NextSegmentBaseIndex = 0;
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+struct RecordedRequestsSegmentReader
+{
+ RecordedRequestsSegmentReader() = default;
+
+ RecordedRequestsSegmentReader(const RecordedRequestsSegmentReader&) = delete;
+ RecordedRequestsSegmentReader& operator=(const RecordedRequestsSegmentReader&) = delete;
+
+ uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory);
+ void EndRead();
+ RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const;
+
+private:
+ std::filesystem::path m_BasePath;
+ std::vector<RecordedRequest> m_Entries;
+ std::vector<IoBuffer> m_BlockFiles;
+};
+
+struct RecordedRequestsReader
+{
+ uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory);
+ void EndRead();
+ RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const;
+
+private:
+ struct SegmentInfo
+ {
+ uint64_t SegmentIndex;
+ uint64_t BaseRequestIndex;
+ uint64_t RequestCount;
+ uint64_t RequestBytes;
+ DateTime StartTime{0};
+ DateTime EndTime{0};
+ };
+
+ bool m_InMemory = false;
+ std::filesystem::path m_BasePath;
+ std::vector<SegmentInfo> m_KnownSegments;
+
+ mutable RwLock m_SegmentLock;
+ mutable std::vector<std::unique_ptr<RecordedRequestsSegmentReader>> m_SegmentReaders;
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+void
+RecordedRequestsSegmentWriter::BeginWrite(const std::filesystem::path& BasePath, uint64_t SegmentIndex, uint64_t RequestBaseIndex)
+{
+ m_BasePath = BasePath;
+ m_SegmentIndex = SegmentIndex;
+ m_RequestBaseIndex = RequestBaseIndex;
+ std::filesystem::create_directories(m_BasePath);
+}
+
+void
+RecordedRequestsSegmentWriter::EndWrite()
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_RequestCount = m_Entries.size();
+ m_BlockFiles.clear();
+
+ // Emit some metadata alongside the recording
+
+ try
+ {
+ m_EndTime = DateTime::Now();
+ TimeSpan Duration{m_EndTime.GetTicks() - m_StartTime.GetTicks()};
+
+ CbObjectWriter Cbo;
+ Cbo << "time_start" << m_StartTime << "time_end" << m_EndTime << "duration" << Duration;
+ Cbo << "segment_index" << m_SegmentIndex;
+ Cbo << "entry_count" << m_Entries.size() << "entry_size" << sizeof(RecordedRequest);
+ Cbo << "block_size" << RecordedRequestBlockSize << "standalone_threshold" << StandaloneFileSizeThreshold;
+
+ Cbo.BeginObject("system_info");
+ Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName();
+ Cbo.EndObject();
+ CbObject Metadata = Cbo.Save();
+
+ WriteFile(m_BasePath / "rpc_segment_info.zcb", Metadata.GetBuffer().AsIoBuffer());
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_WARN("caught exception while writing segment metadata for RPC recording: {}", Ex.what());
+ }
+
+ IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest));
+ BasicFile IndexFile;
+ IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate);
+ std::error_code Ec;
+ IndexFile.WriteAll(IndexBuffer, Ec);
+ IndexFile.Close();
+ m_Entries.clear();
+}
+
+uint64_t
+RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer)
+{
+ const uint64_t RequestBufferSize = RequestBuffer.GetSize();
+
+ RwLock::ExclusiveLockScope Lock(m_Lock);
+ uint64_t RequestIndex = m_Entries.size();
+ RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0u,
+ .Length = uint32_t(RequestBufferSize & 0xffffFFFFu),
+ .ContentType = RequestInfo.ContentType,
+ .AcceptType = RequestInfo.AcceptType,
+ .Padding = 0,
+ .Padding2 = 0,
+ .SessionId = RequestInfo.SessionId});
+
+ if (Entry.Length < StandaloneFileSizeThreshold)
+ {
+ const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize);
+
+ if (BlockIndex == m_BlockFiles.size())
+ {
+ std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>());
+ NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate);
+ m_ChunkOffset = BlockIndex * RecordedRequestBlockSize;
+ ++m_FileCount;
+ }
+
+ ZEN_ASSERT(BlockIndex < m_BlockFiles.size());
+ BasicFile* BlockFile = m_BlockFiles[BlockIndex].get();
+ ZEN_ASSERT(BlockFile != nullptr);
+
+ Entry.Offset = uint32_t(m_ChunkOffset & 0xffffFFFF);
+ m_ChunkOffset = RoundUp(m_ChunkOffset + Entry.Length, 1u << 4u);
+ Lock.ReleaseNow();
+
+ std::error_code Ec;
+ BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), Entry.Offset - BlockIndex * RecordedRequestBlockSize, Ec);
+ if (Ec)
+ {
+ Entry.Length = 0;
+ return ~0ull;
+ }
+
+ m_RequestsByteCount.fetch_add(RequestBuffer.GetSize());
+
+ return RequestIndex;
+ }
+ Lock.ReleaseNow();
+
+ // Write request data to standalone file
+
+ try
+ {
+ BasicFile RequestFile;
+ RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate);
+
+ if (RequestBufferSize > std::numeric_limits<uint32_t>::max())
+ {
+ // The exact value of the entry is not important, we will use
+ // the size of the standalone file regardless when performing
+ // the read
+ Entry.Length = std::numeric_limits<uint32_t>::max();
+ }
+
+ ++m_FileCount;
+
+ std::error_code Ec;
+ RequestFile.WriteAll(RequestBuffer, Ec);
+
+ if (Ec)
+ {
+ Entry.Length = 0;
+ return ~0ull;
+ }
+
+ m_RequestsByteCount.fetch_add(RequestBuffer.GetSize());
+
+ return RequestIndex;
+ }
+ catch (std::exception&)
+ {
+ Entry.Length = 0;
+ return ~0ull;
+ }
+}
+
+uint64_t
+RecordedRequestsSegmentReader::BeginRead(const std::filesystem::path& BasePath, bool InMemory)
+{
+ m_BasePath = BasePath;
+ BasicFile IndexFile;
+ IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kRead);
+ m_Entries.resize(IndexFile.FileSize() / sizeof(RecordedRequest));
+ IndexFile.Read(m_Entries.data(), IndexFile.FileSize(), 0);
+ uint64_t MaxChunkPosition = 0;
+ for (const RecordedRequest& R : m_Entries)
+ {
+ if (R.Offset != ~0u)
+ {
+ MaxChunkPosition = Max(MaxChunkPosition, R.Offset + R.Length);
+ }
+ }
+ uint32_t BlockCount = gsl::narrow<uint32_t>(MaxChunkPosition / RecordedRequestBlockSize) + 1;
+ m_BlockFiles.resize(BlockCount);
+ for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; ++BlockIndex)
+ {
+ if (InMemory)
+ {
+ BasicFile Chunk;
+ Chunk.Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kRead);
+ m_BlockFiles[BlockIndex] = Chunk.ReadAll();
+ continue;
+ }
+ m_BlockFiles[BlockIndex] = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("chunks{}.bin", BlockIndex));
+ }
+ return m_Entries.size();
+}
+void
+RecordedRequestsSegmentReader::EndRead()
+{
+ m_BlockFiles.clear();
+}
+
+RecordedRequestInfo
+RecordedRequestsSegmentReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const
+{
+ if (RequestIndex >= m_Entries.size())
+ {
+ return RecordedRequestInfo::NullRequest;
+ }
+ const RecordedRequest& Entry = m_Entries[RequestIndex];
+ if (Entry.Length == 0)
+ {
+ return RecordedRequestInfo::NullRequest;
+ }
+
+ RecordedRequestInfo RequestInfo = {.ContentType = Entry.ContentType, .AcceptType = Entry.AcceptType, .SessionId = Entry.SessionId};
+
+ if (Entry.Offset != ~0u)
+ {
+ // Inline in block file
+ uint32_t BlockIndex = gsl::narrow<uint32_t>((Entry.Offset + Entry.Length) / RecordedRequestBlockSize);
+ uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize);
+ OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length);
+
+ return RequestInfo;
+ }
+
+ // Standalone file
+ OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex));
+
+ return RequestInfo;
+}
+
+void
+RecordedRequestsWriter::BeginWrite(const std::filesystem::path& BasePath)
+{
+ m_BasePath = BasePath;
+ EnsureCurrentSegment();
+}
+
+RecordedRequestsSegmentWriter&
+RecordedRequestsWriter::EnsureCurrentSegment()
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ if (m_CurrentWriter)
+ {
+ bool StartNewSegment = false;
+
+ if (m_CurrentWriter->GetRequestCount() >= SegmentRequestCount)
+ {
+ ZEN_DEBUG("starting new RPC recording segment due to request count >= {}", SegmentRequestCount);
+ StartNewSegment = true;
+ }
+ else if (m_CurrentWriter->GetFileCount() >= LooseFileThreshold)
+ {
+ ZEN_DEBUG("starting new RPC recording segment due to file count >= {}", LooseFileThreshold);
+ StartNewSegment = true;
+ }
+ else if (m_CurrentWriter->GetRequestsByteCount() >= SegmentByteThreshold)
+ {
+ ZEN_DEBUG("starting new RPC recording segment due to footprint >= {} bytes", SegmentByteThreshold);
+ StartNewSegment = true;
+ }
+
+ if (StartNewSegment)
+ {
+ CommitCurrentSegment(_);
+ }
+ }
+
+ if (!m_CurrentWriter)
+ {
+ const uint64_t SegmentIndex = m_FinishedSegments.size();
+ m_CurrentWriter = std::make_unique<RecordedRequestsSegmentWriter>();
+
+ m_CurrentWriter->BeginWrite(m_BasePath / fmt::format("segment_{:06}", SegmentIndex), SegmentIndex, m_NextSegmentBaseIndex);
+ }
+
+ return *m_CurrentWriter;
+}
+
+void
+RecordedRequestsWriter::CommitCurrentSegment(RwLock::ExclusiveLockScope&)
+{
+ if (m_CurrentWriter)
+ {
+ RecordedRequestsSegmentWriter& Writer = *m_CurrentWriter;
+ m_FinishedSegments.push_back(std::move(m_CurrentWriter));
+ m_SegmentBaseIndex.push_back(m_NextSegmentBaseIndex);
+ m_NextSegmentBaseIndex += Writer.GetRequestCount();
+
+ Writer.EndWrite();
+ }
+}
+
+void
+RecordedRequestsWriter::EndWrite()
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ CommitCurrentSegment(_);
+
+ // Emit some metadata alongside the recording
+
+ try
+ {
+ DateTime EndTime = DateTime::Now();
+ TimeSpan Duration{EndTime.GetTicks() - m_StartTime.GetTicks()};
+
+ CbObjectWriter Cbo;
+ Cbo << "time_start" << m_StartTime << "time_end" << EndTime << "duration" << Duration << "format_version" << 2;
+
+ Cbo.BeginObject("system_info");
+ Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName();
+ Cbo << "segment_count" << m_FinishedSegments.size();
+ Cbo << "block_size" << RecordedRequestBlockSize;
+ Cbo << "standalone_threshold" << StandaloneFileSizeThreshold;
+ Cbo << "segment_request_count" << SegmentRequestCount;
+ Cbo << "segment_file_threshold" << LooseFileThreshold;
+ Cbo << "segment_byte_threshold" << SegmentByteThreshold;
+
+ Describe(GetSystemMetrics(), Cbo);
+ Cbo.EndObject();
+
+ Cbo.BeginArray("segments");
+
+ for (auto& Segment : m_FinishedSegments)
+ {
+ Cbo.BeginObject();
+ Cbo << "segment" << Segment->GetSegmentIndex();
+ Cbo << "base_index" << Segment->GetBaseRequestIndex();
+ Cbo << "request_count" << Segment->GetRequestCount();
+ Cbo << "request_bytes" << Segment->GetRequestsByteCount();
+ Cbo << "start_time" << Segment->GetStartTime();
+ Cbo << "end_time" << Segment->GetEndTime();
+ Cbo.EndObject();
+ }
+
+ Cbo.EndArray();
+
+ CbObject Metadata = Cbo.Save();
+
+ WriteFile(m_BasePath / "rpc_recording_info.zcb", Metadata.GetBuffer().AsIoBuffer());
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_WARN("caught exception while writing metadata for RPC recording: {}", Ex.what());
+ }
+}
+
+uint64_t
+RecordedRequestsWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer)
+{
+ RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment();
+
+ const uint64_t SegmentLocalIndex = Writer.WriteRequest(RequestInfo, RequestBuffer);
+
+ return Writer.GetBaseRequestIndex() + SegmentLocalIndex;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+uint64_t
+RecordedRequestsReader::BeginRead(const std::filesystem::path& BasePath, bool InMemory)
+{
+ m_InMemory = InMemory;
+ m_BasePath = BasePath;
+
+ BasicFile InfoFile;
+ InfoFile.Open(m_BasePath / "rpc_recording_info.zcb", BasicFile::Mode::kRead);
+ CbObject CbInfo = LoadCompactBinaryObject(InfoFile.ReadAll());
+
+ uint64_t TotalRequestCount = 0;
+ uint64_t MaxSegmentIndex = 0;
+
+ for (auto SegmentElement : CbInfo["segments"])
+ {
+ CbObjectView Segment = SegmentElement.AsObjectView();
+
+ const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment"sv].AsUInt64(),
+ .BaseRequestIndex = Segment["base_index"sv].AsUInt64(),
+ .RequestCount = Segment["request_count"sv].AsUInt64(),
+ .RequestBytes = Segment["request_bytes"sv].AsUInt64(),
+ .StartTime = Segment["start_time"sv].AsDateTime(),
+ .EndTime = Segment["end_time"sv].AsDateTime()});
+
+ TotalRequestCount += Info.RequestCount;
+ MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex);
+ }
+
+ m_SegmentReaders.resize(MaxSegmentIndex + 1);
+
+ return TotalRequestCount;
+}
+
+void
+RecordedRequestsReader::EndRead()
+{
+ RwLock::ExclusiveLockScope _(m_SegmentLock);
+ m_KnownSegments.clear();
+ m_SegmentReaders.clear();
+ m_BasePath.clear();
+}
+
+RecordedRequestInfo
+RecordedRequestsReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const
+{
+ auto EnsureSegment = [&](uint64_t SegmentIndex) -> const RecordedRequestsSegmentReader& {
+ {
+ RwLock::SharedLockScope _(m_SegmentLock);
+
+ if (auto SegmentReaderPtr = m_SegmentReaders[SegmentIndex].get())
+ {
+ return *SegmentReaderPtr;
+ }
+ }
+
+ RwLock::ExclusiveLockScope _(m_SegmentLock);
+
+ auto& SegmentReaderPtr = m_SegmentReaders[SegmentIndex];
+
+ if (!SegmentReaderPtr)
+ {
+ RecordedRequestsSegmentReader* NewSegment = new RecordedRequestsSegmentReader;
+ NewSegment->BeginRead(m_BasePath / fmt::format("segment_{:06}", SegmentIndex), m_InMemory);
+ SegmentReaderPtr.reset(NewSegment);
+ }
+
+ return *(SegmentReaderPtr.get());
+ };
+
+ for (auto& Segment : m_KnownSegments)
+ {
+ if (Segment.RequestCount > RequestIndex)
+ {
+ return EnsureSegment(Segment.SegmentIndex).ReadRequest(RequestIndex, OutBuffer);
+ }
+ else
+ {
+ RequestIndex -= Segment.RequestCount;
+ }
+ }
+
+ return RecordedRequestInfo::NullRequest;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+class DiskRequestRecorder : public IRpcRequestRecorder
+{
+public:
+ DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); }
+ virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); }
+
+private:
+ virtual uint64_t RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override
+ {
+ return m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer);
+ }
+ virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {}
+ virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {}
+
+ RecordedRequestsWriter m_RecordedRequests;
+};
+
+class DiskRequestReplayer : public IRpcRequestReplayer
+{
+public:
+ DiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory)
+ {
+ m_RequestCount = m_RequestBuffer.BeginRead(BasePath, InMemory);
+ }
+ virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); }
+
+ static bool IsCompatible(const std::filesystem::path& BasePath) { return std::filesystem::exists(BasePath / "rpc_recording_info.zcb"); }
+
+private:
+ virtual uint64_t GetRequestCount() const override { return m_RequestCount; }
+
+ virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override
+ {
+ return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer);
+ }
+ virtual ZenContentType GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; }
+
+ std::uint64_t m_RequestCount;
+ RecordedRequestsReader m_RequestBuffer;
+};
+
+} // namespace zen::cache::v2
+
+//////////////////////////////////////////////////////////////////////////
+
+namespace zen::cache {
+
std::unique_ptr<cache::IRpcRequestRecorder>
MakeDiskRequestRecorder(const std::filesystem::path& BasePath)
{
- return std::make_unique<DiskRequestRecorder>(BasePath);
+ return std::make_unique<v2::DiskRequestRecorder>(BasePath);
}
std::unique_ptr<cache::IRpcRequestReplayer>
MakeDiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory)
{
- return std::make_unique<DiskRequestReplayer>(BasePath, InMemory);
+ if (v2::DiskRequestReplayer::IsCompatible(BasePath))
+ {
+ return std::make_unique<v2::DiskRequestReplayer>(BasePath, InMemory);
+ }
+ else
+ {
+ return std::make_unique<v1::DiskRequestReplayer>(BasePath, InMemory);
+ }
}
} // namespace zen::cache