diff options
| author | Stefan Boberg <[email protected]> | 2021-09-28 21:58:40 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2021-09-28 21:58:40 +0200 |
| commit | 1caecce8474bd55986d4233614336eb627135504 (patch) | |
| tree | de7b0bc8799295d1257a29ff5a49148e0bae321d | |
| parent | Removed MemoryOutStream, MemoryInStream (diff) | |
| download | zen-1caecce8474bd55986d4233614336eb627135504.tar.xz zen-1caecce8474bd55986d4233614336eb627135504.zip | |
Added preliminary CbPackageReader, for handling incremental compact binary package streaming
| -rw-r--r-- | zenhttp/httpshared.cpp | 138 | ||||
| -rw-r--r-- | zenhttp/httpshared.h | 51 | ||||
| -rw-r--r-- | zenhttp/include/zenhttp/httpshared.h | 55 | ||||
| -rw-r--r-- | zenhttp/zenhttp.cpp | 4 | ||||
| -rw-r--r-- | zenhttp/zenhttp.vcxproj | 2 | ||||
| -rw-r--r-- | zenhttp/zenhttp.vcxproj.filters | 2 |
6 files changed, 186 insertions, 66 deletions
diff --git a/zenhttp/httpshared.cpp b/zenhttp/httpshared.cpp index b0c5493db..c703409af 100644 --- a/zenhttp/httpshared.cpp +++ b/zenhttp/httpshared.cpp @@ -2,11 +2,13 @@ #include <zenhttp/httpshared.h> +#include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> #include <zencore/compositebuffer.h> #include <zencore/iobuffer.h> #include <zencore/iohash.h> #include <zencore/stream.h> +#include <zencore/testing.h> #include <span> #include <vector> @@ -58,9 +60,7 @@ FormatPackageMessage(const CbPackage& Data) IoBuffer RootIoBuffer = Data.GetObject().GetBuffer().AsIoBuffer(); ResponseBuffers.push_back(RootIoBuffer); // Root object - *AttachmentInfo++ = {.AttachmentSize = RootIoBuffer.Size(), - .Flags = CbAttachmentEntry::kIsObject, - .AttachmentHash = Data.GetObjectHash()}; + *AttachmentInfo++ = {.PayloadSize = RootIoBuffer.Size(), .Flags = CbAttachmentEntry::kIsObject, .AttachmentHash = Data.GetObjectHash()}; // Attachment payloads @@ -74,7 +74,7 @@ FormatPackageMessage(const CbPackage& Data) { CompositeBuffer Compressed = AttachmentBuffer.GetCompressed(); - *AttachmentInfo++ = {.AttachmentSize = AttachmentBuffer.GetCompressedSize(), + *AttachmentInfo++ = {.PayloadSize = AttachmentBuffer.GetCompressedSize(), .Flags = CbAttachmentEntry::kIsCompressed, .AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash())}; @@ -89,13 +89,13 @@ FormatPackageMessage(const CbPackage& Data) IoBuffer ObjIoBuffer = AttachmentObject.GetBuffer().AsIoBuffer(); ResponseBuffers.push_back(ObjIoBuffer); - *AttachmentInfo++ = {.AttachmentSize = ObjIoBuffer.Size(), + *AttachmentInfo++ = {.PayloadSize = ObjIoBuffer.Size(), .Flags = CbAttachmentEntry::kIsObject, .AttachmentHash = Attachment.GetHash()}; } else if (CompositeBuffer AttachmentBinary = Attachment.AsCompositeBinary()) { - *AttachmentInfo++ = {.AttachmentSize = AttachmentBinary.GetSize(), .Flags = 0, .AttachmentHash = Attachment.GetHash()}; + *AttachmentInfo++ = {.PayloadSize = AttachmentBinary.GetSize(), .Flags = 0, .AttachmentHash = Attachment.GetHash()}; for (const SharedBuffer& Segment : AttachmentBinary.GetSegments()) { @@ -120,8 +120,7 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint return {}; } - MemoryInStream InStream(Payload); - BinaryReader Reader(InStream); + BinaryReader Reader(Payload); CbPackageHeader Hdr; Reader.Read(&Hdr, sizeof Hdr); @@ -142,7 +141,7 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint for (uint32_t i = 0; i < ChunkCount; ++i) { const CbAttachmentEntry& Entry = AttachmentEntries[i]; - const uint64_t AttachmentSize = Entry.AttachmentSize; + const uint64_t AttachmentSize = Entry.PayloadSize; IoBuffer AttachmentBuffer = CreateBuffer(Entry.AttachmentHash, AttachmentSize); ZEN_ASSERT(AttachmentBuffer); @@ -195,4 +194,123 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint return Package; } -} // namespace zen
\ No newline at end of file +CbPackageReader::CbPackageReader() : m_CreateBuffer([](const IoHash&, uint64_t Size) -> IoBuffer { return IoBuffer{Size}; }) +{ +} + +CbPackageReader::~CbPackageReader() +{ +} + +void +CbPackageReader::SetPayloadBufferCreator(std::function<IoBuffer(const IoHash& Cid, uint64_t Size)> CreateBuffer) +{ + m_CreateBuffer = CreateBuffer; +} + +/** Process data + */ +uint64_t +CbPackageReader::ProcessHeaderData(const void* Data, uint64_t DataBytes) +{ + ZEN_ASSERT(m_CurrentState != State::kReadingBuffers); + + switch (m_CurrentState) + { + case State::kInitialState: + ZEN_ASSERT(Data == nullptr); + m_CurrentState = State::kReadingHeader; + return sizeof m_PackageHeader; + + case State::kReadingHeader: + ZEN_ASSERT(DataBytes == sizeof m_PackageHeader); + memcpy(&m_PackageHeader, Data, sizeof m_PackageHeader); + ZEN_ASSERT(m_PackageHeader.HeaderMagic == kCbPkgMagic); + m_CurrentState = State::kReadingAttachmentEntries; + m_AttachmentEntries.resize(m_PackageHeader.AttachmentCount + 1); + return (m_PackageHeader.AttachmentCount + 1) * sizeof(CbAttachmentEntry); + + case State::kReadingAttachmentEntries: + ZEN_ASSERT(DataBytes == ((m_PackageHeader.AttachmentCount + 1) * sizeof(CbAttachmentEntry))); + memcpy(m_AttachmentEntries.data(), Data, DataBytes); + + for (CbAttachmentEntry& Entry : m_AttachmentEntries) + { + m_PayloadBuffers.push_back(IoBuffer{Entry.PayloadSize}); + } + + m_CurrentState = State::kReadingBuffers; + return 0; + + default: + ZEN_ASSERT(false); + return 0; + } +} + +/** + ______________________ _____________________________ + \__ ___/\_ _____// _____/\__ ___/ _____/ + | | | __)_ \_____ \ | | \_____ \ + | | | \/ \ | | / \ + |____| /_______ /_______ / |____| /_______ / + \/ \/ \/ + */ + +#if ZEN_WITH_TESTS + +TEST_CASE("CbPackage.Serialization") +{ + // Make a test package + + CbAttachment Attach1{SharedBuffer::MakeView(MakeMemoryView("abcd"))}; + CbAttachment Attach2{SharedBuffer::MakeView(MakeMemoryView("efgh"))}; + + CbObjectWriter Cbo; + Cbo.AddAttachment("abcd", Attach1); + Cbo.AddAttachment("efgh", Attach2); + + CbPackage Pkg; + Pkg.AddAttachment(Attach1); + Pkg.AddAttachment(Attach2); + Pkg.SetObject(Cbo.Save()); + + SharedBuffer Buffer = FormatPackageMessageBuffer(Pkg).Flatten(); + const uint8_t* CursorPtr = reinterpret_cast<const uint8_t*>(Buffer.GetData()); + uint64_t RemainingBytes = Buffer.GetSize(); + + auto ConsumeBytes = [&](uint64_t ByteCount) { + ZEN_ASSERT(ByteCount <= RemainingBytes); + void* ReturnPtr = (void*)CursorPtr; + CursorPtr += ByteCount; + RemainingBytes -= ByteCount; + return ReturnPtr; + }; + + auto CopyBytes = [&](void* TargetBuffer, uint64_t ByteCount) { + ZEN_ASSERT(ByteCount <= RemainingBytes); + memcpy(TargetBuffer, CursorPtr, ByteCount); + CursorPtr += ByteCount; + RemainingBytes -= ByteCount; + }; + + CbPackageReader Reader; + uint64_t InitialRead = Reader.ProcessHeaderData(nullptr, 0); + uint64_t NextBytes = Reader.ProcessHeaderData(ConsumeBytes(InitialRead), InitialRead); + NextBytes = Reader.ProcessHeaderData(ConsumeBytes(NextBytes), NextBytes); + auto Buffers = Reader.GetPayloadBuffers(); + + for (auto& PayloadBuffer : Buffers) + { + CopyBytes(PayloadBuffer.MutableData(), PayloadBuffer.GetSize()); + } +} + +void +forcelink_httpshared() +{ +} + +#endif + +} // namespace zen diff --git a/zenhttp/httpshared.h b/zenhttp/httpshared.h deleted file mode 100644 index 92c1ef9c6..000000000 --- a/zenhttp/httpshared.h +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/iobuffer.h> -#include <zencore/iohash.h> - -#include <functional> - -namespace zen { - -class IoBuffer; -class CbPackage; -class CompositeBuffer; - -struct CbPackageHeader -{ - uint32_t HeaderMagic; - uint32_t AttachmentCount; - uint32_t Reserved1; - uint32_t Reserved2; -}; - -static_assert(sizeof(CbPackageHeader) == 16); - -static constinit uint32_t kCbPkgMagic = 0xaa77aacc; - -struct CbAttachmentEntry -{ - uint64_t AttachmentSize; - uint32_t Flags; - IoHash AttachmentHash; - - enum - { - kIsCompressed = (1u << 0), // Is marshaled using compressed buffer storage format - kIsObject = (1u << 1), // Is compact binary object - }; -}; - -static_assert(sizeof(CbAttachmentEntry) == 32); - -std::vector<IoBuffer> FormatPackageMessage(const CbPackage& Data); -CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data); -CbPackage ParsePackageMessage( - IoBuffer Payload, - std::function<IoBuffer(const IoHash& Cid, uint64_t Size)> CreateBuffer = [](const IoHash&, uint64_t Size) -> IoBuffer { - return IoBuffer{Size}; - }); - -} // namespace zen diff --git a/zenhttp/include/zenhttp/httpshared.h b/zenhttp/include/zenhttp/httpshared.h index 92c1ef9c6..2e728577d 100644 --- a/zenhttp/include/zenhttp/httpshared.h +++ b/zenhttp/include/zenhttp/httpshared.h @@ -13,10 +13,23 @@ class IoBuffer; class CbPackage; class CompositeBuffer; +/** _____ _ _____ _ + / ____| | | __ \ | | + | | | |__ | |__) |_ _ ___| | ____ _ __ _ ___ + | | | '_ \| ___/ _` |/ __| |/ / _` |/ _` |/ _ \ + | |____| |_) | | | (_| | (__| < (_| | (_| | __/ + \_____|_.__/|_| \__,_|\___|_|\_\__,_|\__, |\___| + __/ | + |___/ + + Structures and code related to handling CbPackage transactions + + */ + struct CbPackageHeader { uint32_t HeaderMagic; - uint32_t AttachmentCount; + uint32_t AttachmentCount; // TODO: should add ability to opt out of implicit root document? uint32_t Reserved1; uint32_t Reserved2; }; @@ -27,7 +40,7 @@ static constinit uint32_t kCbPkgMagic = 0xaa77aacc; struct CbAttachmentEntry { - uint64_t AttachmentSize; + uint64_t PayloadSize; uint32_t Flags; IoHash AttachmentHash; @@ -35,6 +48,7 @@ struct CbAttachmentEntry { kIsCompressed = (1u << 0), // Is marshaled using compressed buffer storage format kIsObject = (1u << 1), // Is compact binary object + kIsError = (1u << 2), // Is error (compact binary formatted) object }; }; @@ -48,4 +62,41 @@ CbPackage ParsePackageMessage( return IoBuffer{Size}; }); +/** Streaming reader for compact binary packages + + The goal is to ultimately support zero-copy I/O, but for now there'll be some + copying involved on some platforms at least. + + */ +class CbPackageReader +{ +public: + CbPackageReader(); + ~CbPackageReader(); + + void SetPayloadBufferCreator(std::function<IoBuffer(const IoHash& Cid, uint64_t Size)> CreateBuffer); + + /** Process header data + */ + uint64_t ProcessHeaderData(const void* Data, uint64_t DataBytes); + + std::span<IoBuffer> GetPayloadBuffers() { return m_PayloadBuffers; } + +private: + enum class State + { + kInitialState, + kReadingHeader, + kReadingAttachmentEntries, + kReadingBuffers + } m_CurrentState = State::kInitialState; + + std::function<IoBuffer(const IoHash& Cid, uint64_t Size)> m_CreateBuffer; + std::vector<IoBuffer> m_PayloadBuffers; + std::vector<CbAttachmentEntry> m_AttachmentEntries; + CbPackageHeader m_PackageHeader; +}; + +void forcelink_httpshared(); + } // namespace zen diff --git a/zenhttp/zenhttp.cpp b/zenhttp/zenhttp.cpp index 637486f55..0194abdcb 100644 --- a/zenhttp/zenhttp.cpp +++ b/zenhttp/zenhttp.cpp @@ -3,6 +3,7 @@ #include <zenhttp/zenhttp.h> #include <zenhttp/httpserver.h> +#include <zenhttp/httpshared.h> namespace zen { @@ -10,6 +11,7 @@ void zenhttp_forcelinktests() { http_forcelink(); + forcelink_httpshared(); } -} // namespace zen
\ No newline at end of file +} // namespace zen diff --git a/zenhttp/zenhttp.vcxproj b/zenhttp/zenhttp.vcxproj index eca9898d3..899cf4bd1 100644 --- a/zenhttp/zenhttp.vcxproj +++ b/zenhttp/zenhttp.vcxproj @@ -104,12 +104,12 @@ </ItemGroup> <ItemGroup> <ClInclude Include="httpnull.h" /> - <ClInclude Include="httpshared.h" /> <ClInclude Include="httpsys.h" /> <ClInclude Include="httpuws.h" /> <ClInclude Include="include\zenhttp\httpclient.h" /> <ClInclude Include="include\zenhttp\httpcommon.h" /> <ClInclude Include="include\zenhttp\httpserver.h" /> + <ClInclude Include="include\zenhttp\httpshared.h" /> <ClInclude Include="include\zenhttp\zenhttp.h" /> <ClInclude Include="iothreadpool.h" /> </ItemGroup> diff --git a/zenhttp/zenhttp.vcxproj.filters b/zenhttp/zenhttp.vcxproj.filters index 17f71bed1..2e968055c 100644 --- a/zenhttp/zenhttp.vcxproj.filters +++ b/zenhttp/zenhttp.vcxproj.filters @@ -18,8 +18,8 @@ <ClInclude Include="include\zenhttp\zenhttp.h" /> <ClInclude Include="httpnull.h" /> <ClInclude Include="httpuws.h" /> - <ClInclude Include="httpshared.h" /> <ClInclude Include="include\zenhttp\httpcommon.h" /> + <ClInclude Include="include\zenhttp\httpshared.h" /> </ItemGroup> <ItemGroup> <None Include="xmake.lua" /> |