diff options
| author | Stefan Boberg <[email protected]> | 2023-12-19 12:06:13 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-12-19 12:06:13 +0100 |
| commit | 519d942d809e740a3b1fe5a1f6a57a4cfe43408b (patch) | |
| tree | 9b3c084e21bb7fd5e6bb3335e890647062d0703b /src/zen/cmds/rpcreplay_cmd.cpp | |
| parent | added mimalloc_hooks (diff) | |
| parent | ensure we can build without trace (#619) (diff) | |
| download | archived-zen-273-integrated-memory-tracking.tar.xz archived-zen-273-integrated-memory-tracking.zip | |
Merge branch 'main' into 273-integrated-memory-tracking273-integrated-memory-tracking
Diffstat (limited to 'src/zen/cmds/rpcreplay_cmd.cpp')
| -rw-r--r-- | src/zen/cmds/rpcreplay_cmd.cpp | 234 |
1 files changed, 120 insertions, 114 deletions
diff --git a/src/zen/cmds/rpcreplay_cmd.cpp b/src/zen/cmds/rpcreplay_cmd.cpp index 409d3393e..d307ef0e8 100644 --- a/src/zen/cmds/rpcreplay_cmd.cpp +++ b/src/zen/cmds/rpcreplay_cmd.cpp @@ -201,6 +201,8 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) throw std::runtime_error(fmt::format("could not find recording at '{}'", m_RecordingPath)); } + m_ThreadCount = Max(m_ThreadCount, 1); + Stopwatch TotalTimer; if (m_OnHost) @@ -282,152 +284,156 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) uint64_t EntryIndex = EntryOffset.fetch_add(m_Stride); while (EntryIndex < EntryCount) { - IoBuffer Payload; - zen::cache::RecordedRequestInfo RequestInfo = Replayer->GetRequest(EntryIndex, /* out */ Payload); - - CbPackage RequestPackage; - CbObject Request; + IoBuffer Payload; + const zen::cache::RecordedRequestInfo RequestInfo = Replayer->GetRequest(EntryIndex, /* out */ Payload); - switch (RequestInfo.ContentType) + if (RequestInfo != zen::cache::RecordedRequestInfo::NullRequest) { - case ZenContentType::kCbPackage: - { - if (ParsePackageMessageWithLegacyFallback(Payload, RequestPackage)) + CbPackage RequestPackage; + CbObject Request; + + switch (RequestInfo.ContentType) + { + case ZenContentType::kCbPackage: { - Request = RequestPackage.GetObject(); + if (ParsePackageMessageWithLegacyFallback(Payload, RequestPackage)) + { + Request = RequestPackage.GetObject(); + } } - } - break; - case ZenContentType::kCbObject: - { - Request = LoadCompactBinaryObject(Payload); - } - break; - } + break; + case ZenContentType::kCbObject: + { + Request = LoadCompactBinaryObject(Payload); + } + break; + } - RpcAcceptOptions OriginalAcceptOptions = static_cast<RpcAcceptOptions>(Request["AcceptFlags"sv].AsUInt16(0u)); - int OriginalProcessPid = Request["Pid"sv].AsInt32(0); + RpcAcceptOptions OriginalAcceptOptions = static_cast<RpcAcceptOptions>(Request["AcceptFlags"sv].AsUInt16(0u)); + int OriginalProcessPid = Request["Pid"sv].AsInt32(0); - int AdjustedPid = 0; - RpcAcceptOptions AdjustedAcceptOptions = RpcAcceptOptions::kNone; + int AdjustedPid = 0; + RpcAcceptOptions AdjustedAcceptOptions = RpcAcceptOptions::kNone; - if (!m_DisableLocalRefs) - { - if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowLocalReferences) || m_ForceAllowLocalRefs) + if (!m_DisableLocalRefs) { - AdjustedAcceptOptions |= RpcAcceptOptions::kAllowLocalReferences; - if (!m_DisablePartialLocalRefs) + if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowLocalReferences) || + m_ForceAllowLocalRefs) { - if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowPartialLocalReferences) || - m_ForceAllowPartialLocalRefs) + AdjustedAcceptOptions |= RpcAcceptOptions::kAllowLocalReferences; + if (!m_DisablePartialLocalRefs) { - AdjustedAcceptOptions |= RpcAcceptOptions::kAllowPartialLocalReferences; + if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowPartialLocalReferences) || + m_ForceAllowPartialLocalRefs) + { + AdjustedAcceptOptions |= RpcAcceptOptions::kAllowPartialLocalReferences; + } } - } - if (!m_DisableLocalHandleRefs) - { - if (OriginalProcessPid != 0 || m_ForceAllowLocalHandleRef) + if (!m_DisableLocalHandleRefs) { - AdjustedPid = GetCurrentProcessId(); + if (OriginalProcessPid != 0 || m_ForceAllowLocalHandleRef) + { + AdjustedPid = GetCurrentProcessId(); + } } } } - } - if (m_ShowMethodStats) - { - std::string MethodName = std::string(Request["Method"sv].AsString()); - if (auto It = LocalMethodTypes.find(MethodName); It != LocalMethodTypes.end()) - { - It->second++; - } - else + if (m_ShowMethodStats) { - LocalMethodTypes[MethodName] = 1; + std::string MethodName = std::string(Request["Method"sv].AsString()); + if (auto It = LocalMethodTypes.find(MethodName); It != LocalMethodTypes.end()) + { + It->second++; + } + else + { + LocalMethodTypes[MethodName] = 1; + } } - } - if (OriginalAcceptOptions != AdjustedAcceptOptions || OriginalProcessPid != AdjustedPid) - { - CbObjectWriter RequestCopyWriter; - for (const CbFieldView& Field : Request) + if (OriginalAcceptOptions != AdjustedAcceptOptions || OriginalProcessPid != AdjustedPid) { - if (!Field.HasName()) + CbObjectWriter RequestCopyWriter; + for (const CbFieldView& Field : Request) { - RequestCopyWriter.AddField(Field); - continue; + if (!Field.HasName()) + { + RequestCopyWriter.AddField(Field); + continue; + } + std::string_view FieldName = Field.GetName(); + if (FieldName == "Pid"sv) + { + continue; + } + if (FieldName == "AcceptFlags"sv) + { + continue; + } + RequestCopyWriter.AddField(FieldName, Field); } - std::string_view FieldName = Field.GetName(); - if (FieldName == "Pid"sv) + if (AdjustedPid != 0) { - continue; + RequestCopyWriter.AddInteger("Pid"sv, AdjustedPid); } - if (FieldName == "AcceptFlags"sv) + if (AdjustedAcceptOptions != RpcAcceptOptions::kNone) { - continue; + RequestCopyWriter.AddInteger("AcceptFlags"sv, static_cast<uint16_t>(AdjustedAcceptOptions)); } - RequestCopyWriter.AddField(FieldName, Field); - } - if (AdjustedPid != 0) - { - RequestCopyWriter.AddInteger("Pid"sv, AdjustedPid); - } - if (AdjustedAcceptOptions != RpcAcceptOptions::kNone) - { - RequestCopyWriter.AddInteger("AcceptFlags"sv, static_cast<uint16_t>(AdjustedAcceptOptions)); - } - if (RequestInfo.ContentType == ZenContentType::kCbPackage) - { - RequestPackage.SetObject(RequestCopyWriter.Save()); - std::vector<IoBuffer> Buffers = FormatPackageMessage(RequestPackage); - std::vector<SharedBuffer> SharedBuffers(Buffers.begin(), Buffers.end()); - Payload = CompositeBuffer(std::move(SharedBuffers)).Flatten().AsIoBuffer(); - } - else - { - RequestCopyWriter.Finalize(); - Payload = IoBuffer(RequestCopyWriter.GetSaveSize()); - RequestCopyWriter.Save(Payload.GetMutableView()); + if (RequestInfo.ContentType == ZenContentType::kCbPackage) + { + RequestPackage.SetObject(RequestCopyWriter.Save()); + std::vector<IoBuffer> Buffers = FormatPackageMessage(RequestPackage); + std::vector<SharedBuffer> SharedBuffers(Buffers.begin(), Buffers.end()); + Payload = CompositeBuffer(std::move(SharedBuffers)).Flatten().AsIoBuffer(); + } + else + { + RequestCopyWriter.Finalize(); + Payload = IoBuffer(RequestCopyWriter.GetSaveSize()); + RequestCopyWriter.Save(Payload.GetMutableView()); + } } - } - - if (!m_DryRun) - { - StringBuilder<32> SessionIdString; - if (RequestInfo.SessionId != Oid::Zero) + if (!m_DryRun) { - RequestInfo.SessionId.ToString(SessionIdString); - } - else - { - GetSessionId().ToString(SessionIdString); - } + StringBuilder<32> SessionIdString; - Session.SetHeader({{"Content-Type", std::string(MapContentTypeToString(RequestInfo.ContentType))}, - {"Accept", std::string(MapContentTypeToString(RequestInfo.AcceptType))}, - {"UE-Session", std::string(SessionIdString)}}); - - uint64_t Offset = 0; - auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, Payload.GetSize() - Offset); - IoBuffer PayloadRange = IoBuffer(Payload, Offset, size); - MutableMemoryView Data(buffer, size); - Data.CopyFrom(PayloadRange.GetView()); - Offset += size; - return true; - }; - Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); - cpr::Response Response = Session.Post(); - BytesSent.fetch_add(Payload.GetSize()); - if (Response.error || !(IsHttpSuccessCode(Response.status_code) || - Response.status_code == gsl::narrow<long>(HttpResponseCode::NotFound))) - { - ZEN_CONSOLE("{}", FormatHttpResponse(Response)); - break; + if (RequestInfo.SessionId != Oid::Zero) + { + RequestInfo.SessionId.ToString(SessionIdString); + } + else + { + GetSessionId().ToString(SessionIdString); + } + + Session.SetHeader({{"Content-Type", std::string(MapContentTypeToString(RequestInfo.ContentType))}, + {"Accept", std::string(MapContentTypeToString(RequestInfo.AcceptType))}, + {"UE-Session", std::string(SessionIdString)}}); + + uint64_t Offset = 0; + auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) { + size = Min<size_t>(size, Payload.GetSize() - Offset); + IoBuffer PayloadRange = IoBuffer(Payload, Offset, size); + MutableMemoryView Data(buffer, size); + Data.CopyFrom(PayloadRange.GetView()); + Offset += size; + return true; + }; + Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); + cpr::Response Response = Session.Post(); + BytesSent.fetch_add(Payload.GetSize()); + if (Response.error || !(IsHttpSuccessCode(Response.status_code) || + Response.status_code == gsl::narrow<long>(HttpResponseCode::NotFound))) + { + ZEN_CONSOLE("{}", FormatHttpResponse(Response)); + break; + } + BytesReceived.fetch_add(Response.downloaded_bytes); } - BytesReceived.fetch_add(Response.downloaded_bytes); } EntryIndex = EntryOffset.fetch_add(m_Stride); |