From 1a3a175a2ca0c06a29e6a679c325395c8008a17e Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Wed, 11 Mar 2026 16:12:00 +0100 Subject: added streaming download of payloads http client Post (#824) * added streaming download of payloads in cpr client ::Post * curlclient Post streaming download * case sensitivity fixes for http headers * move over missing functionality from crpclient to httpclient --- src/zenhttp/clients/httpclientcurl.cpp | 235 ++++++++++++++++++++++++++------- 1 file changed, 189 insertions(+), 46 deletions(-) (limited to 'src/zenhttp/clients/httpclientcurl.cpp') diff --git a/src/zenhttp/clients/httpclientcurl.cpp b/src/zenhttp/clients/httpclientcurl.cpp index 3cb749018..341adc5f7 100644 --- a/src/zenhttp/clients/httpclientcurl.cpp +++ b/src/zenhttp/clients/httpclientcurl.cpp @@ -413,7 +413,7 @@ CurlHttpClient::ResponseWithPayload(std::string_view SessionId, for (const auto& [Key, Value] : Result.Headers) { - if (Key == "Content-Type") + if (StrCaseCompare(Key.c_str(), "Content-Type") == 0) { const HttpContentType ContentType = ParseContentType(Value); ResponseBuffer.SetContentType(ContentType); @@ -522,7 +522,7 @@ CurlHttpClient::ValidatePayload(CurlResult& Result, std::unique_ptr ExpectedContentSize = ParseInt(Value); if (!ExpectedContentSize.has_value()) @@ -549,7 +549,7 @@ CurlHttpClient::ValidatePayload(CurlResult& Result, std::unique_ptr PayloadFile; + + CurlResult Result = DoWithRetry( m_SessionId, - DoWithRetry( - m_SessionId, - [&]() -> CurlResult { - Session Sess = AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); - CURL* H = Sess.Get(); + [&]() -> CurlResult { + PayloadString.clear(); + PayloadFile.reset(); - curl_slist* Headers = - BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken(), {HeaderContentType(ZenContentType::kCbObject)}); - curl_easy_setopt(H, CURLOPT_HTTPHEADER, Headers); + Session Sess = AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + CURL* H = Sess.Get(); - curl_easy_setopt(H, CURLOPT_POST, 1L); - curl_easy_setopt(H, CURLOPT_POSTFIELDS, reinterpret_cast(Payload.GetBuffer().GetData())); - curl_easy_setopt(H, CURLOPT_POSTFIELDSIZE_LARGE, static_cast(Payload.GetBuffer().GetSize())); + curl_slist* Headers = + BuildHeaderList(AdditionalHeader, m_SessionId, GetAccessToken(), {HeaderContentType(ZenContentType::kCbObject)}); + curl_easy_setopt(H, CURLOPT_HTTPHEADER, Headers); - std::string Body; - WriteCallbackData WriteData{.Body = &Body}; - HeaderCallbackData HdrData{}; - std::vector> ResponseHeaders; - HdrData.Headers = &ResponseHeaders; + curl_easy_setopt(H, CURLOPT_POST, 1L); + curl_easy_setopt(H, CURLOPT_POSTFIELDS, reinterpret_cast(Payload.GetBuffer().GetData())); + curl_easy_setopt(H, CURLOPT_POSTFIELDSIZE_LARGE, static_cast(Payload.GetBuffer().GetSize())); - curl_easy_setopt(H, CURLOPT_WRITEFUNCTION, CurlWriteCallback); - curl_easy_setopt(H, CURLOPT_WRITEDATA, &WriteData); - curl_easy_setopt(H, CURLOPT_HEADERFUNCTION, CurlHeaderCallback); - curl_easy_setopt(H, CURLOPT_HEADERDATA, &HdrData); + struct PostHeaderCallbackData + { + std::vector>* Headers = nullptr; + std::unique_ptr* PayloadFile = nullptr; + std::string* PayloadString = nullptr; + const std::filesystem::path* TempFolderPath = nullptr; + uint64_t MaxInMemorySize = 0; + LoggerRef Log; + }; - CurlResult Result = Sess.Perform(); - Result.Body = std::move(Body); - Result.Headers = std::move(ResponseHeaders); + PostHeaderCallbackData PostHdrData; + std::vector> ResponseHeaders; + PostHdrData.Headers = &ResponseHeaders; + PostHdrData.PayloadFile = &PayloadFile; + PostHdrData.PayloadString = &PayloadString; + PostHdrData.TempFolderPath = &TempFolderPath; + PostHdrData.MaxInMemorySize = m_ConnectionSettings.MaximumInMemoryDownloadSize; + PostHdrData.Log = m_Log; - curl_slist_free_all(Headers); - return Result; - }), - {}); + auto HeaderCb = [](char* Buffer, size_t Size, size_t Nmemb, void* UserData) -> size_t { + auto* Data = static_cast(UserData); + size_t TotalBytes = Size * Nmemb; + + std::string_view Line(Buffer, TotalBytes); + while (!Line.empty() && (Line.back() == '\r' || Line.back() == '\n')) + { + Line.remove_suffix(1); + } + + if (Line.empty()) + { + return TotalBytes; + } + + size_t ColonPos = Line.find(':'); + if (ColonPos != std::string_view::npos) + { + std::string_view Key = Line.substr(0, ColonPos); + std::string_view Value = Line.substr(ColonPos + 1); + + while (!Key.empty() && Key.back() == ' ') + { + Key.remove_suffix(1); + } + while (!Value.empty() && Value.front() == ' ') + { + Value.remove_prefix(1); + } + + if (StrCaseCompare(std::string(Key).c_str(), "Content-Length") == 0) + { + std::optional ContentLength = ParseInt(Value); + if (ContentLength.has_value()) + { + if (!Data->TempFolderPath->empty() && ContentLength.value() > Data->MaxInMemorySize) + { + *Data->PayloadFile = std::make_unique(); + std::error_code Ec = (*Data->PayloadFile)->Open(*Data->TempFolderPath, ContentLength.value()); + if (Ec) + { + auto Log = [&]() -> LoggerRef { return Data->Log; }; + ZEN_WARN("Failed to create temp file in '{}' for HttpClient::Post. Reason: {}", + Data->TempFolderPath->string(), + Ec.message()); + Data->PayloadFile->reset(); + } + } + else + { + Data->PayloadString->reserve(ContentLength.value()); + } + } + } + + Data->Headers->emplace_back(std::string(Key), std::string(Value)); + } + + return TotalBytes; + }; + + curl_easy_setopt(H, CURLOPT_HEADERFUNCTION, static_cast(HeaderCb)); + curl_easy_setopt(H, CURLOPT_HEADERDATA, &PostHdrData); + + struct PostWriteCallbackData + { + std::string* PayloadString = nullptr; + std::unique_ptr* PayloadFile = nullptr; + std::function* CheckIfAbortFunction = nullptr; + const std::filesystem::path* TempFolderPath = nullptr; + LoggerRef Log; + }; + + PostWriteCallbackData PostWriteData; + PostWriteData.PayloadString = &PayloadString; + PostWriteData.PayloadFile = &PayloadFile; + PostWriteData.CheckIfAbortFunction = m_CheckIfAbortFunction ? &m_CheckIfAbortFunction : nullptr; + PostWriteData.TempFolderPath = &TempFolderPath; + PostWriteData.Log = m_Log; + + auto WriteCb = [](char* Ptr, size_t Size, size_t Nmemb, void* UserData) -> size_t { + auto* Data = static_cast(UserData); + size_t TotalBytes = Size * Nmemb; + + if (Data->CheckIfAbortFunction && *Data->CheckIfAbortFunction && (*Data->CheckIfAbortFunction)()) + { + return 0; + } + + if (*Data->PayloadFile) + { + std::error_code Ec = (*Data->PayloadFile)->Write(std::string_view(Ptr, TotalBytes)); + if (Ec) + { + auto Log = [&]() -> LoggerRef { return Data->Log; }; + ZEN_WARN("Failed to write to temp file in '{}' for HttpClient::Post. Reason: {}", + Data->TempFolderPath->string(), + Ec.message()); + return 0; + } + } + else + { + Data->PayloadString->append(Ptr, TotalBytes); + } + return TotalBytes; + }; + + curl_easy_setopt(H, CURLOPT_WRITEFUNCTION, static_cast(WriteCb)); + curl_easy_setopt(H, CURLOPT_WRITEDATA, &PostWriteData); + + CurlResult Res = Sess.Perform(); + Res.Headers = std::move(ResponseHeaders); + + if (!PayloadString.empty()) + { + Res.Body = std::move(PayloadString); + } + + curl_slist_free_all(Headers); + return Res; + }, + PayloadFile); + + return CommonResponse(m_SessionId, std::move(Result), PayloadFile ? PayloadFile->DetachToIoBuffer() : IoBuffer{}, {}); } CurlHttpClient::Response @@ -1616,19 +1757,21 @@ CurlHttpClient::Download(std::string_view Url, const std::filesystem::path& Temp size_t ColonPos = Line.find(':'); if (ColonPos != std::string_view::npos) { - std::string_view Key = Line.substr(0, ColonPos); - std::string_view Value = Line.substr(ColonPos + 1); + std::string_view KeyView = Line.substr(0, ColonPos); + std::string_view Value = Line.substr(ColonPos + 1); - while (!Key.empty() && Key.back() == ' ') + while (!KeyView.empty() && KeyView.back() == ' ') { - Key.remove_suffix(1); + KeyView.remove_suffix(1); } while (!Value.empty() && Value.front() == ' ') { Value.remove_prefix(1); } - if (Key == "Content-Length"sv) + const std::string Key(KeyView); + + if (StrCaseCompare(Key.c_str(), "Content-Length") == 0) { std::optional ContentLength = ParseInt(Value); if (ContentLength.has_value()) @@ -1652,7 +1795,7 @@ CurlHttpClient::Download(std::string_view Url, const std::filesystem::path& Temp } } } - else if (Key == "Content-Type"sv) + else if (StrCaseCompare(Key.c_str(), "Content-Type") == 0) { *Data->IsMultiRange = Data->BoundaryParser->Init(Value); if (!*Data->IsMultiRange) @@ -1660,7 +1803,7 @@ CurlHttpClient::Download(std::string_view Url, const std::filesystem::path& Temp *Data->ContentTypeOut = ParseContentType(Value); } } - else if (Key == "Content-Range"sv) + else if (StrCaseCompare(Key.c_str(), "Content-Range") == 0) { if (!*Data->IsMultiRange) { @@ -1751,13 +1894,13 @@ CurlHttpClient::Download(std::string_view Url, const std::filesystem::path& Temp auto SupportsRanges = [](const CurlResult& R) -> bool { for (const auto& [K, V] : R.Headers) { - if (K == "Content-Range") + if (StrCaseCompare(K.c_str(), "Content-Range") == 0) { return true; } - if (K == "Accept-Ranges" && V == "bytes") + if (StrCaseCompare(K.c_str(), "Accept-Ranges") == 0) { - return true; + return V == "bytes"sv; } } return false; @@ -1781,7 +1924,7 @@ CurlHttpClient::Download(std::string_view Url, const std::filesystem::path& Temp std::string ContentLengthValue; for (const auto& [K, V] : Res.Headers) { - if (K == "Content-Length") + if (StrCaseCompare(K.c_str(), "Content-Length") == 0) { ContentLengthValue = V; break; @@ -1865,7 +2008,7 @@ CurlHttpClient::Download(std::string_view Url, const std::filesystem::path& Temp Value.remove_prefix(1); } - if (Key == "Content-Range"sv) + if (StrCaseCompare(std::string(Key).c_str(), "Content-Range") == 0) { if (Value.starts_with("bytes "sv)) { -- cgit v1.2.3