// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include #include #include #include namespace zen { CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data) { std::vector Message = FormatPackageMessage(Data); std::vector Buffers; for (IoBuffer& Buf : Message) { Buffers.push_back(SharedBuffer(Buf)); } return CompositeBuffer(std::move(Buffers)); } std::vector FormatPackageMessage(const CbPackage& Data) { const std::span& Attachments = Data.GetAttachments(); std::vector ResponseBuffers; ResponseBuffers.reserve(3 + Attachments.size()); // TODO: may want to use an additional fudge factor here to avoid growing since each // attachment is likely to consist of several buffers uint64_t TotalAttachmentsSize = 0; // Fixed size header CbPackageHeader Hdr{.HeaderMagic = kCbPkgMagic, .AttachmentCount = gsl::narrow(Attachments.size())}; ResponseBuffers.push_back(IoBufferBuilder::MakeCloneFromMemory(&Hdr, sizeof Hdr)); // Attachment metadata array IoBuffer AttachmentMetadataBuffer = IoBuffer{sizeof(CbAttachmentEntry) * (Attachments.size() + /* root */ 1)}; CbAttachmentEntry* AttachmentInfo = reinterpret_cast(AttachmentMetadataBuffer.MutableData()); ResponseBuffers.push_back(AttachmentMetadataBuffer); // Attachment metadata // Root object IoBuffer RootIoBuffer = Data.GetObject().GetBuffer().AsIoBuffer(); ResponseBuffers.push_back(RootIoBuffer); // Root object *AttachmentInfo++ = {.PayloadSize = RootIoBuffer.Size(), .Flags = CbAttachmentEntry::kIsObject, .AttachmentHash = Data.GetObjectHash()}; // Attachment payloads for (const CbAttachment& Attachment : Attachments) { if (Attachment.IsNull()) { ZEN_NOT_IMPLEMENTED("Null attachments are not supported"); } else if (CompressedBuffer AttachmentBuffer = Attachment.AsCompressedBinary()) { CompositeBuffer Compressed = AttachmentBuffer.GetCompressed(); *AttachmentInfo++ = {.PayloadSize = AttachmentBuffer.GetCompressedSize(), .Flags = CbAttachmentEntry::kIsCompressed, .AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash())}; for (const SharedBuffer& Segment : Compressed.GetSegments()) { ResponseBuffers.push_back(Segment.AsIoBuffer()); TotalAttachmentsSize += Segment.GetSize(); } } else if (CbObject AttachmentObject = Attachment.AsObject()) { IoBuffer ObjIoBuffer = AttachmentObject.GetBuffer().AsIoBuffer(); ResponseBuffers.push_back(ObjIoBuffer); *AttachmentInfo++ = {.PayloadSize = ObjIoBuffer.Size(), .Flags = CbAttachmentEntry::kIsObject, .AttachmentHash = Attachment.GetHash()}; } else if (CompositeBuffer AttachmentBinary = Attachment.AsCompositeBinary()) { *AttachmentInfo++ = {.PayloadSize = AttachmentBinary.GetSize(), .Flags = 0, .AttachmentHash = Attachment.GetHash()}; for (const SharedBuffer& Segment : AttachmentBinary.GetSegments()) { ResponseBuffers.push_back(Segment.AsIoBuffer()); TotalAttachmentsSize += Segment.GetSize(); } } else { ZEN_NOT_IMPLEMENTED("Unknown attachment kind"); } } return ResponseBuffers; } CbPackage ParsePackageMessage(IoBuffer Payload, std::function CreateBuffer) { if (!Payload) { return {}; } BinaryReader Reader(Payload); CbPackageHeader Hdr; Reader.Read(&Hdr, sizeof Hdr); if (Hdr.HeaderMagic != kCbPkgMagic) { throw std::runtime_error("invalid CbPackage header magic"); } const uint32_t ChunkCount = Hdr.AttachmentCount + 1; std::unique_ptr AttachmentEntries{new CbAttachmentEntry[ChunkCount]}; Reader.Read(AttachmentEntries.get(), sizeof(CbAttachmentEntry) * ChunkCount); CbPackage Package; for (uint32_t i = 0; i < ChunkCount; ++i) { const CbAttachmentEntry& Entry = AttachmentEntries[i]; const uint64_t AttachmentSize = Entry.PayloadSize; IoBuffer AttachmentBuffer = CreateBuffer(Entry.AttachmentHash, AttachmentSize); ZEN_ASSERT(AttachmentBuffer); ZEN_ASSERT(AttachmentBuffer.Size() == AttachmentSize); Reader.Read(AttachmentBuffer.MutableData(), AttachmentSize); if (Entry.Flags & CbAttachmentEntry::kIsCompressed) { CompressedBuffer CompBuf(CompressedBuffer::FromCompressed(SharedBuffer(AttachmentBuffer))); if (Entry.Flags & CbAttachmentEntry::kIsObject) { if (i == 0) { Package.SetObject(LoadCompactBinaryObject(std::move(CompBuf))); } else { ZEN_NOT_IMPLEMENTED("Object attachments are not currently supported"); } } else { CbAttachment Attachment(std::move(CompBuf)); Package.AddAttachment(Attachment); } } else /* not compressed */ { if (Entry.Flags & CbAttachmentEntry::kIsObject) { if (i == 0) { Package.SetObject(LoadCompactBinaryObject(AttachmentBuffer)); } else { ZEN_NOT_IMPLEMENTED("Object attachments are not currently supported"); } } else { CbAttachment Attachment(SharedBuffer{AttachmentBuffer}); Package.AddAttachment(Attachment); } } } return Package; } CbPackageReader::CbPackageReader() : m_CreateBuffer([](const IoHash&, uint64_t Size) -> IoBuffer { return IoBuffer{Size}; }) { } CbPackageReader::~CbPackageReader() { } void CbPackageReader::SetPayloadBufferCreator(std::function CreateBuffer) { m_CreateBuffer = CreateBuffer; } /** Process data */ uint64_t CbPackageReader::ProcessHeaderData(const void* Data, uint64_t DataBytes) { ZEN_ASSERT(m_CurrentState != State::kReadingBuffers); switch (m_CurrentState) { case State::kInitialState: ZEN_ASSERT(Data == nullptr); m_CurrentState = State::kReadingHeader; return sizeof m_PackageHeader; case State::kReadingHeader: ZEN_ASSERT(DataBytes == sizeof m_PackageHeader); memcpy(&m_PackageHeader, Data, sizeof m_PackageHeader); ZEN_ASSERT(m_PackageHeader.HeaderMagic == kCbPkgMagic); m_CurrentState = State::kReadingAttachmentEntries; m_AttachmentEntries.resize(m_PackageHeader.AttachmentCount + 1); return (m_PackageHeader.AttachmentCount + 1) * sizeof(CbAttachmentEntry); case State::kReadingAttachmentEntries: ZEN_ASSERT(DataBytes == ((m_PackageHeader.AttachmentCount + 1) * sizeof(CbAttachmentEntry))); memcpy(m_AttachmentEntries.data(), Data, DataBytes); for (CbAttachmentEntry& Entry : m_AttachmentEntries) { m_PayloadBuffers.push_back(IoBuffer{Entry.PayloadSize}); } m_CurrentState = State::kReadingBuffers; return 0; default: ZEN_ASSERT(false); return 0; } } /** ______________________ _____________________________ \__ ___/\_ _____// _____/\__ ___/ _____/ | | | __)_ \_____ \ | | \_____ \ | | | \/ \ | | / \ |____| /_______ /_______ / |____| /_______ / \/ \/ \/ */ #if ZEN_WITH_TESTS TEST_CASE("CbPackage.Serialization") { // Make a test package CbAttachment Attach1{SharedBuffer::MakeView(MakeMemoryView("abcd"))}; CbAttachment Attach2{SharedBuffer::MakeView(MakeMemoryView("efgh"))}; CbObjectWriter Cbo; Cbo.AddAttachment("abcd", Attach1); Cbo.AddAttachment("efgh", Attach2); CbPackage Pkg; Pkg.AddAttachment(Attach1); Pkg.AddAttachment(Attach2); Pkg.SetObject(Cbo.Save()); SharedBuffer Buffer = FormatPackageMessageBuffer(Pkg).Flatten(); const uint8_t* CursorPtr = reinterpret_cast(Buffer.GetData()); uint64_t RemainingBytes = Buffer.GetSize(); auto ConsumeBytes = [&](uint64_t ByteCount) { ZEN_ASSERT(ByteCount <= RemainingBytes); void* ReturnPtr = (void*)CursorPtr; CursorPtr += ByteCount; RemainingBytes -= ByteCount; return ReturnPtr; }; auto CopyBytes = [&](void* TargetBuffer, uint64_t ByteCount) { ZEN_ASSERT(ByteCount <= RemainingBytes); memcpy(TargetBuffer, CursorPtr, ByteCount); CursorPtr += ByteCount; RemainingBytes -= ByteCount; }; CbPackageReader Reader; uint64_t InitialRead = Reader.ProcessHeaderData(nullptr, 0); uint64_t NextBytes = Reader.ProcessHeaderData(ConsumeBytes(InitialRead), InitialRead); NextBytes = Reader.ProcessHeaderData(ConsumeBytes(NextBytes), NextBytes); auto Buffers = Reader.GetPayloadBuffers(); for (auto& PayloadBuffer : Buffers) { CopyBytes(PayloadBuffer.MutableData(), PayloadBuffer.GetSize()); } } void forcelink_httpshared() { } #endif } // namespace zen