diff options
| author | Stefan Boberg <[email protected]> | 2022-06-10 15:42:53 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2022-06-10 15:42:53 +0200 |
| commit | db8e1c167801fb5ff8d96d0fba7dee3c869e61fa (patch) | |
| tree | 1d9b41b49affcb79cec337802c1614c0af2a6ac5 /zenhttp/httpshared.cpp | |
| parent | cidstore: propagate the correct content type (diff) | |
| download | zen-db8e1c167801fb5ff8d96d0fba7dee3c869e61fa.tar.xz zen-db8e1c167801fb5ff8d96d0fba7dee3c869e61fa.zip | |
cbpackage: added initial support for marshaling attachments by local reference
This mode allows local clients to avoid unnecessary copying of data from zen and instead reference the data directly in its backing file.
Diffstat (limited to 'zenhttp/httpshared.cpp')
| -rw-r--r-- | zenhttp/httpshared.cpp | 348 |
1 files changed, 326 insertions, 22 deletions
diff --git a/zenhttp/httpshared.cpp b/zenhttp/httpshared.cpp index f2ce17e16..7b92a0587 100644 --- a/zenhttp/httpshared.cpp +++ b/zenhttp/httpshared.cpp @@ -5,20 +5,35 @@ #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> #include <zencore/compositebuffer.h> +#include <zencore/filesystem.h> #include <zencore/iobuffer.h> #include <zencore/iohash.h> #include <zencore/stream.h> #include <zencore/testing.h> +#include <zencore/testutils.h> + +#include <zencore/fmtutils.h> #include <span> #include <vector> namespace zen { +std::vector<IoBuffer> +FormatPackageMessage(const CbPackage& Data) +{ + return FormatPackageMessage(Data, FormatFlags::kDefault); +} CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data) { - std::vector<IoBuffer> Message = FormatPackageMessage(Data); + return FormatPackageMessageBuffer(Data, FormatFlags::kDefault); +} + +CompositeBuffer +FormatPackageMessageBuffer(const CbPackage& Data, FormatFlags Flags) +{ + std::vector<IoBuffer> Message = FormatPackageMessage(Data, Flags); std::vector<SharedBuffer> Buffers; @@ -31,11 +46,11 @@ FormatPackageMessageBuffer(const CbPackage& Data) } std::vector<IoBuffer> -FormatPackageMessage(const CbPackage& Data) +FormatPackageMessage(const CbPackage& Data, FormatFlags Flags) { const std::span<const CbAttachment>& Attachments = Data.GetAttachments(); + std::vector<IoBuffer> ResponseBuffers; - std::vector<IoBuffer> ResponseBuffers; ResponseBuffers.reserve(3 + Attachments.size()); // TODO: may want to use an additional fudge factor here to avoid growing since each // attachment is likely to consist of several buffers @@ -47,9 +62,8 @@ FormatPackageMessage(const CbPackage& Data) // Attachment metadata array - IoBuffer AttachmentMetadataBuffer = IoBuffer{sizeof(CbAttachmentEntry) * (Attachments.size() + /* root */ 1)}; - - CbAttachmentEntry* AttachmentInfo = reinterpret_cast<CbAttachmentEntry*>(AttachmentMetadataBuffer.MutableData()); + IoBuffer AttachmentMetadataBuffer = 
IoBuffer{sizeof(CbAttachmentEntry) * (Attachments.size() + /* root */ 1)}; + CbAttachmentEntry* AttachmentInfo = reinterpret_cast<CbAttachmentEntry*>(AttachmentMetadataBuffer.MutableData()); ResponseBuffers.push_back(AttachmentMetadataBuffer); // Attachment metadata @@ -62,6 +76,44 @@ FormatPackageMessage(const CbPackage& Data) // Attachment payloads + auto MarshalLocal = [&AttachmentInfo, &ResponseBuffers](const std::string& Path8, + CbAttachmentReferenceHeader& LocalRef, + const IoHash& AttachmentHash, + bool IsCompressed) { + IoBuffer RefBuffer = IoBuffer(sizeof CbAttachmentReferenceHeader + Path8.size()); + + CbAttachmentReferenceHeader* RefHdr = RefBuffer.MutableData<CbAttachmentReferenceHeader>(); + *RefHdr++ = LocalRef; + memcpy(RefHdr, Path8.data(), Path8.size()); + + *AttachmentInfo++ = {.PayloadSize = RefBuffer.GetSize(), + .Flags = (IsCompressed ? CbAttachmentEntry::kIsCompressed : 0u) | CbAttachmentEntry::kIsLocalRef, + .AttachmentHash = AttachmentHash}; + + ResponseBuffers.push_back(std::move(RefBuffer)); + }; + + auto IsLocalRef = [](const CompositeBuffer& AttachmentBinary, CbAttachmentReferenceHeader& LocalRef, std::string& Path8) -> bool { + const SharedBuffer& Segment = AttachmentBinary.GetSegments().front(); + IoBufferFileReference Ref; + const IoBuffer& SegmentBuffer = Segment.AsIoBuffer(); + + if (!SegmentBuffer.GetFileReference(Ref)) + { + return false; + } + + ExtendablePathBuilder<256> LocalRefFile; + LocalRefFile.Append(std::filesystem::absolute(PathFromHandle(Ref.FileHandle))); + Path8 = LocalRefFile.ToUtf8(); + + LocalRef.AbsolutePathLength = gsl::narrow<uint16_t>(Path8.size()); + LocalRef.PayloadByteOffset = Ref.FileChunkOffset; + LocalRef.PayloadByteSize = Ref.FileChunkSize; + + return true; + }; + for (const CbAttachment& Attachment : Attachments) { if (Attachment.IsNull()) @@ -70,15 +122,39 @@ FormatPackageMessage(const CbPackage& Data) } else if (CompressedBuffer AttachmentBuffer = Attachment.AsCompressedBinary()) { - CompositeBuffer 
Compressed = AttachmentBuffer.GetCompressed(); + CompositeBuffer Compressed = AttachmentBuffer.GetCompressed(); + IoHash AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash()); + + // If the data is either not backed by a file, or there are multiple + // fragments then we cannot marshal it by local reference. We might + // want/need to extend this in the future to allow multiple chunk + // segments to be marshaled at once - *AttachmentInfo++ = {.PayloadSize = AttachmentBuffer.GetCompressedSize(), - .Flags = CbAttachmentEntry::kIsCompressed, - .AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash())}; + bool MarshalByLocalRef = EnumHasAllFlags(Flags, FormatFlags::kAllowLocalReferences) && (Compressed.GetSegments().size() == 1); - for (const SharedBuffer& Segment : Compressed.GetSegments()) + CbAttachmentReferenceHeader LocalRef; + std::string Path8; + + if (MarshalByLocalRef) { - ResponseBuffers.push_back(Segment.AsIoBuffer()); + MarshalByLocalRef = IsLocalRef(Compressed, LocalRef, Path8); + } + + if (MarshalByLocalRef) + { + const bool IsCompressed = true; + MarshalLocal(Path8, LocalRef, AttachmentHash, IsCompressed); + } + else + { + *AttachmentInfo++ = {.PayloadSize = AttachmentBuffer.GetCompressedSize(), + .Flags = CbAttachmentEntry::kIsCompressed, + .AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash())}; + + for (const SharedBuffer& Segment : Compressed.GetSegments()) + { + ResponseBuffers.push_back(Segment.AsIoBuffer()); + } } } else if (CbObject AttachmentObject = Attachment.AsObject()) @@ -92,11 +168,31 @@ FormatPackageMessage(const CbPackage& Data) } else if (CompositeBuffer AttachmentBinary = Attachment.AsCompositeBinary()) { - *AttachmentInfo++ = {.PayloadSize = AttachmentBinary.GetSize(), .Flags = 0, .AttachmentHash = Attachment.GetHash()}; + IoHash AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash()); + bool MarshalByLocalRef = + EnumHasAllFlags(Flags, FormatFlags::kAllowLocalReferences) && 
(AttachmentBinary.GetSegments().size() == 1); + + CbAttachmentReferenceHeader LocalRef; + std::string Path8; + + if (MarshalByLocalRef) + { + MarshalByLocalRef = IsLocalRef(AttachmentBinary, LocalRef, Path8); + } - for (const SharedBuffer& Segment : AttachmentBinary.GetSegments()) + if (MarshalByLocalRef) { - ResponseBuffers.push_back(Segment.AsIoBuffer()); + const bool IsCompressed = false; + MarshalLocal(Path8, LocalRef, AttachmentHash, IsCompressed); + } + else + { + *AttachmentInfo++ = {.PayloadSize = AttachmentBinary.GetSize(), .Flags = 0, .AttachmentHash = Attachment.GetHash()}; + + for (const SharedBuffer& Segment : AttachmentBinary.GetSegments()) + { + ResponseBuffers.push_back(Segment.AsIoBuffer()); + } } } else @@ -108,6 +204,27 @@ FormatPackageMessage(const CbPackage& Data) return ResponseBuffers; } +bool +IsPackageMessage(IoBuffer Payload) +{ + if (!Payload) + { + return false; + } + + BinaryReader Reader(Payload); + + CbPackageHeader Hdr; + Reader.Read(&Hdr, sizeof Hdr); + + if (Hdr.HeaderMagic != kCbPkgMagic) + { + return false; + } + + return true; +} + CbPackage ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint64_t)> CreateBuffer) { @@ -145,7 +262,38 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint Reader.Read(AttachmentBuffer.MutableData(), AttachmentSize); - if (Entry.Flags & CbAttachmentEntry::kIsCompressed) + if (Entry.Flags & CbAttachmentEntry::kIsLocalRef) + { + // Marshal local reference - a "pointer" to the chunk backing file + + ZEN_ASSERT(AttachmentBuffer.Size() >= sizeof(CbAttachmentReferenceHeader)); + + const CbAttachmentReferenceHeader* AttachRefHdr = AttachmentBuffer.Data<CbAttachmentReferenceHeader>(); + const char8_t* PathPointer = reinterpret_cast<const char8_t*>(AttachRefHdr + 1); + + ZEN_ASSERT(AttachmentBuffer.Size() >= (sizeof(CbAttachmentReferenceHeader) + AttachRefHdr->AbsolutePathLength)); + + std::filesystem::path Path{PathPointer}; + + if (IoBuffer 
ChunkReference = + IoBufferBuilder::MakeFromFile(Path, AttachRefHdr->PayloadByteOffset, AttachRefHdr->PayloadByteSize)) + { + CompressedBuffer CompBuf(CompressedBuffer::FromCompressed(SharedBuffer(ChunkReference))); + CbAttachment Attachment(std::move(CompBuf)); + Package.AddAttachment(Attachment); + } + else + { + // Unable to open chunk reference + + throw std::runtime_error(fmt::format("unable to resolve chunk #{} at '{}' (offset {}, size {})", + i, + PathToUtf8(Path), + AttachRefHdr->PayloadByteOffset, + AttachRefHdr->PayloadByteSize)); + } + } + else if (Entry.Flags & CbAttachmentEntry::kIsCompressed) { CompressedBuffer CompBuf(CompressedBuffer::FromCompressed(SharedBuffer(AttachmentBuffer))); @@ -153,6 +301,7 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint { if (i == 0) { + // First payload is always a compact binary object Package.SetObject(LoadCompactBinaryObject(std::move(CompBuf))); } else @@ -204,10 +353,8 @@ CbPackageReader::SetPayloadBufferCreator(std::function<IoBuffer(const IoHash& Ci m_CreateBuffer = CreateBuffer; } -/** Process data - */ uint64_t -CbPackageReader::ProcessHeaderData(const void* Data, uint64_t DataBytes) +CbPackageReader::ProcessPackageHeaderData(const void* Data, uint64_t DataBytes) { ZEN_ASSERT(m_CurrentState != State::kReadingBuffers); @@ -232,6 +379,10 @@ CbPackageReader::ProcessHeaderData(const void* Data, uint64_t DataBytes) for (CbAttachmentEntry& Entry : m_AttachmentEntries) { + // This preallocates memory for payloads but note that for the local references + // the caller will need to handle the payload differently (i.e it's a + // CbAttachmentReferenceHeader not the actual payload) + m_PayloadBuffers.push_back(IoBuffer{Entry.PayloadSize}); } @@ -244,6 +395,92 @@ CbPackageReader::ProcessHeaderData(const void* Data, uint64_t DataBytes) } } +IoBuffer +CbPackageReader::MarshalLocalChunkReference(IoBuffer AttachmentBuffer) +{ + // Marshal local reference - a "pointer" to the chunk backing file 
+ + ZEN_ASSERT(AttachmentBuffer.Size() >= sizeof(CbAttachmentReferenceHeader)); + + const CbAttachmentReferenceHeader* AttachRefHdr = AttachmentBuffer.Data<CbAttachmentReferenceHeader>(); + const char8_t* PathPointer = reinterpret_cast<const char8_t*>(AttachRefHdr + 1); + + ZEN_ASSERT(AttachmentBuffer.Size() >= (sizeof(CbAttachmentReferenceHeader) + AttachRefHdr->AbsolutePathLength)); + + std::filesystem::path Path{PathPointer}; + + IoBuffer ChunkReference = IoBufferBuilder::MakeFromFile(Path, AttachRefHdr->PayloadByteOffset, AttachRefHdr->PayloadByteSize); + + if (!ChunkReference) + { + // Unable to open chunk reference + + throw std::runtime_error(fmt::format("unable to resolve local reference to '{}' (offset {}, size {})", + PathToUtf8(Path), + AttachRefHdr->PayloadByteOffset, + AttachRefHdr->PayloadByteSize)); + } + + return ChunkReference; +}; + +void +CbPackageReader::Finalize() +{ + if (m_AttachmentEntries.empty()) + { + return; + } + + m_Attachments.reserve(m_AttachmentEntries.size() - 1); + + int CurrentAttachmentIndex = 0; + for (CbAttachmentEntry& Entry : m_AttachmentEntries) + { + IoBuffer AttachmentBuffer = m_PayloadBuffers[CurrentAttachmentIndex]; + + if (CurrentAttachmentIndex == 0) + { + // Root object + if (Entry.Flags & CbAttachmentEntry::kIsObject) + { + if (Entry.Flags & CbAttachmentEntry::kIsLocalRef) + { + m_RootObject = LoadCompactBinaryObject(MarshalLocalChunkReference(AttachmentBuffer)); + } + else if (Entry.Flags & CbAttachmentEntry::kIsCompressed) + { + m_RootObject = LoadCompactBinaryObject(CompressedBuffer::FromCompressed(SharedBuffer(AttachmentBuffer))); + } + else + { + m_RootObject = LoadCompactBinaryObject(std::move(AttachmentBuffer)); + } + } + else + { + throw std::runtime_error("missing or invalid root object"); + } + } + else if (Entry.Flags & CbAttachmentEntry::kIsLocalRef) + { + IoBuffer ChunkReference = MarshalLocalChunkReference(AttachmentBuffer); + + if (Entry.Flags & CbAttachmentEntry::kIsCompressed) + { + 
m_Attachments.push_back(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(ChunkReference)))); + } + else + { + m_Attachments.push_back(CbAttachment( + CompressedBuffer::Compress(SharedBuffer(ChunkReference), OodleCompressor::NotSet, OodleCompressionLevel::None))); + } + } + + ++CurrentAttachmentIndex; + } +} + /** ______________________ _____________________________ \__ ___/\_ _____// _____/\__ ___/ _____/ @@ -291,15 +528,82 @@ TEST_CASE("CbPackage.Serialization") }; CbPackageReader Reader; - uint64_t InitialRead = Reader.ProcessHeaderData(nullptr, 0); - uint64_t NextBytes = Reader.ProcessHeaderData(ConsumeBytes(InitialRead), InitialRead); - NextBytes = Reader.ProcessHeaderData(ConsumeBytes(NextBytes), NextBytes); + uint64_t InitialRead = Reader.ProcessPackageHeaderData(nullptr, 0); + uint64_t NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(InitialRead), InitialRead); + NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(NextBytes), NextBytes); + auto Buffers = Reader.GetPayloadBuffers(); + + for (auto& PayloadBuffer : Buffers) + { + CopyBytes(PayloadBuffer.MutableData(), PayloadBuffer.GetSize()); + } + + Reader.Finalize(); +} + +TEST_CASE("CbPackage.LocalRef") +{ + ScopedTemporaryDirectory TempDir; + + auto Path1 = TempDir.Path() / "abcd"; + auto Path2 = TempDir.Path() / "efgh"; + + { + IoBuffer Buffer1 = IoBufferBuilder::MakeCloneFromMemory(MakeMemoryView("abcd")); + IoBuffer Buffer2 = IoBufferBuilder::MakeCloneFromMemory(MakeMemoryView("efgh")); + + WriteFile(Path1, Buffer1); + WriteFile(Path2, Buffer2); + } + + // Make a test package + + IoBuffer FileBuffer1 = IoBufferBuilder::MakeFromFile(Path1); + IoBuffer FileBuffer2 = IoBufferBuilder::MakeFromFile(Path2); + + CbAttachment Attach1{SharedBuffer(FileBuffer1)}; + CbAttachment Attach2{SharedBuffer(FileBuffer2)}; + + CbObjectWriter Cbo; + Cbo.AddAttachment("abcd", Attach1); + Cbo.AddAttachment("efgh", Attach2); + + CbPackage Pkg; + Pkg.AddAttachment(Attach1); + Pkg.AddAttachment(Attach2); + 
Pkg.SetObject(Cbo.Save()); + + SharedBuffer Buffer = FormatPackageMessageBuffer(Pkg, FormatFlags::kAllowLocalReferences).Flatten(); + const uint8_t* CursorPtr = reinterpret_cast<const uint8_t*>(Buffer.GetData()); + uint64_t RemainingBytes = Buffer.GetSize(); + + auto ConsumeBytes = [&](uint64_t ByteCount) { + ZEN_ASSERT(ByteCount <= RemainingBytes); + void* ReturnPtr = (void*)CursorPtr; + CursorPtr += ByteCount; + RemainingBytes -= ByteCount; + return ReturnPtr; + }; + + auto CopyBytes = [&](void* TargetBuffer, uint64_t ByteCount) { + ZEN_ASSERT(ByteCount <= RemainingBytes); + memcpy(TargetBuffer, CursorPtr, ByteCount); + CursorPtr += ByteCount; + RemainingBytes -= ByteCount; + }; + + CbPackageReader Reader; + uint64_t InitialRead = Reader.ProcessPackageHeaderData(nullptr, 0); + uint64_t NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(InitialRead), InitialRead); + NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(NextBytes), NextBytes); auto Buffers = Reader.GetPayloadBuffers(); for (auto& PayloadBuffer : Buffers) { CopyBytes(PayloadBuffer.MutableData(), PayloadBuffer.GetSize()); } + + Reader.Finalize(); } void |