// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace zen { std::vector FormatPackageMessage(const CbPackage& Data) { return FormatPackageMessage(Data, FormatFlags::kDefault); } CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data) { return FormatPackageMessageBuffer(Data, FormatFlags::kDefault); } CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data, FormatFlags Flags) { std::vector Message = FormatPackageMessage(Data, Flags); std::vector Buffers; for (IoBuffer& Buf : Message) { Buffers.push_back(SharedBuffer(Buf)); } return CompositeBuffer(std::move(Buffers)); } std::vector FormatPackageMessage(const CbPackage& Data, FormatFlags Flags) { const std::span& Attachments = Data.GetAttachments(); std::vector ResponseBuffers; ResponseBuffers.reserve(3 + Attachments.size()); // TODO: may want to use an additional fudge factor here to avoid growing since each // attachment is likely to consist of several buffers // Fixed size header CbPackageHeader Hdr{.HeaderMagic = kCbPkgMagic, .AttachmentCount = gsl::narrow(Attachments.size())}; ResponseBuffers.push_back(IoBufferBuilder::MakeCloneFromMemory(&Hdr, sizeof Hdr)); // Attachment metadata array IoBuffer AttachmentMetadataBuffer = IoBuffer{sizeof(CbAttachmentEntry) * (Attachments.size() + /* root */ 1)}; CbAttachmentEntry* AttachmentInfo = reinterpret_cast(AttachmentMetadataBuffer.MutableData()); ResponseBuffers.push_back(AttachmentMetadataBuffer); // Attachment metadata // Root object IoBuffer RootIoBuffer = Data.GetObject().GetBuffer().AsIoBuffer(); ResponseBuffers.push_back(RootIoBuffer); // Root object *AttachmentInfo++ = {.PayloadSize = RootIoBuffer.Size(), .Flags = CbAttachmentEntry::kIsObject, .AttachmentHash = Data.GetObjectHash()}; // Attachment payloads auto MarshalLocal = [&AttachmentInfo, &ResponseBuffers](const std::string& Path8, CbAttachmentReferenceHeader& LocalRef, const IoHash& AttachmentHash, bool IsCompressed) { IoBuffer RefBuffer(sizeof(CbAttachmentReferenceHeader) + Path8.size()); CbAttachmentReferenceHeader* RefHdr = RefBuffer.MutableData(); *RefHdr++ = LocalRef; memcpy(RefHdr, Path8.data(), Path8.size()); *AttachmentInfo++ = {.PayloadSize = RefBuffer.GetSize(), .Flags = (IsCompressed ? uint32_t(CbAttachmentEntry::kIsCompressed) : 0u) | CbAttachmentEntry::kIsLocalRef, .AttachmentHash = AttachmentHash}; ResponseBuffers.push_back(std::move(RefBuffer)); }; auto IsLocalRef = [](const CompositeBuffer& AttachmentBinary, bool DenyPartialLocalReferences, CbAttachmentReferenceHeader& LocalRef, std::string& Path8) -> bool { const SharedBuffer& Segment = AttachmentBinary.GetSegments().front(); IoBufferFileReference Ref; const IoBuffer& SegmentBuffer = Segment.AsIoBuffer(); if (!SegmentBuffer.GetFileReference(Ref)) { return false; } if (DenyPartialLocalReferences && !SegmentBuffer.IsWholeFile()) { return false; } ExtendablePathBuilder<256> LocalRefFile; LocalRefFile.Append(std::filesystem::absolute(PathFromHandle(Ref.FileHandle))); Path8 = LocalRefFile.ToUtf8(); LocalRef.AbsolutePathLength = gsl::narrow(Path8.size()); LocalRef.PayloadByteOffset = Ref.FileChunkOffset; LocalRef.PayloadByteSize = Ref.FileChunkSize; return true; }; for (const CbAttachment& Attachment : Attachments) { if (Attachment.IsNull()) { ZEN_NOT_IMPLEMENTED("Null attachments are not supported"); } else if (CompressedBuffer AttachmentBuffer = Attachment.AsCompressedBinary()) { CompositeBuffer Compressed = AttachmentBuffer.GetCompressed(); IoHash AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash()); // If the data is either not backed by a file, or there are multiple // fragments then we cannot marshal it by local reference. We might // want/need to extend this in the future to allow multiple chunk // segments to be marshaled at once bool MarshalByLocalRef = EnumHasAllFlags(Flags, FormatFlags::kAllowLocalReferences) && (Compressed.GetSegments().size() == 1); bool DenyPartialLocalReferences = EnumHasAllFlags(Flags, FormatFlags::kDenyPartialLocalReferences); CbAttachmentReferenceHeader LocalRef; std::string Path8; if (MarshalByLocalRef) { MarshalByLocalRef = IsLocalRef(Compressed, DenyPartialLocalReferences, LocalRef, Path8); } if (MarshalByLocalRef) { const bool IsCompressed = true; MarshalLocal(Path8, LocalRef, AttachmentHash, IsCompressed); ZEN_DEBUG("Marshalled '{}' as file of {} bytes", Path8, Compressed.GetSize()); } else { *AttachmentInfo++ = {.PayloadSize = AttachmentBuffer.GetCompressedSize(), .Flags = CbAttachmentEntry::kIsCompressed, .AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash())}; for (const SharedBuffer& Segment : Compressed.GetSegments()) { ResponseBuffers.push_back(Segment.AsIoBuffer()); } } } else if (CbObject AttachmentObject = Attachment.AsObject()) { IoBuffer ObjIoBuffer = AttachmentObject.GetBuffer().AsIoBuffer(); ResponseBuffers.push_back(ObjIoBuffer); *AttachmentInfo++ = {.PayloadSize = ObjIoBuffer.Size(), .Flags = CbAttachmentEntry::kIsObject, .AttachmentHash = Attachment.GetHash()}; } else if (CompositeBuffer AttachmentBinary = Attachment.AsCompositeBinary()) { IoHash AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash()); bool MarshalByLocalRef = EnumHasAllFlags(Flags, FormatFlags::kAllowLocalReferences) && (AttachmentBinary.GetSegments().size() == 1); bool DenyPartialLocalReferences = EnumHasAllFlags(Flags, FormatFlags::kDenyPartialLocalReferences); CbAttachmentReferenceHeader LocalRef; std::string Path8; if (MarshalByLocalRef) { MarshalByLocalRef = IsLocalRef(AttachmentBinary, DenyPartialLocalReferences, LocalRef, Path8); } if (MarshalByLocalRef) { const bool IsCompressed = false; MarshalLocal(Path8, LocalRef, AttachmentHash, IsCompressed); ZEN_DEBUG("Marshalled '{}' as file of {} bytes", Path8, AttachmentBinary.GetSize()); } else { *AttachmentInfo++ = {.PayloadSize = AttachmentBinary.GetSize(), .Flags = 0, .AttachmentHash = Attachment.GetHash()}; for (const SharedBuffer& Segment : AttachmentBinary.GetSegments()) { ResponseBuffers.push_back(Segment.AsIoBuffer()); } } } else { ZEN_NOT_IMPLEMENTED("Unknown attachment kind"); } } return ResponseBuffers; } bool IsPackageMessage(IoBuffer Payload) { if (!Payload) { return false; } BinaryReader Reader(Payload); CbPackageHeader Hdr; Reader.Read(&Hdr, sizeof Hdr); if (Hdr.HeaderMagic != kCbPkgMagic) { return false; } return true; } CbPackage ParsePackageMessage(IoBuffer Payload, std::function CreateBuffer) { if (!Payload) { return {}; } BinaryReader Reader(Payload); CbPackageHeader Hdr; Reader.Read(&Hdr, sizeof Hdr); if (Hdr.HeaderMagic != kCbPkgMagic) { throw std::runtime_error("invalid CbPackage header magic"); } const uint32_t ChunkCount = Hdr.AttachmentCount + 1; std::unique_ptr AttachmentEntries{new CbAttachmentEntry[ChunkCount]}; Reader.Read(AttachmentEntries.get(), sizeof(CbAttachmentEntry) * ChunkCount); CbPackage Package; for (uint32_t i = 0; i < ChunkCount; ++i) { const CbAttachmentEntry& Entry = AttachmentEntries[i]; const uint64_t AttachmentSize = Entry.PayloadSize; const IoBuffer AttachmentBuffer(Payload, Reader.CurrentOffset(), AttachmentSize); Reader.Skip(AttachmentSize); if (Entry.Flags & CbAttachmentEntry::kIsLocalRef) { // Marshal local reference - a "pointer" to the chunk backing file ZEN_ASSERT(AttachmentBuffer.Size() >= sizeof(CbAttachmentReferenceHeader)); const CbAttachmentReferenceHeader* AttachRefHdr = AttachmentBuffer.Data(); const char8_t* PathPointer = reinterpret_cast(AttachRefHdr + 1); ZEN_ASSERT(AttachmentBuffer.Size() >= (sizeof(CbAttachmentReferenceHeader) + AttachRefHdr->AbsolutePathLength)); std::filesystem::path Path{std::u8string_view(PathPointer, AttachRefHdr->AbsolutePathLength)}; if (IoBuffer ChunkReference = IoBufferBuilder::MakeFromFile(Path, AttachRefHdr->PayloadByteOffset, AttachRefHdr->PayloadByteSize)) { CompressedBuffer CompBuf(CompressedBuffer::FromCompressed(SharedBuffer(ChunkReference))); if (!CompBuf) { throw std::runtime_error(fmt::format("invalid format for chunk #{} at '{}' (offset {}, size {})", i, PathToUtf8(Path), AttachRefHdr->PayloadByteOffset, AttachRefHdr->PayloadByteSize)); } CbAttachment Attachment(std::move(CompBuf)); Package.AddAttachment(Attachment); } else { // Unable to open chunk reference throw std::runtime_error(fmt::format("unable to resolve chunk #{} at '{}' (offset {}, size {})", i, PathToUtf8(Path), AttachRefHdr->PayloadByteOffset, AttachRefHdr->PayloadByteSize)); } } else if (Entry.Flags & CbAttachmentEntry::kIsCompressed) { if (Entry.Flags & CbAttachmentEntry::kIsObject) { if (i == 0) { CompressedBuffer CompBuf(CompressedBuffer::FromCompressed(SharedBuffer(AttachmentBuffer))); if (!CompBuf) { throw std::runtime_error(fmt::format("invalid format for chunk #{} expected compressed buffer for CbObject", i)); } // First payload is always a compact binary object Package.SetObject(LoadCompactBinaryObject(std::move(CompBuf))); } else { ZEN_NOT_IMPLEMENTED("Object attachments are not currently supported"); } } else { // Make a copy of the buffer so we attachements don't reference the entire payload IoBuffer AttachmentBufferCopy = CreateBuffer(Entry.AttachmentHash, AttachmentSize); ZEN_ASSERT(AttachmentBufferCopy); ZEN_ASSERT(AttachmentBufferCopy.Size() == AttachmentSize); AttachmentBufferCopy.GetMutableView().CopyFrom(AttachmentBuffer.GetView()); CompressedBuffer CompBuf(CompressedBuffer::FromCompressed(SharedBuffer(AttachmentBufferCopy))); if (!CompBuf) { throw std::runtime_error(fmt::format("invalid format for chunk #{} expected compressed buffer for attachment", i)); } CbAttachment Attachment(std::move(CompBuf)); Package.AddAttachment(Attachment); } } else /* not compressed */ { if (Entry.Flags & CbAttachmentEntry::kIsObject) { if (i == 0) { Package.SetObject(LoadCompactBinaryObject(AttachmentBuffer)); } else { ZEN_NOT_IMPLEMENTED("Object attachments are not currently supported"); } } else { // Make a copy of the buffer so we attachements don't reference the entire payload IoBuffer AttachmentBufferCopy = CreateBuffer(Entry.AttachmentHash, AttachmentSize); ZEN_ASSERT(AttachmentBufferCopy); ZEN_ASSERT(AttachmentBufferCopy.Size() == AttachmentSize); AttachmentBufferCopy.GetMutableView().CopyFrom(AttachmentBuffer.GetView()); CbAttachment Attachment(SharedBuffer{AttachmentBufferCopy}); Package.AddAttachment(Attachment); } } } return Package; } bool ParsePackageMessageWithLegacyFallback(const IoBuffer& Response, CbPackage& OutPackage) { if (IsPackageMessage(Response)) { OutPackage = ParsePackageMessage(Response); return true; } return OutPackage.TryLoad(Response); } CbPackageReader::CbPackageReader() : m_CreateBuffer([](const IoHash&, uint64_t Size) -> IoBuffer { return IoBuffer{Size}; }) { } CbPackageReader::~CbPackageReader() { } void CbPackageReader::SetPayloadBufferCreator(std::function CreateBuffer) { m_CreateBuffer = CreateBuffer; } uint64_t CbPackageReader::ProcessPackageHeaderData(const void* Data, uint64_t DataBytes) { ZEN_ASSERT(m_CurrentState != State::kReadingBuffers); switch (m_CurrentState) { case State::kInitialState: ZEN_ASSERT(Data == nullptr); m_CurrentState = State::kReadingHeader; return sizeof m_PackageHeader; case State::kReadingHeader: ZEN_ASSERT(DataBytes == sizeof m_PackageHeader); memcpy(&m_PackageHeader, Data, sizeof m_PackageHeader); ZEN_ASSERT(m_PackageHeader.HeaderMagic == kCbPkgMagic); m_CurrentState = State::kReadingAttachmentEntries; m_AttachmentEntries.resize(m_PackageHeader.AttachmentCount + 1); return (m_PackageHeader.AttachmentCount + 1) * sizeof(CbAttachmentEntry); case State::kReadingAttachmentEntries: ZEN_ASSERT(DataBytes == ((m_PackageHeader.AttachmentCount + 1) * sizeof(CbAttachmentEntry))); memcpy(m_AttachmentEntries.data(), Data, DataBytes); for (CbAttachmentEntry& Entry : m_AttachmentEntries) { // This preallocates memory for payloads but note that for the local references // the caller will need to handle the payload differently (i.e it's a // CbAttachmentReferenceHeader not the actual payload) m_PayloadBuffers.push_back(IoBuffer{Entry.PayloadSize}); } m_CurrentState = State::kReadingBuffers; return 0; default: ZEN_ASSERT(false); return 0; } } IoBuffer CbPackageReader::MarshalLocalChunkReference(IoBuffer AttachmentBuffer) { // Marshal local reference - a "pointer" to the chunk backing file ZEN_ASSERT(AttachmentBuffer.Size() >= sizeof(CbAttachmentReferenceHeader)); const CbAttachmentReferenceHeader* AttachRefHdr = AttachmentBuffer.Data(); const char8_t* PathPointer = reinterpret_cast(AttachRefHdr + 1); ZEN_ASSERT(AttachmentBuffer.Size() >= (sizeof(CbAttachmentReferenceHeader) + AttachRefHdr->AbsolutePathLength)); std::u8string_view PathView{PathPointer, AttachRefHdr->AbsolutePathLength}; std::filesystem::path Path{PathView}; IoBuffer ChunkReference = IoBufferBuilder::MakeFromFile(Path, AttachRefHdr->PayloadByteOffset, AttachRefHdr->PayloadByteSize); if (!ChunkReference) { // Unable to open chunk reference throw std::runtime_error(fmt::format("unable to resolve local reference to '{}' (offset {}, size {})", PathToUtf8(Path), AttachRefHdr->PayloadByteOffset, AttachRefHdr->PayloadByteSize)); } return ChunkReference; }; void CbPackageReader::Finalize() { if (m_AttachmentEntries.empty()) { return; } m_Attachments.reserve(m_AttachmentEntries.size() - 1); int CurrentAttachmentIndex = 0; for (CbAttachmentEntry& Entry : m_AttachmentEntries) { IoBuffer AttachmentBuffer = m_PayloadBuffers[CurrentAttachmentIndex]; if (CurrentAttachmentIndex == 0) { // Root object if (Entry.Flags & CbAttachmentEntry::kIsObject) { if (Entry.Flags & CbAttachmentEntry::kIsLocalRef) { m_RootObject = LoadCompactBinaryObject(MarshalLocalChunkReference(AttachmentBuffer)); } else if (Entry.Flags & CbAttachmentEntry::kIsCompressed) { m_RootObject = LoadCompactBinaryObject(CompressedBuffer::FromCompressed(SharedBuffer(AttachmentBuffer))); } else { m_RootObject = LoadCompactBinaryObject(std::move(AttachmentBuffer)); } } else { throw std::runtime_error("missing or invalid root object"); } } else if (Entry.Flags & CbAttachmentEntry::kIsLocalRef) { IoBuffer ChunkReference = MarshalLocalChunkReference(AttachmentBuffer); if (Entry.Flags & CbAttachmentEntry::kIsCompressed) { m_Attachments.push_back(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(ChunkReference)))); } else { m_Attachments.push_back(CbAttachment( CompressedBuffer::Compress(SharedBuffer(ChunkReference), OodleCompressor::NotSet, OodleCompressionLevel::None))); } } ++CurrentAttachmentIndex; } } /** ______________________ _____________________________ \__ ___/\_ _____// _____/\__ ___/ _____/ | | | __)_ \_____ \ | | \_____ \ | | | \/ \ | | / \ |____| /_______ /_______ / |____| /_______ / \/ \/ \/ */ #if ZEN_WITH_TESTS TEST_CASE("CbPackage.Serialization") { // Make a test package CbAttachment Attach1{SharedBuffer::MakeView(MakeMemoryView("abcd"))}; CbAttachment Attach2{SharedBuffer::MakeView(MakeMemoryView("efgh"))}; CbObjectWriter Cbo; Cbo.AddAttachment("abcd", Attach1); Cbo.AddAttachment("efgh", Attach2); CbPackage Pkg; Pkg.AddAttachment(Attach1); Pkg.AddAttachment(Attach2); Pkg.SetObject(Cbo.Save()); SharedBuffer Buffer = FormatPackageMessageBuffer(Pkg).Flatten(); const uint8_t* CursorPtr = reinterpret_cast(Buffer.GetData()); uint64_t RemainingBytes = Buffer.GetSize(); auto ConsumeBytes = [&](uint64_t ByteCount) { ZEN_ASSERT(ByteCount <= RemainingBytes); void* ReturnPtr = (void*)CursorPtr; CursorPtr += ByteCount; RemainingBytes -= ByteCount; return ReturnPtr; }; auto CopyBytes = [&](void* TargetBuffer, uint64_t ByteCount) { ZEN_ASSERT(ByteCount <= RemainingBytes); memcpy(TargetBuffer, CursorPtr, ByteCount); CursorPtr += ByteCount; RemainingBytes -= ByteCount; }; CbPackageReader Reader; uint64_t InitialRead = Reader.ProcessPackageHeaderData(nullptr, 0); uint64_t NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(InitialRead), InitialRead); NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(NextBytes), NextBytes); auto Buffers = Reader.GetPayloadBuffers(); for (auto& PayloadBuffer : Buffers) { CopyBytes(PayloadBuffer.MutableData(), PayloadBuffer.GetSize()); } Reader.Finalize(); } TEST_CASE("CbPackage.LocalRef") { ScopedTemporaryDirectory TempDir; auto Path1 = TempDir.Path() / "abcd"; auto Path2 = TempDir.Path() / "efgh"; { IoBuffer Buffer1 = IoBufferBuilder::MakeCloneFromMemory(MakeMemoryView("abcd")); IoBuffer Buffer2 = IoBufferBuilder::MakeCloneFromMemory(MakeMemoryView("efgh")); WriteFile(Path1, Buffer1); WriteFile(Path2, Buffer2); } // Make a test package IoBuffer FileBuffer1 = IoBufferBuilder::MakeFromFile(Path1); IoBuffer FileBuffer2 = IoBufferBuilder::MakeFromFile(Path2); CbAttachment Attach1{SharedBuffer(FileBuffer1)}; CbAttachment Attach2{SharedBuffer(FileBuffer2)}; CbObjectWriter Cbo; Cbo.AddAttachment("abcd", Attach1); Cbo.AddAttachment("efgh", Attach2); CbPackage Pkg; Pkg.AddAttachment(Attach1); Pkg.AddAttachment(Attach2); Pkg.SetObject(Cbo.Save()); SharedBuffer Buffer = FormatPackageMessageBuffer(Pkg, FormatFlags::kAllowLocalReferences).Flatten(); const uint8_t* CursorPtr = reinterpret_cast(Buffer.GetData()); uint64_t RemainingBytes = Buffer.GetSize(); auto ConsumeBytes = [&](uint64_t ByteCount) { ZEN_ASSERT(ByteCount <= RemainingBytes); void* ReturnPtr = (void*)CursorPtr; CursorPtr += ByteCount; RemainingBytes -= ByteCount; return ReturnPtr; }; auto CopyBytes = [&](void* TargetBuffer, uint64_t ByteCount) { ZEN_ASSERT(ByteCount <= RemainingBytes); memcpy(TargetBuffer, CursorPtr, ByteCount); CursorPtr += ByteCount; RemainingBytes -= ByteCount; }; CbPackageReader Reader; uint64_t InitialRead = Reader.ProcessPackageHeaderData(nullptr, 0); uint64_t NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(InitialRead), InitialRead); NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(NextBytes), NextBytes); auto Buffers = Reader.GetPayloadBuffers(); for (auto& PayloadBuffer : Buffers) { CopyBytes(PayloadBuffer.MutableData(), PayloadBuffer.GetSize()); } Reader.Finalize(); } void forcelink_httpshared() { } #endif } // namespace zen