aboutsummaryrefslogtreecommitdiff
path: root/zenhttp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2022-06-10 15:42:53 +0200
committerStefan Boberg <[email protected]>2022-06-10 15:42:53 +0200
commitdb8e1c167801fb5ff8d96d0fba7dee3c869e61fa (patch)
tree1d9b41b49affcb79cec337802c1614c0af2a6ac5 /zenhttp
parentcidstore: propagate the correct content type (diff)
downloadzen-db8e1c167801fb5ff8d96d0fba7dee3c869e61fa.tar.xz
zen-db8e1c167801fb5ff8d96d0fba7dee3c869e61fa.zip
cbpackage: added initial support for marshaling of attachment by local reference
this mode allows local clients to avoid unnecessary copying of data from zen and instead reference data directly
Diffstat (limited to 'zenhttp')
-rw-r--r--zenhttp/httpshared.cpp348
-rw-r--r--zenhttp/include/zenhttp/httpshared.h62
2 files changed, 380 insertions, 30 deletions
diff --git a/zenhttp/httpshared.cpp b/zenhttp/httpshared.cpp
index f2ce17e16..7b92a0587 100644
--- a/zenhttp/httpshared.cpp
+++ b/zenhttp/httpshared.cpp
@@ -5,20 +5,35 @@
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
#include <zencore/compositebuffer.h>
+#include <zencore/filesystem.h>
#include <zencore/iobuffer.h>
#include <zencore/iohash.h>
#include <zencore/stream.h>
#include <zencore/testing.h>
+#include <zencore/testutils.h>
+
+#include <zencore/fmtutils.h>
#include <span>
#include <vector>
namespace zen {
+std::vector<IoBuffer>
+FormatPackageMessage(const CbPackage& Data)
+{
+ return FormatPackageMessage(Data, FormatFlags::kDefault);
+}
CompositeBuffer
FormatPackageMessageBuffer(const CbPackage& Data)
{
- std::vector<IoBuffer> Message = FormatPackageMessage(Data);
+ return FormatPackageMessageBuffer(Data, FormatFlags::kDefault);
+}
+
+CompositeBuffer
+FormatPackageMessageBuffer(const CbPackage& Data, FormatFlags Flags)
+{
+ std::vector<IoBuffer> Message = FormatPackageMessage(Data, Flags);
std::vector<SharedBuffer> Buffers;
@@ -31,11 +46,11 @@ FormatPackageMessageBuffer(const CbPackage& Data)
}
std::vector<IoBuffer>
-FormatPackageMessage(const CbPackage& Data)
+FormatPackageMessage(const CbPackage& Data, FormatFlags Flags)
{
const std::span<const CbAttachment>& Attachments = Data.GetAttachments();
+ std::vector<IoBuffer> ResponseBuffers;
- std::vector<IoBuffer> ResponseBuffers;
ResponseBuffers.reserve(3 + Attachments.size()); // TODO: may want to use an additional fudge factor here to avoid growing since each
// attachment is likely to consist of several buffers
@@ -47,9 +62,8 @@ FormatPackageMessage(const CbPackage& Data)
// Attachment metadata array
- IoBuffer AttachmentMetadataBuffer = IoBuffer{sizeof(CbAttachmentEntry) * (Attachments.size() + /* root */ 1)};
-
- CbAttachmentEntry* AttachmentInfo = reinterpret_cast<CbAttachmentEntry*>(AttachmentMetadataBuffer.MutableData());
+ IoBuffer AttachmentMetadataBuffer = IoBuffer{sizeof(CbAttachmentEntry) * (Attachments.size() + /* root */ 1)};
+ CbAttachmentEntry* AttachmentInfo = reinterpret_cast<CbAttachmentEntry*>(AttachmentMetadataBuffer.MutableData());
ResponseBuffers.push_back(AttachmentMetadataBuffer); // Attachment metadata
@@ -62,6 +76,44 @@ FormatPackageMessage(const CbPackage& Data)
// Attachment payloads
+ auto MarshalLocal = [&AttachmentInfo, &ResponseBuffers](const std::string& Path8,
+ CbAttachmentReferenceHeader& LocalRef,
+ const IoHash& AttachmentHash,
+ bool IsCompressed) {
+ IoBuffer RefBuffer = IoBuffer(sizeof CbAttachmentReferenceHeader + Path8.size());
+
+ CbAttachmentReferenceHeader* RefHdr = RefBuffer.MutableData<CbAttachmentReferenceHeader>();
+ *RefHdr++ = LocalRef;
+ memcpy(RefHdr, Path8.data(), Path8.size());
+
+ *AttachmentInfo++ = {.PayloadSize = RefBuffer.GetSize(),
+ .Flags = (IsCompressed ? CbAttachmentEntry::kIsCompressed : 0u) | CbAttachmentEntry::kIsLocalRef,
+ .AttachmentHash = AttachmentHash};
+
+ ResponseBuffers.push_back(std::move(RefBuffer));
+ };
+
+ auto IsLocalRef = [](const CompositeBuffer& AttachmentBinary, CbAttachmentReferenceHeader& LocalRef, std::string& Path8) -> bool {
+ const SharedBuffer& Segment = AttachmentBinary.GetSegments().front();
+ IoBufferFileReference Ref;
+ const IoBuffer& SegmentBuffer = Segment.AsIoBuffer();
+
+ if (!SegmentBuffer.GetFileReference(Ref))
+ {
+ return false;
+ }
+
+ ExtendablePathBuilder<256> LocalRefFile;
+ LocalRefFile.Append(std::filesystem::absolute(PathFromHandle(Ref.FileHandle)));
+ Path8 = LocalRefFile.ToUtf8();
+
+ LocalRef.AbsolutePathLength = gsl::narrow<uint16_t>(Path8.size());
+ LocalRef.PayloadByteOffset = Ref.FileChunkOffset;
+ LocalRef.PayloadByteSize = Ref.FileChunkSize;
+
+ return true;
+ };
+
for (const CbAttachment& Attachment : Attachments)
{
if (Attachment.IsNull())
@@ -70,15 +122,39 @@ FormatPackageMessage(const CbPackage& Data)
}
else if (CompressedBuffer AttachmentBuffer = Attachment.AsCompressedBinary())
{
- CompositeBuffer Compressed = AttachmentBuffer.GetCompressed();
+ CompositeBuffer Compressed = AttachmentBuffer.GetCompressed();
+ IoHash AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash());
+
+ // If the data is either not backed by a file, or there are multiple
+ // fragments then we cannot marshal it by local reference. We might
+ // want/need to extend this in the future to allow multiple chunk
+ // segments to be marshaled at once
- *AttachmentInfo++ = {.PayloadSize = AttachmentBuffer.GetCompressedSize(),
- .Flags = CbAttachmentEntry::kIsCompressed,
- .AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash())};
+ bool MarshalByLocalRef = EnumHasAllFlags(Flags, FormatFlags::kAllowLocalReferences) && (Compressed.GetSegments().size() == 1);
- for (const SharedBuffer& Segment : Compressed.GetSegments())
+ CbAttachmentReferenceHeader LocalRef;
+ std::string Path8;
+
+ if (MarshalByLocalRef)
{
- ResponseBuffers.push_back(Segment.AsIoBuffer());
+ MarshalByLocalRef = IsLocalRef(Compressed, LocalRef, Path8);
+ }
+
+ if (MarshalByLocalRef)
+ {
+ const bool IsCompressed = true;
+ MarshalLocal(Path8, LocalRef, AttachmentHash, IsCompressed);
+ }
+ else
+ {
+ *AttachmentInfo++ = {.PayloadSize = AttachmentBuffer.GetCompressedSize(),
+ .Flags = CbAttachmentEntry::kIsCompressed,
+ .AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash())};
+
+ for (const SharedBuffer& Segment : Compressed.GetSegments())
+ {
+ ResponseBuffers.push_back(Segment.AsIoBuffer());
+ }
}
}
else if (CbObject AttachmentObject = Attachment.AsObject())
@@ -92,11 +168,31 @@ FormatPackageMessage(const CbPackage& Data)
}
else if (CompositeBuffer AttachmentBinary = Attachment.AsCompositeBinary())
{
- *AttachmentInfo++ = {.PayloadSize = AttachmentBinary.GetSize(), .Flags = 0, .AttachmentHash = Attachment.GetHash()};
+ IoHash AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash());
+ bool MarshalByLocalRef =
+ EnumHasAllFlags(Flags, FormatFlags::kAllowLocalReferences) && (AttachmentBinary.GetSegments().size() == 1);
+
+ CbAttachmentReferenceHeader LocalRef;
+ std::string Path8;
+
+ if (MarshalByLocalRef)
+ {
+ MarshalByLocalRef = IsLocalRef(AttachmentBinary, LocalRef, Path8);
+ }
- for (const SharedBuffer& Segment : AttachmentBinary.GetSegments())
+ if (MarshalByLocalRef)
{
- ResponseBuffers.push_back(Segment.AsIoBuffer());
+ const bool IsCompressed = false;
+ MarshalLocal(Path8, LocalRef, AttachmentHash, IsCompressed);
+ }
+ else
+ {
+ *AttachmentInfo++ = {.PayloadSize = AttachmentBinary.GetSize(), .Flags = 0, .AttachmentHash = Attachment.GetHash()};
+
+ for (const SharedBuffer& Segment : AttachmentBinary.GetSegments())
+ {
+ ResponseBuffers.push_back(Segment.AsIoBuffer());
+ }
}
}
else
@@ -108,6 +204,27 @@ FormatPackageMessage(const CbPackage& Data)
return ResponseBuffers;
}
+bool
+IsPackageMessage(IoBuffer Payload)
+{
+ if (!Payload)
+ {
+ return false;
+ }
+
+ BinaryReader Reader(Payload);
+
+ CbPackageHeader Hdr;
+ Reader.Read(&Hdr, sizeof Hdr);
+
+ if (Hdr.HeaderMagic != kCbPkgMagic)
+ {
+ return false;
+ }
+
+ return true;
+}
+
CbPackage
ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint64_t)> CreateBuffer)
{
@@ -145,7 +262,38 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint
Reader.Read(AttachmentBuffer.MutableData(), AttachmentSize);
- if (Entry.Flags & CbAttachmentEntry::kIsCompressed)
+ if (Entry.Flags & CbAttachmentEntry::kIsLocalRef)
+ {
+ // Marshal local reference - a "pointer" to the chunk backing file
+
+ ZEN_ASSERT(AttachmentBuffer.Size() >= sizeof(CbAttachmentReferenceHeader));
+
+ const CbAttachmentReferenceHeader* AttachRefHdr = AttachmentBuffer.Data<CbAttachmentReferenceHeader>();
+ const char8_t* PathPointer = reinterpret_cast<const char8_t*>(AttachRefHdr + 1);
+
+ ZEN_ASSERT(AttachmentBuffer.Size() >= (sizeof(CbAttachmentReferenceHeader) + AttachRefHdr->AbsolutePathLength));
+
+ std::filesystem::path Path{PathPointer};
+
+ if (IoBuffer ChunkReference =
+ IoBufferBuilder::MakeFromFile(Path, AttachRefHdr->PayloadByteOffset, AttachRefHdr->PayloadByteSize))
+ {
+ CompressedBuffer CompBuf(CompressedBuffer::FromCompressed(SharedBuffer(ChunkReference)));
+ CbAttachment Attachment(std::move(CompBuf));
+ Package.AddAttachment(Attachment);
+ }
+ else
+ {
+ // Unable to open chunk reference
+
+ throw std::runtime_error(fmt::format("unable to resolve chunk #{} at '{}' (offset {}, size {})",
+ i,
+ PathToUtf8(Path),
+ AttachRefHdr->PayloadByteOffset,
+ AttachRefHdr->PayloadByteSize));
+ }
+ }
+ else if (Entry.Flags & CbAttachmentEntry::kIsCompressed)
{
CompressedBuffer CompBuf(CompressedBuffer::FromCompressed(SharedBuffer(AttachmentBuffer)));
@@ -153,6 +301,7 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint
{
if (i == 0)
{
+ // First payload is always a compact binary object
Package.SetObject(LoadCompactBinaryObject(std::move(CompBuf)));
}
else
@@ -204,10 +353,8 @@ CbPackageReader::SetPayloadBufferCreator(std::function<IoBuffer(const IoHash& Ci
m_CreateBuffer = CreateBuffer;
}
-/** Process data
- */
uint64_t
-CbPackageReader::ProcessHeaderData(const void* Data, uint64_t DataBytes)
+CbPackageReader::ProcessPackageHeaderData(const void* Data, uint64_t DataBytes)
{
ZEN_ASSERT(m_CurrentState != State::kReadingBuffers);
@@ -232,6 +379,10 @@ CbPackageReader::ProcessHeaderData(const void* Data, uint64_t DataBytes)
for (CbAttachmentEntry& Entry : m_AttachmentEntries)
{
+ // This preallocates memory for payloads but note that for the local references
+ // the caller will need to handle the payload differently (i.e it's a
+ // CbAttachmentReferenceHeader not the actual payload)
+
m_PayloadBuffers.push_back(IoBuffer{Entry.PayloadSize});
}
@@ -244,6 +395,92 @@ CbPackageReader::ProcessHeaderData(const void* Data, uint64_t DataBytes)
}
}
+IoBuffer
+CbPackageReader::MarshalLocalChunkReference(IoBuffer AttachmentBuffer)
+{
+ // Marshal local reference - a "pointer" to the chunk backing file
+
+ ZEN_ASSERT(AttachmentBuffer.Size() >= sizeof(CbAttachmentReferenceHeader));
+
+ const CbAttachmentReferenceHeader* AttachRefHdr = AttachmentBuffer.Data<CbAttachmentReferenceHeader>();
+ const char8_t* PathPointer = reinterpret_cast<const char8_t*>(AttachRefHdr + 1);
+
+ ZEN_ASSERT(AttachmentBuffer.Size() >= (sizeof(CbAttachmentReferenceHeader) + AttachRefHdr->AbsolutePathLength));
+
+ std::filesystem::path Path{PathPointer};
+
+ IoBuffer ChunkReference = IoBufferBuilder::MakeFromFile(Path, AttachRefHdr->PayloadByteOffset, AttachRefHdr->PayloadByteSize);
+
+ if (!ChunkReference)
+ {
+ // Unable to open chunk reference
+
+ throw std::runtime_error(fmt::format("unable to resolve local reference to '{}' (offset {}, size {})",
+ PathToUtf8(Path),
+ AttachRefHdr->PayloadByteOffset,
+ AttachRefHdr->PayloadByteSize));
+ }
+
+ return ChunkReference;
+};
+
+void
+CbPackageReader::Finalize()
+{
+ if (m_AttachmentEntries.empty())
+ {
+ return;
+ }
+
+ m_Attachments.reserve(m_AttachmentEntries.size() - 1);
+
+ int CurrentAttachmentIndex = 0;
+ for (CbAttachmentEntry& Entry : m_AttachmentEntries)
+ {
+ IoBuffer AttachmentBuffer = m_PayloadBuffers[CurrentAttachmentIndex];
+
+ if (CurrentAttachmentIndex == 0)
+ {
+ // Root object
+ if (Entry.Flags & CbAttachmentEntry::kIsObject)
+ {
+ if (Entry.Flags & CbAttachmentEntry::kIsLocalRef)
+ {
+ m_RootObject = LoadCompactBinaryObject(MarshalLocalChunkReference(AttachmentBuffer));
+ }
+ else if (Entry.Flags & CbAttachmentEntry::kIsCompressed)
+ {
+ m_RootObject = LoadCompactBinaryObject(CompressedBuffer::FromCompressed(SharedBuffer(AttachmentBuffer)));
+ }
+ else
+ {
+ m_RootObject = LoadCompactBinaryObject(std::move(AttachmentBuffer));
+ }
+ }
+ else
+ {
+ throw std::runtime_error("missing or invalid root object");
+ }
+ }
+ else if (Entry.Flags & CbAttachmentEntry::kIsLocalRef)
+ {
+ IoBuffer ChunkReference = MarshalLocalChunkReference(AttachmentBuffer);
+
+ if (Entry.Flags & CbAttachmentEntry::kIsCompressed)
+ {
+ m_Attachments.push_back(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(ChunkReference))));
+ }
+ else
+ {
+ m_Attachments.push_back(CbAttachment(
+ CompressedBuffer::Compress(SharedBuffer(ChunkReference), OodleCompressor::NotSet, OodleCompressionLevel::None)));
+ }
+ }
+
+ ++CurrentAttachmentIndex;
+ }
+}
+
/**
______________________ _____________________________
\__ ___/\_ _____// _____/\__ ___/ _____/
@@ -291,15 +528,82 @@ TEST_CASE("CbPackage.Serialization")
};
CbPackageReader Reader;
- uint64_t InitialRead = Reader.ProcessHeaderData(nullptr, 0);
- uint64_t NextBytes = Reader.ProcessHeaderData(ConsumeBytes(InitialRead), InitialRead);
- NextBytes = Reader.ProcessHeaderData(ConsumeBytes(NextBytes), NextBytes);
+ uint64_t InitialRead = Reader.ProcessPackageHeaderData(nullptr, 0);
+ uint64_t NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(InitialRead), InitialRead);
+ NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(NextBytes), NextBytes);
+ auto Buffers = Reader.GetPayloadBuffers();
+
+ for (auto& PayloadBuffer : Buffers)
+ {
+ CopyBytes(PayloadBuffer.MutableData(), PayloadBuffer.GetSize());
+ }
+
+ Reader.Finalize();
+}
+
+TEST_CASE("CbPackage.LocalRef")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ auto Path1 = TempDir.Path() / "abcd";
+ auto Path2 = TempDir.Path() / "efgh";
+
+ {
+ IoBuffer Buffer1 = IoBufferBuilder::MakeCloneFromMemory(MakeMemoryView("abcd"));
+ IoBuffer Buffer2 = IoBufferBuilder::MakeCloneFromMemory(MakeMemoryView("efgh"));
+
+ WriteFile(Path1, Buffer1);
+ WriteFile(Path2, Buffer2);
+ }
+
+ // Make a test package
+
+ IoBuffer FileBuffer1 = IoBufferBuilder::MakeFromFile(Path1);
+ IoBuffer FileBuffer2 = IoBufferBuilder::MakeFromFile(Path2);
+
+ CbAttachment Attach1{SharedBuffer(FileBuffer1)};
+ CbAttachment Attach2{SharedBuffer(FileBuffer2)};
+
+ CbObjectWriter Cbo;
+ Cbo.AddAttachment("abcd", Attach1);
+ Cbo.AddAttachment("efgh", Attach2);
+
+ CbPackage Pkg;
+ Pkg.AddAttachment(Attach1);
+ Pkg.AddAttachment(Attach2);
+ Pkg.SetObject(Cbo.Save());
+
+ SharedBuffer Buffer = FormatPackageMessageBuffer(Pkg, FormatFlags::kAllowLocalReferences).Flatten();
+ const uint8_t* CursorPtr = reinterpret_cast<const uint8_t*>(Buffer.GetData());
+ uint64_t RemainingBytes = Buffer.GetSize();
+
+ auto ConsumeBytes = [&](uint64_t ByteCount) {
+ ZEN_ASSERT(ByteCount <= RemainingBytes);
+ void* ReturnPtr = (void*)CursorPtr;
+ CursorPtr += ByteCount;
+ RemainingBytes -= ByteCount;
+ return ReturnPtr;
+ };
+
+ auto CopyBytes = [&](void* TargetBuffer, uint64_t ByteCount) {
+ ZEN_ASSERT(ByteCount <= RemainingBytes);
+ memcpy(TargetBuffer, CursorPtr, ByteCount);
+ CursorPtr += ByteCount;
+ RemainingBytes -= ByteCount;
+ };
+
+ CbPackageReader Reader;
+ uint64_t InitialRead = Reader.ProcessPackageHeaderData(nullptr, 0);
+ uint64_t NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(InitialRead), InitialRead);
+ NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(NextBytes), NextBytes);
auto Buffers = Reader.GetPayloadBuffers();
for (auto& PayloadBuffer : Buffers)
{
CopyBytes(PayloadBuffer.MutableData(), PayloadBuffer.GetSize());
}
+
+ Reader.Finalize();
}
void
diff --git a/zenhttp/include/zenhttp/httpshared.h b/zenhttp/include/zenhttp/httpshared.h
index a6a61485f..24ce0c85a 100644
--- a/zenhttp/include/zenhttp/httpshared.h
+++ b/zenhttp/include/zenhttp/httpshared.h
@@ -2,10 +2,12 @@
#pragma once
+#include <zencore/compactbinarypackage.h>
#include <zencore/iobuffer.h>
#include <zencore/iohash.h>
#include <functional>
+#include <gsl/gsl-lite.hpp>
namespace zen {
@@ -24,6 +26,13 @@ class CompositeBuffer;
Structures and code related to handling CbPackage transactions
+ CbPackage instances are marshaled across the wire using a distinct message
+ format. We don't use the CbPackage serialization format provided by the
+ CbPackage implementation itself since that does not provide much flexibility
+ in how the attachment payloads are transmitted. The scheme below separates
+ metadata cleanly from payloads and this enables us to more efficiently
+ transmit them either via sendfile/TransmitFile like mechanisms, or by
+ reference/memory mapping in the local case.
*/
struct CbPackageHeader
@@ -43,33 +52,59 @@ enum : uint32_t
struct CbAttachmentEntry
{
- uint64_t PayloadSize;
- uint32_t Flags;
- IoHash AttachmentHash;
+ uint64_t PayloadSize; // Size of the associated payload data in the message
+ uint32_t Flags; // See flags below
+ IoHash AttachmentHash; // Content Id for the attachment
enum
{
kIsCompressed = (1u << 0), // Is marshaled using compressed buffer storage format
kIsObject = (1u << 1), // Is compact binary object
kIsError = (1u << 2), // Is error (compact binary formatted) object
+ kIsLocalRef = (1u << 3), // Is "local reference"
};
};
+struct CbAttachmentReferenceHeader
+{
+ uint64_t PayloadByteOffset = 0;
+ uint64_t PayloadByteSize = ~0u;
+ uint16_t AbsolutePathLength = 0;
+
+ // This header will be followed by UTF8 encoded absolute path to backing file
+};
+
static_assert(sizeof(CbAttachmentEntry) == 32);
-std::vector<IoBuffer> FormatPackageMessage(const CbPackage& Data);
-CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data);
+enum class FormatFlags
+{
+ kDefault = 0,
+ kAllowLocalReferences = (1u << 0)
+};
+
+gsl_DEFINE_ENUM_BITMASK_OPERATORS(FormatFlags);
+
+std::vector<IoBuffer> FormatPackageMessage(const CbPackage& Data, FormatFlags Flags);
+CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data, FormatFlags Flags);
CbPackage ParsePackageMessage(
IoBuffer Payload,
std::function<IoBuffer(const IoHash& Cid, uint64_t Size)> CreateBuffer = [](const IoHash&, uint64_t Size) -> IoBuffer {
return IoBuffer{Size};
});
+bool IsPackageMessage(IoBuffer Payload);
+
+std::vector<IoBuffer> FormatPackageMessage(const CbPackage& Data);
+CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data);
/** Streaming reader for compact binary packages
The goal is to ultimately support zero-copy I/O, but for now there'll be some
copying involved on some platforms at least.
+ This approach to deserializing CbPackage data is more efficient than
+ `ParsePackageMessage` since it does not require the entire message to
+ be resident in a memory buffer
+
*/
class CbPackageReader
{
@@ -79,11 +114,18 @@ public:
void SetPayloadBufferCreator(std::function<IoBuffer(const IoHash& Cid, uint64_t Size)> CreateBuffer);
- /** Process header data
+ /** Process compact binary package data stream
+
+ The data stream must be in the serialization format produced by FormatPackageMessage
+
+ \return How many bytes must be fed to this function in the next call
*/
- uint64_t ProcessHeaderData(const void* Data, uint64_t DataBytes);
+ uint64_t ProcessPackageHeaderData(const void* Data, uint64_t DataBytes);
- std::span<IoBuffer> GetPayloadBuffers() { return m_PayloadBuffers; }
+ void Finalize();
+ const std::vector<CbAttachment>& GetAttachments() { return m_Attachments; }
+ CbObject GetRootObject() { return m_RootObject; }
+ std::span<IoBuffer> GetPayloadBuffers() { return m_PayloadBuffers; }
private:
enum class State
@@ -97,7 +139,11 @@ private:
std::function<IoBuffer(const IoHash& Cid, uint64_t Size)> m_CreateBuffer;
std::vector<IoBuffer> m_PayloadBuffers;
std::vector<CbAttachmentEntry> m_AttachmentEntries;
+ std::vector<CbAttachment> m_Attachments;
+ CbObject m_RootObject;
CbPackageHeader m_PackageHeader;
+
+ IoBuffer MarshalLocalChunkReference(IoBuffer AttachmentBuffer);
};
void forcelink_httpshared();