aboutsummaryrefslogtreecommitdiff
path: root/zenhttp/httpshared.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'zenhttp/httpshared.cpp')
-rw-r--r--zenhttp/httpshared.cpp348
1 files changed, 326 insertions, 22 deletions
diff --git a/zenhttp/httpshared.cpp b/zenhttp/httpshared.cpp
index f2ce17e16..7b92a0587 100644
--- a/zenhttp/httpshared.cpp
+++ b/zenhttp/httpshared.cpp
@@ -5,20 +5,35 @@
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
#include <zencore/compositebuffer.h>
+#include <zencore/filesystem.h>
#include <zencore/iobuffer.h>
#include <zencore/iohash.h>
#include <zencore/stream.h>
#include <zencore/testing.h>
+#include <zencore/testutils.h>
+
+#include <zencore/fmtutils.h>
#include <span>
#include <vector>
namespace zen {
+std::vector<IoBuffer>
+FormatPackageMessage(const CbPackage& Data)
+{
+ return FormatPackageMessage(Data, FormatFlags::kDefault);
+}
CompositeBuffer
FormatPackageMessageBuffer(const CbPackage& Data)
{
- std::vector<IoBuffer> Message = FormatPackageMessage(Data);
+ return FormatPackageMessageBuffer(Data, FormatFlags::kDefault);
+}
+
+CompositeBuffer
+FormatPackageMessageBuffer(const CbPackage& Data, FormatFlags Flags)
+{
+ std::vector<IoBuffer> Message = FormatPackageMessage(Data, Flags);
std::vector<SharedBuffer> Buffers;
@@ -31,11 +46,11 @@ FormatPackageMessageBuffer(const CbPackage& Data)
}
std::vector<IoBuffer>
-FormatPackageMessage(const CbPackage& Data)
+FormatPackageMessage(const CbPackage& Data, FormatFlags Flags)
{
const std::span<const CbAttachment>& Attachments = Data.GetAttachments();
+ std::vector<IoBuffer> ResponseBuffers;
- std::vector<IoBuffer> ResponseBuffers;
ResponseBuffers.reserve(3 + Attachments.size()); // TODO: may want to use an additional fudge factor here to avoid growing since each
// attachment is likely to consist of several buffers
@@ -47,9 +62,8 @@ FormatPackageMessage(const CbPackage& Data)
// Attachment metadata array
- IoBuffer AttachmentMetadataBuffer = IoBuffer{sizeof(CbAttachmentEntry) * (Attachments.size() + /* root */ 1)};
-
- CbAttachmentEntry* AttachmentInfo = reinterpret_cast<CbAttachmentEntry*>(AttachmentMetadataBuffer.MutableData());
+ IoBuffer AttachmentMetadataBuffer = IoBuffer{sizeof(CbAttachmentEntry) * (Attachments.size() + /* root */ 1)};
+ CbAttachmentEntry* AttachmentInfo = reinterpret_cast<CbAttachmentEntry*>(AttachmentMetadataBuffer.MutableData());
ResponseBuffers.push_back(AttachmentMetadataBuffer); // Attachment metadata
@@ -62,6 +76,44 @@ FormatPackageMessage(const CbPackage& Data)
// Attachment payloads
+ auto MarshalLocal = [&AttachmentInfo, &ResponseBuffers](const std::string& Path8,
+ CbAttachmentReferenceHeader& LocalRef,
+ const IoHash& AttachmentHash,
+ bool IsCompressed) {
+ IoBuffer RefBuffer = IoBuffer(sizeof CbAttachmentReferenceHeader + Path8.size());
+
+ CbAttachmentReferenceHeader* RefHdr = RefBuffer.MutableData<CbAttachmentReferenceHeader>();
+ *RefHdr++ = LocalRef;
+ memcpy(RefHdr, Path8.data(), Path8.size());
+
+ *AttachmentInfo++ = {.PayloadSize = RefBuffer.GetSize(),
+ .Flags = (IsCompressed ? CbAttachmentEntry::kIsCompressed : 0u) | CbAttachmentEntry::kIsLocalRef,
+ .AttachmentHash = AttachmentHash};
+
+ ResponseBuffers.push_back(std::move(RefBuffer));
+ };
+
+ auto IsLocalRef = [](const CompositeBuffer& AttachmentBinary, CbAttachmentReferenceHeader& LocalRef, std::string& Path8) -> bool {
+ const SharedBuffer& Segment = AttachmentBinary.GetSegments().front();
+ IoBufferFileReference Ref;
+ const IoBuffer& SegmentBuffer = Segment.AsIoBuffer();
+
+ if (!SegmentBuffer.GetFileReference(Ref))
+ {
+ return false;
+ }
+
+ ExtendablePathBuilder<256> LocalRefFile;
+ LocalRefFile.Append(std::filesystem::absolute(PathFromHandle(Ref.FileHandle)));
+ Path8 = LocalRefFile.ToUtf8();
+
+ LocalRef.AbsolutePathLength = gsl::narrow<uint16_t>(Path8.size());
+ LocalRef.PayloadByteOffset = Ref.FileChunkOffset;
+ LocalRef.PayloadByteSize = Ref.FileChunkSize;
+
+ return true;
+ };
+
for (const CbAttachment& Attachment : Attachments)
{
if (Attachment.IsNull())
@@ -70,15 +122,39 @@ FormatPackageMessage(const CbPackage& Data)
}
else if (CompressedBuffer AttachmentBuffer = Attachment.AsCompressedBinary())
{
- CompositeBuffer Compressed = AttachmentBuffer.GetCompressed();
+ CompositeBuffer Compressed = AttachmentBuffer.GetCompressed();
+ IoHash AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash());
+
+ // If the data is either not backed by a file, or there are multiple
+ // fragments then we cannot marshal it by local reference. We might
+ // want/need to extend this in the future to allow multiple chunk
+ // segments to be marshaled at once
- *AttachmentInfo++ = {.PayloadSize = AttachmentBuffer.GetCompressedSize(),
- .Flags = CbAttachmentEntry::kIsCompressed,
- .AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash())};
+ bool MarshalByLocalRef = EnumHasAllFlags(Flags, FormatFlags::kAllowLocalReferences) && (Compressed.GetSegments().size() == 1);
- for (const SharedBuffer& Segment : Compressed.GetSegments())
+ CbAttachmentReferenceHeader LocalRef;
+ std::string Path8;
+
+ if (MarshalByLocalRef)
{
- ResponseBuffers.push_back(Segment.AsIoBuffer());
+ MarshalByLocalRef = IsLocalRef(Compressed, LocalRef, Path8);
+ }
+
+ if (MarshalByLocalRef)
+ {
+ const bool IsCompressed = true;
+ MarshalLocal(Path8, LocalRef, AttachmentHash, IsCompressed);
+ }
+ else
+ {
+ *AttachmentInfo++ = {.PayloadSize = AttachmentBuffer.GetCompressedSize(),
+ .Flags = CbAttachmentEntry::kIsCompressed,
+ .AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash())};
+
+ for (const SharedBuffer& Segment : Compressed.GetSegments())
+ {
+ ResponseBuffers.push_back(Segment.AsIoBuffer());
+ }
}
}
else if (CbObject AttachmentObject = Attachment.AsObject())
@@ -92,11 +168,31 @@ FormatPackageMessage(const CbPackage& Data)
}
else if (CompositeBuffer AttachmentBinary = Attachment.AsCompositeBinary())
{
- *AttachmentInfo++ = {.PayloadSize = AttachmentBinary.GetSize(), .Flags = 0, .AttachmentHash = Attachment.GetHash()};
+ IoHash AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash());
+ bool MarshalByLocalRef =
+ EnumHasAllFlags(Flags, FormatFlags::kAllowLocalReferences) && (AttachmentBinary.GetSegments().size() == 1);
+
+ CbAttachmentReferenceHeader LocalRef;
+ std::string Path8;
+
+ if (MarshalByLocalRef)
+ {
+ MarshalByLocalRef = IsLocalRef(AttachmentBinary, LocalRef, Path8);
+ }
- for (const SharedBuffer& Segment : AttachmentBinary.GetSegments())
+ if (MarshalByLocalRef)
{
- ResponseBuffers.push_back(Segment.AsIoBuffer());
+ const bool IsCompressed = false;
+ MarshalLocal(Path8, LocalRef, AttachmentHash, IsCompressed);
+ }
+ else
+ {
+ *AttachmentInfo++ = {.PayloadSize = AttachmentBinary.GetSize(), .Flags = 0, .AttachmentHash = Attachment.GetHash()};
+
+ for (const SharedBuffer& Segment : AttachmentBinary.GetSegments())
+ {
+ ResponseBuffers.push_back(Segment.AsIoBuffer());
+ }
}
}
else
@@ -108,6 +204,27 @@ FormatPackageMessage(const CbPackage& Data)
return ResponseBuffers;
}
+bool
+IsPackageMessage(IoBuffer Payload)
+{
+ if (!Payload)
+ {
+ return false;
+ }
+
+ BinaryReader Reader(Payload);
+
+ CbPackageHeader Hdr;
+ Reader.Read(&Hdr, sizeof Hdr);
+
+ if (Hdr.HeaderMagic != kCbPkgMagic)
+ {
+ return false;
+ }
+
+ return true;
+}
+
CbPackage
ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint64_t)> CreateBuffer)
{
@@ -145,7 +262,38 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint
Reader.Read(AttachmentBuffer.MutableData(), AttachmentSize);
- if (Entry.Flags & CbAttachmentEntry::kIsCompressed)
+ if (Entry.Flags & CbAttachmentEntry::kIsLocalRef)
+ {
+ // Marshal local reference - a "pointer" to the chunk backing file
+
+ ZEN_ASSERT(AttachmentBuffer.Size() >= sizeof(CbAttachmentReferenceHeader));
+
+ const CbAttachmentReferenceHeader* AttachRefHdr = AttachmentBuffer.Data<CbAttachmentReferenceHeader>();
+ const char8_t* PathPointer = reinterpret_cast<const char8_t*>(AttachRefHdr + 1);
+
+ ZEN_ASSERT(AttachmentBuffer.Size() >= (sizeof(CbAttachmentReferenceHeader) + AttachRefHdr->AbsolutePathLength));
+
+ std::filesystem::path Path{PathPointer};
+
+ if (IoBuffer ChunkReference =
+ IoBufferBuilder::MakeFromFile(Path, AttachRefHdr->PayloadByteOffset, AttachRefHdr->PayloadByteSize))
+ {
+ CompressedBuffer CompBuf(CompressedBuffer::FromCompressed(SharedBuffer(ChunkReference)));
+ CbAttachment Attachment(std::move(CompBuf));
+ Package.AddAttachment(Attachment);
+ }
+ else
+ {
+ // Unable to open chunk reference
+
+ throw std::runtime_error(fmt::format("unable to resolve chunk #{} at '{}' (offset {}, size {})",
+ i,
+ PathToUtf8(Path),
+ AttachRefHdr->PayloadByteOffset,
+ AttachRefHdr->PayloadByteSize));
+ }
+ }
+ else if (Entry.Flags & CbAttachmentEntry::kIsCompressed)
{
CompressedBuffer CompBuf(CompressedBuffer::FromCompressed(SharedBuffer(AttachmentBuffer)));
@@ -153,6 +301,7 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint
{
if (i == 0)
{
+ // First payload is always a compact binary object
Package.SetObject(LoadCompactBinaryObject(std::move(CompBuf)));
}
else
@@ -204,10 +353,8 @@ CbPackageReader::SetPayloadBufferCreator(std::function<IoBuffer(const IoHash& Ci
m_CreateBuffer = CreateBuffer;
}
-/** Process data
- */
uint64_t
-CbPackageReader::ProcessHeaderData(const void* Data, uint64_t DataBytes)
+CbPackageReader::ProcessPackageHeaderData(const void* Data, uint64_t DataBytes)
{
ZEN_ASSERT(m_CurrentState != State::kReadingBuffers);
@@ -232,6 +379,10 @@ CbPackageReader::ProcessHeaderData(const void* Data, uint64_t DataBytes)
for (CbAttachmentEntry& Entry : m_AttachmentEntries)
{
+ // This preallocates memory for payloads but note that for the local references
+ // the caller will need to handle the payload differently (i.e it's a
+ // CbAttachmentReferenceHeader not the actual payload)
+
m_PayloadBuffers.push_back(IoBuffer{Entry.PayloadSize});
}
@@ -244,6 +395,92 @@ CbPackageReader::ProcessHeaderData(const void* Data, uint64_t DataBytes)
}
}
+IoBuffer
+CbPackageReader::MarshalLocalChunkReference(IoBuffer AttachmentBuffer)
+{
+ // Marshal local reference - a "pointer" to the chunk backing file
+
+ ZEN_ASSERT(AttachmentBuffer.Size() >= sizeof(CbAttachmentReferenceHeader));
+
+ const CbAttachmentReferenceHeader* AttachRefHdr = AttachmentBuffer.Data<CbAttachmentReferenceHeader>();
+ const char8_t* PathPointer = reinterpret_cast<const char8_t*>(AttachRefHdr + 1);
+
+ ZEN_ASSERT(AttachmentBuffer.Size() >= (sizeof(CbAttachmentReferenceHeader) + AttachRefHdr->AbsolutePathLength));
+
+ std::filesystem::path Path{PathPointer};
+
+ IoBuffer ChunkReference = IoBufferBuilder::MakeFromFile(Path, AttachRefHdr->PayloadByteOffset, AttachRefHdr->PayloadByteSize);
+
+ if (!ChunkReference)
+ {
+ // Unable to open chunk reference
+
+ throw std::runtime_error(fmt::format("unable to resolve local reference to '{}' (offset {}, size {})",
+ PathToUtf8(Path),
+ AttachRefHdr->PayloadByteOffset,
+ AttachRefHdr->PayloadByteSize));
+ }
+
+ return ChunkReference;
+};
+
+void
+CbPackageReader::Finalize()
+{
+ if (m_AttachmentEntries.empty())
+ {
+ return;
+ }
+
+ m_Attachments.reserve(m_AttachmentEntries.size() - 1);
+
+ int CurrentAttachmentIndex = 0;
+ for (CbAttachmentEntry& Entry : m_AttachmentEntries)
+ {
+ IoBuffer AttachmentBuffer = m_PayloadBuffers[CurrentAttachmentIndex];
+
+ if (CurrentAttachmentIndex == 0)
+ {
+ // Root object
+ if (Entry.Flags & CbAttachmentEntry::kIsObject)
+ {
+ if (Entry.Flags & CbAttachmentEntry::kIsLocalRef)
+ {
+ m_RootObject = LoadCompactBinaryObject(MarshalLocalChunkReference(AttachmentBuffer));
+ }
+ else if (Entry.Flags & CbAttachmentEntry::kIsCompressed)
+ {
+ m_RootObject = LoadCompactBinaryObject(CompressedBuffer::FromCompressed(SharedBuffer(AttachmentBuffer)));
+ }
+ else
+ {
+ m_RootObject = LoadCompactBinaryObject(std::move(AttachmentBuffer));
+ }
+ }
+ else
+ {
+ throw std::runtime_error("missing or invalid root object");
+ }
+ }
+ else if (Entry.Flags & CbAttachmentEntry::kIsLocalRef)
+ {
+ IoBuffer ChunkReference = MarshalLocalChunkReference(AttachmentBuffer);
+
+ if (Entry.Flags & CbAttachmentEntry::kIsCompressed)
+ {
+ m_Attachments.push_back(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(ChunkReference))));
+ }
+ else
+ {
+ m_Attachments.push_back(CbAttachment(
+ CompressedBuffer::Compress(SharedBuffer(ChunkReference), OodleCompressor::NotSet, OodleCompressionLevel::None)));
+ }
+ }
+
+ ++CurrentAttachmentIndex;
+ }
+}
+
/**
______________________ _____________________________
\__ ___/\_ _____// _____/\__ ___/ _____/
@@ -291,15 +528,82 @@ TEST_CASE("CbPackage.Serialization")
};
CbPackageReader Reader;
- uint64_t InitialRead = Reader.ProcessHeaderData(nullptr, 0);
- uint64_t NextBytes = Reader.ProcessHeaderData(ConsumeBytes(InitialRead), InitialRead);
- NextBytes = Reader.ProcessHeaderData(ConsumeBytes(NextBytes), NextBytes);
+ uint64_t InitialRead = Reader.ProcessPackageHeaderData(nullptr, 0);
+ uint64_t NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(InitialRead), InitialRead);
+ NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(NextBytes), NextBytes);
+ auto Buffers = Reader.GetPayloadBuffers();
+
+ for (auto& PayloadBuffer : Buffers)
+ {
+ CopyBytes(PayloadBuffer.MutableData(), PayloadBuffer.GetSize());
+ }
+
+ Reader.Finalize();
+}
+
+TEST_CASE("CbPackage.LocalRef")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ auto Path1 = TempDir.Path() / "abcd";
+ auto Path2 = TempDir.Path() / "efgh";
+
+ {
+ IoBuffer Buffer1 = IoBufferBuilder::MakeCloneFromMemory(MakeMemoryView("abcd"));
+ IoBuffer Buffer2 = IoBufferBuilder::MakeCloneFromMemory(MakeMemoryView("efgh"));
+
+ WriteFile(Path1, Buffer1);
+ WriteFile(Path2, Buffer2);
+ }
+
+ // Make a test package
+
+ IoBuffer FileBuffer1 = IoBufferBuilder::MakeFromFile(Path1);
+ IoBuffer FileBuffer2 = IoBufferBuilder::MakeFromFile(Path2);
+
+ CbAttachment Attach1{SharedBuffer(FileBuffer1)};
+ CbAttachment Attach2{SharedBuffer(FileBuffer2)};
+
+ CbObjectWriter Cbo;
+ Cbo.AddAttachment("abcd", Attach1);
+ Cbo.AddAttachment("efgh", Attach2);
+
+ CbPackage Pkg;
+ Pkg.AddAttachment(Attach1);
+ Pkg.AddAttachment(Attach2);
+ Pkg.SetObject(Cbo.Save());
+
+ SharedBuffer Buffer = FormatPackageMessageBuffer(Pkg, FormatFlags::kAllowLocalReferences).Flatten();
+ const uint8_t* CursorPtr = reinterpret_cast<const uint8_t*>(Buffer.GetData());
+ uint64_t RemainingBytes = Buffer.GetSize();
+
+ auto ConsumeBytes = [&](uint64_t ByteCount) {
+ ZEN_ASSERT(ByteCount <= RemainingBytes);
+ void* ReturnPtr = (void*)CursorPtr;
+ CursorPtr += ByteCount;
+ RemainingBytes -= ByteCount;
+ return ReturnPtr;
+ };
+
+ auto CopyBytes = [&](void* TargetBuffer, uint64_t ByteCount) {
+ ZEN_ASSERT(ByteCount <= RemainingBytes);
+ memcpy(TargetBuffer, CursorPtr, ByteCount);
+ CursorPtr += ByteCount;
+ RemainingBytes -= ByteCount;
+ };
+
+ CbPackageReader Reader;
+ uint64_t InitialRead = Reader.ProcessPackageHeaderData(nullptr, 0);
+ uint64_t NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(InitialRead), InitialRead);
+ NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(NextBytes), NextBytes);
auto Buffers = Reader.GetPayloadBuffers();
for (auto& PayloadBuffer : Buffers)
{
CopyBytes(PayloadBuffer.MutableData(), PayloadBuffer.GetSize());
}
+
+ Reader.Finalize();
}
void