aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-01-24 11:41:18 +0100
committerGitHub <[email protected]>2024-01-24 11:41:18 +0100
commit0e63573fbe9973f6b922656a785817a711581b78 (patch)
tree48e18f0b4aea958a536ba50f72f589a580c4b798 /src
parentoplog import/export improvements (#634) (diff)
downloadzen-0e63573fbe9973f6b922656a785817a711581b78.tar.xz
zen-0e63573fbe9973f6b922656a785817a711581b78.zip
Add retry with optional resume logic to HttpClient::Download (#639)
- Improvement: Refactored Jupiter upstream to use HttpClient - Improvement: Added retry and resume logic to HttpClient - Improvement: Added authentication support to HttpClient - Improvement: Clearer logging in GCV2 compact of FileCas/BlockStore - Improvement: Size details in oplog import logging
Diffstat (limited to 'src')
-rw-r--r--src/zenhttp/httpclient.cpp530
-rw-r--r--src/zenhttp/include/zenhttp/httpclient.h35
-rw-r--r--src/zenserver/projectstore/jupiterremoteprojectstore.cpp82
-rw-r--r--src/zenserver/projectstore/projectstore.cpp12
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp58
-rw-r--r--src/zenserver/upstream/jupiter.cpp1012
-rw-r--r--src/zenserver/upstream/jupiter.h51
-rw-r--r--src/zenstore/blockstore.cpp164
-rw-r--r--src/zenstore/filecas.cpp2
9 files changed, 688 insertions, 1258 deletions
diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp
index a29a08a3c..3b2a3baec 100644
--- a/src/zenhttp/httpclient.cpp
+++ b/src/zenhttp/httpclient.cpp
@@ -13,6 +13,7 @@
#include <zencore/session.h>
#include <zencore/sharedbuffer.h>
#include <zencore/stream.h>
+#include <zencore/string.h>
#include <zencore/testing.h>
#include <zencore/trace.h>
#include <zenhttp/formatters.h>
@@ -38,19 +39,19 @@ using namespace std::literals;
//
// CPR helpers
-cpr::Body
+static cpr::Body
AsCprBody(const CbObject& Obj)
{
return cpr::Body((const char*)Obj.GetBuffer().GetData(), Obj.GetBuffer().GetSize());
}
-cpr::Body
+static cpr::Body
AsCprBody(const IoBuffer& Obj)
{
return cpr::Body((const char*)Obj.GetData(), Obj.GetSize());
}
-cpr::Body
+static cpr::Body
AsCprBody(const CompositeBuffer& Buffers)
{
SharedBuffer Buffer = Buffers.Flatten();
@@ -62,7 +63,7 @@ AsCprBody(const CompositeBuffer& Buffers)
//////////////////////////////////////////////////////////////////////////
-HttpClient::Response
+static HttpClient::Response
ResponseWithPayload(cpr::Response& HttpResponse, const HttpResponseCode WorkResponseCode, IoBuffer&& Payload)
{
// This ends up doing a memcpy, would be good to get rid of it by streaming results
@@ -89,7 +90,7 @@ ResponseWithPayload(cpr::Response& HttpResponse, const HttpResponseCode WorkResp
.ElapsedSeconds = HttpResponse.elapsed};
}
-HttpClient::Response
+static HttpClient::Response
CommonResponse(cpr::Response&& HttpResponse, IoBuffer&& Payload = {})
{
const HttpResponseCode WorkResponseCode = HttpResponseCode(HttpResponse.status_code);
@@ -123,6 +124,51 @@ CommonResponse(cpr::Response&& HttpResponse, IoBuffer&& Payload = {})
}
}
+static bool
+ShouldRetry(const cpr::Response& Response)
+{
+ switch (Response.error.code)
+ {
+ case cpr::ErrorCode::OK:
+ break;
+ case cpr::ErrorCode::OPERATION_TIMEDOUT:
+ case cpr::ErrorCode::NETWORK_RECEIVE_ERROR:
+ case cpr::ErrorCode::NETWORK_SEND_FAILURE:
+ return true;
+ default:
+ return false;
+ }
+ switch ((HttpResponseCode)Response.status_code)
+ {
+ case HttpResponseCode::GatewayTimeout:
+ case HttpResponseCode::RequestTimeout:
+ return true;
+ default:
+ return false;
+ }
+};
+
+static cpr::Response
+DoWithRetry(std::function<cpr::Response()>&& Func, uint8_t RetryCount)
+{
+ uint8_t Attempt = 0;
+ cpr::Response Result = Func();
+ while (Attempt < RetryCount && ShouldRetry(Result))
+ {
+ Sleep(100 * (Attempt + 1));
+ Attempt++;
+ ZEN_INFO("{} Attempt {}/{}", CommonResponse(std::move(Result)).ErrorMessage("Retry"), Attempt, RetryCount + 1);
+ Result = Func();
+ }
+ return Result;
+}
+
+static std::pair<std::string, std::string>
+HeaderContentType(ZenContentType ContentType)
+{
+ return std::make_pair("Content-Type", std::string(MapContentTypeToString(ContentType)));
+}
+
//////////////////////////////////////////////////////////////////////////
struct HttpClient::Impl : public RefCounted
@@ -144,9 +190,13 @@ struct HttpClient::Impl : public RefCounted
ZEN_TRACE("GET {}", Result);
return Result;
}
- inline cpr::Response Download(cpr::WriteCallback&& write)
+ inline cpr::Response Download(cpr::WriteCallback&& Write, std::optional<cpr::HeaderCallback>&& Header = {})
{
- cpr::Response Result = CprSession->Download(write);
+ if (Header)
+ {
+ CprSession->SetHeaderCallback(std::move(Header.value()));
+ }
+ cpr::Response Result = CprSession->Download(Write);
ZEN_TRACE("GET {}", Result);
return Result;
}
@@ -185,11 +235,13 @@ struct HttpClient::Impl : public RefCounted
Session& operator=(Session&&) = delete;
};
- Session AllocSession(const std::string_view BaseUrl,
- const std::string_view Url,
- const HttpClientSettings& ConnectionSettings,
- const KeyValueMap& AdditionalHeader,
- const KeyValueMap& Parameters);
+ Session AllocSession(const std::string_view BaseUrl,
+ const std::string_view Url,
+ const HttpClientSettings& ConnectionSettings,
+ const KeyValueMap& AdditionalHeader,
+ const KeyValueMap& Parameters,
+ const std::string_view SessionId,
+ std::optional<HttpClientAccessToken> AccessToken);
LoggerRef Logger() { return m_Log; }
@@ -217,29 +269,26 @@ HttpClient::Impl::~Impl()
}
HttpClient::Impl::Session
-HttpClient::Impl::AllocSession(const std::string_view BaseUrl,
- const std::string_view ResourcePath,
- const HttpClientSettings& ConnectionSettings,
- const KeyValueMap& AdditionalHeader,
- const KeyValueMap& Parameters)
+HttpClient::Impl::AllocSession(const std::string_view BaseUrl,
+ const std::string_view ResourcePath,
+ const HttpClientSettings& ConnectionSettings,
+ const KeyValueMap& AdditionalHeader,
+ const KeyValueMap& Parameters,
+ const std::string_view SessionId,
+ std::optional<HttpClientAccessToken> AccessToken)
{
- bool IsNew = false;
cpr::Session* CprSession = nullptr;
m_SessionLock.WithExclusiveLock([&] {
- if (m_Sessions.empty())
- {
- CprSession = new cpr::Session();
- IsNew = true;
- }
- else
+ if (!m_Sessions.empty())
{
CprSession = m_Sessions.back();
m_Sessions.pop_back();
}
});
- if (IsNew)
+ if (CprSession == nullptr)
{
+ CprSession = new cpr::Session();
CprSession->SetConnectTimeout(ConnectionSettings.ConnectTimeout);
CprSession->SetTimeout(ConnectionSettings.Timeout);
if (ConnectionSettings.AssumeHttp2)
@@ -252,9 +301,13 @@ HttpClient::Impl::AllocSession(const std::string_view BaseUrl,
{
CprSession->SetHeader(cpr::Header(AdditionalHeader->begin(), AdditionalHeader->end()));
}
- else
+ if (!SessionId.empty())
{
- CprSession->SetHeader({});
+ CprSession->UpdateHeader({{"UE-Session", std::string(SessionId)}});
+ }
+ if (AccessToken)
+ {
+ CprSession->UpdateHeader({{"Authorization", AccessToken->Value}});
}
if (!Parameters->empty())
{
@@ -430,6 +483,9 @@ public:
return Buffer;
}
+ uint64_t GetSize() const { return m_WriteOffset; }
+ void ResetWritePos(uint64_t WriteOffset) { m_WriteOffset = WriteOffset; }
+
private:
void* m_FileHandle;
std::uint64_t m_WriteOffset;
@@ -452,12 +508,46 @@ HttpClient::~HttpClient()
{
}
+bool
+HttpClient::Authenticate()
+{
+ std::optional<HttpClientAccessToken> Token = GetAccessToken();
+ if (!Token)
+ {
+ return false;
+ }
+ return Token->IsValid();
+}
+
+const std::optional<HttpClientAccessToken>
+HttpClient::GetAccessToken()
+{
+ if (!m_ConnectionSettings.AccessTokenProvider.has_value())
+ {
+ return {};
+ }
+ {
+ RwLock::SharedLockScope _(m_AccessTokenLock);
+ if (m_CachedAccessToken.IsValid())
+ {
+ return m_CachedAccessToken;
+ }
+ }
+ RwLock::ExclusiveLockScope _(m_AccessTokenLock);
+ if (m_CachedAccessToken.IsValid())
+ {
+ return m_CachedAccessToken;
+ }
+ m_CachedAccessToken = m_ConnectionSettings.AccessTokenProvider.value()();
+ return m_CachedAccessToken;
+}
+
HttpClient::Response
HttpClient::TransactPackage(std::string_view Url, CbPackage Package, const KeyValueMap& AdditionalHeader)
{
ZEN_TRACE_CPU("HttpClient::TransactPackage");
- Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {});
+ Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
// First, list of offered chunks for filtering on the server end
@@ -482,7 +572,7 @@ HttpClient::TransactPackage(std::string_view Url, CbPackage Package, const KeyVa
BinaryWriter MemWriter;
Writer.Save(MemWriter);
- Sess->UpdateHeader({{"Content-Type", "application/x-ue-offer"}, {"UE-Session", m_SessionId}, {"UE-Request", RequestIdString}});
+ Sess->UpdateHeader({HeaderContentType(HttpContentType::kCbPackageOffer), {"UE-Request", RequestIdString}});
Sess->SetBody(cpr::Body{(const char*)MemWriter.Data(), MemWriter.Size()});
cpr::Response FilterResponse = Sess.Post();
@@ -524,7 +614,7 @@ HttpClient::TransactPackage(std::string_view Url, CbPackage Package, const KeyVa
CompositeBuffer Message = FormatPackageMessageBuffer(SendPackage);
SharedBuffer FlatMessage = Message.Flatten();
- Sess->UpdateHeader({{"Content-Type", "application/x-ue-cbpkg"}, {"UE-Session", m_SessionId}, {"UE-Request", RequestIdString}});
+ Sess->UpdateHeader({HeaderContentType(HttpContentType::kCbPackage), {"UE-Request", RequestIdString}});
Sess->SetBody(cpr::Body{(const char*)FlatMessage.GetData(), FlatMessage.GetSize()});
cpr::Response FilterResponse = Sess.Post();
@@ -556,20 +646,28 @@ HttpClient::Put(std::string_view Url, const IoBuffer& Payload, const KeyValueMap
{
ZEN_TRACE_CPU("HttpClient::Put");
- Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {});
- Sess->SetBody(AsCprBody(Payload));
- Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(Payload.GetContentType()))}});
-
- return CommonResponse(Sess.Put());
+ return CommonResponse(DoWithRetry(
+ [&]() {
+ Impl::Session Sess =
+ m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
+ Sess->SetBody(AsCprBody(Payload));
+ Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())});
+ return Sess.Put();
+ },
+ m_ConnectionSettings.RetryCount));
}
HttpClient::Response
HttpClient::Get(std::string_view Url, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters)
{
ZEN_TRACE_CPU("HttpClient::Get");
- Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters);
-
- return CommonResponse(Sess.Get());
+ return CommonResponse(DoWithRetry(
+ [&]() {
+ Impl::Session Sess =
+ m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters, m_SessionId, GetAccessToken());
+ return Sess.Get();
+ },
+ m_ConnectionSettings.RetryCount));
}
HttpClient::Response
@@ -577,9 +675,13 @@ HttpClient::Head(std::string_view Url, const KeyValueMap& AdditionalHeader)
{
ZEN_TRACE_CPU("HttpClient::Head");
- Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {});
-
- return CommonResponse(Sess.Head());
+ return CommonResponse(DoWithRetry(
+ [&]() {
+ Impl::Session Sess =
+ m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
+ return Sess.Head();
+ },
+ m_ConnectionSettings.RetryCount));
}
HttpClient::Response
@@ -587,9 +689,13 @@ HttpClient::Delete(std::string_view Url, const KeyValueMap& AdditionalHeader)
{
ZEN_TRACE_CPU("HttpClient::Delete");
- Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {});
-
- return CommonResponse(Sess.Delete());
+ return CommonResponse(DoWithRetry(
+ [&]() {
+ Impl::Session Sess =
+ m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
+ return Sess.Delete();
+ },
+ m_ConnectionSettings.RetryCount));
}
HttpClient::Response
@@ -597,9 +703,13 @@ HttpClient::Post(std::string_view Url, const KeyValueMap& AdditionalHeader, cons
{
ZEN_TRACE_CPU("HttpClient::PostNoPayload");
- Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters);
-
- return CommonResponse(Sess.Post());
+ return CommonResponse(DoWithRetry(
+ [&]() {
+ Impl::Session Sess =
+ m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters, m_SessionId, GetAccessToken());
+ return Sess.Post();
+ },
+ m_ConnectionSettings.RetryCount));
}
HttpClient::Response
@@ -607,12 +717,16 @@ HttpClient::Post(std::string_view Url, const IoBuffer& Payload, const KeyValueMa
{
ZEN_TRACE_CPU("HttpClient::PostWithPayload");
- Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {});
+ return CommonResponse(DoWithRetry(
+ [&]() {
+ Impl::Session Sess =
+ m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
- Sess->SetBody(AsCprBody(Payload));
- Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(Payload.GetContentType()))}});
-
- return CommonResponse(Sess.Post());
+ Sess->SetBody(AsCprBody(Payload));
+ Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())});
+ return Sess.Post();
+ },
+ m_ConnectionSettings.RetryCount));
}
HttpClient::Response
@@ -620,12 +734,16 @@ HttpClient::Post(std::string_view Url, CbObject Payload, const KeyValueMap& Addi
{
ZEN_TRACE_CPU("HttpClient::PostObjectPayload");
- Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {});
-
- Sess->SetBody(AsCprBody(Payload));
- Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(ZenContentType::kCbObject))}});
+ return CommonResponse(DoWithRetry(
+ [&]() {
+ Impl::Session Sess =
+ m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
- return CommonResponse(Sess.Post());
+ Sess->SetBody(AsCprBody(Payload));
+ Sess->UpdateHeader({HeaderContentType(ZenContentType::kCbObject)});
+ return Sess.Post();
+ },
+ m_ConnectionSettings.RetryCount));
}
HttpClient::Response
@@ -633,13 +751,17 @@ HttpClient::Post(std::string_view Url, CbPackage Pkg, const KeyValueMap& Additio
{
ZEN_TRACE_CPU("HttpClient::PostPackage");
- CompositeBuffer Message = zen::FormatPackageMessageBuffer(Pkg);
-
- Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {});
- Sess->SetBody(AsCprBody(Message));
- Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(ZenContentType::kCbPackage))}});
-
- return CommonResponse(Sess.Post());
+ return CommonResponse(DoWithRetry(
+ [&]() {
+ CompositeBuffer Message = zen::FormatPackageMessageBuffer(Pkg);
+
+ Impl::Session Sess =
+ m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
+ Sess->SetBody(AsCprBody(Message));
+ Sess->UpdateHeader({HeaderContentType(ZenContentType::kCbPackage)});
+ return Sess.Post();
+ },
+ m_ConnectionSettings.RetryCount));
}
HttpClient::Response
@@ -647,27 +769,32 @@ HttpClient::Upload(std::string_view Url, const IoBuffer& Payload, const KeyValue
{
ZEN_TRACE_CPU("HttpClient::Upload");
- Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {});
- Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(Payload.GetContentType()))}});
+ return CommonResponse(DoWithRetry(
+ [&]() {
+ Impl::Session Sess =
+ m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
+ Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())});
- uint64_t Offset = 0;
- if (Payload.IsWholeFile())
- {
- auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) {
- size = Min<size_t>(size, Payload.GetSize() - Offset);
- IoBuffer PayloadRange = IoBuffer(Payload, Offset, size);
- MutableMemoryView Data(buffer, size);
- Data.CopyFrom(PayloadRange.GetView());
- Offset += size;
- return true;
- };
- Sess->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
- }
- else
- {
- Sess->SetBody(AsCprBody(Payload));
- }
- return CommonResponse(Sess.Put());
+ uint64_t Offset = 0;
+ if (Payload.IsWholeFile())
+ {
+ auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) {
+ size = Min<size_t>(size, Payload.GetSize() - Offset);
+ IoBuffer PayloadRange = IoBuffer(Payload, Offset, size);
+ MutableMemoryView Data(buffer, size);
+ Data.CopyFrom(PayloadRange.GetView());
+ Offset += size;
+ return true;
+ };
+ Sess->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
+ }
+ else
+ {
+ Sess->SetBody(AsCprBody(Payload));
+ }
+ return Sess.Put();
+ },
+ m_ConnectionSettings.RetryCount));
}
HttpClient::Response
@@ -675,21 +802,26 @@ HttpClient::Upload(std::string_view Url, const CompositeBuffer& Payload, ZenCont
{
ZEN_TRACE_CPU("HttpClient::Upload");
- Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {});
- Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(ContentType))}});
-
- uint64_t SizeLeft = Payload.GetSize();
- CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0);
- auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) {
- size = Min<size_t>(size, SizeLeft);
- MutableMemoryView Data(buffer, size);
- Payload.CopyTo(Data, BufferIt);
- SizeLeft -= size;
- return true;
- };
- Sess->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
-
- return CommonResponse(Sess.Put());
+ return CommonResponse(DoWithRetry(
+ [&]() {
+ Impl::Session Sess =
+ m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
+ Sess->UpdateHeader({HeaderContentType(ContentType)});
+
+ uint64_t SizeLeft = Payload.GetSize();
+ CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0);
+ auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) {
+ size = Min<size_t>(size, SizeLeft);
+ MutableMemoryView Data(buffer, size);
+ Payload.CopyTo(Data, BufferIt);
+ SizeLeft -= size;
+ return true;
+ };
+ Sess->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback));
+
+ return Sess.Put();
+ },
+ m_ConnectionSettings.RetryCount));
}
HttpClient::Response
@@ -697,46 +829,170 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold
{
ZEN_TRACE_CPU("HttpClient::Download");
- Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {});
-
std::string PayloadString;
std::unique_ptr<TempPayloadFile> PayloadFile;
-
- cpr::Response Response = Sess.Download(cpr::WriteCallback{[&](std::string data, intptr_t) {
- if (!PayloadFile && (PayloadString.length() + data.length()) > (1024 * 1024))
- {
- PayloadFile = std::make_unique<TempPayloadFile>();
- std::error_code Ec = PayloadFile->Open(TempFolderPath);
- if (Ec)
- {
- ZEN_WARN("Failed to create temp file in '{}' for HttpClient::Download. Reason: {}", TempFolderPath.string(), Ec.message());
- return false;
- }
- PayloadFile->Write(PayloadString);
- PayloadString.clear();
- }
- if (PayloadFile)
- {
- std::error_code Ec = PayloadFile->Write(data);
- if (Ec)
- {
- ZEN_WARN("Failed to write to temp file in '{}' for HttpClient::Download. Reason: {}",
- TempFolderPath.string(),
- Ec.message());
- return false;
- }
- }
- else
- {
- PayloadString.append(data);
- }
- return true;
- }});
-
- if (!PayloadString.empty())
- {
- Response.text = std::move(PayloadString);
- }
+ cpr::Response Response = DoWithRetry(
+ [&]() {
+ auto DownloadCallback = [&](std::string data, intptr_t) {
+ if (!PayloadFile && (PayloadString.length() + data.length()) > (1024 * 1024))
+ {
+ PayloadFile = std::make_unique<TempPayloadFile>();
+ std::error_code Ec = PayloadFile->Open(TempFolderPath);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to create temp file in '{}' for HttpClient::Download. Reason: {}",
+ TempFolderPath.string(),
+ Ec.message());
+ return false;
+ }
+ PayloadFile->Write(PayloadString);
+ PayloadString.clear();
+ }
+ if (PayloadFile)
+ {
+ std::error_code Ec = PayloadFile->Write(data);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to write to temp file in '{}' for HttpClient::Download. Reason: {}",
+ TempFolderPath.string(),
+ Ec.message());
+ return false;
+ }
+ }
+ else
+ {
+ PayloadString.append(data);
+ }
+ return true;
+ };
+
+ cpr::Response Response;
+ {
+ Impl::Session Sess =
+ m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken());
+ Response = Sess.Download(cpr::WriteCallback{DownloadCallback});
+ }
+
+ if (m_ConnectionSettings.AllowResume)
+ {
+ auto SupportsRanges = [](const cpr::Response& Response) -> bool {
+ if (Response.header.find("Content-Range") != Response.header.end())
+ {
+ return true;
+ }
+ if (auto It = Response.header.find("Accept-Ranges"); It != Response.header.end())
+ {
+ return It->second == "bytes";
+ }
+ return false;
+ };
+
+ auto ShouldResume = [&SupportsRanges](const cpr::Response& Response) -> bool {
+ if (ShouldRetry(Response))
+ {
+ return SupportsRanges(Response);
+ }
+ return false;
+ };
+
+ if (ShouldResume(Response))
+ {
+ auto It = Response.header.find("Content-Length");
+ if (It != Response.header.end())
+ {
+ std::optional<int64_t> ContentLength = ParseInt<int64_t>(It->second);
+ if (ContentLength)
+ {
+ auto HeaderCallback = [&](std::string header, intptr_t) {
+ size_t DelimiterPos = header.find(':');
+ if (DelimiterPos != std::string::npos)
+ {
+ std::string Key = header.substr(0, DelimiterPos);
+ constexpr AsciiSet WhitespaceCharacters(" \v\f\t\r\n");
+ Key = AsciiSet::TrimSuffixWith(Key, WhitespaceCharacters);
+ Key = AsciiSet::TrimPrefixWith(Key, WhitespaceCharacters);
+
+ std::string Value = header.substr(DelimiterPos + 1);
+ Value = AsciiSet::TrimSuffixWith(Value, WhitespaceCharacters);
+ Value = AsciiSet::TrimPrefixWith(Value, WhitespaceCharacters);
+
+ Response.header.insert_or_assign(Key, Value);
+
+ if (Key == "Content-Range"sv)
+ {
+ if (Value.starts_with("bytes "))
+ {
+ size_t RangeStartEnd = Value.find('-', 6);
+ if (RangeStartEnd != std::string::npos)
+ {
+ const auto Start = ParseInt<uint64_t>(Value.substr(6, RangeStartEnd - 6));
+ if (Start)
+ {
+ uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length();
+ if (Start.value() == DownloadedSize)
+ {
+ return 1;
+ }
+ else if (Start.value() > DownloadedSize)
+ {
+ return 0;
+ }
+ if (PayloadFile)
+ {
+ PayloadFile->ResetWritePos(Start.value());
+ }
+ else
+ {
+ PayloadString = PayloadString.substr(0, Start.value());
+ }
+ return 1;
+ }
+ }
+ }
+ return 0;
+ }
+ }
+ return 1;
+ };
+
+ KeyValueMap HeadersWithRange(AdditionalHeader);
+ do
+ {
+ uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length();
+
+ std::string Range = fmt::format("{}-{}", DownloadedSize, ContentLength.value());
+ if (auto RangeIt = HeadersWithRange.Entries.find("Range"); RangeIt != HeadersWithRange.Entries.end())
+ {
+ if (RangeIt->second == Range)
+ {
+ // If we didn't make any progress, abort
+ break;
+ }
+ }
+ HeadersWithRange.Entries.insert_or_assign("Range", Range);
+
+ Impl::Session Sess = m_Impl->AllocSession(m_BaseUri,
+ Url,
+ m_ConnectionSettings,
+ HeadersWithRange,
+ {},
+ m_SessionId,
+ GetAccessToken());
+ Response.header.clear();
+ Response = Sess.Download(cpr::WriteCallback{DownloadCallback}, cpr::HeaderCallback{HeaderCallback});
+ } while (ShouldResume(Response));
+ }
+ }
+ }
+ }
+
+ if (!PayloadString.empty())
+ {
+ Response.text = std::move(PayloadString);
+ }
+ return Response;
+ },
+ m_ConnectionSettings.RetryCount);
return CommonResponse(std::move(Response), PayloadFile ? PayloadFile->DetachToIoBuffer() : IoBuffer{});
}
diff --git a/src/zenhttp/include/zenhttp/httpclient.h b/src/zenhttp/include/zenhttp/httpclient.h
index 9de5c7cce..f3559f214 100644
--- a/src/zenhttp/include/zenhttp/httpclient.h
+++ b/src/zenhttp/include/zenhttp/httpclient.h
@@ -6,9 +6,11 @@
#include <zencore/iobuffer.h>
#include <zencore/logbase.h>
+#include <zencore/thread.h>
#include <zencore/uid.h>
#include <zenhttp/httpcommon.h>
+#include <functional>
#include <optional>
#include <unordered_map>
@@ -27,12 +29,32 @@ class CompositeBuffer;
*/
+struct HttpClientAccessToken
+{
+ using Clock = std::chrono::system_clock;
+ using TimePoint = Clock::time_point;
+
+ static constexpr int64_t ExpireMarginInSeconds = 30;
+
+ std::string Value;
+ TimePoint ExpireTime;
+
+ bool IsValid() const
+ {
+ return Value.empty() == false &&
+ ExpireMarginInSeconds < std::chrono::duration_cast<std::chrono::seconds>(ExpireTime - Clock::now()).count();
+ }
+};
+
struct HttpClientSettings
{
- std::string LogCategory = "httpclient";
- std::chrono::milliseconds ConnectTimeout{3000};
- std::chrono::milliseconds Timeout{};
- bool AssumeHttp2 = false;
+ std::string LogCategory = "httpclient";
+ std::chrono::milliseconds ConnectTimeout{3000};
+ std::chrono::milliseconds Timeout{};
+ std::optional<std::function<HttpClientAccessToken()>> AccessTokenProvider;
+ bool AssumeHttp2 = false;
+ bool AllowResume = false;
+ uint8_t RetryCount = 0;
};
class HttpClient
@@ -134,6 +156,7 @@ public:
const CompositeBuffer& Payload,
ZenContentType ContentType,
const KeyValueMap& AdditionalHeader = {});
+
[[nodiscard]] Response Download(std::string_view Url,
const std::filesystem::path& TempFolderPath,
const KeyValueMap& AdditionalHeader = {});
@@ -147,14 +170,18 @@ public:
LoggerRef Logger() { return m_Log; }
std::string_view GetBaseUri() const { return m_BaseUri; }
+ bool Authenticate();
private:
+ const std::optional<HttpClientAccessToken> GetAccessToken();
struct Impl;
LoggerRef m_Log;
std::string m_BaseUri;
std::string m_SessionId;
const HttpClientSettings m_ConnectionSettings;
+ RwLock m_AccessTokenLock;
+ HttpClientAccessToken m_CachedAccessToken;
Ref<Impl> m_Impl;
};
diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
index 9d8f6c17b..c9f1f5f6f 100644
--- a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
@@ -54,19 +54,8 @@ public:
virtual SaveResult SaveContainer(const IoBuffer& Payload) override
{
- const int32_t MaxAttempts = 3;
- PutRefResult PutResult;
- {
- CloudCacheSession Session(m_CloudClient.Get());
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !PutResult.Success; Attempt++)
- {
- PutResult = Session.PutRef(m_Namespace, m_Bucket, m_Key, Payload, ZenContentType::kCbObject);
- if (!PutResult.Success)
- {
- Sleep(100 * (Attempt + 1));
- }
- }
- }
+ CloudCacheSession Session(m_CloudClient.Get());
+ PutRefResult PutResult = Session.PutRef(m_Namespace, m_Bucket, m_Key, Payload, ZenContentType::kCbObject);
SaveResult Result{ConvertResult(PutResult), {PutResult.Needs.begin(), PutResult.Needs.end()}, PutResult.RawHash};
if (Result.ErrorCode)
@@ -83,19 +72,8 @@ public:
virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override
{
- const int32_t MaxAttempts = 3;
- CloudCacheResult PutResult;
- {
- CloudCacheSession Session(m_CloudClient.Get());
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !PutResult.Success; Attempt++)
- {
- PutResult = Session.PutCompressedBlob(m_Namespace, RawHash, Payload);
- if (!PutResult.Success)
- {
- Sleep(100 * (Attempt + 1));
- }
- }
- }
+ CloudCacheSession Session(m_CloudClient.Get());
+ CloudCacheResult PutResult = Session.PutCompressedBlob(m_Namespace, RawHash, Payload);
SaveAttachmentResult Result{ConvertResult(PutResult)};
if (Result.ErrorCode)
@@ -126,20 +104,9 @@ public:
virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) override
{
- const int32_t MaxAttempts = 3;
- FinalizeRefResult FinalizeRefResult;
- {
- CloudCacheSession Session(m_CloudClient.Get());
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !FinalizeRefResult.Success; Attempt++)
- {
- FinalizeRefResult = Session.FinalizeRef(m_Namespace, m_Bucket, m_Key, RawHash);
- if (!FinalizeRefResult.Success)
- {
- Sleep(100 * (Attempt + 1));
- }
- }
- }
- FinalizeResult Result{ConvertResult(FinalizeRefResult), {FinalizeRefResult.Needs.begin(), FinalizeRefResult.Needs.end()}};
+ CloudCacheSession Session(m_CloudClient.Get());
+ FinalizeRefResult FinalizeRefResult = Session.FinalizeRef(m_Namespace, m_Bucket, m_Key, RawHash);
+ FinalizeResult Result{ConvertResult(FinalizeRefResult), {FinalizeRefResult.Needs.begin(), FinalizeRefResult.Needs.end()}};
if (Result.ErrorCode)
{
Result.Reason = fmt::format("Failed finalizing oplog container to {}/{}/{}/{}. Reason: '{}'",
@@ -165,19 +132,8 @@ public:
virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override
{
- const int32_t MaxAttempts = 3;
- CloudCacheResult GetResult;
- {
- CloudCacheSession Session(m_CloudClient.Get());
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !GetResult.Success; Attempt++)
- {
- GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath);
- if (!GetResult.Success)
- {
- Sleep(100 * (Attempt + 1));
- }
- }
- }
+ CloudCacheSession Session(m_CloudClient.Get());
+ CloudCacheResult GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath);
LoadAttachmentResult Result{ConvertResult(GetResult), std::move(GetResult.Response)};
if (GetResult.ErrorCode)
{
@@ -210,20 +166,8 @@ public:
private:
LoadContainerResult LoadContainer(const IoHash& Key)
{
- const int32_t MaxAttempts = 3;
- CloudCacheResult GetResult;
- {
- CloudCacheSession Session(m_CloudClient.Get());
- for (int32_t Attempt = 0; Attempt < MaxAttempts && !GetResult.Success; Attempt++)
- {
- GetResult = Session.GetRef(m_Namespace, m_Bucket, Key, ZenContentType::kCbObject);
- if (!GetResult.Success)
- {
- Sleep(100 * (Attempt + 1));
- }
- }
- }
-
+ CloudCacheSession Session(m_CloudClient.Get());
+ CloudCacheResult GetResult = Session.GetRef(m_Namespace, m_Bucket, Key, ZenContentType::kCbObject, m_TempFilePath);
if (GetResult.ErrorCode || !GetResult.Success)
{
LoadContainerResult Result{ConvertResult(GetResult)};
@@ -312,7 +256,9 @@ CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, const std::fi
.ServiceUrl = Url,
.ConnectTimeout = std::chrono::milliseconds(2000),
.Timeout = std::chrono::milliseconds(1800000),
- .AssumeHttp2 = Options.AssumeHttp2};
+ .AssumeHttp2 = Options.AssumeHttp2,
+ .AllowResume = true,
+ .RetryCount = 2};
// 1) Access token as parameter in request
// 2) Environment variable (different win vs linux/mac)
// 3) openid-provider (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 42af9b79b..f117a4203 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -3593,12 +3593,16 @@ ProjectStore::CreateReferenceCheckers(GcCtx& Ctx)
Checkers.reserve(OpLogs.size());
for (const std::string& OpLogId : OpLogs)
{
- ProjectStore::Oplog* Oplog = Project->OpenOplog(OpLogId);
- GcClock::TimePoint Now = GcClock::Now();
- bool TryPreCache = Project->LastOplogAccessTime(OpLogId) < (Now - std::chrono::minutes(5));
+ ProjectStore::Oplog* Oplog = Project->OpenOplog(OpLogId);
+ if (Oplog == nullptr)
+ {
+ continue;
+ }
+ GcClock::TimePoint Now = GcClock::Now();
+ bool TryPreCache = Project->LastOplogAccessTime(OpLogId) < (Now - std::chrono::minutes(5));
Checkers.emplace_back(new ProjectStoreReferenceChecker(*Oplog, TryPreCache));
+ OplogCount++;
}
- OplogCount += OpLogs.size();
}
}
catch (std::exception&)
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index ddab7432d..83cec4725 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -560,7 +560,7 @@ BuildContainer(CidStore& ChunkStore,
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {});
ReportMessage(OptionalContext,
- fmt::format("Failed to build container ({}): '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ fmt::format("Failed to build container ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
BlockCreateLatch.CountDown();
while (!BlockCreateLatch.Wait(1000))
@@ -893,11 +893,9 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
"Invalid attachment",
fmt::format("Upload requested of unknown attachment '{}'", Needed));
- ReportMessage(OptionalContext,
- fmt::format("Failed to upload attachment '{}'. ({}): '{}'",
- Needed,
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
+ ReportMessage(
+ OptionalContext,
+ fmt::format("Failed to upload attachment '{}'. ({}): {}", Needed, RemoteResult.GetError(), RemoteResult.GetErrorReason()));
return;
}
}
@@ -969,7 +967,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound),
fmt::format("Failed to find attachment {}", RawHash),
{});
- ZEN_WARN("Failed to save attachment '{}' ({}): '{}'", RawHash, RemoteResult.GetError(), RemoteResult.GetErrorReason());
+ ZEN_WARN("Failed to save attachment '{}' ({}): {}", RawHash, RemoteResult.GetError(), RemoteResult.GetErrorReason());
return;
}
@@ -978,7 +976,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
{
RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
ReportMessage(OptionalContext,
- fmt::format("Failed to save attachment '{}', {} ({}): '{}'",
+ fmt::format("Failed to save attachment '{}', {} ({}): {}",
RawHash,
NiceBytes(Payload.GetSize()),
RemoteResult.GetError(),
@@ -1031,7 +1029,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
{
RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
ReportMessage(OptionalContext,
- fmt::format("Failed to save attachment '{}', {} ({}): '{}'",
+ fmt::format("Failed to save attachment '{}', {} ({}): {}",
RawHash,
NiceBytes(Payload.GetSize()),
RemoteResult.GetError(),
@@ -1108,7 +1106,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool,
{
RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
ReportMessage(OptionalContext,
- fmt::format("Failed to save attachments with {} chunks ({}): '{}'",
+ fmt::format("Failed to save attachments with {} chunks ({}): {}",
Chunks.size(),
RemoteResult.GetError(),
RemoteResult.GetErrorReason()));
@@ -1230,7 +1228,7 @@ SaveOplog(CidStore& ChunkStore,
{
RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
ReportMessage(OptionalContext,
- fmt::format("Failed to save attachment ({}): '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ fmt::format("Failed to save attachment ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
return;
}
ZEN_DEBUG("Saved block {}, {}", BlockHash, NiceBytes(CompressedBlock.GetCompressedSize()));
@@ -1270,10 +1268,9 @@ SaveOplog(CidStore& ChunkStore,
{
if (BaseContainerResult.ErrorCode)
{
- ReportMessage(OptionalContext,
- fmt::format("Failed to load oplog base container: '{}', error code: {}",
- BaseContainerResult.Reason,
- BaseContainerResult.ErrorCode));
+ ReportMessage(
+ OptionalContext,
+ fmt::format("Failed to load oplog base container ({}): {}", BaseContainerResult.ErrorCode, BaseContainerResult.Reason));
}
else
{
@@ -1337,7 +1334,7 @@ SaveOplog(CidStore& ChunkStore,
{
RemoteResult.SetError(ContainerSaveResult.ErrorCode, ContainerSaveResult.Reason, "Failed to save oplog container");
ReportMessage(OptionalContext,
- fmt::format("Failed to save oplog container ({}): '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ fmt::format("Failed to save oplog container ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
}
else
{
@@ -1372,7 +1369,7 @@ SaveOplog(CidStore& ChunkStore,
{
RemoteResult.SetError(ContainerFinalizeResult.ErrorCode, ContainerFinalizeResult.Reason, ContainerFinalizeResult.Text);
ReportMessage(OptionalContext,
- fmt::format("Failed to finalize oplog container {} ({}): '{}'",
+ fmt::format("Failed to finalize oplog container {} ({}): {}",
ContainerSaveResult.RawHash,
RemoteResult.GetError(),
RemoteResult.GetErrorReason()));
@@ -1635,7 +1632,7 @@ LoadOplog(CidStore& ChunkStore,
if (Result.ErrorCode)
{
ReportMessage(OptionalContext,
- fmt::format("Failed to load attachments with {} chunks ({}): '{}'",
+ fmt::format("Failed to load attachments with {} chunks ({}): {}",
Chunks.size(),
RemoteResult.GetError(),
RemoteResult.GetErrorReason()));
@@ -1691,7 +1688,7 @@ LoadOplog(CidStore& ChunkStore,
if (BlockResult.ErrorCode)
{
ReportMessage(OptionalContext,
- fmt::format("Failed to download block attachment {} ({}): '{}'",
+ fmt::format("Failed to download block attachment {} ({}): {}",
BlockHash,
RemoteResult.GetError(),
RemoteResult.GetErrorReason()));
@@ -1706,18 +1703,20 @@ LoadOplog(CidStore& ChunkStore,
return;
}
Info.AttachmentBlocksDownloaded.fetch_add(1);
- ZEN_INFO("Loaded block attachment '{}' in {}",
+ uint64_t BlockSize = BlockResult.Bytes.GetSize();
+ ZEN_INFO("Loaded block attachment '{}' in {} ({})",
BlockHash,
- NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)));
+ NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)),
+ NiceBytes(BlockSize));
if (RemoteResult.IsError())
{
return;
}
+ Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize);
bool StoreChunksOK =
IterateBlock(std::move(BlockResult.Bytes), [&ChunkStore, &Info](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) {
- uint64_t ChunkSize = Chunk.GetCompressedSize();
- Info.AttachmentBlockBytesDownloaded.fetch_add(ChunkSize);
+ uint64_t ChunkSize = Chunk.GetCompressedSize();
CidStore::InsertResult InsertResult =
ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash);
if (InsertResult.New)
@@ -1730,7 +1729,7 @@ LoadOplog(CidStore& ChunkStore,
if (!StoreChunksOK)
{
ReportMessage(OptionalContext,
- fmt::format("Block attachment {} has invalid format ({}): '{}'",
+ fmt::format("Block attachment {} has invalid format ({}): {}",
BlockHash,
RemoteResult.GetError(),
RemoteResult.GetErrorReason()));
@@ -1788,20 +1787,21 @@ LoadOplog(CidStore& ChunkStore,
}
return;
}
- ZEN_INFO("Loaded large attachment '{}' in {}",
+ uint64_t AttachmentSize = AttachmentResult.Bytes.GetSize();
+ ZEN_INFO("Loaded large attachment '{}' in {} ({})",
RawHash,
- NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)));
+ NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)),
+ NiceBytes(AttachmentSize));
Info.AttachmentsDownloaded.fetch_add(1);
if (RemoteResult.IsError())
{
return;
}
- uint64_t ChunkSize = AttachmentResult.Bytes.GetSize();
- Info.AttachmentBytesDownloaded.fetch_add(ChunkSize);
+ Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize);
CidStore::InsertResult InsertResult = ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash);
if (InsertResult.New)
{
- Info.AttachmentBytesStored.fetch_add(ChunkSize);
+ Info.AttachmentBytesStored.fetch_add(AttachmentSize);
Info.AttachmentsStored.fetch_add(1);
}
});
diff --git a/src/zenserver/upstream/jupiter.cpp b/src/zenserver/upstream/jupiter.cpp
index e4d45e316..bf2538908 100644
--- a/src/zenserver/upstream/jupiter.cpp
+++ b/src/zenserver/upstream/jupiter.cpp
@@ -16,7 +16,6 @@
#include <zenutil/basicfile.h>
ZEN_THIRD_PARTY_INCLUDES_START
-#include <cpr/cpr.h>
#include <fmt/format.h>
ZEN_THIRD_PARTY_INCLUDES_END
@@ -32,277 +31,70 @@ using namespace std::literals;
namespace zen {
namespace detail {
- struct CloudCacheSessionState
+ CloudCacheResult ConvertResponse(const HttpClient::Response& Response, const std::string_view ErrorPrefix = ""sv)
{
- CloudCacheSessionState(CloudCacheClient& Client) : m_Client(Client) {}
-
- const CloudCacheAccessToken& GetAccessToken(bool RefreshToken)
- {
- if (RefreshToken)
- {
- m_AccessToken = m_Client.AcquireAccessToken();
- }
-
- return m_AccessToken;
- }
-
- cpr::Session& GetSession() { return m_Session; }
-
- void Reset(std::chrono::milliseconds ConnectTimeout, std::chrono::milliseconds Timeout, bool AssumeHttp2)
+ if (Response.Error)
{
- m_Session.SetBody({});
- m_Session.SetHeader({});
- m_Session.SetConnectTimeout(ConnectTimeout);
- m_Session.SetTimeout(Timeout);
- if (AssumeHttp2)
- {
- m_Session.SetHttpVersion(cpr::HttpVersion{cpr::HttpVersionCode::VERSION_2_0_PRIOR_KNOWLEDGE});
- }
- }
-
- private:
- friend class zen::CloudCacheClient;
-
- CloudCacheClient& m_Client;
- CloudCacheAccessToken m_AccessToken;
- cpr::Session m_Session;
- };
-
- CloudCacheResult ConvertResponse(const cpr::Response& Response)
- {
- if (Response.error)
- {
- return {.ElapsedSeconds = Response.elapsed,
- .ErrorCode = static_cast<int32_t>(Response.error.code),
- .Reason = Response.error.message,
+ return {.ElapsedSeconds = Response.ElapsedSeconds,
+ .ErrorCode = Response.Error.value().ErrorCode,
+ .Reason = Response.ErrorMessage(ErrorPrefix),
.Success = false};
}
- if (!IsHttpSuccessCode(Response.status_code))
+ if (!Response.IsSuccess())
{
- return {.ElapsedSeconds = Response.elapsed,
- .ErrorCode = static_cast<int32_t>(Response.status_code),
- .Reason = Response.reason.empty() ? Response.text : Response.reason,
+ return {.ElapsedSeconds = Response.ElapsedSeconds,
+ .ErrorCode = static_cast<int32_t>(Response.StatusCode),
+ .Reason = Response.ErrorMessage(ErrorPrefix),
.Success = false};
}
- return {.Bytes = Response.downloaded_bytes,
- .ElapsedSeconds = Response.elapsed,
+ return {.Response = Response.ResponsePayload,
+ .Bytes = Response.DownloadedBytes,
+ .ElapsedSeconds = Response.ElapsedSeconds,
.ErrorCode = 0,
- .Reason = Response.reason,
.Success = true};
}
-
- cpr::Response GetWithStreaming(cpr::Session& Session, std::filesystem::path TempFolderPath, std::string_view Name, IoBuffer& OutBuffer)
- {
- if (TempFolderPath.empty())
- {
- return Session.Get();
- }
-
- std::string PayloadString;
- std::shared_ptr<BasicFile> PayloadFile;
-
- auto _ = MakeGuard([&]() {
- if (PayloadFile)
- {
- PayloadFile.reset();
- std::filesystem::path TempPath = TempFolderPath / Name;
- std::error_code Ec;
- std::filesystem::remove(TempPath, Ec);
- }
- });
-
- uint64_t Offset = 0;
- Session.SetWriteCallback(cpr::WriteCallback{[&](std::string data, intptr_t) {
- if (!PayloadFile && (PayloadString.length() + data.length()) > (1024 * 1024))
- {
- std::filesystem::path TempPath = TempFolderPath / Name;
- PayloadFile = std::make_shared<BasicFile>();
- PayloadFile->Open(TempPath, BasicFile::Mode::kTruncateDelete);
- PayloadFile->Write(PayloadString.data(), PayloadString.size(), Offset);
- Offset += PayloadString.size();
- PayloadString.clear();
- }
- if (PayloadFile)
- {
- PayloadFile->Write(data.data(), data.size(), Offset);
- Offset += data.size();
- }
- else
- {
- PayloadString.append(data);
- }
- return true;
- }});
-
- cpr::Response Response = Session.Get();
-
- if (!Response.error && IsHttpSuccessCode(Response.status_code))
- {
- if (PayloadFile)
- {
- uint64_t PayloadSize = PayloadFile->FileSize();
- void* FileHandle = PayloadFile->Detach();
- PayloadFile.reset();
- OutBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, PayloadSize, /*IsWholeFile*/ true);
- OutBuffer.SetDeleteOnClose(true);
- }
- else
- {
- OutBuffer = IoBufferBuilder::MakeCloneFromMemory(PayloadString.data(), PayloadString.size());
- }
- return Response;
- }
-
- Response.text.swap(PayloadString);
- return Response;
- }
-
- static std::optional<zen::HttpContentType> TryGetContentType(const cpr::Response& Response)
- {
- if (auto It = Response.header.find("Content-Type"); It != Response.header.end())
- {
- zen::HttpContentType ContentType = zen::ParseContentType(It->second);
- if (ContentType != zen::HttpContentType::kUnknownContentType)
- {
- return ContentType;
- }
- }
- return {};
- }
-
- static IoBuffer MakeBufferFromResponseIfKnownFormat(const cpr::Response& Response)
- {
- std::optional<zen::HttpContentType> ContentType = TryGetContentType(Response);
- if (ContentType)
- {
- IoBuffer Buffer = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size());
- Buffer.SetContentType(ContentType.value());
- return Buffer;
- }
- return {};
- }
-
} // namespace detail
CloudCacheSession::CloudCacheSession(CloudCacheClient* CacheClient) : m_Log(CacheClient->Logger()), m_CacheClient(CacheClient)
{
- m_SessionState = m_CacheClient->AllocSessionState();
}
CloudCacheSession::~CloudCacheSession()
{
- m_CacheClient->FreeSessionState(m_SessionState);
}
CloudCacheResult
CloudCacheSession::Authenticate()
{
- const bool RefreshToken = true;
- const CloudCacheAccessToken& AccessToken = GetAccessToken(RefreshToken);
-
- return {.Success = AccessToken.IsValid()};
+ bool OK = m_CacheClient->m_HttpClient.Authenticate();
+ return {.Success = OK};
}
CloudCacheResult
-CloudCacheSession::GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType)
+CloudCacheSession::GetRef(std::string_view Namespace,
+ std::string_view BucketId,
+ const IoHash& Key,
+ ZenContentType RefType,
+ std::filesystem::path TempFolderPath)
{
- const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream";
-
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString();
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", ContentType}});
- Session.SetOption(cpr::Body{});
+ ZEN_TRACE_CPU("JupiterClient::GetRef");
- cpr::Response Response = Session.Get();
- ZEN_DEBUG("GET {}", Response);
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()),
+ TempFolderPath,
+ {HttpClient::Accept(RefType)});
- CloudCacheResult Result = detail::ConvertResponse(Response);
- if (Result.Success)
- {
- Result.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size());
- }
- else
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::GetRef failed GET. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-Accept: '{}', "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- ContentType,
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
- return Result;
+ return detail::ConvertResponse(Response, "CloudCacheSession::GetRef"sv);
}
CloudCacheResult
CloudCacheSession::GetBlob(std::string_view Namespace, const IoHash& Key)
{
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << Namespace << "/" << Key.ToHexString();
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/octet-stream"}});
- Session.SetOption(cpr::Body{});
-
- cpr::Response Response = Session.Get();
- ZEN_DEBUG("GET {}", Response);
+ ZEN_TRACE_CPU("JupiterClient::GetBlob");
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()),
+ {HttpClient::Accept(ZenContentType::kBinary)});
- CloudCacheResult Result = detail::ConvertResponse(Response);
- if (Result.Success)
- {
- Result.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size());
- }
- else
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::GetBlob failed GET. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-Accept: '{}', "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- "application/octet-stream",
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
- return Result;
+ return detail::ConvertResponse(Response);
}
CloudCacheResult
@@ -310,58 +102,12 @@ CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& K
{
ZEN_TRACE_CPU("JupiterClient::GetCompressedBlob");
- ExtendableStringBuilder<256> Uri;
- std::string KeyString = Key.ToHexString();
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << KeyString;
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()),
+ TempFolderPath,
+ {HttpClient::Accept(ZenContentType::kCompressedBinary)});
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-comp"}});
- Session.SetOption(cpr::Body{});
-
- IoBuffer Payload;
- cpr::Response Response = detail::GetWithStreaming(Session, TempFolderPath, KeyString, Payload);
- ZEN_DEBUG("GET {}", Response);
-
- CloudCacheResult Result = detail::ConvertResponse(Response);
- if (Result.Success)
- {
- Result.Response = std::move(Payload);
- }
- else
- {
- std::optional<zen::HttpContentType> ContentType = detail::TryGetContentType(Response);
- if (ContentType.has_value())
- {
- Result.Response = std::move(Payload);
- Result.Response.SetContentType(ContentType.value());
- }
- ZEN_WARN(
- "CloudCacheSession::GetCompressedBlob failed GET. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-Accept: '{}', "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- "application/x-ue-comp",
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
- return Result;
+ return detail::ConvertResponse(Response);
}
CloudCacheResult
@@ -373,59 +119,14 @@ CloudCacheSession::GetInlineBlob(std::string_view Namespace,
{
ZEN_TRACE_CPU("JupiterClient::GetInlineBlob");
- ExtendableStringBuilder<256> Uri;
- std::string KeyString = Key.ToHexString();
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << KeyString;
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-jupiter-inline"}});
- Session.SetOption(cpr::Body{});
-
- IoBuffer Payload;
- cpr::Response Response = detail::GetWithStreaming(Session, TempFolderPath, KeyString, Payload);
- ZEN_DEBUG("GET {}", Response);
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()),
+ TempFolderPath,
+ {{"Accept", "application/x-jupiter-inline"}});
CloudCacheResult Result = detail::ConvertResponse(Response);
- if (Result.Success)
- {
- Result.Response = std::move(Payload);
- }
- else
- {
- std::optional<zen::HttpContentType> ContentType = detail::TryGetContentType(Response);
- if (ContentType.has_value())
- {
- Result.Response = std::move(Payload);
- Result.Response.SetContentType(ContentType.value());
- }
- ZEN_WARN(
- "CloudCacheSession::GetInlineBlob failed GET. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-Accept: '{}', "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- "application/x-jupiter-inline",
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
- if (auto It = Response.header.find("X-Jupiter-InlinePayloadHash"); It != Response.header.end())
+ if (auto It = Response.Header->find("X-Jupiter-InlinePayloadHash"); It != Response.Header->end())
{
const std::string& PayloadHashHeader = It->second;
if (PayloadHashHeader.length() == IoHash::StringLength)
@@ -442,52 +143,10 @@ CloudCacheSession::GetObject(std::string_view Namespace, const IoHash& Key)
{
ZEN_TRACE_CPU("JupiterClient::GetObject");
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString();
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}});
- Session.SetOption(cpr::Body{});
-
- cpr::Response Response = Session.Get();
- ZEN_DEBUG("GET {}", Response);
-
- CloudCacheResult Result = detail::ConvertResponse(Response);
- if (Result.Success)
- {
- Result.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size());
- }
- else
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::GetObject failed GET. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-Accept: '{}', "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- "application/x-ue-cb",
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()),
+ {HttpClient::Accept(ZenContentType::kCbObject)});
- return Result;
+ return detail::ConvertResponse(Response);
}
PutRefResult
@@ -495,29 +154,18 @@ CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId,
{
ZEN_TRACE_CPU("JupiterClient::PutRef");
- IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size());
-
- const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream";
+ Ref.SetContentType(RefType);
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString();
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(
- cpr::Header{{"Authorization", AccessToken.Value}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", ContentType}});
- Session.SetBody(cpr::Body{(const char*)Ref.Data(), Ref.Size()});
+ IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size());
- cpr::Response Response = Session.Put();
- ZEN_DEBUG("PUT {}", Response);
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Put(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), Ref);
PutRefResult Result = {detail::ConvertResponse(Response)};
if (Result.Success)
{
std::string JsonError;
- json11::Json Json = json11::Json::parse(Response.text, JsonError);
+ json11::Json Json = json11::Json::parse(Response.ToText(), JsonError);
if (JsonError.empty())
{
json11::Json::array Needs = Json["needs"].array_items();
@@ -528,37 +176,6 @@ CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId,
}
Result.RawHash = Hash;
}
- else
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::PutRef failed PUT. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-X-Jupiter-IoHash: '{}', "
- "Header-ContentType: '{}', "
- "ContentSize: {}, "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- Hash.ToHexString(),
- ContentType,
- NiceBytes(Ref.Size()),
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
-
return Result;
}
@@ -567,28 +184,16 @@ CloudCacheSession::FinalizeRef(std::string_view Namespace, std::string_view Buck
{
ZEN_TRACE_CPU("JupiterClient::FinalizeRef");
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString() << "/finalize/"
- << RefHash.ToHexString();
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value},
- {"X-Jupiter-IoHash", RefHash.ToHexString()},
- {"Content-Type", "application/x-ue-cb"}});
- Session.SetBody(cpr::Body{});
-
- cpr::Response Response = Session.Post();
- ZEN_DEBUG("POST {}", Response);
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Post(
+ fmt::format("/api/v1/refs/{}/{}/{}/finalize/{}", Namespace, BucketId, Key.ToHexString(), RefHash.ToHexString()),
+ {{"X-Jupiter-IoHash", RefHash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}});
FinalizeRefResult Result = {detail::ConvertResponse(Response)};
if (Result.Success)
{
std::string JsonError;
- json11::Json Json = json11::Json::parse(Response.text, JsonError);
+ json11::Json Json = json11::Json::parse(std::string(Response.AsText()), JsonError);
if (JsonError.empty())
{
json11::Json::array Needs = Json["needs"].array_items();
@@ -598,37 +203,6 @@ CloudCacheSession::FinalizeRef(std::string_view Namespace, std::string_view Buck
}
}
}
- else
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::FinalizeRef failed PUT. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-X-Jupiter-IoHash: '{}', "
- "Header-ContentType: '{}', "
- "ContentSize: {}, "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- RefHash.ToHexString(),
- "application/x-ue-cb",
- NiceBytes(0),
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
-
return Result;
}
@@ -637,49 +211,9 @@ CloudCacheSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuff
{
ZEN_TRACE_CPU("JupiterClient::PutBlob");
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << Namespace << "/" << Key.ToHexString();
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Put(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), Blob);
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/octet-stream"}});
- Session.SetBody(cpr::Body{(const char*)Blob.Data(), Blob.Size()});
-
- cpr::Response Response = Session.Put();
- ZEN_DEBUG("PUT {}", Response);
-
- CloudCacheResult Result = detail::ConvertResponse(Response);
- if (!Result.Success)
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::PutBlob failed PUT. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-ContentType: '{}', "
- "ContentSize: {}, "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- "application/octet-stream",
- NiceBytes(Blob.Size()),
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
- return Result;
+ return detail::ConvertResponse(Response);
}
CloudCacheResult
@@ -687,66 +221,11 @@ CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& K
{
ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob");
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString();
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-comp"}});
-
- uint64_t Offset = 0;
- if (Blob.IsWholeFile())
- {
- auto ReadCallback = [&Blob, &Offset](char* buffer, size_t& size, intptr_t) {
- size = Min<size_t>(size, Blob.GetSize() - Offset);
- IoBuffer PayloadRange = IoBuffer(Blob, Offset, size);
- MutableMemoryView Data(buffer, size);
- Data.CopyFrom(PayloadRange.GetView());
- Offset += size;
- return true;
- };
- Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Blob.GetSize()), ReadCallback));
- }
- else
- {
- Session.SetBody(cpr::Body{(const char*)Blob.Data(), Blob.Size()});
- }
-
- cpr::Response Response = Session.Put();
- ZEN_DEBUG("PUT {}", Response);
+ Blob.SetContentType(ZenContentType::kCompressedBinary);
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), Blob);
- CloudCacheResult Result = detail::ConvertResponse(Response);
- if (!Result.Success)
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::PutCompressedBlob failed PUT. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-ContentType: '{}', "
- "ContentSize: {}, "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- "application/x-ue-comp",
- NiceBytes(Blob.Size()),
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
- return Result;
+ return detail::ConvertResponse(Response);
}
CloudCacheResult
@@ -754,58 +233,12 @@ CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& K
{
ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob");
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString();
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-comp"}});
- uint64_t SizeLeft = Payload.GetSize();
- CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0);
- auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) {
- size = Min<size_t>(size, SizeLeft);
- MutableMemoryView Data(buffer, size);
- Payload.CopyTo(Data, BufferIt);
- SizeLeft -= size;
- return true;
- };
- Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(SizeLeft), ReadCallback));
-
- cpr::Response Response = Session.Put();
- ZEN_DEBUG("PUT {}", Response);
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()),
+ Payload,
+ ZenContentType::kCompressedBinary);
- CloudCacheResult Result = detail::ConvertResponse(Response);
- if (!Result.Success)
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::PutCompressedBlob failed PUT. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-ContentType: '{}', "
- "ContentSize: {}, "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- "application/x-ue-comp",
- NiceBytes(Payload.GetSize()),
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
- return Result;
+ return detail::ConvertResponse(Response);
}
CloudCacheResult
@@ -813,49 +246,11 @@ CloudCacheSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBu
{
ZEN_TRACE_CPU("JupiterClient::PutObject");
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString();
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ Object.SetContentType(ZenContentType::kCbObject);
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), Object);
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}});
- Session.SetBody(cpr::Body{(const char*)Object.Data(), Object.Size()});
-
- cpr::Response Response = Session.Put();
- ZEN_DEBUG("PUT {}", Response);
-
- CloudCacheResult Result = detail::ConvertResponse(Response);
- if (!Result.Success)
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::PutObject failed PUT. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-ContentType: '{}', "
- "ContentSize: {}, "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- "application/x-ue-cb",
- NiceBytes(Object.GetSize()),
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
- return Result;
+ return detail::ConvertResponse(Response);
}
CloudCacheResult
@@ -863,45 +258,10 @@ CloudCacheSession::RefExists(std::string_view Namespace, std::string_view Bucket
{
ZEN_TRACE_CPU("JupiterClient::RefExists");
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString();
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()));
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}});
- Session.SetOption(cpr::Body{});
-
- cpr::Response Response = Session.Head();
- ZEN_DEBUG("HEAD {}", Response);
-
- CloudCacheResult Result = detail::ConvertResponse(Response);
- if (!Result.Success)
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::RefExists failed PUT. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
- return Result;
+ return detail::ConvertResponse(Response);
}
GetObjectReferencesResult
@@ -909,57 +269,20 @@ CloudCacheSession::GetObjectReferences(std::string_view Namespace, const IoHash&
{
ZEN_TRACE_CPU("JupiterClient::GetObjectReferences");
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString() << "/references";
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}});
- Session.SetOption(cpr::Body{});
-
- cpr::Response Response = Session.Get();
- ZEN_DEBUG("GET {}", Response);
+ HttpClient::Response Response =
+ m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/references", Namespace, Key.ToHexString()),
+ {HttpClient::Accept(ZenContentType::kCbObject)});
GetObjectReferencesResult Result = {detail::ConvertResponse(Response)};
if (Result.Success)
{
- IoBuffer Buffer = IoBuffer(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size());
- const CbObject ReferencesResponse = LoadCompactBinaryObject(Buffer);
+ const CbObject ReferencesResponse = Response.AsObject();
for (auto& Item : ReferencesResponse["references"sv])
{
Result.References.insert(Item.AsHash());
}
}
- else
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::GetObjectReferences failed PUT. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-Accept: '{}', "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- "application/x-ue-cb",
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
-
return Result;
}
@@ -1002,73 +325,23 @@ CloudCacheSession::ObjectExists(std::string_view Namespace, const std::set<IoHas
std::vector<IoHash>
CloudCacheSession::Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes)
{
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl();
- Uri << "/api/v1/s/" << Namespace;
+ // ExtendableStringBuilder<256> Uri;
+ // Uri << m_CacheClient->ServiceUrl();
+ // Uri << "/api/v1/s/" << Namespace;
- ZEN_UNUSED(BucketId, ChunkHashes);
+ ZEN_UNUSED(Namespace, BucketId, ChunkHashes);
return {};
}
-cpr::Session&
-CloudCacheSession::GetSession()
-{
- return m_SessionState->GetSession();
-}
-
-CloudCacheAccessToken
-CloudCacheSession::GetAccessToken(bool RefreshToken)
-{
- return m_SessionState->GetAccessToken(RefreshToken);
-}
-
CloudCacheResult
CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key)
{
ZEN_TRACE_CPU("JupiterClient::CacheTypeExists");
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << Namespace << "/" << Key.ToHexString();
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}});
- Session.SetOption(cpr::Body{});
-
- cpr::Response Response = Session.Head();
- ZEN_DEBUG("HEAD {}", Response);
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/{}/{}/{}", TypeId, Namespace, Key.ToHexString()));
- CloudCacheResult Result = detail::ConvertResponse(Response);
- if (!Result.Success)
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::CacheTypeExists failed GET. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-Accept: '{}', "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- "application/x-ue-cb",
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
- return Result;
+ return detail::ConvertResponse(Response);
}
CloudCacheExistsResult
@@ -1083,58 +356,23 @@ CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view
Body << (Body.Size() != 1 ? ",\"" : "\"") << Key.ToHexString() << "\"";
}
Body << "]";
+ IoBuffer Payload = IoBuffer(IoBuffer::Wrap, Body.Data(), Body.Size());
+ Payload.SetContentType(ZenContentType::kJSON);
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << Namespace << "/exist";
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ HttpClient::Response Response = m_CacheClient->m_HttpClient.Post(fmt::format("/api/v1/{}/{}/exist", TypeId, Namespace),
+ Payload,
+ {HttpClient::Accept(ZenContentType::kCbObject)});
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(
- cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}, {"Content-Type", "application/json"}});
- Session.SetOption(cpr::Body(Body.ToString()));
-
- cpr::Response Response = Session.Post();
- ZEN_DEBUG("POST {}", Response);
CloudCacheExistsResult Result = {detail::ConvertResponse(Response)};
if (Result.Success)
{
- IoBuffer Buffer = IoBuffer(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size());
- const CbObject ExistsResponse = LoadCompactBinaryObject(Buffer);
+ const CbObject ExistsResponse = Response.AsObject();
for (auto& Item : ExistsResponse["needs"sv])
{
Result.Needs.insert(Item.AsHash());
}
}
- else
- {
- Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response);
- ZEN_WARN(
- "CloudCacheSession::CacheTypeExists failed GET. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-Accept: '{}', "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- "application/x-ue-cb",
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
-
return Result;
}
@@ -1233,77 +471,39 @@ CloudCacheTokenProvider::CreateFromCallback(std::function<CloudCacheAccessToken(
return std::make_unique<CallbackTokenProvider>(std::move(Callback));
}
+static std::optional<std::function<HttpClientAccessToken()>>
+GetHttpClientAccessProvider(CloudCacheTokenProvider* TokenProvider)
+{
+ if (TokenProvider == nullptr)
+ {
+ return {};
+ }
+ auto ProviderFunc = [TokenProvider]() -> HttpClientAccessToken {
+ CloudCacheAccessToken Token = TokenProvider->AcquireAccessToken();
+ return HttpClientAccessToken{.Value = Token.Value, .ExpireTime = Token.ExpireTime};
+ };
+ return ProviderFunc;
+}
+
CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options, std::unique_ptr<CloudCacheTokenProvider> TokenProvider)
: m_Log(zen::logging::Get("jupiter"))
-, m_ServiceUrl(Options.ServiceUrl)
, m_DefaultDdcNamespace(Options.DdcNamespace)
, m_DefaultBlobStoreNamespace(Options.BlobStoreNamespace)
, m_ComputeCluster(Options.ComputeCluster)
-, m_ConnectTimeout(Options.ConnectTimeout)
-, m_Timeout(Options.Timeout)
, m_TokenProvider(std::move(TokenProvider))
-, m_AssumeHttp2(Options.AssumeHttp2)
+, m_HttpClient(Options.ServiceUrl,
+ HttpClientSettings{.ConnectTimeout = Options.ConnectTimeout,
+ .Timeout = Options.Timeout,
+ .AccessTokenProvider = GetHttpClientAccessProvider(m_TokenProvider.get()),
+ .AssumeHttp2 = Options.AssumeHttp2,
+ .AllowResume = Options.AllowResume,
+ .RetryCount = Options.RetryCount})
{
ZEN_ASSERT(m_TokenProvider.get() != nullptr);
}
CloudCacheClient::~CloudCacheClient()
{
- RwLock::ExclusiveLockScope _(m_SessionStateLock);
-
- for (auto State : m_SessionStateCache)
- {
- delete State;
- }
-}
-
-CloudCacheAccessToken
-CloudCacheClient::AcquireAccessToken()
-{
- ZEN_TRACE_CPU("JupiterClient::AcquireAccessToken");
-
- return m_TokenProvider->AcquireAccessToken();
-}
-
-detail::CloudCacheSessionState*
-CloudCacheClient::AllocSessionState()
-{
- detail::CloudCacheSessionState* State = nullptr;
-
- bool IsTokenValid = false;
-
- {
- RwLock::ExclusiveLockScope _(m_SessionStateLock);
-
- if (m_SessionStateCache.empty() == false)
- {
- State = m_SessionStateCache.front();
- IsTokenValid = State->m_AccessToken.IsValid();
-
- m_SessionStateCache.pop_front();
- }
- }
-
- if (State == nullptr)
- {
- State = new detail::CloudCacheSessionState(*this);
- }
-
- State->Reset(m_ConnectTimeout, m_Timeout, m_AssumeHttp2);
-
- if (IsTokenValid == false)
- {
- State->m_AccessToken = m_TokenProvider->AcquireAccessToken();
- }
-
- return State;
-}
-
-void
-CloudCacheClient::FreeSessionState(detail::CloudCacheSessionState* State)
-{
- RwLock::ExclusiveLockScope _(m_SessionStateLock);
- m_SessionStateCache.push_front(State);
}
} // namespace zen
diff --git a/src/zenserver/upstream/jupiter.h b/src/zenserver/upstream/jupiter.h
index b5aa95ed5..93f2cc883 100644
--- a/src/zenserver/upstream/jupiter.h
+++ b/src/zenserver/upstream/jupiter.h
@@ -6,6 +6,7 @@
#include <zencore/iohash.h>
#include <zencore/logging.h>
#include <zencore/thread.h>
+#include <zenhttp/httpclient.h>
#include <zenhttp/httpserver.h>
#include <atomic>
@@ -22,9 +23,6 @@ class Session;
}
namespace zen {
-namespace detail {
- struct CloudCacheSessionState;
-}
class CbObjectView;
class CloudCacheClient;
@@ -96,7 +94,11 @@ public:
~CloudCacheSession();
CloudCacheResult Authenticate();
- CloudCacheResult GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType);
+ CloudCacheResult GetRef(std::string_view Namespace,
+ std::string_view BucketId,
+ const IoHash& Key,
+ ZenContentType RefType,
+ std::filesystem::path TempFolderPath = {});
CloudCacheResult GetBlob(std::string_view Namespace, const IoHash& Key);
CloudCacheResult GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath = {});
CloudCacheResult GetObject(std::string_view Namespace, const IoHash& Key);
@@ -131,17 +133,14 @@ public:
CloudCacheClient& Client() { return *m_CacheClient; };
private:
- inline LoggerRef Log() { return m_Log; }
- cpr::Session& GetSession();
- CloudCacheAccessToken GetAccessToken(bool RefreshToken = false);
+ inline LoggerRef Log() { return m_Log; }
CloudCacheResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key);
CloudCacheExistsResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys);
- LoggerRef m_Log;
- RefPtr<CloudCacheClient> m_CacheClient;
- detail::CloudCacheSessionState* m_SessionState;
+ LoggerRef m_Log;
+ RefPtr<CloudCacheClient> m_CacheClient;
};
/**
@@ -178,6 +177,8 @@ struct CloudCacheClientOptions
std::chrono::milliseconds ConnectTimeout{5000};
std::chrono::milliseconds Timeout{};
bool AssumeHttp2 = false;
+ bool AllowResume = false;
+ uint8_t RetryCount = 0;
};
/**
@@ -189,30 +190,20 @@ public:
CloudCacheClient(const CloudCacheClientOptions& Options, std::unique_ptr<CloudCacheTokenProvider> TokenProvider);
~CloudCacheClient();
- CloudCacheAccessToken AcquireAccessToken();
- std::string_view DefaultDdcNamespace() const { return m_DefaultDdcNamespace; }
- std::string_view DefaultBlobStoreNamespace() const { return m_DefaultBlobStoreNamespace; }
- std::string_view ComputeCluster() const { return m_ComputeCluster; }
- std::string_view ServiceUrl() const { return m_ServiceUrl; }
+ std::string_view DefaultDdcNamespace() const { return m_DefaultDdcNamespace; }
+ std::string_view DefaultBlobStoreNamespace() const { return m_DefaultBlobStoreNamespace; }
+ std::string_view ComputeCluster() const { return m_ComputeCluster; }
+ std::string_view ServiceUrl() const { return m_HttpClient.GetBaseUri(); }
LoggerRef Logger() { return m_Log; }
private:
- LoggerRef m_Log;
- std::string m_ServiceUrl;
- std::string m_DefaultDdcNamespace;
- std::string m_DefaultBlobStoreNamespace;
- std::string m_ComputeCluster;
- std::chrono::milliseconds m_ConnectTimeout{};
- std::chrono::milliseconds m_Timeout{};
- std::unique_ptr<CloudCacheTokenProvider> m_TokenProvider;
- bool m_AssumeHttp2;
-
- RwLock m_SessionStateLock;
- std::list<detail::CloudCacheSessionState*> m_SessionStateCache;
-
- detail::CloudCacheSessionState* AllocSessionState();
- void FreeSessionState(detail::CloudCacheSessionState*);
+ LoggerRef m_Log;
+ const std::string m_DefaultDdcNamespace;
+ const std::string m_DefaultBlobStoreNamespace;
+ const std::string m_ComputeCluster;
+ const std::unique_ptr<CloudCacheTokenProvider> m_TokenProvider;
+ HttpClient m_HttpClient;
friend class CloudCacheSession;
};
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index 73a8ad538..5bcb7f5b4 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -1158,88 +1158,73 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState,
}
ZEN_ASSERT(OldBlockFile);
- ZEN_INFO("Moving {} chunks from '{}' to new block", KeepChunkIndexes.size(), GetBlockPath(m_BlocksBasePath, BlockIndex));
-
uint64_t OldBlockSize = OldBlockFile->FileSize();
- std::vector<uint8_t> Chunk;
- for (const size_t& ChunkIndex : KeepChunkIndexes)
+ if (KeepChunkIndexes.empty())
+ {
+ ZEN_INFO("Dropping all chunks from '{}'", GetBlockPath(m_BlocksBasePath, BlockIndex));
+ }
+ else
{
- const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex];
- if (ChunkLocation.Offset + ChunkLocation.Size > OldBlockSize)
+ std::vector<uint8_t> Chunk;
+ for (const size_t& ChunkIndex : KeepChunkIndexes)
{
- ZEN_WARN(
- "Compact Block skipping chunk outside of block range in '{}', Chunk start {}, Chunk size {} in Block {}, Block "
- "size {}",
- m_BlocksBasePath,
- ChunkLocation.Offset,
- ChunkLocation.Size,
- OldBlockFile->GetPath(),
- OldBlockSize);
- continue;
- }
+ const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex];
+ if (ChunkLocation.Offset + ChunkLocation.Size > OldBlockSize)
+ {
+ ZEN_WARN(
+ "Compact Block skipping chunk outside of block range in '{}', Chunk start {}, Chunk size {} in Block {}, Block "
+ "size {}",
+ m_BlocksBasePath,
+ ChunkLocation.Offset,
+ ChunkLocation.Size,
+ OldBlockFile->GetPath(),
+ OldBlockSize);
+ continue;
+ }
- Chunk.resize(ChunkLocation.Size);
- OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset);
+ Chunk.resize(ChunkLocation.Size);
+ OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset);
- if ((WriteOffset + Chunk.size()) > m_MaxBlockSize)
- {
- if (NewBlockFile)
+ if ((WriteOffset + Chunk.size()) > m_MaxBlockSize)
{
- ZEN_ASSERT_SLOW(NewBlockFile->IsOpen());
- NewBlockFile->Flush();
- MovedSize += NewBlockFile->FileSize();
- NewBlockFile = nullptr;
+ if (NewBlockFile)
+ {
+ ZEN_ASSERT_SLOW(NewBlockFile->IsOpen());
+ NewBlockFile->Flush();
+ MovedSize += NewBlockFile->FileSize();
+ NewBlockFile = nullptr;
- ZEN_ASSERT(!MovedChunks.empty() || RemovedSize > 0); // We should not have a new block if we haven't moved anything
+ ZEN_ASSERT(!MovedChunks.empty() || RemovedSize > 0); // We should not have a new block if we haven't moved anything
- if (!ReportChanges())
- {
- return false;
+ if (!ReportChanges())
+ {
+ return false;
+ }
}
- }
- uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed);
- {
- RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
- std::filesystem::path NewBlockPath;
- NextBlockIndex = GetFreeBlockIndex(NextBlockIndex, InsertLock, NewBlockPath);
- if (NextBlockIndex == (uint32_t)m_MaxBlockCount)
+ uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed);
{
- ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded",
- m_BlocksBasePath,
- static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1);
- return false;
- }
-
- NewBlockFile = new BlockStoreFile(NewBlockPath);
- m_ChunkBlocks[NextBlockIndex] = NewBlockFile;
- }
+ RwLock::ExclusiveLockScope InsertLock(m_InsertLock);
+ std::filesystem::path NewBlockPath;
+ NextBlockIndex = GetFreeBlockIndex(NextBlockIndex, InsertLock, NewBlockPath);
+ if (NextBlockIndex == (uint32_t)m_MaxBlockCount)
+ {
+ ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded",
+ m_BlocksBasePath,
+ static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1);
+ return false;
+ }
- std::error_code Error;
- DiskSpace Space = DiskSpaceInfo(m_BlocksBasePath, Error);
- if (Error)
- {
- ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BlocksBasePath, Error.message());
- {
- RwLock::ExclusiveLockScope _l(m_InsertLock);
- ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile);
- m_ChunkBlocks.erase(NextBlockIndex);
+ NewBlockFile = new BlockStoreFile(NewBlockPath);
+ m_ChunkBlocks[NextBlockIndex] = NewBlockFile;
}
- ZEN_ASSERT_SLOW(!NewBlockFile->IsOpen());
- NewBlockFile = nullptr;
- return false;
- }
- if (Space.Free < m_MaxBlockSize)
- {
- uint64_t ReclaimedSpace = DiskReserveCallback();
- if (Space.Free + ReclaimedSpace < m_MaxBlockSize)
+ std::error_code Error;
+ DiskSpace Space = DiskSpaceInfo(m_BlocksBasePath, Error);
+ if (Error)
{
- ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}",
- m_BlocksBasePath,
- m_MaxBlockSize,
- NiceBytes(Space.Free + ReclaimedSpace));
+ ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BlocksBasePath, Error.message());
{
RwLock::ExclusiveLockScope _l(m_InsertLock);
ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile);
@@ -1250,23 +1235,42 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState,
return false;
}
- ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}",
- m_BlocksBasePath,
- ReclaimedSpace,
- NiceBytes(Space.Free + ReclaimedSpace));
+ if (Space.Free < m_MaxBlockSize)
+ {
+ uint64_t ReclaimedSpace = DiskReserveCallback();
+ if (Space.Free + ReclaimedSpace < m_MaxBlockSize)
+ {
+ ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}",
+ m_BlocksBasePath,
+ m_MaxBlockSize,
+ NiceBytes(Space.Free + ReclaimedSpace));
+ {
+ RwLock::ExclusiveLockScope _l(m_InsertLock);
+ ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile);
+ m_ChunkBlocks.erase(NextBlockIndex);
+ }
+ ZEN_ASSERT_SLOW(!NewBlockFile->IsOpen());
+ NewBlockFile = nullptr;
+ return false;
+ }
+
+ ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}",
+ m_BlocksBasePath,
+ ReclaimedSpace,
+ NiceBytes(Space.Free + ReclaimedSpace));
+ }
+ NewBlockFile->Create(m_MaxBlockSize);
+ NewBlockIndex = NextBlockIndex;
+ WriteOffset = 0;
}
- NewBlockFile->Create(m_MaxBlockSize);
- NewBlockIndex = NextBlockIndex;
- WriteOffset = 0;
- }
- NewBlockFile->Write(Chunk.data(), ChunkLocation.Size, WriteOffset);
- MovedChunks.push_back(
- {ChunkIndex, {.BlockIndex = NewBlockIndex, .Offset = gsl::narrow<uint32_t>(WriteOffset), .Size = ChunkLocation.Size}});
- WriteOffset = RoundUp(WriteOffset + ChunkLocation.Size, PayloadAlignment);
- AddedSize += Chunk.size();
+ NewBlockFile->Write(Chunk.data(), ChunkLocation.Size, WriteOffset);
+ MovedChunks.push_back(
+ {ChunkIndex, {.BlockIndex = NewBlockIndex, .Offset = gsl::narrow<uint32_t>(WriteOffset), .Size = ChunkLocation.Size}});
+ WriteOffset = RoundUp(WriteOffset + ChunkLocation.Size, PayloadAlignment);
+ AddedSize += Chunk.size();
+ }
}
- Chunk.clear();
if (!ReportChanges())
{
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index f18509758..7839c7132 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -1436,6 +1436,8 @@ public:
NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
});
+ ZEN_INFO("GCV2: filecas [COMPACT] '{}': Removing {} files", m_FileCasStrategy.m_RootDirectory, m_ReferencesToClean.size());
+
size_t Skipped = 0;
for (const IoHash& ChunkHash : m_ReferencesToClean)
{