aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-02-21 13:50:51 +0100
committerGitHub <[email protected]>2023-02-21 04:50:51 -0800
commitd57115c5de8296b9a28d093cbb6cca96786943eb (patch)
treef0047ede22346a4676e767569eb64f0df7f9f170
parentRefactor CacheBuckets to allow for storing RawHash/RawSize (#232) (diff)
downloadzen-d57115c5de8296b9a28d093cbb6cca96786943eb.tar.xz
zen-d57115c5de8296b9a28d093cbb6cca96786943eb.zip
add `--showmethodstats` option for rpc replay to show method statistics (#233)
* add `--showmethodstats` option for rpc replay to show method statistics
-rw-r--r--CHANGELOG.md1
-rw-r--r--zen/cmds/rpcreplay.cpp140
-rw-r--r--zen/cmds/rpcreplay.h3
3 files changed, 104 insertions, 40 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1598d4496..638e9261e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,7 @@
- `--offset` Offset into request playback to start at
- `--stride` The stride to use when selecting requests to playback
- `--onhost` Replay the recording inside the zenserver bypassing http overhead
+ - `--showmethodstats` Show statistics of which RPC methods are used
- Improvement: FileCas now keeps an up to date index of all the entries improving performance when getting cache misses on large payloads
- Changed: Exit with failure code on port conflict rather than reporting crash to Sentry
diff --git a/zen/cmds/rpcreplay.cpp b/zen/cmds/rpcreplay.cpp
index 228a4daa9..89a1fe631 100644
--- a/zen/cmds/rpcreplay.cpp
+++ b/zen/cmds/rpcreplay.cpp
@@ -8,6 +8,7 @@
#include <zencore/timer.h>
#include <zencore/workthreadpool.h>
#include <zenhttp/httpcommon.h>
+#include <zenhttp/httpshared.h>
#include <zenutil/cache/rpcrecording.h>
ZEN_THIRD_PARTY_INCLUDES_START
@@ -20,6 +21,8 @@ ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
+using namespace std::literals;
+
RpcStartRecordingCommand::RpcStartRecordingCommand()
{
m_Options.add_options()("h,help", "Print help");
@@ -46,7 +49,7 @@ RpcStartRecordingCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char
}
cpr::Session Session;
- Session.SetUrl(fmt::format("{}/z$/exec$/start-recording", m_HostName));
+ Session.SetUrl(fmt::format("{}/z$/exec$/start-recording"sv, m_HostName));
Session.SetParameters({{"path", m_RecordingPath}});
cpr::Response Response = Session.Post();
ZEN_CONSOLE("{}", FormatResponse(Response));
@@ -74,7 +77,7 @@ RpcStopRecordingCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char*
}
cpr::Session Session;
- Session.SetUrl(fmt::format("{}/z$/exec$/stop-recording", m_HostName));
+ Session.SetUrl(fmt::format("{}/z$/exec$/stop-recording"sv, m_HostName));
cpr::Response Response = Session.Post();
ZEN_CONSOLE("{}", FormatResponse(Response));
return GetReturnCode(Response);
@@ -96,6 +99,12 @@ RpcReplayCommand::RpcReplayCommand()
m_Options.add_option("", "", "onhost", "Replay on host, bypassing http/network layer", cxxopts::value(m_OnHost), "<onhost>");
m_Options.add_option("",
"",
+ "showmethodstats",
+ "Show statistics of which RPC methods are used",
+ cxxopts::value(m_ShowMethodStats),
+ "<showmethodstats>");
+ m_Options.add_option("",
+ "",
"offset",
"Offset into request recording to start replay",
cxxopts::value(m_Offset)->default_value("0"),
@@ -131,7 +140,7 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (m_OnHost)
{
cpr::Session Session;
- Session.SetUrl(fmt::format("{}/z$/exec$/replay-recording", m_HostName));
+ Session.SetUrl(fmt::format("{}/z$/exec$/replay-recording"sv, m_HostName));
Session.SetParameters({{"path", m_RecordingPath}, {"thread-count", fmt::format("{}", m_ThreadCount)}});
cpr::Response Response = Session.Post();
ZEN_CONSOLE("{}", FormatResponse(Response));
@@ -156,7 +165,7 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
for (int ProcessIndex = 0; ProcessIndex < m_ProcessCount; ++ProcessIndex)
{
std::string CommandLine =
- fmt::format("{} rpc-record-replay --hosturl {} --path \"{}\" --offset {} --stride {} --numthreads {} --numproc {}",
+ fmt::format("{} rpc-record-replay --hosturl {} --path \"{}\" --offset {} --stride {} --numthreads {} --numproc {}"sv,
argv[0],
m_HostName,
m_RecordingPath,
@@ -178,50 +187,96 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
else
{
+ std::map<std::string, size_t> MethodTypes;
+ RwLock MethodTypesLock;
+
WorkerThreadPool WorkerPool(m_ThreadCount);
Latch WorkLatch(m_ThreadCount);
for (int WorkerIndex = 0; WorkerIndex < m_ThreadCount; ++WorkerIndex)
{
- WorkerPool.ScheduleWork(
- [HostName = m_HostName, &WorkLatch, EntryCount, &EntryOffset, &Replayer, &BytesSent, &BytesReceived, Stride = m_Stride]() {
- auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
-
- cpr::Session Session;
- Session.SetUrl(fmt::format("{}/z$/$rpc", HostName));
-
- uint64_t EntryIndex = EntryOffset.fetch_add(Stride) - Stride;
- while (EntryIndex < EntryCount)
+ WorkerPool.ScheduleWork([HostName = m_HostName,
+ &WorkLatch,
+ EntryCount,
+ &EntryOffset,
+ &Replayer,
+ &BytesSent,
+ &BytesReceived,
+ &MethodTypes,
+ &MethodTypesLock,
+ ShowMethodStats = m_ShowMethodStats,
+ Stride = m_Stride]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+
+ cpr::Session Session;
+ Session.SetUrl(fmt::format("{}/z$/$rpc"sv, HostName));
+
+ uint64_t EntryIndex = EntryOffset.fetch_add(Stride) - Stride;
+ while (EntryIndex < EntryCount)
+ {
+ IoBuffer Payload;
+ std::pair<ZenContentType, ZenContentType> Types = Replayer->GetRequest(EntryIndex, Payload);
+ ZenContentType RequestContentType = Types.first;
+ ZenContentType AcceptContentType = Types.second;
+
+ if (ShowMethodStats)
{
- IoBuffer Payload;
- std::pair<ZenContentType, ZenContentType> Types = Replayer->GetRequest(EntryIndex, Payload);
- ZenContentType RequestContentType = Types.first;
- ZenContentType AcceptContentType = Types.second;
-
- Session.SetHeader({{"Content-Type", std::string(MapContentTypeToString(RequestContentType))},
- {"Accept", std::string(MapContentTypeToString(AcceptContentType))}});
- uint64_t Offset = 0;
- auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) {
- size = Min<size_t>(size, Payload.GetSize() - Offset);
- IoBuffer PayloadRange = IoBuffer(Payload, Offset, size);
- MutableMemoryView Data(buffer, size);
- Data.CopyFrom(PayloadRange.GetView());
- Offset += size;
- return true;
- };
- Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
- cpr::Response Response = Session.Post();
- BytesSent.fetch_add(Payload.GetSize());
- if (Response.error || !(IsHttpSuccessCode(Response.status_code) ||
- Response.status_code == gsl::narrow<long>(HttpResponseCode::NotFound)))
+ std::string MethodName;
+ switch (RequestContentType)
{
- ZEN_CONSOLE("{}", FormatResponse(Response));
- break;
+ case ZenContentType::kCbPackage:
+ {
+ CbPackage Request;
+ if (ParsePackageMessageWithLegacyFallback(Payload, Request))
+ {
+ MethodName = Request.GetObject()["Method"sv].AsString();
+ }
+ }
+ break;
+ case ZenContentType::kCbObject:
+ {
+ CbObject Request = LoadCompactBinaryObject(Payload);
+ MethodName = Request["Method"sv].AsString();
+ }
+ break;
}
- BytesReceived.fetch_add(Response.downloaded_bytes);
- EntryIndex = EntryOffset.fetch_add(Stride) - Stride;
+ {
+ RwLock::ExclusiveLockScope __(MethodTypesLock);
+ if (auto It = MethodTypes.find(MethodName); It != MethodTypes.end())
+ {
+ It->second++;
+ }
+ else
+ {
+ MethodTypes[MethodName] = 1;
+ }
+ }
+ }
+
+ Session.SetHeader({{"Content-Type", std::string(MapContentTypeToString(RequestContentType))},
+ {"Accept", std::string(MapContentTypeToString(AcceptContentType))}});
+ uint64_t Offset = 0;
+ auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) {
+ size = Min<size_t>(size, Payload.GetSize() - Offset);
+ IoBuffer PayloadRange = IoBuffer(Payload, Offset, size);
+ MutableMemoryView Data(buffer, size);
+ Data.CopyFrom(PayloadRange.GetView());
+ Offset += size;
+ return true;
+ };
+ Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
+ cpr::Response Response = Session.Post();
+ BytesSent.fetch_add(Payload.GetSize());
+ if (Response.error ||
+ !(IsHttpSuccessCode(Response.status_code) || Response.status_code == gsl::narrow<long>(HttpResponseCode::NotFound)))
+ {
+ ZEN_CONSOLE("{}", FormatResponse(Response));
+ break;
}
- });
+ BytesReceived.fetch_add(Response.downloaded_bytes);
+ EntryIndex = EntryOffset.fetch_add(Stride) - Stride;
+ }
+ });
}
while (!WorkLatch.Wait(1000))
@@ -232,6 +287,13 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
NiceBytes(BytesSent.load()),
NiceBytes(BytesReceived.load()));
}
+ if (m_ShowMethodStats)
+ {
+ for (const auto& It : MethodTypes)
+ {
+ ZEN_CONSOLE("{}: {}", It.first, It.second);
+ }
+ }
}
const uint64_t RequestsSent = (EntryOffset.load() - m_Offset) / m_Stride;
diff --git a/zen/cmds/rpcreplay.h b/zen/cmds/rpcreplay.h
index 6f476f01e..f1acf1eac 100644
--- a/zen/cmds/rpcreplay.h
+++ b/zen/cmds/rpcreplay.h
@@ -48,7 +48,8 @@ private:
cxxopts::Options m_Options{"rpc-record-replay", "Replays a previously recorded session of cache rpc requests to a target host"};
std::string m_HostName;
std::string m_RecordingPath;
- bool m_OnHost = false;
+ bool m_OnHost = false;
+ bool m_ShowMethodStats = false;
int m_ProcessCount;
int m_ThreadCount;
uint64_t m_Offset;