aboutsummaryrefslogtreecommitdiff
path: root/src/zenhttp/clients/httpclientcpr.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-02-27 13:12:10 +0100
committerGitHub Enterprise <[email protected]>2026-02-27 13:12:10 +0100
commit9e7019aa16b19cd87aa6af3ef39825edb039c8be (patch)
tree389df8874e04efc77830043a6830e7aa053cc50f /src/zenhttp/clients/httpclientcpr.cpp
parentPorted "lane trace" feature from UE (by way of IAX) (#771) (diff)
downloadzen-9e7019aa16b19cd87aa6af3ef39825edb039c8be.tar.xz
zen-9e7019aa16b19cd87aa6af3ef39825edb039c8be.zip
add support in http client to accept multi-range responses (#788)
* add support in http client to accept multi-range responses
Diffstat (limited to 'src/zenhttp/clients/httpclientcpr.cpp')
-rw-r--r--src/zenhttp/clients/httpclientcpr.cpp530
1 files changed, 292 insertions, 238 deletions
diff --git a/src/zenhttp/clients/httpclientcpr.cpp b/src/zenhttp/clients/httpclientcpr.cpp
index 5d92b3b6b..6bc63db09 100644
--- a/src/zenhttp/clients/httpclientcpr.cpp
+++ b/src/zenhttp/clients/httpclientcpr.cpp
@@ -162,10 +162,11 @@ CprHttpClient::~CprHttpClient()
}
HttpClient::Response
-CprHttpClient::ResponseWithPayload(std::string_view SessionId,
- cpr::Response&& HttpResponse,
- const HttpResponseCode WorkResponseCode,
- IoBuffer&& Payload)
+CprHttpClient::ResponseWithPayload(std::string_view SessionId,
+ cpr::Response&& HttpResponse,
+ const HttpResponseCode WorkResponseCode,
+ IoBuffer&& Payload,
+ std::vector<HttpClient::Response::MultipartBoundary>&& BoundaryPositions)
{
// This ends up doing a memcpy, would be good to get rid of it by streaming results
// into buffer directly
@@ -174,7 +175,6 @@ CprHttpClient::ResponseWithPayload(std::string_view SessionId,
if (auto It = HttpResponse.header.find("Content-Type"); It != HttpResponse.header.end())
{
const HttpContentType ContentType = ParseContentType(It->second);
-
ResponseBuffer.SetContentType(ContentType);
}
@@ -188,16 +188,26 @@ CprHttpClient::ResponseWithPayload(std::string_view SessionId,
}
}
+ std::sort(BoundaryPositions.begin(),
+ BoundaryPositions.end(),
+ [](const HttpClient::Response::MultipartBoundary& Lhs, const HttpClient::Response::MultipartBoundary& Rhs) {
+ return Lhs.RangeOffset < Rhs.RangeOffset;
+ });
+
return HttpClient::Response{.StatusCode = WorkResponseCode,
.ResponsePayload = std::move(ResponseBuffer),
.Header = HttpClient::KeyValueMap(HttpResponse.header.begin(), HttpResponse.header.end()),
.UploadedBytes = gsl::narrow<int64_t>(HttpResponse.uploaded_bytes),
.DownloadedBytes = gsl::narrow<int64_t>(HttpResponse.downloaded_bytes),
- .ElapsedSeconds = HttpResponse.elapsed};
+ .ElapsedSeconds = HttpResponse.elapsed,
+ .Ranges = std::move(BoundaryPositions)};
}
HttpClient::Response
-CprHttpClient::CommonResponse(std::string_view SessionId, cpr::Response&& HttpResponse, IoBuffer&& Payload)
+CprHttpClient::CommonResponse(std::string_view SessionId,
+ cpr::Response&& HttpResponse,
+ IoBuffer&& Payload,
+ std::vector<HttpClient::Response::MultipartBoundary>&& BoundaryPositions)
{
const HttpResponseCode WorkResponseCode = HttpResponseCode(HttpResponse.status_code);
if (HttpResponse.error)
@@ -235,7 +245,7 @@ CprHttpClient::CommonResponse(std::string_view SessionId, cpr::Response&& HttpRe
}
else
{
- return ResponseWithPayload(SessionId, std::move(HttpResponse), WorkResponseCode, std::move(Payload));
+ return ResponseWithPayload(SessionId, std::move(HttpResponse), WorkResponseCode, std::move(Payload), std::move(BoundaryPositions));
}
}
@@ -896,236 +906,280 @@ CprHttpClient::Download(std::string_view Url, const std::filesystem::path& TempF
std::string PayloadString;
std::unique_ptr<detail::TempPayloadFile> PayloadFile;
- cpr::Response Response = DoWithRetry(
- m_SessionId,
- [&]() {
- auto GetHeader = [&](std::string header) -> std::pair<std::string, std::string> {
- size_t DelimiterPos = header.find(':');
- if (DelimiterPos != std::string::npos)
- {
- std::string Key = header.substr(0, DelimiterPos);
- constexpr AsciiSet WhitespaceCharacters(" \v\f\t\r\n");
- Key = AsciiSet::TrimSuffixWith(Key, WhitespaceCharacters);
- Key = AsciiSet::TrimPrefixWith(Key, WhitespaceCharacters);
-
- std::string Value = header.substr(DelimiterPos + 1);
- Value = AsciiSet::TrimSuffixWith(Value, WhitespaceCharacters);
- Value = AsciiSet::TrimPrefixWith(Value, WhitespaceCharacters);
-
- return std::make_pair(Key, Value);
- }
- return std::make_pair(header, "");
- };
-
- auto DownloadCallback = [&](std::string data, intptr_t) {
- if (m_CheckIfAbortFunction && m_CheckIfAbortFunction())
- {
- return false;
- }
- if (PayloadFile)
- {
- ZEN_ASSERT(PayloadString.empty());
- std::error_code Ec = PayloadFile->Write(data);
- if (Ec)
- {
- ZEN_WARN("Failed to write to temp file in '{}' for HttpClient::Download. Reason: {}",
- TempFolderPath.string(),
- Ec.message());
- return false;
- }
- }
- else
- {
- PayloadString.append(data);
- }
- return true;
- };
-
- uint64_t RequestedContentLength = (uint64_t)-1;
- if (auto RangeIt = AdditionalHeader.Entries.find("Range"); RangeIt != AdditionalHeader.Entries.end())
- {
- if (RangeIt->second.starts_with("bytes"))
- {
- size_t RangeStartPos = RangeIt->second.find('=', 5);
- if (RangeStartPos != std::string::npos)
- {
- RangeStartPos++;
- size_t RangeSplitPos = RangeIt->second.find('-', RangeStartPos);
- if (RangeSplitPos != std::string::npos)
- {
- std::optional<size_t> RequestedRangeStart =
- ParseInt<size_t>(RangeIt->second.substr(RangeStartPos, RangeSplitPos - RangeStartPos));
- std::optional<size_t> RequestedRangeEnd = ParseInt<size_t>(RangeIt->second.substr(RangeStartPos + 1));
- if (RequestedRangeStart.has_value() && RequestedRangeEnd.has_value())
- {
- RequestedContentLength = RequestedRangeEnd.value() - 1;
- }
- }
- }
- }
- }
-
- cpr::Response Response;
- {
- std::vector<std::pair<std::string, std::string>> ReceivedHeaders;
- auto HeaderCallback = [&](std::string header, intptr_t) {
- std::pair<std::string, std::string> Header = GetHeader(header);
- if (Header.first == "Content-Length"sv)
- {
- std::optional<size_t> ContentLength = ParseInt<size_t>(Header.second);
- if (ContentLength.has_value())
- {
- if (ContentLength.value() > m_ConnectionSettings.MaximumInMemoryDownloadSize)
- {
- PayloadFile = std::make_unique<detail::TempPayloadFile>();
- std::error_code Ec = PayloadFile->Open(TempFolderPath, ContentLength.value());
- if (Ec)
- {
- ZEN_WARN("Failed to create temp file in '{}' for HttpClient::Download. Reason: {}",
- TempFolderPath.string(),
- Ec.message());
- PayloadFile.reset();
- }
- }
- else
- {
- PayloadString.reserve(ContentLength.value());
- }
- }
- }
- if (!Header.first.empty())
- {
- ReceivedHeaders.emplace_back(std::move(Header));
- }
- return 1;
- };
-
- Session Sess = AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
- Response = Sess.Download(cpr::WriteCallback{DownloadCallback}, cpr::HeaderCallback{HeaderCallback});
- for (const std::pair<std::string, std::string>& H : ReceivedHeaders)
- {
- Response.header.insert_or_assign(H.first, H.second);
- }
- }
- if (m_ConnectionSettings.AllowResume)
- {
- auto SupportsRanges = [](const cpr::Response& Response) -> bool {
- if (Response.header.find("Content-Range") != Response.header.end())
- {
- return true;
- }
- if (auto It = Response.header.find("Accept-Ranges"); It != Response.header.end())
- {
- return It->second == "bytes"sv;
- }
- return false;
- };
-
- auto ShouldResume = [&SupportsRanges](const cpr::Response& Response) -> bool {
- if (ShouldRetry(Response))
- {
- return SupportsRanges(Response);
- }
- return false;
- };
-
- if (ShouldResume(Response))
- {
- auto It = Response.header.find("Content-Length");
- if (It != Response.header.end())
- {
- uint64_t ContentLength = RequestedContentLength;
- if (ContentLength == uint64_t(-1))
- {
- if (auto ParsedContentLength = ParseInt<int64_t>(It->second); ParsedContentLength.has_value())
- {
- ContentLength = ParsedContentLength.value();
- }
- }
-
- std::vector<std::pair<std::string, std::string>> ReceivedHeaders;
-
- auto HeaderCallback = [&](std::string header, intptr_t) {
- std::pair<std::string, std::string> Header = GetHeader(header);
- if (!Header.first.empty())
- {
- ReceivedHeaders.emplace_back(std::move(Header));
- }
-
- if (Header.first == "Content-Range"sv)
- {
- if (Header.second.starts_with("bytes "sv))
- {
- size_t RangeStartEnd = Header.second.find('-', 6);
- if (RangeStartEnd != std::string::npos)
- {
- const auto Start = ParseInt<uint64_t>(Header.second.substr(6, RangeStartEnd - 6));
- if (Start)
- {
- uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length();
- if (Start.value() == DownloadedSize)
- {
- return 1;
- }
- else if (Start.value() > DownloadedSize)
- {
- return 0;
- }
- if (PayloadFile)
- {
- PayloadFile->ResetWritePos(Start.value());
- }
- else
- {
- PayloadString = PayloadString.substr(0, Start.value());
- }
- return 1;
- }
- }
- }
- return 0;
- }
- return 1;
- };
-
- KeyValueMap HeadersWithRange(AdditionalHeader);
- do
- {
- uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length();
-
- std::string Range = fmt::format("bytes={}-{}", DownloadedSize, DownloadedSize + ContentLength - 1);
- if (auto RangeIt = HeadersWithRange.Entries.find("Range"); RangeIt != HeadersWithRange.Entries.end())
- {
- if (RangeIt->second == Range)
- {
- // If we didn't make any progress, abort
- break;
- }
- }
- HeadersWithRange.Entries.insert_or_assign("Range", Range);
-
- Session Sess =
- AllocSession(m_BaseUri, Url, m_ConnectionSettings, HeadersWithRange, {}, m_SessionId, GetAccessToken());
- Response = Sess.Download(cpr::WriteCallback{DownloadCallback}, cpr::HeaderCallback{HeaderCallback});
- for (const std::pair<std::string, std::string>& H : ReceivedHeaders)
- {
- Response.header.insert_or_assign(H.first, H.second);
- }
- ReceivedHeaders.clear();
- } while (ShouldResume(Response));
- }
- }
- }
-
- if (!PayloadString.empty())
- {
- Response.text = std::move(PayloadString);
- }
- return Response;
- },
- PayloadFile);
-
- return CommonResponse(m_SessionId, std::move(Response), PayloadFile ? PayloadFile->DetachToIoBuffer() : IoBuffer{});
+
+ HttpContentType ContentType = HttpContentType::kUnknownContentType;
+ detail::MultipartBoundaryParser BoundaryParser;
+ bool IsMultiRangeResponse = false;
+
+ cpr::Response Response = DoWithRetry(
+ m_SessionId,
+ [&]() {
+ auto DownloadCallback = [&](std::string data, intptr_t) {
+ if (m_CheckIfAbortFunction && m_CheckIfAbortFunction())
+ {
+ return false;
+ }
+
+ if (IsMultiRangeResponse)
+ {
+ BoundaryParser.ParseInput(data);
+ }
+
+ if (PayloadFile)
+ {
+ ZEN_ASSERT(PayloadString.empty());
+ std::error_code Ec = PayloadFile->Write(data);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to write to temp file in '{}' for HttpClient::Download. Reason: {}",
+ TempFolderPath.string(),
+ Ec.message());
+ return false;
+ }
+ }
+ else
+ {
+ PayloadString.append(data);
+ }
+ return true;
+ };
+
+ uint64_t RequestedContentLength = (uint64_t)-1;
+ if (auto RangeIt = AdditionalHeader.Entries.find("Range"); RangeIt != AdditionalHeader.Entries.end())
+ {
+ if (RangeIt->second.starts_with("bytes"))
+ {
+ std::string_view RangeValue(RangeIt->second);
+ size_t RangeStartPos = RangeValue.find('=', 5);
+ if (RangeStartPos != std::string::npos)
+ {
+ RangeStartPos++;
+ while (RangeValue[RangeStartPos] == ' ')
+ {
+ RangeStartPos++;
+ }
+ RequestedContentLength = 0;
+
+ while (RangeStartPos < RangeValue.length())
+ {
+ size_t RangeEnd = RangeValue.find_first_of(", \r\n", RangeStartPos);
+ if (RangeEnd == std::string::npos)
+ {
+ RangeEnd = RangeValue.length();
+ }
+
+ std::string_view RangeString = RangeValue.substr(RangeStartPos, RangeEnd - RangeStartPos);
+ size_t RangeSplitPos = RangeString.find('-');
+ if (RangeSplitPos != std::string::npos)
+ {
+ std::optional<size_t> RequestedRangeStart = ParseInt<size_t>(RangeString.substr(0, RangeSplitPos));
+ std::optional<size_t> RequestedRangeEnd = ParseInt<size_t>(RangeString.substr(RangeSplitPos + 1));
+ if (RequestedRangeStart.has_value() && RequestedRangeEnd.has_value())
+ {
+ RequestedContentLength += RequestedRangeEnd.value() - 1;
+ }
+ }
+ RangeStartPos = RangeEnd;
+ while (RangeStartPos != RangeValue.length() &&
+ (RangeValue[RangeStartPos] == ',' || RangeValue[RangeStartPos] == ' '))
+ {
+ RangeStartPos++;
+ }
+ }
+ }
+ }
+ }
+
+ cpr::Response Response;
+ {
+ std::vector<std::pair<std::string, std::string>> ReceivedHeaders;
+ auto HeaderCallback = [&](std::string header, intptr_t) {
+ const std::pair<std::string_view, std::string_view> Header = detail::GetHeaderKeyAndValue(header);
+ if (Header.first == "Content-Length"sv)
+ {
+ std::optional<size_t> ContentLength = ParseInt<size_t>(Header.second);
+ if (ContentLength.has_value())
+ {
+ if (ContentLength.value() > m_ConnectionSettings.MaximumInMemoryDownloadSize)
+ {
+ PayloadFile = std::make_unique<detail::TempPayloadFile>();
+ std::error_code Ec = PayloadFile->Open(TempFolderPath, ContentLength.value());
+ if (Ec)
+ {
+ ZEN_WARN("Failed to create temp file in '{}' for HttpClient::Download. Reason: {}",
+ TempFolderPath.string(),
+ Ec.message());
+ PayloadFile.reset();
+ }
+ }
+ else
+ {
+ PayloadString.reserve(ContentLength.value());
+ }
+ }
+ }
+ else if (Header.first == "Content-Type")
+ {
+ IsMultiRangeResponse = BoundaryParser.Init(Header.second);
+ if (!IsMultiRangeResponse)
+ {
+ ContentType = ParseContentType(Header.second);
+ }
+ }
+ else if (Header.first == "Content-Range")
+ {
+ if (!IsMultiRangeResponse)
+ {
+ std::pair<uint64_t, uint64_t> Range = detail::ParseContentRange(Header.second);
+ if (Range.second != 0)
+ {
+ BoundaryParser.Boundaries.push_back(HttpClient::Response::MultipartBoundary{.OffsetInPayload = 0,
+ .RangeOffset = Range.first,
+ .RangeLength = Range.second,
+ .ContentType = ContentType});
+ }
+ }
+ }
+ if (!Header.first.empty())
+ {
+ ReceivedHeaders.emplace_back(std::move(Header));
+ }
+ return 1;
+ };
+
+ Session Sess = AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
+ Response = Sess.Download(cpr::WriteCallback{DownloadCallback}, cpr::HeaderCallback{HeaderCallback});
+ for (const std::pair<std::string, std::string>& H : ReceivedHeaders)
+ {
+ Response.header.insert_or_assign(H.first, H.second);
+ }
+ }
+ if (m_ConnectionSettings.AllowResume)
+ {
+ auto SupportsRanges = [](const cpr::Response& Response) -> bool {
+ if (Response.header.find("Content-Range") != Response.header.end())
+ {
+ return true;
+ }
+ if (auto It = Response.header.find("Accept-Ranges"); It != Response.header.end())
+ {
+ return It->second == "bytes"sv;
+ }
+ return false;
+ };
+
+ auto ShouldResume = [&SupportsRanges, &IsMultiRangeResponse](const cpr::Response& Response) -> bool {
+ if (IsMultiRangeResponse)
+ {
+ return false;
+ }
+ if (ShouldRetry(Response))
+ {
+ return SupportsRanges(Response);
+ }
+ return false;
+ };
+
+ if (ShouldResume(Response))
+ {
+ auto It = Response.header.find("Content-Length");
+ if (It != Response.header.end())
+ {
+ uint64_t ContentLength = RequestedContentLength;
+ if (ContentLength == uint64_t(-1))
+ {
+ if (auto ParsedContentLength = ParseInt<int64_t>(It->second); ParsedContentLength.has_value())
+ {
+ ContentLength = ParsedContentLength.value();
+ }
+ }
+
+ std::vector<std::pair<std::string, std::string>> ReceivedHeaders;
+
+ auto HeaderCallback = [&](std::string header, intptr_t) {
+ const std::pair<std::string_view, std::string_view> Header = detail::GetHeaderKeyAndValue(header);
+ if (!Header.first.empty())
+ {
+ ReceivedHeaders.emplace_back(std::move(Header));
+ }
+
+ if (Header.first == "Content-Range"sv)
+ {
+ if (Header.second.starts_with("bytes "sv))
+ {
+ size_t RangeStartEnd = Header.second.find('-', 6);
+ if (RangeStartEnd != std::string::npos)
+ {
+ const auto Start = ParseInt<uint64_t>(Header.second.substr(6, RangeStartEnd - 6));
+ if (Start)
+ {
+ uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length();
+ if (Start.value() == DownloadedSize)
+ {
+ return 1;
+ }
+ else if (Start.value() > DownloadedSize)
+ {
+ return 0;
+ }
+ if (PayloadFile)
+ {
+ PayloadFile->ResetWritePos(Start.value());
+ }
+ else
+ {
+ PayloadString = PayloadString.substr(0, Start.value());
+ }
+ return 1;
+ }
+ }
+ }
+ return 0;
+ }
+ return 1;
+ };
+
+ KeyValueMap HeadersWithRange(AdditionalHeader);
+ do
+ {
+ uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length();
+
+ std::string Range = fmt::format("bytes={}-{}", DownloadedSize, DownloadedSize + ContentLength - 1);
+ if (auto RangeIt = HeadersWithRange.Entries.find("Range"); RangeIt != HeadersWithRange.Entries.end())
+ {
+ if (RangeIt->second == Range)
+ {
+ // If we didn't make any progress, abort
+ break;
+ }
+ }
+ HeadersWithRange.Entries.insert_or_assign("Range", Range);
+
+ Session Sess =
+ AllocSession(m_BaseUri, Url, m_ConnectionSettings, HeadersWithRange, {}, m_SessionId, GetAccessToken());
+ Response = Sess.Download(cpr::WriteCallback{DownloadCallback}, cpr::HeaderCallback{HeaderCallback});
+ for (const std::pair<std::string, std::string>& H : ReceivedHeaders)
+ {
+ Response.header.insert_or_assign(H.first, H.second);
+ }
+ ReceivedHeaders.clear();
+ } while (ShouldResume(Response));
+ }
+ }
+ }
+
+ if (!PayloadString.empty())
+ {
+ Response.text = std::move(PayloadString);
+ }
+ return Response;
+ },
+ PayloadFile);
+
+ return CommonResponse(m_SessionId,
+ std::move(Response),
+ PayloadFile ? PayloadFile->DetachToIoBuffer() : IoBuffer{},
+ std::move(BoundaryParser.Boundaries));
}
} // namespace zen