aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-12-19 12:06:13 +0100
committerGitHub <[email protected]>2023-12-19 12:06:13 +0100
commit519d942d809e740a3b1fe5a1f6a57a4cfe43408b (patch)
tree9b3c084e21bb7fd5e6bb3335e890647062d0703b /src/zenutil
parentadded mimalloc_hooks (diff)
parentensure we can build without trace (#619) (diff)
downloadzen-273-integrated-memory-tracking.tar.xz
zen-273-integrated-memory-tracking.zip
Merge branch 'main' into 273-integrated-memory-tracking273-integrated-memory-tracking
Diffstat (limited to 'src/zenutil')
-rw-r--r--src/zenutil/basicfile.cpp32
-rw-r--r--src/zenutil/cache/rpcrecording.cpp341
-rw-r--r--src/zenutil/include/zenutil/cache/rpcrecording.h6
-rw-r--r--src/zenutil/logging.cpp33
-rw-r--r--src/zenutil/zenutil.cpp2
5 files changed, 313 insertions, 101 deletions
diff --git a/src/zenutil/basicfile.cpp b/src/zenutil/basicfile.cpp
index 173b22449..024b1e5bf 100644
--- a/src/zenutil/basicfile.cpp
+++ b/src/zenutil/basicfile.cpp
@@ -241,7 +241,8 @@ BasicFile::Read(void* Data, uint64_t BytesToRead, uint64_t FileOffset)
if (!Success)
{
- ThrowLastError(fmt::format("Failed to read from file '{}'", zen::PathFromHandle(m_FileHandle)));
+ std::error_code Dummy;
+ ThrowLastError(fmt::format("Failed to read from file '{}'", zen::PathFromHandle(m_FileHandle, Dummy)));
}
BytesToRead -= NumberOfBytesToRead;
@@ -374,7 +375,8 @@ BasicFile::Write(const void* Data, uint64_t Size, uint64_t Offset)
if (Ec)
{
- throw std::system_error(Ec, fmt::format("Failed to write to file '{}'", zen::PathFromHandle(m_FileHandle)));
+ std::error_code Dummy;
+ throw std::system_error(Ec, fmt::format("Failed to write to file '{}'", zen::PathFromHandle(m_FileHandle, Dummy)));
}
}
@@ -426,7 +428,8 @@ BasicFile::FileSize()
int Error = zen::GetLastError();
if (Error)
{
- ThrowSystemError(Error, fmt::format("Failed to get file size from file '{}'", PathFromHandle(m_FileHandle)));
+ std::error_code Dummy;
+ ThrowSystemError(Error, fmt::format("Failed to get file size from file '{}'", PathFromHandle(m_FileHandle, Dummy)));
}
}
return uint64_t(liFileSize.QuadPart);
@@ -436,7 +439,8 @@ BasicFile::FileSize()
struct stat Stat;
if (fstat(Fd, &Stat) == -1)
{
- ThrowSystemError(GetLastError(), fmt::format("Failed to get file size from file '{}'", PathFromHandle(m_FileHandle)));
+ std::error_code Dummy;
+ ThrowSystemError(GetLastError(), fmt::format("Failed to get file size from file '{}'", PathFromHandle(m_FileHandle, Dummy)));
}
return uint64_t(Stat.st_size);
#endif
@@ -483,7 +487,9 @@ BasicFile::SetFileSize(uint64_t FileSize)
int Error = zen::GetLastError();
if (Error)
{
- ThrowSystemError(Error, fmt::format("Failed to set file pointer to {} for file {}", FileSize, PathFromHandle(m_FileHandle)));
+ std::error_code Dummy;
+ ThrowSystemError(Error,
+ fmt::format("Failed to set file pointer to {} for file {}", FileSize, PathFromHandle(m_FileHandle, Dummy)));
}
}
OK = ::SetEndOfFile(m_FileHandle);
@@ -492,7 +498,9 @@ BasicFile::SetFileSize(uint64_t FileSize)
int Error = zen::GetLastError();
if (Error)
{
- ThrowSystemError(Error, fmt::format("Failed to set end of file to {} for file {}", FileSize, PathFromHandle(m_FileHandle)));
+ std::error_code Dummy;
+ ThrowSystemError(Error,
+ fmt::format("Failed to set end of file to {} for file {}", FileSize, PathFromHandle(m_FileHandle, Dummy)));
}
}
#elif ZEN_PLATFORM_MAC
@@ -502,7 +510,9 @@ BasicFile::SetFileSize(uint64_t FileSize)
int Error = zen::GetLastError();
if (Error)
{
- ThrowSystemError(Error, fmt::format("Failed to set truncate file to {} for file {}", FileSize, PathFromHandle(m_FileHandle)));
+ std::error_code Dummy;
+ ThrowSystemError(Error,
+ fmt::format("Failed to set truncate file to {} for file {}", FileSize, PathFromHandle(m_FileHandle, Dummy)));
}
}
#else
@@ -512,7 +522,9 @@ BasicFile::SetFileSize(uint64_t FileSize)
int Error = zen::GetLastError();
if (Error)
{
- ThrowSystemError(Error, fmt::format("Failed to set truncate file to {} for file {}", FileSize, PathFromHandle(m_FileHandle)));
+ std::error_code Dummy;
+ ThrowSystemError(Error,
+ fmt::format("Failed to set truncate file to {} for file {}", FileSize, PathFromHandle(m_FileHandle, Dummy)));
}
}
if (FileSize > 0)
@@ -520,7 +532,9 @@ BasicFile::SetFileSize(uint64_t FileSize)
int Error = posix_fallocate64(Fd, 0, (off64_t)FileSize);
if (Error)
{
- ThrowSystemError(Error, fmt::format("Failed to allocate space of {} for file {}", FileSize, PathFromHandle(m_FileHandle)));
+ std::error_code Dummy;
+ ThrowSystemError(Error,
+ fmt::format("Failed to allocate space of {} for file {}", FileSize, PathFromHandle(m_FileHandle, Dummy)));
}
}
#endif
diff --git a/src/zenutil/cache/rpcrecording.cpp b/src/zenutil/cache/rpcrecording.cpp
index 054ac0e56..b8f9d65ef 100644
--- a/src/zenutil/cache/rpcrecording.cpp
+++ b/src/zenutil/cache/rpcrecording.cpp
@@ -2,8 +2,12 @@
#include <zencore/compactbinarybuilder.h>
#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
#include <zencore/logging.h>
+#include <zencore/session.h>
#include <zencore/system.h>
+#include <zencore/testing.h>
+#include <zencore/testutils.h>
#include <zenutil/basicfile.h>
#include <zenutil/cache/rpcrecording.h>
@@ -229,7 +233,6 @@ public:
}
virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); }
-private:
virtual uint64_t GetRequestCount() const override { return m_RequestCount; }
virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override
@@ -238,6 +241,7 @@ private:
}
virtual ZenContentType GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; }
+private:
std::uint64_t m_RequestCount;
RecordedRequestsReader m_RequestBuffer;
};
@@ -256,15 +260,22 @@ struct RecordedRequest
uint32_t Length; // 4 bytes
ZenContentType ContentType; // 1 byte
ZenContentType AcceptType; // 1 byte
- uint8_t Padding; // 1 byte
+ uint8_t OffsetHigh; // 1 byte
uint8_t Padding2; // 1 byte
Oid SessionId; // 12 bytes
+
+ inline uint64_t GetOffset() const { return uint64_t(Offset) + (uint64_t(OffsetHigh) << 32); }
+ inline void SetOffset(uint64_t NewOffset)
+ {
+ Offset = gsl::narrow_cast<uint32_t>(NewOffset & 0xffff'ffff);
+ OffsetHigh = gsl::narrow_cast<uint8_t>(NewOffset >> 32);
+ }
};
static_assert(sizeof(RecordedRequest) == 24);
const uint64_t RecordedRequestBlockSize = 1 * 1024 * 1024 * 1024; // 1GiB
-const uint64_t StandaloneFileSizeThreshold = 1 * 1024 * 1024ull; // 1MiB
+const uint64_t StandaloneFileSizeThreshold = 16 * 1024 * 1024ull; // 16MiB
const uint64_t SegmentRequestCount = 10 * 1000 * 1000;
const uint64_t LooseFileThreshold = 5000; // Somewhat arbitrary, but we try to keep the
// number of files in a directory below this level
@@ -272,6 +283,12 @@ const uint64_t LooseFileThreshold = 5000; // Somewhat arbitrary, but we try
const uint64_t SegmentByteThreshold = 16ull * 1024 * 1024 * 1024;
const TimeSpan SegmentTimeThreshold{/* hours */ 1, /* minutes */ 0, /* seconds */ 0};
+std::string
+MakeSegmentPath(uint64_t SegmentIndex)
+{
+ return fmt::format("segment_{:06}", SegmentIndex);
+}
+
struct RecordedRequestsSegmentWriter
{
RecordedRequestsSegmentWriter() = default;
@@ -291,6 +308,7 @@ struct RecordedRequestsSegmentWriter
return m_RequestCount;
}
+ RwLock::SharedLockScope _(m_Lock);
return m_Entries.size();
}
inline uint64_t GetBaseRequestIndex() const { return m_RequestBaseIndex; }
@@ -321,6 +339,7 @@ struct RecordedRequestsWriter
RecordedRequestsSegmentWriter& EnsureCurrentSegment();
void CommitCurrentSegment(RwLock::ExclusiveLockScope&);
void EndWrite();
+ void WriteRecordingMetadata();
uint64_t WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer);
private:
@@ -434,50 +453,58 @@ uint64_t
RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer)
{
const uint64_t RequestBufferSize = RequestBuffer.GetSize();
+ uint64_t RequestIndex = ~0ull;
- RwLock::ExclusiveLockScope Lock(m_Lock);
- uint64_t RequestIndex = m_Entries.size();
- RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0u,
- .Length = uint32_t(RequestBufferSize & 0xffffFFFFu),
- .ContentType = RequestInfo.ContentType,
- .AcceptType = RequestInfo.AcceptType,
- .Padding = 0,
- .Padding2 = 0,
- .SessionId = RequestInfo.SessionId});
-
- if (Entry.Length < StandaloneFileSizeThreshold)
{
- const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize);
+ RwLock::ExclusiveLockScope Lock(m_Lock);
+ RequestIndex = m_Entries.size();
+ RecordedRequest& Entry = m_Entries.emplace_back(RecordedRequest{.Offset = ~0u,
+ .Length = uint32_t(RequestBufferSize & 0xffffFFFFu),
+ .ContentType = RequestInfo.ContentType,
+ .AcceptType = RequestInfo.AcceptType,
+ .OffsetHigh = 0,
+ .Padding2 = 0,
+ .SessionId = RequestInfo.SessionId});
- if (BlockIndex == m_BlockFiles.size())
+ if (Entry.Length < StandaloneFileSizeThreshold)
{
- std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>());
- NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate);
- m_ChunkOffset = BlockIndex * RecordedRequestBlockSize;
- ++m_FileCount;
- }
+ const uint32_t BlockIndex = gsl::narrow<uint32_t>((m_ChunkOffset + Entry.Length) / RecordedRequestBlockSize);
- ZEN_ASSERT(BlockIndex < m_BlockFiles.size());
- BasicFile* BlockFile = m_BlockFiles[BlockIndex].get();
- ZEN_ASSERT(BlockFile != nullptr);
+ if (BlockIndex == m_BlockFiles.size())
+ {
+ std::unique_ptr<BasicFile>& NewBlockFile = m_BlockFiles.emplace_back(std::make_unique<BasicFile>());
+ NewBlockFile->Open(m_BasePath / fmt::format("chunks{}.bin", BlockIndex), BasicFile::Mode::kTruncate);
+ m_ChunkOffset = BlockIndex * RecordedRequestBlockSize;
+ ++m_FileCount;
+ }
- Entry.Offset = uint32_t(m_ChunkOffset & 0xffffFFFF);
- m_ChunkOffset = RoundUp(m_ChunkOffset + Entry.Length, 1u << 4u);
- Lock.ReleaseNow();
+ ZEN_ASSERT(BlockIndex < m_BlockFiles.size());
+ BasicFile* BlockFile = m_BlockFiles[BlockIndex].get();
+ ZEN_ASSERT(BlockFile != nullptr);
- std::error_code Ec;
- BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), Entry.Offset - BlockIndex * RecordedRequestBlockSize, Ec);
- if (Ec)
- {
- Entry.Length = 0;
- return ~0ull;
- }
+ // Note that this is the overall logical offset, not the offset within a single file
+ const uint64_t ChunkWriteOffset = m_ChunkOffset;
+ m_ChunkOffset = RoundUp(ChunkWriteOffset + Entry.Length, 1u << 4u);
+ Entry.SetOffset(ChunkWriteOffset);
+ Lock.ReleaseNow();
- m_RequestsByteCount.fetch_add(RequestBuffer.GetSize());
+ std::error_code Ec;
+ BlockFile->Write(RequestBuffer.Data(), RequestBuffer.Size(), ChunkWriteOffset - (BlockIndex * RecordedRequestBlockSize), Ec);
+ if (Ec)
+ {
+ // We cannot simply use `Entry` here because the vector may
+ // have been reallocated causing the entry to be in a different
+ // location
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Entries[RequestIndex].Length = 0;
+ return ~0ull;
+ }
- return RequestIndex;
+ m_RequestsByteCount.fetch_add(RequestBuffer.GetSize());
+
+ return RequestIndex;
+ }
}
- Lock.ReleaseNow();
// Write request data to standalone file
@@ -491,7 +518,8 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn
// The exact value of the entry is not important, we will use
// the size of the standalone file regardless when performing
// the read
- Entry.Length = std::numeric_limits<uint32_t>::max();
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Entries[RequestIndex].Length = std::numeric_limits<uint32_t>::max();
}
++m_FileCount;
@@ -501,7 +529,8 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn
if (Ec)
{
- Entry.Length = 0;
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Entries[RequestIndex].Length = 0;
return ~0ull;
}
@@ -511,7 +540,8 @@ RecordedRequestsSegmentWriter::WriteRequest(const RecordedRequestInfo& RequestIn
}
catch (std::exception&)
{
- Entry.Length = 0;
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_Entries[RequestIndex].Length = 0;
return ~0ull;
}
}
@@ -529,7 +559,7 @@ RecordedRequestsSegmentReader::BeginRead(const std::filesystem::path& BasePath,
{
if (R.Offset != ~0u)
{
- MaxChunkPosition = Max(MaxChunkPosition, R.Offset + R.Length);
+ MaxChunkPosition = Max(MaxChunkPosition, R.GetOffset() + R.Length);
}
}
uint32_t BlockCount = gsl::narrow<uint32_t>(MaxChunkPosition / RecordedRequestBlockSize) + 1;
@@ -547,6 +577,7 @@ RecordedRequestsSegmentReader::BeginRead(const std::filesystem::path& BasePath,
}
return m_Entries.size();
}
+
void
RecordedRequestsSegmentReader::EndRead()
{
@@ -571,9 +602,10 @@ RecordedRequestsSegmentReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutB
if (Entry.Offset != ~0u)
{
// Inline in block file
- uint32_t BlockIndex = gsl::narrow<uint32_t>((Entry.Offset + Entry.Length) / RecordedRequestBlockSize);
- uint64_t ChunkOffset = Entry.Offset - (BlockIndex * RecordedRequestBlockSize);
- OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length);
+ const uint64_t EntryOffset = Entry.GetOffset();
+ const uint32_t BlockIndex = gsl::narrow<uint32_t>((EntryOffset + Entry.Length) / RecordedRequestBlockSize);
+ const uint64_t ChunkOffset = EntryOffset - (BlockIndex * RecordedRequestBlockSize);
+ OutBuffer = IoBuffer(m_BlockFiles[BlockIndex], ChunkOffset, Entry.Length);
return RequestInfo;
}
@@ -600,6 +632,8 @@ RecordedRequestsWriter::EnsureCurrentSegment()
{
bool StartNewSegment = false;
+ TimeSpan SegmentAge(DateTime::NowTicks() - m_CurrentWriter->GetStartTime().GetTicks());
+
if (m_CurrentWriter->GetRequestCount() >= SegmentRequestCount)
{
ZEN_DEBUG("starting new RPC recording segment due to request count >= {}", SegmentRequestCount);
@@ -615,6 +649,12 @@ RecordedRequestsWriter::EnsureCurrentSegment()
ZEN_DEBUG("starting new RPC recording segment due to footprint >= {} bytes", SegmentByteThreshold);
StartNewSegment = true;
}
+ else if (SegmentAge >= SegmentTimeThreshold)
+ {
+ ZEN_DEBUG("starting new RPC recording segment due to age >= {}",
+ NiceTimeSpanMs(SegmentTimeThreshold.GetTicks() / TimeSpan::TicksPerMillisecond));
+ StartNewSegment = true;
+ }
if (StartNewSegment)
{
@@ -627,7 +667,7 @@ RecordedRequestsWriter::EnsureCurrentSegment()
const uint64_t SegmentIndex = m_FinishedSegments.size();
m_CurrentWriter = std::make_unique<RecordedRequestsSegmentWriter>();
- m_CurrentWriter->BeginWrite(m_BasePath / fmt::format("segment_{:06}", SegmentIndex), SegmentIndex, m_NextSegmentBaseIndex);
+ m_CurrentWriter->BeginWrite(m_BasePath / MakeSegmentPath(SegmentIndex), SegmentIndex, m_NextSegmentBaseIndex);
}
return *m_CurrentWriter;
@@ -654,8 +694,22 @@ RecordedRequestsWriter::EndWrite()
CommitCurrentSegment(_);
- // Emit some metadata alongside the recording
+ WriteRecordingMetadata();
+}
+
+uint64_t
+RecordedRequestsWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer)
+{
+ RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment();
+
+ const uint64_t SegmentLocalIndex = Writer.WriteRequest(RequestInfo, RequestBuffer);
+
+ return Writer.GetBaseRequestIndex() + SegmentLocalIndex;
+}
+void
+RecordedRequestsWriter::WriteRecordingMetadata()
+{
try
{
DateTime EndTime = DateTime::Now();
@@ -702,16 +756,6 @@ RecordedRequestsWriter::EndWrite()
}
}
-uint64_t
-RecordedRequestsWriter::WriteRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer)
-{
- RecordedRequestsSegmentWriter& Writer = EnsureCurrentSegment();
-
- const uint64_t SegmentLocalIndex = Writer.WriteRequest(RequestInfo, RequestBuffer);
-
- return Writer.GetBaseRequestIndex() + SegmentLocalIndex;
-}
-
//////////////////////////////////////////////////////////////////////////
uint64_t
@@ -720,26 +764,89 @@ RecordedRequestsReader::BeginRead(const std::filesystem::path& BasePath, bool In
m_InMemory = InMemory;
m_BasePath = BasePath;
- BasicFile InfoFile;
- InfoFile.Open(m_BasePath / "rpc_recording_info.zcb", BasicFile::Mode::kRead);
- CbObject CbInfo = LoadCompactBinaryObject(InfoFile.ReadAll());
+ std::error_code Ec;
+ BasicFile InfoFile;
+ InfoFile.Open(m_BasePath / "rpc_recording_info.zcb", BasicFile::Mode::kRead, Ec);
+
+ if (!Ec)
+ {
+ try
+ {
+ CbObject CbInfo = LoadCompactBinaryObject(InfoFile.ReadAll());
+
+ uint64_t TotalRequestCount = 0;
+ uint64_t MaxSegmentIndex = 0;
+
+ for (auto SegmentElement : CbInfo["segments"])
+ {
+ CbObjectView Segment = SegmentElement.AsObjectView();
+
+ const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment"sv].AsUInt64(),
+ .BaseRequestIndex = Segment["base_index"sv].AsUInt64(),
+ .RequestCount = Segment["request_count"sv].AsUInt64(),
+ .RequestBytes = Segment["request_bytes"sv].AsUInt64(),
+ .StartTime = Segment["start_time"sv].AsDateTime(),
+ .EndTime = Segment["end_time"sv].AsDateTime()});
+
+ TotalRequestCount += Info.RequestCount;
+ MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex);
+ }
+
+ m_SegmentReaders.resize(MaxSegmentIndex + 1);
+
+ return TotalRequestCount;
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_WARN("could not read metadata file: {}", Ex.what());
+ }
+ }
+
+ ZEN_INFO("recovering segment info for '{}'", BasePath);
uint64_t TotalRequestCount = 0;
uint64_t MaxSegmentIndex = 0;
- for (auto SegmentElement : CbInfo["segments"])
+ try
+ {
+ for (int SegmentIndex = 0;; ++SegmentIndex)
+ {
+ const std::filesystem::path ZcbPath = BasePath / MakeSegmentPath(SegmentIndex) / "rpc_segment_info.zcb";
+ FileContents Fc = ReadFile(ZcbPath);
+
+ if (Fc.ErrorCode)
+ break;
+
+ if (IoBuffer SegmentInfoBuffer = Fc.Flatten())
+ {
+ CbObject Segment = LoadCompactBinaryObject(SegmentInfoBuffer);
+
+ const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment_index"sv].AsUInt64(),
+ .BaseRequestIndex = 0,
+ .RequestCount = Segment["entry_count"sv].AsUInt64(),
+ .RequestBytes = 0,
+ .StartTime = Segment["time_start"sv].AsDateTime(),
+ .EndTime = Segment["time_end"sv].AsDateTime()});
+
+ TotalRequestCount += Info.RequestCount;
+ MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex);
+ }
+ }
+ }
+ catch (std::exception&)
{
- CbObjectView Segment = SegmentElement.AsObjectView();
+ }
+
+ std::sort(begin(m_KnownSegments), end(m_KnownSegments), [](const auto& Lhs, const auto& Rhs) {
+ return Lhs.SegmentIndex < Rhs.SegmentIndex;
+ });
- const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment"sv].AsUInt64(),
- .BaseRequestIndex = Segment["base_index"sv].AsUInt64(),
- .RequestCount = Segment["request_count"sv].AsUInt64(),
- .RequestBytes = Segment["request_bytes"sv].AsUInt64(),
- .StartTime = Segment["start_time"sv].AsDateTime(),
- .EndTime = Segment["end_time"sv].AsDateTime()});
+ uint64_t SegmentRequestOffset = 0;
- TotalRequestCount += Info.RequestCount;
- MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex);
+ for (SegmentInfo& Info : m_KnownSegments)
+ {
+ Info.BaseRequestIndex = SegmentRequestOffset;
+ SegmentRequestOffset += Info.RequestCount;
}
m_SegmentReaders.resize(MaxSegmentIndex + 1);
@@ -776,7 +883,7 @@ RecordedRequestsReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer)
if (!SegmentReaderPtr)
{
RecordedRequestsSegmentReader* NewSegment = new RecordedRequestsSegmentReader;
- NewSegment->BeginRead(m_BasePath / fmt::format("segment_{:06}", SegmentIndex), m_InMemory);
+ NewSegment->BeginRead(m_BasePath / MakeSegmentPath(SegmentIndex), m_InMemory);
SegmentReaderPtr.reset(NewSegment);
}
@@ -806,7 +913,6 @@ public:
DiskRequestRecorder(const std::filesystem::path& BasePath) { m_RecordedRequests.BeginWrite(BasePath); }
virtual ~DiskRequestRecorder() { m_RecordedRequests.EndWrite(); }
-private:
virtual uint64_t RecordRequest(const RecordedRequestInfo& RequestInfo, const IoBuffer& RequestBuffer) override
{
return m_RecordedRequests.WriteRequest(RequestInfo, RequestBuffer);
@@ -814,6 +920,7 @@ private:
virtual void RecordResponse(uint64_t, const ZenContentType, const IoBuffer&) override {}
virtual void RecordResponse(uint64_t, const ZenContentType, const CompositeBuffer&) override {}
+private:
RecordedRequestsWriter m_RecordedRequests;
};
@@ -822,23 +929,41 @@ class DiskRequestReplayer : public IRpcRequestReplayer
public:
DiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory)
{
- m_RequestCount = m_RequestBuffer.BeginRead(BasePath, InMemory);
+ m_RequestCount = m_RequestReader.BeginRead(BasePath, InMemory);
}
- virtual ~DiskRequestReplayer() { m_RequestBuffer.EndRead(); }
+ virtual ~DiskRequestReplayer() { m_RequestReader.EndRead(); }
- static bool IsCompatible(const std::filesystem::path& BasePath) { return std::filesystem::exists(BasePath / "rpc_recording_info.zcb"); }
+ static bool IsCompatible(const std::filesystem::path& BasePath)
+ {
+ if (std::filesystem::exists(BasePath / "rpc_recording_info.zcb"))
+ {
+ return true;
+ }
+
+ const std::filesystem::path SegmentZero = BasePath / MakeSegmentPath(0);
+
+ if (std::filesystem::exists(SegmentZero / "rpc_segment_info.zcb") && std::filesystem::exists(SegmentZero / "index.bin"))
+ {
+ // top-level metadata is missing, possibly because of premature exit
+ // on the recording side
+
+ return true;
+ }
+
+ return false;
+ }
-private:
virtual uint64_t GetRequestCount() const override { return m_RequestCount; }
virtual RecordedRequestInfo GetRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) override
{
- return m_RequestBuffer.ReadRequest(RequestIndex, OutBuffer);
+ return m_RequestReader.ReadRequest(RequestIndex, OutBuffer);
}
virtual ZenContentType GetResponse(uint64_t, IoBuffer&) override { return ZenContentType::kUnknownContentType; }
+private:
std::uint64_t m_RequestCount;
- RecordedRequestsReader m_RequestBuffer;
+ RecordedRequestsReader m_RequestReader;
};
} // namespace zen::cache::v2
@@ -866,4 +991,66 @@ MakeDiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory)
}
}
+#if ZEN_WITH_TESTS
+
+void
+rpcrecord_forcelink()
+{
+}
+
+TEST_SUITE_BEGIN("rpc.recording");
+
+TEST_CASE("rpc.record")
+{
+ ScopedTemporaryDirectory TempDir;
+ auto Path = TempDir.Path();
+
+ const Oid SessionId = GetSessionId();
+
+ using namespace std::literals;
+
+ {
+ cache::v2::DiskRequestRecorder Recorder{Path};
+
+ for (int i = 0; i < 1000; ++i)
+ {
+ RecordedRequestInfo RequestInfo{.ContentType = ZenContentType::kCbObject,
+ .AcceptType = ZenContentType::kCbObject,
+ .SessionId = SessionId};
+
+ CbObjectWriter RequestPayload;
+ RequestPayload << "test"sv << true;
+ RequestPayload << "index"sv << i;
+ CbObject Req = RequestPayload.Save();
+ IoBuffer RequestBuffer = Req.GetBuffer().AsIoBuffer();
+
+ const uint64_t Index = Recorder.RecordRequest(RequestInfo, RequestBuffer);
+
+ CHECK(Index == i);
+ }
+ }
+
+ {
+ cache::v2::DiskRequestReplayer Replayer{Path, false};
+
+ for (int i = 0; i < 1000; ++i)
+ {
+ IoBuffer RequestBuffer;
+ RecordedRequestInfo RequestInfo = Replayer.GetRequest(i, RequestBuffer);
+
+ CHECK(RequestInfo.AcceptType == ZenContentType::kCbObject);
+ CHECK(RequestInfo.ContentType == ZenContentType::kCbObject);
+ CHECK(RequestInfo.SessionId == SessionId);
+
+ CbObject Req = LoadCompactBinaryObject(RequestBuffer);
+ CHECK_EQ(Req["index"sv].AsInt32(), i);
+ CHECK_EQ(Req["test"sv].AsBool(), true);
+ }
+ }
+}
+
+TEST_SUITE_END();
+
+#endif
+
} // namespace zen::cache
diff --git a/src/zenutil/include/zenutil/cache/rpcrecording.h b/src/zenutil/include/zenutil/cache/rpcrecording.h
index fd5df26ad..ab9b92dd3 100644
--- a/src/zenutil/include/zenutil/cache/rpcrecording.h
+++ b/src/zenutil/include/zenutil/cache/rpcrecording.h
@@ -4,6 +4,9 @@
#include <zencore/compositebuffer.h>
#include <zencore/iobuffer.h>
+#include <zencore/uid.h>
+
+#include <compare>
namespace zen::cache {
@@ -13,6 +16,7 @@ struct RecordedRequestInfo
ZenContentType AcceptType;
Oid SessionId;
+ inline std::strong_ordering operator<=>(const RecordedRequestInfo& Rhs) const = default;
static const RecordedRequestInfo NullRequest;
};
@@ -37,4 +41,6 @@ public:
std::unique_ptr<cache::IRpcRequestRecorder> MakeDiskRequestRecorder(const std::filesystem::path& BasePath);
std::unique_ptr<cache::IRpcRequestReplayer> MakeDiskRequestReplayer(const std::filesystem::path& BasePath, bool InMemory);
+void rpcrecord_forcelink();
+
} // namespace zen::cache
diff --git a/src/zenutil/logging.cpp b/src/zenutil/logging.cpp
index fedfdc7e8..64230ea81 100644
--- a/src/zenutil/logging.cpp
+++ b/src/zenutil/logging.cpp
@@ -23,6 +23,7 @@ ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
+static bool g_IsLoggingInitialized;
spdlog::sink_ptr g_FileSink;
spdlog::sink_ptr
@@ -83,6 +84,14 @@ BeginInitializeLogging(const LoggingOptions& LogOptions)
/* max size */ 128 * 1024 * 1024,
/* max files */ 16,
/* rotate on open */ true);
+ if (LogOptions.AbsLogFile.extension() == ".json")
+ {
+ FileSink->set_formatter(std::make_unique<logging::json_formatter>(LogOptions.LogId));
+ }
+ else
+ {
+ FileSink->set_formatter(std::make_unique<logging::full_formatter>(LogOptions.LogId)); // this will have a date prefix
+ }
}
std::set_terminate([]() { ZEN_CRITICAL("Program exited abnormally via std::terminate()"); });
@@ -173,31 +182,25 @@ FinishInitializeLogging(const LoggingOptions& LogOptions)
spdlog::set_formatter(
std::make_unique<logging::full_formatter>(LogOptions.LogId, std::chrono::system_clock::now())); // default to duration prefix
- if (g_FileSink)
- {
- if (LogOptions.AbsLogFile.extension() == ".json")
- {
- g_FileSink->set_formatter(std::make_unique<logging::json_formatter>(LogOptions.LogId));
- }
- else
- {
- g_FileSink->set_formatter(std::make_unique<logging::full_formatter>(LogOptions.LogId)); // this will have a date prefix
- }
- }
-
const std::string StartLogTime = zen::DateTime::Now().ToIso8601();
spdlog::apply_all([&](auto Logger) { Logger->info("log starting at {}", StartLogTime); });
+
+ g_IsLoggingInitialized = true;
}
void
ShutdownLogging()
{
- g_FileSink.reset();
+ if (g_IsLoggingInitialized)
+ {
+ auto DefaultLogger = zen::logging::Default();
+ ZEN_LOG_INFO(DefaultLogger, "log ending at {}", zen::DateTime::Now().ToIso8601());
+ }
- auto DefaultLogger = zen::logging::Default();
- ZEN_LOG_INFO(DefaultLogger, "log ending at {}", zen::DateTime::Now().ToIso8601());
zen::logging::ShutdownLogging();
+
+ g_FileSink.reset();
}
} // namespace zen
diff --git a/src/zenutil/zenutil.cpp b/src/zenutil/zenutil.cpp
index 00db5a25b..eba3613f1 100644
--- a/src/zenutil/zenutil.cpp
+++ b/src/zenutil/zenutil.cpp
@@ -6,6 +6,7 @@
# include <zenutil/basicfile.h>
# include <zenutil/process.h>
+# include <zenutil/cache/rpcrecording.h>
namespace zen {
@@ -14,6 +15,7 @@ zenutil_forcelinktests()
{
basicfile_forcelink();
process_forcelink();
+ cache::rpcrecord_forcelink();
}
} // namespace zen