aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-03-14 14:36:18 +0100
committerGitHub <[email protected]>2023-03-14 06:36:18 -0700
commitfea4fa0095668e392aa3333450e93afc1784762b (patch)
treec80ffdf3824ba75ee9b7c312010cdb84c48aae46
parentremoved catch2 (#241) (diff)
downloadzen-fea4fa0095668e392aa3333450e93afc1784762b.tar.xz
zen-fea4fa0095668e392aa3333450e93afc1784762b.zip
send payloads as duplicated handles (#240)
* send payloads as duplicated handles if requestor provides process id and allows local file references. * linux/macos fixes * tests * fix access rights when duplicating handle * fix closing of duplicated handles on error * cleanup * changelog
-rw-r--r--CHANGELOG.md1
-rw-r--r--zenhttp/httpshared.cpp154
-rw-r--r--zenhttp/include/zenhttp/httpshared.h13
-rw-r--r--zenserver-test/zenserver-test.cpp163
-rw-r--r--zenserver/cache/structuredcache.cpp48
-rw-r--r--zenserver/cache/structuredcache.h3
-rw-r--r--zenutil/cache/cacherequests.cpp36
-rw-r--r--zenutil/include/zenutil/cache/cacherequests.h6
8 files changed, 367 insertions, 57 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index fc6ec9420..b69085813 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,6 +12,7 @@
- `--showmethodstats` Show statistics of which RPC methods are used
- Feature: `--junit` switch to `xmake test` to generate junit style reports of tests.
- Feature: CI build on GitHub now uploads junit test reports as artifact to the check for PR validation and mainline validation
+- Feature: Payloads from zenserver can now be sent using duplicated file handles if caller requests provides client ProcessId (Windows only).
- Bugfix: Make sure async responses are sent async correctly in httpsys
- Improvement: FileCas now keeps an up to date index of all the entries improving performance when getting cache misses on large payloads
- Improvement: Structured cache now keeps RawHash and RawSize in memory avoiding materialization of cache values before sending response
diff --git a/zenhttp/httpshared.cpp b/zenhttp/httpshared.cpp
index b6346413f..7aade56d2 100644
--- a/zenhttp/httpshared.cpp
+++ b/zenhttp/httpshared.cpp
@@ -10,6 +10,7 @@
#include <zencore/iobuffer.h>
#include <zencore/iohash.h>
#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
#include <zencore/stream.h>
#include <zencore/testing.h>
#include <zencore/testutils.h>
@@ -23,21 +24,23 @@ ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
+const std::string_view HandlePrefix(":?#:");
+
std::vector<IoBuffer>
-FormatPackageMessage(const CbPackage& Data)
+FormatPackageMessage(const CbPackage& Data, int TargetProcessPid)
{
- return FormatPackageMessage(Data, FormatFlags::kDefault);
+ return FormatPackageMessage(Data, FormatFlags::kDefault, TargetProcessPid);
}
CompositeBuffer
-FormatPackageMessageBuffer(const CbPackage& Data)
+FormatPackageMessageBuffer(const CbPackage& Data, int TargetProcessPid)
{
- return FormatPackageMessageBuffer(Data, FormatFlags::kDefault);
+ return FormatPackageMessageBuffer(Data, FormatFlags::kDefault, TargetProcessPid);
}
CompositeBuffer
-FormatPackageMessageBuffer(const CbPackage& Data, FormatFlags Flags)
+FormatPackageMessageBuffer(const CbPackage& Data, FormatFlags Flags, int TargetProcessPid)
{
- std::vector<IoBuffer> Message = FormatPackageMessage(Data, Flags);
+ std::vector<IoBuffer> Message = FormatPackageMessage(Data, Flags, TargetProcessPid);
std::vector<SharedBuffer> Buffers;
@@ -50,8 +53,44 @@ FormatPackageMessageBuffer(const CbPackage& Data, FormatFlags Flags)
}
std::vector<IoBuffer>
-FormatPackageMessage(const CbPackage& Data, FormatFlags Flags)
+FormatPackageMessage(const CbPackage& Data, FormatFlags Flags, int TargetProcessPid)
{
+ void* TargetProcessHandle = nullptr;
+#if ZEN_PLATFORM_WINDOWS
+ std::vector<HANDLE> DuplicatedHandles;
+ auto _ = MakeGuard([&DuplicatedHandles, &TargetProcessHandle]() {
+ if (TargetProcessHandle == nullptr)
+ {
+ return;
+ }
+
+ for (HANDLE DuplicatedHandle : DuplicatedHandles)
+ {
+ HANDLE ClosingHandle;
+ if (::DuplicateHandle((HANDLE)TargetProcessHandle,
+ DuplicatedHandle,
+ GetCurrentProcess(),
+ &ClosingHandle,
+ 0,
+ FALSE,
+ DUPLICATE_CLOSE_SOURCE | DUPLICATE_SAME_ACCESS) == TRUE)
+ {
+ ::CloseHandle(ClosingHandle);
+ }
+ }
+ ::CloseHandle((HANDLE)TargetProcessHandle);
+ TargetProcessHandle = nullptr;
+ });
+
+ if (EnumHasAllFlags(Flags, FormatFlags::kAllowLocalReferences) && TargetProcessPid != 0)
+ {
+ TargetProcessHandle = OpenProcess(PROCESS_DUP_HANDLE, FALSE, TargetProcessPid);
+ }
+#else
+ ZEN_UNUSED(TargetProcessPid);
+ void* DuplicatedHandles = nullptr;
+#endif // ZEN_PLATFORM_WINDOWS
+
const std::span<const CbAttachment>& Attachments = Data.GetAttachments();
std::vector<IoBuffer> ResponseBuffers;
@@ -99,10 +138,11 @@ FormatPackageMessage(const CbPackage& Data, FormatFlags Flags)
tsl::robin_map<void*, std::string> FileNameMap;
- auto IsLocalRef = [&FileNameMap](const CompositeBuffer& AttachmentBinary,
- bool DenyPartialLocalReferences,
- CbAttachmentReferenceHeader& LocalRef,
- std::string& Path8) -> bool {
+ auto IsLocalRef = [&FileNameMap, &DuplicatedHandles](const CompositeBuffer& AttachmentBinary,
+ bool DenyPartialLocalReferences,
+ void* TargetProcessHandle,
+ CbAttachmentReferenceHeader& LocalRef,
+ std::string& Path8) -> bool {
const SharedBuffer& Segment = AttachmentBinary.GetSegments().front();
IoBufferFileReference Ref;
const IoBuffer& SegmentBuffer = Segment.AsIoBuffer();
@@ -123,9 +163,36 @@ FormatPackageMessage(const CbPackage& Data, FormatFlags Flags)
}
else
{
- ExtendablePathBuilder<256> LocalRefFile;
- LocalRefFile.Append(std::filesystem::absolute(PathFromHandle(Ref.FileHandle)));
- Path8 = LocalRefFile.ToUtf8();
+ bool UseFilePath = true;
+#if ZEN_PLATFORM_WINDOWS
+ if (TargetProcessHandle != nullptr)
+ {
+ HANDLE TargetHandle = INVALID_HANDLE_VALUE;
+ BOOL OK = ::DuplicateHandle(GetCurrentProcess(),
+ Ref.FileHandle,
+ (HANDLE)TargetProcessHandle,
+ &TargetHandle,
+ FILE_GENERIC_READ,
+ FALSE,
+ 0);
+ if (OK)
+ {
+ DuplicatedHandles.push_back(TargetHandle);
+ Path8 = fmt::format("{}{}", HandlePrefix, reinterpret_cast<uint64_t>(TargetHandle));
+ UseFilePath = false;
+ }
+ }
+#else // ZEN_PLATFORM_WINDOWS
+ ZEN_UNUSED(TargetProcessHandle);
+ // Not supported on Linux/Mac. Could potentially use pidfd_getfd() but that requires a fairly new Linux kernel/includes and to
+ // deal with acceess rights etc.
+#endif // ZEN_PLATFORM_WINDOWS
+ if (UseFilePath)
+ {
+ ExtendablePathBuilder<256> LocalRefFile;
+ LocalRefFile.Append(std::filesystem::absolute(PathFromHandle(Ref.FileHandle)));
+ Path8 = LocalRefFile.ToUtf8();
+ }
FileNameMap.insert_or_assign(Ref.FileHandle, Path8);
}
@@ -159,14 +226,18 @@ FormatPackageMessage(const CbPackage& Data, FormatFlags Flags)
if (MarshalByLocalRef)
{
- MarshalByLocalRef = IsLocalRef(Compressed, DenyPartialLocalReferences, LocalRef, Path8);
+ MarshalByLocalRef = IsLocalRef(Compressed, DenyPartialLocalReferences, TargetProcessHandle, LocalRef, Path8);
}
if (MarshalByLocalRef)
{
const bool IsCompressed = true;
+ bool IsHandle = false;
+#if ZEN_PLATFORM_WINDOWS
+ IsHandle = Path8.starts_with(HandlePrefix);
+#endif
MarshalLocal(Path8, LocalRef, AttachmentHash, IsCompressed);
- ZEN_DEBUG("Marshalled '{}' as file of {} bytes", Path8, Compressed.GetSize());
+ ZEN_DEBUG("Marshalled '{}' as file {} of {} bytes", Path8, IsHandle ? "handle" : "path", Compressed.GetSize());
}
else
{
@@ -201,14 +272,18 @@ FormatPackageMessage(const CbPackage& Data, FormatFlags Flags)
if (MarshalByLocalRef)
{
- MarshalByLocalRef = IsLocalRef(AttachmentBinary, DenyPartialLocalReferences, LocalRef, Path8);
+ MarshalByLocalRef = IsLocalRef(AttachmentBinary, DenyPartialLocalReferences, TargetProcessHandle, LocalRef, Path8);
}
if (MarshalByLocalRef)
{
const bool IsCompressed = false;
+ bool IsHandle = false;
+#if ZEN_PLATFORM_WINDOWS
+ IsHandle = Path8.starts_with(HandlePrefix);
+#endif
MarshalLocal(Path8, LocalRef, AttachmentHash, IsCompressed);
- ZEN_DEBUG("Marshalled '{}' as file of {} bytes", Path8, AttachmentBinary.GetSize());
+ ZEN_DEBUG("Marshalled '{}' as file {} of {} bytes", Path8, IsHandle ? "handle" : "path", AttachmentBinary.GetSize());
}
else
{
@@ -226,6 +301,9 @@ FormatPackageMessage(const CbPackage& Data, FormatFlags Flags)
}
}
FileNameMap.clear();
+#if ZEN_PLATFORM_WINDOWS
+ DuplicatedHandles.clear();
+#endif // ZEN_PLATFORM_WINDOWS
return ResponseBuffers;
}
@@ -282,6 +360,8 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint
tsl::robin_map<std::string, IoBuffer> PartialFileBuffers;
+ // TODO: Throwing before this loop completes could result in leaking handles as we might not have picked up all the handles in the
+ // message
for (uint32_t i = 0; i < ChunkCount; ++i)
{
const CbAttachmentEntry& Entry = AttachmentEntries[i];
@@ -297,20 +377,46 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint
ZEN_ASSERT(AttachmentBuffer.Size() >= sizeof(CbAttachmentReferenceHeader));
const CbAttachmentReferenceHeader* AttachRefHdr = AttachmentBuffer.Data<CbAttachmentReferenceHeader>();
- const char8_t* PathPointer = reinterpret_cast<const char8_t*>(AttachRefHdr + 1);
+ const char* PathPointer = reinterpret_cast<const char*>(AttachRefHdr + 1);
ZEN_ASSERT(AttachmentBuffer.Size() >= (sizeof(CbAttachmentReferenceHeader) + AttachRefHdr->AbsolutePathLength));
-
- std::filesystem::path Path{std::u8string_view(PathPointer, AttachRefHdr->AbsolutePathLength)};
+ std::string_view PathView(PathPointer, AttachRefHdr->AbsolutePathLength);
IoBuffer FullFileBuffer;
+
+ std::filesystem::path Path(Utf8ToWide(PathView));
if (auto It = PartialFileBuffers.find(Path.string()); It != PartialFileBuffers.end())
{
FullFileBuffer = It->second;
}
else
{
- FullFileBuffer = PartialFileBuffers.insert_or_assign(Path.string(), IoBufferBuilder::MakeFromFile(Path)).first->second;
+ if (PathView.starts_with(HandlePrefix))
+ {
+#if ZEN_PLATFORM_WINDOWS
+ std::string_view HandleString(PathView.substr(HandlePrefix.length()));
+ std::optional<uint64_t> HandleNumber(ParseInt<uint64_t>(HandleString));
+ if (HandleNumber.has_value())
+ {
+ HANDLE FileHandle = HANDLE(HandleNumber.value());
+ ULARGE_INTEGER liFileSize;
+ liFileSize.LowPart = ::GetFileSize(FileHandle, &liFileSize.HighPart);
+ if (liFileSize.LowPart != INVALID_FILE_SIZE)
+ {
+ FullFileBuffer = IoBuffer(IoBuffer::File, (void*)FileHandle, 0, uint64_t(liFileSize.QuadPart));
+ PartialFileBuffers.insert_or_assign(Path.string(), FullFileBuffer);
+ }
+ }
+#else // ZEN_PLATFORM_WINDOWS
+ // Not supported on Linux/Mac. Could potentially use pidfd_getfd() but that requires a fairly new Linux kernel/includes
+ // and to deal with acceess rights etc.
+ ZEN_ASSERT(false);
+#endif // ZEN_PLATFORM_WINDOWS
+ }
+ else
+ {
+ FullFileBuffer = PartialFileBuffers.insert_or_assign(Path.string(), IoBufferBuilder::MakeFromFile(Path)).first->second;
+ }
}
if (!FullFileBuffer)
@@ -318,7 +424,7 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint
// Unable to open chunk reference
throw std::runtime_error(fmt::format("unable to resolve chunk #{} at '{}' (offset {}, size {})",
i,
- PathToUtf8(Path),
+ Path,
AttachRefHdr->PayloadByteOffset,
AttachRefHdr->PayloadByteSize));
}
@@ -332,7 +438,7 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint
{
throw std::runtime_error(fmt::format("invalid format for chunk #{} at '{}' (offset {}, size {})",
i,
- PathToUtf8(Path),
+ Path,
AttachRefHdr->PayloadByteOffset,
AttachRefHdr->PayloadByteSize));
}
diff --git a/zenhttp/include/zenhttp/httpshared.h b/zenhttp/include/zenhttp/httpshared.h
index 7ab9c9339..d335572c5 100644
--- a/zenhttp/include/zenhttp/httpshared.h
+++ b/zenhttp/include/zenhttp/httpshared.h
@@ -87,14 +87,15 @@ gsl_DEFINE_ENUM_BITMASK_OPERATORS(FormatFlags);
enum class RpcAcceptOptions : uint16_t
{
- kNone = 0,
- kAllowLocalReferences = (1u << 0),
+ kNone = 0,
+ kAllowLocalReferences = (1u << 0),
+ kAllowPartialLocalReferences = (1u << 1)
};
gsl_DEFINE_ENUM_BITMASK_OPERATORS(RpcAcceptOptions);
-std::vector<IoBuffer> FormatPackageMessage(const CbPackage& Data, FormatFlags Flags);
-CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data, FormatFlags Flags);
+std::vector<IoBuffer> FormatPackageMessage(const CbPackage& Data, FormatFlags Flags, int TargetProcessPid = 0);
+CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data, FormatFlags Flags, int TargetProcessPid = 0);
CbPackage ParsePackageMessage(
IoBuffer Payload,
std::function<IoBuffer(const IoHash& Cid, uint64_t Size)> CreateBuffer = [](const IoHash&, uint64_t Size) -> IoBuffer {
@@ -104,8 +105,8 @@ bool IsPackageMessage(IoBuffer Payload);
bool ParsePackageMessageWithLegacyFallback(const IoBuffer& Response, CbPackage& OutPackage);
-std::vector<IoBuffer> FormatPackageMessage(const CbPackage& Data);
-CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data);
+std::vector<IoBuffer> FormatPackageMessage(const CbPackage& Data, int TargetProcessPid = 0);
+CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data, int TargetProcessPid = 0);
/** Streaming reader for compact binary packages
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp
index c1d2e8838..079f70984 100644
--- a/zenserver-test/zenserver-test.cpp
+++ b/zenserver-test/zenserver-test.cpp
@@ -1334,11 +1334,16 @@ TEST_CASE("zcache.rpc")
CachePolicy RecordPolicy) {
std::vector<uint8_t> Data;
Data.resize(PayloadSize);
- for (size_t Idx = 0; Idx < PayloadSize; ++Idx)
+ uint32_t DataSeed = *reinterpret_cast<const uint32_t*>(&CacheKey.Hash.Hash[0]);
+ uint16_t* DataPtr = reinterpret_cast<uint16_t*>(Data.data());
+ for (size_t Idx = 0; Idx < PayloadSize / 2; ++Idx)
{
- Data[Idx] = Idx % 255;
+ DataPtr[Idx] = static_cast<uint16_t>((Idx + DataSeed) % 0xffffu);
+ }
+ if (PayloadSize & 1)
+ {
+ Data[PayloadSize - 1] = static_cast<uint8_t>((PayloadSize - 1) & 0xff);
}
-
CompressedBuffer Value = zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size()));
Request.Requests.push_back({.Key = CacheKey, .Values = {{.Id = Oid::NewOid(), .Body = std::move(Value)}}, .Policy = RecordPolicy});
};
@@ -1347,13 +1352,14 @@ TEST_CASE("zcache.rpc")
std::string_view Namespace,
std::string_view Bucket,
size_t Num,
- size_t PayloadSize = 1024) -> std::vector<CacheKey> {
+ size_t PayloadSize = 1024,
+ size_t KeyOffset = 1) -> std::vector<CacheKey> {
std::vector<zen::CacheKey> OutKeys;
for (uint32_t Key = 1; Key <= Num; ++Key)
{
zen::IoHash KeyHash;
- ((uint32_t*)(KeyHash.Hash))[0] = Key;
+ ((uint32_t*)(KeyHash.Hash))[0] = gsl::narrow<uint32_t>(KeyOffset + Key);
const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash);
cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)};
@@ -1384,8 +1390,12 @@ TEST_CASE("zcache.rpc")
auto GetCacheRecords = [](std::string_view BaseUri,
std::string_view Namespace,
std::span<zen::CacheKey> Keys,
- zen::CachePolicy Policy) -> GetCacheRecordResult {
+ zen::CachePolicy Policy,
+ zen::RpcAcceptOptions AcceptOptions = zen::RpcAcceptOptions::kNone,
+ int Pid = 0) -> GetCacheRecordResult {
cacherequests::GetCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic,
+ .AcceptOptions = static_cast<uint16_t>(AcceptOptions),
+ .ProcessPid = Pid,
.DefaultPolicy = Policy,
.Namespace = std::string(Namespace)};
for (const CacheKey& Key : Keys)
@@ -1547,6 +1557,147 @@ TEST_CASE("zcache.rpc")
CHECK(Record->Key == ExpectedKey);
}
}
+
+ SUBCASE("RpcAcceptOptions")
+ {
+ using namespace utils;
+
+ std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
+ const uint16_t PortNumber = 13337;
+ const auto BaseUri = fmt::format("http://localhost:{}/z$", PortNumber);
+
+ ZenServerInstance Inst(TestEnv);
+ Inst.SetTestDir(TestDir);
+ Inst.SpawnServer(PortNumber);
+ Inst.WaitUntilReady();
+
+ std::vector<zen::CacheKey> SmallKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 4, 1024);
+ std::vector<zen::CacheKey> LargeKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 4, 1024 * 1024 * 16, SmallKeys.size());
+
+ std::vector<zen::CacheKey> Keys(SmallKeys.begin(), SmallKeys.end());
+ Keys.insert(Keys.end(), LargeKeys.begin(), LargeKeys.end());
+
+ {
+ GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, CachePolicy::Default);
+
+ CHECK(Result.Result.Results.size() == Keys.size());
+
+ for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+ {
+ CHECK(Record);
+ const CacheKey& ExpectedKey = Keys[Index++];
+ CHECK(Record->Key == ExpectedKey);
+ for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ {
+ const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer();
+ IoBufferFileReference Ref;
+ bool IsFileRef = Body.GetFileReference(Ref);
+ CHECK(!IsFileRef);
+ }
+ }
+ }
+
+ // File path, but only for large files
+ {
+ GetCacheRecordResult Result =
+ GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, CachePolicy::Default, RpcAcceptOptions::kAllowLocalReferences);
+
+ CHECK(Result.Result.Results.size() == Keys.size());
+
+ for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+ {
+ CHECK(Record);
+ const CacheKey& ExpectedKey = Keys[Index++];
+ CHECK(Record->Key == ExpectedKey);
+ for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ {
+ const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer();
+ IoBufferFileReference Ref;
+ bool IsFileRef = Body.GetFileReference(Ref);
+ CHECK(IsFileRef == (Body.Size() > 1024));
+ }
+ }
+ }
+
+ // File path, for all files
+ {
+ GetCacheRecordResult Result =
+ GetCacheRecords(BaseUri,
+ "ue4.ddc"sv,
+ Keys,
+ CachePolicy::Default,
+ RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialLocalReferences);
+
+ CHECK(Result.Result.Results.size() == Keys.size());
+
+ for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+ {
+ CHECK(Record);
+ const CacheKey& ExpectedKey = Keys[Index++];
+ CHECK(Record->Key == ExpectedKey);
+ for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ {
+ const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer();
+ IoBufferFileReference Ref;
+ bool IsFileRef = Body.GetFileReference(Ref);
+ CHECK(IsFileRef);
+ }
+ }
+ }
+
+ // File handle, but only for large files
+ {
+ GetCacheRecordResult Result = GetCacheRecords(BaseUri,
+ "ue4.ddc"sv,
+ Keys,
+ CachePolicy::Default,
+ RpcAcceptOptions::kAllowLocalReferences,
+ GetCurrentProcessId());
+
+ CHECK(Result.Result.Results.size() == Keys.size());
+
+ for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+ {
+ CHECK(Record);
+ const CacheKey& ExpectedKey = Keys[Index++];
+ CHECK(Record->Key == ExpectedKey);
+ for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ {
+ const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer();
+ IoBufferFileReference Ref;
+ bool IsFileRef = Body.GetFileReference(Ref);
+ CHECK(IsFileRef == (Body.Size() > 1024));
+ }
+ }
+ }
+
+ // File handle, for all files
+ {
+ GetCacheRecordResult Result =
+ GetCacheRecords(BaseUri,
+ "ue4.ddc"sv,
+ Keys,
+ CachePolicy::Default,
+ RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialLocalReferences,
+ GetCurrentProcessId());
+
+ CHECK(Result.Result.Results.size() == Keys.size());
+
+ for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results)
+ {
+ CHECK(Record);
+ const CacheKey& ExpectedKey = Keys[Index++];
+ CHECK(Record->Key == ExpectedKey);
+ for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values)
+ {
+ const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer();
+ IoBufferFileReference Ref;
+ bool IsFileRef = Body.GetFileReference(Ref);
+ CHECK(IsFileRef);
+ }
+ }
+ }
+ }
}
TEST_CASE("zcache.failing.upstream")
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 154676d77..8539d9c16 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -1306,7 +1306,8 @@ CbPackage
HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType,
IoBuffer&& Body,
uint32_t& OutAcceptMagic,
- RpcAcceptOptions& OutAcceptFlags)
+ RpcAcceptOptions& OutAcceptFlags,
+ int& OutTargetProcessId)
{
CbPackage Package;
CbObjectView Object;
@@ -1321,8 +1322,9 @@ HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType,
Package = ParsePackageMessage(Body);
Object = Package.GetObject();
}
- OutAcceptMagic = Object["Accept"sv].AsUInt32();
- OutAcceptFlags = static_cast<RpcAcceptOptions>(Object["AcceptFlags"sv].AsUInt16(0u));
+ OutAcceptMagic = Object["Accept"sv].AsUInt32();
+ OutAcceptFlags = static_cast<RpcAcceptOptions>(Object["AcceptFlags"sv].AsUInt16(0u));
+ OutTargetProcessId = Object["Pid"sv].AsInt32(0);
const std::string_view Method = Object["Method"sv].AsString();
@@ -1367,14 +1369,20 @@ HttpStructuredCacheService::ReplayRequestRecorder(cache::IRpcRequestReplayer& Re
{
uint32_t AcceptMagic = 0;
RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
- CbPackage RpcResult = HandleRpcRequest(ContentType.first, std::move(Body), AcceptMagic, AcceptFlags);
+ int TargetPid = 0;
+ CbPackage RpcResult = HandleRpcRequest(ContentType.first, std::move(Body), AcceptMagic, AcceptFlags, TargetPid);
if (AcceptMagic == kCbPkgMagic)
{
- bool AllowFileReferences = EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences);
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(
- RpcResult,
- AllowFileReferences ? FormatFlags::kAllowLocalReferences | FormatFlags::kDenyPartialLocalReferences
- : FormatFlags::kDefault);
+ FormatFlags Flags = FormatFlags::kDefault;
+ if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences))
+ {
+ Flags |= FormatFlags::kAllowLocalReferences;
+ if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences))
+ {
+ Flags |= FormatFlags::kDenyPartialLocalReferences;
+ }
+ }
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetPid);
ZEN_ASSERT(RpcResponseBuffer.GetSize() > 0);
}
else
@@ -1417,9 +1425,10 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
[this, Body = Request.ReadPayload(), ContentType, AcceptType](HttpServerRequest& AsyncRequest) mutable {
std::uint64_t RequestIndex =
m_RequestRecorder ? m_RequestRecorder->RecordRequest(ContentType, AcceptType, Body) : ~0ull;
- uint32_t AcceptMagic = 0;
- RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
- CbPackage RpcResult = HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags);
+ uint32_t AcceptMagic = 0;
+ RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone;
+ int TargetProcessId = 0;
+ CbPackage RpcResult = HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags, TargetProcessId);
if (RpcResult.IsNull())
{
AsyncRequest.WriteResponse(HttpResponseCode::BadRequest);
@@ -1427,11 +1436,16 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request)
}
if (AcceptMagic == kCbPkgMagic)
{
- bool AllowFileReferences = EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences);
- CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(
- RpcResult,
- AllowFileReferences ? FormatFlags::kAllowLocalReferences | FormatFlags::kDenyPartialLocalReferences
- : FormatFlags::kDefault);
+ FormatFlags Flags = FormatFlags::kDefault;
+ if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences))
+ {
+ Flags |= FormatFlags::kAllowLocalReferences;
+ if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences))
+ {
+ Flags |= FormatFlags::kDenyPartialLocalReferences;
+ }
+ }
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessId);
if (RequestIndex != ~0ull)
{
ZEN_ASSERT(m_RequestRecorder);
diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h
index f606a1cf5..e9c58c3d6 100644
--- a/zenserver/cache/structuredcache.h
+++ b/zenserver/cache/structuredcache.h
@@ -119,7 +119,8 @@ private:
CbPackage HandleRpcRequest(const ZenContentType ContentType,
IoBuffer&& Body,
uint32_t& OutAcceptMagic,
- RpcAcceptOptions& OutAcceptFlags);
+ RpcAcceptOptions& OutAcceptFlags,
+ int& OutTargetProcessId);
void HandleCacheRequest(HttpServerRequest& Request);
void HandleCacheNamespaceRequest(HttpServerRequest& Request, std::string_view Namespace);
diff --git a/zenutil/cache/cacherequests.cpp b/zenutil/cache/cacherequests.cpp
index 5cbb2e04b..4c865ec22 100644
--- a/zenutil/cache/cacherequests.cpp
+++ b/zenutil/cache/cacherequests.cpp
@@ -332,7 +332,9 @@ namespace cacherequests {
bool GetCacheRecordsRequest::Parse(const CbObjectView& RpcRequest)
{
ZEN_ASSERT(RpcRequest["Method"].AsString() == "GetCacheRecords");
- AcceptMagic = RpcRequest["AcceptType"].AsUInt32(0);
+ AcceptMagic = RpcRequest["AcceptType"].AsUInt32(0);
+ AcceptOptions = RpcRequest["AcceptFlags"].AsUInt16(0);
+ ProcessPid = RpcRequest["Pid"].AsInt32(0);
CbObjectView Params = RpcRequest["Params"].AsObjectView();
std::optional<std::string> RequestNamespace = GetRequestNamespace(Params);
@@ -373,6 +375,14 @@ namespace cacherequests {
{
Writer << "Accept" << AcceptMagic;
}
+ if (AcceptOptions != 0)
+ {
+ Writer << "AcceptFlags" << AcceptOptions;
+ }
+ if (ProcessPid != 0)
+ {
+ Writer << "Pid" << ProcessPid;
+ }
Writer.BeginObject("Params");
{
@@ -656,7 +666,9 @@ namespace cacherequests {
bool GetCacheValuesRequest::Parse(const CbObjectView& BatchObject)
{
ZEN_ASSERT(BatchObject["Method"].AsString() == "GetCacheValues");
- AcceptMagic = BatchObject["AcceptType"].AsUInt32(0);
+ AcceptMagic = BatchObject["AcceptType"].AsUInt32(0);
+ AcceptOptions = BatchObject["AcceptFlags"].AsUInt16(0);
+ ProcessPid = BatchObject["Pid"].AsInt32(0);
CbObjectView Params = BatchObject["Params"].AsObjectView();
std::optional<std::string> RequestNamespace = cacherequests::GetRequestNamespace(Params);
@@ -696,6 +708,14 @@ namespace cacherequests {
{
Writer << "Accept" << AcceptMagic;
}
+ if (AcceptOptions != 0)
+ {
+ Writer << "AcceptFlags" << AcceptOptions;
+ }
+ if (ProcessPid != 0)
+ {
+ Writer << "Pid" << ProcessPid;
+ }
Writer.BeginObject("Params");
{
@@ -817,7 +837,9 @@ namespace cacherequests {
bool GetCacheChunksRequest::Parse(const CbObjectView& BatchObject)
{
ZEN_ASSERT(BatchObject["Method"].AsString() == "GetCacheChunks");
- AcceptMagic = BatchObject["AcceptType"].AsUInt32(0);
+ AcceptMagic = BatchObject["AcceptType"].AsUInt32(0);
+ AcceptOptions = BatchObject["AcceptFlags"].AsUInt16(0);
+ ProcessPid = BatchObject["Pid"].AsInt32(0);
CbObjectView Params = BatchObject["Params"].AsObjectView();
std::optional<std::string> RequestNamespace = cacherequests::GetRequestNamespace(Params);
@@ -862,6 +884,14 @@ namespace cacherequests {
{
Writer << "Accept" << AcceptMagic;
}
+ if (AcceptOptions != 0)
+ {
+ Writer << "AcceptFlags" << AcceptOptions;
+ }
+ if (ProcessPid != 0)
+ {
+ Writer << "Pid" << ProcessPid;
+ }
Writer.BeginObject("Params");
{
diff --git a/zenutil/include/zenutil/cache/cacherequests.h b/zenutil/include/zenutil/cache/cacherequests.h
index 4eebd3121..f1999ebfe 100644
--- a/zenutil/include/zenutil/cache/cacherequests.h
+++ b/zenutil/include/zenutil/cache/cacherequests.h
@@ -107,6 +107,8 @@ namespace cacherequests {
struct GetCacheRecordsRequest
{
uint32_t AcceptMagic = 0;
+ uint16_t AcceptOptions = 0;
+ int32_t ProcessPid = 0;
CachePolicy DefaultPolicy = CachePolicy::Default;
std::string Namespace;
std::vector<GetCacheRecordRequest> Requests;
@@ -181,6 +183,8 @@ namespace cacherequests {
struct GetCacheValuesRequest
{
uint32_t AcceptMagic = 0;
+ uint16_t AcceptOptions = 0;
+ int32_t ProcessPid = 0;
CachePolicy DefaultPolicy = CachePolicy::Default;
std::string Namespace;
std::vector<GetCacheValueRequest> Requests;
@@ -222,6 +226,8 @@ namespace cacherequests {
struct GetCacheChunksRequest
{
uint32_t AcceptMagic = 0;
+ uint16_t AcceptOptions = 0;
+ int32_t ProcessPid = 0;
CachePolicy DefaultPolicy = CachePolicy::Default;
std::string Namespace;
std::vector<GetCacheChunkRequest> Requests;