diff options
| author | Dan Engelbrecht <[email protected]> | 2023-03-14 14:36:18 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-03-14 06:36:18 -0700 |
| commit | fea4fa0095668e392aa3333450e93afc1784762b (patch) | |
| tree | c80ffdf3824ba75ee9b7c312010cdb84c48aae46 | |
| parent | removed catch2 (#241) (diff) | |
| download | zen-fea4fa0095668e392aa3333450e93afc1784762b.tar.xz zen-fea4fa0095668e392aa3333450e93afc1784762b.zip | |
send payloads as duplicated handles (#240)
* send payloads as duplicated handles if requestor provides process id and allows local file references.
* linux/macos fixes
* tests
* fix access rights when duplicating handle
* fix closing of duplicated handles on error
* cleanup
* changelog
| -rw-r--r-- | CHANGELOG.md | 1 | ||||
| -rw-r--r-- | zenhttp/httpshared.cpp | 154 | ||||
| -rw-r--r-- | zenhttp/include/zenhttp/httpshared.h | 13 | ||||
| -rw-r--r-- | zenserver-test/zenserver-test.cpp | 163 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 48 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 3 | ||||
| -rw-r--r-- | zenutil/cache/cacherequests.cpp | 36 | ||||
| -rw-r--r-- | zenutil/include/zenutil/cache/cacherequests.h | 6 |
8 files changed, 367 insertions, 57 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index fc6ec9420..b69085813 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ - `--showmethodstats` Show statistics of which RPC methods are used - Feature: `--junit` switch to `xmake test` to generate junit style reports of tests. - Feature: CI build on GitHub now uploads junit test reports as artifact to the check for PR validation and mainline validation +- Feature: Payloads from zenserver can now be sent using duplicated file handles if caller requests provides client ProcessId (Windows only). - Bugfix: Make sure async responses are sent async correctly in httpsys - Improvement: FileCas now keeps an up to date index of all the entries improving performance when getting cache misses on large payloads - Improvement: Structured cache now keeps RawHash and RawSize in memory avoiding materialization of cache values before sending response diff --git a/zenhttp/httpshared.cpp b/zenhttp/httpshared.cpp index b6346413f..7aade56d2 100644 --- a/zenhttp/httpshared.cpp +++ b/zenhttp/httpshared.cpp @@ -10,6 +10,7 @@ #include <zencore/iobuffer.h> #include <zencore/iohash.h> #include <zencore/logging.h> +#include <zencore/scopeguard.h> #include <zencore/stream.h> #include <zencore/testing.h> #include <zencore/testutils.h> @@ -23,21 +24,23 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { +const std::string_view HandlePrefix(":?#:"); + std::vector<IoBuffer> -FormatPackageMessage(const CbPackage& Data) +FormatPackageMessage(const CbPackage& Data, int TargetProcessPid) { - return FormatPackageMessage(Data, FormatFlags::kDefault); + return FormatPackageMessage(Data, FormatFlags::kDefault, TargetProcessPid); } CompositeBuffer -FormatPackageMessageBuffer(const CbPackage& Data) +FormatPackageMessageBuffer(const CbPackage& Data, int TargetProcessPid) { - return FormatPackageMessageBuffer(Data, FormatFlags::kDefault); + return FormatPackageMessageBuffer(Data, FormatFlags::kDefault, TargetProcessPid); } CompositeBuffer -FormatPackageMessageBuffer(const CbPackage& Data, FormatFlags Flags) +FormatPackageMessageBuffer(const CbPackage& Data, FormatFlags Flags, int TargetProcessPid) { - std::vector<IoBuffer> Message = FormatPackageMessage(Data, Flags); + std::vector<IoBuffer> Message = FormatPackageMessage(Data, Flags, TargetProcessPid); std::vector<SharedBuffer> Buffers; @@ -50,8 +53,44 @@ FormatPackageMessageBuffer(const CbPackage& Data, FormatFlags Flags) } std::vector<IoBuffer> -FormatPackageMessage(const CbPackage& Data, FormatFlags Flags) +FormatPackageMessage(const CbPackage& Data, FormatFlags Flags, int TargetProcessPid) { + void* TargetProcessHandle = nullptr; +#if ZEN_PLATFORM_WINDOWS + std::vector<HANDLE> DuplicatedHandles; + auto _ = MakeGuard([&DuplicatedHandles, &TargetProcessHandle]() { + if (TargetProcessHandle == nullptr) + { + return; + } + + for (HANDLE DuplicatedHandle : DuplicatedHandles) + { + HANDLE ClosingHandle; + if (::DuplicateHandle((HANDLE)TargetProcessHandle, + DuplicatedHandle, + GetCurrentProcess(), + &ClosingHandle, + 0, + FALSE, + DUPLICATE_CLOSE_SOURCE | DUPLICATE_SAME_ACCESS) == TRUE) + { + ::CloseHandle(ClosingHandle); + } + } + ::CloseHandle((HANDLE)TargetProcessHandle); + TargetProcessHandle = nullptr; + }); + + if (EnumHasAllFlags(Flags, FormatFlags::kAllowLocalReferences) && TargetProcessPid != 0) + { + TargetProcessHandle = OpenProcess(PROCESS_DUP_HANDLE, FALSE, TargetProcessPid); + } +#else + ZEN_UNUSED(TargetProcessPid); + void* DuplicatedHandles = nullptr; +#endif // ZEN_PLATFORM_WINDOWS + const std::span<const CbAttachment>& Attachments = Data.GetAttachments(); std::vector<IoBuffer> ResponseBuffers; @@ -99,10 +138,11 @@ FormatPackageMessage(const CbPackage& Data, FormatFlags Flags) tsl::robin_map<void*, std::string> FileNameMap; - auto IsLocalRef = [&FileNameMap](const CompositeBuffer& AttachmentBinary, - bool DenyPartialLocalReferences, - CbAttachmentReferenceHeader& LocalRef, - std::string& Path8) -> bool { + auto IsLocalRef = [&FileNameMap, &DuplicatedHandles](const CompositeBuffer& AttachmentBinary, + bool DenyPartialLocalReferences, + void* TargetProcessHandle, + CbAttachmentReferenceHeader& LocalRef, + std::string& Path8) -> bool { const SharedBuffer& Segment = AttachmentBinary.GetSegments().front(); IoBufferFileReference Ref; const IoBuffer& SegmentBuffer = Segment.AsIoBuffer(); @@ -123,9 +163,36 @@ FormatPackageMessage(const CbPackage& Data, FormatFlags Flags) } else { - ExtendablePathBuilder<256> LocalRefFile; - LocalRefFile.Append(std::filesystem::absolute(PathFromHandle(Ref.FileHandle))); - Path8 = LocalRefFile.ToUtf8(); + bool UseFilePath = true; +#if ZEN_PLATFORM_WINDOWS + if (TargetProcessHandle != nullptr) + { + HANDLE TargetHandle = INVALID_HANDLE_VALUE; + BOOL OK = ::DuplicateHandle(GetCurrentProcess(), + Ref.FileHandle, + (HANDLE)TargetProcessHandle, + &TargetHandle, + FILE_GENERIC_READ, + FALSE, + 0); + if (OK) + { + DuplicatedHandles.push_back(TargetHandle); + Path8 = fmt::format("{}{}", HandlePrefix, reinterpret_cast<uint64_t>(TargetHandle)); + UseFilePath = false; + } + } +#else // ZEN_PLATFORM_WINDOWS + ZEN_UNUSED(TargetProcessHandle); + // Not supported on Linux/Mac. Could potentially use pidfd_getfd() but that requires a fairly new Linux kernel/includes and to + // deal with acceess rights etc. +#endif // ZEN_PLATFORM_WINDOWS + if (UseFilePath) + { + ExtendablePathBuilder<256> LocalRefFile; + LocalRefFile.Append(std::filesystem::absolute(PathFromHandle(Ref.FileHandle))); + Path8 = LocalRefFile.ToUtf8(); + } FileNameMap.insert_or_assign(Ref.FileHandle, Path8); } @@ -159,14 +226,18 @@ FormatPackageMessage(const CbPackage& Data, FormatFlags Flags) if (MarshalByLocalRef) { - MarshalByLocalRef = IsLocalRef(Compressed, DenyPartialLocalReferences, LocalRef, Path8); + MarshalByLocalRef = IsLocalRef(Compressed, DenyPartialLocalReferences, TargetProcessHandle, LocalRef, Path8); } if (MarshalByLocalRef) { const bool IsCompressed = true; + bool IsHandle = false; +#if ZEN_PLATFORM_WINDOWS + IsHandle = Path8.starts_with(HandlePrefix); +#endif MarshalLocal(Path8, LocalRef, AttachmentHash, IsCompressed); - ZEN_DEBUG("Marshalled '{}' as file of {} bytes", Path8, Compressed.GetSize()); + ZEN_DEBUG("Marshalled '{}' as file {} of {} bytes", Path8, IsHandle ? "handle" : "path", Compressed.GetSize()); } else { @@ -201,14 +272,18 @@ FormatPackageMessage(const CbPackage& Data, FormatFlags Flags) if (MarshalByLocalRef) { - MarshalByLocalRef = IsLocalRef(AttachmentBinary, DenyPartialLocalReferences, LocalRef, Path8); + MarshalByLocalRef = IsLocalRef(AttachmentBinary, DenyPartialLocalReferences, TargetProcessHandle, LocalRef, Path8); } if (MarshalByLocalRef) { const bool IsCompressed = false; + bool IsHandle = false; +#if ZEN_PLATFORM_WINDOWS + IsHandle = Path8.starts_with(HandlePrefix); +#endif MarshalLocal(Path8, LocalRef, AttachmentHash, IsCompressed); - ZEN_DEBUG("Marshalled '{}' as file of {} bytes", Path8, AttachmentBinary.GetSize()); + ZEN_DEBUG("Marshalled '{}' as file {} of {} bytes", Path8, IsHandle ? "handle" : "path", AttachmentBinary.GetSize()); } else { @@ -226,6 +301,9 @@ FormatPackageMessage(const CbPackage& Data, FormatFlags Flags) } } FileNameMap.clear(); +#if ZEN_PLATFORM_WINDOWS + DuplicatedHandles.clear(); +#endif // ZEN_PLATFORM_WINDOWS return ResponseBuffers; } @@ -282,6 +360,8 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint tsl::robin_map<std::string, IoBuffer> PartialFileBuffers; + // TODO: Throwing before this loop completes could result in leaking handles as we might not have picked up all the handles in the + // message for (uint32_t i = 0; i < ChunkCount; ++i) { const CbAttachmentEntry& Entry = AttachmentEntries[i]; @@ -297,20 +377,46 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint ZEN_ASSERT(AttachmentBuffer.Size() >= sizeof(CbAttachmentReferenceHeader)); const CbAttachmentReferenceHeader* AttachRefHdr = AttachmentBuffer.Data<CbAttachmentReferenceHeader>(); - const char8_t* PathPointer = reinterpret_cast<const char8_t*>(AttachRefHdr + 1); + const char* PathPointer = reinterpret_cast<const char*>(AttachRefHdr + 1); ZEN_ASSERT(AttachmentBuffer.Size() >= (sizeof(CbAttachmentReferenceHeader) + AttachRefHdr->AbsolutePathLength)); - - std::filesystem::path Path{std::u8string_view(PathPointer, AttachRefHdr->AbsolutePathLength)}; + std::string_view PathView(PathPointer, AttachRefHdr->AbsolutePathLength); IoBuffer FullFileBuffer; + + std::filesystem::path Path(Utf8ToWide(PathView)); if (auto It = PartialFileBuffers.find(Path.string()); It != PartialFileBuffers.end()) { FullFileBuffer = It->second; } else { - FullFileBuffer = PartialFileBuffers.insert_or_assign(Path.string(), IoBufferBuilder::MakeFromFile(Path)).first->second; + if (PathView.starts_with(HandlePrefix)) + { +#if ZEN_PLATFORM_WINDOWS + std::string_view HandleString(PathView.substr(HandlePrefix.length())); + std::optional<uint64_t> HandleNumber(ParseInt<uint64_t>(HandleString)); + if (HandleNumber.has_value()) + { + HANDLE FileHandle = HANDLE(HandleNumber.value()); + ULARGE_INTEGER liFileSize; + liFileSize.LowPart = ::GetFileSize(FileHandle, &liFileSize.HighPart); + if (liFileSize.LowPart != INVALID_FILE_SIZE) + { + FullFileBuffer = IoBuffer(IoBuffer::File, (void*)FileHandle, 0, uint64_t(liFileSize.QuadPart)); + PartialFileBuffers.insert_or_assign(Path.string(), FullFileBuffer); + } + } +#else // ZEN_PLATFORM_WINDOWS + // Not supported on Linux/Mac. Could potentially use pidfd_getfd() but that requires a fairly new Linux kernel/includes + // and to deal with acceess rights etc. + ZEN_ASSERT(false); +#endif // ZEN_PLATFORM_WINDOWS + } + else + { + FullFileBuffer = PartialFileBuffers.insert_or_assign(Path.string(), IoBufferBuilder::MakeFromFile(Path)).first->second; + } } if (!FullFileBuffer) @@ -318,7 +424,7 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint // Unable to open chunk reference throw std::runtime_error(fmt::format("unable to resolve chunk #{} at '{}' (offset {}, size {})", i, - PathToUtf8(Path), + Path, AttachRefHdr->PayloadByteOffset, AttachRefHdr->PayloadByteSize)); } @@ -332,7 +438,7 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint { throw std::runtime_error(fmt::format("invalid format for chunk #{} at '{}' (offset {}, size {})", i, - PathToUtf8(Path), + Path, AttachRefHdr->PayloadByteOffset, AttachRefHdr->PayloadByteSize)); } diff --git a/zenhttp/include/zenhttp/httpshared.h b/zenhttp/include/zenhttp/httpshared.h index 7ab9c9339..d335572c5 100644 --- a/zenhttp/include/zenhttp/httpshared.h +++ b/zenhttp/include/zenhttp/httpshared.h @@ -87,14 +87,15 @@ gsl_DEFINE_ENUM_BITMASK_OPERATORS(FormatFlags); enum class RpcAcceptOptions : uint16_t { - kNone = 0, - kAllowLocalReferences = (1u << 0), + kNone = 0, + kAllowLocalReferences = (1u << 0), + kAllowPartialLocalReferences = (1u << 1) }; gsl_DEFINE_ENUM_BITMASK_OPERATORS(RpcAcceptOptions); -std::vector<IoBuffer> FormatPackageMessage(const CbPackage& Data, FormatFlags Flags); -CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data, FormatFlags Flags); +std::vector<IoBuffer> FormatPackageMessage(const CbPackage& Data, FormatFlags Flags, int TargetProcessPid = 0); +CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data, FormatFlags Flags, int TargetProcessPid = 0); CbPackage ParsePackageMessage( IoBuffer Payload, std::function<IoBuffer(const IoHash& Cid, uint64_t Size)> CreateBuffer = [](const IoHash&, uint64_t Size) -> IoBuffer { @@ -104,8 +105,8 @@ bool IsPackageMessage(IoBuffer Payload); bool ParsePackageMessageWithLegacyFallback(const IoBuffer& Response, CbPackage& OutPackage); -std::vector<IoBuffer> FormatPackageMessage(const CbPackage& Data); -CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data); +std::vector<IoBuffer> FormatPackageMessage(const CbPackage& Data, int TargetProcessPid = 0); +CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data, int TargetProcessPid = 0); /** Streaming reader for compact binary packages diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index c1d2e8838..079f70984 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -1334,11 +1334,16 @@ TEST_CASE("zcache.rpc") CachePolicy RecordPolicy) { std::vector<uint8_t> Data; Data.resize(PayloadSize); - for (size_t Idx = 0; Idx < PayloadSize; ++Idx) + uint32_t DataSeed = *reinterpret_cast<const uint32_t*>(&CacheKey.Hash.Hash[0]); + uint16_t* DataPtr = reinterpret_cast<uint16_t*>(Data.data()); + for (size_t Idx = 0; Idx < PayloadSize / 2; ++Idx) { - Data[Idx] = Idx % 255; + DataPtr[Idx] = static_cast<uint16_t>((Idx + DataSeed) % 0xffffu); + } + if (PayloadSize & 1) + { + Data[PayloadSize - 1] = static_cast<uint8_t>((PayloadSize - 1) & 0xff); } - CompressedBuffer Value = zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size())); Request.Requests.push_back({.Key = CacheKey, .Values = {{.Id = Oid::NewOid(), .Body = std::move(Value)}}, .Policy = RecordPolicy}); }; @@ -1347,13 +1352,14 @@ TEST_CASE("zcache.rpc") std::string_view Namespace, std::string_view Bucket, size_t Num, - size_t PayloadSize = 1024) -> std::vector<CacheKey> { + size_t PayloadSize = 1024, + size_t KeyOffset = 1) -> std::vector<CacheKey> { std::vector<zen::CacheKey> OutKeys; for (uint32_t Key = 1; Key <= Num; ++Key) { zen::IoHash KeyHash; - ((uint32_t*)(KeyHash.Hash))[0] = Key; + ((uint32_t*)(KeyHash.Hash))[0] = gsl::narrow<uint32_t>(KeyOffset + Key); const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, KeyHash); cacherequests::PutCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, .Namespace = std::string(Namespace)}; @@ -1384,8 +1390,12 @@ TEST_CASE("zcache.rpc") auto GetCacheRecords = [](std::string_view BaseUri, std::string_view Namespace, std::span<zen::CacheKey> Keys, - zen::CachePolicy Policy) -> GetCacheRecordResult { + zen::CachePolicy Policy, + zen::RpcAcceptOptions AcceptOptions = zen::RpcAcceptOptions::kNone, + int Pid = 0) -> GetCacheRecordResult { cacherequests::GetCacheRecordsRequest Request = {.AcceptMagic = kCbPkgMagic, + .AcceptOptions = static_cast<uint16_t>(AcceptOptions), + .ProcessPid = Pid, .DefaultPolicy = Policy, .Namespace = std::string(Namespace)}; for (const CacheKey& Key : Keys) @@ -1547,6 +1557,147 @@ TEST_CASE("zcache.rpc") CHECK(Record->Key == ExpectedKey); } } + + SUBCASE("RpcAcceptOptions") + { + using namespace utils; + + std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + const uint16_t PortNumber = 13337; + const auto BaseUri = fmt::format("http://localhost:{}/z$", PortNumber); + + ZenServerInstance Inst(TestEnv); + Inst.SetTestDir(TestDir); + Inst.SpawnServer(PortNumber); + Inst.WaitUntilReady(); + + std::vector<zen::CacheKey> SmallKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 4, 1024); + std::vector<zen::CacheKey> LargeKeys = PutCacheRecords(BaseUri, "ue4.ddc"sv, "mastodon"sv, 4, 1024 * 1024 * 16, SmallKeys.size()); + + std::vector<zen::CacheKey> Keys(SmallKeys.begin(), SmallKeys.end()); + Keys.insert(Keys.end(), LargeKeys.begin(), LargeKeys.end()); + + { + GetCacheRecordResult Result = GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, CachePolicy::Default); + + CHECK(Result.Result.Results.size() == Keys.size()); + + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer(); + IoBufferFileReference Ref; + bool IsFileRef = Body.GetFileReference(Ref); + CHECK(!IsFileRef); + } + } + } + + // File path, but only for large files + { + GetCacheRecordResult Result = + GetCacheRecords(BaseUri, "ue4.ddc"sv, Keys, CachePolicy::Default, RpcAcceptOptions::kAllowLocalReferences); + + CHECK(Result.Result.Results.size() == Keys.size()); + + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer(); + IoBufferFileReference Ref; + bool IsFileRef = Body.GetFileReference(Ref); + CHECK(IsFileRef == (Body.Size() > 1024)); + } + } + } + + // File path, for all files + { + GetCacheRecordResult Result = + GetCacheRecords(BaseUri, + "ue4.ddc"sv, + Keys, + CachePolicy::Default, + RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialLocalReferences); + + CHECK(Result.Result.Results.size() == Keys.size()); + + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer(); + IoBufferFileReference Ref; + bool IsFileRef = Body.GetFileReference(Ref); + CHECK(IsFileRef); + } + } + } + + // File handle, but only for large files + { + GetCacheRecordResult Result = GetCacheRecords(BaseUri, + "ue4.ddc"sv, + Keys, + CachePolicy::Default, + RpcAcceptOptions::kAllowLocalReferences, + GetCurrentProcessId()); + + CHECK(Result.Result.Results.size() == Keys.size()); + + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer(); + IoBufferFileReference Ref; + bool IsFileRef = Body.GetFileReference(Ref); + CHECK(IsFileRef == (Body.Size() > 1024)); + } + } + } + + // File handle, for all files + { + GetCacheRecordResult Result = + GetCacheRecords(BaseUri, + "ue4.ddc"sv, + Keys, + CachePolicy::Default, + RpcAcceptOptions::kAllowLocalReferences | RpcAcceptOptions::kAllowPartialLocalReferences, + GetCurrentProcessId()); + + CHECK(Result.Result.Results.size() == Keys.size()); + + for (size_t Index = 0; const std::optional<cacherequests::GetCacheRecordResult>& Record : Result.Result.Results) + { + CHECK(Record); + const CacheKey& ExpectedKey = Keys[Index++]; + CHECK(Record->Key == ExpectedKey); + for (const cacherequests::GetCacheRecordResultValue& Value : Record->Values) + { + const IoBuffer& Body = Value.Body.GetCompressed().Flatten().AsIoBuffer(); + IoBufferFileReference Ref; + bool IsFileRef = Body.GetFileReference(Ref); + CHECK(IsFileRef); + } + } + } + } } TEST_CASE("zcache.failing.upstream") diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 154676d77..8539d9c16 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -1306,7 +1306,8 @@ CbPackage HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType, IoBuffer&& Body, uint32_t& OutAcceptMagic, - RpcAcceptOptions& OutAcceptFlags) + RpcAcceptOptions& OutAcceptFlags, + int& OutTargetProcessId) { CbPackage Package; CbObjectView Object; @@ -1321,8 +1322,9 @@ HttpStructuredCacheService::HandleRpcRequest(const ZenContentType ContentType, Package = ParsePackageMessage(Body); Object = Package.GetObject(); } - OutAcceptMagic = Object["Accept"sv].AsUInt32(); - OutAcceptFlags = static_cast<RpcAcceptOptions>(Object["AcceptFlags"sv].AsUInt16(0u)); + OutAcceptMagic = Object["Accept"sv].AsUInt32(); + OutAcceptFlags = static_cast<RpcAcceptOptions>(Object["AcceptFlags"sv].AsUInt16(0u)); + OutTargetProcessId = Object["Pid"sv].AsInt32(0); const std::string_view Method = Object["Method"sv].AsString(); @@ -1367,14 +1369,20 @@ HttpStructuredCacheService::ReplayRequestRecorder(cache::IRpcRequestReplayer& Re { uint32_t AcceptMagic = 0; RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; - CbPackage RpcResult = HandleRpcRequest(ContentType.first, std::move(Body), AcceptMagic, AcceptFlags); + int TargetPid = 0; + CbPackage RpcResult = HandleRpcRequest(ContentType.first, std::move(Body), AcceptMagic, AcceptFlags, TargetPid); if (AcceptMagic == kCbPkgMagic) { - bool AllowFileReferences = EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences); - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer( - RpcResult, - AllowFileReferences ? FormatFlags::kAllowLocalReferences | FormatFlags::kDenyPartialLocalReferences - : FormatFlags::kDefault); + FormatFlags Flags = FormatFlags::kDefault; + if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) + { + Flags |= FormatFlags::kAllowLocalReferences; + if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) + { + Flags |= FormatFlags::kDenyPartialLocalReferences; + } + } + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetPid); ZEN_ASSERT(RpcResponseBuffer.GetSize() > 0); } else @@ -1417,9 +1425,10 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) [this, Body = Request.ReadPayload(), ContentType, AcceptType](HttpServerRequest& AsyncRequest) mutable { std::uint64_t RequestIndex = m_RequestRecorder ? m_RequestRecorder->RecordRequest(ContentType, AcceptType, Body) : ~0ull; - uint32_t AcceptMagic = 0; - RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; - CbPackage RpcResult = HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags); + uint32_t AcceptMagic = 0; + RpcAcceptOptions AcceptFlags = RpcAcceptOptions::kNone; + int TargetProcessId = 0; + CbPackage RpcResult = HandleRpcRequest(ContentType, std::move(Body), AcceptMagic, AcceptFlags, TargetProcessId); if (RpcResult.IsNull()) { AsyncRequest.WriteResponse(HttpResponseCode::BadRequest); @@ -1427,11 +1436,16 @@ HttpStructuredCacheService::HandleRpcRequest(HttpServerRequest& Request) } if (AcceptMagic == kCbPkgMagic) { - bool AllowFileReferences = EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences); - CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer( - RpcResult, - AllowFileReferences ? FormatFlags::kAllowLocalReferences | FormatFlags::kDenyPartialLocalReferences - : FormatFlags::kDefault); + FormatFlags Flags = FormatFlags::kDefault; + if (EnumHasAllFlags(AcceptFlags, RpcAcceptOptions::kAllowLocalReferences)) + { + Flags |= FormatFlags::kAllowLocalReferences; + if (!EnumHasAnyFlags(AcceptFlags, RpcAcceptOptions::kAllowPartialLocalReferences)) + { + Flags |= FormatFlags::kDenyPartialLocalReferences; + } + } + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(RpcResult, Flags, TargetProcessId); if (RequestIndex != ~0ull) { ZEN_ASSERT(m_RequestRecorder); diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index f606a1cf5..e9c58c3d6 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -119,7 +119,8 @@ private: CbPackage HandleRpcRequest(const ZenContentType ContentType, IoBuffer&& Body, uint32_t& OutAcceptMagic, - RpcAcceptOptions& OutAcceptFlags); + RpcAcceptOptions& OutAcceptFlags, + int& OutTargetProcessId); void HandleCacheRequest(HttpServerRequest& Request); void HandleCacheNamespaceRequest(HttpServerRequest& Request, std::string_view Namespace); diff --git a/zenutil/cache/cacherequests.cpp b/zenutil/cache/cacherequests.cpp index 5cbb2e04b..4c865ec22 100644 --- a/zenutil/cache/cacherequests.cpp +++ b/zenutil/cache/cacherequests.cpp @@ -332,7 +332,9 @@ namespace cacherequests { bool GetCacheRecordsRequest::Parse(const CbObjectView& RpcRequest) { ZEN_ASSERT(RpcRequest["Method"].AsString() == "GetCacheRecords"); - AcceptMagic = RpcRequest["AcceptType"].AsUInt32(0); + AcceptMagic = RpcRequest["AcceptType"].AsUInt32(0); + AcceptOptions = RpcRequest["AcceptFlags"].AsUInt16(0); + ProcessPid = RpcRequest["Pid"].AsInt32(0); CbObjectView Params = RpcRequest["Params"].AsObjectView(); std::optional<std::string> RequestNamespace = GetRequestNamespace(Params); @@ -373,6 +375,14 @@ namespace cacherequests { { Writer << "Accept" << AcceptMagic; } + if (AcceptOptions != 0) + { + Writer << "AcceptFlags" << AcceptOptions; + } + if (ProcessPid != 0) + { + Writer << "Pid" << ProcessPid; + } Writer.BeginObject("Params"); { @@ -656,7 +666,9 @@ namespace cacherequests { bool GetCacheValuesRequest::Parse(const CbObjectView& BatchObject) { ZEN_ASSERT(BatchObject["Method"].AsString() == "GetCacheValues"); - AcceptMagic = BatchObject["AcceptType"].AsUInt32(0); + AcceptMagic = BatchObject["AcceptType"].AsUInt32(0); + AcceptOptions = BatchObject["AcceptFlags"].AsUInt16(0); + ProcessPid = BatchObject["Pid"].AsInt32(0); CbObjectView Params = BatchObject["Params"].AsObjectView(); std::optional<std::string> RequestNamespace = cacherequests::GetRequestNamespace(Params); @@ -696,6 +708,14 @@ namespace cacherequests { { Writer << "Accept" << AcceptMagic; } + if (AcceptOptions != 0) + { + Writer << "AcceptFlags" << AcceptOptions; + } + if (ProcessPid != 0) + { + Writer << "Pid" << ProcessPid; + } Writer.BeginObject("Params"); { @@ -817,7 +837,9 @@ namespace cacherequests { bool GetCacheChunksRequest::Parse(const CbObjectView& BatchObject) { ZEN_ASSERT(BatchObject["Method"].AsString() == "GetCacheChunks"); - AcceptMagic = BatchObject["AcceptType"].AsUInt32(0); + AcceptMagic = BatchObject["AcceptType"].AsUInt32(0); + AcceptOptions = BatchObject["AcceptFlags"].AsUInt16(0); + ProcessPid = BatchObject["Pid"].AsInt32(0); CbObjectView Params = BatchObject["Params"].AsObjectView(); std::optional<std::string> RequestNamespace = cacherequests::GetRequestNamespace(Params); @@ -862,6 +884,14 @@ namespace cacherequests { { Writer << "Accept" << AcceptMagic; } + if (AcceptOptions != 0) + { + Writer << "AcceptFlags" << AcceptOptions; + } + if (ProcessPid != 0) + { + Writer << "Pid" << ProcessPid; + } Writer.BeginObject("Params"); { diff --git a/zenutil/include/zenutil/cache/cacherequests.h b/zenutil/include/zenutil/cache/cacherequests.h index 4eebd3121..f1999ebfe 100644 --- a/zenutil/include/zenutil/cache/cacherequests.h +++ b/zenutil/include/zenutil/cache/cacherequests.h @@ -107,6 +107,8 @@ namespace cacherequests { struct GetCacheRecordsRequest { uint32_t AcceptMagic = 0; + uint16_t AcceptOptions = 0; + int32_t ProcessPid = 0; CachePolicy DefaultPolicy = CachePolicy::Default; std::string Namespace; std::vector<GetCacheRecordRequest> Requests; @@ -181,6 +183,8 @@ namespace cacherequests { struct GetCacheValuesRequest { uint32_t AcceptMagic = 0; + uint16_t AcceptOptions = 0; + int32_t ProcessPid = 0; CachePolicy DefaultPolicy = CachePolicy::Default; std::string Namespace; std::vector<GetCacheValueRequest> Requests; @@ -222,6 +226,8 @@ namespace cacherequests { struct GetCacheChunksRequest { uint32_t AcceptMagic = 0; + uint16_t AcceptOptions = 0; + int32_t ProcessPid = 0; CachePolicy DefaultPolicy = CachePolicy::Default; std::string Namespace; std::vector<GetCacheChunkRequest> Requests; |