aboutsummaryrefslogtreecommitdiff
path: root/src/zen/cmds/rpcreplay_cmd.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-12-19 12:06:13 +0100
committerGitHub <[email protected]>2023-12-19 12:06:13 +0100
commit519d942d809e740a3b1fe5a1f6a57a4cfe43408b (patch)
tree9b3c084e21bb7fd5e6bb3335e890647062d0703b /src/zen/cmds/rpcreplay_cmd.cpp
parentadded mimalloc_hooks (diff)
parentensure we can build without trace (#619) (diff)
downloadarchived-zen-273-integrated-memory-tracking.tar.xz
archived-zen-273-integrated-memory-tracking.zip
Merge branch 'main' into 273-integrated-memory-tracking273-integrated-memory-tracking
Diffstat (limited to 'src/zen/cmds/rpcreplay_cmd.cpp')
-rw-r--r--src/zen/cmds/rpcreplay_cmd.cpp234
1 files changed, 120 insertions, 114 deletions
diff --git a/src/zen/cmds/rpcreplay_cmd.cpp b/src/zen/cmds/rpcreplay_cmd.cpp
index 409d3393e..d307ef0e8 100644
--- a/src/zen/cmds/rpcreplay_cmd.cpp
+++ b/src/zen/cmds/rpcreplay_cmd.cpp
@@ -201,6 +201,8 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
throw std::runtime_error(fmt::format("could not find recording at '{}'", m_RecordingPath));
}
+ m_ThreadCount = Max(m_ThreadCount, 1);
+
Stopwatch TotalTimer;
if (m_OnHost)
@@ -282,152 +284,156 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
uint64_t EntryIndex = EntryOffset.fetch_add(m_Stride);
while (EntryIndex < EntryCount)
{
- IoBuffer Payload;
- zen::cache::RecordedRequestInfo RequestInfo = Replayer->GetRequest(EntryIndex, /* out */ Payload);
-
- CbPackage RequestPackage;
- CbObject Request;
+ IoBuffer Payload;
+ const zen::cache::RecordedRequestInfo RequestInfo = Replayer->GetRequest(EntryIndex, /* out */ Payload);
- switch (RequestInfo.ContentType)
+ if (RequestInfo != zen::cache::RecordedRequestInfo::NullRequest)
{
- case ZenContentType::kCbPackage:
- {
- if (ParsePackageMessageWithLegacyFallback(Payload, RequestPackage))
+ CbPackage RequestPackage;
+ CbObject Request;
+
+ switch (RequestInfo.ContentType)
+ {
+ case ZenContentType::kCbPackage:
{
- Request = RequestPackage.GetObject();
+ if (ParsePackageMessageWithLegacyFallback(Payload, RequestPackage))
+ {
+ Request = RequestPackage.GetObject();
+ }
}
- }
- break;
- case ZenContentType::kCbObject:
- {
- Request = LoadCompactBinaryObject(Payload);
- }
- break;
- }
+ break;
+ case ZenContentType::kCbObject:
+ {
+ Request = LoadCompactBinaryObject(Payload);
+ }
+ break;
+ }
- RpcAcceptOptions OriginalAcceptOptions = static_cast<RpcAcceptOptions>(Request["AcceptFlags"sv].AsUInt16(0u));
- int OriginalProcessPid = Request["Pid"sv].AsInt32(0);
+ RpcAcceptOptions OriginalAcceptOptions = static_cast<RpcAcceptOptions>(Request["AcceptFlags"sv].AsUInt16(0u));
+ int OriginalProcessPid = Request["Pid"sv].AsInt32(0);
- int AdjustedPid = 0;
- RpcAcceptOptions AdjustedAcceptOptions = RpcAcceptOptions::kNone;
+ int AdjustedPid = 0;
+ RpcAcceptOptions AdjustedAcceptOptions = RpcAcceptOptions::kNone;
- if (!m_DisableLocalRefs)
- {
- if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowLocalReferences) || m_ForceAllowLocalRefs)
+ if (!m_DisableLocalRefs)
{
- AdjustedAcceptOptions |= RpcAcceptOptions::kAllowLocalReferences;
- if (!m_DisablePartialLocalRefs)
+ if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowLocalReferences) ||
+ m_ForceAllowLocalRefs)
{
- if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowPartialLocalReferences) ||
- m_ForceAllowPartialLocalRefs)
+ AdjustedAcceptOptions |= RpcAcceptOptions::kAllowLocalReferences;
+ if (!m_DisablePartialLocalRefs)
{
- AdjustedAcceptOptions |= RpcAcceptOptions::kAllowPartialLocalReferences;
+ if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowPartialLocalReferences) ||
+ m_ForceAllowPartialLocalRefs)
+ {
+ AdjustedAcceptOptions |= RpcAcceptOptions::kAllowPartialLocalReferences;
+ }
}
- }
- if (!m_DisableLocalHandleRefs)
- {
- if (OriginalProcessPid != 0 || m_ForceAllowLocalHandleRef)
+ if (!m_DisableLocalHandleRefs)
{
- AdjustedPid = GetCurrentProcessId();
+ if (OriginalProcessPid != 0 || m_ForceAllowLocalHandleRef)
+ {
+ AdjustedPid = GetCurrentProcessId();
+ }
}
}
}
- }
- if (m_ShowMethodStats)
- {
- std::string MethodName = std::string(Request["Method"sv].AsString());
- if (auto It = LocalMethodTypes.find(MethodName); It != LocalMethodTypes.end())
- {
- It->second++;
- }
- else
+ if (m_ShowMethodStats)
{
- LocalMethodTypes[MethodName] = 1;
+ std::string MethodName = std::string(Request["Method"sv].AsString());
+ if (auto It = LocalMethodTypes.find(MethodName); It != LocalMethodTypes.end())
+ {
+ It->second++;
+ }
+ else
+ {
+ LocalMethodTypes[MethodName] = 1;
+ }
}
- }
- if (OriginalAcceptOptions != AdjustedAcceptOptions || OriginalProcessPid != AdjustedPid)
- {
- CbObjectWriter RequestCopyWriter;
- for (const CbFieldView& Field : Request)
+ if (OriginalAcceptOptions != AdjustedAcceptOptions || OriginalProcessPid != AdjustedPid)
{
- if (!Field.HasName())
+ CbObjectWriter RequestCopyWriter;
+ for (const CbFieldView& Field : Request)
{
- RequestCopyWriter.AddField(Field);
- continue;
+ if (!Field.HasName())
+ {
+ RequestCopyWriter.AddField(Field);
+ continue;
+ }
+ std::string_view FieldName = Field.GetName();
+ if (FieldName == "Pid"sv)
+ {
+ continue;
+ }
+ if (FieldName == "AcceptFlags"sv)
+ {
+ continue;
+ }
+ RequestCopyWriter.AddField(FieldName, Field);
}
- std::string_view FieldName = Field.GetName();
- if (FieldName == "Pid"sv)
+ if (AdjustedPid != 0)
{
- continue;
+ RequestCopyWriter.AddInteger("Pid"sv, AdjustedPid);
}
- if (FieldName == "AcceptFlags"sv)
+ if (AdjustedAcceptOptions != RpcAcceptOptions::kNone)
{
- continue;
+ RequestCopyWriter.AddInteger("AcceptFlags"sv, static_cast<uint16_t>(AdjustedAcceptOptions));
}
- RequestCopyWriter.AddField(FieldName, Field);
- }
- if (AdjustedPid != 0)
- {
- RequestCopyWriter.AddInteger("Pid"sv, AdjustedPid);
- }
- if (AdjustedAcceptOptions != RpcAcceptOptions::kNone)
- {
- RequestCopyWriter.AddInteger("AcceptFlags"sv, static_cast<uint16_t>(AdjustedAcceptOptions));
- }
- if (RequestInfo.ContentType == ZenContentType::kCbPackage)
- {
- RequestPackage.SetObject(RequestCopyWriter.Save());
- std::vector<IoBuffer> Buffers = FormatPackageMessage(RequestPackage);
- std::vector<SharedBuffer> SharedBuffers(Buffers.begin(), Buffers.end());
- Payload = CompositeBuffer(std::move(SharedBuffers)).Flatten().AsIoBuffer();
- }
- else
- {
- RequestCopyWriter.Finalize();
- Payload = IoBuffer(RequestCopyWriter.GetSaveSize());
- RequestCopyWriter.Save(Payload.GetMutableView());
+ if (RequestInfo.ContentType == ZenContentType::kCbPackage)
+ {
+ RequestPackage.SetObject(RequestCopyWriter.Save());
+ std::vector<IoBuffer> Buffers = FormatPackageMessage(RequestPackage);
+ std::vector<SharedBuffer> SharedBuffers(Buffers.begin(), Buffers.end());
+ Payload = CompositeBuffer(std::move(SharedBuffers)).Flatten().AsIoBuffer();
+ }
+ else
+ {
+ RequestCopyWriter.Finalize();
+ Payload = IoBuffer(RequestCopyWriter.GetSaveSize());
+ RequestCopyWriter.Save(Payload.GetMutableView());
+ }
}
- }
-
- if (!m_DryRun)
- {
- StringBuilder<32> SessionIdString;
- if (RequestInfo.SessionId != Oid::Zero)
+ if (!m_DryRun)
{
- RequestInfo.SessionId.ToString(SessionIdString);
- }
- else
- {
- GetSessionId().ToString(SessionIdString);
- }
+ StringBuilder<32> SessionIdString;
- Session.SetHeader({{"Content-Type", std::string(MapContentTypeToString(RequestInfo.ContentType))},
- {"Accept", std::string(MapContentTypeToString(RequestInfo.AcceptType))},
- {"UE-Session", std::string(SessionIdString)}});
-
- uint64_t Offset = 0;
- auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) {
- size = Min<size_t>(size, Payload.GetSize() - Offset);
- IoBuffer PayloadRange = IoBuffer(Payload, Offset, size);
- MutableMemoryView Data(buffer, size);
- Data.CopyFrom(PayloadRange.GetView());
- Offset += size;
- return true;
- };
- Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
- cpr::Response Response = Session.Post();
- BytesSent.fetch_add(Payload.GetSize());
- if (Response.error || !(IsHttpSuccessCode(Response.status_code) ||
- Response.status_code == gsl::narrow<long>(HttpResponseCode::NotFound)))
- {
- ZEN_CONSOLE("{}", FormatHttpResponse(Response));
- break;
+ if (RequestInfo.SessionId != Oid::Zero)
+ {
+ RequestInfo.SessionId.ToString(SessionIdString);
+ }
+ else
+ {
+ GetSessionId().ToString(SessionIdString);
+ }
+
+ Session.SetHeader({{"Content-Type", std::string(MapContentTypeToString(RequestInfo.ContentType))},
+ {"Accept", std::string(MapContentTypeToString(RequestInfo.AcceptType))},
+ {"UE-Session", std::string(SessionIdString)}});
+
+ uint64_t Offset = 0;
+ auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) {
+ size = Min<size_t>(size, Payload.GetSize() - Offset);
+ IoBuffer PayloadRange = IoBuffer(Payload, Offset, size);
+ MutableMemoryView Data(buffer, size);
+ Data.CopyFrom(PayloadRange.GetView());
+ Offset += size;
+ return true;
+ };
+ Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
+ cpr::Response Response = Session.Post();
+ BytesSent.fetch_add(Payload.GetSize());
+ if (Response.error || !(IsHttpSuccessCode(Response.status_code) ||
+ Response.status_code == gsl::narrow<long>(HttpResponseCode::NotFound)))
+ {
+ ZEN_CONSOLE("{}", FormatHttpResponse(Response));
+ break;
+ }
+ BytesReceived.fetch_add(Response.downloaded_bytes);
}
- BytesReceived.fetch_add(Response.downloaded_bytes);
}
EntryIndex = EntryOffset.fetch_add(m_Stride);