aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/cache
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenutil/cache')
-rw-r--r--src/zenutil/cache/rpcrecording.cpp701
1 files changed, 680 insertions, 21 deletions
diff --git a/src/zenutil/cache/rpcrecording.cpp b/src/zenutil/cache/rpcrecording.cpp
index 4958a27f6..054ac0e56 100644
--- a/src/zenutil/cache/rpcrecording.cpp
+++ b/src/zenutil/cache/rpcrecording.cpp
@@ -1,5 +1,9 @@
// Copyright Epic Games, Inc. All Rights Reserved.
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/filesystem.h>
+#include <zencore/logging.h>
+#include <zencore/system.h>
#include <zenutil/basicfile.h>
#include <zenutil/cache/rpcrecording.h>
@@ -9,6 +13,15 @@ ZEN_THIRD_PARTY_INCLUDES_START
ZEN_THIRD_PARTY_INCLUDES_END
namespace zen::cache {
+
+const RecordedRequestInfo RecordedRequestInfo::NullRequest = {.ContentType = ZenContentType::kUnknownContentType,
+ .AcceptType = ZenContentType::kUnknownContentType,
+ .SessionId = Oid::Zero};
+
+}
+
+namespace zen::cache::v1 {
+
struct RecordedRequest
{
uint64_t Offset;
@@ -17,7 +30,8 @@ struct RecordedRequest
ZenContentType AcceptType;
};
-const uint64_t RecordedRequestBlockSize = 1ull << 31u;
+const uint64_t RecordedRequestBlockSize = 1ull << 31u;
+const uint64_t StandaloneFileSizeThreshold = 1 * 1024 * 1024ull;
struct RecordedRequestsWriter
{
@@ -32,6 +46,31 @@ struct RecordedRequestsWriter
RwLock::ExclusiveLockScope _(m_Lock);
m_BlockFiles.clear();
+ try
+ {
+ // Emit some metadata alongside the recording
+
+ DateTime EndTime = DateTime::Now();
+ TimeSpan Duration{EndTime.GetTicks() - m_StartTime.GetTicks()};
+
+ CbObjectWriter Cbo;
+ Cbo << "time_start" << m_StartTime << "time_end" << EndTime << "duration" << Duration;
+ Cbo << "entry_count" << m_Entries.size() << "entry_size" << sizeof(RecordedRequest);
+ Cbo << "block_size" << RecordedRequestBlockSize << "standalone_threshold" << StandaloneFileSizeThreshold;
+
+ Cbo.BeginObject("system_info");
+ Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName();
+ Describe(GetSystemMetrics(), Cbo);
+ Cbo.EndObject();
+ CbObject Metadata = Cbo.Save();
+
+ WriteFile(m_BasePath / "rpc_recording_metadata.zcb", Metadata.GetBuffer().AsIoBuffer());
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_WARN("caught exception while generating metadata for RPC recording: {}", Ex.what());
+ }
+
IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest));
BasicFile IndexFile;
IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate);
@@ -41,15 +80,18 @@ struct RecordedRequestsWriter
m_Entries.clear();
}
- uint64_t WriteRequest(ZenContentType ContentType, ZenContentType AcceptType, const IoBuffer& RequestBuffer)
+ uint64_t WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer)
{
RwLock::ExclusiveLockScope Lock(m_Lock);
uint64_t RequestIndex = m_Entries.size();
- RecordedRequest& Entry = m_Entries.emplace_back(
- RecordedRequest{.Offset = ~0ull, .Length = RequestBuffer.Size(), .ContentType = ContentType, .AcceptType = AcceptType});
- if (Entry.Length < 1 * 1024 * 1024)
+ RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0ull,
+ .Length = RequestBuffer.Size(),
+ .ContentType = RequestInfo.ContentType,
+ .AcceptType = RequestInfo.AcceptType});
+
+ if (Entry.Length < StandaloneFileSizeThreshold)
{
- uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize);
+ const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize);
if (BlockIndex == m_BlockFiles.size())
{
std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>());
@@ -92,6 +134,7 @@ struct RecordedRequestsWriter
std::vector<RecordedRequest> m_Entries;
std::vector<std::unique_ptr<BasicFile>> m_BlockFiles;
uint64_t m_ChunkOffset;
+ zen::DateTime m_StartTime = DateTime::Now();
};
struct RecordedRequestsReader
@@ -128,26 +171,31 @@ struct RecordedRequestsReader
}
void EndRead() { m_BlockFiles.clear(); }
- std::pair<ZenContentType, ZenContentType> ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const
+ RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const
{
if (RequestIndex >= m_Entries.size())
{
- return {ZenContentType::kUnknownContentType, ZenContentType::kUnknownContentType};
+ return RecordedRequestInfo::NullRequest;
}
+
const RecordedRequest& Entry = m_Entries[RequestIndex];
if (Entry.Length == 0)
{
- return {ZenContentType::kUnknownContentType, ZenContentType::kUnknownContentType};
+ return RecordedRequestInfo::NullRequest;
}
+
if (Entry.Offset != ~0ull)
{
uint32_t BlockIndex = gsl::narrow<uint32_t>((Entry.Offset + Entry.Length) / RecordedRequestBlockSize);
uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize);
OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length);
- return {Entry.ContentType, Entry.AcceptType};
}
- OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex));
- return {Entry.ContentType, Entry.AcceptType};
+ else
+ {
+ OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex));
+ }
+
+ return {.ContentType = Entry.ContentType, .AcceptType = Entry.AcceptType, .SessionId = Oid::Zero};
}
std::filesystem::path m_BasePath;
@@ -162,14 +210,13 @@ public:
virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); }
private:
- virtual uint64_t RecordRequest(const ZenContentType ContentType,
- const ZenContentType AcceptType,
- const IoBuffer& RequestBuffer) override
+ virtual uint64_t RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override
{
- return m_RecordedRequests.WriteRequest(ContentType, AcceptType, RequestBuffer);
+ return m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer);
}
- virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {}
- virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {}
+ virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {}
+ virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {}
+
RecordedRequestsWriter m_RecordedRequests;
};
@@ -185,7 +232,7 @@ public:
private:
virtual uint64_t GetRequestCount() const override { return m_RequestCount; }
- virtual std::pair<ZenContentType, ZenContentType> GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override
+ virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override
{
return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer);
}
@@ -195,16 +242,628 @@ private:
RecordedRequestsReader m_RequestBuffer;
};
+} // namespace zen::cache::v1
+
+//////////////////////////////////////////////////////////////////////////
+
+namespace zen::cache::v2 {
+
+using namespace std::literals;
+
+struct RecordedRequest
+{
+ uint32_t Offset; // 4 bytes
+ uint32_t Length; // 4 bytes
+ ZenContentType ContentType; // 1 byte
+ ZenContentType AcceptType; // 1 byte
+ uint8_t Padding; // 1 byte
+ uint8_t Padding2; // 1 byte
+ Oid SessionId; // 12 bytes
+};
+
+static_assert(sizeof(RecordedRequest) == 24);
+
+const uint64_t RecordedRequestBlockSize = 1 * 1024 * 1024 * 1024; // 1GiB
+const uint64_t StandaloneFileSizeThreshold = 1 * 1024 * 1024ull; // 1MiB
+const uint64_t SegmentRequestCount = 10 * 1000 * 1000;
+const uint64_t LooseFileThreshold = 5000; // Somewhat arbitrary, but we try to keep the
+ // number of files in a directory below this level
+ // for performance
+const uint64_t SegmentByteThreshold = 16ull * 1024 * 1024 * 1024;
+const TimeSpan SegmentTimeThreshold{/* hours */ 1, /* minutes */ 0, /* seconds */ 0};
+
+struct RecordedRequestsSegmentWriter
+{
+ RecordedRequestsSegmentWriter() = default;
+ ~RecordedRequestsSegmentWriter() = default;
+
+ RecordedRequestsSegmentWriter(const RecordedRequestsSegmentWriter&) = delete;
+ RecordedRequestsSegmentWriter& operator=(const RecordedRequestsSegmentWriter&) = delete;
+
+ void BeginWrite(const std::filesystem::path& BasePath, uint64_t SegmentIndex, uint64_t RequestBaseIndex);
+ uint64_t WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer);
+ void EndWrite();
+
+ inline uint64_t GetRequestCount() const
+ {
+ if (m_RequestCount)
+ {
+ return m_RequestCount;
+ }
+
+ return m_Entries.size();
+ }
+ inline uint64_t GetBaseRequestIndex() const { return m_RequestBaseIndex; }
+ inline uint64_t GetSegmentIndex() const { return m_SegmentIndex; }
+ inline uint64_t GetFileCount() const { return m_FileCount; }
+ inline uint64_t GetRequestsByteCount() const { return m_RequestsByteCount; }
+ inline DateTime GetStartTime() const { return m_StartTime; }
+ inline DateTime GetEndTime() const { return m_EndTime; }
+
+private:
+ std::filesystem::path m_BasePath;
+ uint64_t m_SegmentIndex = 0;
+ uint64_t m_RequestBaseIndex = 0;
+ uint64_t m_RequestCount = 0;
+ std::atomic_uint64_t m_FileCount{};
+ std::atomic_uint64_t m_RequestsByteCount{};
+ mutable RwLock m_Lock;
+ std::vector<RecordedRequest> m_Entries;
+ std::vector<std::unique_ptr<BasicFile>> m_BlockFiles;
+ uint64_t m_ChunkOffset;
+ DateTime m_StartTime = DateTime::Now();
+ DateTime m_EndTime = DateTime::Now();
+};
+
+struct RecordedRequestsWriter
+{
+ void BeginWrite(const std::filesystem::path& BasePath);
+ RecordedRequestsSegmentWriter& EnsureCurrentSegment();
+ void CommitCurrentSegment(RwLock::ExclusiveLockScope&);
+ void EndWrite();
+ uint64_t WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer);
+
+private:
+ std::filesystem::path m_BasePath;
+ zen::DateTime m_StartTime = DateTime::Now();
+
+ mutable RwLock m_Lock;
+ std::unique_ptr<RecordedRequestsSegmentWriter> m_CurrentWriter;
+ std::vector<std::unique_ptr<RecordedRequestsSegmentWriter>> m_FinishedSegments;
+ std::vector<uint64_t> m_SegmentBaseIndex;
+ uint64_t m_NextSegmentBaseIndex = 0;
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+struct RecordedRequestsSegmentReader
+{
+ RecordedRequestsSegmentReader() = default;
+
+ RecordedRequestsSegmentReader(const RecordedRequestsSegmentReader&) = delete;
+ RecordedRequestsSegmentReader& operator=(const RecordedRequestsSegmentReader&) = delete;
+
+ uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory);
+ void EndRead();
+ RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const;
+
+private:
+ std::filesystem::path m_BasePath;
+ std::vector<RecordedRequest> m_Entries;
+ std::vector<IoBuffer> m_BlockFiles;
+};
+
+struct RecordedRequestsReader
+{
+ uint64_t BeginRead(const std::filesystem::path& BasePath, bool InMemory);
+ void EndRead();
+ RecordedRequestInfo ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const;
+
+private:
+ struct SegmentInfo
+ {
+ uint64_t SegmentIndex;
+ uint64_t BaseRequestIndex;
+ uint64_t RequestCount;
+ uint64_t RequestBytes;
+ DateTime StartTime{0};
+ DateTime EndTime{0};
+ };
+
+ bool m_InMemory = false;
+ std::filesystem::path m_BasePath;
+ std::vector<SegmentInfo> m_KnownSegments;
+
+ mutable RwLock m_SegmentLock;
+ mutable std::vector<std::unique_ptr<RecordedRequestsSegmentReader>> m_SegmentReaders;
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+void
+RecordedRequestsSegmentWriter::BeginWrite(const std::filesystem::path& BasePath, uint64_t SegmentIndex, uint64_t RequestBaseIndex)
+{
+ m_BasePath = BasePath;
+ m_SegmentIndex = SegmentIndex;
+ m_RequestBaseIndex = RequestBaseIndex;
+ std::filesystem::create_directories(m_BasePath);
+}
+
+void
+RecordedRequestsSegmentWriter::EndWrite()
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_RequestCount = m_Entries.size();
+ m_BlockFiles.clear();
+
+ // Emit some metadata alongside the recording
+
+ try
+ {
+ m_EndTime = DateTime::Now();
+ TimeSpan Duration{m_EndTime.GetTicks() - m_StartTime.GetTicks()};
+
+ CbObjectWriter Cbo;
+ Cbo << "time_start" << m_StartTime << "time_end" << m_EndTime << "duration" << Duration;
+ Cbo << "segment_index" << m_SegmentIndex;
+ Cbo << "entry_count" << m_Entries.size() << "entry_size" << sizeof(RecordedRequest);
+ Cbo << "block_size" << RecordedRequestBlockSize << "standalone_threshold" << StandaloneFileSizeThreshold;
+
+ Cbo.BeginObject("system_info");
+ Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName();
+ Cbo.EndObject();
+ CbObject Metadata = Cbo.Save();
+
+ WriteFile(m_BasePath / "rpc_segment_info.zcb", Metadata.GetBuffer().AsIoBuffer());
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_WARN("caught exception while writing segment metadata for RPC recording: {}", Ex.what());
+ }
+
+ IoBuffer IndexBuffer(IoBuffer::Wrap, m_Entries.data(), m_Entries.size() * sizeof(RecordedRequest));
+ BasicFile IndexFile;
+ IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kTruncate);
+ std::error_code Ec;
+ IndexFile.WriteAll(IndexBuffer, Ec);
+ IndexFile.Close();
+ m_Entries.clear();
+}
+
+uint64_t
+RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer)
+{
+ const uint64_t RequestBufferSize = RequestBuffer.GetSize();
+
+ RwLock::ExclusiveLockScope Lock(m_Lock);
+ uint64_t RequestIndex = m_Entries.size();
+ RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0u,
+ .Length = uint32_t(RequestBufferSize & 0xffffFFFFu),
+ .ContentType = RequestInfo.ContentType,
+ .AcceptType = RequestInfo.AcceptType,
+ .Padding = 0,
+ .Padding2 = 0,
+ .SessionId = RequestInfo.SessionId});
+
+ if (Entry.Length < StandaloneFileSizeThreshold)
+ {
+ const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize);
+
+ if (BlockIndex == m_BlockFiles.size())
+ {
+ std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>());
+ NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate);
+ m_ChunkOffset = BlockIndex * RecordedRequestBlockSize;
+ ++m_FileCount;
+ }
+
+ ZEN_ASSERT(BlockIndex < m_BlockFiles.size());
+ BasicFile* BlockFile = m_BlockFiles[BlockIndex].get();
+ ZEN_ASSERT(BlockFile != nullptr);
+
+ Entry.Offset = uint32_t(m_ChunkOffset & 0xffffFFFF);
+ m_ChunkOffset = RoundUp(m_ChunkOffset + Entry.Length, 1u << 4u);
+ Lock.ReleaseNow();
+
+ std::error_code Ec;
+ BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), Entry.Offset - BlockIndex * RecordedRequestBlockSize, Ec);
+ if (Ec)
+ {
+ Entry.Length = 0;
+ return ~0ull;
+ }
+
+ m_RequestsByteCount.fetch_add(RequestBuffer.GetSize());
+
+ return RequestIndex;
+ }
+ Lock.ReleaseNow();
+
+ // Write request data to standalone file
+
+ try
+ {
+ BasicFile RequestFile;
+ RequestFile.Open(m_BasePath / fmt::format("request{}.bin", RequestIndex), BasicFile::Mode::kTruncate);
+
+ if (RequestBufferSize > std::numeric_limits<uint32_t>::max())
+ {
+ // The exact value of the entry is not important, we will use
+ // the size of the standalone file regardless when performing
+ // the read
+ Entry.Length = std::numeric_limits<uint32_t>::max();
+ }
+
+ ++m_FileCount;
+
+ std::error_code Ec;
+ RequestFile.WriteAll(RequestBuffer, Ec);
+
+ if (Ec)
+ {
+ Entry.Length = 0;
+ return ~0ull;
+ }
+
+ m_RequestsByteCount.fetch_add(RequestBuffer.GetSize());
+
+ return RequestIndex;
+ }
+ catch (std::exception&)
+ {
+ Entry.Length = 0;
+ return ~0ull;
+ }
+}
+
+uint64_t
+RecordedRequestsSegmentReader::BeginRead(const std::filesystem::path& BasePath, bool InMemory)
+{
+ m_BasePath = BasePath;
+ BasicFile IndexFile;
+ IndexFile.Open(m_BasePath / "index.bin", BasicFile::Mode::kRead);
+ m_Entries.resize(IndexFile.FileSize() / sizeof(RecordedRequest));
+ IndexFile.Read(m_Entries.data(), IndexFile.FileSize(), 0);
+ uint64_t MaxChunkPosition = 0;
+ for (const RecordedRequest& R : m_Entries)
+ {
+ if (R.Offset != ~0u)
+ {
+ MaxChunkPosition = Max(MaxChunkPosition, R.Offset + R.Length);
+ }
+ }
+ uint32_t BlockCount = gsl::narrow<uint32_t>(MaxChunkPosition / RecordedRequestBlockSize) + 1;
+ m_BlockFiles.resize(BlockCount);
+ for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; ++BlockIndex)
+ {
+ if (InMemory)
+ {
+ BasicFile Chunk;
+ Chunk.Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kRead);
+ m_BlockFiles[BlockIndex] = Chunk.ReadAll();
+ continue;
+ }
+ m_BlockFiles[BlockIndex] = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("chunks{}.bin", BlockIndex));
+ }
+ return m_Entries.size();
+}
+void
+RecordedRequestsSegmentReader::EndRead()
+{
+ m_BlockFiles.clear();
+}
+
+RecordedRequestInfo
+RecordedRequestsSegmentReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const
+{
+ if (RequestIndex >= m_Entries.size())
+ {
+ return RecordedRequestInfo::NullRequest;
+ }
+ const RecordedRequest& Entry = m_Entries[RequestIndex];
+ if (Entry.Length == 0)
+ {
+ return RecordedRequestInfo::NullRequest;
+ }
+
+ RecordedRequestInfo RequestInfo = {.ContentType = Entry.ContentType, .AcceptType = Entry.AcceptType, .SessionId = Entry.SessionId};
+
+ if (Entry.Offset != ~0u)
+ {
+ // Inline in block file
+ uint32_t BlockIndex = gsl::narrow<uint32_t>((Entry.Offset + Entry.Length) / RecordedRequestBlockSize);
+ uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize);
+ OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length);
+
+ return RequestInfo;
+ }
+
+ // Standalone file
+ OutBuffer = IoBufferBuilder::MakeFromFile(m_BasePath / fmt::format("request{}.bin", RequestIndex));
+
+ return RequestInfo;
+}
+
+void
+RecordedRequestsWriter::BeginWrite(const std::filesystem::path& BasePath)
+{
+ m_BasePath = BasePath;
+ EnsureCurrentSegment();
+}
+
+RecordedRequestsSegmentWriter&
+RecordedRequestsWriter::EnsureCurrentSegment()
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ if (m_CurrentWriter)
+ {
+ bool StartNewSegment = false;
+
+ if (m_CurrentWriter->GetRequestCount() >= SegmentRequestCount)
+ {
+ ZEN_DEBUG("starting new RPC recording segment due to request count >= {}", SegmentRequestCount);
+ StartNewSegment = true;
+ }
+ else if (m_CurrentWriter->GetFileCount() >= LooseFileThreshold)
+ {
+ ZEN_DEBUG("starting new RPC recording segment due to file count >= {}", LooseFileThreshold);
+ StartNewSegment = true;
+ }
+ else if (m_CurrentWriter->GetRequestsByteCount() >= SegmentByteThreshold)
+ {
+ ZEN_DEBUG("starting new RPC recording segment due to footprint >= {} bytes", SegmentByteThreshold);
+ StartNewSegment = true;
+ }
+
+ if (StartNewSegment)
+ {
+ CommitCurrentSegment(_);
+ }
+ }
+
+ if (!m_CurrentWriter)
+ {
+ const uint64_t SegmentIndex = m_FinishedSegments.size();
+ m_CurrentWriter = std::make_unique<RecordedRequestsSegmentWriter>();
+
+ m_CurrentWriter->BeginWrite(m_BasePath / fmt::format("segment_{:06}", SegmentIndex), SegmentIndex, m_NextSegmentBaseIndex);
+ }
+
+ return *m_CurrentWriter;
+}
+
+void
+RecordedRequestsWriter::CommitCurrentSegment(RwLock::ExclusiveLockScope&)
+{
+ if (m_CurrentWriter)
+ {
+ RecordedRequestsSegmentWriter& Writer = *m_CurrentWriter;
+ m_FinishedSegments.push_back(std::move(m_CurrentWriter));
+ m_SegmentBaseIndex.push_back(m_NextSegmentBaseIndex);
+ m_NextSegmentBaseIndex += Writer.GetRequestCount();
+
+ Writer.EndWrite();
+ }
+}
+
+void
+RecordedRequestsWriter::EndWrite()
+{
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ CommitCurrentSegment(_);
+
+ // Emit some metadata alongside the recording
+
+ try
+ {
+ DateTime EndTime = DateTime::Now();
+ TimeSpan Duration{EndTime.GetTicks() - m_StartTime.GetTicks()};
+
+ CbObjectWriter Cbo;
+ Cbo << "time_start" << m_StartTime << "time_end" << EndTime << "duration" << Duration << "format_version" << 2;
+
+ Cbo.BeginObject("system_info");
+ Cbo << "host" << GetMachineName() << "os" << GetOperatingSystemName() << "cpu" << GetCpuName();
+ Cbo << "segment_count" << m_FinishedSegments.size();
+ Cbo << "block_size" << RecordedRequestBlockSize;
+ Cbo << "standalone_threshold" << StandaloneFileSizeThreshold;
+ Cbo << "segment_request_count" << SegmentRequestCount;
+ Cbo << "segment_file_threshold" << LooseFileThreshold;
+ Cbo << "segment_byte_threshold" << SegmentByteThreshold;
+
+ Describe(GetSystemMetrics(), Cbo);
+ Cbo.EndObject();
+
+ Cbo.BeginArray("segments");
+
+ for (auto& Segment : m_FinishedSegments)
+ {
+ Cbo.BeginObject();
+ Cbo << "segment" << Segment->GetSegmentIndex();
+ Cbo << "base_index" << Segment->GetBaseRequestIndex();
+ Cbo << "request_count" << Segment->GetRequestCount();
+ Cbo << "request_bytes" << Segment->GetRequestsByteCount();
+ Cbo << "start_time" << Segment->GetStartTime();
+ Cbo << "end_time" << Segment->GetEndTime();
+ Cbo.EndObject();
+ }
+
+ Cbo.EndArray();
+
+ CbObject Metadata = Cbo.Save();
+
+ WriteFile(m_BasePath / "rpc_recording_info.zcb", Metadata.GetBuffer().AsIoBuffer());
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_WARN("caught exception while writing metadata for RPC recording: {}", Ex.what());
+ }
+}
+
+uint64_t
+RecordedRequestsWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer)
+{
+ RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment();
+
+ const uint64_t SegmentLocalIndex = Writer.WriteRequest(RequestInfo, RequestBuffer);
+
+ return Writer.GetBaseRequestIndex() + SegmentLocalIndex;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+uint64_t
+RecordedRequestsReader::BeginRead(const std::filesystem::path& BasePath, bool InMemory)
+{
+ m_InMemory = InMemory;
+ m_BasePath = BasePath;
+
+ BasicFile InfoFile;
+ InfoFile.Open(m_BasePath / "rpc_recording_info.zcb", BasicFile::Mode::kRead);
+ CbObject CbInfo = LoadCompactBinaryObject(InfoFile.ReadAll());
+
+ uint64_t TotalRequestCount = 0;
+ uint64_t MaxSegmentIndex = 0;
+
+ for (auto SegmentElement : CbInfo["segments"])
+ {
+ CbObjectView Segment = SegmentElement.AsObjectView();
+
+ const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment"sv].AsUInt64(),
+ .BaseRequestIndex = Segment["base_index"sv].AsUInt64(),
+ .RequestCount = Segment["request_count"sv].AsUInt64(),
+ .RequestBytes = Segment["request_bytes"sv].AsUInt64(),
+ .StartTime = Segment["start_time"sv].AsDateTime(),
+ .EndTime = Segment["end_time"sv].AsDateTime()});
+
+ TotalRequestCount += Info.RequestCount;
+ MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex);
+ }
+
+ m_SegmentReaders.resize(MaxSegmentIndex + 1);
+
+ return TotalRequestCount;
+}
+
+void
+RecordedRequestsReader::EndRead()
+{
+ RwLock::ExclusiveLockScope _(m_SegmentLock);
+ m_KnownSegments.clear();
+ m_SegmentReaders.clear();
+ m_BasePath.clear();
+}
+
+RecordedRequestInfo
+RecordedRequestsReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const
+{
+ auto EnsureSegment = [&](uint64_t SegmentIndex) -> const RecordedRequestsSegmentReader& {
+ {
+ RwLock::SharedLockScope _(m_SegmentLock);
+
+ if (auto SegmentReaderPtr = m_SegmentReaders[SegmentIndex].get())
+ {
+ return *SegmentReaderPtr;
+ }
+ }
+
+ RwLock::ExclusiveLockScope _(m_SegmentLock);
+
+ auto& SegmentReaderPtr = m_SegmentReaders[SegmentIndex];
+
+ if (!SegmentReaderPtr)
+ {
+ RecordedRequestsSegmentReader* NewSegment = new RecordedRequestsSegmentReader;
+ NewSegment->BeginRead(m_BasePath / fmt::format("segment_{:06}", SegmentIndex), m_InMemory);
+ SegmentReaderPtr.reset(NewSegment);
+ }
+
+ return *(SegmentReaderPtr.get());
+ };
+
+ for (auto& Segment : m_KnownSegments)
+ {
+ if (Segment.RequestCount > RequestIndex)
+ {
+ return EnsureSegment(Segment.SegmentIndex).ReadRequest(RequestIndex, OutBuffer);
+ }
+ else
+ {
+ RequestIndex -= Segment.RequestCount;
+ }
+ }
+
+ return RecordedRequestInfo::NullRequest;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+class DiskRequestRecorder : public IRpcRequestRecorder
+{
+public:
+ DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); }
+ virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); }
+
+private:
+ virtual uint64_t RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override
+ {
+ return m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer);
+ }
+ virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {}
+ virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {}
+
+ RecordedRequestsWriter m_RecordedRequests;
+};
+
+class DiskRequestReplayer : public IRpcRequestReplayer
+{
+public:
+ DiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory)
+ {
+ m_RequestCount = m_RequestBuffer.BeginRead(BasePath, InMemory);
+ }
+ virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); }
+
+ static bool IsCompatible(const std::filesystem::path& BasePath) { return std::filesystem::exists(BasePath / "rpc_recording_info.zcb"); }
+
+private:
+ virtual uint64_t GetRequestCount() const override { return m_RequestCount; }
+
+ virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override
+ {
+ return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer);
+ }
+ virtual ZenContentType GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; }
+
+ std::uint64_t m_RequestCount;
+ RecordedRequestsReader m_RequestBuffer;
+};
+
+} // namespace zen::cache::v2
+
+//////////////////////////////////////////////////////////////////////////
+
+namespace zen::cache {
+
std::unique_ptr<cache::IRpcRequestRecorder>
MakeDiskRequestRecorder(const std::filesystem::path& BasePath)
{
- return std::make_unique<DiskRequestRecorder>(BasePath);
+ return std::make_unique<v2::DiskRequestRecorder>(BasePath);
}
std::unique_ptr<cache::IRpcRequestReplayer>
MakeDiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory)
{
- return std::make_unique<DiskRequestReplayer>(BasePath, InMemory);
+ if (v2::DiskRequestReplayer::IsCompatible(BasePath))
+ {
+ return std::make_unique<v2::DiskRequestReplayer>(BasePath, InMemory);
+ }
+ else
+ {
+ return std::make_unique<v1::DiskRequestReplayer>(BasePath, InMemory);
+ }
}
} // namespace zen::cache