diff options
| author | Dan Engelbrecht <[email protected]> | 2023-02-21 13:50:51 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-02-21 04:50:51 -0800 |
| commit | d57115c5de8296b9a28d093cbb6cca96786943eb (patch) | |
| tree | f0047ede22346a4676e767569eb64f0df7f9f170 | |
| parent | Refactor CacheBuckets to allow for storing RawHash/RawSize (#232) (diff) | |
| download | zen-d57115c5de8296b9a28d093cbb6cca96786943eb.tar.xz zen-d57115c5de8296b9a28d093cbb6cca96786943eb.zip | |
add `--showmethodstats` option for rpc replay to show method statistics (#233)
* add `--showmethodstats` option for rpc replay to show method statistics
| -rw-r--r-- | CHANGELOG.md | 1 | ||||
| -rw-r--r-- | zen/cmds/rpcreplay.cpp | 140 | ||||
| -rw-r--r-- | zen/cmds/rpcreplay.h | 3 |
3 files changed, 104 insertions, 40 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 1598d4496..638e9261e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ - `--offset` Offset into request playback to start at - `--stride` The stride to use when selecting requests to playback - `--onhost` Replay the recording inside the zenserver bypassing http overhead + - `--showmethodstats` Show statistics of which RPC methods are used - Improvement: FileCas now keeps an up to date index of all the entries improving performance when getting cache misses on large payloads - Changed: Exit with failure code on port conflict rather than reporting crash to Sentry diff --git a/zen/cmds/rpcreplay.cpp b/zen/cmds/rpcreplay.cpp index 228a4daa9..89a1fe631 100644 --- a/zen/cmds/rpcreplay.cpp +++ b/zen/cmds/rpcreplay.cpp @@ -8,6 +8,7 @@ #include <zencore/timer.h> #include <zencore/workthreadpool.h> #include <zenhttp/httpcommon.h> +#include <zenhttp/httpshared.h> #include <zenutil/cache/rpcrecording.h> ZEN_THIRD_PARTY_INCLUDES_START @@ -20,6 +21,8 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { +using namespace std::literals; + RpcStartRecordingCommand::RpcStartRecordingCommand() { m_Options.add_options()("h,help", "Print help"); @@ -46,7 +49,7 @@ RpcStartRecordingCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char } cpr::Session Session; - Session.SetUrl(fmt::format("{}/z$/exec$/start-recording", m_HostName)); + Session.SetUrl(fmt::format("{}/z$/exec$/start-recording"sv, m_HostName)); Session.SetParameters({{"path", m_RecordingPath}}); cpr::Response Response = Session.Post(); ZEN_CONSOLE("{}", FormatResponse(Response)); @@ -74,7 +77,7 @@ RpcStopRecordingCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char* } cpr::Session Session; - Session.SetUrl(fmt::format("{}/z$/exec$/stop-recording", m_HostName)); + Session.SetUrl(fmt::format("{}/z$/exec$/stop-recording"sv, m_HostName)); cpr::Response Response = Session.Post(); ZEN_CONSOLE("{}", FormatResponse(Response)); return GetReturnCode(Response); @@ -96,6 +99,12 @@ RpcReplayCommand::RpcReplayCommand() m_Options.add_option("", "", "onhost", "Replay on host, bypassing http/network layer", cxxopts::value(m_OnHost), "<onhost>"); m_Options.add_option("", "", + "showmethodstats", + "Show statistics of which RPC methods are used", + cxxopts::value(m_ShowMethodStats), + "<showmethodstats>"); + m_Options.add_option("", + "", "offset", "Offset into request recording to start replay", cxxopts::value(m_Offset)->default_value("0"), @@ -131,7 +140,7 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (m_OnHost) { cpr::Session Session; - Session.SetUrl(fmt::format("{}/z$/exec$/replay-recording", m_HostName)); + Session.SetUrl(fmt::format("{}/z$/exec$/replay-recording"sv, m_HostName)); Session.SetParameters({{"path", m_RecordingPath}, {"thread-count", fmt::format("{}", m_ThreadCount)}}); cpr::Response Response = Session.Post(); ZEN_CONSOLE("{}", FormatResponse(Response)); @@ -156,7 +165,7 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) for (int ProcessIndex = 0; ProcessIndex < m_ProcessCount; ++ProcessIndex) { std::string CommandLine = - fmt::format("{} rpc-record-replay --hosturl {} --path \"{}\" --offset {} --stride {} --numthreads {} --numproc {}", + fmt::format("{} rpc-record-replay --hosturl {} --path \"{}\" --offset {} --stride {} --numthreads {} --numproc {}"sv, argv[0], m_HostName, m_RecordingPath, @@ -178,50 +187,96 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } else { + std::map<std::string, size_t> MethodTypes; + RwLock MethodTypesLock; + WorkerThreadPool WorkerPool(m_ThreadCount); Latch WorkLatch(m_ThreadCount); for (int WorkerIndex = 0; WorkerIndex < m_ThreadCount; ++WorkerIndex) { - WorkerPool.ScheduleWork( - [HostName = m_HostName, &WorkLatch, EntryCount, &EntryOffset, &Replayer, &BytesSent, &BytesReceived, Stride = m_Stride]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); - - cpr::Session Session; - Session.SetUrl(fmt::format("{}/z$/$rpc", HostName)); - - uint64_t EntryIndex = EntryOffset.fetch_add(Stride) - Stride; - while (EntryIndex < EntryCount) + WorkerPool.ScheduleWork([HostName = m_HostName, + &WorkLatch, + EntryCount, + &EntryOffset, + &Replayer, + &BytesSent, + &BytesReceived, + &MethodTypes, + &MethodTypesLock, + ShowMethodStats = m_ShowMethodStats, + Stride = m_Stride]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + + cpr::Session Session; + Session.SetUrl(fmt::format("{}/z$/$rpc"sv, HostName)); + + uint64_t EntryIndex = EntryOffset.fetch_add(Stride) - Stride; + while (EntryIndex < EntryCount) + { + IoBuffer Payload; + std::pair<ZenContentType, ZenContentType> Types = Replayer->GetRequest(EntryIndex, Payload); + ZenContentType RequestContentType = Types.first; + ZenContentType AcceptContentType = Types.second; + + if (ShowMethodStats) { - IoBuffer Payload; - std::pair<ZenContentType, ZenContentType> Types = Replayer->GetRequest(EntryIndex, Payload); - ZenContentType RequestContentType = Types.first; - ZenContentType AcceptContentType = Types.second; - - Session.SetHeader({{"Content-Type", std::string(MapContentTypeToString(RequestContentType))}, - {"Accept", std::string(MapContentTypeToString(AcceptContentType))}}); - uint64_t Offset = 0; - auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, Payload.GetSize() - Offset); - IoBuffer PayloadRange = IoBuffer(Payload, Offset, size); - MutableMemoryView Data(buffer, size); - Data.CopyFrom(PayloadRange.GetView()); - Offset += size; - return true; - }; - Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); - cpr::Response Response = Session.Post(); - BytesSent.fetch_add(Payload.GetSize()); - if (Response.error || !(IsHttpSuccessCode(Response.status_code) || - Response.status_code == gsl::narrow<long>(HttpResponseCode::NotFound))) + std::string MethodName; + switch (RequestContentType) { - ZEN_CONSOLE("{}", FormatResponse(Response)); - break; + case ZenContentType::kCbPackage: + { + CbPackage Request; + if (ParsePackageMessageWithLegacyFallback(Payload, Request)) + { + MethodName = Request.GetObject()["Method"sv].AsString(); + } + } + break; + case ZenContentType::kCbObject: + { + CbObject Request = LoadCompactBinaryObject(Payload); + MethodName = Request["Method"sv].AsString(); + } + break; } - BytesReceived.fetch_add(Response.downloaded_bytes); - EntryIndex = EntryOffset.fetch_add(Stride) - Stride; + { + RwLock::ExclusiveLockScope __(MethodTypesLock); + if (auto It = MethodTypes.find(MethodName); It != MethodTypes.end()) + { + It->second++; + } + else + { + MethodTypes[MethodName] = 1; + } + } + } + + Session.SetHeader({{"Content-Type", std::string(MapContentTypeToString(RequestContentType))}, + {"Accept", std::string(MapContentTypeToString(AcceptContentType))}}); + uint64_t Offset = 0; + auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) { + size = Min<size_t>(size, Payload.GetSize() - Offset); + IoBuffer PayloadRange = IoBuffer(Payload, Offset, size); + MutableMemoryView Data(buffer, size); + Data.CopyFrom(PayloadRange.GetView()); + Offset += size; + return true; + }; + Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); + cpr::Response Response = Session.Post(); + BytesSent.fetch_add(Payload.GetSize()); + if (Response.error || + !(IsHttpSuccessCode(Response.status_code) || Response.status_code == gsl::narrow<long>(HttpResponseCode::NotFound))) + { + ZEN_CONSOLE("{}", FormatResponse(Response)); + break; } - }); + BytesReceived.fetch_add(Response.downloaded_bytes); + EntryIndex = EntryOffset.fetch_add(Stride) - Stride; + } + }); } while (!WorkLatch.Wait(1000)) @@ -232,6 +287,13 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) NiceBytes(BytesSent.load()), NiceBytes(BytesReceived.load())); } + if (m_ShowMethodStats) + { + for (const auto& It : MethodTypes) + { + ZEN_CONSOLE("{}: {}", It.first, It.second); + } + } } const uint64_t RequestsSent = (EntryOffset.load() - m_Offset) / m_Stride; diff --git a/zen/cmds/rpcreplay.h b/zen/cmds/rpcreplay.h index 6f476f01e..f1acf1eac 100644 --- a/zen/cmds/rpcreplay.h +++ b/zen/cmds/rpcreplay.h @@ -48,7 +48,8 @@ private: cxxopts::Options m_Options{"rpc-record-replay", "Replays a previously recorded session of cache rpc requests to a target host"}; std::string m_HostName; std::string m_RecordingPath; - bool m_OnHost = false; + bool m_OnHost = false; + bool m_ShowMethodStats = false; int m_ProcessCount; int m_ThreadCount; uint64_t m_Offset; |