diff options
| author | Dan Engelbrecht <[email protected]> | 2023-03-21 16:39:49 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-03-21 08:39:49 -0700 |
| commit | ef0c849f5eb82cdce7e799d8833b6970faf2a405 (patch) | |
| tree | d6179272a890115132ce4995d3005a650181c903 | |
| parent | send payloads as duplicated handles (#240) (diff) | |
| download | zen-ef0c849f5eb82cdce7e799d8833b6970faf2a405.tar.xz zen-ef0c849f5eb82cdce7e799d8833b6970faf2a405.zip | |
De/fix rpc replay to handle pid (#243)
* allow access to CbWriter::Save(MutableMemoryView Buffer) in CbObjectWriter to avoid extra memory allocation and copy
* fix entry index counting in rpcreplay
* adjust target pid in rpc requests for rpc replay
* allow control in rpc-record-replay over how attachments are sent
* changelog
| -rw-r--r-- | CHANGELOG.md | 6 | ||||
| -rw-r--r-- | zen/cmds/rpcreplay.cpp | 207 | ||||
| -rw-r--r-- | zen/cmds/rpcreplay.h | 6 | ||||
| -rw-r--r-- | zencore/include/zencore/compactbinarybuilder.h | 12 |
4 files changed, 175 insertions, 56 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index b69085813..1409b276c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,12 @@ - `--stride` The stride to use when selecting requests to playback - `--onhost` Replay the recording inside the zenserver bypassing http overhead - `--showmethodstats` Show statistics of which RPC methods are used + - `--forceallowlocalrefs` Force the requests to allow local references (file path/file handle) + - `--disablelocalrefs` Force disable local references in request (file path/file handle) + - `--forceallowlocalhandlerefs` Force the requests to allow local references via duplicated file handles for requests that allow local refs + - `--disablelocalhandlerefs` Force disable local references via duplicated file handles in requests + - `--forceallowpartiallocalref` Force the requests to allow local references for files that are not saved as whole files for requests that allow local refs + - `--disablepartiallocalrefs` Force disable local references for files that are not saved as whole files for requests that allow local refs - Feature: `--junit` switch to `xmake test` to generate junit style reports of tests. - Feature: CI build on GitHub now uploads junit test reports as artifact to the check for PR validation and mainline validation - Feature: Payloads from zenserver can now be sent using duplicated file handles if caller requests provides client ProcessId (Windows only). diff --git a/zen/cmds/rpcreplay.cpp b/zen/cmds/rpcreplay.cpp index 89a1fe631..e28c27e2d 100644 --- a/zen/cmds/rpcreplay.cpp +++ b/zen/cmds/rpcreplay.cpp @@ -2,9 +2,11 @@ #include "rpcreplay.h" +#include <zencore/compactbinarybuilder.h> #include <zencore/filesystem.h> #include <zencore/logging.h> #include <zencore/scopeguard.h> +#include <zencore/stream.h> #include <zencore/timer.h> #include <zencore/workthreadpool.h> #include <zenhttp/httpcommon.h> @@ -116,6 +118,38 @@ RpcReplayCommand::RpcReplayCommand() cxxopts::value(m_Stride)->default_value("1"), "<stride>"); m_Options.add_option("", "", "numproc", "Number of worker processes", cxxopts::value(m_ProcessCount)->default_value("1"), "<count>"); + m_Options.add_option("", + "", + "forceallowlocalrefs", + "Force enable local refs in requests", + cxxopts::value(m_ForceAllowLocalRefs), + "<enable>"); + m_Options + .add_option("", "", "disablelocalrefs", "Force disable local refs in requests", cxxopts::value(m_DisableLocalRefs), "<enable>"); + m_Options.add_option("", + "", + "forceallowlocalhandlerefs", + "Force enable local refs as handles in requests", + cxxopts::value(m_ForceAllowLocalHandleRef), + "<enable>"); + m_Options.add_option("", + "", + "disablelocalhandlerefs", + "Force disable local refs as handles in requests", + cxxopts::value(m_DisableLocalHandleRefs), + "<enable>"); + m_Options.add_option("", + "", + "forceallowpartiallocalrefs", + "Force enable local refs for all sizes", + cxxopts::value(m_ForceAllowPartialLocalRefs), + "<enable>"); + m_Options.add_option("", + "", + "disablepartiallocalrefs", + "Force disable local refs for all sizes", + cxxopts::value(m_DisablePartialLocalRefs), + "<enable>"); m_Options.parse_positional("path"); } @@ -195,52 +229,71 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) Latch WorkLatch(m_ThreadCount); for (int WorkerIndex = 0; WorkerIndex < m_ThreadCount; ++WorkerIndex) { - WorkerPool.ScheduleWork([HostName = m_HostName, - &WorkLatch, - EntryCount, - &EntryOffset, - &Replayer, - &BytesSent, - &BytesReceived, - &MethodTypes, - &MethodTypesLock, - ShowMethodStats = m_ShowMethodStats, - Stride = m_Stride]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); - - cpr::Session Session; - Session.SetUrl(fmt::format("{}/z$/$rpc"sv, HostName)); - - uint64_t EntryIndex = EntryOffset.fetch_add(Stride) - Stride; - while (EntryIndex < EntryCount) - { - IoBuffer Payload; - std::pair<ZenContentType, ZenContentType> Types = Replayer->GetRequest(EntryIndex, Payload); - ZenContentType RequestContentType = Types.first; - ZenContentType AcceptContentType = Types.second; - - if (ShowMethodStats) + WorkerPool.ScheduleWork( + [this, &WorkLatch, EntryCount, &EntryOffset, &Replayer, &BytesSent, &BytesReceived, &MethodTypes, &MethodTypesLock]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + + cpr::Session Session; + Session.SetUrl(fmt::format("{}/z$/$rpc"sv, m_HostName)); + + uint64_t EntryIndex = EntryOffset.fetch_add(m_Stride); + while (EntryIndex < EntryCount) { - std::string MethodName; + IoBuffer Payload; + std::pair<ZenContentType, ZenContentType> Types = Replayer->GetRequest(EntryIndex, Payload); + ZenContentType RequestContentType = Types.first; + ZenContentType AcceptContentType = Types.second; + + CbPackage RequestPackage; + CbObject Request; switch (RequestContentType) { case ZenContentType::kCbPackage: { - CbPackage Request; - if (ParsePackageMessageWithLegacyFallback(Payload, Request)) + if (ParsePackageMessageWithLegacyFallback(Payload, RequestPackage)) { - MethodName = Request.GetObject()["Method"sv].AsString(); + Request = RequestPackage.GetObject(); } } break; case ZenContentType::kCbObject: { - CbObject Request = LoadCompactBinaryObject(Payload); - MethodName = Request["Method"sv].AsString(); + Request = LoadCompactBinaryObject(Payload); } break; } + + RpcAcceptOptions OriginalAcceptOptions = static_cast<RpcAcceptOptions>(Request["AcceptFlags"sv].AsUInt16(0u)); + int OriginalProcessPid = Request["Pid"sv].AsInt32(0); + + int AdjustedPid = 0; + RpcAcceptOptions AdjustedAcceptOptions = RpcAcceptOptions::kNone; + if (!m_DisableLocalRefs) { + if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowLocalReferences) || m_ForceAllowLocalRefs) + { + AdjustedAcceptOptions |= RpcAcceptOptions::kAllowLocalReferences; + if (!m_DisablePartialLocalRefs) + { + if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowPartialLocalReferences) || + m_ForceAllowPartialLocalRefs) + { + AdjustedAcceptOptions |= RpcAcceptOptions::kAllowPartialLocalReferences; + } + } + if (!m_DisableLocalHandleRefs) + { + if (OriginalProcessPid != 0 || m_ForceAllowLocalHandleRef) + { + AdjustedPid = GetCurrentProcessId(); + } + } + } + } + + if (m_ShowMethodStats) + { + std::string MethodName = std::string(Request["Method"sv].AsString()); RwLock::ExclusiveLockScope __(MethodTypesLock); if (auto It = MethodTypes.find(MethodName); It != MethodTypes.end()) { @@ -251,32 +304,76 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) MethodTypes[MethodName] = 1; } } - } - Session.SetHeader({{"Content-Type", std::string(MapContentTypeToString(RequestContentType))}, - {"Accept", std::string(MapContentTypeToString(AcceptContentType))}}); - uint64_t Offset = 0; - auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, Payload.GetSize() - Offset); - IoBuffer PayloadRange = IoBuffer(Payload, Offset, size); - MutableMemoryView Data(buffer, size); - Data.CopyFrom(PayloadRange.GetView()); - Offset += size; - return true; - }; - Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); - cpr::Response Response = Session.Post(); - BytesSent.fetch_add(Payload.GetSize()); - if (Response.error || - !(IsHttpSuccessCode(Response.status_code) || Response.status_code == gsl::narrow<long>(HttpResponseCode::NotFound))) - { - ZEN_CONSOLE("{}", FormatResponse(Response)); - break; + if (OriginalAcceptOptions != AdjustedAcceptOptions || OriginalProcessPid != AdjustedPid) + { + CbObjectWriter RequestCopyWriter; + for (const CbFieldView& Field : Request) + { + if (!Field.HasName()) + { + RequestCopyWriter.AddField(Field); + continue; + } + std::string_view FieldName = Field.GetName(); + if (FieldName == "Pid"sv) + { + continue; + } + if (FieldName == "AcceptFlags"sv) + { + continue; + } + RequestCopyWriter.AddField(FieldName, Field); + } + if (AdjustedPid != 0) + { + RequestCopyWriter.AddInteger("Pid"sv, AdjustedPid); + } + if (AdjustedAcceptOptions != RpcAcceptOptions::kNone) + { + RequestCopyWriter.AddInteger("AcceptFlags"sv, static_cast<uint16_t>(AdjustedAcceptOptions)); + } + + if (RequestContentType == ZenContentType::kCbPackage) + { + RequestPackage.SetObject(RequestCopyWriter.Save()); + std::vector<IoBuffer> Buffers = FormatPackageMessage(RequestPackage); + std::vector<SharedBuffer> SharedBuffers(Buffers.begin(), Buffers.end()); + Payload = CompositeBuffer(std::move(SharedBuffers)).Flatten().AsIoBuffer(); + } + else + { + RequestCopyWriter.Finalize(); + Payload = IoBuffer(RequestCopyWriter.GetSaveSize()); + RequestCopyWriter.Save(Payload.GetMutableView()); + } + } + + Session.SetHeader({{"Content-Type", std::string(MapContentTypeToString(RequestContentType))}, + {"Accept", std::string(MapContentTypeToString(AcceptContentType))}}); + uint64_t Offset = 0; + auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) { + size = Min<size_t>(size, Payload.GetSize() - Offset); + IoBuffer PayloadRange = IoBuffer(Payload, Offset, size); + MutableMemoryView Data(buffer, size); + Data.CopyFrom(PayloadRange.GetView()); + Offset += size; + return true; + }; + Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); + cpr::Response Response = Session.Post(); + BytesSent.fetch_add(Payload.GetSize()); + if (Response.error || !(IsHttpSuccessCode(Response.status_code) || + Response.status_code == gsl::narrow<long>(HttpResponseCode::NotFound))) + { + ZEN_CONSOLE("{}", FormatResponse(Response)); + break; + } + BytesReceived.fetch_add(Response.downloaded_bytes); + EntryIndex = EntryOffset.fetch_add(m_Stride); } - BytesReceived.fetch_add(Response.downloaded_bytes); - EntryIndex = EntryOffset.fetch_add(Stride) - Stride; - } - }); + }); } while (!WorkLatch.Wait(1000)) diff --git a/zen/cmds/rpcreplay.h b/zen/cmds/rpcreplay.h index f1acf1eac..742e5ec5b 100644 --- a/zen/cmds/rpcreplay.h +++ b/zen/cmds/rpcreplay.h @@ -54,6 +54,12 @@ private: int m_ThreadCount; uint64_t m_Offset; uint64_t m_Stride; + bool m_ForceAllowLocalRefs; + bool m_DisableLocalRefs; + bool m_ForceAllowLocalHandleRef; + bool m_DisableLocalHandleRefs; + bool m_ForceAllowPartialLocalRefs; + bool m_DisablePartialLocalRefs; }; } // namespace zen diff --git a/zencore/include/zencore/compactbinarybuilder.h b/zencore/include/zencore/compactbinarybuilder.h index 5311bbb07..4be8c2ba5 100644 --- a/zencore/include/zencore/compactbinarybuilder.h +++ b/zencore/include/zencore/compactbinarybuilder.h @@ -455,7 +455,17 @@ public: return CbWriter::Save(Writer); } - uint64_t GetSaveSize() = delete; + ZENCORE_API CbFieldViewIterator Save(MutableMemoryView Buffer) + { + ZEN_ASSERT(m_Finalized); + return CbWriter::Save(Buffer); + } + + uint64_t GetSaveSize() + { + ZEN_ASSERT(m_Finalized); + return CbWriter::GetSaveSize(); + } void Finalize() { |