aboutsummaryrefslogtreecommitdiff
path: root/src/zenhttp/httpclient.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-01-24 11:41:18 +0100
committerGitHub <[email protected]>2024-01-24 11:41:18 +0100
commit0e63573fbe9973f6b922656a785817a711581b78 (patch)
tree48e18f0b4aea958a536ba50f72f589a580c4b798 /src/zenhttp/httpclient.cpp
parentoplog import/export improvements (#634) (diff)
downloadzen-0e63573fbe9973f6b922656a785817a711581b78.tar.xz
zen-0e63573fbe9973f6b922656a785817a711581b78.zip
Add retry with optional resume logic to HttpClient::Download (#639)
- Improvement: Refactored Jupiter upstream to use HttpClient - Improvement: Added retry and resume logic to HttpClient - Improvement: Added authentication support to HttpClient - Improvement: Clearer logging in GCV2 compact of FileCas/BlockStore - Improvement: Size details in oplog import logging
Diffstat (limited to 'src/zenhttp/httpclient.cpp')
-rw-r--r--src/zenhttp/httpclient.cpp530
1 files changed, 393 insertions, 137 deletions
diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp
index a29a08a3c..3b2a3baec 100644
--- a/src/zenhttp/httpclient.cpp
+++ b/src/zenhttp/httpclient.cpp
@@ -13,6 +13,7 @@
#include <zencore/session.h>
#include <zencore/sharedbuffer.h>
#include <zencore/stream.h>
+#include <zencore/string.h>
#include <zencore/testing.h>
#include <zencore/trace.h>
#include <zenhttp/formatters.h>
@@ -38,19 +39,19 @@ using namespace std::literals;
//
// CPR helpers
-cpr::Body
+static cpr::Body
AsCprBody(const CbObject& Obj)
{
return cpr::Body((const char*)Obj.GetBuffer().GetData(), Obj.GetBuffer().GetSize());
}
-cpr::Body
+static cpr::Body
AsCprBody(const IoBuffer& Obj)
{
return cpr::Body((const char*)Obj.GetData(), Obj.GetSize());
}
-cpr::Body
+static cpr::Body
AsCprBody(const CompositeBuffer& Buffers)
{
SharedBuffer Buffer = Buffers.Flatten();
@@ -62,7 +63,7 @@ AsCprBody(const CompositeBuffer& Buffers)
//////////////////////////////////////////////////////////////////////////
-HttpClient::Response
+static HttpClient::Response
ResponseWithPayload(cpr::Response& HttpResponse, const HttpResponseCode WorkResponseCode, IoBuffer&& Payload)
{
// This ends up doing a memcpy, would be good to get rid of it by streaming results
@@ -89,7 +90,7 @@ ResponseWithPayload(cpr::Response& HttpResponse, const HttpResponseCode WorkResp
.ElapsedSeconds = HttpResponse.elapsed};
}
-HttpClient::Response
+static HttpClient::Response
CommonResponse(cpr::Response&& HttpResponse, IoBuffer&& Payload = {})
{
const HttpResponseCode WorkResponseCode = HttpResponseCode(HttpResponse.status_code);
@@ -123,6 +124,51 @@ CommonResponse(cpr::Response&& HttpResponse, IoBuffer&& Payload = {})
}
}
+static bool
+ShouldRetry(const cpr::Response& Response)
+{
+ switch (Response.error.code)
+ {
+ case cpr::ErrorCode::OK:
+ break;
+ case cpr::ErrorCode::OPERATION_TIMEDOUT:
+ case cpr::ErrorCode::NETWORK_RECEIVE_ERROR:
+ case cpr::ErrorCode::NETWORK_SEND_FAILURE:
+ return true;
+ default:
+ return false;
+ }
+ switch ((HttpResponseCode)Response.status_code)
+ {
+ case HttpResponseCode::GatewayTimeout:
+ case HttpResponseCode::RequestTimeout:
+ return true;
+ default:
+ return false;
+ }
+};
+
+static cpr::Response
+DoWithRetry(std::function<cpr::Response()>&& Func, uint8_t RetryCount)
+{
+ uint8_t Attempt = 0;
+ cpr::Response Result = Func();
+ while (Attempt < RetryCount && ShouldRetry(Result))
+ {
+ Sleep(100 * (Attempt + 1));
+ Attempt++;
+ ZEN_INFO("{} Attempt {}/{}", CommonResponse(std::move(Result)).ErrorMessage("Retry"), Attempt, RetryCount + 1);
+ Result = Func();
+ }
+ return Result;
+}
+
+static std::pair<std::string, std::string>
+HeaderContentType(ZenContentType ContentType)
+{
+ return std::make_pair("Content-Type", std::string(MapContentTypeToString(ContentType)));
+}
+
//////////////////////////////////////////////////////////////////////////
struct HttpClient::Impl : public RefCounted
@@ -144,9 +190,13 @@ struct HttpClient::Impl : public RefCounted
ZEN_TRACE("GET {}", Result);
return Result;
}
- inline cpr::Response Download(cpr::WriteCallback&& write)
+ inline cpr::Response Download(cpr::WriteCallback&& Write, std::optional<cpr::HeaderCallback>&& Header = {})
{
- cpr::Response Result = CprSession->Download(write);
+ if (Header)
+ {
+ CprSession->SetHeaderCallback(std::move(Header.value()));
+ }
+ cpr::Response Result = CprSession->Download(Write);
ZEN_TRACE("GET {}", Result);
return Result;
}
@@ -185,11 +235,13 @@ struct HttpClient::Impl : public RefCounted
Session& operator=(Session&&) = delete;
};
- Session AllocSession(const std::string_view BaseUrl,
- const std::string_view Url,
- const HttpClientSettings& ConnectionSettings,
- const KeyValueMap& AdditionalHeader,
- const KeyValueMap& Parameters);
+ Session AllocSession(const std::string_view BaseUrl,
+ const std::string_view Url,
+ const HttpClientSettings& ConnectionSettings,
+ const KeyValueMap& AdditionalHeader,
+ const KeyValueMap& Parameters,
+ const std::string_view SessionId,
+ std::optional<HttpClientAccessToken> AccessToken);
LoggerRef Logger() { return m_Log; }
@@ -217,29 +269,26 @@ HttpClient::Impl::~Impl()
}
HttpClient::Impl::Session
-HttpClient::Impl::AllocSession(const std::string_view BaseUrl,
- const std::string_view ResourcePath,
- const HttpClientSettings& ConnectionSettings,
- const KeyValueMap& AdditionalHeader,
- const KeyValueMap& Parameters)
+HttpClient::Impl::AllocSession(const std::string_view BaseUrl,
+ const std::string_view ResourcePath,
+ const HttpClientSettings& ConnectionSettings,
+ const KeyValueMap& AdditionalHeader,
+ const KeyValueMap& Parameters,
+ const std::string_view SessionId,
+ std::optional<HttpClientAccessToken> AccessToken)
{
- bool IsNew = false;
cpr::Session* CprSession = nullptr;
m_SessionLock.WithExclusiveLock([&] {
- if (m_Sessions.empty())
- {
- CprSession = new cpr::Session();
- IsNew = true;
- }
- else
+ if (!m_Sessions.empty())
{
CprSession = m_Sessions.back();
m_Sessions.pop_back();
}
});
- if (IsNew)
+ if (CprSession == nullptr)
{
+ CprSession = new cpr::Session();
CprSession->SetConnectTimeout(ConnectionSettings.ConnectTimeout);
CprSession->SetTimeout(ConnectionSettings.Timeout);
if (ConnectionSettings.AssumeHttp2)
@@ -252,9 +301,13 @@ HttpClient::Impl::AllocSession(const std::string_view BaseUrl,
{
CprSession->SetHeader(cpr::Header(AdditionalHeader->begin(), AdditionalHeader->end()));
}
- else
+ if (!SessionId.empty())
{
- CprSession->SetHeader({});
+ CprSession->UpdateHeader({{"UE-Session", std::string(SessionId)}});
+ }
+ if (AccessToken)
+ {
+ CprSession->UpdateHeader({{"Authorization", AccessToken->Value}});
}
if (!Parameters->empty())
{
@@ -430,6 +483,9 @@ public:
return Buffer;
}
+ uint64_t GetSize() const { return m_WriteOffset; }
+ void ResetWritePos(uint64_t WriteOffset) { m_WriteOffset = WriteOffset; }
+
private:
void* m_FileHandle;
std::uint64_t m_WriteOffset;
@@ -452,12 +508,46 @@ HttpClient::~HttpClient()
{
}
+bool
+HttpClient::Authenticate()
+{
+ std::optional<HttpClientAccessToken> Token = GetAccessToken();
+ if (!Token)
+ {
+ return false;
+ }
+ return Token->IsValid();
+}
+
+const std::optional<HttpClientAccessToken>
+HttpClient::GetAccessToken()
+{
+ if (!m_ConnectionSettings.AccessTokenProvider.has_value())
+ {
+ return {};
+ }
+ {
+ RwLock::SharedLockScope _(m_AccessTokenLock);
+ if (m_CachedAccessToken.IsValid())
+ {
+ return m_CachedAccessToken;
+ }
+ }
+ RwLock::ExclusiveLockScope _(m_AccessTokenLock);
+ if (m_CachedAccessToken.IsValid())
+ {
+ return m_CachedAccessToken;
+ }
+ m_CachedAccessToken = m_ConnectionSettings.AccessTokenProvider.value()();
+ return m_CachedAccessToken;
+}
+
HttpClient::Response
HttpClient::TransactPackage(std::string_view Url, CbPackage Package, const KeyValueMap& AdditionalHeader)
{
ZEN_TRACE_CPU("HttpClient::TransactPackage");
- Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {});
+ Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
// First, list of offered chunks for filtering on the server end
@@ -482,7 +572,7 @@ HttpClient::TransactPackage(std::string_view Url, CbPackage Package, const KeyVa
BinaryWriter MemWriter;
Writer.Save(MemWriter);
- Sess->UpdateHeader({{"Content-Type", "application/x-ue-offer"}, {"UE-Session", m_SessionId}, {"UE-Request", RequestIdString}});
+ Sess->UpdateHeader({HeaderContentType(HttpContentType::kCbPackageOffer), {"UE-Request", RequestIdString}});
Sess->SetBody(cpr::Body{(const char*)MemWriter.Data(), MemWriter.Size()});
cpr::Response FilterResponse = Sess.Post();
@@ -524,7 +614,7 @@ HttpClient::TransactPackage(std::string_view Url, CbPackage Package, const KeyVa
CompositeBuffer Message = FormatPackageMessageBuffer(SendPackage);
SharedBuffer FlatMessage = Message.Flatten();
- Sess->UpdateHeader({{"Content-Type", "application/x-ue-cbpkg"}, {"UE-Session", m_SessionId}, {"UE-Request", RequestIdString}});
+ Sess->UpdateHeader({HeaderContentType(HttpContentType::kCbPackage), {"UE-Request", RequestIdString}});
Sess->SetBody(cpr::Body{(const char*)FlatMessage.GetData(), FlatMessage.GetSize()});
cpr::Response FilterResponse = Sess.Post();
@@ -556,20 +646,28 @@ HttpClient::Put(std::string_view Url, const IoBuffer& Payload, const KeyValueMap
{
ZEN_TRACE_CPU("HttpClient::Put");
- Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {});
- Sess->SetBody(AsCprBody(Payload));
- Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(Payload.GetContentType()))}});
-
- return CommonResponse(Sess.Put());
+ return CommonResponse(DoWithRetry(
+ [&]() {
+ Impl::Session Sess =
+ m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
+ Sess->SetBody(AsCprBody(Payload));
+ Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())});
+ return Sess.Put();
+ },
+ m_ConnectionSettings.RetryCount));
}
HttpClient::Response
HttpClient::Get(std::string_view Url, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters)
{
ZEN_TRACE_CPU("HttpClient::Get");
- Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters);
-
- return CommonResponse(Sess.Get());
+ return CommonResponse(DoWithRetry(
+ [&]() {
+ Impl::Session Sess =
+ m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters, m_SessionId, GetAccessToken());
+ return Sess.Get();
+ },
+ m_ConnectionSettings.RetryCount));
}
HttpClient::Response
@@ -577,9 +675,13 @@ HttpClient::Head(std::string_view Url, const KeyValueMap& AdditionalHeader)
{
ZEN_TRACE_CPU("HttpClient::Head");
- Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {});
-
- return CommonResponse(Sess.Head());
+ return CommonResponse(DoWithRetry(
+ [&]() {
+ Impl::Session Sess =
+ m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
+ return Sess.Head();
+ },
+ m_ConnectionSettings.RetryCount));
}
HttpClient::Response
@@ -587,9 +689,13 @@ HttpClient::Delete(std::string_view Url, const KeyValueMap& AdditionalHeader)
{
ZEN_TRACE_CPU("HttpClient::Delete");
- Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {});
-
- return CommonResponse(Sess.Delete());
+ return CommonResponse(DoWithRetry(
+ [&]() {
+ Impl::Session Sess =
+ m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
+ return Sess.Delete();
+ },
+ m_ConnectionSettings.RetryCount));
}
HttpClient::Response
@@ -597,9 +703,13 @@ HttpClient::Post(std::string_view Url, const KeyValueMap& AdditionalHeader, cons
{
ZEN_TRACE_CPU("HttpClient::PostNoPayload");
- Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters);
-
- return CommonResponse(Sess.Post());
+ return CommonResponse(DoWithRetry(
+ [&]() {
+ Impl::Session Sess =
+ m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters, m_SessionId, GetAccessToken());
+ return Sess.Post();
+ },
+ m_ConnectionSettings.RetryCount));
}
HttpClient::Response
@@ -607,12 +717,16 @@ HttpClient::Post(std::string_view Url, const IoBuffer& Payload, const KeyValueMa
{
ZEN_TRACE_CPU("HttpClient::PostWithPayload");
- Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {});
+ return CommonResponse(DoWithRetry(
+ [&]() {
+ Impl::Session Sess =
+ m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
- Sess->SetBody(AsCprBody(Payload));
- Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(Payload.GetContentType()))}});
-
- return CommonResponse(Sess.Post());
+ Sess->SetBody(AsCprBody(Payload));
+ Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())});
+ return Sess.Post();
+ },
+ m_ConnectionSettings.RetryCount));
}
HttpClient::Response
@@ -620,12 +734,16 @@ HttpClient::Post(std::string_view Url, CbObject Payload, const KeyValueMap& Addi
{
ZEN_TRACE_CPU("HttpClient::PostObjectPayload");
- Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {});
-
- Sess->SetBody(AsCprBody(Payload));
- Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(ZenContentType::kCbObject))}});
+ return CommonResponse(DoWithRetry(
+ [&]() {
+ Impl::Session Sess =
+ m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
- return CommonResponse(Sess.Post());
+ Sess->SetBody(AsCprBody(Payload));
+ Sess->UpdateHeader({HeaderContentType(ZenContentType::kCbObject)});
+ return Sess.Post();
+ },
+ m_ConnectionSettings.RetryCount));
}
HttpClient::Response
@@ -633,13 +751,17 @@ HttpClient::Post(std::string_view Url, CbPackage Pkg, const KeyValueMap& Additio
{
ZEN_TRACE_CPU("HttpClient::PostPackage");
- CompositeBuffer Message = zen::FormatPackageMessageBuffer(Pkg);
-
- Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {});
- Sess->SetBody(AsCprBody(Message));
- Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(ZenContentType::kCbPackage))}});
-
- return CommonResponse(Sess.Post());
+ return CommonResponse(DoWithRetry(
+ [&]() {
+ CompositeBuffer Message = zen::FormatPackageMessageBuffer(Pkg);
+
+ Impl::Session Sess =
+ m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
+ Sess->SetBody(AsCprBody(Message));
+ Sess->UpdateHeader({HeaderContentType(ZenContentType::kCbPackage)});
+ return Sess.Post();
+ },
+ m_ConnectionSettings.RetryCount));
}
HttpClient::Response
@@ -647,27 +769,32 @@ HttpClient::Upload(std::string_view Url, const IoBuffer& Payload, const KeyValue
{
ZEN_TRACE_CPU("HttpClient::Upload");
- Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {});
- Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(Payload.GetContentType()))}});
+ return CommonResponse(DoWithRetry(
+ [&]() {
+ Impl::Session Sess =
+ m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
+ Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())});
- uint64_t Offset = 0;
- if (Payload.IsWholeFile())
- {
- auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) {
- size = Min<size_t>(size, Payload.GetSize() - Offset);
- IoBuffer PayloadRange = IoBuffer(Payload, Offset, size);
- MutableMemoryView Data(buffer, size);
- Data.CopyFrom(PayloadRange.GetView());
- Offset += size;
- return true;
- };
- Sess->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
- }
- else
- {
- Sess->SetBody(AsCprBody(Payload));
- }
- return CommonResponse(Sess.Put());
+ uint64_t Offset = 0;
+ if (Payload.IsWholeFile())
+ {
+ auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) {
+ size = Min<size_t>(size, Payload.GetSize() - Offset);
+ IoBuffer PayloadRange = IoBuffer(Payload, Offset, size);
+ MutableMemoryView Data(buffer, size);
+ Data.CopyFrom(PayloadRange.GetView());
+ Offset += size;
+ return true;
+ };
+ Sess->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
+ }
+ else
+ {
+ Sess->SetBody(AsCprBody(Payload));
+ }
+ return Sess.Put();
+ },
+ m_ConnectionSettings.RetryCount));
}
HttpClient::Response
@@ -675,21 +802,26 @@ HttpClient::Upload(std::string_view Url, const CompositeBuffer& Payload, ZenCont
{
ZEN_TRACE_CPU("HttpClient::Upload");
- Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {});
- Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(ContentType))}});
-
- uint64_t SizeLeft = Payload.GetSize();
- CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0);
- auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) {
- size = Min<size_t>(size, SizeLeft);
- MutableMemoryView Data(buffer, size);
- Payload.CopyTo(Data, BufferIt);
- SizeLeft -= size;
- return true;
- };
- Sess->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
-
- return CommonResponse(Sess.Put());
+ return CommonResponse(DoWithRetry(
+ [&]() {
+ Impl::Session Sess =
+ m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
+ Sess->UpdateHeader({HeaderContentType(ContentType)});
+
+ uint64_t SizeLeft = Payload.GetSize();
+ CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0);
+ auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) {
+ size = Min<size_t>(size, SizeLeft);
+ MutableMemoryView Data(buffer, size);
+ Payload.CopyTo(Data, BufferIt);
+ SizeLeft -= size;
+ return true;
+ };
+ Sess->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
+
+ return Sess.Put();
+ },
+ m_ConnectionSettings.RetryCount));
}
HttpClient::Response
@@ -697,46 +829,170 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold
{
ZEN_TRACE_CPU("HttpClient::Download");
- Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {});
-
std::string PayloadString;
std::unique_ptr<TempPayloadFile> PayloadFile;
-
- cpr::Response Response = Sess.Download(cpr::WriteCallback{[&](std::string data, intptr_t) {
- if (!PayloadFile && (PayloadString.length() + data.length()) > (1024 * 1024))
- {
- PayloadFile = std::make_unique<TempPayloadFile>();
- std::error_code Ec = PayloadFile->Open(TempFolderPath);
- if (Ec)
- {
- ZEN_WARN("Failed to create temp file in '{}' for HttpClient::Download. Reason: {}", TempFolderPath.string(), Ec.message());
- return false;
- }
- PayloadFile->Write(PayloadString);
- PayloadString.clear();
- }
- if (PayloadFile)
- {
- std::error_code Ec = PayloadFile->Write(data);
- if (Ec)
- {
- ZEN_WARN("Failed to write to temp file in '{}' for HttpClient::Download. Reason: {}",
- TempFolderPath.string(),
- Ec.message());
- return false;
- }
- }
- else
- {
- PayloadString.append(data);
- }
- return true;
- }});
-
- if (!PayloadString.empty())
- {
- Response.text = std::move(PayloadString);
- }
+ cpr::Response Response = DoWithRetry(
+ [&]() {
+ auto DownloadCallback = [&](std::string data, intptr_t) {
+ if (!PayloadFile && (PayloadString.length() + data.length()) > (1024 * 1024))
+ {
+ PayloadFile = std::make_unique<TempPayloadFile>();
+ std::error_code Ec = PayloadFile->Open(TempFolderPath);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to create temp file in '{}' for HttpClient::Download. Reason: {}",
+ TempFolderPath.string(),
+ Ec.message());
+ return false;
+ }
+ PayloadFile->Write(PayloadString);
+ PayloadString.clear();
+ }
+ if (PayloadFile)
+ {
+ std::error_code Ec = PayloadFile->Write(data);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to write to temp file in '{}' for HttpClient::Download. Reason: {}",
+ TempFolderPath.string(),
+ Ec.message());
+ return false;
+ }
+ }
+ else
+ {
+ PayloadString.append(data);
+ }
+ return true;
+ };
+
+ cpr::Response Response;
+ {
+ Impl::Session Sess =
+ m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
+ Response = Sess.Download(cpr::WriteCallback{DownloadCallback});
+ }
+
+ if (m_ConnectionSettings.AllowResume)
+ {
+ auto SupportsRanges = [](const cpr::Response& Response) -> bool {
+ if (Response.header.find("Content-Range") != Response.header.end())
+ {
+ return true;
+ }
+ if (auto It = Response.header.find("Accept-Ranges"); It != Response.header.end())
+ {
+ return It->second == "bytes";
+ }
+ return false;
+ };
+
+ auto ShouldResume = [&SupportsRanges](const cpr::Response& Response) -> bool {
+ if (ShouldRetry(Response))
+ {
+ return SupportsRanges(Response);
+ }
+ return false;
+ };
+
+ if (ShouldResume(Response))
+ {
+ auto It = Response.header.find("Content-Length");
+ if (It != Response.header.end())
+ {
+ std::optional<int64_t> ContentLength = ParseInt<int64_t>(It->second);
+ if (ContentLength)
+ {
+ auto HeaderCallback = [&](std::string header, intptr_t) {
+ size_t DelimiterPos = header.find(':');
+ if (DelimiterPos != std::string::npos)
+ {
+ std::string Key = header.substr(0, DelimiterPos);
+ constexpr AsciiSet WhitespaceCharacters(" \v\f\t\r\n");
+ Key = AsciiSet::TrimSuffixWith(Key, WhitespaceCharacters);
+ Key = AsciiSet::TrimPrefixWith(Key, WhitespaceCharacters);
+
+ std::string Value = header.substr(DelimiterPos + 1);
+ Value = AsciiSet::TrimSuffixWith(Value, WhitespaceCharacters);
+ Value = AsciiSet::TrimPrefixWith(Value, WhitespaceCharacters);
+
+ Response.header.insert_or_assign(Key, Value);
+
+ if (Key == "Content-Range"sv)
+ {
+ if (Value.starts_with("bytes "))
+ {
+ size_t RangeStartEnd = Value.find('-', 6);
+ if (RangeStartEnd != std::string::npos)
+ {
+ const auto Start = ParseInt<uint64_t>(Value.substr(6, RangeStartEnd - 6));
+ if (Start)
+ {
+ uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length();
+ if (Start.value() == DownloadedSize)
+ {
+ return 1;
+ }
+ else if (Start.value() > DownloadedSize)
+ {
+ return 0;
+ }
+ if (PayloadFile)
+ {
+ PayloadFile->ResetWritePos(Start.value());
+ }
+ else
+ {
+ PayloadString = PayloadString.substr(0, Start.value());
+ }
+ return 1;
+ }
+ }
+ }
+ return 0;
+ }
+ }
+ return 1;
+ };
+
+ KeyValueMap HeadersWithRange(AdditionalHeader);
+ do
+ {
+ uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length();
+
+ std::string Range = fmt::format("{}-{}", DownloadedSize, ContentLength.value());
+ if (auto RangeIt = HeadersWithRange.Entries.find("Range"); RangeIt != HeadersWithRange.Entries.end())
+ {
+ if (RangeIt->second == Range)
+ {
+ // If we didn't make any progress, abort
+ break;
+ }
+ }
+ HeadersWithRange.Entries.insert_or_assign("Range", Range);
+
+ Impl::Session Sess = m_Impl->AllocSession(m_BaseUri,
+ Url,
+ m_ConnectionSettings,
+ HeadersWithRange,
+ {},
+ m_SessionId,
+ GetAccessToken());
+ Response.header.clear();
+ Response = Sess.Download(cpr::WriteCallback{DownloadCallback}, cpr::HeaderCallback{HeaderCallback});
+ } while (ShouldResume(Response));
+ }
+ }
+ }
+ }
+
+ if (!PayloadString.empty())
+ {
+ Response.text = std::move(PayloadString);
+ }
+ return Response;
+ },
+ m_ConnectionSettings.RetryCount);
return CommonResponse(std::move(Response), PayloadFile ? PayloadFile->DetachToIoBuffer() : IoBuffer{});
}