aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/upstream/jupiter.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-01-24 11:41:18 +0100
committerGitHub <[email protected]>2024-01-24 11:41:18 +0100
commit0e63573fbe9973f6b922656a785817a711581b78 (patch)
tree48e18f0b4aea958a536ba50f72f589a580c4b798 /src/zenserver/upstream/jupiter.cpp
parentoplog import/export improvements (#634) (diff)
downloadzen-0e63573fbe9973f6b922656a785817a711581b78.tar.xz
zen-0e63573fbe9973f6b922656a785817a711581b78.zip
Add retry with optional resume logic to HttpClient::Download (#639)
- Improvement: Refactored Jupiter upstream to use HttpClient - Improvement: Added retry and resume logic to HttpClient - Improvement: Added authentication support to HttpClient - Improvement: Clearer logging in GCV2 compact of FileCas/BlockStore - Improvement: Size details in oplog import logging
Diffstat (limited to 'src/zenserver/upstream/jupiter.cpp')
-rw-r--r--src/zenserver/upstream/jupiter.cpp1012
1 files changed, 106 insertions, 906 deletions
diff --git a/src/zenserver/upstream/jupiter.cpp b/src/zenserver/upstream/jupiter.cpp
index e4d45e316..bf2538908 100644
--- a/src/zenserver/upstream/jupiter.cpp
+++ b/src/zenserver/upstream/jupiter.cpp
@@ -16,7 +16,6 @@
#include <zenutil/basicfile.h>
ZEN_THIRD_PARTY_INCLUDES_START
-#include <cpr/cpr.h>
#include <fmt/format.h>
ZEN_THIRD_PARTY_INCLUDES_END
@@ -32,277 +31,70 @@ using namespace std::literals;
namespace zen {
namespace detail {
- struct CloudCacheSessionState
+ CloudCacheResult ConvertResponse(const HttpClient::Response& Response, const std::string_view ErrorPrefix = ""sv)
{
- CloudCacheSessionState(CloudCacheClient& Client) : m_Client(Client) {}
-
- const CloudCacheAccessToken& GetAccessToken(bool RefreshToken)
- {
- if (RefreshToken)
- {
- m_AccessToken = m_Client.AcquireAccessToken();
- }
-
- return m_AccessToken;
- }
-
- cpr::Session& GetSession() { return m_Session; }
-
- void Reset(std::chrono::milliseconds ConnectTimeout, std::chrono::milliseconds Timeout, bool AssumeHttp2)
+ if (Response.Error)
{
- m_Session.SetBody({});
- m_Session.SetHeader({});
- m_Session.SetConnectTimeout(ConnectTimeout);
- m_Session.SetTimeout(Timeout);
- if (AssumeHttp2)
- {
- m_Session.SetHttpVersion(cpr::HttpVersion{cpr::HttpVersionCode::VERSION_2_0_PRIOR_KNOWLEDGE});
- }
- }
-
- private:
- friend class zen::CloudCacheClient;
-
- CloudCacheClient& m_Client;
- CloudCacheAccessToken m_AccessToken;
- cpr::Session m_Session;
- };
-
- CloudCacheResult ConvertResponse(const cpr::Response& Response)
- {
- if (Response.error)
- {
- return {.ElapsedSeconds = Response.elapsed,
- .ErrorCode = static_cast<int32_t>(Response.error.code),
- .Reason = Response.error.message,
+ return {.ElapsedSeconds = Response.ElapsedSeconds,
+ .ErrorCode = Response.Error.value().ErrorCode,
+ .Reason = Response.ErrorMessage(ErrorPrefix),
.Success = false};
}
- if (!IsHttpSuccessCode(Response.status_code))
+ if (!Response.IsSuccess())
{
- return {.ElapsedSeconds = Response.elapsed,
- .ErrorCode = static_cast<int32_t>(Response.status_code),
- .Reason = Response.reason.empty() ? Response.text : Response.reason,
+ return {.ElapsedSeconds = Response.ElapsedSeconds,
+ .ErrorCode = static_cast<int32_t>(Response.StatusCode),
+ .Reason = Response.ErrorMessage(ErrorPrefix),
.Success = false};
}
- return {.Bytes = Response.downloaded_bytes,
- .ElapsedSeconds = Response.elapsed,
+ return {.Response = Response.ResponsePayload,
+ .Bytes = Response.DownloadedBytes,
+ .ElapsedSeconds = Response.ElapsedSeconds,
.ErrorCode = 0,
- .Reason = Response.reason,
.Success = true};
}
-
- cpr::Response GetWithStreaming(cpr::Session& Session, std::filesystem::path TempFolderPath, std::string_view Name, IoBuffer& OutBuffer)
- {
- if (TempFolderPath.empty())
- {
- return Session.Get();
- }
-
- std::string PayloadString;
- std::shared_ptr<BasicFile> PayloadFile;
-
- auto _ = MakeGuard([&]() {
- if (PayloadFile)
- {
- PayloadFile.reset();
- std::filesystem::path TempPath = TempFolderPath / Name;
- std::error_code Ec;
- std::filesystem::remove(TempPath, Ec);
- }
- });
-
- uint64_t Offset = 0;
- Session.SetWriteCallback(cpr::WriteCallback{[&](std::string data, intptr_t) {
- if (!PayloadFile && (PayloadString.length() + data.length()) > (1024 * 1024))
- {
- std::filesystem::path TempPath = TempFolderPath / Name;
- PayloadFile = std::make_shared<BasicFile>();
- PayloadFile->Open(TempPath, BasicFile::Mode::kTruncateDelete);
- PayloadFile->Write(PayloadString.data(), PayloadString.size(), Offset);
- Offset += PayloadString.size();
- PayloadString.clear();
- }
- if (PayloadFile)
- {
- PayloadFile->Write(data.data(), data.size(), Offset);
- Offset += data.size();
- }
- else
- {
- PayloadString.append(data);
- }
- return true;
- }});
-
- cpr::Response Response = Session.Get();
-
- if (!Response.error && IsHttpSuccessCode(Response.status_code))
- {
- if (PayloadFile)
- {
- uint64_t PayloadSize = PayloadFile->FileSize();
- void* FileHandle = PayloadFile->Detach();
- PayloadFile.reset();
- OutBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, PayloadSize, /*IsWholeFile*/ true);
- OutBuffer.SetDeleteOnClose(true);
- }
- else
- {
- OutBuffer = IoBufferBuilder::MakeCloneFromMemory(PayloadString.data(), PayloadString.size());
- }
- return Response;
- }
-
- Response.text.swap(PayloadString);
- return Response;
- }
-
- static std::optional<zen::HttpContentType> TryGetContentType(const cpr::Response& Response)
- {
- if (auto It = Response.header.find("Content-Type"); It != Response.header.end())
- {
- zen::HttpContentType ContentType = zen::ParseContentType(It->second);
- if (ContentType != zen::HttpContentType::kUnknownContentType)
- {
- return ContentType;
- }
- }
- return {};
- }
-
- static IoBuffer MakeBufferFromResponseIfKnownFormat(const cpr::Response& Response)
- {
- std::optional<zen::HttpContentType> ContentType = TryGetContentType(Response);
- if (ContentType)
- {
- IoBuffer Buffer = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size());
- Buffer.SetContentType(ContentType.value());
- return Buffer;
- }
- return {};
- }
-
} // namespace detail
CloudCacheSession::CloudCacheSession(CloudCacheClient* CacheClient) : m_Log(CacheClient->Logger()), m_CacheClient(CacheClient)
{
- m_SessionState = m_CacheClient->AllocSessionState();
}
CloudCacheSession::~CloudCacheSession()
{
- m_CacheClient->FreeSessionState(m_SessionState);
}
CloudCacheResult
CloudCacheSession::Authenticate()
{
- const bool RefreshToken = true;
- const CloudCacheAccessToken& AccessToken = GetAccessToken(RefreshToken);
-
- return {.Success = AccessToken.IsValid()};
+ bool OK = m_CacheClient->m_HttpClient.Authenticate();
+ return {.Success = OK};
}
CloudCacheResult
-CloudCacheSession::GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType)
+CloudCacheSession::GetRef(std::string_view Namespace,
+ std::string_view BucketId,
+ const IoHash& Key,
+ ZenContentType RefType,
+ std::filesystem::path TempFolderPath)
{
- const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream";
-
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString();
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", ContentType}});
- Session.SetOption(cpr::Body{});
+ ZEN_TRACE_CPU("JupiterClient::GetRef");
- cpr::Response Response = Session.Get();
- ZEN_DEBUG("GET {}", Response);
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()),
+ TempFolderPath,
+ {HttpClient::Accept(RefType)});
- CloudCacheResult Result = detail::ConvertResponse(Response);
- if (Result.Success)
- {
- Result.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size());
- }
- else
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::GetRef failed GET. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-Accept: '{}', "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- ContentType,
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
- return Result;
+ return detail::ConvertResponse(Response, "CloudCacheSession::GetRef"sv);
}
CloudCacheResult
CloudCacheSession::GetBlob(std::string_view Namespace, const IoHash& Key)
{
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << Namespace << "/" << Key.ToHexString();
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/octet-stream"}});
- Session.SetOption(cpr::Body{});
-
- cpr::Response Response = Session.Get();
- ZEN_DEBUG("GET {}", Response);
+ ZEN_TRACE_CPU("JupiterClient::GetBlob");
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()),
+ {HttpClient::Accept(ZenContentType::kBinary)});
- CloudCacheResult Result = detail::ConvertResponse(Response);
- if (Result.Success)
- {
- Result.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size());
- }
- else
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::GetBlob failed GET. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-Accept: '{}', "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- "application/octet-stream",
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
- return Result;
+ return detail::ConvertResponse(Response);
}
CloudCacheResult
@@ -310,58 +102,12 @@ CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& K
{
ZEN_TRACE_CPU("JupiterClient::GetCompressedBlob");
- ExtendableStringBuilder<256> Uri;
- std::string KeyString = Key.ToHexString();
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << KeyString;
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()),
+ TempFolderPath,
+ {HttpClient::Accept(ZenContentType::kCompressedBinary)});
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-comp"}});
- Session.SetOption(cpr::Body{});
-
- IoBuffer Payload;
- cpr::Response Response = detail::GetWithStreaming(Session, TempFolderPath, KeyString, Payload);
- ZEN_DEBUG("GET {}", Response);
-
- CloudCacheResult Result = detail::ConvertResponse(Response);
- if (Result.Success)
- {
- Result.Response = std::move(Payload);
- }
- else
- {
- std::optional<zen::HttpContentType> ContentType = detail::TryGetContentType(Response);
- if (ContentType.has_value())
- {
- Result.Response = std::move(Payload);
- Result.Response.SetContentType(ContentType.value());
- }
- ZEN_WARN(
- "CloudCacheSession::GetCompressedBlob failed GET. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-Accept: '{}', "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- "application/x-ue-comp",
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
- return Result;
+ return detail::ConvertResponse(Response);
}
CloudCacheResult
@@ -373,59 +119,14 @@ CloudCacheSession::GetInlineBlob(std::string_view Namespace,
{
ZEN_TRACE_CPU("JupiterClient::GetInlineBlob");
- ExtendableStringBuilder<256> Uri;
- std::string KeyString = Key.ToHexString();
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << KeyString;
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-jupiter-inline"}});
- Session.SetOption(cpr::Body{});
-
- IoBuffer Payload;
- cpr::Response Response = detail::GetWithStreaming(Session, TempFolderPath, KeyString, Payload);
- ZEN_DEBUG("GET {}", Response);
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()),
+ TempFolderPath,
+ {{"Accept", "application/x-jupiter-inline"}});
CloudCacheResult Result = detail::ConvertResponse(Response);
- if (Result.Success)
- {
- Result.Response = std::move(Payload);
- }
- else
- {
- std::optional<zen::HttpContentType> ContentType = detail::TryGetContentType(Response);
- if (ContentType.has_value())
- {
- Result.Response = std::move(Payload);
- Result.Response.SetContentType(ContentType.value());
- }
- ZEN_WARN(
- "CloudCacheSession::GetInlineBlob failed GET. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-Accept: '{}', "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- "application/x-jupiter-inline",
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
- if (auto It = Response.header.find("X-Jupiter-InlinePayloadHash"); It != Response.header.end())
+ if (auto It = Response.Header->find("X-Jupiter-InlinePayloadHash"); It != Response.Header->end())
{
const std::string& PayloadHashHeader = It->second;
if (PayloadHashHeader.length() == IoHash::StringLength)
@@ -442,52 +143,10 @@ CloudCacheSession::GetObject(std::string_view Namespace, const IoHash& Key)
{
ZEN_TRACE_CPU("JupiterClient::GetObject");
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString();
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}});
- Session.SetOption(cpr::Body{});
-
- cpr::Response Response = Session.Get();
- ZEN_DEBUG("GET {}", Response);
-
- CloudCacheResult Result = detail::ConvertResponse(Response);
- if (Result.Success)
- {
- Result.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size());
- }
- else
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::GetObject failed GET. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-Accept: '{}', "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- "application/x-ue-cb",
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()),
+ {HttpClient::Accept(ZenContentType::kCbObject)});
- return Result;
+ return detail::ConvertResponse(Response);
}
PutRefResult
@@ -495,29 +154,18 @@ CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId,
{
ZEN_TRACE_CPU("JupiterClient::PutRef");
- IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size());
-
- const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream";
+ Ref.SetContentType(RefType);
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString();
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(
- cpr::Header{{"Authorization", AccessToken.Value}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", ContentType}});
- Session.SetBody(cpr::Body{(const char*)Ref.Data(), Ref.Size()});
+ IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size());
- cpr::Response Response = Session.Put();
- ZEN_DEBUG("PUT {}", Response);
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Put(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), Ref);
PutRefResult Result = {detail::ConvertResponse(Response)};
if (Result.Success)
{
std::string JsonError;
- json11::Json Json = json11::Json::parse(Response.text, JsonError);
+ json11::Json Json = json11::Json::parse(Response.ToText(), JsonError);
if (JsonError.empty())
{
json11::Json::array Needs = Json["needs"].array_items();
@@ -528,37 +176,6 @@ CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId,
}
Result.RawHash = Hash;
}
- else
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::PutRef failed PUT. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-X-Jupiter-IoHash: '{}', "
- "Header-ContentType: '{}', "
- "ContentSize: {}, "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- Hash.ToHexString(),
- ContentType,
- NiceBytes(Ref.Size()),
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
-
return Result;
}
@@ -567,28 +184,16 @@ CloudCacheSession::FinalizeRef(std::string_view Namespace, std::string_view Buck
{
ZEN_TRACE_CPU("JupiterClient::FinalizeRef");
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString() << "/finalize/"
- << RefHash.ToHexString();
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value},
- {"X-Jupiter-IoHash", RefHash.ToHexString()},
- {"Content-Type", "application/x-ue-cb"}});
- Session.SetBody(cpr::Body{});
-
- cpr::Response Response = Session.Post();
- ZEN_DEBUG("POST {}", Response);
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Post(
+ fmt::format("/api/v1/refs/{}/{}/{}/finalize/{}", Namespace, BucketId, Key.ToHexString(), RefHash.ToHexString()),
+ {{"X-Jupiter-IoHash", RefHash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}});
FinalizeRefResult Result = {detail::ConvertResponse(Response)};
if (Result.Success)
{
std::string JsonError;
- json11::Json Json = json11::Json::parse(Response.text, JsonError);
+ json11::Json Json = json11::Json::parse(std::string(Response.AsText()), JsonError);
if (JsonError.empty())
{
json11::Json::array Needs = Json["needs"].array_items();
@@ -598,37 +203,6 @@ CloudCacheSession::FinalizeRef(std::string_view Namespace, std::string_view Buck
}
}
}
- else
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::FinalizeRef failed PUT. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-X-Jupiter-IoHash: '{}', "
- "Header-ContentType: '{}', "
- "ContentSize: {}, "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- RefHash.ToHexString(),
- "application/x-ue-cb",
- NiceBytes(0),
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
-
return Result;
}
@@ -637,49 +211,9 @@ CloudCacheSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuff
{
ZEN_TRACE_CPU("JupiterClient::PutBlob");
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << Namespace << "/" << Key.ToHexString();
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Put(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), Blob);
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/octet-stream"}});
- Session.SetBody(cpr::Body{(const char*)Blob.Data(), Blob.Size()});
-
- cpr::Response Response = Session.Put();
- ZEN_DEBUG("PUT {}", Response);
-
- CloudCacheResult Result = detail::ConvertResponse(Response);
- if (!Result.Success)
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::PutBlob failed PUT. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-ContentType: '{}', "
- "ContentSize: {}, "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- "application/octet-stream",
- NiceBytes(Blob.Size()),
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
- return Result;
+ return detail::ConvertResponse(Response);
}
CloudCacheResult
@@ -687,66 +221,11 @@ CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& K
{
ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob");
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString();
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-comp"}});
-
- uint64_t Offset = 0;
- if (Blob.IsWholeFile())
- {
- auto ReadCallback = [&Blob, &Offset](char* buffer, size_t& size, intptr_t) {
- size = Min<size_t>(size, Blob.GetSize() - Offset);
- IoBuffer PayloadRange = IoBuffer(Blob, Offset, size);
- MutableMemoryView Data(buffer, size);
- Data.CopyFrom(PayloadRange.GetView());
- Offset += size;
- return true;
- };
- Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Blob.GetSize()), ReadCallback));
- }
- else
- {
- Session.SetBody(cpr::Body{(const char*)Blob.Data(), Blob.Size()});
- }
-
- cpr::Response Response = Session.Put();
- ZEN_DEBUG("PUT {}", Response);
+ Blob.SetContentType(ZenContentType::kCompressedBinary);
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), Blob);
- CloudCacheResult Result = detail::ConvertResponse(Response);
- if (!Result.Success)
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::PutCompressedBlob failed PUT. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-ContentType: '{}', "
- "ContentSize: {}, "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- "application/x-ue-comp",
- NiceBytes(Blob.Size()),
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
- return Result;
+ return detail::ConvertResponse(Response);
}
CloudCacheResult
@@ -754,58 +233,12 @@ CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& K
{
ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob");
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString();
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-comp"}});
- uint64_t SizeLeft = Payload.GetSize();
- CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0);
- auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) {
- size = Min<size_t>(size, SizeLeft);
- MutableMemoryView Data(buffer, size);
- Payload.CopyTo(Data, BufferIt);
- SizeLeft -= size;
- return true;
- };
- Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(SizeLeft), ReadCallback));
-
- cpr::Response Response = Session.Put();
- ZEN_DEBUG("PUT {}", Response);
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()),
+ Payload,
+ ZenContentType::kCompressedBinary);
- CloudCacheResult Result = detail::ConvertResponse(Response);
- if (!Result.Success)
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::PutCompressedBlob failed PUT. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-ContentType: '{}', "
- "ContentSize: {}, "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- "application/x-ue-comp",
- NiceBytes(Payload.GetSize()),
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
- return Result;
+ return detail::ConvertResponse(Response);
}
CloudCacheResult
@@ -813,49 +246,11 @@ CloudCacheSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBu
{
ZEN_TRACE_CPU("JupiterClient::PutObject");
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString();
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ Object.SetContentType(ZenContentType::kCbObject);
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), Object);
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}});
- Session.SetBody(cpr::Body{(const char*)Object.Data(), Object.Size()});
-
- cpr::Response Response = Session.Put();
- ZEN_DEBUG("PUT {}", Response);
-
- CloudCacheResult Result = detail::ConvertResponse(Response);
- if (!Result.Success)
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::PutObject failed PUT. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-ContentType: '{}', "
- "ContentSize: {}, "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- "application/x-ue-cb",
- NiceBytes(Object.GetSize()),
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
- return Result;
+ return detail::ConvertResponse(Response);
}
CloudCacheResult
@@ -863,45 +258,10 @@ CloudCacheSession::RefExists(std::string_view Namespace, std::string_view Bucket
{
ZEN_TRACE_CPU("JupiterClient::RefExists");
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString();
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()));
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}});
- Session.SetOption(cpr::Body{});
-
- cpr::Response Response = Session.Head();
- ZEN_DEBUG("HEAD {}", Response);
-
- CloudCacheResult Result = detail::ConvertResponse(Response);
- if (!Result.Success)
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::RefExists failed PUT. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
- return Result;
+ return detail::ConvertResponse(Response);
}
GetObjectReferencesResult
@@ -909,57 +269,20 @@ CloudCacheSession::GetObjectReferences(std::string_view Namespace, const IoHash&
{
ZEN_TRACE_CPU("JupiterClient::GetObjectReferences");
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString() << "/references";
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}});
- Session.SetOption(cpr::Body{});
-
- cpr::Response Response = Session.Get();
- ZEN_DEBUG("GET {}", Response);
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/references", Namespace, Key.ToHexString()),
+ {HttpClient::Accept(ZenContentType::kCbObject)});
GetObjectReferencesResult Result = {detail::ConvertResponse(Response)};
if (Result.Success)
{
- IoBuffer Buffer = IoBuffer(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size());
- const CbObject ReferencesResponse = LoadCompactBinaryObject(Buffer);
+ const CbObject ReferencesResponse = Response.AsObject();
for (auto& Item : ReferencesResponse["references"sv])
{
Result.References.insert(Item.AsHash());
}
}
- else
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::GetObjectReferences failed PUT. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-Accept: '{}', "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- "application/x-ue-cb",
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
-
return Result;
}
@@ -1002,73 +325,23 @@ CloudCacheSession::ObjectExists(std::string_view Namespace, const std::set<IoHas
std::vector<IoHash>
CloudCacheSession::Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes)
{
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl();
- Uri << "/api/v1/s/" << Namespace;
+ // ExtendableStringBuilder<256> Uri;
+ // Uri << m_CacheClient->ServiceUrl();
+ // Uri << "/api/v1/s/" << Namespace;
- ZEN_UNUSED(BucketId, ChunkHashes);
+ ZEN_UNUSED(Namespace, BucketId, ChunkHashes);
return {};
}
-cpr::Session&
-CloudCacheSession::GetSession()
-{
- return m_SessionState->GetSession();
-}
-
-CloudCacheAccessToken
-CloudCacheSession::GetAccessToken(bool RefreshToken)
-{
- return m_SessionState->GetAccessToken(RefreshToken);
-}
-
CloudCacheResult
CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key)
{
ZEN_TRACE_CPU("JupiterClient::CacheTypeExists");
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << Namespace << "/" << Key.ToHexString();
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}});
- Session.SetOption(cpr::Body{});
-
- cpr::Response Response = Session.Head();
- ZEN_DEBUG("HEAD {}", Response);
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/{}/{}/{}", TypeId, Namespace, Key.ToHexString()));
- CloudCacheResult Result = detail::ConvertResponse(Response);
- if (!Result.Success)
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::CacheTypeExists failed GET. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-Accept: '{}', "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- "application/x-ue-cb",
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
- return Result;
+ return detail::ConvertResponse(Response);
}
CloudCacheExistsResult
@@ -1083,58 +356,23 @@ CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view
Body << (Body.Size() != 1 ? ",\"" : "\"") << Key.ToHexString() << "\"";
}
Body << "]";
+ IoBuffer Payload = IoBuffer(IoBuffer::Wrap, Body.Data(), Body.Size());
+ Payload.SetContentType(ZenContentType::kJSON);
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << Namespace << "/exist";
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Post(fmt::format("/api/v1/{}/{}/exist", TypeId, Namespace),
+ Payload,
+ {HttpClient::Accept(ZenContentType::kCbObject)});
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(
- cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}, {"Content-Type", "application/json"}});
- Session.SetOption(cpr::Body(Body.ToString()));
-
- cpr::Response Response = Session.Post();
- ZEN_DEBUG("POST {}", Response);
CloudCacheExistsResult Result = {detail::ConvertResponse(Response)};
if (Result.Success)
{
- IoBuffer Buffer = IoBuffer(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size());
- const CbObject ExistsResponse = LoadCompactBinaryObject(Buffer);
+ const CbObject ExistsResponse = Response.AsObject();
for (auto& Item : ExistsResponse["needs"sv])
{
Result.Needs.insert(Item.AsHash());
}
}
- else
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::CacheTypeExists failed GET. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-Accept: '{}', "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- "application/x-ue-cb",
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
-
return Result;
}
@@ -1233,77 +471,39 @@ CloudCacheTokenProvider::CreateFromCallback(std::function<CloudCacheAccessToken(
return std::make_unique<CallbackTokenProvider>(std::move(Callback));
}
+static std::optional<std::function<HttpClientAccessToken()>>
+GetHttpClientAccessProvider(CloudCacheTokenProvider* TokenProvider)
+{
+ if (TokenProvider == nullptr)
+ {
+ return {};
+ }
+ auto ProviderFunc = [TokenProvider]() -> HttpClientAccessToken {
+ CloudCacheAccessToken Token = TokenProvider->AcquireAccessToken();
+ return HttpClientAccessToken{.Value = Token.Value, .ExpireTime = Token.ExpireTime};
+ };
+ return ProviderFunc;
+}
+
CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options, std::unique_ptr<CloudCacheTokenProvider> TokenProvider)
: m_Log(zen::logging::Get("jupiter"))
-, m_ServiceUrl(Options.ServiceUrl)
, m_DefaultDdcNamespace(Options.DdcNamespace)
, m_DefaultBlobStoreNamespace(Options.BlobStoreNamespace)
, m_ComputeCluster(Options.ComputeCluster)
-, m_ConnectTimeout(Options.ConnectTimeout)
-, m_Timeout(Options.Timeout)
, m_TokenProvider(std::move(TokenProvider))
-, m_AssumeHttp2(Options.AssumeHttp2)
+, m_HttpClient(Options.ServiceUrl,
+ HttpClientSettings{.ConnectTimeout = Options.ConnectTimeout,
+ .Timeout = Options.Timeout,
+ .AccessTokenProvider = GetHttpClientAccessProvider(m_TokenProvider.get()),
+ .AssumeHttp2 = Options.AssumeHttp2,
+ .AllowResume = Options.AllowResume,
+ .RetryCount = Options.RetryCount})
{
ZEN_ASSERT(m_TokenProvider.get() != nullptr);
}
CloudCacheClient::~CloudCacheClient()
{
- RwLock::ExclusiveLockScope _(m_SessionStateLock);
-
- for (auto State : m_SessionStateCache)
- {
- delete State;
- }
-}
-
-CloudCacheAccessToken
-CloudCacheClient::AcquireAccessToken()
-{
- ZEN_TRACE_CPU("JupiterClient::AcquireAccessToken");
-
- return m_TokenProvider->AcquireAccessToken();
-}
-
-detail::CloudCacheSessionState*
-CloudCacheClient::AllocSessionState()
-{
- detail::CloudCacheSessionState* State = nullptr;
-
- bool IsTokenValid = false;
-
- {
- RwLock::ExclusiveLockScope _(m_SessionStateLock);
-
- if (m_SessionStateCache.empty() == false)
- {
- State = m_SessionStateCache.front();
- IsTokenValid = State->m_AccessToken.IsValid();
-
- m_SessionStateCache.pop_front();
- }
- }
-
- if (State == nullptr)
- {
- State = new detail::CloudCacheSessionState(*this);
- }
-
- State->Reset(m_ConnectTimeout, m_Timeout, m_AssumeHttp2);
-
- if (IsTokenValid == false)
- {
- State->m_AccessToken = m_TokenProvider->AcquireAccessToken();
- }
-
- return State;
-}
-
-void
-CloudCacheClient::FreeSessionState(detail::CloudCacheSessionState* State)
-{
- RwLock::ExclusiveLockScope _(m_SessionStateLock);
- m_SessionStateCache.push_front(State);
}
} // namespace zen