diff options
| author | Stefan Boberg <[email protected]> | 2023-12-15 14:09:12 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-12-15 14:09:12 +0100 |
| commit | 9266e40239f241b7e38fa93719004b323ddedf10 (patch) | |
| tree | 01ac8a3b38aa5913edab7ba10f299a48a01d8fe0 /src/zenutil/cache | |
| parent | windows executable signing (#566) (diff) | |
| download | zen-9266e40239f241b7e38fa93719004b323ddedf10.tar.xz zen-9266e40239f241b7e38fa93719004b323ddedf10.zip | |
fixed v2 rpc recording issue with >4GB data per segment (#612)
* fixed v2 rpc recording issue with >4GB data per segment
* implemented recovery logic to deal with partial RPC recordings
* added check for invalid/null requests in RPC replay
* also made sure at least one worker thread is configured
* fix problem where "null" requests would cause infinite loop!
* added basic RPC recorder tests
Diffstat (limited to 'src/zenutil/cache')
| -rw-r--r-- | src/zenutil/cache/rpcrecording.cpp | 227 |
1 files changed, 195 insertions, 32 deletions
diff --git a/src/zenutil/cache/rpcrecording.cpp b/src/zenutil/cache/rpcrecording.cpp index 00cecb8f7..c782f0920 100644 --- a/src/zenutil/cache/rpcrecording.cpp +++ b/src/zenutil/cache/rpcrecording.cpp @@ -4,7 +4,10 @@ #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> +#include <zencore/session.h> #include <zencore/system.h> +#include <zencore/testing.h> +#include <zencore/testutils.h> #include <zenutil/basicfile.h> #include <zenutil/cache/rpcrecording.h> @@ -230,7 +233,6 @@ public: } virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); } -private: virtual uint64_t GetRequestCount() const override { return m_RequestCount; } virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override @@ -239,6 +241,7 @@ private: } virtual ZenContentType GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; } +private: std::uint64_t m_RequestCount; RecordedRequestsReader m_RequestBuffer; }; @@ -257,9 +260,16 @@ struct RecordedRequest uint32_t Length; // 4 bytes ZenContentType ContentType; // 1 byte ZenContentType AcceptType; // 1 byte - uint8_t Padding; // 1 byte + uint8_t OffsetHigh; // 1 byte uint8_t Padding2; // 1 byte Oid SessionId; // 12 bytes + + inline uint64_t GetOffset() const { return uint64_t(Offset) + (uint64_t(OffsetHigh) << 32); } + inline void SetOffset(uint64_t NewOffset) + { + Offset = gsl::narrow_cast<uint32_t>(NewOffset & 0xffff'ffff); + OffsetHigh = gsl::narrow_cast<uint8_t>(NewOffset >> 32); + } }; static_assert(sizeof(RecordedRequest) == 24); @@ -273,6 +283,12 @@ const uint64_t LooseFileThreshold = 5000; // Somewhat arbitrary, but we try const uint64_t SegmentByteThreshold = 16ull * 1024 * 1024 * 1024; const TimeSpan SegmentTimeThreshold{/* hours */ 1, /* minutes */ 0, /* seconds */ 0}; +std::string +MakeSegmentPath(uint64_t SegmentIndex) +{ + return fmt::format("segment_{:06}", SegmentIndex); +} + struct RecordedRequestsSegmentWriter { RecordedRequestsSegmentWriter() = default; @@ -443,7 +459,7 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn .Length = uint32_t(RequestBufferSize & 0xffffFFFFu), .ContentType = RequestInfo.ContentType, .AcceptType = RequestInfo.AcceptType, - .Padding = 0, + .OffsetHigh = 0, .Padding2 = 0, .SessionId = RequestInfo.SessionId}); @@ -463,12 +479,15 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn BasicFile* BlockFile = m_BlockFiles[BlockIndex].get(); ZEN_ASSERT(BlockFile != nullptr); - Entry.Offset = uint32_t(m_ChunkOffset & 0xffffFFFF); - m_ChunkOffset = RoundUp(m_ChunkOffset + Entry.Length, 1u << 4u); + // Note that this is the overall logical offset, not the offset within a single file + const uint64_t ChunkWriteOffset = m_ChunkOffset; + m_ChunkOffset = RoundUp(ChunkWriteOffset + Entry.Length, 1u << 4u); Lock.ReleaseNow(); + Entry.SetOffset(ChunkWriteOffset); + std::error_code Ec; - BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), Entry.Offset - BlockIndex * RecordedRequestBlockSize, Ec); + BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), ChunkWriteOffset - (BlockIndex * RecordedRequestBlockSize), Ec); if (Ec) { Entry.Length = 0; @@ -531,7 +550,7 @@ RecordedRequestsSegmentReader::BeginRead(const std::filesystem::path& BasePath, { if (R.Offset != ~0u) { - MaxChunkPosition = Max(MaxChunkPosition, R.Offset + R.Length); + MaxChunkPosition = Max(MaxChunkPosition, R.GetOffset() + R.Length); } } uint32_t BlockCount = gsl::narrow<uint32_t>(MaxChunkPosition / RecordedRequestBlockSize) + 1; @@ -574,9 +593,10 @@ RecordedRequestsSegmentReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutB if (Entry.Offset != ~0u) { // Inline in block file - uint32_t BlockIndex = gsl::narrow<uint32_t>((Entry.Offset + Entry.Length) / RecordedRequestBlockSize); - uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize); - OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length); + const uint64_t EntryOffset = Entry.GetOffset(); + const uint32_t BlockIndex = gsl::narrow<uint32_t>((EntryOffset + Entry.Length) / RecordedRequestBlockSize); + const uint64_t ChunkOffset = EntryOffset - (BlockIndex * RecordedRequestBlockSize); + OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length); return RequestInfo; } @@ -638,7 +658,7 @@ RecordedRequestsWriter::EnsureCurrentSegment() const uint64_t SegmentIndex = m_FinishedSegments.size(); m_CurrentWriter = std::make_unique<RecordedRequestsSegmentWriter>(); - m_CurrentWriter->BeginWrite(m_BasePath / fmt::format("segment_{:06}", SegmentIndex), SegmentIndex, m_NextSegmentBaseIndex); + m_CurrentWriter->BeginWrite(m_BasePath / MakeSegmentPath(SegmentIndex), SegmentIndex, m_NextSegmentBaseIndex); } return *m_CurrentWriter; @@ -735,26 +755,89 @@ RecordedRequestsReader::BeginRead(const std::filesystem::path& BasePath, bool In m_InMemory = InMemory; m_BasePath = BasePath; - BasicFile InfoFile; - InfoFile.Open(m_BasePath / "rpc_recording_info.zcb", BasicFile::Mode::kRead); - CbObject CbInfo = LoadCompactBinaryObject(InfoFile.ReadAll()); + std::error_code Ec; + BasicFile InfoFile; + InfoFile.Open(m_BasePath / "rpc_recording_info.zcb", BasicFile::Mode::kRead, Ec); + + if (!Ec) + { + try + { + CbObject CbInfo = LoadCompactBinaryObject(InfoFile.ReadAll()); + + uint64_t TotalRequestCount = 0; + uint64_t MaxSegmentIndex = 0; + + for (auto SegmentElement : CbInfo["segments"]) + { + CbObjectView Segment = SegmentElement.AsObjectView(); + + const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment"sv].AsUInt64(), + .BaseRequestIndex = Segment["base_index"sv].AsUInt64(), + .RequestCount = Segment["request_count"sv].AsUInt64(), + .RequestBytes = Segment["request_bytes"sv].AsUInt64(), + .StartTime = Segment["start_time"sv].AsDateTime(), + .EndTime = Segment["end_time"sv].AsDateTime()}); + + TotalRequestCount += Info.RequestCount; + MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex); + } + + m_SegmentReaders.resize(MaxSegmentIndex + 1); + + return TotalRequestCount; + } + catch (std::exception& Ex) + { + ZEN_WARN("could not read metadata file: {}", Ex.what()); + } + } + + ZEN_INFO("recovering segment info for '{}'", BasePath); uint64_t TotalRequestCount = 0; uint64_t MaxSegmentIndex = 0; - for (auto SegmentElement : CbInfo["segments"]) + try { - CbObjectView Segment = SegmentElement.AsObjectView(); + for (int SegmentIndex = 0;; ++SegmentIndex) + { + const std::filesystem::path ZcbPath = BasePath / MakeSegmentPath(SegmentIndex) / "rpc_segment_info.zcb"; + FileContents Fc = ReadFile(ZcbPath); + + if (Fc.ErrorCode) + break; + + if (IoBuffer SegmentInfoBuffer = Fc.Flatten()) + { + CbObject Segment = LoadCompactBinaryObject(SegmentInfoBuffer); - const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment"sv].AsUInt64(), - .BaseRequestIndex = Segment["base_index"sv].AsUInt64(), - .RequestCount = Segment["request_count"sv].AsUInt64(), - .RequestBytes = Segment["request_bytes"sv].AsUInt64(), - .StartTime = Segment["start_time"sv].AsDateTime(), - .EndTime = Segment["end_time"sv].AsDateTime()}); + const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment_index"sv].AsUInt64(), + .BaseRequestIndex = 0, + .RequestCount = Segment["entry_count"sv].AsUInt64(), + .RequestBytes = 0, + .StartTime = Segment["time_start"sv].AsDateTime(), + .EndTime = Segment["time_end"sv].AsDateTime()}); - TotalRequestCount += Info.RequestCount; - MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex); + TotalRequestCount += Info.RequestCount; + MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex); + } + } + } + catch (std::exception&) + { + } + + std::sort(begin(m_KnownSegments), end(m_KnownSegments), [](const auto& Lhs, const auto& Rhs) { + return Lhs.SegmentIndex < Rhs.SegmentIndex; + }); + + uint64_t SegmentRequestOffset = 0; + + for (SegmentInfo& Info : m_KnownSegments) + { + Info.BaseRequestIndex = SegmentRequestOffset; + SegmentRequestOffset += Info.RequestCount; } m_SegmentReaders.resize(MaxSegmentIndex + 1); @@ -791,7 +874,7 @@ RecordedRequestsReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) if (!SegmentReaderPtr) { RecordedRequestsSegmentReader* NewSegment = new RecordedRequestsSegmentReader; - NewSegment->BeginRead(m_BasePath / fmt::format("segment_{:06}", SegmentIndex), m_InMemory); + NewSegment->BeginRead(m_BasePath / MakeSegmentPath(SegmentIndex), m_InMemory); SegmentReaderPtr.reset(NewSegment); } @@ -821,7 +904,6 @@ public: DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); } virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); } -private: virtual uint64_t RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override { return m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer); @@ -829,6 +911,7 @@ private: virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {} virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {} +private: RecordedRequestsWriter m_RecordedRequests; }; @@ -837,23 +920,41 @@ class DiskRequestReplayer : public IRpcRequestReplayer public: DiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory) { - m_RequestCount = m_RequestBuffer.BeginRead(BasePath, InMemory); + m_RequestCount = m_RequestReader.BeginRead(BasePath, InMemory); } - virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); } + virtual ~DiskRequestReplayer() { m_RequestReader.EndRead(); } + + static bool IsCompatible(const std::filesystem::path& BasePath) + { + if (std::filesystem::exists(BasePath / "rpc_recording_info.zcb")) + { + return true; + } - static bool IsCompatible(const std::filesystem::path& BasePath) { return std::filesystem::exists(BasePath / "rpc_recording_info.zcb"); } + const std::filesystem::path SegmentZero = BasePath / MakeSegmentPath(0); + + if (std::filesystem::exists(SegmentZero / "rpc_segment_info.zcb") && std::filesystem::exists(SegmentZero / "index.bin")) + { + // top-level metadata is missing, possibly because of premature exit + // on the recording side + + return true; + } + + return false; + } -private: virtual uint64_t GetRequestCount() const override { return m_RequestCount; } virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override { - return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer); + return m_RequestReader.ReadRequest(RequestIndex, OutBuffer); } virtual ZenContentType GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; } +private: std::uint64_t m_RequestCount; - RecordedRequestsReader m_RequestBuffer; + RecordedRequestsReader m_RequestReader; }; } // namespace zen::cache::v2 @@ -881,4 +982,66 @@ MakeDiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory) } } +#if ZEN_WITH_TESTS + +void +rpcrecord_forcelink() +{ +} + +TEST_SUITE_BEGIN("rpc.recording"); + +TEST_CASE("rpc.record") +{ + ScopedTemporaryDirectory TempDir; + auto Path = TempDir.Path(); + + const Oid SessionId = GetSessionId(); + + using namespace std::literals; + + { + cache::v2::DiskRequestRecorder Recorder{Path}; + + for (int i = 0; i < 1000; ++i) + { + RecordedRequestInfo RequestInfo{.ContentType = ZenContentType::kCbObject, + .AcceptType = ZenContentType::kCbObject, + .SessionId = SessionId}; + + CbObjectWriter RequestPayload; + RequestPayload << "test"sv << true; + RequestPayload << "index"sv << i; + CbObject Req = RequestPayload.Save(); + IoBuffer RequestBuffer = Req.GetBuffer().AsIoBuffer(); + + const uint64_t Index = Recorder.RecordRequest(RequestInfo, RequestBuffer); + + CHECK(Index == i); + } + } + + { + cache::v2::DiskRequestReplayer Replayer{Path, false}; + + for (int i = 0; i < 1000; ++i) + { + IoBuffer RequestBuffer; + RecordedRequestInfo RequestInfo = Replayer.GetRequest(i, RequestBuffer); + + CHECK(RequestInfo.AcceptType == ZenContentType::kCbObject); + CHECK(RequestInfo.ContentType == ZenContentType::kCbObject); + CHECK(RequestInfo.SessionId == SessionId); + + CbObject Req = LoadCompactBinaryObject(RequestBuffer); + CHECK_EQ(Req["index"sv].AsInt32(), i); + CHECK_EQ(Req["test"sv].AsBool(), true); + } + } +} + +TEST_SUITE_END(); + +#endif + } // namespace zen::cache |