diff options
| author | Stefan Boberg <[email protected]> | 2022-06-10 15:42:53 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2022-06-10 15:42:53 +0200 |
| commit | db8e1c167801fb5ff8d96d0fba7dee3c869e61fa (patch) | |
| tree | 1d9b41b49affcb79cec337802c1614c0af2a6ac5 | |
| parent | cidstore: propagate the correct content type (diff) | |
| download | zen-db8e1c167801fb5ff8d96d0fba7dee3c869e61fa.tar.xz zen-db8e1c167801fb5ff8d96d0fba7dee3c869e61fa.zip | |
cbpackage: added initial support for marshaling of attachment by local reference
this mode allows local clients to avoid unnecessary copying of data from zen and instead reference data directly
| -rw-r--r-- | zenhttp/httpshared.cpp | 348 | ||||
| -rw-r--r-- | zenhttp/include/zenhttp/httpshared.h | 62 |
2 files changed, 380 insertions, 30 deletions
diff --git a/zenhttp/httpshared.cpp b/zenhttp/httpshared.cpp index f2ce17e16..7b92a0587 100644 --- a/zenhttp/httpshared.cpp +++ b/zenhttp/httpshared.cpp @@ -5,20 +5,35 @@ #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> #include <zencore/compositebuffer.h> +#include <zencore/filesystem.h> #include <zencore/iobuffer.h> #include <zencore/iohash.h> #include <zencore/stream.h> #include <zencore/testing.h> +#include <zencore/testutils.h> + +#include <zencore/fmtutils.h> #include <span> #include <vector> namespace zen { +std::vector<IoBuffer> +FormatPackageMessage(const CbPackage& Data) +{ + return FormatPackageMessage(Data, FormatFlags::kDefault); +} CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data) { - std::vector<IoBuffer> Message = FormatPackageMessage(Data); + return FormatPackageMessageBuffer(Data, FormatFlags::kDefault); +} + +CompositeBuffer +FormatPackageMessageBuffer(const CbPackage& Data, FormatFlags Flags) +{ + std::vector<IoBuffer> Message = FormatPackageMessage(Data, Flags); std::vector<SharedBuffer> Buffers; @@ -31,11 +46,11 @@ FormatPackageMessageBuffer(const CbPackage& Data) } std::vector<IoBuffer> -FormatPackageMessage(const CbPackage& Data) +FormatPackageMessage(const CbPackage& Data, FormatFlags Flags) { const std::span<const CbAttachment>& Attachments = Data.GetAttachments(); + std::vector<IoBuffer> ResponseBuffers; - std::vector<IoBuffer> ResponseBuffers; ResponseBuffers.reserve(3 + Attachments.size()); // TODO: may want to use an additional fudge factor here to avoid growing since each // attachment is likely to consist of several buffers @@ -47,9 +62,8 @@ FormatPackageMessage(const CbPackage& Data) // Attachment metadata array - IoBuffer AttachmentMetadataBuffer = IoBuffer{sizeof(CbAttachmentEntry) * (Attachments.size() + /* root */ 1)}; - - CbAttachmentEntry* AttachmentInfo = reinterpret_cast<CbAttachmentEntry*>(AttachmentMetadataBuffer.MutableData()); + IoBuffer AttachmentMetadataBuffer = IoBuffer{sizeof(CbAttachmentEntry) * (Attachments.size() + /* root */ 1)}; + CbAttachmentEntry* AttachmentInfo = reinterpret_cast<CbAttachmentEntry*>(AttachmentMetadataBuffer.MutableData()); ResponseBuffers.push_back(AttachmentMetadataBuffer); // Attachment metadata @@ -62,6 +76,44 @@ FormatPackageMessage(const CbPackage& Data) // Attachment payloads + auto MarshalLocal = [&AttachmentInfo, &ResponseBuffers](const std::string& Path8, + CbAttachmentReferenceHeader& LocalRef, + const IoHash& AttachmentHash, + bool IsCompressed) { + IoBuffer RefBuffer = IoBuffer(sizeof CbAttachmentReferenceHeader + Path8.size()); + + CbAttachmentReferenceHeader* RefHdr = RefBuffer.MutableData<CbAttachmentReferenceHeader>(); + *RefHdr++ = LocalRef; + memcpy(RefHdr, Path8.data(), Path8.size()); + + *AttachmentInfo++ = {.PayloadSize = RefBuffer.GetSize(), + .Flags = (IsCompressed ? CbAttachmentEntry::kIsCompressed : 0u) | CbAttachmentEntry::kIsLocalRef, + .AttachmentHash = AttachmentHash}; + + ResponseBuffers.push_back(std::move(RefBuffer)); + }; + + auto IsLocalRef = [](const CompositeBuffer& AttachmentBinary, CbAttachmentReferenceHeader& LocalRef, std::string& Path8) -> bool { + const SharedBuffer& Segment = AttachmentBinary.GetSegments().front(); + IoBufferFileReference Ref; + const IoBuffer& SegmentBuffer = Segment.AsIoBuffer(); + + if (!SegmentBuffer.GetFileReference(Ref)) + { + return false; + } + + ExtendablePathBuilder<256> LocalRefFile; + LocalRefFile.Append(std::filesystem::absolute(PathFromHandle(Ref.FileHandle))); + Path8 = LocalRefFile.ToUtf8(); + + LocalRef.AbsolutePathLength = gsl::narrow<uint16_t>(Path8.size()); + LocalRef.PayloadByteOffset = Ref.FileChunkOffset; + LocalRef.PayloadByteSize = Ref.FileChunkSize; + + return true; + }; + for (const CbAttachment& Attachment : Attachments) { if (Attachment.IsNull()) @@ -70,15 +122,39 @@ FormatPackageMessage(const CbPackage& Data) } else if (CompressedBuffer AttachmentBuffer = Attachment.AsCompressedBinary()) { - CompositeBuffer Compressed = AttachmentBuffer.GetCompressed(); + CompositeBuffer Compressed = AttachmentBuffer.GetCompressed(); + IoHash AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash()); + + // If the data is either not backed by a file, or there are multiple + // fragments then we cannot marshal it by local reference. We might + // want/need to extend this in the future to allow multiple chunk + // segments to be marshaled at once - *AttachmentInfo++ = {.PayloadSize = AttachmentBuffer.GetCompressedSize(), - .Flags = CbAttachmentEntry::kIsCompressed, - .AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash())}; + bool MarshalByLocalRef = EnumHasAllFlags(Flags, FormatFlags::kAllowLocalReferences) && (Compressed.GetSegments().size() == 1); - for (const SharedBuffer& Segment : Compressed.GetSegments()) + CbAttachmentReferenceHeader LocalRef; + std::string Path8; + + if (MarshalByLocalRef) { - ResponseBuffers.push_back(Segment.AsIoBuffer()); + MarshalByLocalRef = IsLocalRef(Compressed, LocalRef, Path8); + } + + if (MarshalByLocalRef) + { + const bool IsCompressed = true; + MarshalLocal(Path8, LocalRef, AttachmentHash, IsCompressed); + } + else + { + *AttachmentInfo++ = {.PayloadSize = AttachmentBuffer.GetCompressedSize(), + .Flags = CbAttachmentEntry::kIsCompressed, + .AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash())}; + + for (const SharedBuffer& Segment : Compressed.GetSegments()) + { + ResponseBuffers.push_back(Segment.AsIoBuffer()); + } } } else if (CbObject AttachmentObject = Attachment.AsObject()) @@ -92,11 +168,31 @@ FormatPackageMessage(const CbPackage& Data) } else if (CompositeBuffer AttachmentBinary = Attachment.AsCompositeBinary()) { - *AttachmentInfo++ = {.PayloadSize = AttachmentBinary.GetSize(), .Flags = 0, .AttachmentHash = Attachment.GetHash()}; + IoHash AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash()); + bool MarshalByLocalRef = + EnumHasAllFlags(Flags, FormatFlags::kAllowLocalReferences) && (AttachmentBinary.GetSegments().size() == 1); + + CbAttachmentReferenceHeader LocalRef; + std::string Path8; + + if (MarshalByLocalRef) + { + MarshalByLocalRef = IsLocalRef(AttachmentBinary, LocalRef, Path8); + } - for (const SharedBuffer& Segment : AttachmentBinary.GetSegments()) + if (MarshalByLocalRef) { - ResponseBuffers.push_back(Segment.AsIoBuffer()); + const bool IsCompressed = false; + MarshalLocal(Path8, LocalRef, AttachmentHash, IsCompressed); + } + else + { + *AttachmentInfo++ = {.PayloadSize = AttachmentBinary.GetSize(), .Flags = 0, .AttachmentHash = Attachment.GetHash()}; + + for (const SharedBuffer& Segment : AttachmentBinary.GetSegments()) + { + ResponseBuffers.push_back(Segment.AsIoBuffer()); + } } } else @@ -108,6 +204,27 @@ FormatPackageMessage(const CbPackage& Data) return ResponseBuffers; } +bool +IsPackageMessage(IoBuffer Payload) +{ + if (!Payload) + { + return false; + } + + BinaryReader Reader(Payload); + + CbPackageHeader Hdr; + Reader.Read(&Hdr, sizeof Hdr); + + if (Hdr.HeaderMagic != kCbPkgMagic) + { + return false; + } + + return true; +} + CbPackage ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint64_t)> CreateBuffer) { @@ -145,7 +262,38 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint Reader.Read(AttachmentBuffer.MutableData(), AttachmentSize); - if (Entry.Flags & CbAttachmentEntry::kIsCompressed) + if (Entry.Flags & CbAttachmentEntry::kIsLocalRef) + { + // Marshal local reference - a "pointer" to the chunk backing file + + ZEN_ASSERT(AttachmentBuffer.Size() >= sizeof(CbAttachmentReferenceHeader)); + + const CbAttachmentReferenceHeader* AttachRefHdr = AttachmentBuffer.Data<CbAttachmentReferenceHeader>(); + const char8_t* PathPointer = reinterpret_cast<const char8_t*>(AttachRefHdr + 1); + + ZEN_ASSERT(AttachmentBuffer.Size() >= (sizeof(CbAttachmentReferenceHeader) + AttachRefHdr->AbsolutePathLength)); + + std::filesystem::path Path{PathPointer}; + + if (IoBuffer ChunkReference = + IoBufferBuilder::MakeFromFile(Path, AttachRefHdr->PayloadByteOffset, AttachRefHdr->PayloadByteSize)) + { + CompressedBuffer CompBuf(CompressedBuffer::FromCompressed(SharedBuffer(ChunkReference))); + CbAttachment Attachment(std::move(CompBuf)); + Package.AddAttachment(Attachment); + } + else + { + // Unable to open chunk reference + + throw std::runtime_error(fmt::format("unable to resolve chunk #{} at '{}' (offset {}, size {})", + i, + PathToUtf8(Path), + AttachRefHdr->PayloadByteOffset, + AttachRefHdr->PayloadByteSize)); + } + } + else if (Entry.Flags & CbAttachmentEntry::kIsCompressed) { CompressedBuffer CompBuf(CompressedBuffer::FromCompressed(SharedBuffer(AttachmentBuffer))); @@ -153,6 +301,7 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint { if (i == 0) { + // First payload is always a compact binary object Package.SetObject(LoadCompactBinaryObject(std::move(CompBuf))); } else @@ -204,10 +353,8 @@ CbPackageReader::SetPayloadBufferCreator(std::function<IoBuffer(const IoHash& Ci m_CreateBuffer = CreateBuffer; } -/** Process data - */ uint64_t -CbPackageReader::ProcessHeaderData(const void* Data, uint64_t DataBytes) +CbPackageReader::ProcessPackageHeaderData(const void* Data, uint64_t DataBytes) { ZEN_ASSERT(m_CurrentState != State::kReadingBuffers); @@ -232,6 +379,10 @@ CbPackageReader::ProcessHeaderData(const void* Data, uint64_t DataBytes) for (CbAttachmentEntry& Entry : m_AttachmentEntries) { + // This preallocates memory for payloads but note that for the local references + // the caller will need to handle the payload differently (i.e it's a + // CbAttachmentReferenceHeader not the actual payload) + m_PayloadBuffers.push_back(IoBuffer{Entry.PayloadSize}); } @@ -244,6 +395,92 @@ CbPackageReader::ProcessHeaderData(const void* Data, uint64_t DataBytes) } } +IoBuffer +CbPackageReader::MarshalLocalChunkReference(IoBuffer AttachmentBuffer) +{ + // Marshal local reference - a "pointer" to the chunk backing file + + ZEN_ASSERT(AttachmentBuffer.Size() >= sizeof(CbAttachmentReferenceHeader)); + + const CbAttachmentReferenceHeader* AttachRefHdr = AttachmentBuffer.Data<CbAttachmentReferenceHeader>(); + const char8_t* PathPointer = reinterpret_cast<const char8_t*>(AttachRefHdr + 1); + + ZEN_ASSERT(AttachmentBuffer.Size() >= (sizeof(CbAttachmentReferenceHeader) + AttachRefHdr->AbsolutePathLength)); + + std::filesystem::path Path{PathPointer}; + + IoBuffer ChunkReference = IoBufferBuilder::MakeFromFile(Path, AttachRefHdr->PayloadByteOffset, AttachRefHdr->PayloadByteSize); + + if (!ChunkReference) + { + // Unable to open chunk reference + + throw std::runtime_error(fmt::format("unable to resolve local reference to '{}' (offset {}, size {})", + PathToUtf8(Path), + AttachRefHdr->PayloadByteOffset, + AttachRefHdr->PayloadByteSize)); + } + + return ChunkReference; +}; + +void +CbPackageReader::Finalize() +{ + if (m_AttachmentEntries.empty()) + { + return; + } + + m_Attachments.reserve(m_AttachmentEntries.size() - 1); + + int CurrentAttachmentIndex = 0; + for (CbAttachmentEntry& Entry : m_AttachmentEntries) + { + IoBuffer AttachmentBuffer = m_PayloadBuffers[CurrentAttachmentIndex]; + + if (CurrentAttachmentIndex == 0) + { + // Root object + if (Entry.Flags & CbAttachmentEntry::kIsObject) + { + if (Entry.Flags & CbAttachmentEntry::kIsLocalRef) + { + m_RootObject = LoadCompactBinaryObject(MarshalLocalChunkReference(AttachmentBuffer)); + } + else if (Entry.Flags & CbAttachmentEntry::kIsCompressed) + { + m_RootObject = LoadCompactBinaryObject(CompressedBuffer::FromCompressed(SharedBuffer(AttachmentBuffer))); + } + else + { + m_RootObject = LoadCompactBinaryObject(std::move(AttachmentBuffer)); + } + } + else + { + throw std::runtime_error("missing or invalid root object"); + } + } + else if (Entry.Flags & CbAttachmentEntry::kIsLocalRef) + { + IoBuffer ChunkReference = MarshalLocalChunkReference(AttachmentBuffer); + + if (Entry.Flags & CbAttachmentEntry::kIsCompressed) + { + m_Attachments.push_back(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(ChunkReference)))); + } + else + { + m_Attachments.push_back(CbAttachment( + CompressedBuffer::Compress(SharedBuffer(ChunkReference), OodleCompressor::NotSet, OodleCompressionLevel::None))); + } + } + + ++CurrentAttachmentIndex; + } +} + /** ______________________ _____________________________ \__ ___/\_ _____// _____/\__ ___/ _____/ @@ -291,15 +528,82 @@ TEST_CASE("CbPackage.Serialization") }; CbPackageReader Reader; - uint64_t InitialRead = Reader.ProcessHeaderData(nullptr, 0); - uint64_t NextBytes = Reader.ProcessHeaderData(ConsumeBytes(InitialRead), InitialRead); - NextBytes = Reader.ProcessHeaderData(ConsumeBytes(NextBytes), NextBytes); + uint64_t InitialRead = Reader.ProcessPackageHeaderData(nullptr, 0); + uint64_t NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(InitialRead), InitialRead); + NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(NextBytes), NextBytes); + auto Buffers = Reader.GetPayloadBuffers(); + + for (auto& PayloadBuffer : Buffers) + { + CopyBytes(PayloadBuffer.MutableData(), PayloadBuffer.GetSize()); + } + + Reader.Finalize(); +} + +TEST_CASE("CbPackage.LocalRef") +{ + ScopedTemporaryDirectory TempDir; + + auto Path1 = TempDir.Path() / "abcd"; + auto Path2 = TempDir.Path() / "efgh"; + + { + IoBuffer Buffer1 = IoBufferBuilder::MakeCloneFromMemory(MakeMemoryView("abcd")); + IoBuffer Buffer2 = IoBufferBuilder::MakeCloneFromMemory(MakeMemoryView("efgh")); + + WriteFile(Path1, Buffer1); + WriteFile(Path2, Buffer2); + } + + // Make a test package + + IoBuffer FileBuffer1 = IoBufferBuilder::MakeFromFile(Path1); + IoBuffer FileBuffer2 = IoBufferBuilder::MakeFromFile(Path2); + + CbAttachment Attach1{SharedBuffer(FileBuffer1)}; + CbAttachment Attach2{SharedBuffer(FileBuffer2)}; + + CbObjectWriter Cbo; + Cbo.AddAttachment("abcd", Attach1); + Cbo.AddAttachment("efgh", Attach2); + + CbPackage Pkg; + Pkg.AddAttachment(Attach1); + Pkg.AddAttachment(Attach2); + Pkg.SetObject(Cbo.Save()); + + SharedBuffer Buffer = FormatPackageMessageBuffer(Pkg, FormatFlags::kAllowLocalReferences).Flatten(); + const uint8_t* CursorPtr = reinterpret_cast<const uint8_t*>(Buffer.GetData()); + uint64_t RemainingBytes = Buffer.GetSize(); + + auto ConsumeBytes = [&](uint64_t ByteCount) { + ZEN_ASSERT(ByteCount <= RemainingBytes); + void* ReturnPtr = (void*)CursorPtr; + CursorPtr += ByteCount; + RemainingBytes -= ByteCount; + return ReturnPtr; + }; + + auto CopyBytes = [&](void* TargetBuffer, uint64_t ByteCount) { + ZEN_ASSERT(ByteCount <= RemainingBytes); + memcpy(TargetBuffer, CursorPtr, ByteCount); + CursorPtr += ByteCount; + RemainingBytes -= ByteCount; + }; + + CbPackageReader Reader; + uint64_t InitialRead = Reader.ProcessPackageHeaderData(nullptr, 0); + uint64_t NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(InitialRead), InitialRead); + NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(NextBytes), NextBytes); auto Buffers = Reader.GetPayloadBuffers(); for (auto& PayloadBuffer : Buffers) { CopyBytes(PayloadBuffer.MutableData(), PayloadBuffer.GetSize()); } + + Reader.Finalize(); } void diff --git a/zenhttp/include/zenhttp/httpshared.h b/zenhttp/include/zenhttp/httpshared.h index a6a61485f..24ce0c85a 100644 --- a/zenhttp/include/zenhttp/httpshared.h +++ b/zenhttp/include/zenhttp/httpshared.h @@ -2,10 +2,12 @@ #pragma once +#include <zencore/compactbinarypackage.h> #include <zencore/iobuffer.h> #include <zencore/iohash.h> #include <functional> +#include <gsl/gsl-lite.hpp> namespace zen { @@ -24,6 +26,13 @@ class CompositeBuffer; Structures and code related to handling CbPackage transactions + CbPackage instances are marshaled across the wire using a distinct message + format. We don't use the CbPackage serialization format provided by the + CbPackage implementation itself since that does not provide much flexibility + in how the attachment payloads are transmitted. The scheme below separates + metadata cleanly from payloads and this enables us to more efficiently + transmit them either via sendfile/TransmitFile like mechanisms, or by + reference/memory mapping in the local case. */ struct CbPackageHeader @@ -43,33 +52,59 @@ enum : uint32_t struct CbAttachmentEntry { - uint64_t PayloadSize; - uint32_t Flags; - IoHash AttachmentHash; + uint64_t PayloadSize; // Size of the associated payload data in the message + uint32_t Flags; // See flags below + IoHash AttachmentHash; // Content Id for the attachment enum { kIsCompressed = (1u << 0), // Is marshaled using compressed buffer storage format kIsObject = (1u << 1), // Is compact binary object kIsError = (1u << 2), // Is error (compact binary formatted) object + kIsLocalRef = (1u << 3), // Is "local reference" }; }; +struct CbAttachmentReferenceHeader +{ + uint64_t PayloadByteOffset = 0; + uint64_t PayloadByteSize = ~0u; + uint16_t AbsolutePathLength = 0; + + // This header will be followed by UTF8 encoded absolute path to backing file +}; + static_assert(sizeof(CbAttachmentEntry) == 32); -std::vector<IoBuffer> FormatPackageMessage(const CbPackage& Data); -CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data); +enum class FormatFlags +{ + kDefault = 0, + kAllowLocalReferences = (1u << 0) +}; + +gsl_DEFINE_ENUM_BITMASK_OPERATORS(FormatFlags); + +std::vector<IoBuffer> FormatPackageMessage(const CbPackage& Data, FormatFlags Flags); +CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data, FormatFlags Flags); CbPackage ParsePackageMessage( IoBuffer Payload, std::function<IoBuffer(const IoHash& Cid, uint64_t Size)> CreateBuffer = [](const IoHash&, uint64_t Size) -> IoBuffer { return IoBuffer{Size}; }); +bool IsPackageMessage(IoBuffer Payload); + +std::vector<IoBuffer> FormatPackageMessage(const CbPackage& Data); +CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data); /** Streaming reader for compact binary packages The goal is to ultimately support zero-copy I/O, but for now there'll be some copying involved on some platforms at least. + This approach to deserializing CbPackage data is more efficient than + `ParsePackageMessage` since it does not require the entire message to + be resident in a memory buffer + */ class CbPackageReader { @@ -79,11 +114,18 @@ public: void SetPayloadBufferCreator(std::function<IoBuffer(const IoHash& Cid, uint64_t Size)> CreateBuffer); - /** Process header data + /** Process compact binary package data stream + + The data stream must be in the serialization format produced by FormatPackageMessage + + \return How many bytes must be fed to this function in the next call */ - uint64_t ProcessHeaderData(const void* Data, uint64_t DataBytes); + uint64_t ProcessPackageHeaderData(const void* Data, uint64_t DataBytes); - std::span<IoBuffer> GetPayloadBuffers() { return m_PayloadBuffers; } + void Finalize(); + const std::vector<CbAttachment>& GetAttachments() { return m_Attachments; } + CbObject GetRootObject() { return m_RootObject; } + std::span<IoBuffer> GetPayloadBuffers() { return m_PayloadBuffers; } private: enum class State @@ -97,7 +139,11 @@ private: std::function<IoBuffer(const IoHash& Cid, uint64_t Size)> m_CreateBuffer; std::vector<IoBuffer> m_PayloadBuffers; std::vector<CbAttachmentEntry> m_AttachmentEntries; + std::vector<CbAttachment> m_Attachments; + CbObject m_RootObject; CbPackageHeader m_PackageHeader; + + IoBuffer MarshalLocalChunkReference(IoBuffer AttachmentBuffer); }; void forcelink_httpshared(); |