diff options
| author | Dan Engelbrecht <[email protected]> | 2024-01-24 11:41:18 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-01-24 11:41:18 +0100 |
| commit | 0e63573fbe9973f6b922656a785817a711581b78 (patch) | |
| tree | 48e18f0b4aea958a536ba50f72f589a580c4b798 /src/zenserver/upstream/jupiter.cpp | |
| parent | oplog import/export improvements (#634) (diff) | |
| download | zen-0e63573fbe9973f6b922656a785817a711581b78.tar.xz zen-0e63573fbe9973f6b922656a785817a711581b78.zip | |
Add retry with optional resume logic to HttpClient::Download (#639)
- Improvement: Refactored Jupiter upstream to use HttpClient
- Improvement: Added retry and resume logic to HttpClient
- Improvement: Added authentication support to HttpClient
- Improvement: Clearer logging in GCV2 compact of FileCas/BlockStore
- Improvement: Size details in oplog import logging
Diffstat (limited to 'src/zenserver/upstream/jupiter.cpp')
| -rw-r--r-- | src/zenserver/upstream/jupiter.cpp | 1012 |
1 files changed, 106 insertions, 906 deletions
diff --git a/src/zenserver/upstream/jupiter.cpp b/src/zenserver/upstream/jupiter.cpp index e4d45e316..bf2538908 100644 --- a/src/zenserver/upstream/jupiter.cpp +++ b/src/zenserver/upstream/jupiter.cpp @@ -16,7 +16,6 @@ #include <zenutil/basicfile.h> ZEN_THIRD_PARTY_INCLUDES_START -#include <cpr/cpr.h> #include <fmt/format.h> ZEN_THIRD_PARTY_INCLUDES_END @@ -32,277 +31,70 @@ using namespace std::literals; namespace zen { namespace detail { - struct CloudCacheSessionState + CloudCacheResult ConvertResponse(const HttpClient::Response& Response, const std::string_view ErrorPrefix = ""sv) { - CloudCacheSessionState(CloudCacheClient& Client) : m_Client(Client) {} - - const CloudCacheAccessToken& GetAccessToken(bool RefreshToken) - { - if (RefreshToken) - { - m_AccessToken = m_Client.AcquireAccessToken(); - } - - return m_AccessToken; - } - - cpr::Session& GetSession() { return m_Session; } - - void Reset(std::chrono::milliseconds ConnectTimeout, std::chrono::milliseconds Timeout, bool AssumeHttp2) + if (Response.Error) { - m_Session.SetBody({}); - m_Session.SetHeader({}); - m_Session.SetConnectTimeout(ConnectTimeout); - m_Session.SetTimeout(Timeout); - if (AssumeHttp2) - { - m_Session.SetHttpVersion(cpr::HttpVersion{cpr::HttpVersionCode::VERSION_2_0_PRIOR_KNOWLEDGE}); - } - } - - private: - friend class zen::CloudCacheClient; - - CloudCacheClient& m_Client; - CloudCacheAccessToken m_AccessToken; - cpr::Session m_Session; - }; - - CloudCacheResult ConvertResponse(const cpr::Response& Response) - { - if (Response.error) - { - return {.ElapsedSeconds = Response.elapsed, - .ErrorCode = static_cast<int32_t>(Response.error.code), - .Reason = Response.error.message, + return {.ElapsedSeconds = Response.ElapsedSeconds, + .ErrorCode = Response.Error.value().ErrorCode, + .Reason = Response.ErrorMessage(ErrorPrefix), .Success = false}; } - if (!IsHttpSuccessCode(Response.status_code)) + if (!Response.IsSuccess()) { - return {.ElapsedSeconds = Response.elapsed, - .ErrorCode = static_cast<int32_t>(Response.status_code), - .Reason = Response.reason.empty() ? Response.text : Response.reason, + return {.ElapsedSeconds = Response.ElapsedSeconds, + .ErrorCode = static_cast<int32_t>(Response.StatusCode), + .Reason = Response.ErrorMessage(ErrorPrefix), .Success = false}; } - return {.Bytes = Response.downloaded_bytes, - .ElapsedSeconds = Response.elapsed, + return {.Response = Response.ResponsePayload, + .Bytes = Response.DownloadedBytes, + .ElapsedSeconds = Response.ElapsedSeconds, .ErrorCode = 0, - .Reason = Response.reason, .Success = true}; } - - cpr::Response GetWithStreaming(cpr::Session& Session, std::filesystem::path TempFolderPath, std::string_view Name, IoBuffer& OutBuffer) - { - if (TempFolderPath.empty()) - { - return Session.Get(); - } - - std::string PayloadString; - std::shared_ptr<BasicFile> PayloadFile; - - auto _ = MakeGuard([&]() { - if (PayloadFile) - { - PayloadFile.reset(); - std::filesystem::path TempPath = TempFolderPath / Name; - std::error_code Ec; - std::filesystem::remove(TempPath, Ec); - } - }); - - uint64_t Offset = 0; - Session.SetWriteCallback(cpr::WriteCallback{[&](std::string data, intptr_t) { - if (!PayloadFile && (PayloadString.length() + data.length()) > (1024 * 1024)) - { - std::filesystem::path TempPath = TempFolderPath / Name; - PayloadFile = std::make_shared<BasicFile>(); - PayloadFile->Open(TempPath, BasicFile::Mode::kTruncateDelete); - PayloadFile->Write(PayloadString.data(), PayloadString.size(), Offset); - Offset += PayloadString.size(); - PayloadString.clear(); - } - if (PayloadFile) - { - PayloadFile->Write(data.data(), data.size(), Offset); - Offset += data.size(); - } - else - { - PayloadString.append(data); - } - return true; - }}); - - cpr::Response Response = Session.Get(); - - if (!Response.error && IsHttpSuccessCode(Response.status_code)) - { - if (PayloadFile) - { - uint64_t PayloadSize = PayloadFile->FileSize(); - void* FileHandle = PayloadFile->Detach(); - PayloadFile.reset(); - OutBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, PayloadSize, /*IsWholeFile*/ true); - OutBuffer.SetDeleteOnClose(true); - } - else - { - OutBuffer = IoBufferBuilder::MakeCloneFromMemory(PayloadString.data(), PayloadString.size()); - } - return Response; - } - - Response.text.swap(PayloadString); - return Response; - } - - static std::optional<zen::HttpContentType> TryGetContentType(const cpr::Response& Response) - { - if (auto It = Response.header.find("Content-Type"); It != Response.header.end()) - { - zen::HttpContentType ContentType = zen::ParseContentType(It->second); - if (ContentType != zen::HttpContentType::kUnknownContentType) - { - return ContentType; - } - } - return {}; - } - - static IoBuffer MakeBufferFromResponseIfKnownFormat(const cpr::Response& Response) - { - std::optional<zen::HttpContentType> ContentType = TryGetContentType(Response); - if (ContentType) - { - IoBuffer Buffer = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()); - Buffer.SetContentType(ContentType.value()); - return Buffer; - } - return {}; - } - } // namespace detail CloudCacheSession::CloudCacheSession(CloudCacheClient* CacheClient) : m_Log(CacheClient->Logger()), m_CacheClient(CacheClient) { - m_SessionState = m_CacheClient->AllocSessionState(); } CloudCacheSession::~CloudCacheSession() { - m_CacheClient->FreeSessionState(m_SessionState); } CloudCacheResult CloudCacheSession::Authenticate() { - const bool RefreshToken = true; - const CloudCacheAccessToken& AccessToken = GetAccessToken(RefreshToken); - - return {.Success = AccessToken.IsValid()}; + bool OK = m_CacheClient->m_HttpClient.Authenticate(); + return {.Success = OK}; } CloudCacheResult -CloudCacheSession::GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType) +CloudCacheSession::GetRef(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + ZenContentType RefType, + std::filesystem::path TempFolderPath) { - const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"; - - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", ContentType}}); - Session.SetOption(cpr::Body{}); + ZEN_TRACE_CPU("JupiterClient::GetRef"); - cpr::Response Response = Session.Get(); - ZEN_DEBUG("GET {}", Response); + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), + TempFolderPath, + {HttpClient::Accept(RefType)}); - CloudCacheResult Result = detail::ConvertResponse(Response); - if (Result.Success) - { - Result.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()); - } - else - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::GetRef failed GET. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-Accept: '{}', " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - ContentType, - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; + return detail::ConvertResponse(Response, "CloudCacheSession::GetRef"sv); } CloudCacheResult CloudCacheSession::GetBlob(std::string_view Namespace, const IoHash& Key) { - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << Namespace << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/octet-stream"}}); - Session.SetOption(cpr::Body{}); - - cpr::Response Response = Session.Get(); - ZEN_DEBUG("GET {}", Response); + ZEN_TRACE_CPU("JupiterClient::GetBlob"); + HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), + {HttpClient::Accept(ZenContentType::kBinary)}); - CloudCacheResult Result = detail::ConvertResponse(Response); - if (Result.Success) - { - Result.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()); - } - else - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::GetBlob failed GET. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-Accept: '{}', " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - "application/octet-stream", - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; + return detail::ConvertResponse(Response); } CloudCacheResult @@ -310,58 +102,12 @@ CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& K { ZEN_TRACE_CPU("JupiterClient::GetCompressedBlob"); - ExtendableStringBuilder<256> Uri; - std::string KeyString = Key.ToHexString(); - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << KeyString; + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), + TempFolderPath, + {HttpClient::Accept(ZenContentType::kCompressedBinary)}); - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-comp"}}); - Session.SetOption(cpr::Body{}); - - IoBuffer Payload; - cpr::Response Response = detail::GetWithStreaming(Session, TempFolderPath, KeyString, Payload); - ZEN_DEBUG("GET {}", Response); - - CloudCacheResult Result = detail::ConvertResponse(Response); - if (Result.Success) - { - Result.Response = std::move(Payload); - } - else - { - std::optional<zen::HttpContentType> ContentType = detail::TryGetContentType(Response); - if (ContentType.has_value()) - { - Result.Response = std::move(Payload); - Result.Response.SetContentType(ContentType.value()); - } - ZEN_WARN( - "CloudCacheSession::GetCompressedBlob failed GET. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-Accept: '{}', " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - "application/x-ue-comp", - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; + return detail::ConvertResponse(Response); } CloudCacheResult @@ -373,59 +119,14 @@ CloudCacheSession::GetInlineBlob(std::string_view Namespace, { ZEN_TRACE_CPU("JupiterClient::GetInlineBlob"); - ExtendableStringBuilder<256> Uri; - std::string KeyString = Key.ToHexString(); - Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << KeyString; - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-jupiter-inline"}}); - Session.SetOption(cpr::Body{}); - - IoBuffer Payload; - cpr::Response Response = detail::GetWithStreaming(Session, TempFolderPath, KeyString, Payload); - ZEN_DEBUG("GET {}", Response); + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), + TempFolderPath, + {{"Accept", "application/x-jupiter-inline"}}); CloudCacheResult Result = detail::ConvertResponse(Response); - if (Result.Success) - { - Result.Response = std::move(Payload); - } - else - { - std::optional<zen::HttpContentType> ContentType = detail::TryGetContentType(Response); - if (ContentType.has_value()) - { - Result.Response = std::move(Payload); - Result.Response.SetContentType(ContentType.value()); - } - ZEN_WARN( - "CloudCacheSession::GetInlineBlob failed GET. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-Accept: '{}', " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - "application/x-jupiter-inline", - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - if (auto It = Response.header.find("X-Jupiter-InlinePayloadHash"); It != Response.header.end()) + if (auto It = Response.Header->find("X-Jupiter-InlinePayloadHash"); It != Response.Header->end()) { const std::string& PayloadHashHeader = It->second; if (PayloadHashHeader.length() == IoHash::StringLength) @@ -442,52 +143,10 @@ CloudCacheSession::GetObject(std::string_view Namespace, const IoHash& Key) { ZEN_TRACE_CPU("JupiterClient::GetObject"); - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}}); - Session.SetOption(cpr::Body{}); - - cpr::Response Response = Session.Get(); - ZEN_DEBUG("GET {}", Response); - - CloudCacheResult Result = detail::ConvertResponse(Response); - if (Result.Success) - { - Result.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()); - } - else - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::GetObject failed GET. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-Accept: '{}', " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - "application/x-ue-cb", - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } + HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), + {HttpClient::Accept(ZenContentType::kCbObject)}); - return Result; + return detail::ConvertResponse(Response); } PutRefResult @@ -495,29 +154,18 @@ CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId, { ZEN_TRACE_CPU("JupiterClient::PutRef"); - IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size()); - - const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"; + Ref.SetContentType(RefType); - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption( - cpr::Header{{"Authorization", AccessToken.Value}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", ContentType}}); - Session.SetBody(cpr::Body{(const char*)Ref.Data(), Ref.Size()}); + IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size()); - cpr::Response Response = Session.Put(); - ZEN_DEBUG("PUT {}", Response); + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Put(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), Ref); PutRefResult Result = {detail::ConvertResponse(Response)}; if (Result.Success) { std::string JsonError; - json11::Json Json = json11::Json::parse(Response.text, JsonError); + json11::Json Json = json11::Json::parse(Response.ToText(), JsonError); if (JsonError.empty()) { json11::Json::array Needs = Json["needs"].array_items(); @@ -528,37 +176,6 @@ CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId, } Result.RawHash = Hash; } - else - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::PutRef failed PUT. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-X-Jupiter-IoHash: '{}', " - "Header-ContentType: '{}', " - "ContentSize: {}, " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - Hash.ToHexString(), - ContentType, - NiceBytes(Ref.Size()), - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; } @@ -567,28 +184,16 @@ CloudCacheSession::FinalizeRef(std::string_view Namespace, std::string_view Buck { ZEN_TRACE_CPU("JupiterClient::FinalizeRef"); - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString() << "/finalize/" - << RefHash.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, - {"X-Jupiter-IoHash", RefHash.ToHexString()}, - {"Content-Type", "application/x-ue-cb"}}); - Session.SetBody(cpr::Body{}); - - cpr::Response Response = Session.Post(); - ZEN_DEBUG("POST {}", Response); + HttpClient::Response Response = m_CacheClient->m_HttpClient.Post( + fmt::format("/api/v1/refs/{}/{}/{}/finalize/{}", Namespace, BucketId, Key.ToHexString(), RefHash.ToHexString()), + {{"X-Jupiter-IoHash", RefHash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}}); FinalizeRefResult Result = {detail::ConvertResponse(Response)}; if (Result.Success) { std::string JsonError; - json11::Json Json = json11::Json::parse(Response.text, JsonError); + json11::Json Json = json11::Json::parse(std::string(Response.AsText()), JsonError); if (JsonError.empty()) { json11::Json::array Needs = Json["needs"].array_items(); @@ -598,37 +203,6 @@ CloudCacheSession::FinalizeRef(std::string_view Namespace, std::string_view Buck } } } - else - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::FinalizeRef failed PUT. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-X-Jupiter-IoHash: '{}', " - "Header-ContentType: '{}', " - "ContentSize: {}, " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - RefHash.ToHexString(), - "application/x-ue-cb", - NiceBytes(0), - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; } @@ -637,49 +211,9 @@ CloudCacheSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuff { ZEN_TRACE_CPU("JupiterClient::PutBlob"); - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << Namespace << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); + HttpClient::Response Response = m_CacheClient->m_HttpClient.Put(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), Blob); - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/octet-stream"}}); - Session.SetBody(cpr::Body{(const char*)Blob.Data(), Blob.Size()}); - - cpr::Response Response = Session.Put(); - ZEN_DEBUG("PUT {}", Response); - - CloudCacheResult Result = detail::ConvertResponse(Response); - if (!Result.Success) - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::PutBlob failed PUT. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-ContentType: '{}', " - "ContentSize: {}, " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - "application/octet-stream", - NiceBytes(Blob.Size()), - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; + return detail::ConvertResponse(Response); } CloudCacheResult @@ -687,66 +221,11 @@ CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& K { ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob"); - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-comp"}}); - - uint64_t Offset = 0; - if (Blob.IsWholeFile()) - { - auto ReadCallback = [&Blob, &Offset](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, Blob.GetSize() - Offset); - IoBuffer PayloadRange = IoBuffer(Blob, Offset, size); - MutableMemoryView Data(buffer, size); - Data.CopyFrom(PayloadRange.GetView()); - Offset += size; - return true; - }; - Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Blob.GetSize()), ReadCallback)); - } - else - { - Session.SetBody(cpr::Body{(const char*)Blob.Data(), Blob.Size()}); - } - - cpr::Response Response = Session.Put(); - ZEN_DEBUG("PUT {}", Response); + Blob.SetContentType(ZenContentType::kCompressedBinary); + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), Blob); - CloudCacheResult Result = detail::ConvertResponse(Response); - if (!Result.Success) - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::PutCompressedBlob failed PUT. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-ContentType: '{}', " - "ContentSize: {}, " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - "application/x-ue-comp", - NiceBytes(Blob.Size()), - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; + return detail::ConvertResponse(Response); } CloudCacheResult @@ -754,58 +233,12 @@ CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& K { ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob"); - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-comp"}}); - uint64_t SizeLeft = Payload.GetSize(); - CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); - auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, SizeLeft); - MutableMemoryView Data(buffer, size); - Payload.CopyTo(Data, BufferIt); - SizeLeft -= size; - return true; - }; - Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(SizeLeft), ReadCallback)); - - cpr::Response Response = Session.Put(); - ZEN_DEBUG("PUT {}", Response); + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), + Payload, + ZenContentType::kCompressedBinary); - CloudCacheResult Result = detail::ConvertResponse(Response); - if (!Result.Success) - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::PutCompressedBlob failed PUT. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-ContentType: '{}', " - "ContentSize: {}, " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - "application/x-ue-comp", - NiceBytes(Payload.GetSize()), - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; + return detail::ConvertResponse(Response); } CloudCacheResult @@ -813,49 +246,11 @@ CloudCacheSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBu { ZEN_TRACE_CPU("JupiterClient::PutObject"); - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); + Object.SetContentType(ZenContentType::kCbObject); + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), Object); - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}}); - Session.SetBody(cpr::Body{(const char*)Object.Data(), Object.Size()}); - - cpr::Response Response = Session.Put(); - ZEN_DEBUG("PUT {}", Response); - - CloudCacheResult Result = detail::ConvertResponse(Response); - if (!Result.Success) - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::PutObject failed PUT. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-ContentType: '{}', " - "ContentSize: {}, " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - "application/x-ue-cb", - NiceBytes(Object.GetSize()), - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; + return detail::ConvertResponse(Response); } CloudCacheResult @@ -863,45 +258,10 @@ CloudCacheSession::RefExists(std::string_view Namespace, std::string_view Bucket { ZEN_TRACE_CPU("JupiterClient::RefExists"); - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString())); - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}}); - Session.SetOption(cpr::Body{}); - - cpr::Response Response = Session.Head(); - ZEN_DEBUG("HEAD {}", Response); - - CloudCacheResult Result = detail::ConvertResponse(Response); - if (!Result.Success) - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::RefExists failed PUT. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; + return detail::ConvertResponse(Response); } GetObjectReferencesResult @@ -909,57 +269,20 @@ CloudCacheSession::GetObjectReferences(std::string_view Namespace, const IoHash& { ZEN_TRACE_CPU("JupiterClient::GetObjectReferences"); - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString() << "/references"; - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}}); - Session.SetOption(cpr::Body{}); - - cpr::Response Response = Session.Get(); - ZEN_DEBUG("GET {}", Response); + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/references", Namespace, Key.ToHexString()), + {HttpClient::Accept(ZenContentType::kCbObject)}); GetObjectReferencesResult Result = {detail::ConvertResponse(Response)}; if (Result.Success) { - IoBuffer Buffer = IoBuffer(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size()); - const CbObject ReferencesResponse = LoadCompactBinaryObject(Buffer); + const CbObject ReferencesResponse = Response.AsObject(); for (auto& Item : ReferencesResponse["references"sv]) { Result.References.insert(Item.AsHash()); } } - else - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::GetObjectReferences failed PUT. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-Accept: '{}', " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - "application/x-ue-cb", - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; } @@ -1002,73 +325,23 @@ CloudCacheSession::ObjectExists(std::string_view Namespace, const std::set<IoHas std::vector<IoHash> CloudCacheSession::Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes) { - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl(); - Uri << "/api/v1/s/" << Namespace; + // ExtendableStringBuilder<256> Uri; + // Uri << m_CacheClient->ServiceUrl(); + // Uri << "/api/v1/s/" << Namespace; - ZEN_UNUSED(BucketId, ChunkHashes); + ZEN_UNUSED(Namespace, BucketId, ChunkHashes); return {}; } -cpr::Session& -CloudCacheSession::GetSession() -{ - return m_SessionState->GetSession(); -} - -CloudCacheAccessToken -CloudCacheSession::GetAccessToken(bool RefreshToken) -{ - return m_SessionState->GetAccessToken(RefreshToken); -} - CloudCacheResult CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key) { ZEN_TRACE_CPU("JupiterClient::CacheTypeExists"); - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << Namespace << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}}); - Session.SetOption(cpr::Body{}); - - cpr::Response Response = Session.Head(); - ZEN_DEBUG("HEAD {}", Response); + HttpClient::Response Response = m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/{}/{}/{}", TypeId, Namespace, Key.ToHexString())); - CloudCacheResult Result = detail::ConvertResponse(Response); - if (!Result.Success) - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::CacheTypeExists failed GET. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-Accept: '{}', " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - "application/x-ue-cb", - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; + return detail::ConvertResponse(Response); } CloudCacheExistsResult @@ -1083,58 +356,23 @@ CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view Body << (Body.Size() != 1 ? ",\"" : "\"") << Key.ToHexString() << "\""; } Body << "]"; + IoBuffer Payload = IoBuffer(IoBuffer::Wrap, Body.Data(), Body.Size()); + Payload.SetContentType(ZenContentType::kJSON); - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << Namespace << "/exist"; - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); + HttpClient::Response Response = m_CacheClient->m_HttpClient.Post(fmt::format("/api/v1/{}/{}/exist", TypeId, Namespace), + Payload, + {HttpClient::Accept(ZenContentType::kCbObject)}); - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption( - cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}, {"Content-Type", "application/json"}}); - Session.SetOption(cpr::Body(Body.ToString())); - - cpr::Response Response = Session.Post(); - ZEN_DEBUG("POST {}", Response); CloudCacheExistsResult Result = {detail::ConvertResponse(Response)}; if (Result.Success) { - IoBuffer Buffer = IoBuffer(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size()); - const CbObject ExistsResponse = LoadCompactBinaryObject(Buffer); + const CbObject ExistsResponse = Response.AsObject(); for (auto& Item : ExistsResponse["needs"sv]) { Result.Needs.insert(Item.AsHash()); } } - else - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::CacheTypeExists failed GET. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-Accept: '{}', " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - "application/x-ue-cb", - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; } @@ -1233,77 +471,39 @@ CloudCacheTokenProvider::CreateFromCallback(std::function<CloudCacheAccessToken( return std::make_unique<CallbackTokenProvider>(std::move(Callback)); } +static std::optional<std::function<HttpClientAccessToken()>> +GetHttpClientAccessProvider(CloudCacheTokenProvider* TokenProvider) +{ + if (TokenProvider == nullptr) + { + return {}; + } + auto ProviderFunc = [TokenProvider]() -> HttpClientAccessToken { + CloudCacheAccessToken Token = TokenProvider->AcquireAccessToken(); + return HttpClientAccessToken{.Value = Token.Value, .ExpireTime = Token.ExpireTime}; + }; + return ProviderFunc; +} + CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options, std::unique_ptr<CloudCacheTokenProvider> TokenProvider) : m_Log(zen::logging::Get("jupiter")) -, m_ServiceUrl(Options.ServiceUrl) , m_DefaultDdcNamespace(Options.DdcNamespace) , m_DefaultBlobStoreNamespace(Options.BlobStoreNamespace) , m_ComputeCluster(Options.ComputeCluster) -, m_ConnectTimeout(Options.ConnectTimeout) -, m_Timeout(Options.Timeout) , m_TokenProvider(std::move(TokenProvider)) -, m_AssumeHttp2(Options.AssumeHttp2) +, m_HttpClient(Options.ServiceUrl, + HttpClientSettings{.ConnectTimeout = Options.ConnectTimeout, + .Timeout = Options.Timeout, + .AccessTokenProvider = GetHttpClientAccessProvider(m_TokenProvider.get()), + .AssumeHttp2 = Options.AssumeHttp2, + .AllowResume = Options.AllowResume, + .RetryCount = Options.RetryCount}) { ZEN_ASSERT(m_TokenProvider.get() != nullptr); } CloudCacheClient::~CloudCacheClient() { - RwLock::ExclusiveLockScope _(m_SessionStateLock); - - for (auto State : m_SessionStateCache) - { - delete State; - } -} - -CloudCacheAccessToken -CloudCacheClient::AcquireAccessToken() -{ - ZEN_TRACE_CPU("JupiterClient::AcquireAccessToken"); - - return m_TokenProvider->AcquireAccessToken(); -} - -detail::CloudCacheSessionState* -CloudCacheClient::AllocSessionState() -{ - detail::CloudCacheSessionState* State = nullptr; - - bool IsTokenValid = false; - - { - RwLock::ExclusiveLockScope _(m_SessionStateLock); - - if (m_SessionStateCache.empty() == false) - { - State = m_SessionStateCache.front(); - IsTokenValid = State->m_AccessToken.IsValid(); - - m_SessionStateCache.pop_front(); - } - } - - if (State == nullptr) - { - State = new detail::CloudCacheSessionState(*this); - } - - State->Reset(m_ConnectTimeout, m_Timeout, m_AssumeHttp2); - - if (IsTokenValid == false) - { - State->m_AccessToken = m_TokenProvider->AcquireAccessToken(); - } - - return State; -} - -void -CloudCacheClient::FreeSessionState(detail::CloudCacheSessionState* State) -{ - RwLock::ExclusiveLockScope _(m_SessionStateLock); - m_SessionStateCache.push_front(State); } } // namespace zen |