aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2025-06-27 14:57:14 +0200
committerStefan Boberg <[email protected]>2025-06-27 14:57:14 +0200
commitd5d44045e969edade3c2c3d2fa1b115ac9f03b21 (patch)
tree5d08cf26cbd0a9056d064a339539a2749fdf84fc /src
parentminor: output schema sketch (diff)
downloadzen-d5d44045e969edade3c2c3d2fa1b115ac9f03b21.tar.xz
zen-d5d44045e969edade3c2c3d2fa1b115ac9f03b21.zip
fleshed out RPC recording analysis command, now parses every request
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/rpcreplay_cmd.cpp167
-rw-r--r--src/zen/cmds/rpcreplay_cmd.h2
-rw-r--r--src/zenutil/cache/rpcrecording.cpp41
-rw-r--r--src/zenutil/include/zenutil/cache/rpcrecording.h5
4 files changed, 199 insertions, 16 deletions
diff --git a/src/zen/cmds/rpcreplay_cmd.cpp b/src/zen/cmds/rpcreplay_cmd.cpp
index a44d8defb..b45f3f50a 100644
--- a/src/zen/cmds/rpcreplay_cmd.cpp
+++ b/src/zen/cmds/rpcreplay_cmd.cpp
@@ -14,6 +14,7 @@
#include <zencore/workthreadpool.h>
#include <zenhttp/httpcommon.h>
#include <zenhttp/packageformat.h>
+#include <zenutil/cache/cacherequests.h>
#include <zenutil/cache/rpcrecording.h>
ZEN_THIRD_PARTY_INCLUDES_START
@@ -488,6 +489,13 @@ RpcReplayAnalyzeCommand::RpcReplayAnalyzeCommand()
{
m_Options.add_options()("h,help", "Print help");
m_Options.add_option("", "p", "path", "Recording file path", cxxopts::value(m_RecordingPath), "<path>");
+ m_Options.add_option("",
+ "",
+ "offset",
+ "Offset into request sequence where analysis should start",
+ cxxopts::value(m_Offset)->default_value("0"),
+ "<offset>");
+ m_Options.add_option("", "", "limit", "Max number of requests to analyze", cxxopts::value(m_Limit)->default_value("0"), "<limit>");
m_Options.add_option("", "", "dry", "Do a dry run", cxxopts::value(m_DryRun), "<enable>");
m_Options.parse_positional("path");
@@ -510,7 +518,162 @@ RpcReplayAnalyzeCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char*
std::unique_ptr<cache::IRpcRequestAnalyzer> Replayer = cache::MakeDiskRequestAnalyzer(m_RecordingPath);
uint64_t EntryCount = Replayer->GetRequestCount();
- ZEN_CONSOLE("entry count = {}", EntryCount);
+ ZEN_CONSOLE("Recording entry count = {}", EntryCount);
+
+ const uint64_t RangeStart = m_Offset;
+ const uint64_t RangeEnd = m_Limit ? (m_Offset + m_Limit) : EntryCount;
+
+ uint64_t RequestCount = 0;
+ uint64_t BadRequestCount = 0;
+
+ const uint64_t RangeCount = RangeEnd - RangeStart;
+
+ std::locale Locale("en_US.UTF-8");
+
+ ZEN_CONSOLE("Iterating over [#{},#{}) entry count = {}", RangeStart, RangeEnd, RangeEnd - RangeStart);
+
+ Replayer->IterateRange(RangeStart, RangeEnd, [&](const zen::cache::RecordedRequestInfo RequestInfo, const IoBuffer& Payload) {
+ ++RequestCount;
+
+ if (RequestCount % 1000 == 0)
+ {
+ ZEN_CONSOLE("{}", fmt::format(Locale, "... {:L} / {:L} ({}%)", RequestCount, RangeCount, 100 * RequestCount / RangeCount));
+ }
+
+ if (RequestInfo == zen::cache::RecordedRequestInfo::NullRequest)
+ {
+ return;
+ }
+
+ CbPackage RequestPackage;
+ CbObject Request;
+
+ switch (RequestInfo.ContentType)
+ {
+ case ZenContentType::kCbPackage:
+ if (ParsePackageMessageWithLegacyFallback(Payload, RequestPackage))
+ {
+ Request = RequestPackage.GetObject();
+ }
+ break;
+
+ case ZenContentType::kCbObject:
+ Request = LoadCompactBinaryObject(Payload);
+ break;
+ }
+
+ if (!Request)
+ {
+ ++BadRequestCount;
+ return;
+ }
+
+ const std::string_view MethodName = Request["Method"sv].AsString();
+
+ switch (HashStringDjb2(MethodName))
+ {
+ case HashStringDjb2("GetCacheValues"sv):
+ {
+ cacherequests::GetCacheValuesRequest Gcv;
+ if (!Gcv.Parse(Request))
+ {
+ ++BadRequestCount;
+ return;
+ }
+
+ const auto BatchSize = Gcv.Requests.size();
+
+ for (const auto& Req : Gcv.Requests)
+ {
+ ZEN_CONSOLE("[{}] GCV {}/{}", BatchSize, Req.Key.Bucket, Req.Key.Hash);
+ }
+ }
+ break;
+
+ case HashStringDjb2("GetCacheRecords"sv):
+ {
+ cacherequests::GetCacheRecordsRequest Gcr;
+ if (!Gcr.Parse(Request))
+ {
+ ++BadRequestCount;
+ return;
+ }
+
+ const auto BatchSize = Gcr.Requests.size();
+
+ for (const auto& Req : Gcr.Requests)
+ {
+ ZEN_CONSOLE("[{}] GCR {}/{}", BatchSize, Req.Key.Bucket, Req.Key.Hash);
+ }
+ }
+ break;
+
+ case HashStringDjb2("GetCacheChunks"sv):
+ {
+ cacherequests::GetCacheChunksRequest Gcc;
+ if (!Gcc.Parse(Request))
+ {
+ ++BadRequestCount;
+ return;
+ }
+
+ const auto BatchSize = Gcc.Requests.size();
+
+ for (const auto& Req : Gcc.Requests)
+ {
+ ZEN_CONSOLE("[{}] GCC {}/{}", BatchSize, Req.Key.Bucket, Req.Key.Hash);
+ }
+ }
+ break;
+
+ case HashStringDjb2("PutCacheRecords"sv):
+ {
+ cacherequests::PutCacheRecordsRequest Pcr;
+ if (!Pcr.Parse(RequestPackage))
+ {
+ ++BadRequestCount;
+ return;
+ }
+ ZEN_CONSOLE("request size: {}, package: {}", Request.GetSize(), Payload.GetSize());
+
+ const auto BatchSize = Pcr.Requests.size();
+
+ for (const auto& Req : Pcr.Requests)
+ {
+ ZEN_CONSOLE("[{}] PCR {}/{}", BatchSize, Req.Key.Bucket, Req.Key.Hash);
+ }
+ }
+ break;
+
+ case HashStringDjb2("PutCacheValues"sv):
+ {
+ cacherequests::PutCacheValuesRequest Pcv;
+ if (!Pcv.Parse(RequestPackage))
+ {
+ ++BadRequestCount;
+ return;
+ }
+ ZEN_CONSOLE("request size: {}, package: {}", Request.GetSize(), Payload.GetSize());
+
+ const auto BatchSize = Pcv.Requests.size();
+
+ for (const auto& Req : Pcv.Requests)
+ {
+ ZEN_CONSOLE("[{}] PCV {}/{}", BatchSize, Req.Key.Bucket, Req.Key.Hash);
+ }
+ }
+ break;
+
+ default:
+ ++BadRequestCount;
+ return;
+ }
+
+ if (RequestPackage)
+ {
+ ZEN_CONSOLE("{} : {}", RequestPackage.GetAttachments().size(), RequestPackage.GetObjectHash());
+ }
+ });
/////////////////////////////////////////////////////////////////////////
//
@@ -526,7 +689,7 @@ RpcReplayAnalyzeCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char*
// keys
//
- // <key_id>,<bucket>,<key>
+ // <key_id>,<bucket_id>,<key>
// sessions
//
diff --git a/src/zen/cmds/rpcreplay_cmd.h b/src/zen/cmds/rpcreplay_cmd.h
index 35ff62463..64bfa47bc 100644
--- a/src/zen/cmds/rpcreplay_cmd.h
+++ b/src/zen/cmds/rpcreplay_cmd.h
@@ -76,6 +76,8 @@ private:
cxxopts::Options m_Options{"rpc-record-analyze", "Analyze a previously recorded session of cache rpc requests"};
std::string m_RecordingPath;
bool m_DryRun = false;
+ uint64_t m_Offset = 0;
+ uint64_t m_Limit = 0;
};
} // namespace zen
diff --git a/src/zenutil/cache/rpcrecording.cpp b/src/zenutil/cache/rpcrecording.cpp
index f7a733597..0c788ddf3 100644
--- a/src/zenutil/cache/rpcrecording.cpp
+++ b/src/zenutil/cache/rpcrecording.cpp
@@ -201,12 +201,13 @@ private:
struct SegmentInfo
{
- uint64_t SegmentIndex;
- uint64_t BaseRequestIndex;
- uint64_t RequestCount;
- uint64_t RequestBytes;
- DateTime StartTime{0};
- DateTime EndTime{0};
+ uint64_t SegmentIndex;
+ uint64_t BaseRequestIndex;
+ uint64_t RequestCount;
+ uint64_t RequestBytes;
+ DateTime StartTime{0};
+ DateTime EndTime{0};
+ std::filesystem::path BasePath;
};
bool m_InMemory = false;
@@ -787,7 +788,8 @@ RecordedRequestsReader::BeginRead(const std::filesystem::path& BasePath, bool In
.RequestCount = Segment["entry_count"sv].AsUInt64(),
.RequestBytes = 0,
.StartTime = Segment["time_start"sv].AsDateTime(),
- .EndTime = Segment["time_end"sv].AsDateTime()});
+ .EndTime = Segment["time_end"sv].AsDateTime(),
+ .BasePath = ZcbPath.parent_path()});
TotalRequestCount += Info.RequestCount;
MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex);
@@ -833,11 +835,11 @@ RecordedRequestsReader::EndRead()
RecordedRequestInfo
RecordedRequestsReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const
{
- auto EnsureSegment = [&](uint64_t SegmentIndex) -> const RecordedRequestsSegmentReader& {
+ auto EnsureSegment = [&](const SegmentInfo& Segment) -> const RecordedRequestsSegmentReader& {
{
RwLock::SharedLockScope _(m_SegmentLock);
- if (auto SegmentReaderPtr = m_SegmentReaders[SegmentIndex].get())
+ if (auto SegmentReaderPtr = m_SegmentReaders[Segment.SegmentIndex].get())
{
return *SegmentReaderPtr;
}
@@ -845,12 +847,12 @@ RecordedRequestsReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer)
RwLock::ExclusiveLockScope _(m_SegmentLock);
- auto& SegmentReaderPtr = m_SegmentReaders[SegmentIndex];
+ auto& SegmentReaderPtr = m_SegmentReaders[Segment.SegmentIndex];
if (!SegmentReaderPtr)
{
RecordedRequestsSegmentReader* NewSegment = new RecordedRequestsSegmentReader;
- NewSegment->BeginRead(m_BasePath / MakeSegmentPath(SegmentIndex), m_InMemory);
+ NewSegment->BeginRead(Segment.BasePath, m_InMemory);
SegmentReaderPtr.reset(NewSegment);
}
@@ -859,11 +861,11 @@ RecordedRequestsReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer)
// This is pretty slow for large replays, should have an index lookup for this instead
- for (auto& Segment : m_KnownSegments)
+ for (const SegmentInfo& Segment : m_KnownSegments)
{
if (Segment.RequestCount > RequestIndex)
{
- return EnsureSegment(Segment.SegmentIndex).ReadRequest(RequestIndex, OutBuffer);
+ return EnsureSegment(Segment).ReadRequest(RequestIndex, OutBuffer);
}
else
{
@@ -899,6 +901,19 @@ public:
virtual uint64_t GetRequestCount() const override { return m_RequestCount; }
+ virtual void IterateRange(uint64_t RequestIndex,
+ uint64_t EndRequestIndex,
+ std::function<void(const RecordedRequestInfo& Info, const IoBuffer& RequestBuffer)>&& Func) override
+ {
+ while (RequestIndex < EndRequestIndex)
+ {
+ IoBuffer Buffer;
+ RecordedRequestInfo Info = m_RequestReader.ReadRequest(RequestIndex, /* out */ Buffer);
+ Func(Info, Buffer);
+ ++RequestIndex;
+ };
+ }
+
private:
std::uint64_t m_RequestCount;
RecordedRequestsReader m_RequestReader;
diff --git a/src/zenutil/include/zenutil/cache/rpcrecording.h b/src/zenutil/include/zenutil/cache/rpcrecording.h
index c63024141..9db3b46f9 100644
--- a/src/zenutil/include/zenutil/cache/rpcrecording.h
+++ b/src/zenutil/include/zenutil/cache/rpcrecording.h
@@ -39,7 +39,10 @@ class IRpcRequestAnalyzer
{
public:
virtual ~IRpcRequestAnalyzer() {}
- virtual uint64_t GetRequestCount() const = 0;
+ virtual uint64_t GetRequestCount() const = 0;
+ virtual void IterateRange(uint64_t RequestIndex,
+ uint64_t EndRequestIndex,
+ std::function<void(const RecordedRequestInfo& Info, const IoBuffer& RequestBuffer)>&& Func) = 0;
};
std::unique_ptr<cache::IRpcRequestRecorder> MakeDiskRequestRecorder(const std::filesystem::path& BasePath);