aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-03-21 16:39:49 +0100
committerGitHub <[email protected]>2023-03-21 08:39:49 -0700
commitef0c849f5eb82cdce7e799d8833b6970faf2a405 (patch)
treed6179272a890115132ce4995d3005a650181c903
parentsend payloads as duplicated handles (#240) (diff)
downloadzen-ef0c849f5eb82cdce7e799d8833b6970faf2a405.tar.xz
zen-ef0c849f5eb82cdce7e799d8833b6970faf2a405.zip
De/fix rpc replay to handle pid (#243)
* allow access to CbWriter::Save(MutableMemoryView Buffer) in CbObjectWriter to avoid extra memory allocation and copy * fix entry index counting in rpcreplay * adjust target pid in rpc requests for rpc replay * allow control in rpc-record-replay over how attachments are sent * changelog
-rw-r--r--CHANGELOG.md6
-rw-r--r--zen/cmds/rpcreplay.cpp207
-rw-r--r--zen/cmds/rpcreplay.h6
-rw-r--r--zencore/include/zencore/compactbinarybuilder.h12
4 files changed, 175 insertions, 56 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b69085813..1409b276c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,12 @@
- `--stride` The stride to use when selecting requests to playback
- `--onhost` Replay the recording inside the zenserver bypassing http overhead
- `--showmethodstats` Show statistics of which RPC methods are used
+ - `--forceallowlocalrefs` Force the requests to allow local references (file path/file handle)
+ - `--disablelocalrefs` Force disable local references in request (file path/file handle)
+ - `--forceallowlocalhandlerefs` Force the requests to allow local references via duplicated file handles for requests that allow local refs
+ - `--disablelocalhandlerefs` Force disable local references via duplicated file handles in requests
+ - `--forceallowpartiallocalref` Force the requests to allow local references for files that are not saved as whole files for requests that allow local refs
+ - `--disablepartiallocalrefs` Force disable local references for files that are not saved as whole files for requests that allow local refs
- Feature: `--junit` switch to `xmake test` to generate junit style reports of tests.
- Feature: CI build on GitHub now uploads junit test reports as artifact to the check for PR validation and mainline validation
- Feature: Payloads from zenserver can now be sent using duplicated file handles if caller requests provides client ProcessId (Windows only).
diff --git a/zen/cmds/rpcreplay.cpp b/zen/cmds/rpcreplay.cpp
index 89a1fe631..e28c27e2d 100644
--- a/zen/cmds/rpcreplay.cpp
+++ b/zen/cmds/rpcreplay.cpp
@@ -2,9 +2,11 @@
#include "rpcreplay.h"
+#include <zencore/compactbinarybuilder.h>
#include <zencore/filesystem.h>
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
+#include <zencore/stream.h>
#include <zencore/timer.h>
#include <zencore/workthreadpool.h>
#include <zenhttp/httpcommon.h>
@@ -116,6 +118,38 @@ RpcReplayCommand::RpcReplayCommand()
cxxopts::value(m_Stride)->default_value("1"),
"<stride>");
m_Options.add_option("", "", "numproc", "Number of worker processes", cxxopts::value(m_ProcessCount)->default_value("1"), "<count>");
+ m_Options.add_option("",
+ "",
+ "forceallowlocalrefs",
+ "Force enable local refs in requests",
+ cxxopts::value(m_ForceAllowLocalRefs),
+ "<enable>");
+ m_Options
+ .add_option("", "", "disablelocalrefs", "Force disable local refs in requests", cxxopts::value(m_DisableLocalRefs), "<enable>");
+ m_Options.add_option("",
+ "",
+ "forceallowlocalhandlerefs",
+ "Force enable local refs as handles in requests",
+ cxxopts::value(m_ForceAllowLocalHandleRef),
+ "<enable>");
+ m_Options.add_option("",
+ "",
+ "disablelocalhandlerefs",
+ "Force disable local refs as handles in requests",
+ cxxopts::value(m_DisableLocalHandleRefs),
+ "<enable>");
+ m_Options.add_option("",
+ "",
+ "forceallowpartiallocalrefs",
+ "Force enable local refs for all sizes",
+ cxxopts::value(m_ForceAllowPartialLocalRefs),
+ "<enable>");
+ m_Options.add_option("",
+ "",
+ "disablepartiallocalrefs",
+ "Force disable local refs for all sizes",
+ cxxopts::value(m_DisablePartialLocalRefs),
+ "<enable>");
m_Options.parse_positional("path");
}
@@ -195,52 +229,71 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
Latch WorkLatch(m_ThreadCount);
for (int WorkerIndex = 0; WorkerIndex < m_ThreadCount; ++WorkerIndex)
{
- WorkerPool.ScheduleWork([HostName = m_HostName,
- &WorkLatch,
- EntryCount,
- &EntryOffset,
- &Replayer,
- &BytesSent,
- &BytesReceived,
- &MethodTypes,
- &MethodTypesLock,
- ShowMethodStats = m_ShowMethodStats,
- Stride = m_Stride]() {
- auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
-
- cpr::Session Session;
- Session.SetUrl(fmt::format("{}/z$/$rpc"sv, HostName));
-
- uint64_t EntryIndex = EntryOffset.fetch_add(Stride) - Stride;
- while (EntryIndex < EntryCount)
- {
- IoBuffer Payload;
- std::pair<ZenContentType, ZenContentType> Types = Replayer->GetRequest(EntryIndex, Payload);
- ZenContentType RequestContentType = Types.first;
- ZenContentType AcceptContentType = Types.second;
-
- if (ShowMethodStats)
+ WorkerPool.ScheduleWork(
+ [this, &WorkLatch, EntryCount, &EntryOffset, &Replayer, &BytesSent, &BytesReceived, &MethodTypes, &MethodTypesLock]() {
+ auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); });
+
+ cpr::Session Session;
+ Session.SetUrl(fmt::format("{}/z$/$rpc"sv, m_HostName));
+
+ uint64_t EntryIndex = EntryOffset.fetch_add(m_Stride);
+ while (EntryIndex < EntryCount)
{
- std::string MethodName;
+ IoBuffer Payload;
+ std::pair<ZenContentType, ZenContentType> Types = Replayer->GetRequest(EntryIndex, Payload);
+ ZenContentType RequestContentType = Types.first;
+ ZenContentType AcceptContentType = Types.second;
+
+ CbPackage RequestPackage;
+ CbObject Request;
switch (RequestContentType)
{
case ZenContentType::kCbPackage:
{
- CbPackage Request;
- if (ParsePackageMessageWithLegacyFallback(Payload, Request))
+ if (ParsePackageMessageWithLegacyFallback(Payload, RequestPackage))
{
- MethodName = Request.GetObject()["Method"sv].AsString();
+ Request = RequestPackage.GetObject();
}
}
break;
case ZenContentType::kCbObject:
{
- CbObject Request = LoadCompactBinaryObject(Payload);
- MethodName = Request["Method"sv].AsString();
+ Request = LoadCompactBinaryObject(Payload);
}
break;
}
+
+ RpcAcceptOptions OriginalAcceptOptions = static_cast<RpcAcceptOptions>(Request["AcceptFlags"sv].AsUInt16(0u));
+ int OriginalProcessPid = Request["Pid"sv].AsInt32(0);
+
+ int AdjustedPid = 0;
+ RpcAcceptOptions AdjustedAcceptOptions = RpcAcceptOptions::kNone;
+ if (!m_DisableLocalRefs)
{
+ if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowLocalReferences) || m_ForceAllowLocalRefs)
+ {
+ AdjustedAcceptOptions |= RpcAcceptOptions::kAllowLocalReferences;
+ if (!m_DisablePartialLocalRefs)
+ {
+ if (EnumHasAnyFlags(OriginalAcceptOptions, RpcAcceptOptions::kAllowPartialLocalReferences) ||
+ m_ForceAllowPartialLocalRefs)
+ {
+ AdjustedAcceptOptions |= RpcAcceptOptions::kAllowPartialLocalReferences;
+ }
+ }
+ if (!m_DisableLocalHandleRefs)
+ {
+ if (OriginalProcessPid != 0 || m_ForceAllowLocalHandleRef)
+ {
+ AdjustedPid = GetCurrentProcessId();
+ }
+ }
+ }
+ }
+
+ if (m_ShowMethodStats)
+ {
+ std::string MethodName = std::string(Request["Method"sv].AsString());
RwLock::ExclusiveLockScope __(MethodTypesLock);
if (auto It = MethodTypes.find(MethodName); It != MethodTypes.end())
{
@@ -251,32 +304,76 @@ RpcReplayCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
MethodTypes[MethodName] = 1;
}
}
- }
- Session.SetHeader({{"Content-Type", std::string(MapContentTypeToString(RequestContentType))},
- {"Accept", std::string(MapContentTypeToString(AcceptContentType))}});
- uint64_t Offset = 0;
- auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) {
- size = Min<size_t>(size, Payload.GetSize() - Offset);
- IoBuffer PayloadRange = IoBuffer(Payload, Offset, size);
- MutableMemoryView Data(buffer, size);
- Data.CopyFrom(PayloadRange.GetView());
- Offset += size;
- return true;
- };
- Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
- cpr::Response Response = Session.Post();
- BytesSent.fetch_add(Payload.GetSize());
- if (Response.error ||
- !(IsHttpSuccessCode(Response.status_code) || Response.status_code == gsl::narrow<long>(HttpResponseCode::NotFound)))
- {
- ZEN_CONSOLE("{}", FormatResponse(Response));
- break;
+ if (OriginalAcceptOptions != AdjustedAcceptOptions || OriginalProcessPid != AdjustedPid)
+ {
+ CbObjectWriter RequestCopyWriter;
+ for (const CbFieldView& Field : Request)
+ {
+ if (!Field.HasName())
+ {
+ RequestCopyWriter.AddField(Field);
+ continue;
+ }
+ std::string_view FieldName = Field.GetName();
+ if (FieldName == "Pid"sv)
+ {
+ continue;
+ }
+ if (FieldName == "AcceptFlags"sv)
+ {
+ continue;
+ }
+ RequestCopyWriter.AddField(FieldName, Field);
+ }
+ if (AdjustedPid != 0)
+ {
+ RequestCopyWriter.AddInteger("Pid"sv, AdjustedPid);
+ }
+ if (AdjustedAcceptOptions != RpcAcceptOptions::kNone)
+ {
+ RequestCopyWriter.AddInteger("AcceptFlags"sv, static_cast<uint16_t>(AdjustedAcceptOptions));
+ }
+
+ if (RequestContentType == ZenContentType::kCbPackage)
+ {
+ RequestPackage.SetObject(RequestCopyWriter.Save());
+ std::vector<IoBuffer> Buffers = FormatPackageMessage(RequestPackage);
+ std::vector<SharedBuffer> SharedBuffers(Buffers.begin(), Buffers.end());
+ Payload = CompositeBuffer(std::move(SharedBuffers)).Flatten().AsIoBuffer();
+ }
+ else
+ {
+ RequestCopyWriter.Finalize();
+ Payload = IoBuffer(RequestCopyWriter.GetSaveSize());
+ RequestCopyWriter.Save(Payload.GetMutableView());
+ }
+ }
+
+ Session.SetHeader({{"Content-Type", std::string(MapContentTypeToString(RequestContentType))},
+ {"Accept", std::string(MapContentTypeToString(AcceptContentType))}});
+ uint64_t Offset = 0;
+ auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) {
+ size = Min<size_t>(size, Payload.GetSize() - Offset);
+ IoBuffer PayloadRange = IoBuffer(Payload, Offset, size);
+ MutableMemoryView Data(buffer, size);
+ Data.CopyFrom(PayloadRange.GetView());
+ Offset += size;
+ return true;
+ };
+ Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
+ cpr::Response Response = Session.Post();
+ BytesSent.fetch_add(Payload.GetSize());
+ if (Response.error || !(IsHttpSuccessCode(Response.status_code) ||
+ Response.status_code == gsl::narrow<long>(HttpResponseCode::NotFound)))
+ {
+ ZEN_CONSOLE("{}", FormatResponse(Response));
+ break;
+ }
+ BytesReceived.fetch_add(Response.downloaded_bytes);
+ EntryIndex = EntryOffset.fetch_add(m_Stride);
}
- BytesReceived.fetch_add(Response.downloaded_bytes);
- EntryIndex = EntryOffset.fetch_add(Stride) - Stride;
- }
- });
+ });
}
while (!WorkLatch.Wait(1000))
diff --git a/zen/cmds/rpcreplay.h b/zen/cmds/rpcreplay.h
index f1acf1eac..742e5ec5b 100644
--- a/zen/cmds/rpcreplay.h
+++ b/zen/cmds/rpcreplay.h
@@ -54,6 +54,12 @@ private:
int m_ThreadCount;
uint64_t m_Offset;
uint64_t m_Stride;
+ bool m_ForceAllowLocalRefs;
+ bool m_DisableLocalRefs;
+ bool m_ForceAllowLocalHandleRef;
+ bool m_DisableLocalHandleRefs;
+ bool m_ForceAllowPartialLocalRefs;
+ bool m_DisablePartialLocalRefs;
};
} // namespace zen
diff --git a/zencore/include/zencore/compactbinarybuilder.h b/zencore/include/zencore/compactbinarybuilder.h
index 5311bbb07..4be8c2ba5 100644
--- a/zencore/include/zencore/compactbinarybuilder.h
+++ b/zencore/include/zencore/compactbinarybuilder.h
@@ -455,7 +455,17 @@ public:
return CbWriter::Save(Writer);
}
- uint64_t GetSaveSize() = delete;
+ ZENCORE_API CbFieldViewIterator Save(MutableMemoryView Buffer)
+ {
+ ZEN_ASSERT(m_Finalized);
+ return CbWriter::Save(Buffer);
+ }
+
+ uint64_t GetSaveSize()
+ {
+ ZEN_ASSERT(m_Finalized);
+ return CbWriter::GetSaveSize();
+ }
void Finalize()
{