diff options
| author | Stefan Boberg <[email protected]> | 2023-12-15 14:09:12 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-12-15 14:09:12 +0100 |
| commit | 9266e40239f241b7e38fa93719004b323ddedf10 (patch) | |
| tree | 01ac8a3b38aa5913edab7ba10f299a48a01d8fe0 /src | |
| parent | windows executable signing (#566) (diff) | |
| download | zen-9266e40239f241b7e38fa93719004b323ddedf10.tar.xz zen-9266e40239f241b7e38fa93719004b323ddedf10.zip | |
fixed v2 rpc recording issue with >4GB data per segment (#612)
* fixed v2 rpc recording issue with >4GB data per segment
* implemented recovery logic to deal with partial RPC recordings
* added check for invalid/null requests in RPC replay
* also made sure at least one worker thread is configured
* fix problem where "null" requests would cause infinite loop!
* added basic RPC recorder tests
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/rpcreplay_cmd.cpp | 234 | ||||
| -rw-r--r-- | src/zenutil/cache/rpcrecording.cpp | 227 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/cache/rpcrecording.h | 6 | ||||
| -rw-r--r-- | src/zenutil/zenutil.cpp | 2 |
4 files changed, 323 insertions, 146 deletions
diff --git a/src/zen/cmds/rpcreplay_cmd.cpp b/src/zen/cmds/rpcreplay_cmd.cpp index 202829aa0..53f45358e 100644 --- a/src/zen/cmds/rpcreplay_cmd.cpp +++ b/src/zen/cmds/rpcreplay_cmd.cpp @@ -201,6 +201,8 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) throw std::runtime_error(fmt::format("could not find recording at '{}'", m_RecordingPath)); } + m_ThreadCount = Max(m_ThreadCount, 1); + Stopwatch TotalTimer; if (m_OnHost) @@ -282,152 +284,156 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) uint64_t EntryIndex = EntryOffset.fetch_add(m_Stride); while (EntryIndex < EntryCount) { - IoBuffer Payload; - zen::cache::RecordedRequestInfo RequestInfo = Replayer->GetRequest(EntryIndex, /* out */ Payload); - - CbPackage RequestPackage; - CbObject Request; + IoBuffer Payload; + const zen::cache::RecordedRequestInfo RequestInfo = Replayer->GetRequest(EntryIndex, /* out */ Payload); - switch (RequestInfo.ContentType) + if (RequestInfo != zen::cache::RecordedRequestInfo::NullRequest) { - case ZenContentType::kCbPackage: - { - if (ParsePackageMessageWithLegacyFallback(Payload, RequestPackage)) + CbPackage RequestPackage; + CbObject Request; + + switch (RequestInfo.ContentType) + { + case ZenContentType::kCbPackage: { - Request = RequestPackage.GetObject(); + if (ParsePackageMessageWithLegacyFallback(Payload, RequestPackage)) + { + Request = RequestPackage.GetObject(); + } } - } - break; - case ZenContentType::kCbObject: - { - Request = LoadCompactBinaryObject(Payload); - } - break; - } + break; + case ZenContentType::kCbObject: + { + Request = LoadCompactBinaryObject(Payload); + } + break; + } - RpcAcceptOptions OriginalAcceptOptions = static_cast<RpcAcceptOptions>(Request["AcceptFlags"sv].AsUInt16(0u)); - int OriginalProcessPid = Request["Pid"sv].AsInt32(0); + RpcAcceptOptions OriginalAcceptOptions = static_cast<RpcAcceptOptions>(Request["AcceptFlags"sv].AsUInt16(0u)); + int OriginalProcessPid = Request["Pid"sv].AsInt32(0); - int AdjustedPid = 0; - RpcAcceptOptions AdjustedAcceptOptions = RpcAcceptOptions::kNone; + int AdjustedPid = 0; + RpcAcceptOptions AdjustedAcceptOptions = RpcAcceptOptions::kNone; - if (!m_DisableLocalRefs) - { - if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowLocalReferences) || m_ForceAllowLocalRefs) + if (!m_DisableLocalRefs) { - AdjustedAcceptOptions |= RpcAcceptOptions::kAllowLocalReferences; - if (!m_DisablePartialLocalRefs) + if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowLocalReferences) || + m_ForceAllowLocalRefs) { - if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowPartialLocalReferences) || - m_ForceAllowPartialLocalRefs) + AdjustedAcceptOptions |= RpcAcceptOptions::kAllowLocalReferences; + if (!m_DisablePartialLocalRefs) { - AdjustedAcceptOptions |= RpcAcceptOptions::kAllowPartialLocalReferences; + if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowPartialLocalReferences) || + m_ForceAllowPartialLocalRefs) + { + AdjustedAcceptOptions |= RpcAcceptOptions::kAllowPartialLocalReferences; + } } - } - if (!m_DisableLocalHandleRefs) - { - if (OriginalProcessPid != 0 || m_ForceAllowLocalHandleRef) + if (!m_DisableLocalHandleRefs) { - AdjustedPid = GetCurrentProcessId(); + if (OriginalProcessPid != 0 || m_ForceAllowLocalHandleRef) + { + AdjustedPid = GetCurrentProcessId(); + } } } } - } - if (m_ShowMethodStats) - { - std::string MethodName = std::string(Request["Method"sv].AsString()); - if (auto It = LocalMethodTypes.find(MethodName); It != LocalMethodTypes.end()) - { - It->second++; - } - else + if (m_ShowMethodStats) { - LocalMethodTypes[MethodName] = 1; + std::string MethodName = std::string(Request["Method"sv].AsString()); + if (auto It = LocalMethodTypes.find(MethodName); It != LocalMethodTypes.end()) + { + It->second++; + } + else + { + LocalMethodTypes[MethodName] = 1; + } } - } - if (OriginalAcceptOptions != AdjustedAcceptOptions || OriginalProcessPid != AdjustedPid) - { - CbObjectWriter RequestCopyWriter; - for (const CbFieldView& Field : Request) + if (OriginalAcceptOptions != AdjustedAcceptOptions || OriginalProcessPid != AdjustedPid) { - if (!Field.HasName()) + CbObjectWriter RequestCopyWriter; + for (const CbFieldView& Field : Request) { - RequestCopyWriter.AddField(Field); - continue; + if (!Field.HasName()) + { + RequestCopyWriter.AddField(Field); + continue; + } + std::string_view FieldName = Field.GetName(); + if (FieldName == "Pid"sv) + { + continue; + } + if (FieldName == "AcceptFlags"sv) + { + continue; + } + RequestCopyWriter.AddField(FieldName, Field); } - std::string_view FieldName = Field.GetName(); - if (FieldName == "Pid"sv) + if (AdjustedPid != 0) { - continue; + RequestCopyWriter.AddInteger("Pid"sv, AdjustedPid); } - if (FieldName == "AcceptFlags"sv) + if (AdjustedAcceptOptions != RpcAcceptOptions::kNone) { - continue; + RequestCopyWriter.AddInteger("AcceptFlags"sv, static_cast<uint16_t>(AdjustedAcceptOptions)); } - RequestCopyWriter.AddField(FieldName, Field); - } - if (AdjustedPid != 0) - { - RequestCopyWriter.AddInteger("Pid"sv, AdjustedPid); - } - if (AdjustedAcceptOptions != RpcAcceptOptions::kNone) - { - RequestCopyWriter.AddInteger("AcceptFlags"sv, static_cast<uint16_t>(AdjustedAcceptOptions)); - } - if (RequestInfo.ContentType == ZenContentType::kCbPackage) - { - RequestPackage.SetObject(RequestCopyWriter.Save()); - std::vector<IoBuffer> Buffers = FormatPackageMessage(RequestPackage); - std::vector<SharedBuffer> SharedBuffers(Buffers.begin(), Buffers.end()); - Payload = CompositeBuffer(std::move(SharedBuffers)).Flatten().AsIoBuffer(); - } - else - { - RequestCopyWriter.Finalize(); - Payload = IoBuffer(RequestCopyWriter.GetSaveSize()); - RequestCopyWriter.Save(Payload.GetMutableView()); + if (RequestInfo.ContentType == ZenContentType::kCbPackage) + { + RequestPackage.SetObject(RequestCopyWriter.Save()); + std::vector<IoBuffer> Buffers = FormatPackageMessage(RequestPackage); + std::vector<SharedBuffer> SharedBuffers(Buffers.begin(), Buffers.end()); + Payload = CompositeBuffer(std::move(SharedBuffers)).Flatten().AsIoBuffer(); + } + else + { + RequestCopyWriter.Finalize(); + Payload = IoBuffer(RequestCopyWriter.GetSaveSize()); + RequestCopyWriter.Save(Payload.GetMutableView()); + } } - } - - if (!m_DryRun) - { - StringBuilder<32> SessionIdString; - if (RequestInfo.SessionId != Oid::Zero) + if (!m_DryRun) { - RequestInfo.SessionId.ToString(SessionIdString); - } - else - { - GetSessionId().ToString(SessionIdString); - } + StringBuilder<32> SessionIdString; - Session.SetHeader({{"Content-Type", std::string(MapContentTypeToString(RequestInfo.ContentType))}, - {"Accept", std::string(MapContentTypeToString(RequestInfo.AcceptType))}, - {"UE-Session", std::string(SessionIdString)}}); - - uint64_t Offset = 0; - auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, Payload.GetSize() - Offset); - IoBuffer PayloadRange = IoBuffer(Payload, Offset, size); - MutableMemoryView Data(buffer, size); - Data.CopyFrom(PayloadRange.GetView()); - Offset += size; - return true; - }; - Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); - cpr::Response Response = Session.Post(); - BytesSent.fetch_add(Payload.GetSize()); - if (Response.error || !(IsHttpSuccessCode(Response.status_code) || - Response.status_code == gsl::narrow<long>(HttpResponseCode::NotFound))) - { - ZEN_CONSOLE("{}", FormatHttpResponse(Response)); - break; + if (RequestInfo.SessionId != Oid::Zero) + { + RequestInfo.SessionId.ToString(SessionIdString); + } + else + { + GetSessionId().ToString(SessionIdString); + } + + Session.SetHeader({{"Content-Type", std::string(MapContentTypeToString(RequestInfo.ContentType))}, + {"Accept", std::string(MapContentTypeToString(RequestInfo.AcceptType))}, + {"UE-Session", std::string(SessionIdString)}}); + + uint64_t Offset = 0; + auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) { + size = Min<size_t>(size, Payload.GetSize() - Offset); + IoBuffer PayloadRange = IoBuffer(Payload, Offset, size); + MutableMemoryView Data(buffer, size); + Data.CopyFrom(PayloadRange.GetView()); + Offset += size; + return true; + }; + Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); + cpr::Response Response = Session.Post(); + BytesSent.fetch_add(Payload.GetSize()); + if (Response.error || !(IsHttpSuccessCode(Response.status_code) || + Response.status_code == gsl::narrow<long>(HttpResponseCode::NotFound))) + { + ZEN_CONSOLE("{}", FormatHttpResponse(Response)); + break; + } + BytesReceived.fetch_add(Response.downloaded_bytes); } - BytesReceived.fetch_add(Response.downloaded_bytes); } EntryIndex = EntryOffset.fetch_add(m_Stride); diff --git a/src/zenutil/cache/rpcrecording.cpp b/src/zenutil/cache/rpcrecording.cpp index 00cecb8f7..c782f0920 100644 --- a/src/zenutil/cache/rpcrecording.cpp +++ b/src/zenutil/cache/rpcrecording.cpp @@ -4,7 +4,10 @@ #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> +#include <zencore/session.h> #include <zencore/system.h> +#include <zencore/testing.h> +#include <zencore/testutils.h> #include <zenutil/basicfile.h> #include <zenutil/cache/rpcrecording.h> @@ -230,7 +233,6 @@ public: } virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); } -private: virtual uint64_t GetRequestCount() const override { return m_RequestCount; } virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override @@ -239,6 +241,7 @@ private: } virtual ZenContentType GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; } +private: std::uint64_t m_RequestCount; RecordedRequestsReader m_RequestBuffer; }; @@ -257,9 +260,16 @@ struct RecordedRequest uint32_t Length; // 4 bytes ZenContentType ContentType; // 1 byte ZenContentType AcceptType; // 1 byte - uint8_t Padding; // 1 byte + uint8_t OffsetHigh; // 1 byte uint8_t Padding2; // 1 byte Oid SessionId; // 12 bytes + + inline uint64_t GetOffset() const { return uint64_t(Offset) + (uint64_t(OffsetHigh) << 32); } + inline void SetOffset(uint64_t NewOffset) + { + Offset = gsl::narrow_cast<uint32_t>(NewOffset & 0xffff'ffff); + OffsetHigh = gsl::narrow_cast<uint8_t>(NewOffset >> 32); + } }; static_assert(sizeof(RecordedRequest) == 24); @@ -273,6 +283,12 @@ const uint64_t LooseFileThreshold = 5000; // Somewhat arbitrary, but we try const uint64_t SegmentByteThreshold = 16ull * 1024 * 1024 * 1024; const TimeSpan SegmentTimeThreshold{/* hours */ 1, /* minutes */ 0, /* seconds */ 0}; +std::string +MakeSegmentPath(uint64_t SegmentIndex) +{ + return fmt::format("segment_{:06}", SegmentIndex); +} + struct RecordedRequestsSegmentWriter { RecordedRequestsSegmentWriter() = default; @@ -443,7 +459,7 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn .Length = uint32_t(RequestBufferSize & 0xffffFFFFu), .ContentType = RequestInfo.ContentType, .AcceptType = RequestInfo.AcceptType, - .Padding = 0, + .OffsetHigh = 0, .Padding2 = 0, .SessionId = RequestInfo.SessionId}); @@ -463,12 +479,15 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn BasicFile* BlockFile = m_BlockFiles[BlockIndex].get(); ZEN_ASSERT(BlockFile != nullptr); - Entry.Offset = uint32_t(m_ChunkOffset & 0xffffFFFF); - m_ChunkOffset = RoundUp(m_ChunkOffset + Entry.Length, 1u << 4u); + // Note that this is the overall logical offset, not the offset within a single file + const uint64_t ChunkWriteOffset = m_ChunkOffset; + m_ChunkOffset = RoundUp(ChunkWriteOffset + Entry.Length, 1u << 4u); Lock.ReleaseNow(); + Entry.SetOffset(ChunkWriteOffset); + std::error_code Ec; - BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), Entry.Offset - BlockIndex * RecordedRequestBlockSize, Ec); + BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), ChunkWriteOffset - (BlockIndex * RecordedRequestBlockSize), Ec); if (Ec) { Entry.Length = 0; @@ -531,7 +550,7 @@ RecordedRequestsSegmentReader::BeginRead(const std::filesystem::path& BasePath, { if (R.Offset != ~0u) { - MaxChunkPosition = Max(MaxChunkPosition, R.Offset + R.Length); + MaxChunkPosition = Max(MaxChunkPosition, R.GetOffset() + R.Length); } } uint32_t BlockCount = gsl::narrow<uint32_t>(MaxChunkPosition / RecordedRequestBlockSize) + 1; @@ -574,9 +593,10 @@ RecordedRequestsSegmentReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutB if (Entry.Offset != ~0u) { // Inline in block file - uint32_t BlockIndex = gsl::narrow<uint32_t>((Entry.Offset + Entry.Length) / RecordedRequestBlockSize); - uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize); - OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length); + const uint64_t EntryOffset = Entry.GetOffset(); + const uint32_t BlockIndex = gsl::narrow<uint32_t>((EntryOffset + Entry.Length) / RecordedRequestBlockSize); + const uint64_t ChunkOffset = EntryOffset - (BlockIndex * RecordedRequestBlockSize); + OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length); return RequestInfo; } @@ -638,7 +658,7 @@ RecordedRequestsWriter::EnsureCurrentSegment() const uint64_t SegmentIndex = m_FinishedSegments.size(); m_CurrentWriter = std::make_unique<RecordedRequestsSegmentWriter>(); - m_CurrentWriter->BeginWrite(m_BasePath / fmt::format("segment_{:06}", SegmentIndex), SegmentIndex, m_NextSegmentBaseIndex); + m_CurrentWriter->BeginWrite(m_BasePath / MakeSegmentPath(SegmentIndex), SegmentIndex, m_NextSegmentBaseIndex); } return *m_CurrentWriter; @@ -735,26 +755,89 @@ RecordedRequestsReader::BeginRead(const std::filesystem::path& BasePath, bool In m_InMemory = InMemory; m_BasePath = BasePath; - BasicFile InfoFile; - InfoFile.Open(m_BasePath / "rpc_recording_info.zcb", BasicFile::Mode::kRead); - CbObject CbInfo = LoadCompactBinaryObject(InfoFile.ReadAll()); + std::error_code Ec; + BasicFile InfoFile; + InfoFile.Open(m_BasePath / "rpc_recording_info.zcb", BasicFile::Mode::kRead, Ec); + + if (!Ec) + { + try + { + CbObject CbInfo = LoadCompactBinaryObject(InfoFile.ReadAll()); + + uint64_t TotalRequestCount = 0; + uint64_t MaxSegmentIndex = 0; + + for (auto SegmentElement : CbInfo["segments"]) + { + CbObjectView Segment = SegmentElement.AsObjectView(); + + const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment"sv].AsUInt64(), + .BaseRequestIndex = Segment["base_index"sv].AsUInt64(), + .RequestCount = Segment["request_count"sv].AsUInt64(), + .RequestBytes = Segment["request_bytes"sv].AsUInt64(), + .StartTime = Segment["start_time"sv].AsDateTime(), + .EndTime = Segment["end_time"sv].AsDateTime()}); + + TotalRequestCount += Info.RequestCount; + MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex); + } + + m_SegmentReaders.resize(MaxSegmentIndex + 1); + + return TotalRequestCount; + } + catch (std::exception& Ex) + { + ZEN_WARN("could not read metadata file: {}", Ex.what()); + } + } + + ZEN_INFO("recovering segment info for '{}'", BasePath); uint64_t TotalRequestCount = 0; uint64_t MaxSegmentIndex = 0; - for (auto SegmentElement : CbInfo["segments"]) + try { - CbObjectView Segment = SegmentElement.AsObjectView(); + for (int SegmentIndex = 0;; ++SegmentIndex) + { + const std::filesystem::path ZcbPath = BasePath / MakeSegmentPath(SegmentIndex) / "rpc_segment_info.zcb"; + FileContents Fc = ReadFile(ZcbPath); + + if (Fc.ErrorCode) + break; + + if (IoBuffer SegmentInfoBuffer = Fc.Flatten()) + { + CbObject Segment = LoadCompactBinaryObject(SegmentInfoBuffer); - const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment"sv].AsUInt64(), - .BaseRequestIndex = Segment["base_index"sv].AsUInt64(), - .RequestCount = Segment["request_count"sv].AsUInt64(), - .RequestBytes = Segment["request_bytes"sv].AsUInt64(), - .StartTime = Segment["start_time"sv].AsDateTime(), - .EndTime = Segment["end_time"sv].AsDateTime()}); + const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment_index"sv].AsUInt64(), + .BaseRequestIndex = 0, + .RequestCount = Segment["entry_count"sv].AsUInt64(), + .RequestBytes = 0, + .StartTime = Segment["time_start"sv].AsDateTime(), + .EndTime = Segment["time_end"sv].AsDateTime()}); - TotalRequestCount += Info.RequestCount; - MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex); + TotalRequestCount += Info.RequestCount; + MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex); + } + } + } + catch (std::exception&) + { + } + + std::sort(begin(m_KnownSegments), end(m_KnownSegments), [](const auto& Lhs, const auto& Rhs) { + return Lhs.SegmentIndex < Rhs.SegmentIndex; + }); + + uint64_t SegmentRequestOffset = 0; + + for (SegmentInfo& Info : m_KnownSegments) + { + Info.BaseRequestIndex = SegmentRequestOffset; + SegmentRequestOffset += Info.RequestCount; } m_SegmentReaders.resize(MaxSegmentIndex + 1); @@ -791,7 +874,7 @@ RecordedRequestsReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) if (!SegmentReaderPtr) { RecordedRequestsSegmentReader* NewSegment = new RecordedRequestsSegmentReader; - NewSegment->BeginRead(m_BasePath / fmt::format("segment_{:06}", SegmentIndex), m_InMemory); + NewSegment->BeginRead(m_BasePath / MakeSegmentPath(SegmentIndex), m_InMemory); SegmentReaderPtr.reset(NewSegment); } @@ -821,7 +904,6 @@ public: DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); } virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); } -private: virtual uint64_t RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override { return m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer); @@ -829,6 +911,7 @@ private: virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {} virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {} +private: RecordedRequestsWriter m_RecordedRequests; }; @@ -837,23 +920,41 @@ class DiskRequestReplayer : public IRpcRequestReplayer public: DiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory) { - m_RequestCount = m_RequestBuffer.BeginRead(BasePath, InMemory); + m_RequestCount = m_RequestReader.BeginRead(BasePath, InMemory); } - virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); } + virtual ~DiskRequestReplayer() { m_RequestReader.EndRead(); } + + static bool IsCompatible(const std::filesystem::path& BasePath) + { + if (std::filesystem::exists(BasePath / "rpc_recording_info.zcb")) + { + return true; + } - static bool IsCompatible(const std::filesystem::path& BasePath) { return std::filesystem::exists(BasePath / "rpc_recording_info.zcb"); } + const std::filesystem::path SegmentZero = BasePath / MakeSegmentPath(0); + + if (std::filesystem::exists(SegmentZero / "rpc_segment_info.zcb") && std::filesystem::exists(SegmentZero / "index.bin")) + { + // top-level metadata is missing, possibly because of premature exit + // on the recording side + + return true; + } + + return false; + } -private: virtual uint64_t GetRequestCount() const override { return m_RequestCount; } virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override { - return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer); + return m_RequestReader.ReadRequest(RequestIndex, OutBuffer); } virtual ZenContentType GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; } +private: std::uint64_t m_RequestCount; - RecordedRequestsReader m_RequestBuffer; + RecordedRequestsReader m_RequestReader; }; } // namespace zen::cache::v2 @@ -881,4 +982,66 @@ MakeDiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory) } } +#if ZEN_WITH_TESTS + +void +rpcrecord_forcelink() +{ +} + +TEST_SUITE_BEGIN("rpc.recording"); + +TEST_CASE("rpc.record") +{ + ScopedTemporaryDirectory TempDir; + auto Path = TempDir.Path(); + + const Oid SessionId = GetSessionId(); + + using namespace std::literals; + + { + cache::v2::DiskRequestRecorder Recorder{Path}; + + for (int i = 0; i < 1000; ++i) + { + RecordedRequestInfo RequestInfo{.ContentType = ZenContentType::kCbObject, + .AcceptType = ZenContentType::kCbObject, + .SessionId = SessionId}; + + CbObjectWriter RequestPayload; + RequestPayload << "test"sv << true; + RequestPayload << "index"sv << i; + CbObject Req = RequestPayload.Save(); + IoBuffer RequestBuffer = Req.GetBuffer().AsIoBuffer(); + + const uint64_t Index = Recorder.RecordRequest(RequestInfo, RequestBuffer); + + CHECK(Index == i); + } + } + + { + cache::v2::DiskRequestReplayer Replayer{Path, false}; + + for (int i = 0; i < 1000; ++i) + { + IoBuffer RequestBuffer; + RecordedRequestInfo RequestInfo = Replayer.GetRequest(i, RequestBuffer); + + CHECK(RequestInfo.AcceptType == ZenContentType::kCbObject); + CHECK(RequestInfo.ContentType == ZenContentType::kCbObject); + CHECK(RequestInfo.SessionId == SessionId); + + CbObject Req = LoadCompactBinaryObject(RequestBuffer); + CHECK_EQ(Req["index"sv].AsInt32(), i); + CHECK_EQ(Req["test"sv].AsBool(), true); + } + } +} + +TEST_SUITE_END(); + +#endif + } // namespace zen::cache diff --git a/src/zenutil/include/zenutil/cache/rpcrecording.h b/src/zenutil/include/zenutil/cache/rpcrecording.h index fd5df26ad..ab9b92dd3 100644 --- a/src/zenutil/include/zenutil/cache/rpcrecording.h +++ b/src/zenutil/include/zenutil/cache/rpcrecording.h @@ -4,6 +4,9 @@ #include <zencore/compositebuffer.h> #include <zencore/iobuffer.h> +#include <zencore/uid.h> + +#include <compare> namespace zen::cache { @@ -13,6 +16,7 @@ struct RecordedRequestInfo ZenContentType AcceptType; Oid SessionId; + inline std::strong_ordering operator<=>(const RecordedRequestInfo& Rhs) const = default; static const RecordedRequestInfo NullRequest; }; @@ -37,4 +41,6 @@ public: std::unique_ptr<cache::IRpcRequestRecorder> MakeDiskRequestRecorder(const std::filesystem::path& BasePath); std::unique_ptr<cache::IRpcRequestReplayer> MakeDiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory); +void rpcrecord_forcelink(); + } // namespace zen::cache diff --git a/src/zenutil/zenutil.cpp b/src/zenutil/zenutil.cpp index df075ea3f..d9d6c83a2 100644 --- a/src/zenutil/zenutil.cpp +++ b/src/zenutil/zenutil.cpp @@ -5,6 +5,7 @@ #if ZEN_WITH_TESTS # include <zenutil/basicfile.h> +# include <zenutil/cache/rpcrecording.h> namespace zen { @@ -12,6 +13,7 @@ void zenutil_forcelinktests() { basicfile_forcelink(); + cache::rpcrecord_forcelink(); } } // namespace zen |