diff options
| author | Stefan Boberg <[email protected]> | 2025-06-11 15:13:53 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2025-06-11 15:13:53 +0200 |
| commit | 5c5ac2c01991dbc7f3b1560f7a2ec7b40cf5008e (patch) | |
| tree | 98117be92d2968952268ff24f803bcfb847556ab | |
| parent | faster scavenge (#417) (diff) | |
| download | zen-5c5ac2c01991dbc7f3b1560f7a2ec7b40cf5008e.tar.xz zen-5c5ac2c01991dbc7f3b1560f7a2ec7b40cf5008e.zip | |
Skeleton of rpc recording analysis command
Also changes the RecordedRequestsReader segment recovery implementation so that it can traverse a folder hierarchy to find available recording segments
| -rw-r--r-- | src/zen/cmds/rpcreplay_cmd.cpp | 36 | ||||
| -rw-r--r-- | src/zen/cmds/rpcreplay_cmd.h | 15 | ||||
| -rw-r--r-- | src/zen/zen.cpp | 2 | ||||
| -rw-r--r-- | src/zenutil/cache/rpcrecording.cpp | 97 |
4 files changed, 123 insertions, 27 deletions
diff --git a/src/zen/cmds/rpcreplay_cmd.cpp b/src/zen/cmds/rpcreplay_cmd.cpp index 4fc38d92a..270213045 100644 --- a/src/zen/cmds/rpcreplay_cmd.cpp +++ b/src/zen/cmds/rpcreplay_cmd.cpp @@ -177,7 +177,7 @@ RpcReplayCommand::~RpcReplayCommand() = default; int RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { - ZEN_UNUSED(GlobalOptions, argc, argv); + ZEN_UNUSED(GlobalOptions); if (!ParseOptions(argc, argv)) { @@ -482,4 +482,38 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) return 0; } +////////////////////////////////////////////////////////////////////////// + +RpcReplayAnalyzeCommand::RpcReplayAnalyzeCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("", "p", "path", "Recording file path", cxxopts::value(m_RecordingPath), "<path>"); + m_Options.add_option("", "", "dry", "Do a dry run", cxxopts::value(m_DryRun), "<enable>"); + + m_Options.parse_positional("path"); +} + +RpcReplayAnalyzeCommand::~RpcReplayAnalyzeCommand() +{ +} + +int +RpcReplayAnalyzeCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + ZEN_UNUSED(GlobalOptions); + + if (!ParseOptions(argc, argv)) + { + return 0; + } + + const bool InMemory = true; + std::unique_ptr<cache::IRpcRequestReplayer> Replayer = cache::MakeDiskRequestReplayer(m_RecordingPath, InMemory); + uint64_t EntryCount = Replayer->GetRequestCount(); + + ZEN_CONSOLE("entry count = {}", EntryCount); + + return 0; +} + } // namespace zen diff --git a/src/zen/cmds/rpcreplay_cmd.h b/src/zen/cmds/rpcreplay_cmd.h index 42cdd4ac1..35ff62463 100644 --- a/src/zen/cmds/rpcreplay_cmd.h +++ b/src/zen/cmds/rpcreplay_cmd.h @@ -63,4 +63,19 @@ private: bool m_DryRun = false; }; +class RpcReplayAnalyzeCommand : public CacheStoreCommand +{ +public: + RpcReplayAnalyzeCommand(); + ~RpcReplayAnalyzeCommand(); + + virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override; + virtual cxxopts::Options& Options() override { return m_Options; } + +private: + cxxopts::Options m_Options{"rpc-record-analyze", "Analyze a previously recorded session of cache rpc requests"}; + std::string m_RecordingPath; + bool m_DryRun = false; +}; + } // namespace zen diff --git a/src/zen/zen.cpp b/src/zen/zen.cpp index 53fdf3d36..56c5bcc09 100644 --- a/src/zen/zen.cpp +++ b/src/zen/zen.cpp @@ -641,6 +641,7 @@ main(int argc, char** argv) ProjectInfoCommand ProjectInfoCmd; ProjectStatsCommand ProjectStatsCmd; PsCommand PsCmd; + RpcReplayAnalyzeCommand RpcAnalyzeCmd; RpcReplayCommand RpcReplayCmd; RpcStartRecordingCommand RpcStartRecordingCmd; RpcStopRecordingCommand RpcStopRecordingCmd; @@ -698,6 +699,7 @@ main(int argc, char** argv) {"project-info", &ProjectInfoCmd, "Info on project or project oplog"}, {"project-stats", &ProjectStatsCmd, "Stats on project store"}, {"ps", &PsCmd, "Enumerate running zen server instances"}, + {"rpc-record-analyze", &RpcAnalyzeCmd, "Analyzes a previously recorded session of rpc requests"}, {"rpc-record-replay", &RpcReplayCmd, "Replays a previously recorded session of rpc requests"}, {"rpc-record-start", &RpcStartRecordingCmd, "Starts recording of cache rpc requests on a host"}, {"rpc-record-stop", &RpcStopRecordingCmd, "Stops recording of cache rpc requests on a host"}, diff --git a/src/zenutil/cache/rpcrecording.cpp b/src/zenutil/cache/rpcrecording.cpp index 46e80f6b7..2eb8b3258 100644 --- a/src/zenutil/cache/rpcrecording.cpp +++ b/src/zenutil/cache/rpcrecording.cpp @@ -4,6 +4,7 @@ #include <zencore/basicfile.h> #include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryvalidation.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> @@ -931,29 +932,60 @@ RecordedRequestsReader::BeginRead(const std::filesystem::path& BasePath, bool In uint64_t TotalRequestCount = 0; uint64_t MaxSegmentIndex = 0; + auto IterateSegments = [&](const std::filesystem::path& RootPath) { + std::vector<std::filesystem::path> Paths; + + std::error_code Ec; + std::filesystem::recursive_directory_iterator DirIt(RootPath, Ec); + + if (!Ec) + { + for (auto& Dir : DirIt) + { + if (Dir.is_regular_file() && Dir.path().filename().string().ends_with("rpc_segment_info.zcb")) + { + Paths.emplace_back(Dir.path()); + } + } + } + + std::sort(begin(Paths), end(Paths)); + + return Paths; + }; + try { - for (int SegmentIndex = 0;; ++SegmentIndex) + auto SegmentPaths = IterateSegments(BasePath); + + for (const std::filesystem::path ZcbPath : SegmentPaths) { - const std::filesystem::path ZcbPath = BasePath / MakeSegmentPath(SegmentIndex) / "rpc_segment_info.zcb"; - FileContents Fc = ReadFile(ZcbPath); + FileContents Fc = ReadFile(ZcbPath); if (Fc.ErrorCode) - break; - - if (IoBuffer SegmentInfoBuffer = Fc.Flatten()) { - CbObject Segment = LoadCompactBinaryObject(SegmentInfoBuffer); - - const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment_index"sv].AsUInt64(), - .BaseRequestIndex = 0, - .RequestCount = Segment["entry_count"sv].AsUInt64(), - .RequestBytes = 0, - .StartTime = Segment["time_start"sv].AsDateTime(), - .EndTime = Segment["time_end"sv].AsDateTime()}); - - TotalRequestCount += Info.RequestCount; - MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex); + ZEN_WARN("Error opening '{}': {}", ZcbPath, Fc.ErrorCode.message()); + } + else + { + if (IoBuffer SegmentInfoBuffer = Fc.Flatten()) + { + if (ValidateCompactBinaryRange(SegmentInfoBuffer.GetView(), CbValidateMode::Default) == CbValidateError::None) + { + CbObject Segment = LoadCompactBinaryObject(SegmentInfoBuffer); + + const SegmentInfo& Info = + m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment_index"sv].AsUInt64(), + .BaseRequestIndex = 0, + .RequestCount = Segment["entry_count"sv].AsUInt64(), + .RequestBytes = 0, + .StartTime = Segment["time_start"sv].AsDateTime(), + .EndTime = Segment["time_end"sv].AsDateTime()}); + + TotalRequestCount += Info.RequestCount; + MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex); + } + } } } } @@ -1014,6 +1046,8 @@ RecordedRequestsReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer) return *(SegmentReaderPtr.get()); }; + // This is pretty slow for large replays, should have an index lookup for this instead + for (auto& Segment : m_KnownSegments) { if (Segment.RequestCount > RequestIndex) @@ -1055,24 +1089,35 @@ public: } virtual ~DiskRequestReplayer() { m_RequestReader.EndRead(); } - static bool IsCompatible(const std::filesystem::path& BasePath) + static bool HasSegments(const std::filesystem::path& RootPath) { - if (IsFile(BasePath / "rpc_recording_info.zcb")) + std::error_code Ec; + std::filesystem::recursive_directory_iterator DirIt(RootPath, Ec); + + if (Ec) { - return true; + return false; } - const std::filesystem::path SegmentZero = BasePath / MakeSegmentPath(0); - - if (IsFile(SegmentZero / "rpc_segment_info.zcb") && IsFile(SegmentZero / "index.bin")) + for (auto& Dir : DirIt) { - // top-level metadata is missing, possibly because of premature exit - // on the recording side + if (Dir.is_regular_file() && Dir.path().filename().string().ends_with("rpc_segment_info.zcb")) + { + return true; + } + } + + return false; + }; + static bool IsCompatible(const std::filesystem::path& BasePath) + { + if (IsFile(BasePath / "rpc_recording_info.zcb")) + { return true; } - return false; + return HasSegments(BasePath); } virtual uint64_t GetRequestCount() const override { return m_RequestCount; } |