diff options
| author | Stefan Boberg <[email protected]> | 2023-12-19 12:06:13 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-12-19 12:06:13 +0100 |
| commit | 519d942d809e740a3b1fe5a1f6a57a4cfe43408b (patch) | |
| tree | 9b3c084e21bb7fd5e6bb3335e890647062d0703b /src/zenutil | |
| parent | added mimalloc_hooks (diff) | |
| parent | ensure we can build without trace (#619) (diff) | |
| download | zen-273-integrated-memory-tracking.tar.xz zen-273-integrated-memory-tracking.zip | |
Merge branch 'main' into 273-integrated-memory-tracking273-integrated-memory-tracking
Diffstat (limited to 'src/zenutil')
| -rw-r--r-- | src/zenutil/basicfile.cpp | 32 | ||||
| -rw-r--r-- | src/zenutil/cache/rpcrecording.cpp | 341 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/cache/rpcrecording.h | 6 | ||||
| -rw-r--r-- | src/zenutil/logging.cpp | 33 | ||||
| -rw-r--r-- | src/zenutil/zenutil.cpp | 2 |
5 files changed, 313 insertions, 101 deletions
diff --git a/src/zenutil/basicfile.cpp b/src/zenutil/basicfile.cpp index 173b22449..024b1e5bf 100644 --- a/src/zenutil/basicfile.cpp +++ b/src/zenutil/basicfile.cpp @@ -241,7 +241,8 @@ BasicFile::Read(void* Data, uint64_t BytesToRead, uint64_t FileOffset) if (!Success) { - ThrowLastError(fmt::format("Failed to read from file '{}'", zen::PathFromHandle(m_FileHandle))); + std::error_code Dummy; + ThrowLastError(fmt::format("Failed to read from file '{}'", zen::PathFromHandle(m_FileHandle, Dummy))); } BytesToRead -= NumberOfBytesToRead; @@ -374,7 +375,8 @@ BasicFile::Write(const void* Data, uint64_t Size, uint64_t Offset) if (Ec) { - throw std::system_error(Ec, fmt::format("Failed to write to file '{}'", zen::PathFromHandle(m_FileHandle))); + std::error_code Dummy; + throw std::system_error(Ec, fmt::format("Failed to write to file '{}'", zen::PathFromHandle(m_FileHandle, Dummy))); } } @@ -426,7 +428,8 @@ BasicFile::FileSize() int Error = zen::GetLastError(); if (Error) { - ThrowSystemError(Error, fmt::format("Failed to get file size from file '{}'", PathFromHandle(m_FileHandle))); + std::error_code Dummy; + ThrowSystemError(Error, fmt::format("Failed to get file size from file '{}'", PathFromHandle(m_FileHandle, Dummy))); } } return uint64_t(liFileSize.QuadPart); @@ -436,7 +439,8 @@ BasicFile::FileSize() struct stat Stat; if (fstat(Fd, &Stat) == -1) { - ThrowSystemError(GetLastError(), fmt::format("Failed to get file size from file '{}'", PathFromHandle(m_FileHandle))); + std::error_code Dummy; + ThrowSystemError(GetLastError(), fmt::format("Failed to get file size from file '{}'", PathFromHandle(m_FileHandle, Dummy))); } return uint64_t(Stat.st_size); #endif @@ -483,7 +487,9 @@ BasicFile::SetFileSize(uint64_t FileSize) int Error = zen::GetLastError(); if (Error) { - ThrowSystemError(Error, fmt::format("Failed to set file pointer to {} for file {}", FileSize, PathFromHandle(m_FileHandle))); + std::error_code Dummy; + ThrowSystemError(Error, + fmt::format("Failed to set file pointer to {} for file {}", FileSize, PathFromHandle(m_FileHandle, Dummy))); } } OK = ::SetEndOfFile(m_FileHandle); @@ -492,7 +498,9 @@ BasicFile::SetFileSize(uint64_t FileSize) int Error = zen::GetLastError(); if (Error) { - ThrowSystemError(Error, fmt::format("Failed to set end of file to {} for file {}", FileSize, PathFromHandle(m_FileHandle))); + std::error_code Dummy; + ThrowSystemError(Error, + fmt::format("Failed to set end of file to {} for file {}", FileSize, PathFromHandle(m_FileHandle, Dummy))); } } #elif ZEN_PLATFORM_MAC @@ -502,7 +510,9 @@ BasicFile::SetFileSize(uint64_t FileSize) int Error = zen::GetLastError(); if (Error) { - ThrowSystemError(Error, fmt::format("Failed to set truncate file to {} for file {}", FileSize, PathFromHandle(m_FileHandle))); + std::error_code Dummy; + ThrowSystemError(Error, + fmt::format("Failed to set truncate file to {} for file {}", FileSize, PathFromHandle(m_FileHandle, Dummy))); } } #else @@ -512,7 +522,9 @@ BasicFile::SetFileSize(uint64_t FileSize) int Error = zen::GetLastError(); if (Error) { - ThrowSystemError(Error, fmt::format("Failed to set truncate file to {} for file {}", FileSize, PathFromHandle(m_FileHandle))); + std::error_code Dummy; + ThrowSystemError(Error, + fmt::format("Failed to set truncate file to {} for file {}", FileSize, PathFromHandle(m_FileHandle, Dummy))); } } if (FileSize > 0) @@ -520,7 +532,9 @@ BasicFile::SetFileSize(uint64_t FileSize) int Error = posix_fallocate64(Fd, 0, (off64_t)FileSize); if (Error) { - ThrowSystemError(Error, fmt::format("Failed to allocate space of {} for file {}", FileSize, PathFromHandle(m_FileHandle))); + std::error_code Dummy; + ThrowSystemError(Error, + fmt::format("Failed to allocate space of {} for file {}", FileSize, PathFromHandle(m_FileHandle, Dummy))); } } #endif diff --git a/src/zenutil/cache/rpcrecording.cpp b/src/zenutil/cache/rpcrecording.cpp index 054ac0e56..b8f9d65ef 100644 --- a/src/zenutil/cache/rpcrecording.cpp +++ b/src/zenutil/cache/rpcrecording.cpp @@ -2,8 +2,12 @@ #include <zencore/compactbinarybuilder.h> #include <zencore/filesystem.h> +#include <zencore/fmtutils.h> #include <zencore/logging.h> +#include <zencore/session.h> #include <zencore/system.h> +#include <zencore/testing.h> +#include <zencore/testutils.h> #include <zenutil/basicfile.h> #include <zenutil/cache/rpcrecording.h> @@ -229,7 +233,6 @@ public: } virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); } -private: virtual uint64_t GetRequestCount() const override { return m_RequestCount; } virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override @@ -238,6 +241,7 @@ private: } virtual ZenContentType GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; } +private: std::uint64_t m_RequestCount; RecordedRequestsReader m_RequestBuffer; }; @@ -256,15 +260,22 @@ struct RecordedRequest uint32_t Length; // 4 bytes ZenContentType ContentType; // 1 byte ZenContentType AcceptType; // 1 byte - uint8_t Padding; // 1 byte + uint8_t OffsetHigh; // 1 byte uint8_t Padding2; // 1 byte Oid SessionId; // 12 bytes + + inline uint64_t GetOffset() const { return uint64_t(Offset) + (uint64_t(OffsetHigh) << 32); } + inline void SetOffset(uint64_t NewOffset) + { + Offset = gsl::narrow_cast<uint32_t>(NewOffset & 0xffff'ffff); + OffsetHigh = gsl::narrow_cast<uint8_t>(NewOffset >> 32); + } }; static_assert(sizeof(RecordedRequest) == 24); const uint64_t RecordedRequestBlockSize = 1 * 1024 * 1024 * 1024; // 1GiB -const uint64_t StandaloneFileSizeThreshold = 1 * 1024 * 1024ull; // 1MiB +const uint64_t StandaloneFileSizeThreshold = 16 * 1024 * 1024ull; // 16MiB const uint64_t SegmentRequestCount = 10 * 1000 * 1000; const uint64_t LooseFileThreshold = 5000; // Somewhat arbitrary, but we try to keep the // number of files in a directory below this level @@ -272,6 +283,12 @@ const uint64_t LooseFileThreshold = 5000; // Somewhat arbitrary, but we try const uint64_t SegmentByteThreshold = 16ull * 1024 * 1024 * 1024; const TimeSpan SegmentTimeThreshold{/* hours */ 1, /* minutes */ 0, /* seconds */ 0}; +std::string +MakeSegmentPath(uint64_t SegmentIndex) +{ + return fmt::format("segment_{:06}", SegmentIndex); +} + struct RecordedRequestsSegmentWriter { RecordedRequestsSegmentWriter() = default; @@ -291,6 +308,7 @@ struct RecordedRequestsSegmentWriter return m_RequestCount; } + RwLock::SharedLockScope _(m_Lock); return m_Entries.size(); } inline uint64_t GetBaseRequestIndex() const { return m_RequestBaseIndex; } @@ -321,6 +339,7 @@ struct RecordedRequestsWriter RecordedRequestsSegmentWriter& EnsureCurrentSegment(); void CommitCurrentSegment(RwLock::ExclusiveLockScope&); void EndWrite(); + void WriteRecordingMetadata(); uint64_t WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer); private: @@ -434,50 +453,58 @@ uint64_t RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) { const uint64_t RequestBufferSize = RequestBuffer.GetSize(); + uint64_t RequestIndex = ~0ull; - RwLock::ExclusiveLockScope Lock(m_Lock); - uint64_t RequestIndex = m_Entries.size(); - RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0u, - .Length = uint32_t(RequestBufferSize & 0xffffFFFFu), - .ContentType = RequestInfo.ContentType, - .AcceptType = RequestInfo.AcceptType, - .Padding = 0, - .Padding2 = 0, - .SessionId = RequestInfo.SessionId}); - - if (Entry.Length < StandaloneFileSizeThreshold) { - const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize); + RwLock::ExclusiveLockScope Lock(m_Lock); + RequestIndex = m_Entries.size(); + RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0u, + .Length = uint32_t(RequestBufferSize & 0xffffFFFFu), + .ContentType = RequestInfo.ContentType, + .AcceptType = RequestInfo.AcceptType, + .OffsetHigh = 0, + .Padding2 = 0, + .SessionId = RequestInfo.SessionId}); - if (BlockIndex == m_BlockFiles.size()) + if (Entry.Length < StandaloneFileSizeThreshold) { - std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>()); - NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate); - m_ChunkOffset = BlockIndex * RecordedRequestBlockSize; - ++m_FileCount; - } + const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize); - ZEN_ASSERT(BlockIndex < m_BlockFiles.size()); - BasicFile* BlockFile = m_BlockFiles[BlockIndex].get(); - ZEN_ASSERT(BlockFile != nullptr); + if (BlockIndex == m_BlockFiles.size()) + { + std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>()); + NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate); + m_ChunkOffset = BlockIndex * RecordedRequestBlockSize; + ++m_FileCount; + } - Entry.Offset = uint32_t(m_ChunkOffset & 0xffffFFFF); - m_ChunkOffset = RoundUp(m_ChunkOffset + Entry.Length, 1u << 4u); - Lock.ReleaseNow(); + ZEN_ASSERT(BlockIndex < m_BlockFiles.size()); + BasicFile* BlockFile = m_BlockFiles[BlockIndex].get(); + ZEN_ASSERT(BlockFile != nullptr); - std::error_code Ec; - BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), Entry.Offset - BlockIndex * RecordedRequestBlockSize, Ec); - if (Ec) - { - Entry.Length = 0; - return ~0ull; - } + // Note that this is the overall logical offset, not the offset within a single file + const uint64_t ChunkWriteOffset = m_ChunkOffset; + m_ChunkOffset = RoundUp(ChunkWriteOffset + Entry.Length, 1u << 4u); + Entry.SetOffset(ChunkWriteOffset); + Lock.ReleaseNow(); - m_RequestsByteCount.fetch_add(RequestBuffer.GetSize()); + std::error_code Ec; + BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), ChunkWriteOffset - (BlockIndex * RecordedRequestBlockSize), Ec); + if (Ec) + { + // We cannot simply use `Entry` here because the vector may + // have been reallocated causing the entry to be in a different + // location + RwLock::ExclusiveLockScope _(m_Lock); + m_Entries[RequestIndex].Length = 0; + return ~0ull; + } - return RequestIndex; + m_RequestsByteCount.fetch_add(RequestBuffer.GetSize()); + + return RequestIndex; + } } - Lock.ReleaseNow(); // Write request data to standalone file @@ -491,7 +518,8 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn // The exact value of the entry is not important, we will use // the size of the standalone file regardless when performing // the read - Entry.Length = std::numeric_limits<uint32_t>::max(); + RwLock::ExclusiveLockScope _(m_Lock); + m_Entries[RequestIndex].Length = std::numeric_limits<uint32_t>::max(); } ++m_FileCount; @@ -501,7 +529,8 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn if (Ec) { - Entry.Length = 0; + RwLock::ExclusiveLockScope _(m_Lock); + m_Entries[RequestIndex].Length = 0; return ~0ull; } @@ -511,7 +540,8 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn } catch (std::exception&) { - Entry.Length = 0; + RwLock::ExclusiveLockScope _(m_Lock); + m_Entries[RequestIndex].Length = 0; return ~0ull; } } @@ -529,7 +559,7 @@ RecordedRequestsSegmentReader::BeginRead(const std::filesystem::path& BasePath, { if (R.Offset != ~0u) { - MaxChunkPosition = Max(MaxChunkPosition, R.Offset + R.Length); + MaxChunkPosition = Max(MaxChunkPosition, R.GetOffset() + R.Length); } } uint32_t BlockCount = gsl::narrow<uint32_t>(MaxChunkPosition / RecordedRequestBlockSize) + 1; @@ -547,6 +577,7 @@ RecordedRequestsSegmentReader::BeginRead(const std::filesystem::path& BasePath, } return m_Entries.size(); } + void RecordedRequestsSegmentReader::EndRead() { @@ -571,9 +602,10 @@ RecordedRequestsSegmentReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutB if (Entry.Offset != ~0u) { // Inline in block file - uint32_t BlockIndex = gsl::narrow<uint32_t>((Entry.Offset + Entry.Length) / RecordedRequestBlockSize); - uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize); - OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length); + const uint64_t EntryOffset = Entry.GetOffset(); + const uint32_t BlockIndex = gsl::narrow<uint32_t>((EntryOffset + Entry.Length) / RecordedRequestBlockSize); + const uint64_t ChunkOffset = EntryOffset - (BlockIndex * RecordedRequestBlockSize); + OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length); return RequestInfo; } @@ -600,6 +632,8 @@ RecordedRequestsWriter::EnsureCurrentSegment() { bool StartNewSegment = false; + TimeSpan SegmentAge(DateTime::NowTicks() - m_CurrentWriter->GetStartTime().GetTicks()); + if (m_CurrentWriter->GetRequestCount() >= SegmentRequestCount) { ZEN_DEBUG("starting new RPC recording segment due to request count >= {}", SegmentRequestCount); @@ -615,6 +649,12 @@ RecordedRequestsWriter::EnsureCurrentSegment() ZEN_DEBUG("starting new RPC recording segment due to footprint >= {} bytes", SegmentByteThreshold); StartNewSegment = true; } + else if (SegmentAge >= SegmentTimeThreshold) + { + ZEN_DEBUG("starting new RPC recording segment due to age >= {}", + NiceTimeSpanMs(SegmentTimeThreshold.GetTicks() / TimeSpan::TicksPerMillisecond)); + StartNewSegment = true; + } if (StartNewSegment) { @@ -627,7 +667,7 @@ RecordedRequestsWriter::EnsureCurrentSegment() const uint64_t SegmentIndex = m_FinishedSegments.size(); m_CurrentWriter = std::make_unique<RecordedRequestsSegmentWriter>(); - m_CurrentWriter->BeginWrite(m_BasePath / fmt::format("segment_{:06}", SegmentIndex), SegmentIndex, m_NextSegmentBaseIndex); + m_CurrentWriter->BeginWrite(m_BasePath / MakeSegmentPath(SegmentIndex), SegmentIndex, m_NextSegmentBaseIndex); } return *m_CurrentWriter; @@ -654,8 +694,22 @@ RecordedRequestsWriter::EndWrite() CommitCurrentSegment(_); - // Emit some metadata alongside the recording + WriteRecordingMetadata(); +} + +uint64_t +RecordedRequestsWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) +{ + RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment(); + + const uint64_t SegmentLocalIndex = Writer.WriteRequest(RequestInfo, RequestBuffer); + + return Writer.GetBaseRequestIndex() + SegmentLocalIndex; +} +void +RecordedRequestsWriter::WriteRecordingMetadata() +{ try { DateTime EndTime = DateTime::Now(); @@ -702,16 +756,6 @@ RecordedRequestsWriter::EndWrite() } } -uint64_t -RecordedRequestsWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) -{ - RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment(); - - const uint64_t SegmentLocalIndex = Writer.WriteRequest(RequestInfo, RequestBuffer); - - return Writer.GetBaseRequestIndex() + SegmentLocalIndex; -} - ////////////////////////////////////////////////////////////////////////// uint64_t @@ -720,26 +764,89 @@ RecordedRequestsReader::BeginRead(const std::filesystem::path& BasePath, bool In m_InMemory = InMemory; m_BasePath = BasePath; - BasicFile InfoFile; - InfoFile.Open(m_BasePath / "rpc_recording_info.zcb", BasicFile::Mode::kRead); - CbObject CbInfo = LoadCompactBinaryObject(InfoFile.ReadAll()); + std::error_code Ec; + BasicFile InfoFile; + InfoFile.Open(m_BasePath / "rpc_recording_info.zcb", BasicFile::Mode::kRead, Ec); + + if (!Ec) + { + try + { + CbObject CbInfo = LoadCompactBinaryObject(InfoFile.ReadAll()); + + uint64_t TotalRequestCount = 0; + uint64_t MaxSegmentIndex = 0; + + for (auto SegmentElement : CbInfo["segments"]) + { + CbObjectView Segment = SegmentElement.AsObjectView(); + + const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment"sv].AsUInt64(), + .BaseRequestIndex = Segment["base_index"sv].AsUInt64(), + .RequestCount = Segment["request_count"sv].AsUInt64(), + .RequestBytes = Segment["request_bytes"sv].AsUInt64(), + .StartTime = Segment["start_time"sv].AsDateTime(), + .EndTime = Segment["end_time"sv].AsDateTime()}); + + TotalRequestCount += Info.RequestCount; + MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex); + } + + m_SegmentReaders.resize(MaxSegmentIndex + 1); + + return TotalRequestCount; + } + catch (std::exception& Ex) + { + ZEN_WARN("could not read metadata file: {}", Ex.what()); + } + } + + ZEN_INFO("recovering segment info for '{}'", BasePath); uint64_t TotalRequestCount = 0; uint64_t MaxSegmentIndex = 0; - for (auto SegmentElement : CbInfo["segments"]) + try + { + for (int SegmentIndex = 0;; ++SegmentIndex) + { + const std::filesystem::path ZcbPath = BasePath / MakeSegmentPath(SegmentIndex) / "rpc_segment_info.zcb"; + FileContents Fc = ReadFile(ZcbPath); + + if (Fc.ErrorCode) + break; + + if (IoBuffer SegmentInfoBuffer = Fc.Flatten()) + { + CbObject Segment = LoadCompactBinaryObject(SegmentInfoBuffer); + + const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment_index"sv].AsUInt64(), + .BaseRequestIndex = 0, + .RequestCount = Segment["entry_count"sv].AsUInt64(), + .RequestBytes = 0, + .StartTime = Segment["time_start"sv].AsDateTime(), + .EndTime = Segment["time_end"sv].AsDateTime()}); + + TotalRequestCount += Info.RequestCount; + MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex); + } + } + } + catch (std::exception&) { - CbObjectView Segment = SegmentElement.AsObjectView(); + } + + std::sort(begin(m_KnownSegments), end(m_KnownSegments), [](const auto& Lhs, const auto& Rhs) { + return Lhs.SegmentIndex < Rhs.SegmentIndex; + }); - const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment"sv].AsUInt64(), - .BaseRequestIndex = Segment["base_index"sv].AsUInt64(), - .RequestCount = Segment["request_count"sv].AsUInt64(), - .RequestBytes = Segment["request_bytes"sv].AsUInt64(), - .StartTime = Segment["start_time"sv].AsDateTime(), - .EndTime = Segment["end_time"sv].AsDateTime()}); + uint64_t SegmentRequestOffset = 0; - TotalRequestCount += Info.RequestCount; - MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex); + for (SegmentInfo& Info : m_KnownSegments) + { + Info.BaseRequestIndex = SegmentRequestOffset; + SegmentRequestOffset += Info.RequestCount; } m_SegmentReaders.resize(MaxSegmentIndex + 1); @@ -776,7 +883,7 @@ RecordedRequestsReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) if (!SegmentReaderPtr) { RecordedRequestsSegmentReader* NewSegment = new RecordedRequestsSegmentReader; - NewSegment->BeginRead(m_BasePath / fmt::format("segment_{:06}", SegmentIndex), m_InMemory); + NewSegment->BeginRead(m_BasePath / MakeSegmentPath(SegmentIndex), m_InMemory); SegmentReaderPtr.reset(NewSegment); } @@ -806,7 +913,6 @@ public: DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); } virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); } -private: virtual uint64_t RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override { return m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer); @@ -814,6 +920,7 @@ private: virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {} virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {} +private: RecordedRequestsWriter m_RecordedRequests; }; @@ -822,23 +929,41 @@ class DiskRequestReplayer : public IRpcRequestReplayer public: DiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory) { - m_RequestCount = m_RequestBuffer.BeginRead(BasePath, InMemory); + m_RequestCount = m_RequestReader.BeginRead(BasePath, InMemory); } - virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); } + virtual ~DiskRequestReplayer() { m_RequestReader.EndRead(); } - static bool IsCompatible(const std::filesystem::path& BasePath) { return std::filesystem::exists(BasePath / "rpc_recording_info.zcb"); } + static bool IsCompatible(const std::filesystem::path& BasePath) + { + if (std::filesystem::exists(BasePath / "rpc_recording_info.zcb")) + { + return true; + } + + const std::filesystem::path SegmentZero = BasePath / MakeSegmentPath(0); + + if (std::filesystem::exists(SegmentZero / "rpc_segment_info.zcb") && std::filesystem::exists(SegmentZero / "index.bin")) + { + // top-level metadata is missing, possibly because of premature exit + // on the recording side + + return true; + } + + return false; + } -private: virtual uint64_t GetRequestCount() const override { return m_RequestCount; } virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override { - return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer); + return m_RequestReader.ReadRequest(RequestIndex, OutBuffer); } virtual ZenContentType GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; } +private: std::uint64_t m_RequestCount; - RecordedRequestsReader m_RequestBuffer; + RecordedRequestsReader m_RequestReader; }; } // namespace zen::cache::v2 @@ -866,4 +991,66 @@ MakeDiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory) } } +#if ZEN_WITH_TESTS + +void +rpcrecord_forcelink() +{ +} + +TEST_SUITE_BEGIN("rpc.recording"); + +TEST_CASE("rpc.record") +{ + ScopedTemporaryDirectory TempDir; + auto Path = TempDir.Path(); + + const Oid SessionId = GetSessionId(); + + using namespace std::literals; + + { + cache::v2::DiskRequestRecorder Recorder{Path}; + + for (int i = 0; i < 1000; ++i) + { + RecordedRequestInfo RequestInfo{.ContentType = ZenContentType::kCbObject, + .AcceptType = ZenContentType::kCbObject, + .SessionId = SessionId}; + + CbObjectWriter RequestPayload; + RequestPayload << "test"sv << true; + RequestPayload << "index"sv << i; + CbObject Req = RequestPayload.Save(); + IoBuffer RequestBuffer = Req.GetBuffer().AsIoBuffer(); + + const uint64_t Index = Recorder.RecordRequest(RequestInfo, RequestBuffer); + + CHECK(Index == i); + } + } + + { + cache::v2::DiskRequestReplayer Replayer{Path, false}; + + for (int i = 0; i < 1000; ++i) + { + IoBuffer RequestBuffer; + RecordedRequestInfo RequestInfo = Replayer.GetRequest(i, RequestBuffer); + + CHECK(RequestInfo.AcceptType == ZenContentType::kCbObject); + CHECK(RequestInfo.ContentType == ZenContentType::kCbObject); + CHECK(RequestInfo.SessionId == SessionId); + + CbObject Req = LoadCompactBinaryObject(RequestBuffer); + CHECK_EQ(Req["index"sv].AsInt32(), i); + CHECK_EQ(Req["test"sv].AsBool(), true); + } + } +} + +TEST_SUITE_END(); + +#endif + } // namespace zen::cache diff --git a/src/zenutil/include/zenutil/cache/rpcrecording.h b/src/zenutil/include/zenutil/cache/rpcrecording.h index fd5df26ad..ab9b92dd3 100644 --- a/src/zenutil/include/zenutil/cache/rpcrecording.h +++ b/src/zenutil/include/zenutil/cache/rpcrecording.h @@ -4,6 +4,9 @@ #include <zencore/compositebuffer.h> #include <zencore/iobuffer.h> +#include <zencore/uid.h> + +#include <compare> namespace zen::cache { @@ -13,6 +16,7 @@ struct RecordedRequestInfo ZenContentType AcceptType; Oid SessionId; + inline std::strong_ordering operator<=>(const RecordedRequestInfo& Rhs) const = default; static const RecordedRequestInfo NullRequest; }; @@ -37,4 +41,6 @@ public: std::unique_ptr<cache::IRpcRequestRecorder> MakeDiskRequestRecorder(const std::filesystem::path& BasePath); std::unique_ptr<cache::IRpcRequestReplayer> MakeDiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory); +void rpcrecord_forcelink(); + } // namespace zen::cache diff --git a/src/zenutil/logging.cpp b/src/zenutil/logging.cpp index fedfdc7e8..64230ea81 100644 --- a/src/zenutil/logging.cpp +++ b/src/zenutil/logging.cpp @@ -23,6 +23,7 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { +static bool g_IsLoggingInitialized; spdlog::sink_ptr g_FileSink; spdlog::sink_ptr @@ -83,6 +84,14 @@ BeginInitializeLogging(const LoggingOptions& LogOptions) /* max size */ 128 * 1024 * 1024, /* max files */ 16, /* rotate on open */ true); + if (LogOptions.AbsLogFile.extension() == ".json") + { + FileSink->set_formatter(std::make_unique<logging::json_formatter>(LogOptions.LogId)); + } + else + { + FileSink->set_formatter(std::make_unique<logging::full_formatter>(LogOptions.LogId)); // this will have a date prefix + } } std::set_terminate([]() { ZEN_CRITICAL("Program exited abnormally via std::terminate()"); }); @@ -173,31 +182,25 @@ FinishInitializeLogging(const LoggingOptions& LogOptions) spdlog::set_formatter( std::make_unique<logging::full_formatter>(LogOptions.LogId, std::chrono::system_clock::now())); // default to duration prefix - if (g_FileSink) - { - if (LogOptions.AbsLogFile.extension() == ".json") - { - g_FileSink->set_formatter(std::make_unique<logging::json_formatter>(LogOptions.LogId)); - } - else - { - g_FileSink->set_formatter(std::make_unique<logging::full_formatter>(LogOptions.LogId)); // this will have a date prefix - } - } - const std::string StartLogTime = zen::DateTime::Now().ToIso8601(); spdlog::apply_all([&](auto Logger) { Logger->info("log starting at {}", StartLogTime); }); + + g_IsLoggingInitialized = true; } void ShutdownLogging() { - g_FileSink.reset(); + if (g_IsLoggingInitialized) + { + auto DefaultLogger = zen::logging::Default(); + ZEN_LOG_INFO(DefaultLogger, "log ending at {}", zen::DateTime::Now().ToIso8601()); + } - auto DefaultLogger = zen::logging::Default(); - ZEN_LOG_INFO(DefaultLogger, "log ending at {}", zen::DateTime::Now().ToIso8601()); zen::logging::ShutdownLogging(); + + g_FileSink.reset(); } } // namespace zen diff --git a/src/zenutil/zenutil.cpp b/src/zenutil/zenutil.cpp index 00db5a25b..eba3613f1 100644 --- a/src/zenutil/zenutil.cpp +++ b/src/zenutil/zenutil.cpp @@ -6,6 +6,7 @@ # include <zenutil/basicfile.h> # include <zenutil/process.h> +# include <zenutil/cache/rpcrecording.h> namespace zen { @@ -14,6 +15,7 @@ zenutil_forcelinktests() { basicfile_forcelink(); process_forcelink(); + cache::rpcrecord_forcelink(); } } // namespace zen |