aboutsummaryrefslogtreecommitdiff
path: root/src/zenhttp/httpclient.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-04-20 13:22:05 +0200
committerGitHub Enterprise <[email protected]>2024-04-20 13:22:05 +0200
commitaa0b0d3cbfc6c4561591df856396703f7177292e (patch)
tree6ff9a4e94559ba62d8ee07076d56dedc7d2e9115 /src/zenhttp/httpclient.cpp
parent5.4.5-pre0 (diff)
downloadzen-aa0b0d3cbfc6c4561591df856396703f7177292e.tar.xz
zen-aa0b0d3cbfc6c4561591df856396703f7177292e.zip
import oplog improvements (#54)
* report down/up transfer speed during progress * add disk buffering in http client * offload block decoding and chunk writing form network worker pool threads add block hash verification for blocks recevied at oplog import * separate download-latch from write-latch to get more accurate download speed * check headers when downloading with http client to go directly to file writing for large payloads * we must clear write callback even if we only provide it as an argument to the Download() call * make timeout optional in AddSponsorProcess * check return codes when creating windows threadpool
Diffstat (limited to 'src/zenhttp/httpclient.cpp')
-rw-r--r--src/zenhttp/httpclient.cpp228
1 files changed, 158 insertions, 70 deletions
diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp
index 262785a0a..81c9064f6 100644
--- a/src/zenhttp/httpclient.cpp
+++ b/src/zenhttp/httpclient.cpp
@@ -192,6 +192,7 @@ struct HttpClient::Impl : public RefCounted
cpr::Response Result = CprSession->Download(Write);
ZEN_TRACE("GET {}", Result);
CprSession->SetHeaderCallback({});
+ CprSession->SetWriteCallback({});
return Result;
}
inline cpr::Response Head()
@@ -431,10 +432,76 @@ public:
std::error_code Write(std::string_view DataString)
{
+ const uint8_t* DataPtr = (const uint8_t*)DataString.data();
+ size_t DataSize = DataString.size();
+ if (DataSize >= CacheBufferSize)
+ {
+ std::error_code Ec = Flush();
+ if (Ec)
+ {
+ return Ec;
+ }
+ return AppendData(DataPtr, DataSize);
+ }
+ size_t CopySize = Min(DataSize, CacheBufferSize - m_CacheBufferOffset);
+ memcpy(&m_CacheBuffer[m_CacheBufferOffset], DataPtr, CopySize);
+ m_CacheBufferOffset += CopySize;
+ DataSize -= CopySize;
+ if (m_CacheBufferOffset == CacheBufferSize)
+ {
+ AppendData(m_CacheBuffer, CacheBufferSize);
+ if (DataSize > 0)
+ {
+ ZEN_ASSERT(DataSize < CacheBufferSize);
+ memcpy(m_CacheBuffer, DataPtr + CopySize, DataSize);
+ }
+ m_CacheBufferOffset = DataSize;
+ }
+ else
+ {
+ ZEN_ASSERT(DataSize == 0);
+ }
+ return {};
+ }
+
+ IoBuffer DetachToIoBuffer()
+ {
+ if (std::error_code Ec = Flush(); Ec)
+ {
+ ThrowSystemError(Ec.value(), Ec.message());
+ }
+ ZEN_ASSERT(m_FileHandle != nullptr);
+ void* FileHandle = m_FileHandle;
+ IoBuffer Buffer(IoBuffer::File, FileHandle, 0, m_WriteOffset, /*IsWholeFile*/ true);
+ Buffer.SetDeleteOnClose(true);
+ m_FileHandle = 0;
+ m_WriteOffset = 0;
+ return Buffer;
+ }
+
+ uint64_t GetSize() const { return m_WriteOffset; }
+ void ResetWritePos(uint64_t WriteOffset)
+ {
+ Flush();
+ m_WriteOffset = WriteOffset;
+ }
+
+private:
+ std::error_code Flush()
+ {
+ if (m_CacheBufferOffset == 0)
+ {
+ return {};
+ }
+ std::error_code Res = AppendData(m_CacheBuffer, m_CacheBufferOffset);
+ m_CacheBufferOffset = 0;
+ return Res;
+ }
+
+ std::error_code AppendData(const void* Data, uint64_t Size)
+ {
ZEN_ASSERT(m_FileHandle != nullptr);
const uint64_t MaxChunkSize = 2u * 1024 * 1024 * 1024;
- const void* Data = DataString.data();
- std::size_t Size = DataString.size();
while (Size)
{
@@ -476,23 +543,11 @@ public:
return {};
}
- IoBuffer DetachToIoBuffer()
- {
- ZEN_ASSERT(m_FileHandle != nullptr);
- void* FileHandle = m_FileHandle;
- IoBuffer Buffer(IoBuffer::File, FileHandle, 0, m_WriteOffset, /*IsWholeFile*/ true);
- Buffer.SetDeleteOnClose(true);
- m_FileHandle = 0;
- m_WriteOffset = 0;
- return Buffer;
- }
-
- uint64_t GetSize() const { return m_WriteOffset; }
- void ResetWritePos(uint64_t WriteOffset) { m_WriteOffset = WriteOffset; }
-
-private:
- void* m_FileHandle;
- std::uint64_t m_WriteOffset;
+ void* m_FileHandle;
+ std::uint64_t m_WriteOffset;
+ static constexpr uint64_t CacheBufferSize = 512u * 1024u;
+ uint8_t m_CacheBuffer[CacheBufferSize];
+ std::uint64_t m_CacheBufferOffset = 0;
};
//////////////////////////////////////////////////////////////////////////
@@ -851,23 +906,28 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold
std::unique_ptr<TempPayloadFile> PayloadFile;
cpr::Response Response = DoWithRetry(
[&]() {
- auto DownloadCallback = [&](std::string data, intptr_t) {
- if (!PayloadFile && (PayloadString.length() + data.length()) > (1024 * 1024))
+ auto GetHeader = [&](std::string header) -> std::pair<std::string, std::string> {
+ size_t DelimiterPos = header.find(':');
+ if (DelimiterPos != std::string::npos)
{
- PayloadFile = std::make_unique<TempPayloadFile>();
- std::error_code Ec = PayloadFile->Open(TempFolderPath);
- if (Ec)
- {
- ZEN_WARN("Failed to create temp file in '{}' for HttpClient::Download. Reason: {}",
- TempFolderPath.string(),
- Ec.message());
- return false;
- }
- PayloadFile->Write(PayloadString);
- PayloadString.clear();
+ std::string Key = header.substr(0, DelimiterPos);
+ constexpr AsciiSet WhitespaceCharacters(" \v\f\t\r\n");
+ Key = AsciiSet::TrimSuffixWith(Key, WhitespaceCharacters);
+ Key = AsciiSet::TrimPrefixWith(Key, WhitespaceCharacters);
+
+ std::string Value = header.substr(DelimiterPos + 1);
+ Value = AsciiSet::TrimSuffixWith(Value, WhitespaceCharacters);
+ Value = AsciiSet::TrimPrefixWith(Value, WhitespaceCharacters);
+
+ return std::make_pair(Key, Value);
}
+ return std::make_pair(header, "");
+ };
+
+ auto DownloadCallback = [&](std::string data, intptr_t) {
if (PayloadFile)
{
+ ZEN_ASSERT(PayloadString.empty());
std::error_code Ec = PayloadFile->Write(data);
if (Ec)
{
@@ -886,9 +946,46 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold
cpr::Response Response;
{
+ std::vector<std::pair<std::string, std::string>> ReceivedHeaders;
+ auto HeaderCallback = [&](std::string header, intptr_t) {
+ std::pair<std::string, std::string> Header = GetHeader(header);
+ if (Header.first == "Content-Length"sv)
+ {
+ std::optional<size_t> ContentSize = ParseInt<size_t>(Header.second);
+ if (ContentSize.has_value())
+ {
+ if (ContentSize.value() > 1024 * 1024)
+ {
+ PayloadFile = std::make_unique<TempPayloadFile>();
+ std::error_code Ec = PayloadFile->Open(TempFolderPath);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to create temp file in '{}' for HttpClient::Download. Reason: {}",
+ TempFolderPath.string(),
+ Ec.message());
+ PayloadFile.reset();
+ }
+ }
+ else
+ {
+ PayloadString.reserve(ContentSize.value());
+ }
+ }
+ }
+ if (!Header.first.empty())
+ {
+ ReceivedHeaders.emplace_back(std::move(Header));
+ }
+ return 1;
+ };
+
Impl::Session Sess =
m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
- Response = Sess.Download(cpr::WriteCallback{DownloadCallback});
+ Response = Sess.Download(cpr::WriteCallback{DownloadCallback}, cpr::HeaderCallback{HeaderCallback});
+ for (const std::pair<std::string, std::string>& H : ReceivedHeaders)
+ {
+ Response.header.insert_or_assign(H.first, H.second);
+ }
}
if (m_ConnectionSettings.AllowResume)
{
@@ -899,7 +996,7 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold
}
if (auto It = Response.header.find("Accept-Ranges"); It != Response.header.end())
{
- return It->second == "bytes";
+ return It->second == "bytes"sv;
}
return false;
};
@@ -923,53 +1020,44 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold
std::vector<std::pair<std::string, std::string>> ReceivedHeaders;
auto HeaderCallback = [&](std::string header, intptr_t) {
- size_t DelimiterPos = header.find(':');
- if (DelimiterPos != std::string::npos)
+ std::pair<std::string, std::string> Header = GetHeader(header);
+ if (!Header.first.empty())
{
- std::string Key = header.substr(0, DelimiterPos);
- constexpr AsciiSet WhitespaceCharacters(" \v\f\t\r\n");
- Key = AsciiSet::TrimSuffixWith(Key, WhitespaceCharacters);
- Key = AsciiSet::TrimPrefixWith(Key, WhitespaceCharacters);
-
- std::string Value = header.substr(DelimiterPos + 1);
- Value = AsciiSet::TrimSuffixWith(Value, WhitespaceCharacters);
- Value = AsciiSet::TrimPrefixWith(Value, WhitespaceCharacters);
-
- ReceivedHeaders.push_back({Key, Value});
+ ReceivedHeaders.emplace_back(std::move(Header));
+ }
- if (Key == "Content-Range"sv)
+ if (Header.first == "Content-Range"sv)
+ {
+ if (Header.second.starts_with("bytes "sv))
{
- if (Value.starts_with("bytes "))
+ size_t RangeStartEnd = Header.second.find('-', 6);
+ if (RangeStartEnd != std::string::npos)
{
- size_t RangeStartEnd = Value.find('-', 6);
- if (RangeStartEnd != std::string::npos)
+ const auto Start = ParseInt<uint64_t>(Header.second.substr(6, RangeStartEnd - 6));
+ if (Start)
{
- const auto Start = ParseInt<uint64_t>(Value.substr(6, RangeStartEnd - 6));
- if (Start)
+ uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length();
+ if (Start.value() == DownloadedSize)
{
- uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length();
- if (Start.value() == DownloadedSize)
- {
- return 1;
- }
- else if (Start.value() > DownloadedSize)
- {
- return 0;
- }
- if (PayloadFile)
- {
- PayloadFile->ResetWritePos(Start.value());
- }
- else
- {
- PayloadString = PayloadString.substr(0, Start.value());
- }
return 1;
}
+ else if (Start.value() > DownloadedSize)
+ {
+ return 0;
+ }
+ if (PayloadFile)
+ {
+ PayloadFile->ResetWritePos(Start.value());
+ }
+ else
+ {
+ PayloadString = PayloadString.substr(0, Start.value());
+ }
+ return 1;
}
}
- return 0;
}
+ return 0;
}
return 1;
};