diff options
| author | Dan Engelbrecht <[email protected]> | 2024-04-20 13:22:05 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-04-20 13:22:05 +0200 |
| commit | aa0b0d3cbfc6c4561591df856396703f7177292e (patch) | |
| tree | 6ff9a4e94559ba62d8ee07076d56dedc7d2e9115 /src/zenhttp/httpclient.cpp | |
| parent | 5.4.5-pre0 (diff) | |
| download | zen-aa0b0d3cbfc6c4561591df856396703f7177292e.tar.xz zen-aa0b0d3cbfc6c4561591df856396703f7177292e.zip | |
import oplog improvements (#54)
* report down/up transfer speed during progress
* add disk buffering in http client
* offload block decoding and chunk writing form network worker pool threads
add block hash verification for blocks recevied at oplog import
* separate download-latch from write-latch to get more accurate download speed
* check headers when downloading with http client to go directly to file writing for large payloads
* we must clear write callback even if we only provide it as an argument to the Download() call
* make timeout optional in AddSponsorProcess
* check return codes when creating windows threadpool
Diffstat (limited to 'src/zenhttp/httpclient.cpp')
| -rw-r--r-- | src/zenhttp/httpclient.cpp | 228 |
1 files changed, 158 insertions, 70 deletions
diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp index 262785a0a..81c9064f6 100644 --- a/src/zenhttp/httpclient.cpp +++ b/src/zenhttp/httpclient.cpp @@ -192,6 +192,7 @@ struct HttpClient::Impl : public RefCounted cpr::Response Result = CprSession->Download(Write); ZEN_TRACE("GET {}", Result); CprSession->SetHeaderCallback({}); + CprSession->SetWriteCallback({}); return Result; } inline cpr::Response Head() @@ -431,10 +432,76 @@ public: std::error_code Write(std::string_view DataString) { + const uint8_t* DataPtr = (const uint8_t*)DataString.data(); + size_t DataSize = DataString.size(); + if (DataSize >= CacheBufferSize) + { + std::error_code Ec = Flush(); + if (Ec) + { + return Ec; + } + return AppendData(DataPtr, DataSize); + } + size_t CopySize = Min(DataSize, CacheBufferSize - m_CacheBufferOffset); + memcpy(&m_CacheBuffer[m_CacheBufferOffset], DataPtr, CopySize); + m_CacheBufferOffset += CopySize; + DataSize -= CopySize; + if (m_CacheBufferOffset == CacheBufferSize) + { + AppendData(m_CacheBuffer, CacheBufferSize); + if (DataSize > 0) + { + ZEN_ASSERT(DataSize < CacheBufferSize); + memcpy(m_CacheBuffer, DataPtr + CopySize, DataSize); + } + m_CacheBufferOffset = DataSize; + } + else + { + ZEN_ASSERT(DataSize == 0); + } + return {}; + } + + IoBuffer DetachToIoBuffer() + { + if (std::error_code Ec = Flush(); Ec) + { + ThrowSystemError(Ec.value(), Ec.message()); + } + ZEN_ASSERT(m_FileHandle != nullptr); + void* FileHandle = m_FileHandle; + IoBuffer Buffer(IoBuffer::File, FileHandle, 0, m_WriteOffset, /*IsWholeFile*/ true); + Buffer.SetDeleteOnClose(true); + m_FileHandle = 0; + m_WriteOffset = 0; + return Buffer; + } + + uint64_t GetSize() const { return m_WriteOffset; } + void ResetWritePos(uint64_t WriteOffset) + { + Flush(); + m_WriteOffset = WriteOffset; + } + +private: + std::error_code Flush() + { + if (m_CacheBufferOffset == 0) + { + return {}; + } + std::error_code Res = AppendData(m_CacheBuffer, m_CacheBufferOffset); + m_CacheBufferOffset = 0; + return Res; + } + + std::error_code AppendData(const void* Data, uint64_t Size) + { ZEN_ASSERT(m_FileHandle != nullptr); const uint64_t MaxChunkSize = 2u * 1024 * 1024 * 1024; - const void* Data = DataString.data(); - std::size_t Size = DataString.size(); while (Size) { @@ -476,23 +543,11 @@ public: return {}; } - IoBuffer DetachToIoBuffer() - { - ZEN_ASSERT(m_FileHandle != nullptr); - void* FileHandle = m_FileHandle; - IoBuffer Buffer(IoBuffer::File, FileHandle, 0, m_WriteOffset, /*IsWholeFile*/ true); - Buffer.SetDeleteOnClose(true); - m_FileHandle = 0; - m_WriteOffset = 0; - return Buffer; - } - - uint64_t GetSize() const { return m_WriteOffset; } - void ResetWritePos(uint64_t WriteOffset) { m_WriteOffset = WriteOffset; } - -private: - void* m_FileHandle; - std::uint64_t m_WriteOffset; + void* m_FileHandle; + std::uint64_t m_WriteOffset; + static constexpr uint64_t CacheBufferSize = 512u * 1024u; + uint8_t m_CacheBuffer[CacheBufferSize]; + std::uint64_t m_CacheBufferOffset = 0; }; ////////////////////////////////////////////////////////////////////////// @@ -851,23 +906,28 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold std::unique_ptr<TempPayloadFile> PayloadFile; cpr::Response Response = DoWithRetry( [&]() { - auto DownloadCallback = [&](std::string data, intptr_t) { - if (!PayloadFile && (PayloadString.length() + data.length()) > (1024 * 1024)) + auto GetHeader = [&](std::string header) -> std::pair<std::string, std::string> { + size_t DelimiterPos = header.find(':'); + if (DelimiterPos != std::string::npos) { - PayloadFile = std::make_unique<TempPayloadFile>(); - std::error_code Ec = PayloadFile->Open(TempFolderPath); - if (Ec) - { - ZEN_WARN("Failed to create temp file in '{}' for HttpClient::Download. Reason: {}", - TempFolderPath.string(), - Ec.message()); - return false; - } - PayloadFile->Write(PayloadString); - PayloadString.clear(); + std::string Key = header.substr(0, DelimiterPos); + constexpr AsciiSet WhitespaceCharacters(" \v\f\t\r\n"); + Key = AsciiSet::TrimSuffixWith(Key, WhitespaceCharacters); + Key = AsciiSet::TrimPrefixWith(Key, WhitespaceCharacters); + + std::string Value = header.substr(DelimiterPos + 1); + Value = AsciiSet::TrimSuffixWith(Value, WhitespaceCharacters); + Value = AsciiSet::TrimPrefixWith(Value, WhitespaceCharacters); + + return std::make_pair(Key, Value); } + return std::make_pair(header, ""); + }; + + auto DownloadCallback = [&](std::string data, intptr_t) { if (PayloadFile) { + ZEN_ASSERT(PayloadString.empty()); std::error_code Ec = PayloadFile->Write(data); if (Ec) { @@ -886,9 +946,46 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold cpr::Response Response; { + std::vector<std::pair<std::string, std::string>> ReceivedHeaders; + auto HeaderCallback = [&](std::string header, intptr_t) { + std::pair<std::string, std::string> Header = GetHeader(header); + if (Header.first == "Content-Length"sv) + { + std::optional<size_t> ContentSize = ParseInt<size_t>(Header.second); + if (ContentSize.has_value()) + { + if (ContentSize.value() > 1024 * 1024) + { + PayloadFile = std::make_unique<TempPayloadFile>(); + std::error_code Ec = PayloadFile->Open(TempFolderPath); + if (Ec) + { + ZEN_WARN("Failed to create temp file in '{}' for HttpClient::Download. Reason: {}", + TempFolderPath.string(), + Ec.message()); + PayloadFile.reset(); + } + } + else + { + PayloadString.reserve(ContentSize.value()); + } + } + } + if (!Header.first.empty()) + { + ReceivedHeaders.emplace_back(std::move(Header)); + } + return 1; + }; + Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); - Response = Sess.Download(cpr::WriteCallback{DownloadCallback}); + Response = Sess.Download(cpr::WriteCallback{DownloadCallback}, cpr::HeaderCallback{HeaderCallback}); + for (const std::pair<std::string, std::string>& H : ReceivedHeaders) + { + Response.header.insert_or_assign(H.first, H.second); + } } if (m_ConnectionSettings.AllowResume) { @@ -899,7 +996,7 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold } if (auto It = Response.header.find("Accept-Ranges"); It != Response.header.end()) { - return It->second == "bytes"; + return It->second == "bytes"sv; } return false; }; @@ -923,53 +1020,44 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold std::vector<std::pair<std::string, std::string>> ReceivedHeaders; auto HeaderCallback = [&](std::string header, intptr_t) { - size_t DelimiterPos = header.find(':'); - if (DelimiterPos != std::string::npos) + std::pair<std::string, std::string> Header = GetHeader(header); + if (!Header.first.empty()) { - std::string Key = header.substr(0, DelimiterPos); - constexpr AsciiSet WhitespaceCharacters(" \v\f\t\r\n"); - Key = AsciiSet::TrimSuffixWith(Key, WhitespaceCharacters); - Key = AsciiSet::TrimPrefixWith(Key, WhitespaceCharacters); - - std::string Value = header.substr(DelimiterPos + 1); - Value = AsciiSet::TrimSuffixWith(Value, WhitespaceCharacters); - Value = AsciiSet::TrimPrefixWith(Value, WhitespaceCharacters); - - ReceivedHeaders.push_back({Key, Value}); + ReceivedHeaders.emplace_back(std::move(Header)); + } - if (Key == "Content-Range"sv) + if (Header.first == "Content-Range"sv) + { + if (Header.second.starts_with("bytes "sv)) { - if (Value.starts_with("bytes ")) + size_t RangeStartEnd = Header.second.find('-', 6); + if (RangeStartEnd != std::string::npos) { - size_t RangeStartEnd = Value.find('-', 6); - if (RangeStartEnd != std::string::npos) + const auto Start = ParseInt<uint64_t>(Header.second.substr(6, RangeStartEnd - 6)); + if (Start) { - const auto Start = ParseInt<uint64_t>(Value.substr(6, RangeStartEnd - 6)); - if (Start) + uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length(); + if (Start.value() == DownloadedSize) { - uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length(); - if (Start.value() == DownloadedSize) - { - return 1; - } - else if (Start.value() > DownloadedSize) - { - return 0; - } - if (PayloadFile) - { - PayloadFile->ResetWritePos(Start.value()); - } - else - { - PayloadString = PayloadString.substr(0, Start.value()); - } return 1; } + else if (Start.value() > DownloadedSize) + { + return 0; + } + if (PayloadFile) + { + PayloadFile->ResetWritePos(Start.value()); + } + else + { + PayloadString = PayloadString.substr(0, Start.value()); + } + return 1; } } - return 0; } + return 0; } return 1; }; |