diff options
| author | Stefan Boberg <[email protected]> | 2025-06-27 14:57:14 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2025-06-27 14:57:14 +0200 |
| commit | d5d44045e969edade3c2c3d2fa1b115ac9f03b21 (patch) | |
| tree | 5d08cf26cbd0a9056d064a339539a2749fdf84fc /src | |
| parent | minor: output schema sketch (diff) | |
| download | zen-d5d44045e969edade3c2c3d2fa1b115ac9f03b21.tar.xz zen-d5d44045e969edade3c2c3d2fa1b115ac9f03b21.zip | |
fleshed out RPC recording analysis command, now parses every request
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/rpcreplay_cmd.cpp | 167 | ||||
| -rw-r--r-- | src/zen/cmds/rpcreplay_cmd.h | 2 | ||||
| -rw-r--r-- | src/zenutil/cache/rpcrecording.cpp | 41 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/cache/rpcrecording.h | 5 |
4 files changed, 199 insertions, 16 deletions
diff --git a/src/zen/cmds/rpcreplay_cmd.cpp b/src/zen/cmds/rpcreplay_cmd.cpp index a44d8defb..b45f3f50a 100644 --- a/src/zen/cmds/rpcreplay_cmd.cpp +++ b/src/zen/cmds/rpcreplay_cmd.cpp @@ -14,6 +14,7 @@ #include <zencore/workthreadpool.h> #include <zenhttp/httpcommon.h> #include <zenhttp/packageformat.h> +#include <zenutil/cache/cacherequests.h> #include <zenutil/cache/rpcrecording.h> ZEN_THIRD_PARTY_INCLUDES_START @@ -488,6 +489,13 @@ RpcReplayAnalyzeCommand::RpcReplayAnalyzeCommand() { m_Options.add_options()("h,help", "Print help"); m_Options.add_option("", "p", "path", "Recording file path", cxxopts::value(m_RecordingPath), "<path>"); + m_Options.add_option("", + "", + "offset", + "Offset into request sequence where analysis should start", + cxxopts::value(m_Offset)->default_value("0"), + "<offset>"); + m_Options.add_option("", "", "limit", "Max number of requests to analyze", cxxopts::value(m_Limit)->default_value("0"), "<limit>"); m_Options.add_option("", "", "dry", "Do a dry run", cxxopts::value(m_DryRun), "<enable>"); m_Options.parse_positional("path"); @@ -510,7 +518,162 @@ RpcReplayAnalyzeCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char* std::unique_ptr<cache::IRpcRequestAnalyzer> Replayer = cache::MakeDiskRequestAnalyzer(m_RecordingPath); uint64_t EntryCount = Replayer->GetRequestCount(); - ZEN_CONSOLE("entry count = {}", EntryCount); + ZEN_CONSOLE("Recording entry count = {}", EntryCount); + + const uint64_t RangeStart = m_Offset; + const uint64_t RangeEnd = m_Limit ? (m_Offset + m_Limit) : EntryCount; + + uint64_t RequestCount = 0; + uint64_t BadRequestCount = 0; + + const uint64_t RangeCount = RangeEnd - RangeStart; + + std::locale Locale("en_US.UTF-8"); + + ZEN_CONSOLE("Iterating over [#{},#{}) entry count = {}", RangeStart, RangeEnd, RangeEnd - RangeStart); + + Replayer->IterateRange(RangeStart, RangeEnd, [&](const zen::cache::RecordedRequestInfo RequestInfo, const IoBuffer& Payload) { + ++RequestCount; + + if (RequestCount % 1000 == 0) + { + ZEN_CONSOLE("{}", fmt::format(Locale, "... {:L} / {:L} ({}%)", RequestCount, RangeCount, 100 * RequestCount / RangeCount)); + } + + if (RequestInfo == zen::cache::RecordedRequestInfo::NullRequest) + { + return; + } + + CbPackage RequestPackage; + CbObject Request; + + switch (RequestInfo.ContentType) + { + case ZenContentType::kCbPackage: + if (ParsePackageMessageWithLegacyFallback(Payload, RequestPackage)) + { + Request = RequestPackage.GetObject(); + } + break; + + case ZenContentType::kCbObject: + Request = LoadCompactBinaryObject(Payload); + break; + } + + if (!Request) + { + ++BadRequestCount; + return; + } + + const std::string_view MethodName = Request["Method"sv].AsString(); + + switch (HashStringDjb2(MethodName)) + { + case HashStringDjb2("GetCacheValues"sv): + { + cacherequests::GetCacheValuesRequest Gcv; + if (!Gcv.Parse(Request)) + { + ++BadRequestCount; + return; + } + + const auto BatchSize = Gcv.Requests.size(); + + for (const auto& Req : Gcv.Requests) + { + ZEN_CONSOLE("[{}] GCV {}/{}", BatchSize, Req.Key.Bucket, Req.Key.Hash); + } + } + break; + + case HashStringDjb2("GetCacheRecords"sv): + { + cacherequests::GetCacheRecordsRequest Gcr; + if (!Gcr.Parse(Request)) + { + ++BadRequestCount; + return; + } + + const auto BatchSize = Gcr.Requests.size(); + + for (const auto& Req : Gcr.Requests) + { + ZEN_CONSOLE("[{}] GCR {}/{}", BatchSize, Req.Key.Bucket, Req.Key.Hash); + } + } + break; + + case HashStringDjb2("GetCacheChunks"sv): + { + cacherequests::GetCacheChunksRequest Gcc; + if (!Gcc.Parse(Request)) + { + ++BadRequestCount; + return; + } + + const auto BatchSize = Gcc.Requests.size(); + + for (const auto& Req : Gcc.Requests) + { + ZEN_CONSOLE("[{}] GCC {}/{}", BatchSize, Req.Key.Bucket, Req.Key.Hash); + } + } + break; + + case HashStringDjb2("PutCacheRecords"sv): + { + cacherequests::PutCacheRecordsRequest Pcr; + if (!Pcr.Parse(RequestPackage)) + { + ++BadRequestCount; + return; + } + ZEN_CONSOLE("request size: {}, package: {}", Request.GetSize(), Payload.GetSize()); + + const auto BatchSize = Pcr.Requests.size(); + + for (const auto& Req : Pcr.Requests) + { + ZEN_CONSOLE("[{}] PCR {}/{}", BatchSize, Req.Key.Bucket, Req.Key.Hash); + } + } + break; + + case HashStringDjb2("PutCacheValues"sv): + { + cacherequests::PutCacheValuesRequest Pcv; + if (!Pcv.Parse(RequestPackage)) + { + ++BadRequestCount; + return; + } + ZEN_CONSOLE("request size: {}, package: {}", Request.GetSize(), Payload.GetSize()); + + const auto BatchSize = Pcv.Requests.size(); + + for (const auto& Req : Pcv.Requests) + { + ZEN_CONSOLE("[{}] PCV {}/{}", BatchSize, Req.Key.Bucket, Req.Key.Hash); + } + } + break; + + default: + ++BadRequestCount; + return; + } + + if (RequestPackage) + { + ZEN_CONSOLE("{} : {}", RequestPackage.GetAttachments().size(), RequestPackage.GetObjectHash()); + } + }); ///////////////////////////////////////////////////////////////////////// // @@ -526,7 +689,7 @@ RpcReplayAnalyzeCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char* // keys // - // <key_id>,<bucket>,<key> + // <key_id>,<bucket_id>,<key> // sessions // diff --git a/src/zen/cmds/rpcreplay_cmd.h b/src/zen/cmds/rpcreplay_cmd.h index 35ff62463..64bfa47bc 100644 --- a/src/zen/cmds/rpcreplay_cmd.h +++ b/src/zen/cmds/rpcreplay_cmd.h @@ -76,6 +76,8 @@ private: cxxopts::Options m_Options{"rpc-record-analyze", "Analyze a previously recorded session of cache rpc requests"}; std::string m_RecordingPath; bool m_DryRun = false; + uint64_t m_Offset = 0; + uint64_t m_Limit = 0; }; } // namespace zen diff --git a/src/zenutil/cache/rpcrecording.cpp b/src/zenutil/cache/rpcrecording.cpp index f7a733597..0c788ddf3 100644 --- a/src/zenutil/cache/rpcrecording.cpp +++ b/src/zenutil/cache/rpcrecording.cpp @@ -201,12 +201,13 @@ private: struct SegmentInfo { - uint64_t SegmentIndex; - uint64_t BaseRequestIndex; - uint64_t RequestCount; - uint64_t RequestBytes; - DateTime StartTime{0}; - DateTime EndTime{0}; + uint64_t SegmentIndex; + uint64_t BaseRequestIndex; + uint64_t RequestCount; + uint64_t RequestBytes; + DateTime StartTime{0}; + DateTime EndTime{0}; + std::filesystem::path BasePath; }; bool m_InMemory = false; @@ -787,7 +788,8 @@ RecordedRequestsReader::BeginRead(const std::filesystem::path& BasePath, bool In .RequestCount = Segment["entry_count"sv].AsUInt64(), .RequestBytes = 0, .StartTime = Segment["time_start"sv].AsDateTime(), - .EndTime = Segment["time_end"sv].AsDateTime()}); + .EndTime = Segment["time_end"sv].AsDateTime(), + .BasePath = ZcbPath.parent_path()}); TotalRequestCount += Info.RequestCount; MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex); @@ -833,11 +835,11 @@ RecordedRequestsReader::EndRead() RecordedRequestInfo RecordedRequestsReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) const { - auto EnsureSegment = [&](uint64_t SegmentIndex) -> const RecordedRequestsSegmentReader& { + auto EnsureSegment = [&](const SegmentInfo& Segment) -> const RecordedRequestsSegmentReader& { { RwLock::SharedLockScope _(m_SegmentLock); - if (auto SegmentReaderPtr = m_SegmentReaders[SegmentIndex].get()) + if (auto SegmentReaderPtr = m_SegmentReaders[Segment.SegmentIndex].get()) { return *SegmentReaderPtr; } @@ -845,12 +847,12 @@ RecordedRequestsReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) RwLock::ExclusiveLockScope _(m_SegmentLock); - auto& SegmentReaderPtr = m_SegmentReaders[SegmentIndex]; + auto& SegmentReaderPtr = m_SegmentReaders[Segment.SegmentIndex]; if (!SegmentReaderPtr) { RecordedRequestsSegmentReader* NewSegment = new RecordedRequestsSegmentReader; - NewSegment->BeginRead(m_BasePath / MakeSegmentPath(SegmentIndex), m_InMemory); + NewSegment->BeginRead(Segment.BasePath, m_InMemory); SegmentReaderPtr.reset(NewSegment); } @@ -859,11 +861,11 @@ RecordedRequestsReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) // This is pretty slow for large replays, should have an index lookup for this instead - for (auto& Segment : m_KnownSegments) + for (const SegmentInfo& Segment : m_KnownSegments) { if (Segment.RequestCount > RequestIndex) { - return EnsureSegment(Segment.SegmentIndex).ReadRequest(RequestIndex, OutBuffer); + return EnsureSegment(Segment).ReadRequest(RequestIndex, OutBuffer); } else { @@ -899,6 +901,19 @@ public: virtual uint64_t GetRequestCount() const override { return m_RequestCount; } + virtual void IterateRange(uint64_t RequestIndex, + uint64_t EndRequestIndex, + std::function<void(const RecordedRequestInfo& Info, const IoBuffer& RequestBuffer)>&& Func) override + { + while (RequestIndex < EndRequestIndex) + { + IoBuffer Buffer; + RecordedRequestInfo Info = m_RequestReader.ReadRequest(RequestIndex, /* out */ Buffer); + Func(Info, Buffer); + ++RequestIndex; + }; + } + private: std::uint64_t m_RequestCount; RecordedRequestsReader m_RequestReader; diff --git a/src/zenutil/include/zenutil/cache/rpcrecording.h b/src/zenutil/include/zenutil/cache/rpcrecording.h index c63024141..9db3b46f9 100644 --- a/src/zenutil/include/zenutil/cache/rpcrecording.h +++ b/src/zenutil/include/zenutil/cache/rpcrecording.h @@ -39,7 +39,10 @@ class IRpcRequestAnalyzer { public: virtual ~IRpcRequestAnalyzer() {} - virtual uint64_t GetRequestCount() const = 0; + virtual uint64_t GetRequestCount() const = 0; + virtual void IterateRange(uint64_t RequestIndex, + uint64_t EndRequestIndex, + std::function<void(const RecordedRequestInfo& Info, const IoBuffer& RequestBuffer)>&& Func) = 0; }; std::unique_ptr<cache::IRpcRequestRecorder> MakeDiskRequestRecorder(const std::filesystem::path& BasePath); |