aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2022-06-10 15:42:53 +0200
committerStefan Boberg <[email protected]>2022-06-10 15:42:53 +0200
commitdb8e1c167801fb5ff8d96d0fba7dee3c869e61fa (patch)
tree1d9b41b49affcb79cec337802c1614c0af2a6ac5
parentcidstore: propagate the correct content type (diff)
downloadzen-db8e1c167801fb5ff8d96d0fba7dee3c869e61fa.tar.xz
zen-db8e1c167801fb5ff8d96d0fba7dee3c869e61fa.zip
cbpackage: added initial support for marshaling of attachment by local reference
this mode allows local clients to avoid unnecessary copying of data from zen and instead reference data directly
-rw-r--r--zenhttp/httpshared.cpp348
-rw-r--r--zenhttp/include/zenhttp/httpshared.h62
2 files changed, 380 insertions, 30 deletions
diff --git a/zenhttp/httpshared.cpp b/zenhttp/httpshared.cpp
index f2ce17e16..7b92a0587 100644
--- a/zenhttp/httpshared.cpp
+++ b/zenhttp/httpshared.cpp
@@ -5,20 +5,35 @@
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
#include <zencore/compositebuffer.h>
+#include <zencore/filesystem.h>
#include <zencore/iobuffer.h>
#include <zencore/iohash.h>
#include <zencore/stream.h>
#include <zencore/testing.h>
+#include <zencore/testutils.h>
+
+#include <zencore/fmtutils.h>
#include <span>
#include <vector>
namespace zen {
+std::vector<IoBuffer>
+FormatPackageMessage(const CbPackage& Data)
+{
+ return FormatPackageMessage(Data, FormatFlags::kDefault);
+}
CompositeBuffer
FormatPackageMessageBuffer(const CbPackage& Data)
{
- std::vector<IoBuffer> Message = FormatPackageMessage(Data);
+ return FormatPackageMessageBuffer(Data, FormatFlags::kDefault);
+}
+
+CompositeBuffer
+FormatPackageMessageBuffer(const CbPackage& Data, FormatFlags Flags)
+{
+ std::vector<IoBuffer> Message = FormatPackageMessage(Data, Flags);
std::vector<SharedBuffer> Buffers;
@@ -31,11 +46,11 @@ FormatPackageMessageBuffer(const CbPackage& Data)
}
std::vector<IoBuffer>
-FormatPackageMessage(const CbPackage& Data)
+FormatPackageMessage(const CbPackage& Data, FormatFlags Flags)
{
const std::span<const CbAttachment>& Attachments = Data.GetAttachments();
+ std::vector<IoBuffer> ResponseBuffers;
- std::vector<IoBuffer> ResponseBuffers;
ResponseBuffers.reserve(3 + Attachments.size()); // TODO: may want to use an additional fudge factor here to avoid growing since each
// attachment is likely to consist of several buffers
@@ -47,9 +62,8 @@ FormatPackageMessage(const CbPackage& Data)
// Attachment metadata array
- IoBuffer AttachmentMetadataBuffer = IoBuffer{sizeof(CbAttachmentEntry) * (Attachments.size() + /* root */ 1)};
-
- CbAttachmentEntry* AttachmentInfo = reinterpret_cast<CbAttachmentEntry*>(AttachmentMetadataBuffer.MutableData());
+ IoBuffer AttachmentMetadataBuffer = IoBuffer{sizeof(CbAttachmentEntry) * (Attachments.size() + /* root */ 1)};
+ CbAttachmentEntry* AttachmentInfo = reinterpret_cast<CbAttachmentEntry*>(AttachmentMetadataBuffer.MutableData());
ResponseBuffers.push_back(AttachmentMetadataBuffer); // Attachment metadata
@@ -62,6 +76,44 @@ FormatPackageMessage(const CbPackage& Data)
// Attachment payloads
+ auto MarshalLocal = [&AttachmentInfo, &ResponseBuffers](const std::string& Path8,
+ CbAttachmentReferenceHeader& LocalRef,
+ const IoHash& AttachmentHash,
+ bool IsCompressed) {
+ IoBuffer RefBuffer = IoBuffer(sizeof CbAttachmentReferenceHeader + Path8.size());
+
+ CbAttachmentReferenceHeader* RefHdr = RefBuffer.MutableData<CbAttachmentReferenceHeader>();
+ *RefHdr++ = LocalRef;
+ memcpy(RefHdr, Path8.data(), Path8.size());
+
+ *AttachmentInfo++ = {.PayloadSize = RefBuffer.GetSize(),
+ .Flags = (IsCompressed ? CbAttachmentEntry::kIsCompressed : 0u) | CbAttachmentEntry::kIsLocalRef,
+ .AttachmentHash = AttachmentHash};
+
+ ResponseBuffers.push_back(std::move(RefBuffer));
+ };
+
+ auto IsLocalRef = [](const CompositeBuffer& AttachmentBinary, CbAttachmentReferenceHeader& LocalRef, std::string& Path8) -> bool {
+ const SharedBuffer& Segment = AttachmentBinary.GetSegments().front();
+ IoBufferFileReference Ref;
+ const IoBuffer& SegmentBuffer = Segment.AsIoBuffer();
+
+ if (!SegmentBuffer.GetFileReference(Ref))
+ {
+ return false;
+ }
+
+ ExtendablePathBuilder<256> LocalRefFile;
+ LocalRefFile.Append(std::filesystem::absolute(PathFromHandle(Ref.FileHandle)));
+ Path8 = LocalRefFile.ToUtf8();
+
+ LocalRef.AbsolutePathLength = gsl::narrow<uint16_t>(Path8.size());
+ LocalRef.PayloadByteOffset = Ref.FileChunkOffset;
+ LocalRef.PayloadByteSize = Ref.FileChunkSize;
+
+ return true;
+ };
+
for (const CbAttachment& Attachment : Attachments)
{
if (Attachment.IsNull())
@@ -70,15 +122,39 @@ FormatPackageMessage(const CbPackage& Data)
}
else if (CompressedBuffer AttachmentBuffer = Attachment.AsCompressedBinary())
{
- CompositeBuffer Compressed = AttachmentBuffer.GetCompressed();
+ CompositeBuffer Compressed = AttachmentBuffer.GetCompressed();
+ IoHash AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash());
+
+ // If the data is either not backed by a file, or there are multiple
+ // fragments then we cannot marshal it by local reference. We might
+ // want/need to extend this in the future to allow multiple chunk
+ // segments to be marshaled at once
- *AttachmentInfo++ = {.PayloadSize = AttachmentBuffer.GetCompressedSize(),
- .Flags = CbAttachmentEntry::kIsCompressed,
- .AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash())};
+ bool MarshalByLocalRef = EnumHasAllFlags(Flags, FormatFlags::kAllowLocalReferences) && (Compressed.GetSegments().size() == 1);
- for (const SharedBuffer& Segment : Compressed.GetSegments())
+ CbAttachmentReferenceHeader LocalRef;
+ std::string Path8;
+
+ if (MarshalByLocalRef)
{
- ResponseBuffers.push_back(Segment.AsIoBuffer());
+ MarshalByLocalRef = IsLocalRef(Compressed, LocalRef, Path8);
+ }
+
+ if (MarshalByLocalRef)
+ {
+ const bool IsCompressed = true;
+ MarshalLocal(Path8, LocalRef, AttachmentHash, IsCompressed);
+ }
+ else
+ {
+ *AttachmentInfo++ = {.PayloadSize = AttachmentBuffer.GetCompressedSize(),
+ .Flags = CbAttachmentEntry::kIsCompressed,
+ .AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash())};
+
+ for (const SharedBuffer& Segment : Compressed.GetSegments())
+ {
+ ResponseBuffers.push_back(Segment.AsIoBuffer());
+ }
}
}
else if (CbObject AttachmentObject = Attachment.AsObject())
@@ -92,11 +168,31 @@ FormatPackageMessage(const CbPackage& Data)
}
else if (CompositeBuffer AttachmentBinary = Attachment.AsCompositeBinary())
{
- *AttachmentInfo++ = {.PayloadSize = AttachmentBinary.GetSize(), .Flags = 0, .AttachmentHash = Attachment.GetHash()};
+ IoHash AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash());
+ bool MarshalByLocalRef =
+ EnumHasAllFlags(Flags, FormatFlags::kAllowLocalReferences) && (AttachmentBinary.GetSegments().size() == 1);
+
+ CbAttachmentReferenceHeader LocalRef;
+ std::string Path8;
+
+ if (MarshalByLocalRef)
+ {
+ MarshalByLocalRef = IsLocalRef(AttachmentBinary, LocalRef, Path8);
+ }
- for (const SharedBuffer& Segment : AttachmentBinary.GetSegments())
+ if (MarshalByLocalRef)
{
- ResponseBuffers.push_back(Segment.AsIoBuffer());
+ const bool IsCompressed = false;
+ MarshalLocal(Path8, LocalRef, AttachmentHash, IsCompressed);
+ }
+ else
+ {
+ *AttachmentInfo++ = {.PayloadSize = AttachmentBinary.GetSize(), .Flags = 0, .AttachmentHash = Attachment.GetHash()};
+
+ for (const SharedBuffer& Segment : AttachmentBinary.GetSegments())
+ {
+ ResponseBuffers.push_back(Segment.AsIoBuffer());
+ }
}
}
else
@@ -108,6 +204,27 @@ FormatPackageMessage(const CbPackage& Data)
return ResponseBuffers;
}
+bool
+IsPackageMessage(IoBuffer Payload)
+{
+ if (!Payload)
+ {
+ return false;
+ }
+
+ BinaryReader Reader(Payload);
+
+ CbPackageHeader Hdr;
+ Reader.Read(&Hdr, sizeof Hdr);
+
+ if (Hdr.HeaderMagic != kCbPkgMagic)
+ {
+ return false;
+ }
+
+ return true;
+}
+
CbPackage
ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint64_t)> CreateBuffer)
{
@@ -145,7 +262,38 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint
Reader.Read(AttachmentBuffer.MutableData(), AttachmentSize);
- if (Entry.Flags & CbAttachmentEntry::kIsCompressed)
+ if (Entry.Flags & CbAttachmentEntry::kIsLocalRef)
+ {
+ // Marshal local reference - a "pointer" to the chunk backing file
+
+ ZEN_ASSERT(AttachmentBuffer.Size() >= sizeof(CbAttachmentReferenceHeader));
+
+ const CbAttachmentReferenceHeader* AttachRefHdr = AttachmentBuffer.Data<CbAttachmentReferenceHeader>();
+ const char8_t* PathPointer = reinterpret_cast<const char8_t*>(AttachRefHdr + 1);
+
+ ZEN_ASSERT(AttachmentBuffer.Size() >= (sizeof(CbAttachmentReferenceHeader) + AttachRefHdr->AbsolutePathLength));
+
+ std::filesystem::path Path{PathPointer};
+
+ if (IoBuffer ChunkReference =
+ IoBufferBuilder::MakeFromFile(Path, AttachRefHdr->PayloadByteOffset, AttachRefHdr->PayloadByteSize))
+ {
+ CompressedBuffer CompBuf(CompressedBuffer::FromCompressed(SharedBuffer(ChunkReference)));
+ CbAttachment Attachment(std::move(CompBuf));
+ Package.AddAttachment(Attachment);
+ }
+ else
+ {
+ // Unable to open chunk reference
+
+ throw std::runtime_error(fmt::format("unable to resolve chunk #{} at '{}' (offset {}, size {})",
+ i,
+ PathToUtf8(Path),
+ AttachRefHdr->PayloadByteOffset,
+ AttachRefHdr->PayloadByteSize));
+ }
+ }
+ else if (Entry.Flags & CbAttachmentEntry::kIsCompressed)
{
CompressedBuffer CompBuf(CompressedBuffer::FromCompressed(SharedBuffer(AttachmentBuffer)));
@@ -153,6 +301,7 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint
{
if (i == 0)
{
+ // First payload is always a compact binary object
Package.SetObject(LoadCompactBinaryObject(std::move(CompBuf)));
}
else
@@ -204,10 +353,8 @@ CbPackageReader::SetPayloadBufferCreator(std::function<IoBuffer(const IoHash& Ci
m_CreateBuffer = CreateBuffer;
}
-/** Process data
- */
uint64_t
-CbPackageReader::ProcessHeaderData(const void* Data, uint64_t DataBytes)
+CbPackageReader::ProcessPackageHeaderData(const void* Data, uint64_t DataBytes)
{
ZEN_ASSERT(m_CurrentState != State::kReadingBuffers);
@@ -232,6 +379,10 @@ CbPackageReader::ProcessHeaderData(const void* Data, uint64_t DataBytes)
for (CbAttachmentEntry& Entry : m_AttachmentEntries)
{
+ // This preallocates memory for payloads but note that for the local references
+ // the caller will need to handle the payload differently (i.e it's a
+ // CbAttachmentReferenceHeader not the actual payload)
+
m_PayloadBuffers.push_back(IoBuffer{Entry.PayloadSize});
}
@@ -244,6 +395,92 @@ CbPackageReader::ProcessHeaderData(const void* Data, uint64_t DataBytes)
}
}
+IoBuffer
+CbPackageReader::MarshalLocalChunkReference(IoBuffer AttachmentBuffer)
+{
+ // Marshal local reference - a "pointer" to the chunk backing file
+
+ ZEN_ASSERT(AttachmentBuffer.Size() >= sizeof(CbAttachmentReferenceHeader));
+
+ const CbAttachmentReferenceHeader* AttachRefHdr = AttachmentBuffer.Data<CbAttachmentReferenceHeader>();
+ const char8_t* PathPointer = reinterpret_cast<const char8_t*>(AttachRefHdr + 1);
+
+ ZEN_ASSERT(AttachmentBuffer.Size() >= (sizeof(CbAttachmentReferenceHeader) + AttachRefHdr->AbsolutePathLength));
+
+ std::filesystem::path Path{PathPointer};
+
+ IoBuffer ChunkReference = IoBufferBuilder::MakeFromFile(Path, AttachRefHdr->PayloadByteOffset, AttachRefHdr->PayloadByteSize);
+
+ if (!ChunkReference)
+ {
+ // Unable to open chunk reference
+
+ throw std::runtime_error(fmt::format("unable to resolve local reference to '{}' (offset {}, size {})",
+ PathToUtf8(Path),
+ AttachRefHdr->PayloadByteOffset,
+ AttachRefHdr->PayloadByteSize));
+ }
+
+ return ChunkReference;
+};
+
+void
+CbPackageReader::Finalize()
+{
+ if (m_AttachmentEntries.empty())
+ {
+ return;
+ }
+
+ m_Attachments.reserve(m_AttachmentEntries.size() - 1);
+
+ int CurrentAttachmentIndex = 0;
+ for (CbAttachmentEntry& Entry : m_AttachmentEntries)
+ {
+ IoBuffer AttachmentBuffer = m_PayloadBuffers[CurrentAttachmentIndex];
+
+ if (CurrentAttachmentIndex == 0)
+ {
+ // Root object
+ if (Entry.Flags & CbAttachmentEntry::kIsObject)
+ {
+ if (Entry.Flags & CbAttachmentEntry::kIsLocalRef)
+ {
+ m_RootObject = LoadCompactBinaryObject(MarshalLocalChunkReference(AttachmentBuffer));
+ }
+ else if (Entry.Flags & CbAttachmentEntry::kIsCompressed)
+ {
+ m_RootObject = LoadCompactBinaryObject(CompressedBuffer::FromCompressed(SharedBuffer(AttachmentBuffer)));
+ }
+ else
+ {
+ m_RootObject = LoadCompactBinaryObject(std::move(AttachmentBuffer));
+ }
+ }
+ else
+ {
+ throw std::runtime_error("missing or invalid root object");
+ }
+ }
+ else if (Entry.Flags & CbAttachmentEntry::kIsLocalRef)
+ {
+ IoBuffer ChunkReference = MarshalLocalChunkReference(AttachmentBuffer);
+
+ if (Entry.Flags & CbAttachmentEntry::kIsCompressed)
+ {
+ m_Attachments.push_back(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(ChunkReference))));
+ }
+ else
+ {
+ m_Attachments.push_back(CbAttachment(
+ CompressedBuffer::Compress(SharedBuffer(ChunkReference), OodleCompressor::NotSet, OodleCompressionLevel::None)));
+ }
+ }
+
+ ++CurrentAttachmentIndex;
+ }
+}
+
/**
______________________ _____________________________
\__ ___/\_ _____// _____/\__ ___/ _____/
@@ -291,15 +528,82 @@ TEST_CASE("CbPackage.Serialization")
};
CbPackageReader Reader;
- uint64_t InitialRead = Reader.ProcessHeaderData(nullptr, 0);
- uint64_t NextBytes = Reader.ProcessHeaderData(ConsumeBytes(InitialRead), InitialRead);
- NextBytes = Reader.ProcessHeaderData(ConsumeBytes(NextBytes), NextBytes);
+ uint64_t InitialRead = Reader.ProcessPackageHeaderData(nullptr, 0);
+ uint64_t NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(InitialRead), InitialRead);
+ NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(NextBytes), NextBytes);
+ auto Buffers = Reader.GetPayloadBuffers();
+
+ for (auto& PayloadBuffer : Buffers)
+ {
+ CopyBytes(PayloadBuffer.MutableData(), PayloadBuffer.GetSize());
+ }
+
+ Reader.Finalize();
+}
+
+TEST_CASE("CbPackage.LocalRef")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ auto Path1 = TempDir.Path() / "abcd";
+ auto Path2 = TempDir.Path() / "efgh";
+
+ {
+ IoBuffer Buffer1 = IoBufferBuilder::MakeCloneFromMemory(MakeMemoryView("abcd"));
+ IoBuffer Buffer2 = IoBufferBuilder::MakeCloneFromMemory(MakeMemoryView("efgh"));
+
+ WriteFile(Path1, Buffer1);
+ WriteFile(Path2, Buffer2);
+ }
+
+ // Make a test package
+
+ IoBuffer FileBuffer1 = IoBufferBuilder::MakeFromFile(Path1);
+ IoBuffer FileBuffer2 = IoBufferBuilder::MakeFromFile(Path2);
+
+ CbAttachment Attach1{SharedBuffer(FileBuffer1)};
+ CbAttachment Attach2{SharedBuffer(FileBuffer2)};
+
+ CbObjectWriter Cbo;
+ Cbo.AddAttachment("abcd", Attach1);
+ Cbo.AddAttachment("efgh", Attach2);
+
+ CbPackage Pkg;
+ Pkg.AddAttachment(Attach1);
+ Pkg.AddAttachment(Attach2);
+ Pkg.SetObject(Cbo.Save());
+
+ SharedBuffer Buffer = FormatPackageMessageBuffer(Pkg, FormatFlags::kAllowLocalReferences).Flatten();
+ const uint8_t* CursorPtr = reinterpret_cast<const uint8_t*>(Buffer.GetData());
+ uint64_t RemainingBytes = Buffer.GetSize();
+
+ auto ConsumeBytes = [&](uint64_t ByteCount) {
+ ZEN_ASSERT(ByteCount <= RemainingBytes);
+ void* ReturnPtr = (void*)CursorPtr;
+ CursorPtr += ByteCount;
+ RemainingBytes -= ByteCount;
+ return ReturnPtr;
+ };
+
+ auto CopyBytes = [&](void* TargetBuffer, uint64_t ByteCount) {
+ ZEN_ASSERT(ByteCount <= RemainingBytes);
+ memcpy(TargetBuffer, CursorPtr, ByteCount);
+ CursorPtr += ByteCount;
+ RemainingBytes -= ByteCount;
+ };
+
+ CbPackageReader Reader;
+ uint64_t InitialRead = Reader.ProcessPackageHeaderData(nullptr, 0);
+ uint64_t NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(InitialRead), InitialRead);
+ NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(NextBytes), NextBytes);
auto Buffers = Reader.GetPayloadBuffers();
for (auto& PayloadBuffer : Buffers)
{
CopyBytes(PayloadBuffer.MutableData(), PayloadBuffer.GetSize());
}
+
+ Reader.Finalize();
}
void
diff --git a/zenhttp/include/zenhttp/httpshared.h b/zenhttp/include/zenhttp/httpshared.h
index a6a61485f..24ce0c85a 100644
--- a/zenhttp/include/zenhttp/httpshared.h
+++ b/zenhttp/include/zenhttp/httpshared.h
@@ -2,10 +2,12 @@
#pragma once
+#include <zencore/compactbinarypackage.h>
#include <zencore/iobuffer.h>
#include <zencore/iohash.h>
#include <functional>
+#include <gsl/gsl-lite.hpp>
namespace zen {
@@ -24,6 +26,13 @@ class CompositeBuffer;
Structures and code related to handling CbPackage transactions
+ CbPackage instances are marshaled across the wire using a distinct message
+ format. We don't use the CbPackage serialization format provided by the
+ CbPackage implementation itself since that does not provide much flexibility
+ in how the attachment payloads are transmitted. The scheme below separates
+ metadata cleanly from payloads and this enables us to more efficiently
+ transmit them either via sendfile/TransmitFile like mechanisms, or by
+ reference/memory mapping in the local case.
*/
struct CbPackageHeader
@@ -43,33 +52,59 @@ enum : uint32_t
struct CbAttachmentEntry
{
- uint64_t PayloadSize;
- uint32_t Flags;
- IoHash AttachmentHash;
+ uint64_t PayloadSize; // Size of the associated payload data in the message
+ uint32_t Flags; // See flags below
+ IoHash AttachmentHash; // Content Id for the attachment
enum
{
kIsCompressed = (1u << 0), // Is marshaled using compressed buffer storage format
kIsObject = (1u << 1), // Is compact binary object
kIsError = (1u << 2), // Is error (compact binary formatted) object
+ kIsLocalRef = (1u << 3), // Is "local reference"
};
};
+struct CbAttachmentReferenceHeader
+{
+ uint64_t PayloadByteOffset = 0;
+ uint64_t PayloadByteSize = ~0u;
+ uint16_t AbsolutePathLength = 0;
+
+ // This header will be followed by UTF8 encoded absolute path to backing file
+};
+
static_assert(sizeof(CbAttachmentEntry) == 32);
-std::vector<IoBuffer> FormatPackageMessage(const CbPackage& Data);
-CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data);
+enum class FormatFlags
+{
+ kDefault = 0,
+ kAllowLocalReferences = (1u << 0)
+};
+
+gsl_DEFINE_ENUM_BITMASK_OPERATORS(FormatFlags);
+
+std::vector<IoBuffer> FormatPackageMessage(const CbPackage& Data, FormatFlags Flags);
+CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data, FormatFlags Flags);
CbPackage ParsePackageMessage(
IoBuffer Payload,
std::function<IoBuffer(const IoHash& Cid, uint64_t Size)> CreateBuffer = [](const IoHash&, uint64_t Size) -> IoBuffer {
return IoBuffer{Size};
});
+bool IsPackageMessage(IoBuffer Payload);
+
+std::vector<IoBuffer> FormatPackageMessage(const CbPackage& Data);
+CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data);
/** Streaming reader for compact binary packages
The goal is to ultimately support zero-copy I/O, but for now there'll be some
copying involved on some platforms at least.
+ This approach to deserializing CbPackage data is more efficient than
+ `ParsePackageMessage` since it does not require the entire message to
+ be resident in a memory buffer
+
*/
class CbPackageReader
{
@@ -79,11 +114,18 @@ public:
void SetPayloadBufferCreator(std::function<IoBuffer(const IoHash& Cid, uint64_t Size)> CreateBuffer);
- /** Process header data
+ /** Process compact binary package data stream
+
+ The data stream must be in the serialization format produced by FormatPackageMessage
+
+ \return How many bytes must be fed to this function in the next call
*/
- uint64_t ProcessHeaderData(const void* Data, uint64_t DataBytes);
+ uint64_t ProcessPackageHeaderData(const void* Data, uint64_t DataBytes);
- std::span<IoBuffer> GetPayloadBuffers() { return m_PayloadBuffers; }
+ void Finalize();
+ const std::vector<CbAttachment>& GetAttachments() { return m_Attachments; }
+ CbObject GetRootObject() { return m_RootObject; }
+ std::span<IoBuffer> GetPayloadBuffers() { return m_PayloadBuffers; }
private:
enum class State
@@ -97,7 +139,11 @@ private:
std::function<IoBuffer(const IoHash& Cid, uint64_t Size)> m_CreateBuffer;
std::vector<IoBuffer> m_PayloadBuffers;
std::vector<CbAttachmentEntry> m_AttachmentEntries;
+ std::vector<CbAttachment> m_Attachments;
+ CbObject m_RootObject;
CbPackageHeader m_PackageHeader;
+
+ IoBuffer MarshalLocalChunkReference(IoBuffer AttachmentBuffer);
};
void forcelink_httpshared();