From 1caecce8474bd55986d4233614336eb627135504 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Tue, 28 Sep 2021 21:58:40 +0200 Subject: Added preliminary CbPackageReader, for handling incremental compact binary package streaming --- zenhttp/httpshared.cpp | 138 +++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 128 insertions(+), 10 deletions(-) (limited to 'zenhttp/httpshared.cpp') diff --git a/zenhttp/httpshared.cpp b/zenhttp/httpshared.cpp index b0c5493db..c703409af 100644 --- a/zenhttp/httpshared.cpp +++ b/zenhttp/httpshared.cpp @@ -2,11 +2,13 @@ #include +#include #include #include #include #include #include +#include #include #include @@ -58,9 +60,7 @@ FormatPackageMessage(const CbPackage& Data) IoBuffer RootIoBuffer = Data.GetObject().GetBuffer().AsIoBuffer(); ResponseBuffers.push_back(RootIoBuffer); // Root object - *AttachmentInfo++ = {.AttachmentSize = RootIoBuffer.Size(), - .Flags = CbAttachmentEntry::kIsObject, - .AttachmentHash = Data.GetObjectHash()}; + *AttachmentInfo++ = {.PayloadSize = RootIoBuffer.Size(), .Flags = CbAttachmentEntry::kIsObject, .AttachmentHash = Data.GetObjectHash()}; // Attachment payloads @@ -74,7 +74,7 @@ FormatPackageMessage(const CbPackage& Data) { CompositeBuffer Compressed = AttachmentBuffer.GetCompressed(); - *AttachmentInfo++ = {.AttachmentSize = AttachmentBuffer.GetCompressedSize(), + *AttachmentInfo++ = {.PayloadSize = AttachmentBuffer.GetCompressedSize(), .Flags = CbAttachmentEntry::kIsCompressed, .AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash())}; @@ -89,13 +89,13 @@ FormatPackageMessage(const CbPackage& Data) IoBuffer ObjIoBuffer = AttachmentObject.GetBuffer().AsIoBuffer(); ResponseBuffers.push_back(ObjIoBuffer); - *AttachmentInfo++ = {.AttachmentSize = ObjIoBuffer.Size(), + *AttachmentInfo++ = {.PayloadSize = ObjIoBuffer.Size(), .Flags = CbAttachmentEntry::kIsObject, .AttachmentHash = Attachment.GetHash()}; } else if (CompositeBuffer AttachmentBinary = Attachment.AsCompositeBinary()) { - *AttachmentInfo++ = {.AttachmentSize = AttachmentBinary.GetSize(), .Flags = 0, .AttachmentHash = Attachment.GetHash()}; + *AttachmentInfo++ = {.PayloadSize = AttachmentBinary.GetSize(), .Flags = 0, .AttachmentHash = Attachment.GetHash()}; for (const SharedBuffer& Segment : AttachmentBinary.GetSegments()) { @@ -120,8 +120,7 @@ ParsePackageMessage(IoBuffer Payload, std::function IoBuffer { return IoBuffer{Size}; }) +{ +} + +CbPackageReader::~CbPackageReader() +{ +} + +void +CbPackageReader::SetPayloadBufferCreator(std::function CreateBuffer) +{ + m_CreateBuffer = CreateBuffer; +} + +/** Process data + */ +uint64_t +CbPackageReader::ProcessHeaderData(const void* Data, uint64_t DataBytes) +{ + ZEN_ASSERT(m_CurrentState != State::kReadingBuffers); + + switch (m_CurrentState) + { + case State::kInitialState: + ZEN_ASSERT(Data == nullptr); + m_CurrentState = State::kReadingHeader; + return sizeof m_PackageHeader; + + case State::kReadingHeader: + ZEN_ASSERT(DataBytes == sizeof m_PackageHeader); + memcpy(&m_PackageHeader, Data, sizeof m_PackageHeader); + ZEN_ASSERT(m_PackageHeader.HeaderMagic == kCbPkgMagic); + m_CurrentState = State::kReadingAttachmentEntries; + m_AttachmentEntries.resize(m_PackageHeader.AttachmentCount + 1); + return (m_PackageHeader.AttachmentCount + 1) * sizeof(CbAttachmentEntry); + + case State::kReadingAttachmentEntries: + ZEN_ASSERT(DataBytes == ((m_PackageHeader.AttachmentCount + 1) * sizeof(CbAttachmentEntry))); + memcpy(m_AttachmentEntries.data(), Data, DataBytes); + + for (CbAttachmentEntry& Entry : m_AttachmentEntries) + { + m_PayloadBuffers.push_back(IoBuffer{Entry.PayloadSize}); + } + + m_CurrentState = State::kReadingBuffers; + return 0; + + default: + ZEN_ASSERT(false); + return 0; + } +} + +/** + ______________________ _____________________________ + \__ ___/\_ _____// _____/\__ ___/ _____/ + | | | __)_ \_____ \ | | \_____ \ + | | | \/ \ | | / \ + |____| /_______ /_______ / |____| /_______ / + \/ \/ \/ + */ + +#if ZEN_WITH_TESTS + +TEST_CASE("CbPackage.Serialization") +{ + // Make a test package + + CbAttachment Attach1{SharedBuffer::MakeView(MakeMemoryView("abcd"))}; + CbAttachment Attach2{SharedBuffer::MakeView(MakeMemoryView("efgh"))}; + + CbObjectWriter Cbo; + Cbo.AddAttachment("abcd", Attach1); + Cbo.AddAttachment("efgh", Attach2); + + CbPackage Pkg; + Pkg.AddAttachment(Attach1); + Pkg.AddAttachment(Attach2); + Pkg.SetObject(Cbo.Save()); + + SharedBuffer Buffer = FormatPackageMessageBuffer(Pkg).Flatten(); + const uint8_t* CursorPtr = reinterpret_cast(Buffer.GetData()); + uint64_t RemainingBytes = Buffer.GetSize(); + + auto ConsumeBytes = [&](uint64_t ByteCount) { + ZEN_ASSERT(ByteCount <= RemainingBytes); + void* ReturnPtr = (void*)CursorPtr; + CursorPtr += ByteCount; + RemainingBytes -= ByteCount; + return ReturnPtr; + }; + + auto CopyBytes = [&](void* TargetBuffer, uint64_t ByteCount) { + ZEN_ASSERT(ByteCount <= RemainingBytes); + memcpy(TargetBuffer, CursorPtr, ByteCount); + CursorPtr += ByteCount; + RemainingBytes -= ByteCount; + }; + + CbPackageReader Reader; + uint64_t InitialRead = Reader.ProcessHeaderData(nullptr, 0); + uint64_t NextBytes = Reader.ProcessHeaderData(ConsumeBytes(InitialRead), InitialRead); + NextBytes = Reader.ProcessHeaderData(ConsumeBytes(NextBytes), NextBytes); + auto Buffers = Reader.GetPayloadBuffers(); + + for (auto& PayloadBuffer : Buffers) + { + CopyBytes(PayloadBuffer.MutableData(), PayloadBuffer.GetSize()); + } +} + +void +forcelink_httpshared() +{ +} + +#endif + +} // namespace zen -- cgit v1.2.3