// Copyright Epic Games, Inc. All Rights Reserved. #include "httpclientcurl.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace zen { HttpClientBase* CreateCurlHttpClient(std::string_view BaseUri, const HttpClientSettings& ConnectionSettings, std::function&& CheckIfAbortFunction) { return new CurlHttpClient(BaseUri, ConnectionSettings, std::move(CheckIfAbortFunction)); } static std::atomic CurlHttpClientRequestIdCounter{0}; ////////////////////////////////////////////////////////////////////////// static HttpClientErrorCode MapCurlError(CURLcode Code) { switch (Code) { case CURLE_OK: return HttpClientErrorCode::kOK; case CURLE_COULDNT_CONNECT: return HttpClientErrorCode::kConnectionFailure; case CURLE_COULDNT_RESOLVE_HOST: return HttpClientErrorCode::kHostResolutionFailure; case CURLE_COULDNT_RESOLVE_PROXY: return HttpClientErrorCode::kProxyResolutionFailure; case CURLE_RECV_ERROR: return HttpClientErrorCode::kNetworkReceiveError; case CURLE_SEND_ERROR: return HttpClientErrorCode::kNetworkSendFailure; case CURLE_OPERATION_TIMEDOUT: return HttpClientErrorCode::kOperationTimedOut; case CURLE_SSL_CONNECT_ERROR: return HttpClientErrorCode::kSSLConnectError; case CURLE_SSL_CERTPROBLEM: return HttpClientErrorCode::kSSLCertificateError; case CURLE_PEER_FAILED_VERIFICATION: return HttpClientErrorCode::kSSLCACertError; case CURLE_SSL_CIPHER: case CURLE_SSL_ENGINE_NOTFOUND: case CURLE_SSL_ENGINE_SETFAILED: return HttpClientErrorCode::kGenericSSLError; case CURLE_ABORTED_BY_CALLBACK: return HttpClientErrorCode::kRequestCancelled; default: return HttpClientErrorCode::kOtherError; } } ////////////////////////////////////////////////////////////////////////// // // Curl callback helpers struct WriteCallbackData { std::string* Body = nullptr; std::function* CheckIfAbortFunction = nullptr; }; static size_t CurlWriteCallback(char* Ptr, size_t Size, size_t Nmemb, void* UserData) { auto* Data = static_cast(UserData); size_t TotalBytes = Size * Nmemb; if (Data->CheckIfAbortFunction && *Data->CheckIfAbortFunction && (*Data->CheckIfAbortFunction)()) { return 0; // Signal abort to curl } Data->Body->append(Ptr, TotalBytes); return TotalBytes; } struct HeaderCallbackData { std::vector>* Headers = nullptr; }; // Trims trailing CRLF, splits on the first colon, and trims whitespace from key and value. // Returns nullopt for blank lines or lines without a colon (e.g. HTTP status lines). static std::optional> ParseHeaderLine(std::string_view Line) { while (!Line.empty() && (Line.back() == '\r' || Line.back() == '\n')) { Line.remove_suffix(1); } if (Line.empty()) { return std::nullopt; } size_t ColonPos = Line.find(':'); if (ColonPos == std::string_view::npos) { return std::nullopt; } std::string_view Key = Line.substr(0, ColonPos); std::string_view Value = Line.substr(ColonPos + 1); while (!Key.empty() && Key.back() == ' ') { Key.remove_suffix(1); } while (!Value.empty() && Value.front() == ' ') { Value.remove_prefix(1); } return std::pair{Key, Value}; } static size_t CurlHeaderCallback(char* Buffer, size_t Size, size_t Nmemb, void* UserData) { auto* Data = static_cast(UserData); size_t TotalBytes = Size * Nmemb; if (auto Header = ParseHeaderLine(std::string_view(Buffer, TotalBytes))) { auto& [Key, Value] = *Header; Data->Headers->emplace_back(std::string(Key), std::string(Value)); } return TotalBytes; } struct ReadCallbackData { const uint8_t* DataPtr = nullptr; size_t DataSize = 0; size_t Offset = 0; std::function* CheckIfAbortFunction = nullptr; }; static size_t CurlReadCallback(char* Buffer, size_t Size, size_t Nmemb, void* UserData) { auto* Data = static_cast(UserData); size_t MaxRead = Size * Nmemb; if (Data->CheckIfAbortFunction && *Data->CheckIfAbortFunction && (*Data->CheckIfAbortFunction)()) { return CURL_READFUNC_ABORT; } size_t Remaining = Data->DataSize - Data->Offset; size_t ToRead = std::min(MaxRead, Remaining); if (ToRead > 0) { memcpy(Buffer, Data->DataPtr + Data->Offset, ToRead); Data->Offset += ToRead; } return ToRead; } struct StreamReadCallbackData { detail::CompositeBufferReadStream* Reader = nullptr; std::function* CheckIfAbortFunction = nullptr; }; static size_t CurlStreamReadCallback(char* Buffer, size_t Size, size_t Nmemb, void* UserData) { auto* Data = static_cast(UserData); size_t MaxRead = Size * Nmemb; if (Data->CheckIfAbortFunction && *Data->CheckIfAbortFunction && (*Data->CheckIfAbortFunction)()) { return CURL_READFUNC_ABORT; } return Data->Reader->Read(Buffer, MaxRead); } struct FileReadCallbackData { detail::BufferedReadFileStream* Buffer = nullptr; uint64_t TotalSize = 0; uint64_t Offset = 0; std::function* CheckIfAbortFunction = nullptr; }; static size_t CurlFileReadCallback(char* Buffer, size_t Size, size_t Nmemb, void* UserData) { auto* Data = static_cast(UserData); size_t MaxRead = Size * Nmemb; if (Data->CheckIfAbortFunction && *Data->CheckIfAbortFunction && (*Data->CheckIfAbortFunction)()) { return CURL_READFUNC_ABORT; } size_t Remaining = Data->TotalSize - Data->Offset; size_t ToRead = std::min(MaxRead, Remaining); if (ToRead > 0) { Data->Buffer->Read(Buffer, ToRead); Data->Offset += ToRead; } return ToRead; } static int CurlDebugCallback(CURL* Handle, curl_infotype Type, char* Data, size_t Size, void* UserPtr) { ZEN_UNUSED(Handle); LoggerRef LogRef = *static_cast(UserPtr); auto Log = [&]() -> LoggerRef { return LogRef; }; std::string_view DataView(Data, Size); // Trim trailing newlines while (!DataView.empty() && (DataView.back() == '\r' || DataView.back() == '\n')) { DataView.remove_suffix(1); } switch (Type) { case CURLINFO_TEXT: if (DataView.find("need more data"sv) == std::string_view::npos) { ZEN_INFO("TEXT: {}", DataView); } break; case CURLINFO_HEADER_IN: ZEN_INFO("HIN : {}", DataView); break; case CURLINFO_HEADER_OUT: if (auto TokenPos = DataView.find("Authorization: Bearer "sv); TokenPos != std::string_view::npos) { std::string Copy(DataView); auto BearerStart = TokenPos + 22; auto BearerEnd = Copy.find_first_of("\r\n", BearerStart); if (BearerEnd == std::string::npos) { BearerEnd = Copy.length(); } Copy.replace(Copy.begin() + BearerStart, Copy.begin() + BearerEnd, fmt::format("[{} char token]", BearerEnd - BearerStart)); ZEN_INFO("HOUT: {}", Copy); } else { ZEN_INFO("HOUT: {}", DataView); } break; default: break; } return 0; } ////////////////////////////////////////////////////////////////////////// static std::pair HeaderContentType(ZenContentType ContentType) { return std::make_pair("Content-Type", std::string(MapContentTypeToString(ContentType))); } static curl_slist* BuildHeaderList(const HttpClient::KeyValueMap& AdditionalHeader, std::string_view SessionId, const std::optional& AccessToken, const std::vector>& ExtraHeaders = {}) { curl_slist* Headers = nullptr; for (const auto& [Key, Value] : *AdditionalHeader) { ExtendableStringBuilder<64> HeaderLine; HeaderLine << Key << ": " << Value; Headers = curl_slist_append(Headers, HeaderLine.c_str()); } if (!SessionId.empty()) { ExtendableStringBuilder<64> SessionHeader; SessionHeader << "UE-Session: " << SessionId; Headers = curl_slist_append(Headers, SessionHeader.c_str()); } if (AccessToken) { ExtendableStringBuilder<128> AuthHeader; AuthHeader << "Authorization: " << AccessToken->Value; Headers = curl_slist_append(Headers, AuthHeader.c_str()); } for (const auto& [Key, Value] : ExtraHeaders) { ExtendableStringBuilder<128> HeaderLine; HeaderLine << Key << ": " << Value; Headers = curl_slist_append(Headers, HeaderLine.c_str()); } return Headers; } static HttpClient::KeyValueMap BuildHeaderMap(const std::vector>& Headers) { HttpClient::KeyValueMap HeaderMap; for (const auto& [Key, Value] : Headers) { HeaderMap->insert_or_assign(Key, Value); } return HeaderMap; } // Scans response headers for Content-Type and applies it to the buffer. static void ApplyContentTypeFromHeaders(IoBuffer& Buffer, const std::vector>& Headers) { for (const auto& [Key, Value] : Headers) { if (StrCaseCompare(Key, "Content-Type") == 0) { Buffer.SetContentType(ParseContentType(Value)); break; } } } static void AppendUrlEncoded(StringBuilderBase& Out, std::string_view Input) { static constexpr char HexDigits[] = "0123456789ABCDEF"; static constexpr AsciiSet Unreserved("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~"); for (char C : Input) { if (Unreserved.Contains(C)) { Out.Append(C); } else { uint8_t Byte = static_cast(C); char Encoded[3] = {'%', HexDigits[Byte >> 4], HexDigits[Byte & 0x0F]}; Out.Append(std::string_view(Encoded, 3)); } } } static void BuildUrlWithParameters(StringBuilderBase& Url, std::string_view BaseUrl, std::string_view ResourcePath, const HttpClient::KeyValueMap& Parameters) { Url.Append(BaseUrl); Url.Append(ResourcePath); if (!Parameters->empty()) { char Separator = '?'; for (const auto& [Key, Value] : *Parameters) { Url.Append(Separator); AppendUrlEncoded(Url, Key); Url.Append('='); AppendUrlEncoded(Url, Value); Separator = '&'; } } } ////////////////////////////////////////////////////////////////////////// CurlHttpClient::CurlHttpClient(std::string_view BaseUri, const HttpClientSettings& ConnectionSettings, std::function&& CheckIfAbortFunction) : HttpClientBase(BaseUri, ConnectionSettings, std::move(CheckIfAbortFunction)) { } CurlHttpClient::~CurlHttpClient() { ZEN_TRACE_CPU("CurlHttpClient::~CurlHttpClient"); m_SessionLock.WithExclusiveLock([&] { for (auto* Handle : m_Sessions) { curl_easy_cleanup(Handle); } m_Sessions.clear(); }); } CurlHttpClient::Session::~Session() { if (HeaderList) { curl_slist_free_all(HeaderList); } Outer->ReleaseSession(Handle); } void CurlHttpClient::Session::SetHeaders(curl_slist* Headers) { if (HeaderList) { curl_slist_free_all(HeaderList); } HeaderList = Headers; curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, HeaderList); } CurlHttpClient::CurlResult CurlHttpClient::Session::PerformWithResponseCallbacks() { std::string Body; WriteCallbackData WriteData{.Body = &Body, .CheckIfAbortFunction = Outer->m_CheckIfAbortFunction ? &Outer->m_CheckIfAbortFunction : nullptr}; HeaderCallbackData HdrData{}; std::vector> ResponseHeaders; HdrData.Headers = &ResponseHeaders; curl_easy_setopt(Handle, CURLOPT_WRITEFUNCTION, CurlWriteCallback); curl_easy_setopt(Handle, CURLOPT_WRITEDATA, &WriteData); curl_easy_setopt(Handle, CURLOPT_HEADERFUNCTION, CurlHeaderCallback); curl_easy_setopt(Handle, CURLOPT_HEADERDATA, &HdrData); CurlResult Result = Perform(); Result.Body = std::move(Body); Result.Headers = std::move(ResponseHeaders); return Result; } CurlHttpClient::CurlResult CurlHttpClient::Session::Perform() { CurlResult Result; char ErrorBuffer[CURL_ERROR_SIZE] = {}; curl_easy_setopt(Handle, CURLOPT_ERRORBUFFER, ErrorBuffer); Result.ErrorCode = curl_easy_perform(Handle); if (Result.ErrorCode != CURLE_OK) { Result.ErrorMessage = ErrorBuffer[0] ? std::string(ErrorBuffer) : curl_easy_strerror(Result.ErrorCode); } curl_easy_getinfo(Handle, CURLINFO_RESPONSE_CODE, &Result.StatusCode); double Elapsed = 0; curl_easy_getinfo(Handle, CURLINFO_TOTAL_TIME, &Elapsed); Result.ElapsedSeconds = Elapsed; curl_off_t UpBytes = 0; curl_easy_getinfo(Handle, CURLINFO_SIZE_UPLOAD_T, &UpBytes); Result.UploadedBytes = static_cast(UpBytes); curl_off_t DownBytes = 0; curl_easy_getinfo(Handle, CURLINFO_SIZE_DOWNLOAD_T, &DownBytes); Result.DownloadedBytes = static_cast(DownBytes); return Result; } bool CurlHttpClient::ShouldLogErrorCode(HttpResponseCode ResponseCode) const { if (m_CheckIfAbortFunction && m_CheckIfAbortFunction()) { return false; } const auto& Expected = m_ConnectionSettings.ExpectedErrorCodes; return std::find(Expected.begin(), Expected.end(), ResponseCode) == Expected.end(); } HttpClient::Response CurlHttpClient::ResponseWithPayload(std::string_view SessionId, CurlResult&& Result, const HttpResponseCode WorkResponseCode, IoBuffer&& Payload, std::vector&& BoundaryPositions) { IoBuffer ResponseBuffer = Payload ? std::move(Payload) : IoBuffer(IoBuffer::Clone, Result.Body.data(), Result.Body.size()); ApplyContentTypeFromHeaders(ResponseBuffer, Result.Headers); if (!IsHttpSuccessCode(WorkResponseCode) && WorkResponseCode != HttpResponseCode::NotFound) { if (ShouldLogErrorCode(WorkResponseCode)) { ZEN_WARN("HttpClient request failed (session: {}): status={}, url={}", SessionId, static_cast(WorkResponseCode), m_BaseUri); } } std::sort(BoundaryPositions.begin(), BoundaryPositions.end(), [](const HttpClient::Response::MultipartBoundary& Lhs, const HttpClient::Response::MultipartBoundary& Rhs) { return Lhs.RangeOffset < Rhs.RangeOffset; }); return HttpClient::Response{.StatusCode = WorkResponseCode, .ResponsePayload = std::move(ResponseBuffer), .Header = BuildHeaderMap(Result.Headers), .UploadedBytes = Result.UploadedBytes, .DownloadedBytes = Result.DownloadedBytes, .ElapsedSeconds = Result.ElapsedSeconds, .Ranges = std::move(BoundaryPositions)}; } HttpClient::Response CurlHttpClient::CommonResponse(std::string_view SessionId, CurlResult&& Result, IoBuffer&& Payload, std::vector&& BoundaryPositions) { const HttpResponseCode WorkResponseCode = HttpResponseCode(Result.StatusCode); if (Result.ErrorCode != CURLE_OK) { const bool Quiet = m_CheckIfAbortFunction && m_CheckIfAbortFunction(); if (!Quiet) { if (Result.ErrorCode != CURLE_OPERATION_TIMEDOUT && Result.ErrorCode != CURLE_COULDNT_CONNECT && Result.ErrorCode != CURLE_ABORTED_BY_CALLBACK) { ZEN_WARN("HttpClient client failure (session: {}): ({}) '{}'", SessionId, static_cast(Result.ErrorCode), Result.ErrorMessage); } } return HttpClient::Response{ .StatusCode = WorkResponseCode, .ResponsePayload = IoBufferBuilder::MakeCloneFromMemory(Result.Body.data(), Result.Body.size()), .Header = BuildHeaderMap(Result.Headers), .UploadedBytes = Result.UploadedBytes, .DownloadedBytes = Result.DownloadedBytes, .ElapsedSeconds = Result.ElapsedSeconds, .Error = HttpClient::ErrorContext{.ErrorCode = MapCurlError(Result.ErrorCode), .ErrorMessage = Result.ErrorMessage}}; } if (WorkResponseCode == HttpResponseCode::NoContent || (Result.Body.empty() && !Payload)) { return HttpClient::Response{.StatusCode = WorkResponseCode, .Header = BuildHeaderMap(Result.Headers), .UploadedBytes = Result.UploadedBytes, .DownloadedBytes = Result.DownloadedBytes, .ElapsedSeconds = Result.ElapsedSeconds}; } else { return ResponseWithPayload(SessionId, std::move(Result), WorkResponseCode, std::move(Payload), std::move(BoundaryPositions)); } } bool CurlHttpClient::ValidatePayload(CurlResult& Result, std::unique_ptr& PayloadFile) { ZEN_TRACE_CPU("ValidatePayload"); IoBuffer ResponseBuffer = (Result.Body.empty() && PayloadFile) ? PayloadFile->BorrowIoBuffer() : IoBuffer(IoBuffer::Wrap, Result.Body.data(), Result.Body.size()); // Collect relevant headers in a single pass std::string_view ContentLengthValue; std::string_view IoHashValue; std::string_view ContentTypeValue; for (const auto& [Key, Value] : Result.Headers) { if (ContentLengthValue.empty() && StrCaseCompare(Key, "Content-Length") == 0) { ContentLengthValue = Value; } else if (IoHashValue.empty() && StrCaseCompare(Key, "X-Jupiter-IoHash") == 0) { IoHashValue = Value; } else if (ContentTypeValue.empty() && StrCaseCompare(Key, "Content-Type") == 0) { ContentTypeValue = Value; } } // Validate Content-Length if (!ContentLengthValue.empty()) { std::optional ExpectedContentSize = ParseInt(ContentLengthValue); if (!ExpectedContentSize.has_value()) { Result.ErrorCode = CURLE_RECV_ERROR; Result.ErrorMessage = fmt::format("Can not parse Content-Length header. Value: '{}'", ContentLengthValue); return false; } if (ExpectedContentSize.value() != ResponseBuffer.GetSize()) { Result.ErrorCode = CURLE_RECV_ERROR; Result.ErrorMessage = fmt::format("Payload size {} does not match Content-Length {}", ResponseBuffer.GetSize(), ContentLengthValue); return false; } } if (Result.StatusCode == static_cast(HttpResponseCode::PartialContent)) { return true; } // Validate X-Jupiter-IoHash if (!IoHashValue.empty()) { IoHash ExpectedPayloadHash; if (IoHash::TryParse(IoHashValue, ExpectedPayloadHash)) { IoHash PayloadHash = IoHash::HashBuffer(ResponseBuffer); if (PayloadHash != ExpectedPayloadHash) { Result.ErrorCode = CURLE_RECV_ERROR; Result.ErrorMessage = fmt::format("Payload hash {} does not match X-Jupiter-IoHash {}", PayloadHash.ToHexString(), ExpectedPayloadHash.ToHexString()); return false; } } } // Validate content-type specific payload if (ContentTypeValue == "application/x-ue-comp") { IoHash RawHash; uint64_t RawSize; if (CompressedBuffer::ValidateCompressedHeader(ResponseBuffer, RawHash, RawSize, /*OutOptionalTotalCompressedSize*/ nullptr)) { return true; } else { Result.ErrorCode = CURLE_RECV_ERROR; Result.ErrorMessage = "Compressed binary failed validation"; return false; } } if (ContentTypeValue == "application/x-ue-cb") { if (CbValidateError Error = ValidateCompactBinary(ResponseBuffer.GetView(), CbValidateMode::Default); Error == CbValidateError::None) { return true; } else { Result.ErrorCode = CURLE_RECV_ERROR; Result.ErrorMessage = fmt::format("Compact binary failed validation: {}", ToString(Error)); return false; } } return true; } bool CurlHttpClient::ShouldRetry(const CurlResult& Result) { switch (Result.ErrorCode) { case CURLE_OK: break; case CURLE_RECV_ERROR: case CURLE_SEND_ERROR: case CURLE_OPERATION_TIMEDOUT: return true; default: return false; } switch (static_cast(Result.StatusCode)) { case HttpResponseCode::RequestTimeout: case HttpResponseCode::TooManyRequests: case HttpResponseCode::InternalServerError: case HttpResponseCode::BadGateway: case HttpResponseCode::ServiceUnavailable: case HttpResponseCode::GatewayTimeout: return true; default: return false; } } CurlHttpClient::CurlResult CurlHttpClient::DoWithRetry(std::string_view SessionId, std::function&& Func, std::function&& Validate) { uint8_t Attempt = 0; CurlResult Result = Func(); while (Attempt < m_ConnectionSettings.RetryCount) { if (m_CheckIfAbortFunction && m_CheckIfAbortFunction()) { return Result; } if (!ShouldRetry(Result)) { if (Result.ErrorCode != CURLE_OK || !IsHttpSuccessCode(Result.StatusCode)) { break; } if (Validate(Result)) { break; } } Sleep(100 * (Attempt + 1)); Attempt++; if (ShouldLogErrorCode(HttpResponseCode(Result.StatusCode))) { if (Result.ErrorCode != CURLE_OK) { ZEN_INFO("Retry (session: {}): HTTP error ({}) '{}' Attempt {}/{}", SessionId, static_cast(MapCurlError(Result.ErrorCode)), Result.ErrorMessage, Attempt, m_ConnectionSettings.RetryCount + 1); } else { ZEN_INFO("Retry (session: {}): HTTP status ({}) '{}' Attempt {}/{}", SessionId, Result.StatusCode, zen::ToString(HttpResponseCode(Result.StatusCode)), Attempt, m_ConnectionSettings.RetryCount + 1); } } Result = Func(); } return Result; } CurlHttpClient::CurlResult CurlHttpClient::DoWithRetry(std::string_view SessionId, std::function&& Func, std::unique_ptr& PayloadFile) { return DoWithRetry(SessionId, std::move(Func), [&](CurlResult& Result) { return ValidatePayload(Result, PayloadFile); }); } ////////////////////////////////////////////////////////////////////////// CurlHttpClient::Session CurlHttpClient::AllocSession(std::string_view ResourcePath, const KeyValueMap& Parameters) { ZEN_TRACE_CPU("CurlHttpClient::AllocSession"); CURL* Handle = nullptr; m_SessionLock.WithExclusiveLock([&] { if (!m_Sessions.empty()) { Handle = m_Sessions.back(); m_Sessions.pop_back(); } }); if (Handle == nullptr) { Handle = curl_easy_init(); if (Handle == nullptr) { ThrowOutOfMemory("curl_easy_init"); } } else { curl_easy_reset(Handle); } // Unix domain socket if (!m_ConnectionSettings.UnixSocketPath.empty()) { std::string SocketPathUtf8 = PathToUtf8(m_ConnectionSettings.UnixSocketPath); curl_easy_setopt(Handle, CURLOPT_UNIX_SOCKET_PATH, SocketPathUtf8.c_str()); } // Build URL with parameters ExtendableStringBuilder<256> Url; BuildUrlWithParameters(Url, m_BaseUri, ResourcePath, Parameters); curl_easy_setopt(Handle, CURLOPT_URL, Url.c_str()); // Timeouts if (m_ConnectionSettings.ConnectTimeout.count() > 0) { curl_easy_setopt(Handle, CURLOPT_CONNECTTIMEOUT_MS, static_cast(m_ConnectionSettings.ConnectTimeout.count())); } if (m_ConnectionSettings.Timeout.count() > 0) { curl_easy_setopt(Handle, CURLOPT_TIMEOUT_MS, static_cast(m_ConnectionSettings.Timeout.count())); } // HTTP/2 if (m_ConnectionSettings.AssumeHttp2) { curl_easy_setopt(Handle, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2_PRIOR_KNOWLEDGE); } // Verbose/debug if (m_ConnectionSettings.Verbose) { curl_easy_setopt(Handle, CURLOPT_VERBOSE, 1L); curl_easy_setopt(Handle, CURLOPT_DEBUGFUNCTION, CurlDebugCallback); curl_easy_setopt(Handle, CURLOPT_DEBUGDATA, &m_Log); } // SSL options if (m_ConnectionSettings.InsecureSsl) { curl_easy_setopt(Handle, CURLOPT_SSL_VERIFYPEER, 0L); curl_easy_setopt(Handle, CURLOPT_SSL_VERIFYHOST, 0L); } if (!m_ConnectionSettings.CaBundlePath.empty()) { curl_easy_setopt(Handle, CURLOPT_CAINFO, m_ConnectionSettings.CaBundlePath.c_str()); } // Disable signal handling for thread safety curl_easy_setopt(Handle, CURLOPT_NOSIGNAL, 1L); if (m_ConnectionSettings.ForbidReuseConnection) { curl_easy_setopt(Handle, CURLOPT_FORBID_REUSE, 1L); } // Note: Headers are NOT set here. Each method builds its own header list // (potentially adding method-specific headers like Content-Type) and passes // ownership to the Session via SetHeaders(). return Session(this, Handle); } void CurlHttpClient::ReleaseSession(CURL* Handle) { ZEN_TRACE_CPU("CurlHttpClient::ReleaseSession"); m_SessionLock.WithExclusiveLock([&] { m_Sessions.push_back(Handle); }); } ////////////////////////////////////////////////////////////////////////// // TransactPackage is a two-phase protocol (offer + send) with server-side state // between phases, so retrying individual phases would be incorrect. CurlHttpClient::Response CurlHttpClient::TransactPackage(std::string_view Url, CbPackage Package, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("CurlHttpClient::TransactPackage"); // First, list of offered chunks for filtering on the server end std::vector AttachmentsToSend; std::span Attachments = Package.GetAttachments(); const uint32_t RequestId = ++CurlHttpClientRequestIdCounter; auto RequestIdString = fmt::to_string(RequestId); if (!Attachments.empty()) { CbObjectWriter Writer; Writer.BeginArray("offer"); for (const CbAttachment& Attachment : Attachments) { Writer.AddHash(Attachment.GetHash()); } Writer.EndArray(); BinaryWriter MemWriter; Writer.Save(MemWriter); std::vector> OfferExtraHeaders; OfferExtraHeaders.emplace_back(HeaderContentType(HttpContentType::kCbPackageOffer)); OfferExtraHeaders.emplace_back("UE-Request", RequestIdString); Session Sess = AllocSession(Url, {}); CURL* H = Sess.Get(); Sess.SetHeaders(BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken(), OfferExtraHeaders)); curl_easy_setopt(H, CURLOPT_POST, 1L); curl_easy_setopt(H, CURLOPT_POSTFIELDS, reinterpret_cast(MemWriter.Data())); curl_easy_setopt(H, CURLOPT_POSTFIELDSIZE_LARGE, static_cast(MemWriter.Size())); CurlResult Result = Sess.PerformWithResponseCallbacks(); if (Result.ErrorCode == CURLE_OK && IsHttpSuccessCode(Result.StatusCode)) { IoBuffer ResponseBuffer(IoBuffer::Wrap, Result.Body.data(), Result.Body.size()); CbValidateError ValidationError = CbValidateError::None; if (CbObject ResponseObject = ValidateAndReadCompactBinaryObject(std::move(ResponseBuffer), ValidationError); ValidationError == CbValidateError::None) { for (CbFieldView& Entry : ResponseObject["need"]) { ZEN_ASSERT(Entry.IsHash()); AttachmentsToSend.push_back(Entry.AsHash()); } } } } // Prepare package for send CbPackage SendPackage; SendPackage.SetObject(Package.GetObject(), Package.GetObjectHash()); for (const IoHash& AttachmentCid : AttachmentsToSend) { const CbAttachment* Attachment = Package.FindAttachment(AttachmentCid); if (Attachment) { SendPackage.AddAttachment(*Attachment); } } // Transmit package payload CompositeBuffer Message = FormatPackageMessageBuffer(SendPackage); SharedBuffer FlatMessage = Message.Flatten(); std::vector> PkgExtraHeaders; PkgExtraHeaders.emplace_back(HeaderContentType(HttpContentType::kCbPackage)); PkgExtraHeaders.emplace_back("UE-Request", RequestIdString); Session Sess = AllocSession(Url, {}); CURL* H = Sess.Get(); Sess.SetHeaders(BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken(), PkgExtraHeaders)); curl_easy_setopt(H, CURLOPT_POST, 1L); curl_easy_setopt(H, CURLOPT_POSTFIELDS, reinterpret_cast(FlatMessage.GetData())); curl_easy_setopt(H, CURLOPT_POSTFIELDSIZE_LARGE, static_cast(FlatMessage.GetSize())); CurlResult Result = Sess.PerformWithResponseCallbacks(); return CommonResponse(m_SessionId, std::move(Result), {}, {}); } ////////////////////////////////////////////////////////////////////////// // // Standard HTTP verbs // CurlHttpClient::Response CurlHttpClient::Put(std::string_view Url, const IoBuffer& Payload, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("CurlHttpClient::Put"); return CommonResponse( m_SessionId, DoWithRetry( m_SessionId, [&]() -> CurlResult { Session Sess = AllocSession(Url, {}); CURL* H = Sess.Get(); Sess.SetHeaders( BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken(), {HeaderContentType(Payload.GetContentType())})); curl_easy_setopt(H, CURLOPT_UPLOAD, 1L); curl_easy_setopt(H, CURLOPT_INFILESIZE_LARGE, static_cast(Payload.GetSize())); ReadCallbackData ReadData{.DataPtr = static_cast(Payload.GetData()), .DataSize = Payload.GetSize(), .CheckIfAbortFunction = m_CheckIfAbortFunction ? &m_CheckIfAbortFunction : nullptr}; curl_easy_setopt(H, CURLOPT_READFUNCTION, CurlReadCallback); curl_easy_setopt(H, CURLOPT_READDATA, &ReadData); return Sess.PerformWithResponseCallbacks(); }), {}); } CurlHttpClient::Response CurlHttpClient::Put(std::string_view Url, const KeyValueMap& Parameters) { ZEN_TRACE_CPU("CurlHttpClient::Put"); return CommonResponse( m_SessionId, DoWithRetry(m_SessionId, [&]() -> CurlResult { KeyValueMap HeaderWithContentLength{std::pair{"Content-Length", "0"}}; Session Sess = AllocSession(Url, Parameters); CURL* H = Sess.Get(); Sess.SetHeaders(BuildHeaderList(HeaderWithContentLength, m_SessionId, GetAccessToken())); curl_easy_setopt(H, CURLOPT_UPLOAD, 1L); curl_easy_setopt(H, CURLOPT_INFILESIZE_LARGE, 0LL); return Sess.PerformWithResponseCallbacks(); }), {}); } CurlHttpClient::Response CurlHttpClient::Get(std::string_view Url, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters) { ZEN_TRACE_CPU("CurlHttpClient::Get"); return CommonResponse(m_SessionId, DoWithRetry( m_SessionId, [&]() -> CurlResult { Session Sess = AllocSession(Url, Parameters); Sess.SetHeaders(BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken())); curl_easy_setopt(Sess.Get(), CURLOPT_HTTPGET, 1L); return Sess.PerformWithResponseCallbacks(); }, [this](CurlResult& Result) { std::unique_ptr NoTempFile; return ValidatePayload(Result, NoTempFile); }), {}); } CurlHttpClient::Response CurlHttpClient::Head(std::string_view Url, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("CurlHttpClient::Head"); return CommonResponse(m_SessionId, DoWithRetry(m_SessionId, [&]() -> CurlResult { Session Sess = AllocSession(Url, {}); Sess.SetHeaders(BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken())); curl_easy_setopt(Sess.Get(), CURLOPT_NOBODY, 1L); return Sess.PerformWithResponseCallbacks(); }), {}); } CurlHttpClient::Response CurlHttpClient::Delete(std::string_view Url, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("CurlHttpClient::Delete"); return CommonResponse(m_SessionId, DoWithRetry(m_SessionId, [&]() -> CurlResult { Session Sess = AllocSession(Url, {}); Sess.SetHeaders(BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken())); curl_easy_setopt(Sess.Get(), CURLOPT_CUSTOMREQUEST, "DELETE"); return Sess.PerformWithResponseCallbacks(); }), {}); } CurlHttpClient::Response CurlHttpClient::Post(std::string_view Url, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters) { ZEN_TRACE_CPU("CurlHttpClient::PostNoPayload"); return CommonResponse(m_SessionId, DoWithRetry(m_SessionId, [&]() -> CurlResult { Session Sess = AllocSession(Url, Parameters); Sess.SetHeaders(BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken())); curl_easy_setopt(Sess.Get(), CURLOPT_POST, 1L); curl_easy_setopt(Sess.Get(), CURLOPT_POSTFIELDSIZE, 0L); return Sess.PerformWithResponseCallbacks(); }), {}); } CurlHttpClient::Response CurlHttpClient::Post(std::string_view Url, const IoBuffer& Payload, const KeyValueMap& AdditionalHeader) { return Post(Url, Payload, Payload.GetContentType(), AdditionalHeader); } CurlHttpClient::Response CurlHttpClient::Post(std::string_view Url, const IoBuffer& Payload, ZenContentType ContentType, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("CurlHttpClient::PostWithPayload"); return CommonResponse( m_SessionId, DoWithRetry( m_SessionId, [&]() -> CurlResult { Session Sess = AllocSession(Url, {}); CURL* H = Sess.Get(); Sess.SetHeaders(BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken(), {HeaderContentType(ContentType)})); IoBufferFileReference FileRef = {nullptr, 0, 0}; if (Payload.GetFileReference(FileRef)) { detail::BufferedReadFileStream Buffer(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize, 512u * 1024u); FileReadCallbackData ReadData{.Buffer = &Buffer, .TotalSize = Payload.GetSize(), .CheckIfAbortFunction = m_CheckIfAbortFunction ? &m_CheckIfAbortFunction : nullptr}; curl_easy_setopt(H, CURLOPT_POST, 1L); curl_easy_setopt(H, CURLOPT_POSTFIELDSIZE_LARGE, static_cast(Payload.GetSize())); curl_easy_setopt(H, CURLOPT_READFUNCTION, CurlFileReadCallback); curl_easy_setopt(H, CURLOPT_READDATA, &ReadData); return Sess.PerformWithResponseCallbacks(); } curl_easy_setopt(H, CURLOPT_POST, 1L); curl_easy_setopt(H, CURLOPT_POSTFIELDS, reinterpret_cast(Payload.GetData())); curl_easy_setopt(H, CURLOPT_POSTFIELDSIZE_LARGE, static_cast(Payload.GetSize())); return Sess.PerformWithResponseCallbacks(); }), {}); } CurlHttpClient::Response CurlHttpClient::Post(std::string_view Url, CbObject Payload, const KeyValueMap& AdditionalHeader, const std::filesystem::path& TempFolderPath) { ZEN_TRACE_CPU("CurlHttpClient::PostObjectPayload"); std::string PayloadString; std::unique_ptr PayloadFile; CurlResult Result = DoWithRetry( m_SessionId, [&]() -> CurlResult { PayloadString.clear(); PayloadFile.reset(); Session Sess = AllocSession(Url, {}); CURL* H = Sess.Get(); Sess.SetHeaders( BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken(), {HeaderContentType(ZenContentType::kCbObject)})); curl_easy_setopt(H, CURLOPT_POST, 1L); curl_easy_setopt(H, CURLOPT_POSTFIELDS, reinterpret_cast(Payload.GetBuffer().GetData())); curl_easy_setopt(H, CURLOPT_POSTFIELDSIZE_LARGE, static_cast(Payload.GetBuffer().GetSize())); struct PostHeaderCallbackData { std::vector>* Headers = nullptr; std::unique_ptr* PayloadFile = nullptr; std::string* PayloadString = nullptr; const std::filesystem::path* TempFolderPath = nullptr; uint64_t MaxInMemorySize = 0; LoggerRef Log; }; PostHeaderCallbackData PostHdrData; std::vector> ResponseHeaders; PostHdrData.Headers = &ResponseHeaders; PostHdrData.PayloadFile = &PayloadFile; PostHdrData.PayloadString = &PayloadString; PostHdrData.TempFolderPath = &TempFolderPath; PostHdrData.MaxInMemorySize = m_ConnectionSettings.MaximumInMemoryDownloadSize; PostHdrData.Log = m_Log; auto HeaderCb = [](char* Buffer, size_t Size, size_t Nmemb, void* UserData) -> size_t { auto* Data = static_cast(UserData); size_t TotalBytes = Size * Nmemb; if (auto Header = ParseHeaderLine(std::string_view(Buffer, TotalBytes))) { auto& [Key, Value] = *Header; if (StrCaseCompare(Key, "Content-Length") == 0) { std::optional ContentLength = ParseInt(Value); if (ContentLength.has_value()) { if (!Data->TempFolderPath->empty() && ContentLength.value() > Data->MaxInMemorySize) { *Data->PayloadFile = std::make_unique(); std::error_code Ec = (*Data->PayloadFile)->Open(*Data->TempFolderPath, ContentLength.value()); if (Ec) { auto Log = [&]() -> LoggerRef { return Data->Log; }; ZEN_WARN("Failed to create temp file in '{}' for HttpClient::Post. Reason: {}", Data->TempFolderPath->string(), Ec.message()); Data->PayloadFile->reset(); } } else { Data->PayloadString->reserve(ContentLength.value()); } } } Data->Headers->emplace_back(std::string(Key), std::string(Value)); } return TotalBytes; }; curl_easy_setopt(H, CURLOPT_HEADERFUNCTION, static_cast(HeaderCb)); curl_easy_setopt(H, CURLOPT_HEADERDATA, &PostHdrData); struct PostWriteCallbackData { std::string* PayloadString = nullptr; std::unique_ptr* PayloadFile = nullptr; std::function* CheckIfAbortFunction = nullptr; const std::filesystem::path* TempFolderPath = nullptr; LoggerRef Log; }; PostWriteCallbackData PostWriteData; PostWriteData.PayloadString = &PayloadString; PostWriteData.PayloadFile = &PayloadFile; PostWriteData.CheckIfAbortFunction = m_CheckIfAbortFunction ? &m_CheckIfAbortFunction : nullptr; PostWriteData.TempFolderPath = &TempFolderPath; PostWriteData.Log = m_Log; auto WriteCb = [](char* Ptr, size_t Size, size_t Nmemb, void* UserData) -> size_t { auto* Data = static_cast(UserData); size_t TotalBytes = Size * Nmemb; if (Data->CheckIfAbortFunction && *Data->CheckIfAbortFunction && (*Data->CheckIfAbortFunction)()) { return 0; } if (*Data->PayloadFile) { std::error_code Ec = (*Data->PayloadFile)->Write(std::string_view(Ptr, TotalBytes)); if (Ec) { auto Log = [&]() -> LoggerRef { return Data->Log; }; ZEN_WARN("Failed to write to temp file in '{}' for HttpClient::Post. Reason: {}", Data->TempFolderPath->string(), Ec.message()); return 0; } } else { Data->PayloadString->append(Ptr, TotalBytes); } return TotalBytes; }; curl_easy_setopt(H, CURLOPT_WRITEFUNCTION, static_cast(WriteCb)); curl_easy_setopt(H, CURLOPT_WRITEDATA, &PostWriteData); CurlResult Res = Sess.Perform(); Res.Headers = std::move(ResponseHeaders); if (!PayloadString.empty()) { Res.Body = std::move(PayloadString); } return Res; }, PayloadFile); return CommonResponse(m_SessionId, std::move(Result), PayloadFile ? PayloadFile->DetachToIoBuffer() : IoBuffer{}, {}); } CurlHttpClient::Response CurlHttpClient::Post(std::string_view Url, CbPackage Pkg, const KeyValueMap& AdditionalHeader) { return Post(Url, zen::FormatPackageMessageBuffer(Pkg), ZenContentType::kCbPackage, AdditionalHeader); } CurlHttpClient::Response CurlHttpClient::Post(std::string_view Url, const CompositeBuffer& Payload, ZenContentType ContentType, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("CurlHttpClient::Post"); return CommonResponse( m_SessionId, DoWithRetry(m_SessionId, [&]() -> CurlResult { Session Sess = AllocSession(Url, {}); CURL* H = Sess.Get(); Sess.SetHeaders(BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken(), {HeaderContentType(ContentType)})); detail::CompositeBufferReadStream Reader(Payload, 512u * 1024u); StreamReadCallbackData ReadData{.Reader = &Reader, .CheckIfAbortFunction = m_CheckIfAbortFunction ? &m_CheckIfAbortFunction : nullptr}; curl_easy_setopt(H, CURLOPT_POST, 1L); curl_easy_setopt(H, CURLOPT_POSTFIELDSIZE_LARGE, static_cast(Payload.GetSize())); curl_easy_setopt(H, CURLOPT_READFUNCTION, CurlStreamReadCallback); curl_easy_setopt(H, CURLOPT_READDATA, &ReadData); return Sess.PerformWithResponseCallbacks(); }), {}); } CurlHttpClient::Response CurlHttpClient::Upload(std::string_view Url, const IoBuffer& Payload, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("CurlHttpClient::Upload"); return CommonResponse( m_SessionId, DoWithRetry( m_SessionId, [&]() -> CurlResult { Session Sess = AllocSession(Url, {}); CURL* H = Sess.Get(); Sess.SetHeaders( BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken(), {HeaderContentType(Payload.GetContentType())})); curl_easy_setopt(H, CURLOPT_UPLOAD, 1L); curl_easy_setopt(H, CURLOPT_INFILESIZE_LARGE, static_cast(Payload.GetSize())); IoBufferFileReference FileRef = {nullptr, 0, 0}; if (Payload.GetFileReference(FileRef)) { detail::BufferedReadFileStream Buffer(FileRef.FileHandle, FileRef.FileChunkOffset, FileRef.FileChunkSize, 512u * 1024u); FileReadCallbackData ReadData{.Buffer = &Buffer, .TotalSize = Payload.GetSize(), .CheckIfAbortFunction = m_CheckIfAbortFunction ? &m_CheckIfAbortFunction : nullptr}; curl_easy_setopt(H, CURLOPT_READFUNCTION, CurlFileReadCallback); curl_easy_setopt(H, CURLOPT_READDATA, &ReadData); return Sess.PerformWithResponseCallbacks(); } ReadCallbackData ReadData{.DataPtr = static_cast(Payload.GetData()), .DataSize = Payload.GetSize(), .CheckIfAbortFunction = m_CheckIfAbortFunction ? &m_CheckIfAbortFunction : nullptr}; curl_easy_setopt(H, CURLOPT_READFUNCTION, CurlReadCallback); curl_easy_setopt(H, CURLOPT_READDATA, &ReadData); return Sess.PerformWithResponseCallbacks(); }), {}); } CurlHttpClient::Response CurlHttpClient::Upload(std::string_view Url, const CompositeBuffer& Payload, ZenContentType ContentType, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("CurlHttpClient::Upload"); return CommonResponse( m_SessionId, DoWithRetry(m_SessionId, [&]() -> CurlResult { Session Sess = AllocSession(Url, {}); CURL* H = Sess.Get(); Sess.SetHeaders(BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken(), {HeaderContentType(ContentType)})); curl_easy_setopt(H, CURLOPT_UPLOAD, 1L); curl_easy_setopt(H, CURLOPT_INFILESIZE_LARGE, static_cast(Payload.GetSize())); detail::CompositeBufferReadStream Reader(Payload, 512u * 1024u); StreamReadCallbackData ReadData{.Reader = &Reader, .CheckIfAbortFunction = m_CheckIfAbortFunction ? &m_CheckIfAbortFunction : nullptr}; curl_easy_setopt(H, CURLOPT_READFUNCTION, CurlStreamReadCallback); curl_easy_setopt(H, CURLOPT_READDATA, &ReadData); return Sess.PerformWithResponseCallbacks(); }), {}); } CurlHttpClient::Response CurlHttpClient::Download(std::string_view Url, const std::filesystem::path& TempFolderPath, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("CurlHttpClient::Download"); std::string PayloadString; std::unique_ptr PayloadFile; HttpContentType ContentType = HttpContentType::kUnknownContentType; detail::MultipartBoundaryParser BoundaryParser; bool IsMultiRangeResponse = false; CurlResult Result = DoWithRetry( m_SessionId, [&]() -> CurlResult { Session Sess = AllocSession(Url, {}); CURL* H = Sess.Get(); Sess.SetHeaders(BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken())); curl_easy_setopt(H, CURLOPT_HTTPGET, 1L); // Reset state from any previous attempt PayloadString.clear(); PayloadFile.reset(); BoundaryParser.Boundaries.clear(); ContentType = HttpContentType::kUnknownContentType; IsMultiRangeResponse = false; // Track requested content length from Range header (sum all ranges) uint64_t RequestedContentLength = (uint64_t)-1; if (auto RangeIt = AdditionalHeader.Entries.find("Range"); RangeIt != AdditionalHeader.Entries.end()) { if (RangeIt->second.starts_with("bytes")) { std::string_view RangeValue(RangeIt->second); size_t RangeStartPos = RangeValue.find('=', 5); if (RangeStartPos != std::string_view::npos) { RangeStartPos++; while (RangeStartPos < RangeValue.length() && RangeValue[RangeStartPos] == ' ') { RangeStartPos++; } RequestedContentLength = 0; while (RangeStartPos < RangeValue.length()) { size_t RangeEnd = RangeValue.find_first_of(", \r\n", RangeStartPos); if (RangeEnd == std::string_view::npos) { RangeEnd = RangeValue.length(); } std::string_view RangeString = RangeValue.substr(RangeStartPos, RangeEnd - RangeStartPos); size_t RangeSplitPos = RangeString.find('-'); if (RangeSplitPos != std::string_view::npos) { std::optional RequestedRangeStart = ParseInt(RangeString.substr(0, RangeSplitPos)); std::optional RequestedRangeEnd = ParseInt(RangeString.substr(RangeSplitPos + 1)); if (RequestedRangeStart.has_value() && RequestedRangeEnd.has_value()) { RequestedContentLength += RequestedRangeEnd.value() - RequestedRangeStart.value() + 1; } } RangeStartPos = RangeEnd; while (RangeStartPos != RangeValue.length() && (RangeValue[RangeStartPos] == ',' || RangeValue[RangeStartPos] == ' ')) { RangeStartPos++; } } } } } // Header callback that detects Content-Length and switches to file-backed storage when needed struct DownloadHeaderCallbackData { std::vector>* Headers = nullptr; std::unique_ptr* PayloadFile = nullptr; std::string* PayloadString = nullptr; const std::filesystem::path* TempFolderPath = nullptr; uint64_t MaxInMemorySize = 0; LoggerRef Log; detail::MultipartBoundaryParser* BoundaryParser = nullptr; bool* IsMultiRange = nullptr; HttpContentType* ContentTypeOut = nullptr; }; DownloadHeaderCallbackData DlHdrData; std::vector> ResponseHeaders; DlHdrData.Headers = &ResponseHeaders; DlHdrData.PayloadFile = &PayloadFile; DlHdrData.PayloadString = &PayloadString; DlHdrData.TempFolderPath = &TempFolderPath; DlHdrData.MaxInMemorySize = m_ConnectionSettings.MaximumInMemoryDownloadSize; DlHdrData.Log = m_Log; DlHdrData.BoundaryParser = &BoundaryParser; DlHdrData.IsMultiRange = &IsMultiRangeResponse; DlHdrData.ContentTypeOut = &ContentType; auto HeaderCb = [](char* Buffer, size_t Size, size_t Nmemb, void* UserData) -> size_t { auto* Data = static_cast(UserData); size_t TotalBytes = Size * Nmemb; if (auto Header = ParseHeaderLine(std::string_view(Buffer, TotalBytes))) { auto& [KeyView, Value] = *Header; const std::string Key(KeyView); if (StrCaseCompare(Key, "Content-Length") == 0) { std::optional ContentLength = ParseInt(Value); if (ContentLength.has_value()) { if (ContentLength.value() > Data->MaxInMemorySize) { *Data->PayloadFile = std::make_unique(); std::error_code Ec = (*Data->PayloadFile)->Open(*Data->TempFolderPath, ContentLength.value()); if (Ec) { auto Log = [&]() -> LoggerRef { return Data->Log; }; ZEN_WARN("Failed to create temp file in '{}' for HttpClient::Download. Reason: {}", Data->TempFolderPath->string(), Ec.message()); Data->PayloadFile->reset(); } } else { Data->PayloadString->reserve(ContentLength.value()); } } } else if (StrCaseCompare(Key, "Content-Type") == 0) { *Data->IsMultiRange = Data->BoundaryParser->Init(Value); if (!*Data->IsMultiRange) { *Data->ContentTypeOut = ParseContentType(Value); } } else if (StrCaseCompare(Key, "Content-Range") == 0) { if (!*Data->IsMultiRange) { std::pair Range = detail::ParseContentRange(Value); if (Range.second != 0) { Data->BoundaryParser->Boundaries.push_back( HttpClient::Response::MultipartBoundary{.OffsetInPayload = 0, .RangeOffset = Range.first, .RangeLength = Range.second, .ContentType = *Data->ContentTypeOut}); } } } Data->Headers->emplace_back(Key, std::string(Value)); } return TotalBytes; }; curl_easy_setopt(H, CURLOPT_HEADERFUNCTION, static_cast(HeaderCb)); curl_easy_setopt(H, CURLOPT_HEADERDATA, &DlHdrData); // Write callback that directs data to file or string struct DownloadWriteCallbackData { std::string* PayloadString = nullptr; std::unique_ptr* PayloadFile = nullptr; std::function* CheckIfAbortFunction = nullptr; const std::filesystem::path* TempFolderPath = nullptr; LoggerRef Log; detail::MultipartBoundaryParser* BoundaryParser = nullptr; bool* IsMultiRange = nullptr; }; DownloadWriteCallbackData DlWriteData; DlWriteData.PayloadString = &PayloadString; DlWriteData.PayloadFile = &PayloadFile; DlWriteData.CheckIfAbortFunction = m_CheckIfAbortFunction ? &m_CheckIfAbortFunction : nullptr; DlWriteData.TempFolderPath = &TempFolderPath; DlWriteData.Log = m_Log; DlWriteData.BoundaryParser = &BoundaryParser; DlWriteData.IsMultiRange = &IsMultiRangeResponse; auto WriteCb = [](char* Ptr, size_t Size, size_t Nmemb, void* UserData) -> size_t { auto* Data = static_cast(UserData); size_t TotalBytes = Size * Nmemb; if (Data->CheckIfAbortFunction && *Data->CheckIfAbortFunction && (*Data->CheckIfAbortFunction)()) { return 0; } if (*Data->IsMultiRange) { Data->BoundaryParser->ParseInput(std::string_view(Ptr, TotalBytes)); } if (*Data->PayloadFile) { std::error_code Ec = (*Data->PayloadFile)->Write(std::string_view(Ptr, TotalBytes)); if (Ec) { auto Log = [&]() -> LoggerRef { return Data->Log; }; ZEN_WARN("Failed to write to temp file in '{}' for HttpClient::Download. Reason: {}", Data->TempFolderPath->string(), Ec.message()); return 0; } } else { Data->PayloadString->append(Ptr, TotalBytes); } return TotalBytes; }; curl_easy_setopt(H, CURLOPT_WRITEFUNCTION, static_cast(WriteCb)); curl_easy_setopt(H, CURLOPT_WRITEDATA, &DlWriteData); CurlResult Res = Sess.Perform(); Res.Headers = std::move(ResponseHeaders); // Handle resume logic if (m_ConnectionSettings.AllowResume) { auto SupportsRanges = [](const CurlResult& R) -> bool { for (const auto& [K, V] : R.Headers) { if (StrCaseCompare(K, "Content-Range") == 0) { return true; } if (StrCaseCompare(K, "Accept-Ranges") == 0) { return V == "bytes"sv; } } return false; }; auto ShouldResumeCheck = [&SupportsRanges, &IsMultiRangeResponse](const CurlResult& R) -> bool { if (IsMultiRangeResponse) { return false; } if (ShouldRetry(R)) { return SupportsRanges(R); } return false; }; if (ShouldResumeCheck(Res)) { // Find Content-Length std::string ContentLengthValue; for (const auto& [K, V] : Res.Headers) { if (StrCaseCompare(K, "Content-Length") == 0) { ContentLengthValue = V; break; } } if (!ContentLengthValue.empty()) { uint64_t ContentLength = RequestedContentLength; if (ContentLength == uint64_t(-1)) { if (auto ParsedContentLength = ParseInt(ContentLengthValue); ParsedContentLength.has_value()) { ContentLength = ParsedContentLength.value(); } } KeyValueMap HeadersWithRange(AdditionalHeader); uint8_t ResumeAttempt = 0; do { uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length(); std::string Range = fmt::format("bytes={}-{}", DownloadedSize, DownloadedSize + ContentLength - 1); if (auto RangeIt = HeadersWithRange.Entries.find("Range"); RangeIt != HeadersWithRange.Entries.end()) { if (RangeIt->second == Range) { break; // No progress, abort } } HeadersWithRange.Entries.insert_or_assign("Range", Range); Session ResumeSess = AllocSession(Url, {}); CURL* ResumeH = ResumeSess.Get(); ResumeSess.SetHeaders(BuildHeaderList(HeadersWithRange, m_SessionId, GetAccessToken())); curl_easy_setopt(ResumeH, CURLOPT_HTTPGET, 1L); std::vector> ResumeHeaders; struct ResumeHeaderCbData { std::vector>* Headers = nullptr; std::unique_ptr* PayloadFile = nullptr; std::string* PayloadString = nullptr; }; ResumeHeaderCbData ResumeHdrData; ResumeHdrData.Headers = &ResumeHeaders; ResumeHdrData.PayloadFile = &PayloadFile; ResumeHdrData.PayloadString = &PayloadString; auto ResumeHeaderCb = [](char* Buffer, size_t Size, size_t Nmemb, void* UserData) -> size_t { auto* Data = static_cast(UserData); size_t TotalBytes = Size * Nmemb; auto Header = ParseHeaderLine(std::string_view(Buffer, TotalBytes)); if (!Header) { return TotalBytes; } auto& [Key, Value] = *Header; if (StrCaseCompare(Key, "Content-Range") == 0) { if (Value.starts_with("bytes "sv)) { size_t RangeStartEnd = Value.find('-', 6); if (RangeStartEnd != std::string_view::npos) { const std::optional Start = ParseInt(Value.substr(6, RangeStartEnd - 6)); if (Start) { uint64_t DownloadedSize = *Data->PayloadFile ? (*Data->PayloadFile)->GetSize() : Data->PayloadString->length(); if (Start.value() == DownloadedSize) { Data->Headers->emplace_back(std::string(Key), std::string(Value)); return TotalBytes; } else if (Start.value() > DownloadedSize) { return 0; } if (*Data->PayloadFile) { (*Data->PayloadFile)->ResetWritePos(Start.value()); } else { *Data->PayloadString = Data->PayloadString->substr(0, Start.value()); } Data->Headers->emplace_back(std::string(Key), std::string(Value)); return TotalBytes; } } } return 0; } Data->Headers->emplace_back(std::string(Key), std::string(Value)); return TotalBytes; }; curl_easy_setopt(ResumeH, CURLOPT_HEADERFUNCTION, static_cast(ResumeHeaderCb)); curl_easy_setopt(ResumeH, CURLOPT_HEADERDATA, &ResumeHdrData); curl_easy_setopt(ResumeH, CURLOPT_WRITEFUNCTION, static_cast(WriteCb)); curl_easy_setopt(ResumeH, CURLOPT_WRITEDATA, &DlWriteData); Res = ResumeSess.Perform(); Res.Headers = std::move(ResumeHeaders); ResumeAttempt++; } while (ResumeAttempt < m_ConnectionSettings.RetryCount && ShouldResumeCheck(Res)); } } } if (!PayloadString.empty()) { Res.Body = std::move(PayloadString); } return Res; }, PayloadFile); return CommonResponse(m_SessionId, std::move(Result), PayloadFile ? PayloadFile->DetachToIoBuffer() : IoBuffer{}, std::move(BoundaryParser.Boundaries)); } } // namespace zen