diff options
Diffstat (limited to 'zenhttp/httpshared.cpp')
| -rw-r--r-- | zenhttp/httpshared.cpp | 348 |
1 files changed, 326 insertions, 22 deletions
diff --git a/zenhttp/httpshared.cpp b/zenhttp/httpshared.cpp index f2ce17e16..7b92a0587 100644 --- a/zenhttp/httpshared.cpp +++ b/zenhttp/httpshared.cpp @@ -5,20 +5,35 @@ #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> #include <zencore/compositebuffer.h> +#include <zencore/filesystem.h> #include <zencore/iobuffer.h> #include <zencore/iohash.h> #include <zencore/stream.h> #include <zencore/testing.h> +#include <zencore/testutils.h> + +#include <zencore/fmtutils.h> #include <span> #include <vector> namespace zen { +std::vector<IoBuffer> +FormatPackageMessage(const CbPackage& Data) +{ + return FormatPackageMessage(Data, FormatFlags::kDefault); +} CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data) { - std::vector<IoBuffer> Message = FormatPackageMessage(Data); + return FormatPackageMessageBuffer(Data, FormatFlags::kDefault); +} + +CompositeBuffer +FormatPackageMessageBuffer(const CbPackage& Data, FormatFlags Flags) +{ + std::vector<IoBuffer> Message = FormatPackageMessage(Data, Flags); std::vector<SharedBuffer> Buffers; @@ -31,11 +46,11 @@ FormatPackageMessageBuffer(const CbPackage& Data) } std::vector<IoBuffer> -FormatPackageMessage(const CbPackage& Data) +FormatPackageMessage(const CbPackage& Data, FormatFlags Flags) { const std::span<const CbAttachment>& Attachments = Data.GetAttachments(); + std::vector<IoBuffer> ResponseBuffers; - std::vector<IoBuffer> ResponseBuffers; ResponseBuffers.reserve(3 + Attachments.size()); // TODO: may want to use an additional fudge factor here to avoid growing since each // attachment is likely to consist of several buffers @@ -47,9 +62,8 @@ FormatPackageMessage(const CbPackage& Data) // Attachment metadata array - IoBuffer AttachmentMetadataBuffer = IoBuffer{sizeof(CbAttachmentEntry) * (Attachments.size() + /* root */ 1)}; - - CbAttachmentEntry* AttachmentInfo = reinterpret_cast<CbAttachmentEntry*>(AttachmentMetadataBuffer.MutableData()); + IoBuffer AttachmentMetadataBuffer = IoBuffer{sizeof(CbAttachmentEntry) * (Attachments.size() + /* root */ 1)}; + CbAttachmentEntry* AttachmentInfo = reinterpret_cast<CbAttachmentEntry*>(AttachmentMetadataBuffer.MutableData()); ResponseBuffers.push_back(AttachmentMetadataBuffer); // Attachment metadata @@ -62,6 +76,44 @@ FormatPackageMessage(const CbPackage& Data) // Attachment payloads + auto MarshalLocal = [&AttachmentInfo, &ResponseBuffers](const std::string& Path8, + CbAttachmentReferenceHeader& LocalRef, + const IoHash& AttachmentHash, + bool IsCompressed) { + IoBuffer RefBuffer = IoBuffer(sizeof CbAttachmentReferenceHeader + Path8.size()); + + CbAttachmentReferenceHeader* RefHdr = RefBuffer.MutableData<CbAttachmentReferenceHeader>(); + *RefHdr++ = LocalRef; + memcpy(RefHdr, Path8.data(), Path8.size()); + + *AttachmentInfo++ = {.PayloadSize = RefBuffer.GetSize(), + .Flags = (IsCompressed ? CbAttachmentEntry::kIsCompressed : 0u) | CbAttachmentEntry::kIsLocalRef, + .AttachmentHash = AttachmentHash}; + + ResponseBuffers.push_back(std::move(RefBuffer)); + }; + + auto IsLocalRef = [](const CompositeBuffer& AttachmentBinary, CbAttachmentReferenceHeader& LocalRef, std::string& Path8) -> bool { + const SharedBuffer& Segment = AttachmentBinary.GetSegments().front(); + IoBufferFileReference Ref; + const IoBuffer& SegmentBuffer = Segment.AsIoBuffer(); + + if (!SegmentBuffer.GetFileReference(Ref)) + { + return false; + } + + ExtendablePathBuilder<256> LocalRefFile; + LocalRefFile.Append(std::filesystem::absolute(PathFromHandle(Ref.FileHandle))); + Path8 = LocalRefFile.ToUtf8(); + + LocalRef.AbsolutePathLength = gsl::narrow<uint16_t>(Path8.size()); + LocalRef.PayloadByteOffset = Ref.FileChunkOffset; + LocalRef.PayloadByteSize = Ref.FileChunkSize; + + return true; + }; + for (const CbAttachment& Attachment : Attachments) { if (Attachment.IsNull()) @@ -70,15 +122,39 @@ FormatPackageMessage(const CbPackage& Data) } else if (CompressedBuffer AttachmentBuffer = Attachment.AsCompressedBinary()) { - CompositeBuffer Compressed = AttachmentBuffer.GetCompressed(); + CompositeBuffer Compressed = AttachmentBuffer.GetCompressed(); + IoHash AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash()); + + // If the data is either not backed by a file, or there are multiple + // fragments then we cannot marshal it by local reference. We might + // want/need to extend this in the future to allow multiple chunk + // segments to be marshaled at once - *AttachmentInfo++ = {.PayloadSize = AttachmentBuffer.GetCompressedSize(), - .Flags = CbAttachmentEntry::kIsCompressed, - .AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash())}; + bool MarshalByLocalRef = EnumHasAllFlags(Flags, FormatFlags::kAllowLocalReferences) && (Compressed.GetSegments().size() == 1); - for (const SharedBuffer& Segment : Compressed.GetSegments()) + CbAttachmentReferenceHeader LocalRef; + std::string Path8; + + if (MarshalByLocalRef) { - ResponseBuffers.push_back(Segment.AsIoBuffer()); + MarshalByLocalRef = IsLocalRef(Compressed, LocalRef, Path8); + } + + if (MarshalByLocalRef) + { + const bool IsCompressed = true; + MarshalLocal(Path8, LocalRef, AttachmentHash, IsCompressed); + } + else + { + *AttachmentInfo++ = {.PayloadSize = AttachmentBuffer.GetCompressedSize(), + .Flags = CbAttachmentEntry::kIsCompressed, + .AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash())}; + + for (const SharedBuffer& Segment : Compressed.GetSegments()) + { + ResponseBuffers.push_back(Segment.AsIoBuffer()); + } } } else if (CbObject AttachmentObject = Attachment.AsObject()) @@ -92,11 +168,31 @@ FormatPackageMessage(const CbPackage& Data) } else if (CompositeBuffer AttachmentBinary = Attachment.AsCompositeBinary()) { - *AttachmentInfo++ = {.PayloadSize = AttachmentBinary.GetSize(), .Flags = 0, .AttachmentHash = Attachment.GetHash()}; + IoHash AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash()); + bool MarshalByLocalRef = + EnumHasAllFlags(Flags, FormatFlags::kAllowLocalReferences) && (AttachmentBinary.GetSegments().size() == 1); + + CbAttachmentReferenceHeader LocalRef; + std::string Path8; + + if (MarshalByLocalRef) + { + MarshalByLocalRef = IsLocalRef(AttachmentBinary, LocalRef, Path8); + } - for (const SharedBuffer& Segment : AttachmentBinary.GetSegments()) + if (MarshalByLocalRef) { - ResponseBuffers.push_back(Segment.AsIoBuffer()); + const bool IsCompressed = false; + MarshalLocal(Path8, LocalRef, AttachmentHash, IsCompressed); + } + else + { + *AttachmentInfo++ = {.PayloadSize = AttachmentBinary.GetSize(), .Flags = 0, .AttachmentHash = Attachment.GetHash()}; + + for (const SharedBuffer& Segment : AttachmentBinary.GetSegments()) + { + ResponseBuffers.push_back(Segment.AsIoBuffer()); + } } } else @@ -108,6 +204,27 @@ FormatPackageMessage(const CbPackage& Data) return ResponseBuffers; } +bool +IsPackageMessage(IoBuffer Payload) +{ + if (!Payload) + { + return false; + } + + BinaryReader Reader(Payload); + + CbPackageHeader Hdr; + Reader.Read(&Hdr, sizeof Hdr); + + if (Hdr.HeaderMagic != kCbPkgMagic) + { + return false; + } + + return true; +} + CbPackage ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint64_t)> CreateBuffer) { @@ -145,7 +262,38 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint Reader.Read(AttachmentBuffer.MutableData(), AttachmentSize); - if (Entry.Flags & CbAttachmentEntry::kIsCompressed) + if (Entry.Flags & CbAttachmentEntry::kIsLocalRef) + { + // Marshal local reference - a "pointer" to the chunk backing file + + ZEN_ASSERT(AttachmentBuffer.Size() >= sizeof(CbAttachmentReferenceHeader)); + + const CbAttachmentReferenceHeader* AttachRefHdr = AttachmentBuffer.Data<CbAttachmentReferenceHeader>(); + const char8_t* PathPointer = reinterpret_cast<const char8_t*>(AttachRefHdr + 1); + + ZEN_ASSERT(AttachmentBuffer.Size() >= (sizeof(CbAttachmentReferenceHeader) + AttachRefHdr->AbsolutePathLength)); + + std::filesystem::path Path{PathPointer}; + + if (IoBuffer ChunkReference = + IoBufferBuilder::MakeFromFile(Path, AttachRefHdr->PayloadByteOffset, AttachRefHdr->PayloadByteSize)) + { + CompressedBuffer CompBuf(CompressedBuffer::FromCompressed(SharedBuffer(ChunkReference))); + CbAttachment Attachment(std::move(CompBuf)); + Package.AddAttachment(Attachment); + } + else + { + // Unable to open chunk reference + + throw std::runtime_error(fmt::format("unable to resolve chunk #{} at '{}' (offset {}, size {})", + i, + PathToUtf8(Path), + AttachRefHdr->PayloadByteOffset, + AttachRefHdr->PayloadByteSize)); + } + } + else if (Entry.Flags & CbAttachmentEntry::kIsCompressed) { CompressedBuffer CompBuf(CompressedBuffer::FromCompressed(SharedBuffer(AttachmentBuffer))); @@ -153,6 +301,7 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint { if (i == 0) { + // First payload is always a compact binary object Package.SetObject(LoadCompactBinaryObject(std::move(CompBuf))); } else @@ -204,10 +353,8 @@ CbPackageReader::SetPayloadBufferCreator(std::function<IoBuffer(const IoHash& Ci m_CreateBuffer = CreateBuffer; } -/** Process data - */ uint64_t -CbPackageReader::ProcessHeaderData(const void* Data, uint64_t DataBytes) +CbPackageReader::ProcessPackageHeaderData(const void* Data, uint64_t DataBytes) { ZEN_ASSERT(m_CurrentState != State::kReadingBuffers); @@ -232,6 +379,10 @@ CbPackageReader::ProcessHeaderData(const void* Data, uint64_t DataBytes) for (CbAttachmentEntry& Entry : m_AttachmentEntries) { + // This preallocates memory for payloads but note that for the local references + // the caller will need to handle the payload differently (i.e it's a + // CbAttachmentReferenceHeader not the actual payload) + m_PayloadBuffers.push_back(IoBuffer{Entry.PayloadSize}); } @@ -244,6 +395,92 @@ CbPackageReader::ProcessHeaderData(const void* Data, uint64_t DataBytes) } } +IoBuffer +CbPackageReader::MarshalLocalChunkReference(IoBuffer AttachmentBuffer) +{ + // Marshal local reference - a "pointer" to the chunk backing file + + ZEN_ASSERT(AttachmentBuffer.Size() >= sizeof(CbAttachmentReferenceHeader)); + + const CbAttachmentReferenceHeader* AttachRefHdr = AttachmentBuffer.Data<CbAttachmentReferenceHeader>(); + const char8_t* PathPointer = reinterpret_cast<const char8_t*>(AttachRefHdr + 1); + + ZEN_ASSERT(AttachmentBuffer.Size() >= (sizeof(CbAttachmentReferenceHeader) + AttachRefHdr->AbsolutePathLength)); + + std::filesystem::path Path{PathPointer}; + + IoBuffer ChunkReference = IoBufferBuilder::MakeFromFile(Path, AttachRefHdr->PayloadByteOffset, AttachRefHdr->PayloadByteSize); + + if (!ChunkReference) + { + // Unable to open chunk reference + + throw std::runtime_error(fmt::format("unable to resolve local reference to '{}' (offset {}, size {})", + PathToUtf8(Path), + AttachRefHdr->PayloadByteOffset, + AttachRefHdr->PayloadByteSize)); + } + + return ChunkReference; +}; + +void +CbPackageReader::Finalize() +{ + if (m_AttachmentEntries.empty()) + { + return; + } + + m_Attachments.reserve(m_AttachmentEntries.size() - 1); + + int CurrentAttachmentIndex = 0; + for (CbAttachmentEntry& Entry : m_AttachmentEntries) + { + IoBuffer AttachmentBuffer = m_PayloadBuffers[CurrentAttachmentIndex]; + + if (CurrentAttachmentIndex == 0) + { + // Root object + if (Entry.Flags & CbAttachmentEntry::kIsObject) + { + if (Entry.Flags & CbAttachmentEntry::kIsLocalRef) + { + m_RootObject = LoadCompactBinaryObject(MarshalLocalChunkReference(AttachmentBuffer)); + } + else if (Entry.Flags & CbAttachmentEntry::kIsCompressed) + { + m_RootObject = LoadCompactBinaryObject(CompressedBuffer::FromCompressed(SharedBuffer(AttachmentBuffer))); + } + else + { + m_RootObject = LoadCompactBinaryObject(std::move(AttachmentBuffer)); + } + } + else + { + throw std::runtime_error("missing or invalid root object"); + } + } + else if (Entry.Flags & CbAttachmentEntry::kIsLocalRef) + { + IoBuffer ChunkReference = MarshalLocalChunkReference(AttachmentBuffer); + + if (Entry.Flags & CbAttachmentEntry::kIsCompressed) + { + m_Attachments.push_back(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(ChunkReference)))); + } + else + { + m_Attachments.push_back(CbAttachment( + CompressedBuffer::Compress(SharedBuffer(ChunkReference), OodleCompressor::NotSet, OodleCompressionLevel::None))); + } + } + + ++CurrentAttachmentIndex; + } +} + /** ______________________ _____________________________ \__ ___/\_ _____// _____/\__ ___/ _____/ @@ -291,15 +528,82 @@ TEST_CASE("CbPackage.Serialization") }; CbPackageReader Reader; - uint64_t InitialRead = Reader.ProcessHeaderData(nullptr, 0); - uint64_t NextBytes = Reader.ProcessHeaderData(ConsumeBytes(InitialRead), InitialRead); - NextBytes = Reader.ProcessHeaderData(ConsumeBytes(NextBytes), NextBytes); + uint64_t InitialRead = Reader.ProcessPackageHeaderData(nullptr, 0); + uint64_t NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(InitialRead), InitialRead); + NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(NextBytes), NextBytes); + auto Buffers = Reader.GetPayloadBuffers(); + + for (auto& PayloadBuffer : Buffers) + { + CopyBytes(PayloadBuffer.MutableData(), PayloadBuffer.GetSize()); + } + + Reader.Finalize(); +} + +TEST_CASE("CbPackage.LocalRef") +{ + ScopedTemporaryDirectory TempDir; + + auto Path1 = TempDir.Path() / "abcd"; + auto Path2 = TempDir.Path() / "efgh"; + + { + IoBuffer Buffer1 = IoBufferBuilder::MakeCloneFromMemory(MakeMemoryView("abcd")); + IoBuffer Buffer2 = IoBufferBuilder::MakeCloneFromMemory(MakeMemoryView("efgh")); + + WriteFile(Path1, Buffer1); + WriteFile(Path2, Buffer2); + } + + // Make a test package + + IoBuffer FileBuffer1 = IoBufferBuilder::MakeFromFile(Path1); + IoBuffer FileBuffer2 = IoBufferBuilder::MakeFromFile(Path2); + + CbAttachment Attach1{SharedBuffer(FileBuffer1)}; + CbAttachment Attach2{SharedBuffer(FileBuffer2)}; + + CbObjectWriter Cbo; + Cbo.AddAttachment("abcd", Attach1); + Cbo.AddAttachment("efgh", Attach2); + + CbPackage Pkg; + Pkg.AddAttachment(Attach1); + Pkg.AddAttachment(Attach2); + Pkg.SetObject(Cbo.Save()); + + SharedBuffer Buffer = FormatPackageMessageBuffer(Pkg, FormatFlags::kAllowLocalReferences).Flatten(); + const uint8_t* CursorPtr = reinterpret_cast<const uint8_t*>(Buffer.GetData()); + uint64_t RemainingBytes = Buffer.GetSize(); + + auto ConsumeBytes = [&](uint64_t ByteCount) { + ZEN_ASSERT(ByteCount <= RemainingBytes); + void* ReturnPtr = (void*)CursorPtr; + CursorPtr += ByteCount; + RemainingBytes -= ByteCount; + return ReturnPtr; + }; + + auto CopyBytes = [&](void* TargetBuffer, uint64_t ByteCount) { + ZEN_ASSERT(ByteCount <= RemainingBytes); + memcpy(TargetBuffer, CursorPtr, ByteCount); + CursorPtr += ByteCount; + RemainingBytes -= ByteCount; + }; + + CbPackageReader Reader; + uint64_t InitialRead = Reader.ProcessPackageHeaderData(nullptr, 0); + uint64_t NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(InitialRead), InitialRead); + NextBytes = Reader.ProcessPackageHeaderData(ConsumeBytes(NextBytes), NextBytes); auto Buffers = Reader.GetPayloadBuffers(); for (auto& PayloadBuffer : Buffers) { CopyBytes(PayloadBuffer.MutableData(), PayloadBuffer.GetSize()); } + + Reader.Finalize(); } void |