aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2025-06-11 15:13:53 +0200
committerStefan Boberg <[email protected]>2025-06-11 15:13:53 +0200
commit5c5ac2c01991dbc7f3b1560f7a2ec7b40cf5008e (patch)
tree98117be92d2968952268ff24f803bcfb847556ab
parentfaster scavenge (#417) (diff)
downloadzen-5c5ac2c01991dbc7f3b1560f7a2ec7b40cf5008e.tar.xz
zen-5c5ac2c01991dbc7f3b1560f7a2ec7b40cf5008e.zip
Skeleton of rpc recording analysis command
Also changes the RecordedRequestsReader segment recovery implementation so that it can traverse a folder hierarchy to find available recording segments
-rw-r--r--src/zen/cmds/rpcreplay_cmd.cpp36
-rw-r--r--src/zen/cmds/rpcreplay_cmd.h15
-rw-r--r--src/zen/zen.cpp2
-rw-r--r--src/zenutil/cache/rpcrecording.cpp97
4 files changed, 123 insertions, 27 deletions
diff --git a/src/zen/cmds/rpcreplay_cmd.cpp b/src/zen/cmds/rpcreplay_cmd.cpp
index 4fc38d92a..270213045 100644
--- a/src/zen/cmds/rpcreplay_cmd.cpp
+++ b/src/zen/cmds/rpcreplay_cmd.cpp
@@ -177,7 +177,7 @@ RpcReplayCommand::~RpcReplayCommand() = default;
int
RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
- ZEN_UNUSED(GlobalOptions, argc, argv);
+ ZEN_UNUSED(GlobalOptions);
if (!ParseOptions(argc, argv))
{
@@ -482,4 +482,38 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
return 0;
}
+//////////////////////////////////////////////////////////////////////////
+
+RpcReplayAnalyzeCommand::RpcReplayAnalyzeCommand()
+{
+ m_Options.add_options()("h,help", "Print help");
+ m_Options.add_option("", "p", "path", "Recording file path", cxxopts::value(m_RecordingPath), "<path>");
+ m_Options.add_option("", "", "dry", "Do a dry run", cxxopts::value(m_DryRun), "<enable>");
+
+ m_Options.parse_positional("path");
+}
+
+RpcReplayAnalyzeCommand::~RpcReplayAnalyzeCommand()
+{
+}
+
+int
+RpcReplayAnalyzeCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
+{
+ ZEN_UNUSED(GlobalOptions);
+
+ if (!ParseOptions(argc, argv))
+ {
+ return 0;
+ }
+
+ const bool InMemory = true;
+ std::unique_ptr<cache::IRpcRequestReplayer> Replayer = cache::MakeDiskRequestReplayer(m_RecordingPath, InMemory);
+ uint64_t EntryCount = Replayer->GetRequestCount();
+
+ ZEN_CONSOLE("entry count = {}", EntryCount);
+
+ return 0;
+}
+
} // namespace zen
diff --git a/src/zen/cmds/rpcreplay_cmd.h b/src/zen/cmds/rpcreplay_cmd.h
index 42cdd4ac1..35ff62463 100644
--- a/src/zen/cmds/rpcreplay_cmd.h
+++ b/src/zen/cmds/rpcreplay_cmd.h
@@ -63,4 +63,19 @@ private:
bool m_DryRun = false;
};
+class RpcReplayAnalyzeCommand : public CacheStoreCommand
+{
+public:
+ RpcReplayAnalyzeCommand();
+ ~RpcReplayAnalyzeCommand();
+
+ virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override;
+ virtual cxxopts::Options& Options() override { return m_Options; }
+
+private:
+ cxxopts::Options m_Options{"rpc-record-analyze", "Analyze a previously recorded session of cache rpc requests"};
+ std::string m_RecordingPath;
+ bool m_DryRun = false;
+};
+
} // namespace zen
diff --git a/src/zen/zen.cpp b/src/zen/zen.cpp
index 53fdf3d36..56c5bcc09 100644
--- a/src/zen/zen.cpp
+++ b/src/zen/zen.cpp
@@ -641,6 +641,7 @@ main(int argc, char** argv)
ProjectInfoCommand ProjectInfoCmd;
ProjectStatsCommand ProjectStatsCmd;
PsCommand PsCmd;
+ RpcReplayAnalyzeCommand RpcAnalyzeCmd;
RpcReplayCommand RpcReplayCmd;
RpcStartRecordingCommand RpcStartRecordingCmd;
RpcStopRecordingCommand RpcStopRecordingCmd;
@@ -698,6 +699,7 @@ main(int argc, char** argv)
{"project-info", &ProjectInfoCmd, "Info on project or project oplog"},
{"project-stats", &ProjectStatsCmd, "Stats on project store"},
{"ps", &PsCmd, "Enumerate running zen server instances"},
+ {"rpc-record-analyze", &RpcAnalyzeCmd, "Analyzes a previously recorded session of rpc requests"},
{"rpc-record-replay", &RpcReplayCmd, "Replays a previously recorded session of rpc requests"},
{"rpc-record-start", &RpcStartRecordingCmd, "Starts recording of cache rpc requests on a host"},
{"rpc-record-stop", &RpcStopRecordingCmd, "Stops recording of cache rpc requests on a host"},
diff --git a/src/zenutil/cache/rpcrecording.cpp b/src/zenutil/cache/rpcrecording.cpp
index 46e80f6b7..2eb8b3258 100644
--- a/src/zenutil/cache/rpcrecording.cpp
+++ b/src/zenutil/cache/rpcrecording.cpp
@@ -4,6 +4,7 @@
#include <zencore/basicfile.h>
#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinaryvalidation.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
@@ -931,29 +932,60 @@ RecordedRequestsReader::BeginRead(const std::filesystem::path& BasePath, bool In
uint64_t TotalRequestCount = 0;
uint64_t MaxSegmentIndex = 0;
+ auto IterateSegments = [&](const std::filesystem::path& RootPath) {
+ std::vector<std::filesystem::path> Paths;
+
+ std::error_code Ec;
+ std::filesystem::recursive_directory_iterator DirIt(RootPath, Ec);
+
+ if (!Ec)
+ {
+ for (auto& Dir : DirIt)
+ {
+ if (Dir.is_regular_file() && Dir.path().filename().string().ends_with("rpc_segment_info.zcb"))
+ {
+ Paths.emplace_back(Dir.path());
+ }
+ }
+ }
+
+ std::sort(begin(Paths), end(Paths));
+
+ return Paths;
+ };
+
try
{
- for (int SegmentIndex = 0;; ++SegmentIndex)
+ auto SegmentPaths = IterateSegments(BasePath);
+
+ for (const std::filesystem::path ZcbPath : SegmentPaths)
{
- const std::filesystem::path ZcbPath = BasePath / MakeSegmentPath(SegmentIndex) / "rpc_segment_info.zcb";
- FileContents Fc = ReadFile(ZcbPath);
+ FileContents Fc = ReadFile(ZcbPath);
if (Fc.ErrorCode)
- break;
-
- if (IoBuffer SegmentInfoBuffer = Fc.Flatten())
{
- CbObject Segment = LoadCompactBinaryObject(SegmentInfoBuffer);
-
- const SegmentInfo& Info = m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment_index"sv].AsUInt64(),
- .BaseRequestIndex = 0,
- .RequestCount = Segment["entry_count"sv].AsUInt64(),
- .RequestBytes = 0,
- .StartTime = Segment["time_start"sv].AsDateTime(),
- .EndTime = Segment["time_end"sv].AsDateTime()});
-
- TotalRequestCount += Info.RequestCount;
- MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex);
+ ZEN_WARN("Error opening '{}': {}", ZcbPath, Fc.ErrorCode.message());
+ }
+ else
+ {
+ if (IoBuffer SegmentInfoBuffer = Fc.Flatten())
+ {
+ if (ValidateCompactBinaryRange(SegmentInfoBuffer.GetView(), CbValidateMode::Default) == CbValidateError::None)
+ {
+ CbObject Segment = LoadCompactBinaryObject(SegmentInfoBuffer);
+
+ const SegmentInfo& Info =
+ m_KnownSegments.emplace_back(SegmentInfo{.SegmentIndex = Segment["segment_index"sv].AsUInt64(),
+ .BaseRequestIndex = 0,
+ .RequestCount = Segment["entry_count"sv].AsUInt64(),
+ .RequestBytes = 0,
+ .StartTime = Segment["time_start"sv].AsDateTime(),
+ .EndTime = Segment["time_end"sv].AsDateTime()});
+
+ TotalRequestCount += Info.RequestCount;
+ MaxSegmentIndex = Max(MaxSegmentIndex, Info.SegmentIndex);
+ }
+ }
}
}
}
@@ -1014,6 +1046,8 @@ RecordedRequestsReader::ReadRequest(uint64_t RequestIndex, IoBuffer& OutBuffer)
return *(SegmentReaderPtr.get());
};
+ // This is pretty slow for large replays, should have an index lookup for this instead
+
for (auto& Segment : m_KnownSegments)
{
if (Segment.RequestCount > RequestIndex)
@@ -1055,24 +1089,35 @@ public:
}
virtual ~DiskRequestReplayer() { m_RequestReader.EndRead(); }
- static bool IsCompatible(const std::filesystem::path& BasePath)
+ static bool HasSegments(const std::filesystem::path& RootPath)
{
- if (IsFile(BasePath / "rpc_recording_info.zcb"))
+ std::error_code Ec;
+ std::filesystem::recursive_directory_iterator DirIt(RootPath, Ec);
+
+ if (Ec)
{
- return true;
+ return false;
}
- const std::filesystem::path SegmentZero = BasePath / MakeSegmentPath(0);
-
- if (IsFile(SegmentZero / "rpc_segment_info.zcb") && IsFile(SegmentZero / "index.bin"))
+ for (auto& Dir : DirIt)
{
- // top-level metadata is missing, possibly because of premature exit
- // on the recording side
+ if (Dir.is_regular_file() && Dir.path().filename().string().ends_with("rpc_segment_info.zcb"))
+ {
+ return true;
+ }
+ }
+
+ return false;
+ };
+ static bool IsCompatible(const std::filesystem::path& BasePath)
+ {
+ if (IsFile(BasePath / "rpc_recording_info.zcb"))
+ {
return true;
}
- return false;
+ return HasSegments(BasePath);
}
virtual uint64_t GetRequestCount() const override { return m_RequestCount; }