From db8e1c167801fb5ff8d96d0fba7dee3c869e61fa Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Fri, 10 Jun 2022 15:42:53 +0200 Subject: cbpackage: added initial support for marshaling of attachment by local reference this mode allows local clients to avoid unnecessary copying of data from zen and instead reference data directly --- zenhttp/httpshared.cpp | 348 +++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 326 insertions(+), 22 deletions(-) (limited to 'zenhttp/httpshared.cpp') diff --git a/zenhttp/httpshared.cpp b/zenhttp/httpshared.cpp index f2ce17e16..7b92a0587 100644 --- a/zenhttp/httpshared.cpp +++ b/zenhttp/httpshared.cpp @@ -5,20 +5,35 @@ #include #include #include +#include #include #include #include #include +#include + +#include #include #include namespace zen { +std::vector +FormatPackageMessage(const CbPackage& Data) +{ + return FormatPackageMessage(Data, FormatFlags::kDefault); +} CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data) { - std::vector Message = FormatPackageMessage(Data); + return FormatPackageMessageBuffer(Data, FormatFlags::kDefault); +} + +CompositeBuffer +FormatPackageMessageBuffer(const CbPackage& Data, FormatFlags Flags) +{ + std::vector Message = FormatPackageMessage(Data, Flags); std::vector Buffers; @@ -31,11 +46,11 @@ FormatPackageMessageBuffer(const CbPackage& Data) } std::vector -FormatPackageMessage(const CbPackage& Data) +FormatPackageMessage(const CbPackage& Data, FormatFlags Flags) { const std::span& Attachments = Data.GetAttachments(); + std::vector ResponseBuffers; - std::vector ResponseBuffers; ResponseBuffers.reserve(3 + Attachments.size()); // TODO: may want to use an additional fudge factor here to avoid growing since each // attachment is likely to consist of several buffers @@ -47,9 +62,8 @@ FormatPackageMessage(const CbPackage& Data) // Attachment metadata array - IoBuffer AttachmentMetadataBuffer = IoBuffer{sizeof(CbAttachmentEntry) * (Attachments.size() + /* root */ 1)}; - - CbAttachmentEntry* AttachmentInfo = reinterpret_cast(AttachmentMetadataBuffer.MutableData()); + IoBuffer AttachmentMetadataBuffer = IoBuffer{sizeof(CbAttachmentEntry) * (Attachments.size() + /* root */ 1)}; + CbAttachmentEntry* AttachmentInfo = reinterpret_cast(AttachmentMetadataBuffer.MutableData()); ResponseBuffers.push_back(AttachmentMetadataBuffer); // Attachment metadata @@ -62,6 +76,44 @@ FormatPackageMessage(const CbPackage& Data) // Attachment payloads + auto MarshalLocal = [&AttachmentInfo, &ResponseBuffers](const std::string& Path8, + CbAttachmentReferenceHeader& LocalRef, + const IoHash& AttachmentHash, + bool IsCompressed) { + IoBuffer RefBuffer = IoBuffer(sizeof CbAttachmentReferenceHeader + Path8.size()); + + CbAttachmentReferenceHeader* RefHdr = RefBuffer.MutableData(); + *RefHdr++ = LocalRef; + memcpy(RefHdr, Path8.data(), Path8.size()); + + *AttachmentInfo++ = {.PayloadSize = RefBuffer.GetSize(), + .Flags = (IsCompressed ? CbAttachmentEntry::kIsCompressed : 0u) | CbAttachmentEntry::kIsLocalRef, + .AttachmentHash = AttachmentHash}; + + ResponseBuffers.push_back(std::move(RefBuffer)); + }; + + auto IsLocalRef = [](const CompositeBuffer& AttachmentBinary, CbAttachmentReferenceHeader& LocalRef, std::string& Path8) -> bool { + const SharedBuffer& Segment = AttachmentBinary.GetSegments().front(); + IoBufferFileReference Ref; + const IoBuffer& SegmentBuffer = Segment.AsIoBuffer(); + + if (!SegmentBuffer.GetFileReference(Ref)) + { + return false; + } + + ExtendablePathBuilder<256> LocalRefFile; + LocalRefFile.Append(std::filesystem::absolute(PathFromHandle(Ref.FileHandle))); + Path8 = LocalRefFile.ToUtf8(); + + LocalRef.AbsolutePathLength = gsl::narrow(Path8.size()); + LocalRef.PayloadByteOffset = Ref.FileChunkOffset; + LocalRef.PayloadByteSize = Ref.FileChunkSize; + + return true; + }; + for (const CbAttachment& Attachment : Attachments) { if (Attachment.IsNull()) @@ -70,15 +122,39 @@ FormatPackageMessage(const CbPackage& Data) } else if (CompressedBuffer AttachmentBuffer = Attachment.AsCompressedBinary()) { - CompositeBuffer Compressed = AttachmentBuffer.GetCompressed(); + CompositeBuffer Compressed = AttachmentBuffer.GetCompressed(); + IoHash AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash()); + + // If the data is either not backed by a file, or there are multiple + // fragments then we cannot marshal it by local reference. We might + // want/need to extend this in the future to allow multiple chunk + // segments to be marshaled at once - *AttachmentInfo++ = {.PayloadSize = AttachmentBuffer.GetCompressedSize(), - .Flags = CbAttachmentEntry::kIsCompressed, - .AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash())}; + bool MarshalByLocalRef = EnumHasAllFlags(Flags, FormatFlags::kAllowLocalReferences) && (Compressed.GetSegments().size() == 1); - for (const SharedBuffer& Segment : Compressed.GetSegments()) + CbAttachmentReferenceHeader LocalRef; + std::string Path8; + + if (MarshalByLocalRef) { - ResponseBuffers.push_back(Segment.AsIoBuffer()); + MarshalByLocalRef = IsLocalRef(Compressed, LocalRef, Path8); + } + + if (MarshalByLocalRef) + { + const bool IsCompressed = true; + MarshalLocal(Path8, LocalRef, AttachmentHash, IsCompressed); + } + else + { + *AttachmentInfo++ = {.PayloadSize = AttachmentBuffer.GetCompressedSize(), + .Flags = CbAttachmentEntry::kIsCompressed, + .AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash())}; + + for (const SharedBuffer& Segment : Compressed.GetSegments()) + { + ResponseBuffers.push_back(Segment.AsIoBuffer()); + } } } else if (CbObject AttachmentObject = Attachment.AsObject()) @@ -92,11 +168,31 @@ FormatPackageMessage(const CbPackage& Data) } else if (CompositeBuffer AttachmentBinary = Attachment.AsCompositeBinary()) { - *AttachmentInfo++ = {.PayloadSize = AttachmentBinary.GetSize(), .Flags = 0, .AttachmentHash = Attachment.GetHash()}; + IoHash AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash()); + bool MarshalByLocalRef = + EnumHasAllFlags(Flags, FormatFlags::kAllowLocalReferences) && (AttachmentBinary.GetSegments().size() == 1); + + CbAttachmentReferenceHeader LocalRef; + std::string Path8; + + if (MarshalByLocalRef) + { + MarshalByLocalRef = IsLocalRef(AttachmentBinary, LocalRef, Path8); + } - for (const SharedBuffer& Segment : AttachmentBinary.GetSegments()) + if (MarshalByLocalRef) { - ResponseBuffers.push_back(Segment.AsIoBuffer()); + const bool IsCompressed = false; + MarshalLocal(Path8, LocalRef, AttachmentHash, IsCompressed); + } + else + { + *AttachmentInfo++ = {.PayloadSize = AttachmentBinary.GetSize(), .Flags = 0, .AttachmentHash = Attachment.GetHash()}; + + for (const SharedBuffer& Segment : AttachmentBinary.GetSegments()) + { + ResponseBuffers.push_back(Segment.AsIoBuffer()); + } } } else @@ -108,6 +204,27 @@ FormatPackageMessage(const CbPackage& Data) return ResponseBuffers; } +bool +IsPackageMessage(IoBuffer Payload) +{ + if (!Payload) + { + return false; + } + + BinaryReader Reader(Payload); + + CbPackageHeader Hdr; + Reader.Read(&Hdr, sizeof Hdr); + + if (Hdr.HeaderMagic != kCbPkgMagic) + { + return false; + } + + return true; +} + CbPackage ParsePackageMessage(IoBuffer Payload, std::function CreateBuffer) { @@ -145,7 +262,38 @@ ParsePackageMessage(IoBuffer Payload, std::function= sizeof(CbAttachmentReferenceHeader)); + + const CbAttachmentReferenceHeader* AttachRefHdr = AttachmentBuffer.Data(); + const char8_t* PathPointer = reinterpret_cast(AttachRefHdr + 1); + + ZEN_ASSERT(AttachmentBuffer.Size() >= (sizeof(CbAttachmentReferenceHeader) + AttachRefHdr->AbsolutePathLength)); + + std::filesystem::path Path{PathPointer}; + + if (IoBuffer ChunkReference = + IoBufferBuilder::MakeFromFile(Path, AttachRefHdr->PayloadByteOffset, AttachRefHdr->PayloadByteSize)) + { + CompressedBuffer CompBuf(CompressedBuffer::FromCompressed(SharedBuffer(ChunkReference))); + CbAttachment Attachment(std::move(CompBuf)); + Package.AddAttachment(Attachment); + } + else + { + // Unable to open chunk reference + + throw std::runtime_error(fmt::format("unable to resolve chunk #{} at '{}' (offset {}, size {})", + i, + PathToUtf8(Path), + AttachRefHdr->PayloadByteOffset, + AttachRefHdr->PayloadByteSize)); + } + } + else if (Entry.Flags & CbAttachmentEntry::kIsCompressed) { CompressedBuffer CompBuf(CompressedBuffer::FromCompressed(SharedBuffer(AttachmentBuffer))); @@ -153,6 +301,7 @@ ParsePackageMessage(IoBuffer Payload, std::function= sizeof(CbAttachmentReferenceHeader)); + + const CbAttachmentReferenceHeader* AttachRefHdr = AttachmentBuffer.Data(); + const char8_t* PathPointer = reinterpret_cast(AttachRefHdr + 1); + + ZEN_ASSERT(AttachmentBuffer.Size() >= (sizeof(CbAttachmentReferenceHeader) + AttachRefHdr->AbsolutePathLength)); + + std::filesystem::path Path{PathPointer}; + + IoBuffer ChunkReference = IoBufferBuilder::MakeFromFile(Path, AttachRefHdr->PayloadByteOffset, AttachRefHdr->PayloadByteSize); + + if (!ChunkReference) + { + // Unable to open chunk reference + + throw std::runtime_error(fmt::format("unable to resolve local reference to '{}' (offset {}, size {})", + PathToUtf8(Path), + AttachRefHdr->PayloadByteOffset, + AttachRefHdr->PayloadByteSize)); + } + + return ChunkReference; +}; + +void +CbPackageReader::Finalize() +{ + if (m_AttachmentEntries.empty()) + { + return; + } + + m_Attachments.reserve(m_AttachmentEntries.size() - 1); + + int CurrentAttachmentIndex = 0; + for (CbAttachmentEntry& Entry : m_AttachmentEntries) + { + IoBuffer AttachmentBuffer = m_PayloadBuffers[CurrentAttachmentIndex]; + + if (CurrentAttachmentIndex == 0) + { + // Root object + if (Entry.Flags & CbAttachmentEntry::kIsObject) + { + if (Entry.Flags & CbAttachmentEntry::kIsLocalRef) + { + m_RootObject = LoadCompactBinaryObject(MarshalLocalChunkReference(AttachmentBuffer)); + } + else if (Entry.Flags & CbAttachmentEntry::kIsCompressed) + { + m_RootObject = LoadCompactBinaryObject(CompressedBuffer::FromCompressed(SharedBuffer(AttachmentBuffer))); + } + else + { + m_RootObject = LoadCompactBinaryObject(std::move(AttachmentBuffer)); + } + } + else + { + throw std::runtime_error("missing or invalid root object"); + } + } + else if (Entry.Flags & CbAttachmentEntry::kIsLocalRef) + { + IoBuffer ChunkReference = MarshalLocalChunkReference(AttachmentBuffer); + + if (Entry.Flags & CbAttachmentEntry::kIsCompressed) + { + m_Attachments.push_back(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(ChunkReference)))); + } + else + { + m_Attachments.push_back(CbAttachment( + CompressedBuffer::Compress(SharedBuffer(ChunkReference), OodleCompressor::NotSet, OodleCompressionLevel::None))); + } + } + + ++CurrentAttachmentIndex; + } +} + /** ______________________ _____________________________ \__ ___/\_ _____// _____/\__ ___/ _____/ @@ -291,15 +528,82 @@ TEST_CASE("CbPackage.Serialization") }; CbPackageReader Reader; - uint64_t InitialRead = Reader.ProcessHeaderData(nullptr, 0); - uint64_t NextBytes = Reader.ProcessHeaderData(ConsumeBytes(InitialRead), InitialRead); - NextBytes = Reader.ProcessHeaderData(ConsumeBytes(NextBytes), NextBytes); + uint64_t InitialRead = Reader.ProcessPackageHeaderData(nullptr, 0); + uint64_t NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(InitialRead), InitialRead); + NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(NextBytes), NextBytes); + auto Buffers = Reader.GetPayloadBuffers(); + + for (auto& PayloadBuffer : Buffers) + { + CopyBytes(PayloadBuffer.MutableData(), PayloadBuffer.GetSize()); + } + + Reader.Finalize(); +} + +TEST_CASE("CbPackage.LocalRef") +{ + ScopedTemporaryDirectory TempDir; + + auto Path1 = TempDir.Path() / "abcd"; + auto Path2 = TempDir.Path() / "efgh"; + + { + IoBuffer Buffer1 = IoBufferBuilder::MakeCloneFromMemory(MakeMemoryView("abcd")); + IoBuffer Buffer2 = IoBufferBuilder::MakeCloneFromMemory(MakeMemoryView("efgh")); + + WriteFile(Path1, Buffer1); + WriteFile(Path2, Buffer2); + } + + // Make a test package + + IoBuffer FileBuffer1 = IoBufferBuilder::MakeFromFile(Path1); + IoBuffer FileBuffer2 = IoBufferBuilder::MakeFromFile(Path2); + + CbAttachment Attach1{SharedBuffer(FileBuffer1)}; + CbAttachment Attach2{SharedBuffer(FileBuffer2)}; + + CbObjectWriter Cbo; + Cbo.AddAttachment("abcd", Attach1); + Cbo.AddAttachment("efgh", Attach2); + + CbPackage Pkg; + Pkg.AddAttachment(Attach1); + Pkg.AddAttachment(Attach2); + Pkg.SetObject(Cbo.Save()); + + SharedBuffer Buffer = FormatPackageMessageBuffer(Pkg, FormatFlags::kAllowLocalReferences).Flatten(); + const uint8_t* CursorPtr = reinterpret_cast(Buffer.GetData()); + uint64_t RemainingBytes = Buffer.GetSize(); + + auto ConsumeBytes = [&](uint64_t ByteCount) { + ZEN_ASSERT(ByteCount <= RemainingBytes); + void* ReturnPtr = (void*)CursorPtr; + CursorPtr += ByteCount; + RemainingBytes -= ByteCount; + return ReturnPtr; + }; + + auto CopyBytes = [&](void* TargetBuffer, uint64_t ByteCount) { + ZEN_ASSERT(ByteCount <= RemainingBytes); + memcpy(TargetBuffer, CursorPtr, ByteCount); + CursorPtr += ByteCount; + RemainingBytes -= ByteCount; + }; + + CbPackageReader Reader; + uint64_t InitialRead = Reader.ProcessPackageHeaderData(nullptr, 0); + uint64_t NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(InitialRead), InitialRead); + NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(NextBytes), NextBytes); auto Buffers = Reader.GetPayloadBuffers(); for (auto& PayloadBuffer : Buffers) { CopyBytes(PayloadBuffer.MutableData(), PayloadBuffer.GetSize()); } + + Reader.Finalize(); } void -- cgit v1.2.3