aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-12-15 14:09:12 +0100
committerGitHub <[email protected]>2023-12-15 14:09:12 +0100
commit9266e40239f241b7e38fa93719004b323ddedf10 (patch)
tree01ac8a3b38aa5913edab7ba10f299a48a01d8fe0 /src
parentwindows executable signing (#566) (diff)
downloadzen-9266e40239f241b7e38fa93719004b323ddedf10.tar.xz
zen-9266e40239f241b7e38fa93719004b323ddedf10.zip
fixed v2 rpc recording issue with >4GB data per segment (#612)
* fixed v2 rpc recording issue with >4GB data per segment * implemented recovery logic to deal with partial RPC recordings * added check for invalid/null requests in RPC replay * also made sure at least one worker thread is configured * fix problem where "null" requests would cause infinite loop! * added basic RPC recorder tests
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/rpcreplay_cmd.cpp234
-rw-r--r--src/zenutil/cache/rpcrecording.cpp227
-rw-r--r--src/zenutil/include/zenutil/cache/rpcrecording.h6
-rw-r--r--src/zenutil/zenutil.cpp2
4 files changed, 323 insertions, 146 deletions
diff --git a/src/zen/cmds/rpcreplay_cmd.cpp b/src/zen/cmds/rpcreplay_cmd.cpp
index 202829aa0..53f45358e 100644
--- a/src/zen/cmds/rpcreplay_cmd.cpp
+++ b/src/zen/cmds/rpcreplay_cmd.cpp
@@ -201,6 +201,8 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
throw std::runtime_error(fmt::format("could not find recording at '{}'", m_RecordingPath));
}
+ m_ThreadCount = Max(m_ThreadCount, 1);
+
Stopwatch TotalTimer;
if (m_OnHost)
@@ -282,152 +284,156 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
uint64_t EntryIndex = EntryOffset.fetch_add(m_Stride);
while (EntryIndex < EntryCount)
{
- IoBuffer Payload;
- zen::cache::RecordedRequestInfo RequestInfo = Replayer->GetRequest(EntryIndex, /* out */ Payload);
-
- CbPackage RequestPackage;
- CbObject Request;
+ IoBuffer Payload;
+ const zen::cache::RecordedRequestInfo RequestInfo = Replayer->GetRequest(EntryIndex, /* out */ Payload);
- switch (RequestInfo.ContentType)
+ if (RequestInfo != zen::cache::RecordedRequestInfo::NullRequest)
{
- case ZenContentType::kCbPackage:
- {
- if (ParsePackageMessageWithLegacyFallback(Payload, RequestPackage))
+ CbPackage RequestPackage;
+ CbObject Request;
+
+ switch (RequestInfo.ContentType)
+ {
+ case ZenContentType::kCbPackage:
{
- Request = RequestPackage.GetObject();
+ if (ParsePackageMessageWithLegacyFallback(Payload, RequestPackage))
+ {
+ Request = RequestPackage.GetObject();
+ }
}
- }
- break;
- case ZenContentType::kCbObject:
- {
- Request = LoadCompactBinaryObject(Payload);
- }
- break;
- }
+ break;
+ case ZenContentType::kCbObject:
+ {
+ Request = LoadCompactBinaryObject(Payload);
+ }
+ break;
+ }
- RpcAcceptOptions OriginalAcceptOptions = static_cast<RpcAcceptOptions>(Request["AcceptFlags"sv].AsUInt16(0u));
- int OriginalProcessPid = Request["Pid"sv].AsInt32(0);
+ RpcAcceptOptions OriginalAcceptOptions = static_cast<RpcAcceptOptions>(Request["AcceptFlags"sv].AsUInt16(0u));
+ int OriginalProcessPid = Request["Pid"sv].AsInt32(0);
- int AdjustedPid = 0;
- RpcAcceptOptions AdjustedAcceptOptions = RpcAcceptOptions::kNone;
+ int AdjustedPid = 0;
+ RpcAcceptOptions AdjustedAcceptOptions = RpcAcceptOptions::kNone;
- if (!m_DisableLocalRefs)
- {
- if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowLocalReferences) || m_ForceAllowLocalRefs)
+ if (!m_DisableLocalRefs)
{
- AdjustedAcceptOptions |= RpcAcceptOptions::kAllowLocalReferences;
- if (!m_DisablePartialLocalRefs)
+ if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowLocalReferences) ||
+ m_ForceAllowLocalRefs)
{
- if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowPartialLocalReferences) ||
- m_ForceAllowPartialLocalRefs)
+ AdjustedAcceptOptions |= RpcAcceptOptions::kAllowLocalReferences;
+ if (!m_DisablePartialLocalRefs)
{
- AdjustedAcceptOptions |= RpcAcceptOptions::kAllowPartialLocalReferences;
+ if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowPartialLocalReferences) ||
+ m_ForceAllowPartialLocalRefs)
+ {
+ AdjustedAcceptOptions |= RpcAcceptOptions::kAllowPartialLocalReferences;
+ }
}
- }
- if (!m_DisableLocalHandleRefs)
- {
- if (OriginalProcessPid != 0 || m_ForceAllowLocalHandleRef)
+ if (!m_DisableLocalHandleRefs)
{
- AdjustedPid = GetCurrentProcessId();
+ if (OriginalProcessPid != 0 || m_ForceAllowLocalHandleRef)
+ {
+ AdjustedPid = GetCurrentProcessId();
+ }
}
}
}
- }
- if (m_ShowMethodStats)
- {
- std::string MethodName = std::string(Request["Method"sv].AsString());
- if (auto It = LocalMethodTypes.find(MethodName); It != LocalMethodTypes.end())
- {
- It->second++;
- }
- else
+ if (m_ShowMethodStats)
{
- LocalMethodTypes[MethodName] = 1;
+ std::string MethodName = std::string(Request["Method"sv].AsString());
+ if (auto It = LocalMethodTypes.find(MethodName); It != LocalMethodTypes.end())
+ {
+ It->second++;
+ }
+ else
+ {
+ LocalMethodTypes[MethodName] = 1;
+ }
}
- }
- if (OriginalAcceptOptions != AdjustedAcceptOptions || OriginalProcessPid != AdjustedPid)
- {
- CbObjectWriter RequestCopyWriter;
- for (const CbFieldView& Field : Request)
+ if (OriginalAcceptOptions != AdjustedAcceptOptions || OriginalProcessPid != AdjustedPid)
{
- if (!Field.HasName())
+ CbObjectWriter RequestCopyWriter;
+ for (const CbFieldView& Field : Request)
{
- RequestCopyWriter.AddField(Field);
- continue;
+ if (!Field.HasName())
+ {
+ RequestCopyWriter.AddField(Field);
+ continue;
+ }
+ std::string_view FieldName = Field.GetName();
+ if (FieldName == "Pid"sv)
+ {
+ continue;
+ }
+ if (FieldName == "AcceptFlags"sv)
+ {
+ continue;
+ }
+ RequestCopyWriter.AddField(FieldName, Field);
}
- std::string_view FieldName = Field.GetName();
- if (FieldName == "Pid"sv)
+ if (AdjustedPid != 0)
{
- continue;
+ RequestCopyWriter.AddInteger("Pid"sv, AdjustedPid);
}
- if (FieldName == "AcceptFlags"sv)
+ if (AdjustedAcceptOptions != RpcAcceptOptions::kNone)
{
- continue;
+ RequestCopyWriter.AddInteger("AcceptFlags"sv, static_cast<uint16_t>(AdjustedAcceptOptions));
}
- RequestCopyWriter.AddField(FieldName, Field);
- }
- if (AdjustedPid != 0)
- {
- RequestCopyWriter.AddInteger("Pid"sv, AdjustedPid);
- }
- if (AdjustedAcceptOptions != RpcAcceptOptions::kNone)
- {
- RequestCopyWriter.AddInteger("AcceptFlags"sv, static_cast<uint16_t>(AdjustedAcceptOptions));
- }
- if (RequestInfo.ContentType == ZenContentType::kCbPackage)
- {
- RequestPackage.SetObject(RequestCopyWriter.Save());
- std::vector<IoBuffer> Buffers = FormatPackageMessage(RequestPackage);
- std::vector<SharedBuffer> SharedBuffers(Buffers.begin(), Buffers.end());
- Payload = CompositeBuffer(std::move(SharedBuffers)).Flatten().AsIoBuffer();
- }
- else
- {
- RequestCopyWriter.Finalize();
- Payload = IoBuffer(RequestCopyWriter.GetSaveSize());
- RequestCopyWriter.Save(Payload.GetMutableView());
+ if (RequestInfo.ContentType == ZenContentType::kCbPackage)
+ {
+ RequestPackage.SetObject(RequestCopyWriter.Save());
+ std::vector<IoBuffer> Buffers = FormatPackageMessage(RequestPackage);
+ std::vector<SharedBuffer> SharedBuffers(Buffers.begin(), Buffers.end());
+ Payload = CompositeBuffer(std::move(SharedBuffers)).Flatten().AsIoBuffer();
+ }
+ else
+ {
+ RequestCopyWriter.Finalize();
+ Payload = IoBuffer(RequestCopyWriter.GetSaveSize());
+ RequestCopyWriter.Save(Payload.GetMutableView());
+ }
}
- }
-
- if (!m_DryRun)
- {
- StringBuilder<32> SessionIdString;
- if (RequestInfo.SessionId != Oid::Zero)
+ if (!m_DryRun)
{
- RequestInfo.SessionId.ToString(SessionIdString);
- }
- else
- {
- GetSessionId().ToString(SessionIdString);
- }
+ StringBuilder<32> SessionIdString;
- Session.SetHeader({{"Content-Type", std::string(MapContentTypeToString(RequestInfo.ContentType))},
- {"Accept", std::string(MapContentTypeToString(RequestInfo.AcceptType))},
- {"UE-Session", std::string(SessionIdString)}});
-
- uint64_t Offset = 0;
- auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) {
- size = Min<size_t>(size, Payload.GetSize() - Offset);
- IoBuffer PayloadRange = IoBuffer(Payload, Offset, size);
- MutableMemoryView Data(buffer, size);
- Data.CopyFrom(PayloadRange.GetView());
- Offset += size;
- return true;
- };
- Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
- cpr::Response Response = Session.Post();
- BytesSent.fetch_add(Payload.GetSize());
- if (Response.error || !(IsHttpSuccessCode(Response.status_code) ||
- Response.status_code == gsl::narrow<long>(HttpResponseCode::NotFound)))
- {
- ZEN_CONSOLE("{}", FormatHttpResponse(Response));
- break;
+ if (RequestInfo.SessionId != Oid::Zero)
+ {
+ RequestInfo.SessionId.ToString(SessionIdString);
+ }
+ else
+ {
+ GetSessionId().ToString(SessionIdString);
+ }
+
+ Session.SetHeader({{"Content-Type", std::string(MapContentTypeToString(RequestInfo.ContentType))},
+ {"Accept", std::string(MapContentTypeToString(RequestInfo.AcceptType))},
+ {"UE-Session", std::string(SessionIdString)}});
+
+ uint64_t Offset = 0;
+ auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) {
+ size = Min<size_t>(size, Payload.GetSize() - Offset);
+ IoBuffer PayloadRange = IoBuffer(Payload, Offset, size);
+ MutableMemoryView Data(buffer, size);
+ Data.CopyFrom(PayloadRange.GetView());
+ Offset += size;
+ return true;
+ };
+ Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
+ cpr::Response Response = Session.Post();
+ BytesSent.fetch_add(Payload.GetSize());
+ if (Response.error || !(IsHttpSuccessCode(Response.status_code) ||
+ Response.status_code == gsl::narrow<long>(HttpResponseCode::NotFound)))
+ {
+ ZEN_CONSOLE("{}", FormatHttpResponse(Response));
+ break;
+ }
+ BytesReceived.fetch_add(Response.downloaded_bytes);
}
- BytesReceived.fetch_add(Response.downloaded_bytes);
}
EntryIndex = EntryOffset.fetch_add(m_Stride);
diff --git a/src/zenutil/cache/rpcrecording.cpp b/src/zenutil/cache/rpcrecording.cpp
index 00cecb8f7..c782f0920 100644
--- a/src/zenutil/cache/rpcrecording.cpp
+++ b/src/zenutil/cache/rpcrecording.cpp
@@ -4,7 +4,10 @@
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
+#include <zencore/session.h>
#include <zencore/system.h>
+#include <zencore/testing.h>
+#include <zencore/testutils.h>
#include <zenutil/basicfile.h>
#include <zenutil/cache/rpcrecording.h>
@@ -230,7 +233,6 @@ public:
}
virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); }
-private:
virtual uint64_t GetRequestCount() const override { return m_RequestCount; }
virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override
@@ -239,6 +241,7 @@ private:
}
virtual ZenContentType GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; }
+private:
std::uint64_t m_RequestCount;
RecordedRequestsReader m_RequestBuffer;
};
@@ -257,9 +260,16 @@ struct RecordedRequest
uint32_t Length; // 4 bytes
ZenContentType ContentType; // 1 byte
ZenContentType AcceptType; // 1 byte
- uint8_t Padding; // 1 byte
+ uint8_t OffsetHigh; // 1 byte
uint8_t Padding2; // 1 byte
Oid SessionId; // 12 bytes
+
+ inline uint64_t GetOffset() const { return uint64_t(Offset) + (uint64_t(OffsetHigh) << 32); }
+ inline void SetOffset(uint64_t NewOffset)
+ {
+ Offset = gsl::narrow_cast<uint32_t>(NewOffset & 0xffff'ffff);
+ OffsetHigh = gsl::narrow_cast<uint8_t>(NewOffset >> 32);
+ }
};
static_assert(sizeof(RecordedRequest) == 24);
@@ -273,6 +283,12 @@ const uint64_t LooseFileThreshold = 5000; // Somewhat arbitrary, but we try
const uint64_t SegmentByteThreshold = 16ull * 1024 * 1024 * 1024;
const TimeSpan SegmentTimeThreshold{/* hours */ 1, /* minutes */ 0, /* seconds */ 0};
+std::string
+MakeSegmentPath(uint64_t SegmentIndex)
+{
+ return fmt::format("segment_{:06}", SegmentIndex);
+}
+
struct RecordedRequestsSegmentWriter
{
RecordedRequestsSegmentWriter() = default;
@@ -443,7 +459,7 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn
.Length = uint32_t(RequestBufferSize & 0xffffFFFFu),
.ContentType = RequestInfo.ContentType,
.AcceptType = RequestInfo.AcceptType,
- .Padding = 0,
+ .OffsetHigh = 0,
.Padding2 = 0,
.SessionId = RequestInfo.SessionId});
@@ -463,12 +479,15 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn
BasicFile* BlockFile = m_BlockFiles[BlockIndex].get();
ZEN_ASSERT(BlockFile != nullptr);
- Entry.Offset = uint32_t(m_ChunkOffset & 0xffffFFFF);
- m_ChunkOffset = RoundUp(m_ChunkOffset + Entry.Length, 1u << 4u);
+ // Note that this is the overall logical offset, not the offset within a single file
+ const uint64_t ChunkWriteOffset = m_ChunkOffset;
+ m_ChunkOffset = RoundUp(ChunkWriteOffset + Entry.Length, 1u << 4u);
Lock.ReleaseNow();
+ Entry.SetOffset(ChunkWriteOffset);
+
std::error_code Ec;
- BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), Entry.Offset - BlockIndex * RecordedRequestBlockSize, Ec);
+ BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), ChunkWriteOffset - (BlockIndex * RecordedRequestBlockSize), Ec);
if (Ec)
{
Entry.Length = 0;
@@ -531,7 +550,7 @@ RecordedRequestsSegmentReader::BeginRead(const std::filesystem::path& BasePath,
{
if (R.Offset != ~0u)
{
- MaxChunkPosition = Max(MaxChunkPosition, R.Offset + R.Length);
+ MaxChunkPosition = Max(MaxChunkPosition, R.GetOffset() + R.Length);
}
}
uint32_t BlockCount = gsl::narrow<uint32_t>(MaxChunkPosition / RecordedRequestBlockSize) + 1;
@@ -574,9 +593,10 @@ RecordedRequestsSegmentReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutB
if (Entry.Offset != ~0u)
{
// Inline in block file
- uint32_t BlockIndex = gsl::narrow<uint32_t>((Entry.Offset + Entry.Length) / RecordedRequestBlockSize);
- uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize);
- OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length);
+ const uint64_t EntryOffset = Entry.GetOffset();
+ const uint32_t BlockIndex = gsl::narrow<uint32_t>((EntryOffset + Entry.Length) / RecordedRequestBlockSize);
+ const uint64_t ChunkOffset = EntryOffset - (BlockIndex * RecordedRequestBlockSize);
+ OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length);
return RequestInfo;
}
@@ -638,7 +658,7 @@ RecordedRequestsWriter::EnsureCurrentSegment()
const uint64_t SegmentIndex = m_FinishedSegments.size();
m_CurrentWriter = std::make_unique<RecordedRequestsSegmentWriter>();
- m_CurrentWriter->BeginWrite(m_BasePath / fmt::format("segment_{:06}", SegmentIndex), SegmentIndex, m_NextSegmentBaseIndex);
+ m_CurrentWriter->BeginWrite(m_BasePath / MakeSegmentPath(SegmentIndex), SegmentIndex, m_NextSegmentBaseIndex);
}
return *m_CurrentWriter;
@@ -735,26 +755,89 @@ RecordedRequestsReader::BeginRead(const std::filesystem::path& BasePath, bool In
m_InMemory = InMemory;
m_BasePath = BasePath;
- BasicFile InfoFile;
- InfoFile.Open(m_BasePath / "rpc_recording_info.zcb", BasicFile::Mode::kRead);
- CbObject CbInfo = LoadCompactBinaryObject(InfoFile.ReadAll());
+ std::error_code Ec;
+ BasicFile InfoFile;
+ InfoFile.Open(m_BasePath / "rpc_recording_info.zcb", BasicFile::Mode::kRead, Ec);
+
+ if (!Ec)
+ {
+ try
+ {
+ CbObject CbInfo = LoadCompactBinaryObject(InfoFile.ReadAll());
+
+ uint64_t TotalRequestCount = 0;
+ uint64_t MaxSegmentIndex = 0;
+
+ for (auto SegmentElement : CbInfo["segments"])
+ {
+ CbObjectView Segment = SegmentElement.AsObjectView();
+
+ const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment"sv].AsUInt64(),
+ .BaseRequestIndex = Segment["base_index"sv].AsUInt64(),
+ .RequestCount = Segment["request_count"sv].AsUInt64(),
+ .RequestBytes = Segment["request_bytes"sv].AsUInt64(),
+ .StartTime = Segment["start_time"sv].AsDateTime(),
+ .EndTime = Segment["end_time"sv].AsDateTime()});
+
+ TotalRequestCount += Info.RequestCount;
+ MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex);
+ }
+
+ m_SegmentReaders.resize(MaxSegmentIndex + 1);
+
+ return TotalRequestCount;
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_WARN("could not read metadata file: {}", Ex.what());
+ }
+ }
+
+ ZEN_INFO("recovering segment info for '{}'", BasePath);
uint64_t TotalRequestCount = 0;
uint64_t MaxSegmentIndex = 0;
- for (auto SegmentElement : CbInfo["segments"])
+ try
{
- CbObjectView Segment = SegmentElement.AsObjectView();
+ for (int SegmentIndex = 0;; ++SegmentIndex)
+ {
+ const std::filesystem::path ZcbPath = BasePath / MakeSegmentPath(SegmentIndex) / "rpc_segment_info.zcb";
+ FileContents Fc = ReadFile(ZcbPath);
+
+ if (Fc.ErrorCode)
+ break;
+
+ if (IoBuffer SegmentInfoBuffer = Fc.Flatten())
+ {
+ CbObject Segment = LoadCompactBinaryObject(SegmentInfoBuffer);
- const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment"sv].AsUInt64(),
- .BaseRequestIndex = Segment["base_index"sv].AsUInt64(),
- .RequestCount = Segment["request_count"sv].AsUInt64(),
- .RequestBytes = Segment["request_bytes"sv].AsUInt64(),
- .StartTime = Segment["start_time"sv].AsDateTime(),
- .EndTime = Segment["end_time"sv].AsDateTime()});
+ const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment_index"sv].AsUInt64(),
+ .BaseRequestIndex = 0,
+ .RequestCount = Segment["entry_count"sv].AsUInt64(),
+ .RequestBytes = 0,
+ .StartTime = Segment["time_start"sv].AsDateTime(),
+ .EndTime = Segment["time_end"sv].AsDateTime()});
- TotalRequestCount += Info.RequestCount;
- MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex);
+ TotalRequestCount += Info.RequestCount;
+ MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex);
+ }
+ }
+ }
+ catch (std::exception&)
+ {
+ }
+
+ std::sort(begin(m_KnownSegments), end(m_KnownSegments), [](const auto& Lhs, const auto& Rhs) {
+ return Lhs.SegmentIndex < Rhs.SegmentIndex;
+ });
+
+ uint64_t SegmentRequestOffset = 0;
+
+ for (SegmentInfo& Info : m_KnownSegments)
+ {
+ Info.BaseRequestIndex = SegmentRequestOffset;
+ SegmentRequestOffset += Info.RequestCount;
}
m_SegmentReaders.resize(MaxSegmentIndex + 1);
@@ -791,7 +874,7 @@ RecordedRequestsReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer)
if (!SegmentReaderPtr)
{
RecordedRequestsSegmentReader* NewSegment = new RecordedRequestsSegmentReader;
- NewSegment->BeginRead(m_BasePath / fmt::format("segment_{:06}", SegmentIndex), m_InMemory);
+ NewSegment->BeginRead(m_BasePath / MakeSegmentPath(SegmentIndex), m_InMemory);
SegmentReaderPtr.reset(NewSegment);
}
@@ -821,7 +904,6 @@ public:
DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); }
virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); }
-private:
virtual uint64_t RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override
{
return m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer);
@@ -829,6 +911,7 @@ private:
virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {}
virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {}
+private:
RecordedRequestsWriter m_RecordedRequests;
};
@@ -837,23 +920,41 @@ class DiskRequestReplayer : public IRpcRequestReplayer
public:
DiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory)
{
- m_RequestCount = m_RequestBuffer.BeginRead(BasePath, InMemory);
+ m_RequestCount = m_RequestReader.BeginRead(BasePath, InMemory);
}
- virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); }
+ virtual ~DiskRequestReplayer() { m_RequestReader.EndRead(); }
+
+ static bool IsCompatible(const std::filesystem::path& BasePath)
+ {
+ if (std::filesystem::exists(BasePath / "rpc_recording_info.zcb"))
+ {
+ return true;
+ }
- static bool IsCompatible(const std::filesystem::path& BasePath) { return std::filesystem::exists(BasePath / "rpc_recording_info.zcb"); }
+ const std::filesystem::path SegmentZero = BasePath / MakeSegmentPath(0);
+
+ if (std::filesystem::exists(SegmentZero / "rpc_segment_info.zcb") && std::filesystem::exists(SegmentZero / "index.bin"))
+ {
+ // top-level metadata is missing, possibly because of premature exit
+ // on the recording side
+
+ return true;
+ }
+
+ return false;
+ }
-private:
virtual uint64_t GetRequestCount() const override { return m_RequestCount; }
virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override
{
- return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer);
+ return m_RequestReader.ReadRequest(RequestIndex, OutBuffer);
}
virtual ZenContentType GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; }
+private:
std::uint64_t m_RequestCount;
- RecordedRequestsReader m_RequestBuffer;
+ RecordedRequestsReader m_RequestReader;
};
} // namespace zen::cache::v2
@@ -881,4 +982,66 @@ MakeDiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory)
}
}
+#if ZEN_WITH_TESTS
+
+void
+rpcrecord_forcelink()
+{
+}
+
+TEST_SUITE_BEGIN("rpc.recording");
+
+TEST_CASE("rpc.record")
+{
+ ScopedTemporaryDirectory TempDir;
+ auto Path = TempDir.Path();
+
+ const Oid SessionId = GetSessionId();
+
+ using namespace std::literals;
+
+ {
+ cache::v2::DiskRequestRecorder Recorder{Path};
+
+ for (int i = 0; i < 1000; ++i)
+ {
+ RecordedRequestInfo RequestInfo{.ContentType = ZenContentType::kCbObject,
+ .AcceptType = ZenContentType::kCbObject,
+ .SessionId = SessionId};
+
+ CbObjectWriter RequestPayload;
+ RequestPayload << "test"sv << true;
+ RequestPayload << "index"sv << i;
+ CbObject Req = RequestPayload.Save();
+ IoBuffer RequestBuffer = Req.GetBuffer().AsIoBuffer();
+
+ const uint64_t Index = Recorder.RecordRequest(RequestInfo, RequestBuffer);
+
+ CHECK(Index == i);
+ }
+ }
+
+ {
+ cache::v2::DiskRequestReplayer Replayer{Path, false};
+
+ for (int i = 0; i < 1000; ++i)
+ {
+ IoBuffer RequestBuffer;
+ RecordedRequestInfo RequestInfo = Replayer.GetRequest(i, RequestBuffer);
+
+ CHECK(RequestInfo.AcceptType == ZenContentType::kCbObject);
+ CHECK(RequestInfo.ContentType == ZenContentType::kCbObject);
+ CHECK(RequestInfo.SessionId == SessionId);
+
+ CbObject Req = LoadCompactBinaryObject(RequestBuffer);
+ CHECK_EQ(Req["index"sv].AsInt32(), i);
+ CHECK_EQ(Req["test"sv].AsBool(), true);
+ }
+ }
+}
+
+TEST_SUITE_END();
+
+#endif
+
} // namespace zen::cache
diff --git a/src/zenutil/include/zenutil/cache/rpcrecording.h b/src/zenutil/include/zenutil/cache/rpcrecording.h
index fd5df26ad..ab9b92dd3 100644
--- a/src/zenutil/include/zenutil/cache/rpcrecording.h
+++ b/src/zenutil/include/zenutil/cache/rpcrecording.h
@@ -4,6 +4,9 @@
#include <zencore/compositebuffer.h>
#include <zencore/iobuffer.h>
+#include <zencore/uid.h>
+
+#include <compare>
namespace zen::cache {
@@ -13,6 +16,7 @@ struct RecordedRequestInfo
ZenContentType AcceptType;
Oid SessionId;
+ inline std::strong_ordering operator<=>(const RecordedRequestInfo& Rhs) const = default;
static const RecordedRequestInfo NullRequest;
};
@@ -37,4 +41,6 @@ public:
std::unique_ptr<cache::IRpcRequestRecorder> MakeDiskRequestRecorder(const std::filesystem::path& BasePath);
std::unique_ptr<cache::IRpcRequestReplayer> MakeDiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory);
+void rpcrecord_forcelink();
+
} // namespace zen::cache
diff --git a/src/zenutil/zenutil.cpp b/src/zenutil/zenutil.cpp
index df075ea3f..d9d6c83a2 100644
--- a/src/zenutil/zenutil.cpp
+++ b/src/zenutil/zenutil.cpp
@@ -5,6 +5,7 @@
#if ZEN_WITH_TESTS
# include <zenutil/basicfile.h>
+# include <zenutil/cache/rpcrecording.h>
namespace zen {
@@ -12,6 +13,7 @@ void
zenutil_forcelinktests()
{
basicfile_forcelink();
+ cache::rpcrecord_forcelink();
}
} // namespace zen