aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2021-09-28 21:58:40 +0200
committerStefan Boberg <[email protected]>2021-09-28 21:58:40 +0200
commit1caecce8474bd55986d4233614336eb627135504 (patch)
treede7b0bc8799295d1257a29ff5a49148e0bae321d
parentRemoved MemoryOutStream, MemoryInStream (diff)
downloadzen-1caecce8474bd55986d4233614336eb627135504.tar.xz
zen-1caecce8474bd55986d4233614336eb627135504.zip
Added preliminary CbPackageReader, for handling incremental compact binary package streaming
-rw-r--r--zenhttp/httpshared.cpp138
-rw-r--r--zenhttp/httpshared.h51
-rw-r--r--zenhttp/include/zenhttp/httpshared.h55
-rw-r--r--zenhttp/zenhttp.cpp4
-rw-r--r--zenhttp/zenhttp.vcxproj2
-rw-r--r--zenhttp/zenhttp.vcxproj.filters2
6 files changed, 186 insertions, 66 deletions
diff --git a/zenhttp/httpshared.cpp b/zenhttp/httpshared.cpp
index b0c5493db..c703409af 100644
--- a/zenhttp/httpshared.cpp
+++ b/zenhttp/httpshared.cpp
@@ -2,11 +2,13 @@
#include <zenhttp/httpshared.h>
+#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
#include <zencore/compositebuffer.h>
#include <zencore/iobuffer.h>
#include <zencore/iohash.h>
#include <zencore/stream.h>
+#include <zencore/testing.h>
#include <span>
#include <vector>
@@ -58,9 +60,7 @@ FormatPackageMessage(const CbPackage& Data)
IoBuffer RootIoBuffer = Data.GetObject().GetBuffer().AsIoBuffer();
ResponseBuffers.push_back(RootIoBuffer); // Root object
- *AttachmentInfo++ = {.AttachmentSize = RootIoBuffer.Size(),
- .Flags = CbAttachmentEntry::kIsObject,
- .AttachmentHash = Data.GetObjectHash()};
+ *AttachmentInfo++ = {.PayloadSize = RootIoBuffer.Size(), .Flags = CbAttachmentEntry::kIsObject, .AttachmentHash = Data.GetObjectHash()};
// Attachment payloads
@@ -74,7 +74,7 @@ FormatPackageMessage(const CbPackage& Data)
{
CompositeBuffer Compressed = AttachmentBuffer.GetCompressed();
- *AttachmentInfo++ = {.AttachmentSize = AttachmentBuffer.GetCompressedSize(),
+ *AttachmentInfo++ = {.PayloadSize = AttachmentBuffer.GetCompressedSize(),
.Flags = CbAttachmentEntry::kIsCompressed,
.AttachmentHash = IoHash::FromBLAKE3(AttachmentBuffer.GetRawHash())};
@@ -89,13 +89,13 @@ FormatPackageMessage(const CbPackage& Data)
IoBuffer ObjIoBuffer = AttachmentObject.GetBuffer().AsIoBuffer();
ResponseBuffers.push_back(ObjIoBuffer);
- *AttachmentInfo++ = {.AttachmentSize = ObjIoBuffer.Size(),
+ *AttachmentInfo++ = {.PayloadSize = ObjIoBuffer.Size(),
.Flags = CbAttachmentEntry::kIsObject,
.AttachmentHash = Attachment.GetHash()};
}
else if (CompositeBuffer AttachmentBinary = Attachment.AsCompositeBinary())
{
- *AttachmentInfo++ = {.AttachmentSize = AttachmentBinary.GetSize(), .Flags = 0, .AttachmentHash = Attachment.GetHash()};
+ *AttachmentInfo++ = {.PayloadSize = AttachmentBinary.GetSize(), .Flags = 0, .AttachmentHash = Attachment.GetHash()};
for (const SharedBuffer& Segment : AttachmentBinary.GetSegments())
{
@@ -120,8 +120,7 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint
return {};
}
- MemoryInStream InStream(Payload);
- BinaryReader Reader(InStream);
+ BinaryReader Reader(Payload);
CbPackageHeader Hdr;
Reader.Read(&Hdr, sizeof Hdr);
@@ -142,7 +141,7 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint
for (uint32_t i = 0; i < ChunkCount; ++i)
{
const CbAttachmentEntry& Entry = AttachmentEntries[i];
- const uint64_t AttachmentSize = Entry.AttachmentSize;
+ const uint64_t AttachmentSize = Entry.PayloadSize;
IoBuffer AttachmentBuffer = CreateBuffer(Entry.AttachmentHash, AttachmentSize);
ZEN_ASSERT(AttachmentBuffer);
@@ -195,4 +194,123 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint
return Package;
}
-} // namespace zen \ No newline at end of file
+CbPackageReader::CbPackageReader() : m_CreateBuffer([](const IoHash&, uint64_t Size) -> IoBuffer { return IoBuffer{Size}; })
+{
+}
+
+CbPackageReader::~CbPackageReader()
+{
+}
+
+void
+CbPackageReader::SetPayloadBufferCreator(std::function<IoBuffer(const IoHash& Cid, uint64_t Size)> CreateBuffer)
+{
+ m_CreateBuffer = CreateBuffer;
+}
+
+/** Process data
+ */
+uint64_t
+CbPackageReader::ProcessHeaderData(const void* Data, uint64_t DataBytes)
+{
+ ZEN_ASSERT(m_CurrentState != State::kReadingBuffers);
+
+ switch (m_CurrentState)
+ {
+ case State::kInitialState:
+ ZEN_ASSERT(Data == nullptr);
+ m_CurrentState = State::kReadingHeader;
+ return sizeof m_PackageHeader;
+
+ case State::kReadingHeader:
+ ZEN_ASSERT(DataBytes == sizeof m_PackageHeader);
+ memcpy(&m_PackageHeader, Data, sizeof m_PackageHeader);
+ ZEN_ASSERT(m_PackageHeader.HeaderMagic == kCbPkgMagic);
+ m_CurrentState = State::kReadingAttachmentEntries;
+ m_AttachmentEntries.resize(m_PackageHeader.AttachmentCount + 1);
+ return (m_PackageHeader.AttachmentCount + 1) * sizeof(CbAttachmentEntry);
+
+ case State::kReadingAttachmentEntries:
+ ZEN_ASSERT(DataBytes == ((m_PackageHeader.AttachmentCount + 1) * sizeof(CbAttachmentEntry)));
+ memcpy(m_AttachmentEntries.data(), Data, DataBytes);
+
+ for (CbAttachmentEntry& Entry : m_AttachmentEntries)
+ {
+ m_PayloadBuffers.push_back(IoBuffer{Entry.PayloadSize});
+ }
+
+ m_CurrentState = State::kReadingBuffers;
+ return 0;
+
+ default:
+ ZEN_ASSERT(false);
+ return 0;
+ }
+}
+
+/**
+ ______________________ _____________________________
+ \__ ___/\_ _____// _____/\__ ___/ _____/
+ | | | __)_ \_____ \ | | \_____ \
+ | | | \/ \ | | / \
+ |____| /_______ /_______ / |____| /_______ /
+ \/ \/ \/
+ */
+
+#if ZEN_WITH_TESTS
+
+TEST_CASE("CbPackage.Serialization")
+{
+ // Make a test package
+
+ CbAttachment Attach1{SharedBuffer::MakeView(MakeMemoryView("abcd"))};
+ CbAttachment Attach2{SharedBuffer::MakeView(MakeMemoryView("efgh"))};
+
+ CbObjectWriter Cbo;
+ Cbo.AddAttachment("abcd", Attach1);
+ Cbo.AddAttachment("efgh", Attach2);
+
+ CbPackage Pkg;
+ Pkg.AddAttachment(Attach1);
+ Pkg.AddAttachment(Attach2);
+ Pkg.SetObject(Cbo.Save());
+
+ SharedBuffer Buffer = FormatPackageMessageBuffer(Pkg).Flatten();
+ const uint8_t* CursorPtr = reinterpret_cast<const uint8_t*>(Buffer.GetData());
+ uint64_t RemainingBytes = Buffer.GetSize();
+
+ auto ConsumeBytes = [&](uint64_t ByteCount) {
+ ZEN_ASSERT(ByteCount <= RemainingBytes);
+ void* ReturnPtr = (void*)CursorPtr;
+ CursorPtr += ByteCount;
+ RemainingBytes -= ByteCount;
+ return ReturnPtr;
+ };
+
+ auto CopyBytes = [&](void* TargetBuffer, uint64_t ByteCount) {
+ ZEN_ASSERT(ByteCount <= RemainingBytes);
+ memcpy(TargetBuffer, CursorPtr, ByteCount);
+ CursorPtr += ByteCount;
+ RemainingBytes -= ByteCount;
+ };
+
+ CbPackageReader Reader;
+ uint64_t InitialRead = Reader.ProcessHeaderData(nullptr, 0);
+ uint64_t NextBytes = Reader.ProcessHeaderData(ConsumeBytes(InitialRead), InitialRead);
+ NextBytes = Reader.ProcessHeaderData(ConsumeBytes(NextBytes), NextBytes);
+ auto Buffers = Reader.GetPayloadBuffers();
+
+ for (auto& PayloadBuffer : Buffers)
+ {
+ CopyBytes(PayloadBuffer.MutableData(), PayloadBuffer.GetSize());
+ }
+}
+
+void
+forcelink_httpshared()
+{
+}
+
+#endif
+
+} // namespace zen
diff --git a/zenhttp/httpshared.h b/zenhttp/httpshared.h
deleted file mode 100644
index 92c1ef9c6..000000000
--- a/zenhttp/httpshared.h
+++ /dev/null
@@ -1,51 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <zencore/iobuffer.h>
-#include <zencore/iohash.h>
-
-#include <functional>
-
-namespace zen {
-
-class IoBuffer;
-class CbPackage;
-class CompositeBuffer;
-
-struct CbPackageHeader
-{
- uint32_t HeaderMagic;
- uint32_t AttachmentCount;
- uint32_t Reserved1;
- uint32_t Reserved2;
-};
-
-static_assert(sizeof(CbPackageHeader) == 16);
-
-static constinit uint32_t kCbPkgMagic = 0xaa77aacc;
-
-struct CbAttachmentEntry
-{
- uint64_t AttachmentSize;
- uint32_t Flags;
- IoHash AttachmentHash;
-
- enum
- {
- kIsCompressed = (1u << 0), // Is marshaled using compressed buffer storage format
- kIsObject = (1u << 1), // Is compact binary object
- };
-};
-
-static_assert(sizeof(CbAttachmentEntry) == 32);
-
-std::vector<IoBuffer> FormatPackageMessage(const CbPackage& Data);
-CompositeBuffer FormatPackageMessageBuffer(const CbPackage& Data);
-CbPackage ParsePackageMessage(
- IoBuffer Payload,
- std::function<IoBuffer(const IoHash& Cid, uint64_t Size)> CreateBuffer = [](const IoHash&, uint64_t Size) -> IoBuffer {
- return IoBuffer{Size};
- });
-
-} // namespace zen
diff --git a/zenhttp/include/zenhttp/httpshared.h b/zenhttp/include/zenhttp/httpshared.h
index 92c1ef9c6..2e728577d 100644
--- a/zenhttp/include/zenhttp/httpshared.h
+++ b/zenhttp/include/zenhttp/httpshared.h
@@ -13,10 +13,23 @@ class IoBuffer;
class CbPackage;
class CompositeBuffer;
+/** _____ _ _____ _
+ / ____| | | __ \ | |
+ | | | |__ | |__) |_ _ ___| | ____ _ __ _ ___
+ | | | '_ \| ___/ _` |/ __| |/ / _` |/ _` |/ _ \
+ | |____| |_) | | | (_| | (__| < (_| | (_| | __/
+ \_____|_.__/|_| \__,_|\___|_|\_\__,_|\__, |\___|
+ __/ |
+ |___/
+
+ Structures and code related to handling CbPackage transactions
+
+ */
+
struct CbPackageHeader
{
uint32_t HeaderMagic;
- uint32_t AttachmentCount;
+ uint32_t AttachmentCount; // TODO: should add ability to opt out of implicit root document?
uint32_t Reserved1;
uint32_t Reserved2;
};
@@ -27,7 +40,7 @@ static constinit uint32_t kCbPkgMagic = 0xaa77aacc;
struct CbAttachmentEntry
{
- uint64_t AttachmentSize;
+ uint64_t PayloadSize;
uint32_t Flags;
IoHash AttachmentHash;
@@ -35,6 +48,7 @@ struct CbAttachmentEntry
{
kIsCompressed = (1u << 0), // Is marshaled using compressed buffer storage format
kIsObject = (1u << 1), // Is compact binary object
+ kIsError = (1u << 2), // Is error (compact binary formatted) object
};
};
@@ -48,4 +62,41 @@ CbPackage ParsePackageMessage(
return IoBuffer{Size};
});
+/** Streaming reader for compact binary packages
+
+ The goal is to ultimately support zero-copy I/O, but for now there'll be some
+ copying involved on some platforms at least.
+
+ */
+class CbPackageReader
+{
+public:
+ CbPackageReader();
+ ~CbPackageReader();
+
+ void SetPayloadBufferCreator(std::function<IoBuffer(const IoHash& Cid, uint64_t Size)> CreateBuffer);
+
+ /** Process header data
+ */
+ uint64_t ProcessHeaderData(const void* Data, uint64_t DataBytes);
+
+ std::span<IoBuffer> GetPayloadBuffers() { return m_PayloadBuffers; }
+
+private:
+ enum class State
+ {
+ kInitialState,
+ kReadingHeader,
+ kReadingAttachmentEntries,
+ kReadingBuffers
+ } m_CurrentState = State::kInitialState;
+
+ std::function<IoBuffer(const IoHash& Cid, uint64_t Size)> m_CreateBuffer;
+ std::vector<IoBuffer> m_PayloadBuffers;
+ std::vector<CbAttachmentEntry> m_AttachmentEntries;
+ CbPackageHeader m_PackageHeader;
+};
+
+void forcelink_httpshared();
+
} // namespace zen
diff --git a/zenhttp/zenhttp.cpp b/zenhttp/zenhttp.cpp
index 637486f55..0194abdcb 100644
--- a/zenhttp/zenhttp.cpp
+++ b/zenhttp/zenhttp.cpp
@@ -3,6 +3,7 @@
#include <zenhttp/zenhttp.h>
#include <zenhttp/httpserver.h>
+#include <zenhttp/httpshared.h>
namespace zen {
@@ -10,6 +11,7 @@ void
zenhttp_forcelinktests()
{
http_forcelink();
+ forcelink_httpshared();
}
-} // namespace zen \ No newline at end of file
+} // namespace zen
diff --git a/zenhttp/zenhttp.vcxproj b/zenhttp/zenhttp.vcxproj
index eca9898d3..899cf4bd1 100644
--- a/zenhttp/zenhttp.vcxproj
+++ b/zenhttp/zenhttp.vcxproj
@@ -104,12 +104,12 @@
</ItemGroup>
<ItemGroup>
<ClInclude Include="httpnull.h" />
- <ClInclude Include="httpshared.h" />
<ClInclude Include="httpsys.h" />
<ClInclude Include="httpuws.h" />
<ClInclude Include="include\zenhttp\httpclient.h" />
<ClInclude Include="include\zenhttp\httpcommon.h" />
<ClInclude Include="include\zenhttp\httpserver.h" />
+ <ClInclude Include="include\zenhttp\httpshared.h" />
<ClInclude Include="include\zenhttp\zenhttp.h" />
<ClInclude Include="iothreadpool.h" />
</ItemGroup>
diff --git a/zenhttp/zenhttp.vcxproj.filters b/zenhttp/zenhttp.vcxproj.filters
index 17f71bed1..2e968055c 100644
--- a/zenhttp/zenhttp.vcxproj.filters
+++ b/zenhttp/zenhttp.vcxproj.filters
@@ -18,8 +18,8 @@
<ClInclude Include="include\zenhttp\zenhttp.h" />
<ClInclude Include="httpnull.h" />
<ClInclude Include="httpuws.h" />
- <ClInclude Include="httpshared.h" />
<ClInclude Include="include\zenhttp\httpcommon.h" />
+ <ClInclude Include="include\zenhttp\httpshared.h" />
</ItemGroup>
<ItemGroup>
<None Include="xmake.lua" />