diff options
| author | Dan Engelbrecht <[email protected]> | 2024-01-24 11:41:18 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-01-24 11:41:18 +0100 |
| commit | 0e63573fbe9973f6b922656a785817a711581b78 (patch) | |
| tree | 48e18f0b4aea958a536ba50f72f589a580c4b798 /src/zenhttp/httpclient.cpp | |
| parent | oplog import/export improvements (#634) (diff) | |
| download | zen-0e63573fbe9973f6b922656a785817a711581b78.tar.xz zen-0e63573fbe9973f6b922656a785817a711581b78.zip | |
Add retry with optional resume logic to HttpClient::Download (#639)
- Improvement: Refactored Jupiter upstream to use HttpClient
- Improvement: Added retry and resume logic to HttpClient
- Improvement: Added authentication support to HttpClient
- Improvement: Clearer logging in GCV2 compact of FileCas/BlockStore
- Improvement: Size details in oplog import logging
Diffstat (limited to 'src/zenhttp/httpclient.cpp')
| -rw-r--r-- | src/zenhttp/httpclient.cpp | 530 |
1 files changed, 393 insertions, 137 deletions
diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp index a29a08a3c..3b2a3baec 100644 --- a/src/zenhttp/httpclient.cpp +++ b/src/zenhttp/httpclient.cpp @@ -13,6 +13,7 @@ #include <zencore/session.h> #include <zencore/sharedbuffer.h> #include <zencore/stream.h> +#include <zencore/string.h> #include <zencore/testing.h> #include <zencore/trace.h> #include <zenhttp/formatters.h> @@ -38,19 +39,19 @@ using namespace std::literals; // // CPR helpers -cpr::Body +static cpr::Body AsCprBody(const CbObject& Obj) { return cpr::Body((const char*)Obj.GetBuffer().GetData(), Obj.GetBuffer().GetSize()); } -cpr::Body +static cpr::Body AsCprBody(const IoBuffer& Obj) { return cpr::Body((const char*)Obj.GetData(), Obj.GetSize()); } -cpr::Body +static cpr::Body AsCprBody(const CompositeBuffer& Buffers) { SharedBuffer Buffer = Buffers.Flatten(); @@ -62,7 +63,7 @@ AsCprBody(const CompositeBuffer& Buffers) ////////////////////////////////////////////////////////////////////////// -HttpClient::Response +static HttpClient::Response ResponseWithPayload(cpr::Response& HttpResponse, const HttpResponseCode WorkResponseCode, IoBuffer&& Payload) { // This ends up doing a memcpy, would be good to get rid of it by streaming results @@ -89,7 +90,7 @@ ResponseWithPayload(cpr::Response& HttpResponse, const HttpResponseCode WorkResp .ElapsedSeconds = HttpResponse.elapsed}; } -HttpClient::Response +static HttpClient::Response CommonResponse(cpr::Response&& HttpResponse, IoBuffer&& Payload = {}) { const HttpResponseCode WorkResponseCode = HttpResponseCode(HttpResponse.status_code); @@ -123,6 +124,51 @@ CommonResponse(cpr::Response&& HttpResponse, IoBuffer&& Payload = {}) } } +static bool +ShouldRetry(const cpr::Response& Response) +{ + switch (Response.error.code) + { + case cpr::ErrorCode::OK: + break; + case cpr::ErrorCode::OPERATION_TIMEDOUT: + case cpr::ErrorCode::NETWORK_RECEIVE_ERROR: + case cpr::ErrorCode::NETWORK_SEND_FAILURE: + return true; + default: + return false; + } + switch ((HttpResponseCode)Response.status_code) + { + case HttpResponseCode::GatewayTimeout: + case HttpResponseCode::RequestTimeout: + return true; + default: + return false; + } +}; + +static cpr::Response +DoWithRetry(std::function<cpr::Response()>&& Func, uint8_t RetryCount) +{ + uint8_t Attempt = 0; + cpr::Response Result = Func(); + while (Attempt < RetryCount && ShouldRetry(Result)) + { + Sleep(100 * (Attempt + 1)); + Attempt++; + ZEN_INFO("{} Attempt {}/{}", CommonResponse(std::move(Result)).ErrorMessage("Retry"), Attempt, RetryCount + 1); + Result = Func(); + } + return Result; +} + +static std::pair<std::string, std::string> +HeaderContentType(ZenContentType ContentType) +{ + return std::make_pair("Content-Type", std::string(MapContentTypeToString(ContentType))); +} + ////////////////////////////////////////////////////////////////////////// struct HttpClient::Impl : public RefCounted @@ -144,9 +190,13 @@ struct HttpClient::Impl : public RefCounted ZEN_TRACE("GET {}", Result); return Result; } - inline cpr::Response Download(cpr::WriteCallback&& write) + inline cpr::Response Download(cpr::WriteCallback&& Write, std::optional<cpr::HeaderCallback>&& Header = {}) { - cpr::Response Result = CprSession->Download(write); + if (Header) + { + CprSession->SetHeaderCallback(std::move(Header.value())); + } + cpr::Response Result = CprSession->Download(Write); ZEN_TRACE("GET {}", Result); return Result; } @@ -185,11 +235,13 @@ struct HttpClient::Impl : public RefCounted Session& operator=(Session&&) = delete; }; - Session AllocSession(const std::string_view BaseUrl, - const std::string_view Url, - const HttpClientSettings& ConnectionSettings, - const KeyValueMap& AdditionalHeader, - const KeyValueMap& Parameters); + Session AllocSession(const std::string_view BaseUrl, + const std::string_view Url, + const HttpClientSettings& ConnectionSettings, + const KeyValueMap& AdditionalHeader, + const KeyValueMap& Parameters, + const std::string_view SessionId, + std::optional<HttpClientAccessToken> AccessToken); LoggerRef Logger() { return m_Log; } @@ -217,29 +269,26 @@ HttpClient::Impl::~Impl() } HttpClient::Impl::Session -HttpClient::Impl::AllocSession(const std::string_view BaseUrl, - const std::string_view ResourcePath, - const HttpClientSettings& ConnectionSettings, - const KeyValueMap& AdditionalHeader, - const KeyValueMap& Parameters) +HttpClient::Impl::AllocSession(const std::string_view BaseUrl, + const std::string_view ResourcePath, + const HttpClientSettings& ConnectionSettings, + const KeyValueMap& AdditionalHeader, + const KeyValueMap& Parameters, + const std::string_view SessionId, + std::optional<HttpClientAccessToken> AccessToken) { - bool IsNew = false; cpr::Session* CprSession = nullptr; m_SessionLock.WithExclusiveLock([&] { - if (m_Sessions.empty()) - { - CprSession = new cpr::Session(); - IsNew = true; - } - else + if (!m_Sessions.empty()) { CprSession = m_Sessions.back(); m_Sessions.pop_back(); } }); - if (IsNew) + if (CprSession == nullptr) { + CprSession = new cpr::Session(); CprSession->SetConnectTimeout(ConnectionSettings.ConnectTimeout); CprSession->SetTimeout(ConnectionSettings.Timeout); if (ConnectionSettings.AssumeHttp2) @@ -252,9 +301,13 @@ HttpClient::Impl::AllocSession(const std::string_view BaseUrl, { CprSession->SetHeader(cpr::Header(AdditionalHeader->begin(), AdditionalHeader->end())); } - else + if (!SessionId.empty()) { - CprSession->SetHeader({}); + CprSession->UpdateHeader({{"UE-Session", std::string(SessionId)}}); + } + if (AccessToken) + { + CprSession->UpdateHeader({{"Authorization", AccessToken->Value}}); } if (!Parameters->empty()) { @@ -430,6 +483,9 @@ public: return Buffer; } + uint64_t GetSize() const { return m_WriteOffset; } + void ResetWritePos(uint64_t WriteOffset) { m_WriteOffset = WriteOffset; } + private: void* m_FileHandle; std::uint64_t m_WriteOffset; @@ -452,12 +508,46 @@ HttpClient::~HttpClient() { } +bool +HttpClient::Authenticate() +{ + std::optional<HttpClientAccessToken> Token = GetAccessToken(); + if (!Token) + { + return false; + } + return Token->IsValid(); +} + +const std::optional<HttpClientAccessToken> +HttpClient::GetAccessToken() +{ + if (!m_ConnectionSettings.AccessTokenProvider.has_value()) + { + return {}; + } + { + RwLock::SharedLockScope _(m_AccessTokenLock); + if (m_CachedAccessToken.IsValid()) + { + return m_CachedAccessToken; + } + } + RwLock::ExclusiveLockScope _(m_AccessTokenLock); + if (m_CachedAccessToken.IsValid()) + { + return m_CachedAccessToken; + } + m_CachedAccessToken = m_ConnectionSettings.AccessTokenProvider.value()(); + return m_CachedAccessToken; +} + HttpClient::Response HttpClient::TransactPackage(std::string_view Url, CbPackage Package, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("HttpClient::TransactPackage"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); + Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); // First, list of offered chunks for filtering on the server end @@ -482,7 +572,7 @@ HttpClient::TransactPackage(std::string_view Url, CbPackage Package, const KeyVa BinaryWriter MemWriter; Writer.Save(MemWriter); - Sess->UpdateHeader({{"Content-Type", "application/x-ue-offer"}, {"UE-Session", m_SessionId}, {"UE-Request", RequestIdString}}); + Sess->UpdateHeader({HeaderContentType(HttpContentType::kCbPackageOffer), {"UE-Request", RequestIdString}}); Sess->SetBody(cpr::Body{(const char*)MemWriter.Data(), MemWriter.Size()}); cpr::Response FilterResponse = Sess.Post(); @@ -524,7 +614,7 @@ HttpClient::TransactPackage(std::string_view Url, CbPackage Package, const KeyVa CompositeBuffer Message = FormatPackageMessageBuffer(SendPackage); SharedBuffer FlatMessage = Message.Flatten(); - Sess->UpdateHeader({{"Content-Type", "application/x-ue-cbpkg"}, {"UE-Session", m_SessionId}, {"UE-Request", RequestIdString}}); + Sess->UpdateHeader({HeaderContentType(HttpContentType::kCbPackage), {"UE-Request", RequestIdString}}); Sess->SetBody(cpr::Body{(const char*)FlatMessage.GetData(), FlatMessage.GetSize()}); cpr::Response FilterResponse = Sess.Post(); @@ -556,20 +646,28 @@ HttpClient::Put(std::string_view Url, const IoBuffer& Payload, const KeyValueMap { ZEN_TRACE_CPU("HttpClient::Put"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); - Sess->SetBody(AsCprBody(Payload)); - Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(Payload.GetContentType()))}}); - - return CommonResponse(Sess.Put()); + return CommonResponse(DoWithRetry( + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + Sess->SetBody(AsCprBody(Payload)); + Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())}); + return Sess.Put(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response HttpClient::Get(std::string_view Url, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters) { ZEN_TRACE_CPU("HttpClient::Get"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters); - - return CommonResponse(Sess.Get()); + return CommonResponse(DoWithRetry( + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters, m_SessionId, GetAccessToken()); + return Sess.Get(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -577,9 +675,13 @@ HttpClient::Head(std::string_view Url, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("HttpClient::Head"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); - - return CommonResponse(Sess.Head()); + return CommonResponse(DoWithRetry( + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + return Sess.Head(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -587,9 +689,13 @@ HttpClient::Delete(std::string_view Url, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("HttpClient::Delete"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); - - return CommonResponse(Sess.Delete()); + return CommonResponse(DoWithRetry( + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + return Sess.Delete(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -597,9 +703,13 @@ HttpClient::Post(std::string_view Url, const KeyValueMap& AdditionalHeader, cons { ZEN_TRACE_CPU("HttpClient::PostNoPayload"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters); - - return CommonResponse(Sess.Post()); + return CommonResponse(DoWithRetry( + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters, m_SessionId, GetAccessToken()); + return Sess.Post(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -607,12 +717,16 @@ HttpClient::Post(std::string_view Url, const IoBuffer& Payload, const KeyValueMa { ZEN_TRACE_CPU("HttpClient::PostWithPayload"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); + return CommonResponse(DoWithRetry( + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); - Sess->SetBody(AsCprBody(Payload)); - Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(Payload.GetContentType()))}}); - - return CommonResponse(Sess.Post()); + Sess->SetBody(AsCprBody(Payload)); + Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())}); + return Sess.Post(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -620,12 +734,16 @@ HttpClient::Post(std::string_view Url, CbObject Payload, const KeyValueMap& Addi { ZEN_TRACE_CPU("HttpClient::PostObjectPayload"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); - - Sess->SetBody(AsCprBody(Payload)); - Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(ZenContentType::kCbObject))}}); + return CommonResponse(DoWithRetry( + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); - return CommonResponse(Sess.Post()); + Sess->SetBody(AsCprBody(Payload)); + Sess->UpdateHeader({HeaderContentType(ZenContentType::kCbObject)}); + return Sess.Post(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -633,13 +751,17 @@ HttpClient::Post(std::string_view Url, CbPackage Pkg, const KeyValueMap& Additio { ZEN_TRACE_CPU("HttpClient::PostPackage"); - CompositeBuffer Message = zen::FormatPackageMessageBuffer(Pkg); - - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); - Sess->SetBody(AsCprBody(Message)); - Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(ZenContentType::kCbPackage))}}); - - return CommonResponse(Sess.Post()); + return CommonResponse(DoWithRetry( + [&]() { + CompositeBuffer Message = zen::FormatPackageMessageBuffer(Pkg); + + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + Sess->SetBody(AsCprBody(Message)); + Sess->UpdateHeader({HeaderContentType(ZenContentType::kCbPackage)}); + return Sess.Post(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -647,27 +769,32 @@ HttpClient::Upload(std::string_view Url, const IoBuffer& Payload, const KeyValue { ZEN_TRACE_CPU("HttpClient::Upload"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); - Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(Payload.GetContentType()))}}); + return CommonResponse(DoWithRetry( + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())}); - uint64_t Offset = 0; - if (Payload.IsWholeFile()) - { - auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, Payload.GetSize() - Offset); - IoBuffer PayloadRange = IoBuffer(Payload, Offset, size); - MutableMemoryView Data(buffer, size); - Data.CopyFrom(PayloadRange.GetView()); - Offset += size; - return true; - }; - Sess->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); - } - else - { - Sess->SetBody(AsCprBody(Payload)); - } - return CommonResponse(Sess.Put()); + uint64_t Offset = 0; + if (Payload.IsWholeFile()) + { + auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) { + size = Min<size_t>(size, Payload.GetSize() - Offset); + IoBuffer PayloadRange = IoBuffer(Payload, Offset, size); + MutableMemoryView Data(buffer, size); + Data.CopyFrom(PayloadRange.GetView()); + Offset += size; + return true; + }; + Sess->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); + } + else + { + Sess->SetBody(AsCprBody(Payload)); + } + return Sess.Put(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -675,21 +802,26 @@ HttpClient::Upload(std::string_view Url, const CompositeBuffer& Payload, ZenCont { ZEN_TRACE_CPU("HttpClient::Upload"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); - Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(ContentType))}}); - - uint64_t SizeLeft = Payload.GetSize(); - CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); - auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, SizeLeft); - MutableMemoryView Data(buffer, size); - Payload.CopyTo(Data, BufferIt); - SizeLeft -= size; - return true; - }; - Sess->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); - - return CommonResponse(Sess.Put()); + return CommonResponse(DoWithRetry( + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + Sess->UpdateHeader({HeaderContentType(ContentType)}); + + uint64_t SizeLeft = Payload.GetSize(); + CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); + auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { + size = Min<size_t>(size, SizeLeft); + MutableMemoryView Data(buffer, size); + Payload.CopyTo(Data, BufferIt); + SizeLeft -= size; + return true; + }; + Sess->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); + + return Sess.Put(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -697,46 +829,170 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold { ZEN_TRACE_CPU("HttpClient::Download"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); - std::string PayloadString; std::unique_ptr<TempPayloadFile> PayloadFile; - - cpr::Response Response = Sess.Download(cpr::WriteCallback{[&](std::string data, intptr_t) { - if (!PayloadFile && (PayloadString.length() + data.length()) > (1024 * 1024)) - { - PayloadFile = std::make_unique<TempPayloadFile>(); - std::error_code Ec = PayloadFile->Open(TempFolderPath); - if (Ec) - { - ZEN_WARN("Failed to create temp file in '{}' for HttpClient::Download. Reason: {}", TempFolderPath.string(), Ec.message()); - return false; - } - PayloadFile->Write(PayloadString); - PayloadString.clear(); - } - if (PayloadFile) - { - std::error_code Ec = PayloadFile->Write(data); - if (Ec) - { - ZEN_WARN("Failed to write to temp file in '{}' for HttpClient::Download. Reason: {}", - TempFolderPath.string(), - Ec.message()); - return false; - } - } - else - { - PayloadString.append(data); - } - return true; - }}); - - if (!PayloadString.empty()) - { - Response.text = std::move(PayloadString); - } + cpr::Response Response = DoWithRetry( + [&]() { + auto DownloadCallback = [&](std::string data, intptr_t) { + if (!PayloadFile && (PayloadString.length() + data.length()) > (1024 * 1024)) + { + PayloadFile = std::make_unique<TempPayloadFile>(); + std::error_code Ec = PayloadFile->Open(TempFolderPath); + if (Ec) + { + ZEN_WARN("Failed to create temp file in '{}' for HttpClient::Download. Reason: {}", + TempFolderPath.string(), + Ec.message()); + return false; + } + PayloadFile->Write(PayloadString); + PayloadString.clear(); + } + if (PayloadFile) + { + std::error_code Ec = PayloadFile->Write(data); + if (Ec) + { + ZEN_WARN("Failed to write to temp file in '{}' for HttpClient::Download. Reason: {}", + TempFolderPath.string(), + Ec.message()); + return false; + } + } + else + { + PayloadString.append(data); + } + return true; + }; + + cpr::Response Response; + { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + Response = Sess.Download(cpr::WriteCallback{DownloadCallback}); + } + + if (m_ConnectionSettings.AllowResume) + { + auto SupportsRanges = [](const cpr::Response& Response) -> bool { + if (Response.header.find("Content-Range") != Response.header.end()) + { + return true; + } + if (auto It = Response.header.find("Accept-Ranges"); It != Response.header.end()) + { + return It->second == "bytes"; + } + return false; + }; + + auto ShouldResume = [&SupportsRanges](const cpr::Response& Response) -> bool { + if (ShouldRetry(Response)) + { + return SupportsRanges(Response); + } + return false; + }; + + if (ShouldResume(Response)) + { + auto It = Response.header.find("Content-Length"); + if (It != Response.header.end()) + { + std::optional<int64_t> ContentLength = ParseInt<int64_t>(It->second); + if (ContentLength) + { + auto HeaderCallback = [&](std::string header, intptr_t) { + size_t DelimiterPos = header.find(':'); + if (DelimiterPos != std::string::npos) + { + std::string Key = header.substr(0, DelimiterPos); + constexpr AsciiSet WhitespaceCharacters(" \v\f\t\r\n"); + Key = AsciiSet::TrimSuffixWith(Key, WhitespaceCharacters); + Key = AsciiSet::TrimPrefixWith(Key, WhitespaceCharacters); + + std::string Value = header.substr(DelimiterPos + 1); + Value = AsciiSet::TrimSuffixWith(Value, WhitespaceCharacters); + Value = AsciiSet::TrimPrefixWith(Value, WhitespaceCharacters); + + Response.header.insert_or_assign(Key, Value); + + if (Key == "Content-Range"sv) + { + if (Value.starts_with("bytes ")) + { + size_t RangeStartEnd = Value.find('-', 6); + if (RangeStartEnd != std::string::npos) + { + const auto Start = ParseInt<uint64_t>(Value.substr(6, RangeStartEnd - 6)); + if (Start) + { + uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length(); + if (Start.value() == DownloadedSize) + { + return 1; + } + else if (Start.value() > DownloadedSize) + { + return 0; + } + if (PayloadFile) + { + PayloadFile->ResetWritePos(Start.value()); + } + else + { + PayloadString = PayloadString.substr(0, Start.value()); + } + return 1; + } + } + } + return 0; + } + } + return 1; + }; + + KeyValueMap HeadersWithRange(AdditionalHeader); + do + { + uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length(); + + std::string Range = fmt::format("{}-{}", DownloadedSize, ContentLength.value()); + if (auto RangeIt = HeadersWithRange.Entries.find("Range"); RangeIt != HeadersWithRange.Entries.end()) + { + if (RangeIt->second == Range) + { + // If we didn't make any progress, abort + break; + } + } + HeadersWithRange.Entries.insert_or_assign("Range", Range); + + Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, + Url, + m_ConnectionSettings, + HeadersWithRange, + {}, + m_SessionId, + GetAccessToken()); + Response.header.clear(); + Response = Sess.Download(cpr::WriteCallback{DownloadCallback}, cpr::HeaderCallback{HeaderCallback}); + } while (ShouldResume(Response)); + } + } + } + } + + if (!PayloadString.empty()) + { + Response.text = std::move(PayloadString); + } + return Response; + }, + m_ConnectionSettings.RetryCount); return CommonResponse(std::move(Response), PayloadFile ? PayloadFile->DetachToIoBuffer() : IoBuffer{}); } |