diff options
| author | Dan Engelbrecht <[email protected]> | 2024-04-05 16:28:02 +0200 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2024-04-05 16:28:02 +0200 |
| commit | cf72f74de2e041e565d51a857c398ad9ae365ad5 (patch) | |
| tree | 7076e9ba6507a10b5a7c298a480096fe79e39682 /src | |
| parent | 5.4.4-pre0 (diff) | |
| download | zen-de/better-composite-iterator.tar.xz zen-de/better-composite-iterator.zip | |
buffered composite iteratorde/better-composite-iterator
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenhttp/httpclient.cpp | 115 |
1 files changed, 99 insertions, 16 deletions
diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp index 262785a0a..1b9affeda 100644 --- a/src/zenhttp/httpclient.cpp +++ b/src/zenhttp/httpclient.cpp @@ -53,6 +53,66 @@ AsCprBody(const IoBuffer& Obj) ////////////////////////////////////////////////////////////////////////// +class SegmentIterator +{ +public: + SegmentIterator(std::span<const SharedBuffer> Buffers) : m_Buffers(Buffers) {} + + MemoryView GetView(size_t Size) + { + if (SegmentIndex == m_Buffers.size()) + { + return {}; + } + if (!CurrentSegment) + { + SegmentSize = m_Buffers[SegmentIndex].GetSize(); + if (SegmentSize > BatchSize) + { + CurrentSegment = IoBuffer(m_Buffers[SegmentIndex].AsIoBuffer(), 0, BatchSize); + } + else + { + CurrentSegment = m_Buffers[SegmentIndex].AsIoBuffer(); + } + } + else + { + if (SegmentOffset >= BatchOffset + CurrentSegment.GetSize()) + { + BatchOffset += CurrentSegment.GetSize(); + CurrentSegment = IoBuffer(m_Buffers[SegmentIndex].AsIoBuffer(), BatchOffset, BatchSize); + } + } + ZEN_ASSERT(SegmentOffset < SegmentSize); + return CurrentSegment.GetView().Mid(SegmentOffset - BatchOffset, Size); + } + + void Advance(size_t Size) + { + ZEN_ASSERT(Size <= SegmentSize); + ZEN_ASSERT(SegmentIndex < m_Buffers.size()); + SegmentOffset += Size; + if (SegmentOffset == SegmentSize) + { + BatchOffset = 0; + SegmentOffset = 0; + SegmentSize = 0; + SegmentIndex++; + CurrentSegment = {}; + } + } + +private: + static constexpr uint64_t BatchSize = 512u * 1024u; + std::span<const SharedBuffer> m_Buffers; + size_t SegmentIndex = 0; + size_t SegmentSize = 0; + uint64_t SegmentOffset = 0; + uint64_t BatchOffset = 0; + IoBuffer CurrentSegment; +}; + static HttpClient::Response ResponseWithPayload(cpr::Response& HttpResponse, const HttpResponseCode WorkResponseCode, IoBuffer&& Payload) { @@ -769,15 +829,27 @@ HttpClient::Post(std::string_view Url, const CompositeBuffer& Payload, ZenConten return CommonResponse(DoWithRetry( [&]() { - uint64_t SizeLeft = Payload.GetSize(); - CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); - auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, SizeLeft); - MutableMemoryView Data(buffer, size); - Payload.CopyTo(Data, BufferIt); - SizeLeft -= size; - return true; + // uint64_t SizeLeft = Payload.GetSize(); + // CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); + // auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { + // size = Min<size_t>(size, SizeLeft); + // MutableMemoryView Data(buffer, size); + // Payload.CopyTo(Data, BufferIt); + // SizeLeft -= size; + // return true; + // }; + + SegmentIterator BufferIt(Payload.GetSegments()); + + auto ReadCallback = [&BufferIt](char* buffer, size_t& size, intptr_t) { + MemoryView Source = BufferIt.GetView(size); + size = Source.GetSize(); + MutableMemoryView Data(buffer, size); + Data.CopyFrom(Source); + BufferIt.Advance(size); + return true; }; + Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); Sess->UpdateHeader({HeaderContentType(ContentType)}); @@ -828,15 +900,26 @@ HttpClient::Upload(std::string_view Url, const CompositeBuffer& Payload, ZenCont m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); Sess->UpdateHeader({HeaderContentType(ContentType)}); - uint64_t SizeLeft = Payload.GetSize(); - CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); - auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, SizeLeft); - MutableMemoryView Data(buffer, size); - Payload.CopyTo(Data, BufferIt); - SizeLeft -= size; - return true; + // uint64_t SizeLeft = Payload.GetSize(); + // CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); + // auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { + // size = Min<size_t>(size, SizeLeft); + // MutableMemoryView Data(buffer, size); + // Payload.CopyTo(Data, BufferIt); + // SizeLeft -= size; + // return true; + // }; + + SegmentIterator BufferIt(Payload.GetSegments()); + + auto ReadCallback = [&BufferIt](char* buffer, size_t& size, intptr_t) { + MemoryView Source = BufferIt.GetView(size); + MutableMemoryView Data(buffer, Source.GetSize()); + Data.CopyFrom(Source); + BufferIt.Advance(Source.GetSize()); + return true; }; + return Sess.Put(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); }, m_ConnectionSettings.RetryCount)); |