aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-04-05 16:28:02 +0200
committerDan Engelbrecht <[email protected]>2024-04-05 16:28:02 +0200
commitcf72f74de2e041e565d51a857c398ad9ae365ad5 (patch)
tree7076e9ba6507a10b5a7c298a480096fe79e39682
parent5.4.4-pre0 (diff)
downloadzen-de/better-composite-iterator.tar.xz
zen-de/better-composite-iterator.zip
buffered composite iteratorde/better-composite-iterator
-rw-r--r--src/zenhttp/httpclient.cpp115
1 files changed, 99 insertions, 16 deletions
diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp
index 262785a0a..1b9affeda 100644
--- a/src/zenhttp/httpclient.cpp
+++ b/src/zenhttp/httpclient.cpp
@@ -53,6 +53,66 @@ AsCprBody(const IoBuffer& Obj)
//////////////////////////////////////////////////////////////////////////
+class SegmentIterator
+{
+public:
+ SegmentIterator(std::span<const SharedBuffer> Buffers) : m_Buffers(Buffers) {}
+
+ MemoryView GetView(size_t Size)
+ {
+ if (SegmentIndex == m_Buffers.size())
+ {
+ return {};
+ }
+ if (!CurrentSegment)
+ {
+ SegmentSize = m_Buffers[SegmentIndex].GetSize();
+ if (SegmentSize > BatchSize)
+ {
+ CurrentSegment = IoBuffer(m_Buffers[SegmentIndex].AsIoBuffer(), 0, BatchSize);
+ }
+ else
+ {
+ CurrentSegment = m_Buffers[SegmentIndex].AsIoBuffer();
+ }
+ }
+ else
+ {
+ if (SegmentOffset >= BatchOffset + CurrentSegment.GetSize())
+ {
+ BatchOffset += CurrentSegment.GetSize();
+ CurrentSegment = IoBuffer(m_Buffers[SegmentIndex].AsIoBuffer(), BatchOffset, BatchSize);
+ }
+ }
+ ZEN_ASSERT(SegmentOffset < SegmentSize);
+ return CurrentSegment.GetView().Mid(SegmentOffset - BatchOffset, Size);
+ }
+
+ void Advance(size_t Size)
+ {
+ ZEN_ASSERT(Size <= SegmentSize);
+ ZEN_ASSERT(SegmentIndex < m_Buffers.size());
+ SegmentOffset += Size;
+ if (SegmentOffset == SegmentSize)
+ {
+ BatchOffset = 0;
+ SegmentOffset = 0;
+ SegmentSize = 0;
+ SegmentIndex++;
+ CurrentSegment = {};
+ }
+ }
+
+private:
+ static constexpr uint64_t BatchSize = 512u * 1024u;
+ std::span<const SharedBuffer> m_Buffers;
+ size_t SegmentIndex = 0;
+ size_t SegmentSize = 0;
+ uint64_t SegmentOffset = 0;
+ uint64_t BatchOffset = 0;
+ IoBuffer CurrentSegment;
+};
+
static HttpClient::Response
ResponseWithPayload(cpr::Response& HttpResponse, const HttpResponseCode WorkResponseCode, IoBuffer&& Payload)
{
@@ -769,15 +829,27 @@ HttpClient::Post(std::string_view Url, const CompositeBuffer& Payload, ZenConten
return CommonResponse(DoWithRetry(
[&]() {
- uint64_t SizeLeft = Payload.GetSize();
- CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0);
- auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) {
- size = Min<size_t>(size, SizeLeft);
- MutableMemoryView Data(buffer, size);
- Payload.CopyTo(Data, BufferIt);
- SizeLeft -= size;
- return true;
+ // uint64_t SizeLeft = Payload.GetSize();
+ // CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0);
+ // auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) {
+ // size = Min<size_t>(size, SizeLeft);
+ // MutableMemoryView Data(buffer, size);
+ // Payload.CopyTo(Data, BufferIt);
+ // SizeLeft -= size;
+ // return true;
+ // };
+
+ SegmentIterator BufferIt(Payload.GetSegments());
+
+ auto ReadCallback = [&BufferIt](char* buffer, size_t& size, intptr_t) {
+ MemoryView Source = BufferIt.GetView(size);
+ size = Source.GetSize();
+ MutableMemoryView Data(buffer, size);
+ Data.CopyFrom(Source);
+ BufferIt.Advance(size);
+ return true;
};
+
Impl::Session Sess =
m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
Sess->UpdateHeader({HeaderContentType(ContentType)});
@@ -828,15 +900,26 @@ HttpClient::Upload(std::string_view Url, const CompositeBuffer& Payload, ZenCont
m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
Sess->UpdateHeader({HeaderContentType(ContentType)});
- uint64_t SizeLeft = Payload.GetSize();
- CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0);
- auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) {
- size = Min<size_t>(size, SizeLeft);
- MutableMemoryView Data(buffer, size);
- Payload.CopyTo(Data, BufferIt);
- SizeLeft -= size;
- return true;
+ // uint64_t SizeLeft = Payload.GetSize();
+ // CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0);
+ // auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) {
+ // size = Min<size_t>(size, SizeLeft);
+ // MutableMemoryView Data(buffer, size);
+ // Payload.CopyTo(Data, BufferIt);
+ // SizeLeft -= size;
+ // return true;
+ // };
+
+ SegmentIterator BufferIt(Payload.GetSegments());
+
+ auto ReadCallback = [&BufferIt](char* buffer, size_t& size, intptr_t) {
+ MemoryView Source = BufferIt.GetView(size);
+ MutableMemoryView Data(buffer, Source.GetSize());
+ Data.CopyFrom(Source);
+ BufferIt.Advance(Source.GetSize());
+ return true;
};
+
return Sess.Put(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
},
m_ConnectionSettings.RetryCount));