aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver')
-rw-r--r--src/zenserver/projectstore/jupiterremoteprojectstore.cpp82
-rw-r--r--src/zenserver/projectstore/projectstore.cpp12
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp58
-rw-r--r--src/zenserver/upstream/jupiter.cpp1012
-rw-r--r--src/zenserver/upstream/jupiter.h51
5 files changed, 178 insertions, 1037 deletions
diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
index 9d8f6c17b..c9f1f5f6f 100644
--- a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
@@ -54,19 +54,8 @@ public:
virtual SaveResult SaveContainer(const IoBuffer& Payload) override
{
- const int32_t MaxAttempts = 3;
- PutRefResult PutResult;
- {
- CloudCacheSession Session(m_CloudClient.Get());
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !PutResult.Success; Attempt++)
- {
- PutResult = Session.PutRef(m_Namespace, m_Bucket, m_Key, Payload, ZenContentType::kCbObject);
- if (!PutResult.Success)
- {
- Sleep(100 * (Attempt + 1));
- }
- }
- }
+ CloudCacheSession Session(m_CloudClient.Get());
+ PutRefResult PutResult = Session.PutRef(m_Namespace, m_Bucket, m_Key, Payload, ZenContentType::kCbObject);
SaveResult Result{ConvertResult(PutResult), {PutResult.Needs.begin(), PutResult.Needs.end()}, PutResult.RawHash};
if (Result.ErrorCode)
@@ -83,19 +72,8 @@ public:
virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override
{
- const int32_t MaxAttempts = 3;
- CloudCacheResult PutResult;
- {
- CloudCacheSession Session(m_CloudClient.Get());
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !PutResult.Success; Attempt++)
- {
- PutResult = Session.PutCompressedBlob(m_Namespace, RawHash, Payload);
- if (!PutResult.Success)
- {
- Sleep(100 * (Attempt + 1));
- }
- }
- }
+ CloudCacheSession Session(m_CloudClient.Get());
+ CloudCacheResult PutResult = Session.PutCompressedBlob(m_Namespace, RawHash, Payload);
SaveAttachmentResult Result{ConvertResult(PutResult)};
if (Result.ErrorCode)
@@ -126,20 +104,9 @@ public:
virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) override
{
- const int32_t MaxAttempts = 3;
- FinalizeRefResult FinalizeRefResult;
- {
- CloudCacheSession Session(m_CloudClient.Get());
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !FinalizeRefResult.Success; Attempt++)
- {
- FinalizeRefResult = Session.FinalizeRef(m_Namespace, m_Bucket, m_Key, RawHash);
- if (!FinalizeRefResult.Success)
- {
- Sleep(100 * (Attempt + 1));
- }
- }
- }
- FinalizeResult Result{ConvertResult(FinalizeRefResult), {FinalizeRefResult.Needs.begin(), FinalizeRefResult.Needs.end()}};
+ CloudCacheSession Session(m_CloudClient.Get());
+ FinalizeRefResult FinalizeRefResult = Session.FinalizeRef(m_Namespace, m_Bucket, m_Key, RawHash);
+ FinalizeResult Result{ConvertResult(FinalizeRefResult), {FinalizeRefResult.Needs.begin(), FinalizeRefResult.Needs.end()}};
if (Result.ErrorCode)
{
Result.Reason = fmt::format("Failed finalizing oplog container to {}/{}/{}/{}. Reason: '{}'",
@@ -165,19 +132,8 @@ public:
virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override
{
- const int32_t MaxAttempts = 3;
- CloudCacheResult GetResult;
- {
- CloudCacheSession Session(m_CloudClient.Get());
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !GetResult.Success; Attempt++)
- {
- GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath);
- if (!GetResult.Success)
- {
- Sleep(100 * (Attempt + 1));
- }
- }
- }
+ CloudCacheSession Session(m_CloudClient.Get());
+ CloudCacheResult GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath);
LoadAttachmentResult Result{ConvertResult(GetResult), std::move(GetResult.Response)};
if (GetResult.ErrorCode)
{
@@ -210,20 +166,8 @@ public:
private:
LoadContainerResult LoadContainer(const IoHash& Key)
{
- const int32_t MaxAttempts = 3;
- CloudCacheResult GetResult;
- {
- CloudCacheSession Session(m_CloudClient.Get());
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !GetResult.Success; Attempt++)
- {
- GetResult = Session.GetRef(m_Namespace, m_Bucket, Key, ZenContentType::kCbObject);
- if (!GetResult.Success)
- {
- Sleep(100 * (Attempt + 1));
- }
- }
- }
-
+ CloudCacheSession Session(m_CloudClient.Get());
+ CloudCacheResult GetResult = Session.GetRef(m_Namespace, m_Bucket, Key, ZenContentType::kCbObject, m_TempFilePath);
if (GetResult.ErrorCode || !GetResult.Success)
{
LoadContainerResult Result{ConvertResult(GetResult)};
@@ -312,7 +256,9 @@ CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, const std::fi
.ServiceUrl = Url,
.ConnectTimeout = std::chrono::milliseconds(2000),
.Timeout = std::chrono::milliseconds(1800000),
- .AssumeHttp2 = Options.AssumeHttp2};
+ .AssumeHttp2 = Options.AssumeHttp2,
+ .AllowResume = true,
+ .RetryCount = 2};
// 1) Access token as parameter in request
// 2) Environment variable (different win vs linux/mac)
// 3) openid-provider (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 42af9b79b..f117a4203 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -3593,12 +3593,16 @@ ProjectStore::CreateReferenceCheckers(GcCtx& Ctx)
Checkers.reserve(OpLogs.size());
for (const std::string& OpLogId : OpLogs)
{
- ProjectStore::Oplog* Oplog = Project->OpenOplog(OpLogId);
- GcClock::TimePoint Now = GcClock::Now();
- bool TryPreCache = Project->LastOplogAccessTime(OpLogId) < (Now - std::chrono::minutes(5));
+ ProjectStore::Oplog* Oplog = Project->OpenOplog(OpLogId);
+ if (Oplog == nullptr)
+ {
+ continue;
+ }
+ GcClock::TimePoint Now = GcClock::Now();
+ bool TryPreCache = Project->LastOplogAccessTime(OpLogId) < (Now - std::chrono::minutes(5));
Checkers.emplace_back(new ProjectStoreReferenceChecker(*Oplog, TryPreCache));
+ OplogCount++;
}
- OplogCount += OpLogs.size();
}
}
catch (std::exception&)
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index ddab7432d..83cec4725 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -560,7 +560,7 @@ BuildContainer(CidStore& ChunkStore,
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {});
ReportMessage(OptionalContext,
- fmt::format("Failed to build container ({}): '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ fmt::format("Failed to build container ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
BlockCreateLatch.CountDown();
while (!BlockCreateLatch.Wait(1000))
@@ -893,11 +893,9 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
"Invalid attachment",
fmt::format("Upload requested of unknown attachment '{}'", Needed));
- ReportMessage(OptionalContext,
- fmt::format("Failed to upload attachment '{}'. ({}): '{}'",
- Needed,
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
+ ReportMessage(
+ OptionalContext,
+ fmt::format("Failed to upload attachment '{}'. ({}): {}", Needed, RemoteResult.GetError(), RemoteResult.GetErrorReason()));
return;
}
}
@@ -969,7 +967,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
fmt::format("Failed to find attachment {}", RawHash),
{});
- ZEN_WARN("Failed to save attachment '{}' ({}): '{}'", RawHash, RemoteResult.GetError(), RemoteResult.GetErrorReason());
+ ZEN_WARN("Failed to save attachment '{}' ({}): {}", RawHash, RemoteResult.GetError(), RemoteResult.GetErrorReason());
return;
}
@@ -978,7 +976,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
{
RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
ReportMessage(OptionalContext,
- fmt::format("Failed to save attachment '{}', {} ({}): '{}'",
+ fmt::format("Failed to save attachment '{}', {} ({}): {}",
RawHash,
NiceBytes(Payload.GetSize()),
RemoteResult.GetError(),
@@ -1031,7 +1029,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
{
RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
ReportMessage(OptionalContext,
- fmt::format("Failed to save attachment '{}', {} ({}): '{}'",
+ fmt::format("Failed to save attachment '{}', {} ({}): {}",
RawHash,
NiceBytes(Payload.GetSize()),
RemoteResult.GetError(),
@@ -1108,7 +1106,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
{
RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
ReportMessage(OptionalContext,
- fmt::format("Failed to save attachments with {} chunks ({}): '{}'",
+ fmt::format("Failed to save attachments with {} chunks ({}): {}",
Chunks.size(),
RemoteResult.GetError(),
RemoteResult.GetErrorReason()));
@@ -1230,7 +1228,7 @@ SaveOplog(CidStore& ChunkStore,
{
RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
ReportMessage(OptionalContext,
- fmt::format("Failed to save attachment ({}): '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ fmt::format("Failed to save attachment ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
return;
}
ZEN_DEBUG("Saved block {}, {}", BlockHash, NiceBytes(CompressedBlock.GetCompressedSize()));
@@ -1270,10 +1268,9 @@ SaveOplog(CidStore& ChunkStore,
{
if (BaseContainerResult.ErrorCode)
{
- ReportMessage(OptionalContext,
- fmt::format("Failed to load oplog base container: '{}', error code: {}",
- BaseContainerResult.Reason,
- BaseContainerResult.ErrorCode));
+ ReportMessage(
+ OptionalContext,
+ fmt::format("Failed to load oplog base container ({}): {}", BaseContainerResult.ErrorCode, BaseContainerResult.Reason));
}
else
{
@@ -1337,7 +1334,7 @@ SaveOplog(CidStore& ChunkStore,
{
RemoteResult.SetError(ContainerSaveResult.ErrorCode, ContainerSaveResult.Reason, "Failed to save oplog container");
ReportMessage(OptionalContext,
- fmt::format("Failed to save oplog container ({}): '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ fmt::format("Failed to save oplog container ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
}
else
{
@@ -1372,7 +1369,7 @@ SaveOplog(CidStore& ChunkStore,
{
RemoteResult.SetError(ContainerFinalizeResult.ErrorCode, ContainerFinalizeResult.Reason, ContainerFinalizeResult.Text);
ReportMessage(OptionalContext,
- fmt::format("Failed to finalize oplog container {} ({}): '{}'",
+ fmt::format("Failed to finalize oplog container {} ({}): {}",
ContainerSaveResult.RawHash,
RemoteResult.GetError(),
RemoteResult.GetErrorReason()));
@@ -1635,7 +1632,7 @@ LoadOplog(CidStore& ChunkStore,
if (Result.ErrorCode)
{
ReportMessage(OptionalContext,
- fmt::format("Failed to load attachments with {} chunks ({}): '{}'",
+ fmt::format("Failed to load attachments with {} chunks ({}): {}",
Chunks.size(),
RemoteResult.GetError(),
RemoteResult.GetErrorReason()));
@@ -1691,7 +1688,7 @@ LoadOplog(CidStore& ChunkStore,
if (BlockResult.ErrorCode)
{
ReportMessage(OptionalContext,
- fmt::format("Failed to download block attachment {} ({}): '{}'",
+ fmt::format("Failed to download block attachment {} ({}): {}",
BlockHash,
RemoteResult.GetError(),
RemoteResult.GetErrorReason()));
@@ -1706,18 +1703,20 @@ LoadOplog(CidStore& ChunkStore,
return;
}
Info.AttachmentBlocksDownloaded.fetch_add(1);
- ZEN_INFO("Loaded block attachment '{}' in {}",
+ uint64_t BlockSize = BlockResult.Bytes.GetSize();
+ ZEN_INFO("Loaded block attachment '{}' in {} ({})",
BlockHash,
- NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)));
+ NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)),
+ NiceBytes(BlockSize));
if (RemoteResult.IsError())
{
return;
}
+ Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize);
bool StoreChunksOK =
IterateBlock(std::move(BlockResult.Bytes), [&ChunkStore, &Info](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) {
- uint64_t ChunkSize = Chunk.GetCompressedSize();
- Info.AttachmentBlockBytesDownloaded.fetch_add(ChunkSize);
+ uint64_t ChunkSize = Chunk.GetCompressedSize();
CidStore::InsertResult InsertResult =
ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash);
if (InsertResult.New)
@@ -1730,7 +1729,7 @@ LoadOplog(CidStore& ChunkStore,
if (!StoreChunksOK)
{
ReportMessage(OptionalContext,
- fmt::format("Block attachment {} has invalid format ({}): '{}'",
+ fmt::format("Block attachment {} has invalid format ({}): {}",
BlockHash,
RemoteResult.GetError(),
RemoteResult.GetErrorReason()));
@@ -1788,20 +1787,21 @@ LoadOplog(CidStore& ChunkStore,
}
return;
}
- ZEN_INFO("Loaded large attachment '{}' in {}",
+ uint64_t AttachmentSize = AttachmentResult.Bytes.GetSize();
+ ZEN_INFO("Loaded large attachment '{}' in {} ({})",
RawHash,
- NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)));
+ NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)),
+ NiceBytes(AttachmentSize));
Info.AttachmentsDownloaded.fetch_add(1);
if (RemoteResult.IsError())
{
return;
}
- uint64_t ChunkSize = AttachmentResult.Bytes.GetSize();
- Info.AttachmentBytesDownloaded.fetch_add(ChunkSize);
+ Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize);
CidStore::InsertResult InsertResult = ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash);
if (InsertResult.New)
{
- Info.AttachmentBytesStored.fetch_add(ChunkSize);
+ Info.AttachmentBytesStored.fetch_add(AttachmentSize);
Info.AttachmentsStored.fetch_add(1);
}
});
diff --git a/src/zenserver/upstream/jupiter.cpp b/src/zenserver/upstream/jupiter.cpp
index e4d45e316..bf2538908 100644
--- a/src/zenserver/upstream/jupiter.cpp
+++ b/src/zenserver/upstream/jupiter.cpp
@@ -16,7 +16,6 @@
#include <zenutil/basicfile.h>
ZEN_THIRD_PARTY_INCLUDES_START
-#include <cpr/cpr.h>
#include <fmt/format.h>
ZEN_THIRD_PARTY_INCLUDES_END
@@ -32,277 +31,70 @@ using namespace std::literals;
namespace zen {
namespace detail {
- struct CloudCacheSessionState
+ CloudCacheResult ConvertResponse(const HttpClient::Response& Response, const std::string_view ErrorPrefix = ""sv)
{
- CloudCacheSessionState(CloudCacheClient& Client) : m_Client(Client) {}
-
- const CloudCacheAccessToken& GetAccessToken(bool RefreshToken)
- {
- if (RefreshToken)
- {
- m_AccessToken = m_Client.AcquireAccessToken();
- }
-
- return m_AccessToken;
- }
-
- cpr::Session& GetSession() { return m_Session; }
-
- void Reset(std::chrono::milliseconds ConnectTimeout, std::chrono::milliseconds Timeout, bool AssumeHttp2)
+ if (Response.Error)
{
- m_Session.SetBody({});
- m_Session.SetHeader({});
- m_Session.SetConnectTimeout(ConnectTimeout);
- m_Session.SetTimeout(Timeout);
- if (AssumeHttp2)
- {
- m_Session.SetHttpVersion(cpr::HttpVersion{cpr::HttpVersionCode::VERSION_2_0_PRIOR_KNOWLEDGE});
- }
- }
-
- private:
- friend class zen::CloudCacheClient;
-
- CloudCacheClient& m_Client;
- CloudCacheAccessToken m_AccessToken;
- cpr::Session m_Session;
- };
-
- CloudCacheResult ConvertResponse(const cpr::Response& Response)
- {
- if (Response.error)
- {
- return {.ElapsedSeconds = Response.elapsed,
- .ErrorCode = static_cast<int32_t>(Response.error.code),
- .Reason = Response.error.message,
+ return {.ElapsedSeconds = Response.ElapsedSeconds,
+ .ErrorCode = Response.Error.value().ErrorCode,
+ .Reason = Response.ErrorMessage(ErrorPrefix),
.Success = false};
}
- if (!IsHttpSuccessCode(Response.status_code))
+ if (!Response.IsSuccess())
{
- return {.ElapsedSeconds = Response.elapsed,
- .ErrorCode = static_cast<int32_t>(Response.status_code),
- .Reason = Response.reason.empty() ? Response.text : Response.reason,
+ return {.ElapsedSeconds = Response.ElapsedSeconds,
+ .ErrorCode = static_cast<int32_t>(Response.StatusCode),
+ .Reason = Response.ErrorMessage(ErrorPrefix),
.Success = false};
}
- return {.Bytes = Response.downloaded_bytes,
- .ElapsedSeconds = Response.elapsed,
+ return {.Response = Response.ResponsePayload,
+ .Bytes = Response.DownloadedBytes,
+ .ElapsedSeconds = Response.ElapsedSeconds,
.ErrorCode = 0,
- .Reason = Response.reason,
.Success = true};
}
-
- cpr::Response GetWithStreaming(cpr::Session& Session, std::filesystem::path TempFolderPath, std::string_view Name, IoBuffer& OutBuffer)
- {
- if (TempFolderPath.empty())
- {
- return Session.Get();
- }
-
- std::string PayloadString;
- std::shared_ptr<BasicFile> PayloadFile;
-
- auto _ = MakeGuard([&]() {
- if (PayloadFile)
- {
- PayloadFile.reset();
- std::filesystem::path TempPath = TempFolderPath / Name;
- std::error_code Ec;
- std::filesystem::remove(TempPath, Ec);
- }
- });
-
- uint64_t Offset = 0;
- Session.SetWriteCallback(cpr::WriteCallback{[&](std::string data, intptr_t) {
- if (!PayloadFile && (PayloadString.length() + data.length()) > (1024 * 1024))
- {
- std::filesystem::path TempPath = TempFolderPath / Name;
- PayloadFile = std::make_shared<BasicFile>();
- PayloadFile->Open(TempPath, BasicFile::Mode::kTruncateDelete);
- PayloadFile->Write(PayloadString.data(), PayloadString.size(), Offset);
- Offset += PayloadString.size();
- PayloadString.clear();
- }
- if (PayloadFile)
- {
- PayloadFile->Write(data.data(), data.size(), Offset);
- Offset += data.size();
- }
- else
- {
- PayloadString.append(data);
- }
- return true;
- }});
-
- cpr::Response Response = Session.Get();
-
- if (!Response.error && IsHttpSuccessCode(Response.status_code))
- {
- if (PayloadFile)
- {
- uint64_t PayloadSize = PayloadFile->FileSize();
- void* FileHandle = PayloadFile->Detach();
- PayloadFile.reset();
- OutBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, PayloadSize, /*IsWholeFile*/ true);
- OutBuffer.SetDeleteOnClose(true);
- }
- else
- {
- OutBuffer = IoBufferBuilder::MakeCloneFromMemory(PayloadString.data(), PayloadString.size());
- }
- return Response;
- }
-
- Response.text.swap(PayloadString);
- return Response;
- }
-
- static std::optional<zen::HttpContentType> TryGetContentType(const cpr::Response& Response)
- {
- if (auto It = Response.header.find("Content-Type"); It != Response.header.end())
- {
- zen::HttpContentType ContentType = zen::ParseContentType(It->second);
- if (ContentType != zen::HttpContentType::kUnknownContentType)
- {
- return ContentType;
- }
- }
- return {};
- }
-
- static IoBuffer MakeBufferFromResponseIfKnownFormat(const cpr::Response& Response)
- {
- std::optional<zen::HttpContentType> ContentType = TryGetContentType(Response);
- if (ContentType)
- {
- IoBuffer Buffer = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size());
- Buffer.SetContentType(ContentType.value());
- return Buffer;
- }
- return {};
- }
-
} // namespace detail
CloudCacheSession::CloudCacheSession(CloudCacheClient* CacheClient) : m_Log(CacheClient->Logger()), m_CacheClient(CacheClient)
{
- m_SessionState = m_CacheClient->AllocSessionState();
}
CloudCacheSession::~CloudCacheSession()
{
- m_CacheClient->FreeSessionState(m_SessionState);
}
CloudCacheResult
CloudCacheSession::Authenticate()
{
- const bool RefreshToken = true;
- const CloudCacheAccessToken& AccessToken = GetAccessToken(RefreshToken);
-
- return {.Success = AccessToken.IsValid()};
+ bool OK = m_CacheClient->m_HttpClient.Authenticate();
+ return {.Success = OK};
}
CloudCacheResult
-CloudCacheSession::GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType)
+CloudCacheSession::GetRef(std::string_view Namespace,
+ std::string_view BucketId,
+ const IoHash& Key,
+ ZenContentType RefType,
+ std::filesystem::path TempFolderPath)
{
- const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream";
-
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString();
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", ContentType}});
- Session.SetOption(cpr::Body{});
+ ZEN_TRACE_CPU("JupiterClient::GetRef");
- cpr::Response Response = Session.Get();
- ZEN_DEBUG("GET {}", Response);
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()),
+ TempFolderPath,
+ {HttpClient::Accept(RefType)});
- CloudCacheResult Result = detail::ConvertResponse(Response);
- if (Result.Success)
- {
- Result.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size());
- }
- else
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::GetRef failed GET. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-Accept: '{}', "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- ContentType,
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
- return Result;
+ return detail::ConvertResponse(Response, "CloudCacheSession::GetRef"sv);
}
CloudCacheResult
CloudCacheSession::GetBlob(std::string_view Namespace, const IoHash& Key)
{
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << Namespace << "/" << Key.ToHexString();
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/octet-stream"}});
- Session.SetOption(cpr::Body{});
-
- cpr::Response Response = Session.Get();
- ZEN_DEBUG("GET {}", Response);
+ ZEN_TRACE_CPU("JupiterClient::GetBlob");
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()),
+ {HttpClient::Accept(ZenContentType::kBinary)});
- CloudCacheResult Result = detail::ConvertResponse(Response);
- if (Result.Success)
- {
- Result.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size());
- }
- else
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::GetBlob failed GET. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-Accept: '{}', "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- "application/octet-stream",
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
- return Result;
+ return detail::ConvertResponse(Response);
}
CloudCacheResult
@@ -310,58 +102,12 @@ CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& K
{
ZEN_TRACE_CPU("JupiterClient::GetCompressedBlob");
- ExtendableStringBuilder<256> Uri;
- std::string KeyString = Key.ToHexString();
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << KeyString;
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()),
+ TempFolderPath,
+ {HttpClient::Accept(ZenContentType::kCompressedBinary)});
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-comp"}});
- Session.SetOption(cpr::Body{});
-
- IoBuffer Payload;
- cpr::Response Response = detail::GetWithStreaming(Session, TempFolderPath, KeyString, Payload);
- ZEN_DEBUG("GET {}", Response);
-
- CloudCacheResult Result = detail::ConvertResponse(Response);
- if (Result.Success)
- {
- Result.Response = std::move(Payload);
- }
- else
- {
- std::optional<zen::HttpContentType> ContentType = detail::TryGetContentType(Response);
- if (ContentType.has_value())
- {
- Result.Response = std::move(Payload);
- Result.Response.SetContentType(ContentType.value());
- }
- ZEN_WARN(
- "CloudCacheSession::GetCompressedBlob failed GET. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-Accept: '{}', "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- "application/x-ue-comp",
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
- return Result;
+ return detail::ConvertResponse(Response);
}
CloudCacheResult
@@ -373,59 +119,14 @@ CloudCacheSession::GetInlineBlob(std::string_view Namespace,
{
ZEN_TRACE_CPU("JupiterClient::GetInlineBlob");
- ExtendableStringBuilder<256> Uri;
- std::string KeyString = Key.ToHexString();
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << KeyString;
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-jupiter-inline"}});
- Session.SetOption(cpr::Body{});
-
- IoBuffer Payload;
- cpr::Response Response = detail::GetWithStreaming(Session, TempFolderPath, KeyString, Payload);
- ZEN_DEBUG("GET {}", Response);
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()),
+ TempFolderPath,
+ {{"Accept", "application/x-jupiter-inline"}});
CloudCacheResult Result = detail::ConvertResponse(Response);
- if (Result.Success)
- {
- Result.Response = std::move(Payload);
- }
- else
- {
- std::optional<zen::HttpContentType> ContentType = detail::TryGetContentType(Response);
- if (ContentType.has_value())
- {
- Result.Response = std::move(Payload);
- Result.Response.SetContentType(ContentType.value());
- }
- ZEN_WARN(
- "CloudCacheSession::GetInlineBlob failed GET. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-Accept: '{}', "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- "application/x-jupiter-inline",
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
- if (auto It = Response.header.find("X-Jupiter-InlinePayloadHash"); It != Response.header.end())
+ if (auto It = Response.Header->find("X-Jupiter-InlinePayloadHash"); It != Response.Header->end())
{
const std::string& PayloadHashHeader = It->second;
if (PayloadHashHeader.length() == IoHash::StringLength)
@@ -442,52 +143,10 @@ CloudCacheSession::GetObject(std::string_view Namespace, const IoHash& Key)
{
ZEN_TRACE_CPU("JupiterClient::GetObject");
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString();
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}});
- Session.SetOption(cpr::Body{});
-
- cpr::Response Response = Session.Get();
- ZEN_DEBUG("GET {}", Response);
-
- CloudCacheResult Result = detail::ConvertResponse(Response);
- if (Result.Success)
- {
- Result.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size());
- }
- else
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::GetObject failed GET. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-Accept: '{}', "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- "application/x-ue-cb",
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()),
+ {HttpClient::Accept(ZenContentType::kCbObject)});
- return Result;
+ return detail::ConvertResponse(Response);
}
PutRefResult
@@ -495,29 +154,18 @@ CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId,
{
ZEN_TRACE_CPU("JupiterClient::PutRef");
- IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size());
-
- const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream";
+ Ref.SetContentType(RefType);
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString();
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(
- cpr::Header{{"Authorization", AccessToken.Value}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", ContentType}});
- Session.SetBody(cpr::Body{(const char*)Ref.Data(), Ref.Size()});
+ IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size());
- cpr::Response Response = Session.Put();
- ZEN_DEBUG("PUT {}", Response);
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Put(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), Ref);
PutRefResult Result = {detail::ConvertResponse(Response)};
if (Result.Success)
{
std::string JsonError;
- json11::Json Json = json11::Json::parse(Response.text, JsonError);
+ json11::Json Json = json11::Json::parse(Response.ToText(), JsonError);
if (JsonError.empty())
{
json11::Json::array Needs = Json["needs"].array_items();
@@ -528,37 +176,6 @@ CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId,
}
Result.RawHash = Hash;
}
- else
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::PutRef failed PUT. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-X-Jupiter-IoHash: '{}', "
- "Header-ContentType: '{}', "
- "ContentSize: {}, "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- Hash.ToHexString(),
- ContentType,
- NiceBytes(Ref.Size()),
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
-
return Result;
}
@@ -567,28 +184,16 @@ CloudCacheSession::FinalizeRef(std::string_view Namespace, std::string_view Buck
{
ZEN_TRACE_CPU("JupiterClient::FinalizeRef");
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString() << "/finalize/"
- << RefHash.ToHexString();
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value},
- {"X-Jupiter-IoHash", RefHash.ToHexString()},
- {"Content-Type", "application/x-ue-cb"}});
- Session.SetBody(cpr::Body{});
-
- cpr::Response Response = Session.Post();
- ZEN_DEBUG("POST {}", Response);
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Post(
+ fmt::format("/api/v1/refs/{}/{}/{}/finalize/{}", Namespace, BucketId, Key.ToHexString(), RefHash.ToHexString()),
+ {{"X-Jupiter-IoHash", RefHash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}});
FinalizeRefResult Result = {detail::ConvertResponse(Response)};
if (Result.Success)
{
std::string JsonError;
- json11::Json Json = json11::Json::parse(Response.text, JsonError);
+ json11::Json Json = json11::Json::parse(std::string(Response.AsText()), JsonError);
if (JsonError.empty())
{
json11::Json::array Needs = Json["needs"].array_items();
@@ -598,37 +203,6 @@ CloudCacheSession::FinalizeRef(std::string_view Namespace, std::string_view Buck
}
}
}
- else
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::FinalizeRef failed PUT. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-X-Jupiter-IoHash: '{}', "
- "Header-ContentType: '{}', "
- "ContentSize: {}, "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- RefHash.ToHexString(),
- "application/x-ue-cb",
- NiceBytes(0),
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
-
return Result;
}
@@ -637,49 +211,9 @@ CloudCacheSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuff
{
ZEN_TRACE_CPU("JupiterClient::PutBlob");
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << Namespace << "/" << Key.ToHexString();
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Put(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), Blob);
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/octet-stream"}});
- Session.SetBody(cpr::Body{(const char*)Blob.Data(), Blob.Size()});
-
- cpr::Response Response = Session.Put();
- ZEN_DEBUG("PUT {}", Response);
-
- CloudCacheResult Result = detail::ConvertResponse(Response);
- if (!Result.Success)
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::PutBlob failed PUT. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-ContentType: '{}', "
- "ContentSize: {}, "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- "application/octet-stream",
- NiceBytes(Blob.Size()),
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
- return Result;
+ return detail::ConvertResponse(Response);
}
CloudCacheResult
@@ -687,66 +221,11 @@ CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& K
{
ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob");
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString();
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-comp"}});
-
- uint64_t Offset = 0;
- if (Blob.IsWholeFile())
- {
- auto ReadCallback = [&Blob, &Offset](char* buffer, size_t& size, intptr_t) {
- size = Min<size_t>(size, Blob.GetSize() - Offset);
- IoBuffer PayloadRange = IoBuffer(Blob, Offset, size);
- MutableMemoryView Data(buffer, size);
- Data.CopyFrom(PayloadRange.GetView());
- Offset += size;
- return true;
- };
- Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Blob.GetSize()), ReadCallback));
- }
- else
- {
- Session.SetBody(cpr::Body{(const char*)Blob.Data(), Blob.Size()});
- }
-
- cpr::Response Response = Session.Put();
- ZEN_DEBUG("PUT {}", Response);
+ Blob.SetContentType(ZenContentType::kCompressedBinary);
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), Blob);
- CloudCacheResult Result = detail::ConvertResponse(Response);
- if (!Result.Success)
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::PutCompressedBlob failed PUT. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-ContentType: '{}', "
- "ContentSize: {}, "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- "application/x-ue-comp",
- NiceBytes(Blob.Size()),
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
- return Result;
+ return detail::ConvertResponse(Response);
}
CloudCacheResult
@@ -754,58 +233,12 @@ CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& K
{
ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob");
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString();
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-comp"}});
- uint64_t SizeLeft = Payload.GetSize();
- CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0);
- auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) {
- size = Min<size_t>(size, SizeLeft);
- MutableMemoryView Data(buffer, size);
- Payload.CopyTo(Data, BufferIt);
- SizeLeft -= size;
- return true;
- };
- Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(SizeLeft), ReadCallback));
-
- cpr::Response Response = Session.Put();
- ZEN_DEBUG("PUT {}", Response);
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()),
+ Payload,
+ ZenContentType::kCompressedBinary);
- CloudCacheResult Result = detail::ConvertResponse(Response);
- if (!Result.Success)
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::PutCompressedBlob failed PUT. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-ContentType: '{}', "
- "ContentSize: {}, "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- "application/x-ue-comp",
- NiceBytes(Payload.GetSize()),
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
- return Result;
+ return detail::ConvertResponse(Response);
}
CloudCacheResult
@@ -813,49 +246,11 @@ CloudCacheSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBu
{
ZEN_TRACE_CPU("JupiterClient::PutObject");
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString();
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ Object.SetContentType(ZenContentType::kCbObject);
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), Object);
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}});
- Session.SetBody(cpr::Body{(const char*)Object.Data(), Object.Size()});
-
- cpr::Response Response = Session.Put();
- ZEN_DEBUG("PUT {}", Response);
-
- CloudCacheResult Result = detail::ConvertResponse(Response);
- if (!Result.Success)
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::PutObject failed PUT. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-ContentType: '{}', "
- "ContentSize: {}, "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- "application/x-ue-cb",
- NiceBytes(Object.GetSize()),
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
- return Result;
+ return detail::ConvertResponse(Response);
}
CloudCacheResult
@@ -863,45 +258,10 @@ CloudCacheSession::RefExists(std::string_view Namespace, std::string_view Bucket
{
ZEN_TRACE_CPU("JupiterClient::RefExists");
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString();
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()));
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}});
- Session.SetOption(cpr::Body{});
-
- cpr::Response Response = Session.Head();
- ZEN_DEBUG("HEAD {}", Response);
-
- CloudCacheResult Result = detail::ConvertResponse(Response);
- if (!Result.Success)
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::RefExists failed PUT. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
- return Result;
+ return detail::ConvertResponse(Response);
}
GetObjectReferencesResult
@@ -909,57 +269,20 @@ CloudCacheSession::GetObjectReferences(std::string_view Namespace, const IoHash&
{
ZEN_TRACE_CPU("JupiterClient::GetObjectReferences");
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString() << "/references";
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}});
- Session.SetOption(cpr::Body{});
-
- cpr::Response Response = Session.Get();
- ZEN_DEBUG("GET {}", Response);
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/references", Namespace, Key.ToHexString()),
+ {HttpClient::Accept(ZenContentType::kCbObject)});
GetObjectReferencesResult Result = {detail::ConvertResponse(Response)};
if (Result.Success)
{
- IoBuffer Buffer = IoBuffer(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size());
- const CbObject ReferencesResponse = LoadCompactBinaryObject(Buffer);
+ const CbObject ReferencesResponse = Response.AsObject();
for (auto& Item : ReferencesResponse["references"sv])
{
Result.References.insert(Item.AsHash());
}
}
- else
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::GetObjectReferences failed PUT. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-Accept: '{}', "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- "application/x-ue-cb",
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
-
return Result;
}
@@ -1002,73 +325,23 @@ CloudCacheSession::ObjectExists(std::string_view Namespace, const std::set<IoHas
std::vector<IoHash>
CloudCacheSession::Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes)
{
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl();
- Uri << "/api/v1/s/" << Namespace;
+ // ExtendableStringBuilder<256> Uri;
+ // Uri << m_CacheClient->ServiceUrl();
+ // Uri << "/api/v1/s/" << Namespace;
- ZEN_UNUSED(BucketId, ChunkHashes);
+ ZEN_UNUSED(Namespace, BucketId, ChunkHashes);
return {};
}
-cpr::Session&
-CloudCacheSession::GetSession()
-{
- return m_SessionState->GetSession();
-}
-
-CloudCacheAccessToken
-CloudCacheSession::GetAccessToken(bool RefreshToken)
-{
- return m_SessionState->GetAccessToken(RefreshToken);
-}
-
CloudCacheResult
CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key)
{
ZEN_TRACE_CPU("JupiterClient::CacheTypeExists");
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << Namespace << "/" << Key.ToHexString();
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}});
- Session.SetOption(cpr::Body{});
-
- cpr::Response Response = Session.Head();
- ZEN_DEBUG("HEAD {}", Response);
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/{}/{}/{}", TypeId, Namespace, Key.ToHexString()));
- CloudCacheResult Result = detail::ConvertResponse(Response);
- if (!Result.Success)
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::CacheTypeExists failed GET. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-Accept: '{}', "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- "application/x-ue-cb",
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
- return Result;
+ return detail::ConvertResponse(Response);
}
CloudCacheExistsResult
@@ -1083,58 +356,23 @@ CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view
Body << (Body.Size() != 1 ? ",\"" : "\"") << Key.ToHexString() << "\"";
}
Body << "]";
+ IoBuffer Payload = IoBuffer(IoBuffer::Wrap, Body.Data(), Body.Size());
+ Payload.SetContentType(ZenContentType::kJSON);
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << Namespace << "/exist";
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Post(fmt::format("/api/v1/{}/{}/exist", TypeId, Namespace),
+ Payload,
+ {HttpClient::Accept(ZenContentType::kCbObject)});
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(
- cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}, {"Content-Type", "application/json"}});
- Session.SetOption(cpr::Body(Body.ToString()));
-
- cpr::Response Response = Session.Post();
- ZEN_DEBUG("POST {}", Response);
CloudCacheExistsResult Result = {detail::ConvertResponse(Response)};
if (Result.Success)
{
- IoBuffer Buffer = IoBuffer(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size());
- const CbObject ExistsResponse = LoadCompactBinaryObject(Buffer);
+ const CbObject ExistsResponse = Response.AsObject();
for (auto& Item : ExistsResponse["needs"sv])
{
Result.Needs.insert(Item.AsHash());
}
}
- else
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::CacheTypeExists failed GET. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-Accept: '{}', "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- "application/x-ue-cb",
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
-
return Result;
}
@@ -1233,77 +471,39 @@ CloudCacheTokenProvider::CreateFromCallback(std::function<CloudCacheAccessToken(
return std::make_unique<CallbackTokenProvider>(std::move(Callback));
}
+static std::optional<std::function<HttpClientAccessToken()>>
+GetHttpClientAccessProvider(CloudCacheTokenProvider* TokenProvider)
+{
+ if (TokenProvider == nullptr)
+ {
+ return {};
+ }
+ auto ProviderFunc = [TokenProvider]() -> HttpClientAccessToken {
+ CloudCacheAccessToken Token = TokenProvider->AcquireAccessToken();
+ return HttpClientAccessToken{.Value = Token.Value, .ExpireTime = Token.ExpireTime};
+ };
+ return ProviderFunc;
+}
+
CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options, std::unique_ptr<CloudCacheTokenProvider> TokenProvider)
: m_Log(zen::logging::Get("jupiter"))
-, m_ServiceUrl(Options.ServiceUrl)
, m_DefaultDdcNamespace(Options.DdcNamespace)
, m_DefaultBlobStoreNamespace(Options.BlobStoreNamespace)
, m_ComputeCluster(Options.ComputeCluster)
-, m_ConnectTimeout(Options.ConnectTimeout)
-, m_Timeout(Options.Timeout)
, m_TokenProvider(std::move(TokenProvider))
-, m_AssumeHttp2(Options.AssumeHttp2)
+, m_HttpClient(Options.ServiceUrl,
+ HttpClientSettings{.ConnectTimeout = Options.ConnectTimeout,
+ .Timeout = Options.Timeout,
+ .AccessTokenProvider = GetHttpClientAccessProvider(m_TokenProvider.get()),
+ .AssumeHttp2 = Options.AssumeHttp2,
+ .AllowResume = Options.AllowResume,
+ .RetryCount = Options.RetryCount})
{
ZEN_ASSERT(m_TokenProvider.get() != nullptr);
}
CloudCacheClient::~CloudCacheClient()
{
- RwLock::ExclusiveLockScope _(m_SessionStateLock);
-
- for (auto State : m_SessionStateCache)
- {
- delete State;
- }
-}
-
-CloudCacheAccessToken
-CloudCacheClient::AcquireAccessToken()
-{
- ZEN_TRACE_CPU("JupiterClient::AcquireAccessToken");
-
- return m_TokenProvider->AcquireAccessToken();
-}
-
-detail::CloudCacheSessionState*
-CloudCacheClient::AllocSessionState()
-{
- detail::CloudCacheSessionState* State = nullptr;
-
- bool IsTokenValid = false;
-
- {
- RwLock::ExclusiveLockScope _(m_SessionStateLock);
-
- if (m_SessionStateCache.empty() == false)
- {
- State = m_SessionStateCache.front();
- IsTokenValid = State->m_AccessToken.IsValid();
-
- m_SessionStateCache.pop_front();
- }
- }
-
- if (State == nullptr)
- {
- State = new detail::CloudCacheSessionState(*this);
- }
-
- State->Reset(m_ConnectTimeout, m_Timeout, m_AssumeHttp2);
-
- if (IsTokenValid == false)
- {
- State->m_AccessToken = m_TokenProvider->AcquireAccessToken();
- }
-
- return State;
-}
-
-void
-CloudCacheClient::FreeSessionState(detail::CloudCacheSessionState* State)
-{
- RwLock::ExclusiveLockScope _(m_SessionStateLock);
- m_SessionStateCache.push_front(State);
}
} // namespace zen
diff --git a/src/zenserver/upstream/jupiter.h b/src/zenserver/upstream/jupiter.h
index b5aa95ed5..93f2cc883 100644
--- a/src/zenserver/upstream/jupiter.h
+++ b/src/zenserver/upstream/jupiter.h
@@ -6,6 +6,7 @@
#include <zencore/iohash.h>
#include <zencore/logging.h>
#include <zencore/thread.h>
+#include <zenhttp/httpclient.h>
#include <zenhttp/httpserver.h>
#include <atomic>
@@ -22,9 +23,6 @@ class Session;
}
namespace zen {
-namespace detail {
- struct CloudCacheSessionState;
-}
class CbObjectView;
class CloudCacheClient;
@@ -96,7 +94,11 @@ public:
~CloudCacheSession();
CloudCacheResult Authenticate();
- CloudCacheResult GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType);
+ CloudCacheResult GetRef(std::string_view Namespace,
+ std::string_view BucketId,
+ const IoHash& Key,
+ ZenContentType RefType,
+ std::filesystem::path TempFolderPath = {});
CloudCacheResult GetBlob(std::string_view Namespace, const IoHash& Key);
CloudCacheResult GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath = {});
CloudCacheResult GetObject(std::string_view Namespace, const IoHash& Key);
@@ -131,17 +133,14 @@ public:
CloudCacheClient& Client() { return *m_CacheClient; };
private:
- inline LoggerRef Log() { return m_Log; }
- cpr::Session& GetSession();
- CloudCacheAccessToken GetAccessToken(bool RefreshToken = false);
+ inline LoggerRef Log() { return m_Log; }
CloudCacheResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key);
CloudCacheExistsResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys);
- LoggerRef m_Log;
- RefPtr<CloudCacheClient> m_CacheClient;
- detail::CloudCacheSessionState* m_SessionState;
+ LoggerRef m_Log;
+ RefPtr<CloudCacheClient> m_CacheClient;
};
/**
@@ -178,6 +177,8 @@ struct CloudCacheClientOptions
std::chrono::milliseconds ConnectTimeout{5000};
std::chrono::milliseconds Timeout{};
bool AssumeHttp2 = false;
+ bool AllowResume = false;
+ uint8_t RetryCount = 0;
};
/**
@@ -189,30 +190,20 @@ public:
CloudCacheClient(const CloudCacheClientOptions& Options, std::unique_ptr<CloudCacheTokenProvider> TokenProvider);
~CloudCacheClient();
- CloudCacheAccessToken AcquireAccessToken();
- std::string_view DefaultDdcNamespace() const { return m_DefaultDdcNamespace; }
- std::string_view DefaultBlobStoreNamespace() const { return m_DefaultBlobStoreNamespace; }
- std::string_view ComputeCluster() const { return m_ComputeCluster; }
- std::string_view ServiceUrl() const { return m_ServiceUrl; }
+ std::string_view DefaultDdcNamespace() const { return m_DefaultDdcNamespace; }
+ std::string_view DefaultBlobStoreNamespace() const { return m_DefaultBlobStoreNamespace; }
+ std::string_view ComputeCluster() const { return m_ComputeCluster; }
+ std::string_view ServiceUrl() const { return m_HttpClient.GetBaseUri(); }
LoggerRef Logger() { return m_Log; }
private:
- LoggerRef m_Log;
- std::string m_ServiceUrl;
- std::string m_DefaultDdcNamespace;
- std::string m_DefaultBlobStoreNamespace;
- std::string m_ComputeCluster;
- std::chrono::milliseconds m_ConnectTimeout{};
- std::chrono::milliseconds m_Timeout{};
- std::unique_ptr<CloudCacheTokenProvider> m_TokenProvider;
- bool m_AssumeHttp2;
-
- RwLock m_SessionStateLock;
- std::list<detail::CloudCacheSessionState*> m_SessionStateCache;
-
- detail::CloudCacheSessionState* AllocSessionState();
- void FreeSessionState(detail::CloudCacheSessionState*);
+ LoggerRef m_Log;
+ const std::string m_DefaultDdcNamespace;
+ const std::string m_DefaultBlobStoreNamespace;
+ const std::string m_ComputeCluster;
+ const std::unique_ptr<CloudCacheTokenProvider> m_TokenProvider;
+ HttpClient m_HttpClient;
friend class CloudCacheSession;
};