diff options
| author | Dan Engelbrecht <[email protected]> | 2024-01-24 11:41:18 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-01-24 11:41:18 +0100 |
| commit | 0e63573fbe9973f6b922656a785817a711581b78 (patch) | |
| tree | 48e18f0b4aea958a536ba50f72f589a580c4b798 /src | |
| parent | oplog import/export improvements (#634) (diff) | |
| download | zen-0e63573fbe9973f6b922656a785817a711581b78.tar.xz zen-0e63573fbe9973f6b922656a785817a711581b78.zip | |
Add retry with optional resume logic to HttpClient::Download (#639)
- Improvement: Refactored Jupiter upstream to use HttpClient
- Improvement: Added retry and resume logic to HttpClient
- Improvement: Added authentication support to HttpClient
- Improvement: Clearer logging in GCV2 compact of FileCas/BlockStore
- Improvement: Size details in oplog import logging
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenhttp/httpclient.cpp | 530 | ||||
| -rw-r--r-- | src/zenhttp/include/zenhttp/httpclient.h | 35 | ||||
| -rw-r--r-- | src/zenserver/projectstore/jupiterremoteprojectstore.cpp | 82 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 12 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 58 | ||||
| -rw-r--r-- | src/zenserver/upstream/jupiter.cpp | 1012 | ||||
| -rw-r--r-- | src/zenserver/upstream/jupiter.h | 51 | ||||
| -rw-r--r-- | src/zenstore/blockstore.cpp | 164 | ||||
| -rw-r--r-- | src/zenstore/filecas.cpp | 2 |
9 files changed, 688 insertions, 1258 deletions
diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp index a29a08a3c..3b2a3baec 100644 --- a/src/zenhttp/httpclient.cpp +++ b/src/zenhttp/httpclient.cpp @@ -13,6 +13,7 @@ #include <zencore/session.h> #include <zencore/sharedbuffer.h> #include <zencore/stream.h> +#include <zencore/string.h> #include <zencore/testing.h> #include <zencore/trace.h> #include <zenhttp/formatters.h> @@ -38,19 +39,19 @@ using namespace std::literals; // // CPR helpers -cpr::Body +static cpr::Body AsCprBody(const CbObject& Obj) { return cpr::Body((const char*)Obj.GetBuffer().GetData(), Obj.GetBuffer().GetSize()); } -cpr::Body +static cpr::Body AsCprBody(const IoBuffer& Obj) { return cpr::Body((const char*)Obj.GetData(), Obj.GetSize()); } -cpr::Body +static cpr::Body AsCprBody(const CompositeBuffer& Buffers) { SharedBuffer Buffer = Buffers.Flatten(); @@ -62,7 +63,7 @@ AsCprBody(const CompositeBuffer& Buffers) ////////////////////////////////////////////////////////////////////////// -HttpClient::Response +static HttpClient::Response ResponseWithPayload(cpr::Response& HttpResponse, const HttpResponseCode WorkResponseCode, IoBuffer&& Payload) { // This ends up doing a memcpy, would be good to get rid of it by streaming results @@ -89,7 +90,7 @@ ResponseWithPayload(cpr::Response& HttpResponse, const HttpResponseCode WorkResp .ElapsedSeconds = HttpResponse.elapsed}; } -HttpClient::Response +static HttpClient::Response CommonResponse(cpr::Response&& HttpResponse, IoBuffer&& Payload = {}) { const HttpResponseCode WorkResponseCode = HttpResponseCode(HttpResponse.status_code); @@ -123,6 +124,51 @@ CommonResponse(cpr::Response&& HttpResponse, IoBuffer&& Payload = {}) } } +static bool +ShouldRetry(const cpr::Response& Response) +{ + switch (Response.error.code) + { + case cpr::ErrorCode::OK: + break; + case cpr::ErrorCode::OPERATION_TIMEDOUT: + case cpr::ErrorCode::NETWORK_RECEIVE_ERROR: + case cpr::ErrorCode::NETWORK_SEND_FAILURE: + return true; + default: + return false; + } + switch ((HttpResponseCode)Response.status_code) + { + case HttpResponseCode::GatewayTimeout: + case HttpResponseCode::RequestTimeout: + return true; + default: + return false; + } +}; + +static cpr::Response +DoWithRetry(std::function<cpr::Response()>&& Func, uint8_t RetryCount) +{ + uint8_t Attempt = 0; + cpr::Response Result = Func(); + while (Attempt < RetryCount && ShouldRetry(Result)) + { + Sleep(100 * (Attempt + 1)); + Attempt++; + ZEN_INFO("{} Attempt {}/{}", CommonResponse(std::move(Result)).ErrorMessage("Retry"), Attempt, RetryCount + 1); + Result = Func(); + } + return Result; +} + +static std::pair<std::string, std::string> +HeaderContentType(ZenContentType ContentType) +{ + return std::make_pair("Content-Type", std::string(MapContentTypeToString(ContentType))); +} + ////////////////////////////////////////////////////////////////////////// struct HttpClient::Impl : public RefCounted @@ -144,9 +190,13 @@ struct HttpClient::Impl : public RefCounted ZEN_TRACE("GET {}", Result); return Result; } - inline cpr::Response Download(cpr::WriteCallback&& write) + inline cpr::Response Download(cpr::WriteCallback&& Write, std::optional<cpr::HeaderCallback>&& Header = {}) { - cpr::Response Result = CprSession->Download(write); + if (Header) + { + CprSession->SetHeaderCallback(std::move(Header.value())); + } + cpr::Response Result = CprSession->Download(Write); ZEN_TRACE("GET {}", Result); return Result; } @@ -185,11 +235,13 @@ struct HttpClient::Impl : public RefCounted Session& operator=(Session&&) = delete; }; - Session AllocSession(const std::string_view BaseUrl, - const std::string_view Url, - const HttpClientSettings& ConnectionSettings, - const KeyValueMap& AdditionalHeader, - const KeyValueMap& Parameters); + Session AllocSession(const std::string_view BaseUrl, + const std::string_view Url, + const HttpClientSettings& ConnectionSettings, + const KeyValueMap& AdditionalHeader, + const KeyValueMap& Parameters, + const std::string_view SessionId, + std::optional<HttpClientAccessToken> AccessToken); LoggerRef Logger() { return m_Log; } @@ -217,29 +269,26 @@ HttpClient::Impl::~Impl() } HttpClient::Impl::Session -HttpClient::Impl::AllocSession(const std::string_view BaseUrl, - const std::string_view ResourcePath, - const HttpClientSettings& ConnectionSettings, - const KeyValueMap& AdditionalHeader, - const KeyValueMap& Parameters) +HttpClient::Impl::AllocSession(const std::string_view BaseUrl, + const std::string_view ResourcePath, + const HttpClientSettings& ConnectionSettings, + const KeyValueMap& AdditionalHeader, + const KeyValueMap& Parameters, + const std::string_view SessionId, + std::optional<HttpClientAccessToken> AccessToken) { - bool IsNew = false; cpr::Session* CprSession = nullptr; m_SessionLock.WithExclusiveLock([&] { - if (m_Sessions.empty()) - { - CprSession = new cpr::Session(); - IsNew = true; - } - else + if (!m_Sessions.empty()) { CprSession = m_Sessions.back(); m_Sessions.pop_back(); } }); - if (IsNew) + if (CprSession == nullptr) { + CprSession = new cpr::Session(); CprSession->SetConnectTimeout(ConnectionSettings.ConnectTimeout); CprSession->SetTimeout(ConnectionSettings.Timeout); if (ConnectionSettings.AssumeHttp2) @@ -252,9 +301,13 @@ HttpClient::Impl::AllocSession(const std::string_view BaseUrl, { CprSession->SetHeader(cpr::Header(AdditionalHeader->begin(), AdditionalHeader->end())); } - else + if (!SessionId.empty()) { - CprSession->SetHeader({}); + CprSession->UpdateHeader({{"UE-Session", std::string(SessionId)}}); + } + if (AccessToken) + { + CprSession->UpdateHeader({{"Authorization", AccessToken->Value}}); } if (!Parameters->empty()) { @@ -430,6 +483,9 @@ public: return Buffer; } + uint64_t GetSize() const { return m_WriteOffset; } + void ResetWritePos(uint64_t WriteOffset) { m_WriteOffset = WriteOffset; } + private: void* m_FileHandle; std::uint64_t m_WriteOffset; @@ -452,12 +508,46 @@ HttpClient::~HttpClient() { } +bool +HttpClient::Authenticate() +{ + std::optional<HttpClientAccessToken> Token = GetAccessToken(); + if (!Token) + { + return false; + } + return Token->IsValid(); +} + +const std::optional<HttpClientAccessToken> +HttpClient::GetAccessToken() +{ + if (!m_ConnectionSettings.AccessTokenProvider.has_value()) + { + return {}; + } + { + RwLock::SharedLockScope _(m_AccessTokenLock); + if (m_CachedAccessToken.IsValid()) + { + return m_CachedAccessToken; + } + } + RwLock::ExclusiveLockScope _(m_AccessTokenLock); + if (m_CachedAccessToken.IsValid()) + { + return m_CachedAccessToken; + } + m_CachedAccessToken = m_ConnectionSettings.AccessTokenProvider.value()(); + return m_CachedAccessToken; +} + HttpClient::Response HttpClient::TransactPackage(std::string_view Url, CbPackage Package, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("HttpClient::TransactPackage"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); + Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); // First, list of offered chunks for filtering on the server end @@ -482,7 +572,7 @@ HttpClient::TransactPackage(std::string_view Url, CbPackage Package, const KeyVa BinaryWriter MemWriter; Writer.Save(MemWriter); - Sess->UpdateHeader({{"Content-Type", "application/x-ue-offer"}, {"UE-Session", m_SessionId}, {"UE-Request", RequestIdString}}); + Sess->UpdateHeader({HeaderContentType(HttpContentType::kCbPackageOffer), {"UE-Request", RequestIdString}}); Sess->SetBody(cpr::Body{(const char*)MemWriter.Data(), MemWriter.Size()}); cpr::Response FilterResponse = Sess.Post(); @@ -524,7 +614,7 @@ HttpClient::TransactPackage(std::string_view Url, CbPackage Package, const KeyVa CompositeBuffer Message = FormatPackageMessageBuffer(SendPackage); SharedBuffer FlatMessage = Message.Flatten(); - Sess->UpdateHeader({{"Content-Type", "application/x-ue-cbpkg"}, {"UE-Session", m_SessionId}, {"UE-Request", RequestIdString}}); + Sess->UpdateHeader({HeaderContentType(HttpContentType::kCbPackage), {"UE-Request", RequestIdString}}); Sess->SetBody(cpr::Body{(const char*)FlatMessage.GetData(), FlatMessage.GetSize()}); cpr::Response FilterResponse = Sess.Post(); @@ -556,20 +646,28 @@ HttpClient::Put(std::string_view Url, const IoBuffer& Payload, const KeyValueMap { ZEN_TRACE_CPU("HttpClient::Put"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); - Sess->SetBody(AsCprBody(Payload)); - Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(Payload.GetContentType()))}}); - - return CommonResponse(Sess.Put()); + return CommonResponse(DoWithRetry( + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + Sess->SetBody(AsCprBody(Payload)); + Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())}); + return Sess.Put(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response HttpClient::Get(std::string_view Url, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters) { ZEN_TRACE_CPU("HttpClient::Get"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters); - - return CommonResponse(Sess.Get()); + return CommonResponse(DoWithRetry( + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters, m_SessionId, GetAccessToken()); + return Sess.Get(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -577,9 +675,13 @@ HttpClient::Head(std::string_view Url, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("HttpClient::Head"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); - - return CommonResponse(Sess.Head()); + return CommonResponse(DoWithRetry( + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + return Sess.Head(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -587,9 +689,13 @@ HttpClient::Delete(std::string_view Url, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("HttpClient::Delete"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); - - return CommonResponse(Sess.Delete()); + return CommonResponse(DoWithRetry( + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + return Sess.Delete(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -597,9 +703,13 @@ HttpClient::Post(std::string_view Url, const KeyValueMap& AdditionalHeader, cons { ZEN_TRACE_CPU("HttpClient::PostNoPayload"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters); - - return CommonResponse(Sess.Post()); + return CommonResponse(DoWithRetry( + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters, m_SessionId, GetAccessToken()); + return Sess.Post(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -607,12 +717,16 @@ HttpClient::Post(std::string_view Url, const IoBuffer& Payload, const KeyValueMa { ZEN_TRACE_CPU("HttpClient::PostWithPayload"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); + return CommonResponse(DoWithRetry( + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); - Sess->SetBody(AsCprBody(Payload)); - Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(Payload.GetContentType()))}}); - - return CommonResponse(Sess.Post()); + Sess->SetBody(AsCprBody(Payload)); + Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())}); + return Sess.Post(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -620,12 +734,16 @@ HttpClient::Post(std::string_view Url, CbObject Payload, const KeyValueMap& Addi { ZEN_TRACE_CPU("HttpClient::PostObjectPayload"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); - - Sess->SetBody(AsCprBody(Payload)); - Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(ZenContentType::kCbObject))}}); + return CommonResponse(DoWithRetry( + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); - return CommonResponse(Sess.Post()); + Sess->SetBody(AsCprBody(Payload)); + Sess->UpdateHeader({HeaderContentType(ZenContentType::kCbObject)}); + return Sess.Post(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -633,13 +751,17 @@ HttpClient::Post(std::string_view Url, CbPackage Pkg, const KeyValueMap& Additio { ZEN_TRACE_CPU("HttpClient::PostPackage"); - CompositeBuffer Message = zen::FormatPackageMessageBuffer(Pkg); - - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); - Sess->SetBody(AsCprBody(Message)); - Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(ZenContentType::kCbPackage))}}); - - return CommonResponse(Sess.Post()); + return CommonResponse(DoWithRetry( + [&]() { + CompositeBuffer Message = zen::FormatPackageMessageBuffer(Pkg); + + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + Sess->SetBody(AsCprBody(Message)); + Sess->UpdateHeader({HeaderContentType(ZenContentType::kCbPackage)}); + return Sess.Post(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -647,27 +769,32 @@ HttpClient::Upload(std::string_view Url, const IoBuffer& Payload, const KeyValue { ZEN_TRACE_CPU("HttpClient::Upload"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); - Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(Payload.GetContentType()))}}); + return CommonResponse(DoWithRetry( + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())}); - uint64_t Offset = 0; - if (Payload.IsWholeFile()) - { - auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, Payload.GetSize() - Offset); - IoBuffer PayloadRange = IoBuffer(Payload, Offset, size); - MutableMemoryView Data(buffer, size); - Data.CopyFrom(PayloadRange.GetView()); - Offset += size; - return true; - }; - Sess->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); - } - else - { - Sess->SetBody(AsCprBody(Payload)); - } - return CommonResponse(Sess.Put()); + uint64_t Offset = 0; + if (Payload.IsWholeFile()) + { + auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) { + size = Min<size_t>(size, Payload.GetSize() - Offset); + IoBuffer PayloadRange = IoBuffer(Payload, Offset, size); + MutableMemoryView Data(buffer, size); + Data.CopyFrom(PayloadRange.GetView()); + Offset += size; + return true; + }; + Sess->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); + } + else + { + Sess->SetBody(AsCprBody(Payload)); + } + return Sess.Put(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -675,21 +802,26 @@ HttpClient::Upload(std::string_view Url, const CompositeBuffer& Payload, ZenCont { ZEN_TRACE_CPU("HttpClient::Upload"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); - Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(ContentType))}}); - - uint64_t SizeLeft = Payload.GetSize(); - CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); - auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, SizeLeft); - MutableMemoryView Data(buffer, size); - Payload.CopyTo(Data, BufferIt); - SizeLeft -= size; - return true; - }; - Sess->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); - - return CommonResponse(Sess.Put()); + return CommonResponse(DoWithRetry( + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + Sess->UpdateHeader({HeaderContentType(ContentType)}); + + uint64_t SizeLeft = Payload.GetSize(); + CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); + auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { + size = Min<size_t>(size, SizeLeft); + MutableMemoryView Data(buffer, size); + Payload.CopyTo(Data, BufferIt); + SizeLeft -= size; + return true; + }; + Sess->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); + + return Sess.Put(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -697,46 +829,170 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold { ZEN_TRACE_CPU("HttpClient::Download"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); - std::string PayloadString; std::unique_ptr<TempPayloadFile> PayloadFile; - - cpr::Response Response = Sess.Download(cpr::WriteCallback{[&](std::string data, intptr_t) { - if (!PayloadFile && (PayloadString.length() + data.length()) > (1024 * 1024)) - { - PayloadFile = std::make_unique<TempPayloadFile>(); - std::error_code Ec = PayloadFile->Open(TempFolderPath); - if (Ec) - { - ZEN_WARN("Failed to create temp file in '{}' for HttpClient::Download. Reason: {}", TempFolderPath.string(), Ec.message()); - return false; - } - PayloadFile->Write(PayloadString); - PayloadString.clear(); - } - if (PayloadFile) - { - std::error_code Ec = PayloadFile->Write(data); - if (Ec) - { - ZEN_WARN("Failed to write to temp file in '{}' for HttpClient::Download. Reason: {}", - TempFolderPath.string(), - Ec.message()); - return false; - } - } - else - { - PayloadString.append(data); - } - return true; - }}); - - if (!PayloadString.empty()) - { - Response.text = std::move(PayloadString); - } + cpr::Response Response = DoWithRetry( + [&]() { + auto DownloadCallback = [&](std::string data, intptr_t) { + if (!PayloadFile && (PayloadString.length() + data.length()) > (1024 * 1024)) + { + PayloadFile = std::make_unique<TempPayloadFile>(); + std::error_code Ec = PayloadFile->Open(TempFolderPath); + if (Ec) + { + ZEN_WARN("Failed to create temp file in '{}' for HttpClient::Download. Reason: {}", + TempFolderPath.string(), + Ec.message()); + return false; + } + PayloadFile->Write(PayloadString); + PayloadString.clear(); + } + if (PayloadFile) + { + std::error_code Ec = PayloadFile->Write(data); + if (Ec) + { + ZEN_WARN("Failed to write to temp file in '{}' for HttpClient::Download. Reason: {}", + TempFolderPath.string(), + Ec.message()); + return false; + } + } + else + { + PayloadString.append(data); + } + return true; + }; + + cpr::Response Response; + { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + Response = Sess.Download(cpr::WriteCallback{DownloadCallback}); + } + + if (m_ConnectionSettings.AllowResume) + { + auto SupportsRanges = [](const cpr::Response& Response) -> bool { + if (Response.header.find("Content-Range") != Response.header.end()) + { + return true; + } + if (auto It = Response.header.find("Accept-Ranges"); It != Response.header.end()) + { + return It->second == "bytes"; + } + return false; + }; + + auto ShouldResume = [&SupportsRanges](const cpr::Response& Response) -> bool { + if (ShouldRetry(Response)) + { + return SupportsRanges(Response); + } + return false; + }; + + if (ShouldResume(Response)) + { + auto It = Response.header.find("Content-Length"); + if (It != Response.header.end()) + { + std::optional<int64_t> ContentLength = ParseInt<int64_t>(It->second); + if (ContentLength) + { + auto HeaderCallback = [&](std::string header, intptr_t) { + size_t DelimiterPos = header.find(':'); + if (DelimiterPos != std::string::npos) + { + std::string Key = header.substr(0, DelimiterPos); + constexpr AsciiSet WhitespaceCharacters(" \v\f\t\r\n"); + Key = AsciiSet::TrimSuffixWith(Key, WhitespaceCharacters); + Key = AsciiSet::TrimPrefixWith(Key, WhitespaceCharacters); + + std::string Value = header.substr(DelimiterPos + 1); + Value = AsciiSet::TrimSuffixWith(Value, WhitespaceCharacters); + Value = AsciiSet::TrimPrefixWith(Value, WhitespaceCharacters); + + Response.header.insert_or_assign(Key, Value); + + if (Key == "Content-Range"sv) + { + if (Value.starts_with("bytes ")) + { + size_t RangeStartEnd = Value.find('-', 6); + if (RangeStartEnd != std::string::npos) + { + const auto Start = ParseInt<uint64_t>(Value.substr(6, RangeStartEnd - 6)); + if (Start) + { + uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length(); + if (Start.value() == DownloadedSize) + { + return 1; + } + else if (Start.value() > DownloadedSize) + { + return 0; + } + if (PayloadFile) + { + PayloadFile->ResetWritePos(Start.value()); + } + else + { + PayloadString = PayloadString.substr(0, Start.value()); + } + return 1; + } + } + } + return 0; + } + } + return 1; + }; + + KeyValueMap HeadersWithRange(AdditionalHeader); + do + { + uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length(); + + std::string Range = fmt::format("{}-{}", DownloadedSize, ContentLength.value()); + if (auto RangeIt = HeadersWithRange.Entries.find("Range"); RangeIt != HeadersWithRange.Entries.end()) + { + if (RangeIt->second == Range) + { + // If we didn't make any progress, abort + break; + } + } + HeadersWithRange.Entries.insert_or_assign("Range", Range); + + Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, + Url, + m_ConnectionSettings, + HeadersWithRange, + {}, + m_SessionId, + GetAccessToken()); + Response.header.clear(); + Response = Sess.Download(cpr::WriteCallback{DownloadCallback}, cpr::HeaderCallback{HeaderCallback}); + } while (ShouldResume(Response)); + } + } + } + } + + if (!PayloadString.empty()) + { + Response.text = std::move(PayloadString); + } + return Response; + }, + m_ConnectionSettings.RetryCount); return CommonResponse(std::move(Response), PayloadFile ? PayloadFile->DetachToIoBuffer() : IoBuffer{}); } diff --git a/src/zenhttp/include/zenhttp/httpclient.h b/src/zenhttp/include/zenhttp/httpclient.h index 9de5c7cce..f3559f214 100644 --- a/src/zenhttp/include/zenhttp/httpclient.h +++ b/src/zenhttp/include/zenhttp/httpclient.h @@ -6,9 +6,11 @@ #include <zencore/iobuffer.h> #include <zencore/logbase.h> +#include <zencore/thread.h> #include <zencore/uid.h> #include <zenhttp/httpcommon.h> +#include <functional> #include <optional> #include <unordered_map> @@ -27,12 +29,32 @@ class CompositeBuffer; */ +struct HttpClientAccessToken +{ + using Clock = std::chrono::system_clock; + using TimePoint = Clock::time_point; + + static constexpr int64_t ExpireMarginInSeconds = 30; + + std::string Value; + TimePoint ExpireTime; + + bool IsValid() const + { + return Value.empty() == false && + ExpireMarginInSeconds < std::chrono::duration_cast<std::chrono::seconds>(ExpireTime - Clock::now()).count(); + } +}; + struct HttpClientSettings { - std::string LogCategory = "httpclient"; - std::chrono::milliseconds ConnectTimeout{3000}; - std::chrono::milliseconds Timeout{}; - bool AssumeHttp2 = false; + std::string LogCategory = "httpclient"; + std::chrono::milliseconds ConnectTimeout{3000}; + std::chrono::milliseconds Timeout{}; + std::optional<std::function<HttpClientAccessToken()>> AccessTokenProvider; + bool AssumeHttp2 = false; + bool AllowResume = false; + uint8_t RetryCount = 0; }; class HttpClient @@ -134,6 +156,7 @@ public: const CompositeBuffer& Payload, ZenContentType ContentType, const KeyValueMap& AdditionalHeader = {}); + [[nodiscard]] Response Download(std::string_view Url, const std::filesystem::path& TempFolderPath, const KeyValueMap& AdditionalHeader = {}); @@ -147,14 +170,18 @@ public: LoggerRef Logger() { return m_Log; } std::string_view GetBaseUri() const { return m_BaseUri; } + bool Authenticate(); private: + const std::optional<HttpClientAccessToken> GetAccessToken(); struct Impl; LoggerRef m_Log; std::string m_BaseUri; std::string m_SessionId; const HttpClientSettings m_ConnectionSettings; + RwLock m_AccessTokenLock; + HttpClientAccessToken m_CachedAccessToken; Ref<Impl> m_Impl; }; diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp index 9d8f6c17b..c9f1f5f6f 100644 --- a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp +++ b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp @@ -54,19 +54,8 @@ public: virtual SaveResult SaveContainer(const IoBuffer& Payload) override { - const int32_t MaxAttempts = 3; - PutRefResult PutResult; - { - CloudCacheSession Session(m_CloudClient.Get()); - for (int32_t Attempt = 0; Attempt < MaxAttempts && !PutResult.Success; Attempt++) - { - PutResult = Session.PutRef(m_Namespace, m_Bucket, m_Key, Payload, ZenContentType::kCbObject); - if (!PutResult.Success) - { - Sleep(100 * (Attempt + 1)); - } - } - } + CloudCacheSession Session(m_CloudClient.Get()); + PutRefResult PutResult = Session.PutRef(m_Namespace, m_Bucket, m_Key, Payload, ZenContentType::kCbObject); SaveResult Result{ConvertResult(PutResult), {PutResult.Needs.begin(), PutResult.Needs.end()}, PutResult.RawHash}; if (Result.ErrorCode) @@ -83,19 +72,8 @@ public: virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override { - const int32_t MaxAttempts = 3; - CloudCacheResult PutResult; - { - CloudCacheSession Session(m_CloudClient.Get()); - for (int32_t Attempt = 0; Attempt < MaxAttempts && !PutResult.Success; Attempt++) - { - PutResult = Session.PutCompressedBlob(m_Namespace, RawHash, Payload); - if (!PutResult.Success) - { - Sleep(100 * (Attempt + 1)); - } - } - } + CloudCacheSession Session(m_CloudClient.Get()); + CloudCacheResult PutResult = Session.PutCompressedBlob(m_Namespace, RawHash, Payload); SaveAttachmentResult Result{ConvertResult(PutResult)}; if (Result.ErrorCode) @@ -126,20 +104,9 @@ public: virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) override { - const int32_t MaxAttempts = 3; - FinalizeRefResult FinalizeRefResult; - { - CloudCacheSession Session(m_CloudClient.Get()); - for (int32_t Attempt = 0; Attempt < MaxAttempts && !FinalizeRefResult.Success; Attempt++) - { - FinalizeRefResult = Session.FinalizeRef(m_Namespace, m_Bucket, m_Key, RawHash); - if (!FinalizeRefResult.Success) - { - Sleep(100 * (Attempt + 1)); - } - } - } - FinalizeResult Result{ConvertResult(FinalizeRefResult), {FinalizeRefResult.Needs.begin(), FinalizeRefResult.Needs.end()}}; + CloudCacheSession Session(m_CloudClient.Get()); + FinalizeRefResult FinalizeRefResult = Session.FinalizeRef(m_Namespace, m_Bucket, m_Key, RawHash); + FinalizeResult Result{ConvertResult(FinalizeRefResult), {FinalizeRefResult.Needs.begin(), FinalizeRefResult.Needs.end()}}; if (Result.ErrorCode) { Result.Reason = fmt::format("Failed finalizing oplog container to {}/{}/{}/{}. Reason: '{}'", @@ -165,19 +132,8 @@ public: virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override { - const int32_t MaxAttempts = 3; - CloudCacheResult GetResult; - { - CloudCacheSession Session(m_CloudClient.Get()); - for (int32_t Attempt = 0; Attempt < MaxAttempts && !GetResult.Success; Attempt++) - { - GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath); - if (!GetResult.Success) - { - Sleep(100 * (Attempt + 1)); - } - } - } + CloudCacheSession Session(m_CloudClient.Get()); + CloudCacheResult GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath); LoadAttachmentResult Result{ConvertResult(GetResult), std::move(GetResult.Response)}; if (GetResult.ErrorCode) { @@ -210,20 +166,8 @@ public: private: LoadContainerResult LoadContainer(const IoHash& Key) { - const int32_t MaxAttempts = 3; - CloudCacheResult GetResult; - { - CloudCacheSession Session(m_CloudClient.Get()); - for (int32_t Attempt = 0; Attempt < MaxAttempts && !GetResult.Success; Attempt++) - { - GetResult = Session.GetRef(m_Namespace, m_Bucket, Key, ZenContentType::kCbObject); - if (!GetResult.Success) - { - Sleep(100 * (Attempt + 1)); - } - } - } - + CloudCacheSession Session(m_CloudClient.Get()); + CloudCacheResult GetResult = Session.GetRef(m_Namespace, m_Bucket, Key, ZenContentType::kCbObject, m_TempFilePath); if (GetResult.ErrorCode || !GetResult.Success) { LoadContainerResult Result{ConvertResult(GetResult)}; @@ -312,7 +256,9 @@ CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, const std::fi .ServiceUrl = Url, .ConnectTimeout = std::chrono::milliseconds(2000), .Timeout = std::chrono::milliseconds(1800000), - .AssumeHttp2 = Options.AssumeHttp2}; + .AssumeHttp2 = Options.AssumeHttp2, + .AllowResume = true, + .RetryCount = 2}; // 1) Access token as parameter in request // 2) Environment variable (different win vs linux/mac) // 3) openid-provider (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 42af9b79b..f117a4203 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -3593,12 +3593,16 @@ ProjectStore::CreateReferenceCheckers(GcCtx& Ctx) Checkers.reserve(OpLogs.size()); for (const std::string& OpLogId : OpLogs) { - ProjectStore::Oplog* Oplog = Project->OpenOplog(OpLogId); - GcClock::TimePoint Now = GcClock::Now(); - bool TryPreCache = Project->LastOplogAccessTime(OpLogId) < (Now - std::chrono::minutes(5)); + ProjectStore::Oplog* Oplog = Project->OpenOplog(OpLogId); + if (Oplog == nullptr) + { + continue; + } + GcClock::TimePoint Now = GcClock::Now(); + bool TryPreCache = Project->LastOplogAccessTime(OpLogId) < (Now - std::chrono::minutes(5)); Checkers.emplace_back(new ProjectStoreReferenceChecker(*Oplog, TryPreCache)); + OplogCount++; } - OplogCount += OpLogs.size(); } } catch (std::exception&) diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index ddab7432d..83cec4725 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -560,7 +560,7 @@ BuildContainer(CidStore& ChunkStore, RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {}); ReportMessage(OptionalContext, - fmt::format("Failed to build container ({}): '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + fmt::format("Failed to build container ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); BlockCreateLatch.CountDown(); while (!BlockCreateLatch.Wait(1000)) @@ -893,11 +893,9 @@ UploadAttachments(WorkerThreadPool& WorkerPool, RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), "Invalid attachment", fmt::format("Upload requested of unknown attachment '{}'", Needed)); - ReportMessage(OptionalContext, - fmt::format("Failed to upload attachment '{}'. ({}): '{}'", - Needed, - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); + ReportMessage( + OptionalContext, + fmt::format("Failed to upload attachment '{}'. ({}): {}", Needed, RemoteResult.GetError(), RemoteResult.GetErrorReason())); return; } } @@ -969,7 +967,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool, RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), fmt::format("Failed to find attachment {}", RawHash), {}); - ZEN_WARN("Failed to save attachment '{}' ({}): '{}'", RawHash, RemoteResult.GetError(), RemoteResult.GetErrorReason()); + ZEN_WARN("Failed to save attachment '{}' ({}): {}", RawHash, RemoteResult.GetError(), RemoteResult.GetErrorReason()); return; } @@ -978,7 +976,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool, { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); ReportMessage(OptionalContext, - fmt::format("Failed to save attachment '{}', {} ({}): '{}'", + fmt::format("Failed to save attachment '{}', {} ({}): {}", RawHash, NiceBytes(Payload.GetSize()), RemoteResult.GetError(), @@ -1031,7 +1029,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool, { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); ReportMessage(OptionalContext, - fmt::format("Failed to save attachment '{}', {} ({}): '{}'", + fmt::format("Failed to save attachment '{}', {} ({}): {}", RawHash, NiceBytes(Payload.GetSize()), RemoteResult.GetError(), @@ -1108,7 +1106,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool, { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); ReportMessage(OptionalContext, - fmt::format("Failed to save attachments with {} chunks ({}): '{}'", + fmt::format("Failed to save attachments with {} chunks ({}): {}", Chunks.size(), RemoteResult.GetError(), RemoteResult.GetErrorReason())); @@ -1230,7 +1228,7 @@ SaveOplog(CidStore& ChunkStore, { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); ReportMessage(OptionalContext, - fmt::format("Failed to save attachment ({}): '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + fmt::format("Failed to save attachment ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); return; } ZEN_DEBUG("Saved block {}, {}", BlockHash, NiceBytes(CompressedBlock.GetCompressedSize())); @@ -1270,10 +1268,9 @@ SaveOplog(CidStore& ChunkStore, { if (BaseContainerResult.ErrorCode) { - ReportMessage(OptionalContext, - fmt::format("Failed to load oplog base container: '{}', error code: {}", - BaseContainerResult.Reason, - BaseContainerResult.ErrorCode)); + ReportMessage( + OptionalContext, + fmt::format("Failed to load oplog base container ({}): {}", BaseContainerResult.ErrorCode, BaseContainerResult.Reason)); } else { @@ -1337,7 +1334,7 @@ SaveOplog(CidStore& ChunkStore, { RemoteResult.SetError(ContainerSaveResult.ErrorCode, ContainerSaveResult.Reason, "Failed to save oplog container"); ReportMessage(OptionalContext, - fmt::format("Failed to save oplog container ({}): '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + fmt::format("Failed to save oplog container ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); } else { @@ -1372,7 +1369,7 @@ SaveOplog(CidStore& ChunkStore, { RemoteResult.SetError(ContainerFinalizeResult.ErrorCode, ContainerFinalizeResult.Reason, ContainerFinalizeResult.Text); ReportMessage(OptionalContext, - fmt::format("Failed to finalize oplog container {} ({}): '{}'", + fmt::format("Failed to finalize oplog container {} ({}): {}", ContainerSaveResult.RawHash, RemoteResult.GetError(), RemoteResult.GetErrorReason())); @@ -1635,7 +1632,7 @@ LoadOplog(CidStore& ChunkStore, if (Result.ErrorCode) { ReportMessage(OptionalContext, - fmt::format("Failed to load attachments with {} chunks ({}): '{}'", + fmt::format("Failed to load attachments with {} chunks ({}): {}", Chunks.size(), RemoteResult.GetError(), RemoteResult.GetErrorReason())); @@ -1691,7 +1688,7 @@ LoadOplog(CidStore& ChunkStore, if (BlockResult.ErrorCode) { ReportMessage(OptionalContext, - fmt::format("Failed to download block attachment {} ({}): '{}'", + fmt::format("Failed to download block attachment {} ({}): {}", BlockHash, RemoteResult.GetError(), RemoteResult.GetErrorReason())); @@ -1706,18 +1703,20 @@ LoadOplog(CidStore& ChunkStore, return; } Info.AttachmentBlocksDownloaded.fetch_add(1); - ZEN_INFO("Loaded block attachment '{}' in {}", + uint64_t BlockSize = BlockResult.Bytes.GetSize(); + ZEN_INFO("Loaded block attachment '{}' in {} ({})", BlockHash, - NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000))); + NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)), + NiceBytes(BlockSize)); if (RemoteResult.IsError()) { return; } + Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize); bool StoreChunksOK = IterateBlock(std::move(BlockResult.Bytes), [&ChunkStore, &Info](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { - uint64_t ChunkSize = Chunk.GetCompressedSize(); - Info.AttachmentBlockBytesDownloaded.fetch_add(ChunkSize); + uint64_t ChunkSize = Chunk.GetCompressedSize(); CidStore::InsertResult InsertResult = ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash); if (InsertResult.New) @@ -1730,7 +1729,7 @@ LoadOplog(CidStore& ChunkStore, if (!StoreChunksOK) { ReportMessage(OptionalContext, - fmt::format("Block attachment {} has invalid format ({}): '{}'", + fmt::format("Block attachment {} has invalid format ({}): {}", BlockHash, RemoteResult.GetError(), RemoteResult.GetErrorReason())); @@ -1788,20 +1787,21 @@ LoadOplog(CidStore& ChunkStore, } return; } - ZEN_INFO("Loaded large attachment '{}' in {}", + uint64_t AttachmentSize = AttachmentResult.Bytes.GetSize(); + ZEN_INFO("Loaded large attachment '{}' in {} ({})", RawHash, - NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000))); + NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)), + NiceBytes(AttachmentSize)); Info.AttachmentsDownloaded.fetch_add(1); if (RemoteResult.IsError()) { return; } - uint64_t ChunkSize = AttachmentResult.Bytes.GetSize(); - Info.AttachmentBytesDownloaded.fetch_add(ChunkSize); + Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize); CidStore::InsertResult InsertResult = ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash); if (InsertResult.New) { - Info.AttachmentBytesStored.fetch_add(ChunkSize); + Info.AttachmentBytesStored.fetch_add(AttachmentSize); Info.AttachmentsStored.fetch_add(1); } }); diff --git a/src/zenserver/upstream/jupiter.cpp b/src/zenserver/upstream/jupiter.cpp index e4d45e316..bf2538908 100644 --- a/src/zenserver/upstream/jupiter.cpp +++ b/src/zenserver/upstream/jupiter.cpp @@ -16,7 +16,6 @@ #include <zenutil/basicfile.h> ZEN_THIRD_PARTY_INCLUDES_START -#include <cpr/cpr.h> #include <fmt/format.h> ZEN_THIRD_PARTY_INCLUDES_END @@ -32,277 +31,70 @@ using namespace std::literals; namespace zen { namespace detail { - struct CloudCacheSessionState + CloudCacheResult ConvertResponse(const HttpClient::Response& Response, const std::string_view ErrorPrefix = ""sv) { - CloudCacheSessionState(CloudCacheClient& Client) : m_Client(Client) {} - - const CloudCacheAccessToken& GetAccessToken(bool RefreshToken) - { - if (RefreshToken) - { - m_AccessToken = m_Client.AcquireAccessToken(); - } - - return m_AccessToken; - } - - cpr::Session& GetSession() { return m_Session; } - - void Reset(std::chrono::milliseconds ConnectTimeout, std::chrono::milliseconds Timeout, bool AssumeHttp2) + if (Response.Error) { - m_Session.SetBody({}); - m_Session.SetHeader({}); - m_Session.SetConnectTimeout(ConnectTimeout); - m_Session.SetTimeout(Timeout); - if (AssumeHttp2) - { - m_Session.SetHttpVersion(cpr::HttpVersion{cpr::HttpVersionCode::VERSION_2_0_PRIOR_KNOWLEDGE}); - } - } - - private: - friend class zen::CloudCacheClient; - - CloudCacheClient& m_Client; - CloudCacheAccessToken m_AccessToken; - cpr::Session m_Session; - }; - - CloudCacheResult ConvertResponse(const cpr::Response& Response) - { - if (Response.error) - { - return {.ElapsedSeconds = Response.elapsed, - .ErrorCode = static_cast<int32_t>(Response.error.code), - .Reason = Response.error.message, + return {.ElapsedSeconds = Response.ElapsedSeconds, + .ErrorCode = Response.Error.value().ErrorCode, + .Reason = Response.ErrorMessage(ErrorPrefix), .Success = false}; } - if (!IsHttpSuccessCode(Response.status_code)) + if (!Response.IsSuccess()) { - return {.ElapsedSeconds = Response.elapsed, - .ErrorCode = static_cast<int32_t>(Response.status_code), - .Reason = Response.reason.empty() ? Response.text : Response.reason, + return {.ElapsedSeconds = Response.ElapsedSeconds, + .ErrorCode = static_cast<int32_t>(Response.StatusCode), + .Reason = Response.ErrorMessage(ErrorPrefix), .Success = false}; } - return {.Bytes = Response.downloaded_bytes, - .ElapsedSeconds = Response.elapsed, + return {.Response = Response.ResponsePayload, + .Bytes = Response.DownloadedBytes, + .ElapsedSeconds = Response.ElapsedSeconds, .ErrorCode = 0, - .Reason = Response.reason, .Success = true}; } - - cpr::Response GetWithStreaming(cpr::Session& Session, std::filesystem::path TempFolderPath, std::string_view Name, IoBuffer& OutBuffer) - { - if (TempFolderPath.empty()) - { - return Session.Get(); - } - - std::string PayloadString; - std::shared_ptr<BasicFile> PayloadFile; - - auto _ = MakeGuard([&]() { - if (PayloadFile) - { - PayloadFile.reset(); - std::filesystem::path TempPath = TempFolderPath / Name; - std::error_code Ec; - std::filesystem::remove(TempPath, Ec); - } - }); - - uint64_t Offset = 0; - Session.SetWriteCallback(cpr::WriteCallback{[&](std::string data, intptr_t) { - if (!PayloadFile && (PayloadString.length() + data.length()) > (1024 * 1024)) - { - std::filesystem::path TempPath = TempFolderPath / Name; - PayloadFile = std::make_shared<BasicFile>(); - PayloadFile->Open(TempPath, BasicFile::Mode::kTruncateDelete); - PayloadFile->Write(PayloadString.data(), PayloadString.size(), Offset); - Offset += PayloadString.size(); - PayloadString.clear(); - } - if (PayloadFile) - { - PayloadFile->Write(data.data(), data.size(), Offset); - Offset += data.size(); - } - else - { - PayloadString.append(data); - } - return true; - }}); - - cpr::Response Response = Session.Get(); - - if (!Response.error && IsHttpSuccessCode(Response.status_code)) - { - if (PayloadFile) - { - uint64_t PayloadSize = PayloadFile->FileSize(); - void* FileHandle = PayloadFile->Detach(); - PayloadFile.reset(); - OutBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, PayloadSize, /*IsWholeFile*/ true); - OutBuffer.SetDeleteOnClose(true); - } - else - { - OutBuffer = IoBufferBuilder::MakeCloneFromMemory(PayloadString.data(), PayloadString.size()); - } - return Response; - } - - Response.text.swap(PayloadString); - return Response; - } - - static std::optional<zen::HttpContentType> TryGetContentType(const cpr::Response& Response) - { - if (auto It = Response.header.find("Content-Type"); It != Response.header.end()) - { - zen::HttpContentType ContentType = zen::ParseContentType(It->second); - if (ContentType != zen::HttpContentType::kUnknownContentType) - { - return ContentType; - } - } - return {}; - } - - static IoBuffer MakeBufferFromResponseIfKnownFormat(const cpr::Response& Response) - { - std::optional<zen::HttpContentType> ContentType = TryGetContentType(Response); - if (ContentType) - { - IoBuffer Buffer = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()); - Buffer.SetContentType(ContentType.value()); - return Buffer; - } - return {}; - } - } // namespace detail CloudCacheSession::CloudCacheSession(CloudCacheClient* CacheClient) : m_Log(CacheClient->Logger()), m_CacheClient(CacheClient) { - m_SessionState = m_CacheClient->AllocSessionState(); } CloudCacheSession::~CloudCacheSession() { - m_CacheClient->FreeSessionState(m_SessionState); } CloudCacheResult CloudCacheSession::Authenticate() { - const bool RefreshToken = true; - const CloudCacheAccessToken& AccessToken = GetAccessToken(RefreshToken); - - return {.Success = AccessToken.IsValid()}; + bool OK = m_CacheClient->m_HttpClient.Authenticate(); + return {.Success = OK}; } CloudCacheResult -CloudCacheSession::GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType) +CloudCacheSession::GetRef(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + ZenContentType RefType, + std::filesystem::path TempFolderPath) { - const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"; - - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", ContentType}}); - Session.SetOption(cpr::Body{}); + ZEN_TRACE_CPU("JupiterClient::GetRef"); - cpr::Response Response = Session.Get(); - ZEN_DEBUG("GET {}", Response); + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), + TempFolderPath, + {HttpClient::Accept(RefType)}); - CloudCacheResult Result = detail::ConvertResponse(Response); - if (Result.Success) - { - Result.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()); - } - else - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::GetRef failed GET. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-Accept: '{}', " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - ContentType, - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; + return detail::ConvertResponse(Response, "CloudCacheSession::GetRef"sv); } CloudCacheResult CloudCacheSession::GetBlob(std::string_view Namespace, const IoHash& Key) { - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << Namespace << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/octet-stream"}}); - Session.SetOption(cpr::Body{}); - - cpr::Response Response = Session.Get(); - ZEN_DEBUG("GET {}", Response); + ZEN_TRACE_CPU("JupiterClient::GetBlob"); + HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), + {HttpClient::Accept(ZenContentType::kBinary)}); - CloudCacheResult Result = detail::ConvertResponse(Response); - if (Result.Success) - { - Result.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()); - } - else - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::GetBlob failed GET. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-Accept: '{}', " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - "application/octet-stream", - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; + return detail::ConvertResponse(Response); } CloudCacheResult @@ -310,58 +102,12 @@ CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& K { ZEN_TRACE_CPU("JupiterClient::GetCompressedBlob"); - ExtendableStringBuilder<256> Uri; - std::string KeyString = Key.ToHexString(); - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << KeyString; + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), + TempFolderPath, + {HttpClient::Accept(ZenContentType::kCompressedBinary)}); - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-comp"}}); - Session.SetOption(cpr::Body{}); - - IoBuffer Payload; - cpr::Response Response = detail::GetWithStreaming(Session, TempFolderPath, KeyString, Payload); - ZEN_DEBUG("GET {}", Response); - - CloudCacheResult Result = detail::ConvertResponse(Response); - if (Result.Success) - { - Result.Response = std::move(Payload); - } - else - { - std::optional<zen::HttpContentType> ContentType = detail::TryGetContentType(Response); - if (ContentType.has_value()) - { - Result.Response = std::move(Payload); - Result.Response.SetContentType(ContentType.value()); - } - ZEN_WARN( - "CloudCacheSession::GetCompressedBlob failed GET. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-Accept: '{}', " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - "application/x-ue-comp", - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; + return detail::ConvertResponse(Response); } CloudCacheResult @@ -373,59 +119,14 @@ CloudCacheSession::GetInlineBlob(std::string_view Namespace, { ZEN_TRACE_CPU("JupiterClient::GetInlineBlob"); - ExtendableStringBuilder<256> Uri; - std::string KeyString = Key.ToHexString(); - Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << KeyString; - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-jupiter-inline"}}); - Session.SetOption(cpr::Body{}); - - IoBuffer Payload; - cpr::Response Response = detail::GetWithStreaming(Session, TempFolderPath, KeyString, Payload); - ZEN_DEBUG("GET {}", Response); + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), + TempFolderPath, + {{"Accept", "application/x-jupiter-inline"}}); CloudCacheResult Result = detail::ConvertResponse(Response); - if (Result.Success) - { - Result.Response = std::move(Payload); - } - else - { - std::optional<zen::HttpContentType> ContentType = detail::TryGetContentType(Response); - if (ContentType.has_value()) - { - Result.Response = std::move(Payload); - Result.Response.SetContentType(ContentType.value()); - } - ZEN_WARN( - "CloudCacheSession::GetInlineBlob failed GET. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-Accept: '{}', " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - "application/x-jupiter-inline", - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - if (auto It = Response.header.find("X-Jupiter-InlinePayloadHash"); It != Response.header.end()) + if (auto It = Response.Header->find("X-Jupiter-InlinePayloadHash"); It != Response.Header->end()) { const std::string& PayloadHashHeader = It->second; if (PayloadHashHeader.length() == IoHash::StringLength) @@ -442,52 +143,10 @@ CloudCacheSession::GetObject(std::string_view Namespace, const IoHash& Key) { ZEN_TRACE_CPU("JupiterClient::GetObject"); - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}}); - Session.SetOption(cpr::Body{}); - - cpr::Response Response = Session.Get(); - ZEN_DEBUG("GET {}", Response); - - CloudCacheResult Result = detail::ConvertResponse(Response); - if (Result.Success) - { - Result.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()); - } - else - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::GetObject failed GET. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-Accept: '{}', " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - "application/x-ue-cb", - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } + HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), + {HttpClient::Accept(ZenContentType::kCbObject)}); - return Result; + return detail::ConvertResponse(Response); } PutRefResult @@ -495,29 +154,18 @@ CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId, { ZEN_TRACE_CPU("JupiterClient::PutRef"); - IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size()); - - const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"; + Ref.SetContentType(RefType); - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption( - cpr::Header{{"Authorization", AccessToken.Value}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", ContentType}}); - Session.SetBody(cpr::Body{(const char*)Ref.Data(), Ref.Size()}); + IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size()); - cpr::Response Response = Session.Put(); - ZEN_DEBUG("PUT {}", Response); + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Put(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), Ref); PutRefResult Result = {detail::ConvertResponse(Response)}; if (Result.Success) { std::string JsonError; - json11::Json Json = json11::Json::parse(Response.text, JsonError); + json11::Json Json = json11::Json::parse(Response.ToText(), JsonError); if (JsonError.empty()) { json11::Json::array Needs = Json["needs"].array_items(); @@ -528,37 +176,6 @@ CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId, } Result.RawHash = Hash; } - else - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::PutRef failed PUT. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-X-Jupiter-IoHash: '{}', " - "Header-ContentType: '{}', " - "ContentSize: {}, " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - Hash.ToHexString(), - ContentType, - NiceBytes(Ref.Size()), - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; } @@ -567,28 +184,16 @@ CloudCacheSession::FinalizeRef(std::string_view Namespace, std::string_view Buck { ZEN_TRACE_CPU("JupiterClient::FinalizeRef"); - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString() << "/finalize/" - << RefHash.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, - {"X-Jupiter-IoHash", RefHash.ToHexString()}, - {"Content-Type", "application/x-ue-cb"}}); - Session.SetBody(cpr::Body{}); - - cpr::Response Response = Session.Post(); - ZEN_DEBUG("POST {}", Response); + HttpClient::Response Response = m_CacheClient->m_HttpClient.Post( + fmt::format("/api/v1/refs/{}/{}/{}/finalize/{}", Namespace, BucketId, Key.ToHexString(), RefHash.ToHexString()), + {{"X-Jupiter-IoHash", RefHash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}}); FinalizeRefResult Result = {detail::ConvertResponse(Response)}; if (Result.Success) { std::string JsonError; - json11::Json Json = json11::Json::parse(Response.text, JsonError); + json11::Json Json = json11::Json::parse(std::string(Response.AsText()), JsonError); if (JsonError.empty()) { json11::Json::array Needs = Json["needs"].array_items(); @@ -598,37 +203,6 @@ CloudCacheSession::FinalizeRef(std::string_view Namespace, std::string_view Buck } } } - else - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::FinalizeRef failed PUT. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-X-Jupiter-IoHash: '{}', " - "Header-ContentType: '{}', " - "ContentSize: {}, " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - RefHash.ToHexString(), - "application/x-ue-cb", - NiceBytes(0), - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; } @@ -637,49 +211,9 @@ CloudCacheSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuff { ZEN_TRACE_CPU("JupiterClient::PutBlob"); - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << Namespace << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); + HttpClient::Response Response = m_CacheClient->m_HttpClient.Put(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), Blob); - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/octet-stream"}}); - Session.SetBody(cpr::Body{(const char*)Blob.Data(), Blob.Size()}); - - cpr::Response Response = Session.Put(); - ZEN_DEBUG("PUT {}", Response); - - CloudCacheResult Result = detail::ConvertResponse(Response); - if (!Result.Success) - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::PutBlob failed PUT. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-ContentType: '{}', " - "ContentSize: {}, " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - "application/octet-stream", - NiceBytes(Blob.Size()), - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; + return detail::ConvertResponse(Response); } CloudCacheResult @@ -687,66 +221,11 @@ CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& K { ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob"); - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-comp"}}); - - uint64_t Offset = 0; - if (Blob.IsWholeFile()) - { - auto ReadCallback = [&Blob, &Offset](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, Blob.GetSize() - Offset); - IoBuffer PayloadRange = IoBuffer(Blob, Offset, size); - MutableMemoryView Data(buffer, size); - Data.CopyFrom(PayloadRange.GetView()); - Offset += size; - return true; - }; - Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Blob.GetSize()), ReadCallback)); - } - else - { - Session.SetBody(cpr::Body{(const char*)Blob.Data(), Blob.Size()}); - } - - cpr::Response Response = Session.Put(); - ZEN_DEBUG("PUT {}", Response); + Blob.SetContentType(ZenContentType::kCompressedBinary); + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), Blob); - CloudCacheResult Result = detail::ConvertResponse(Response); - if (!Result.Success) - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::PutCompressedBlob failed PUT. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-ContentType: '{}', " - "ContentSize: {}, " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - "application/x-ue-comp", - NiceBytes(Blob.Size()), - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; + return detail::ConvertResponse(Response); } CloudCacheResult @@ -754,58 +233,12 @@ CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& K { ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob"); - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-comp"}}); - uint64_t SizeLeft = Payload.GetSize(); - CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); - auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, SizeLeft); - MutableMemoryView Data(buffer, size); - Payload.CopyTo(Data, BufferIt); - SizeLeft -= size; - return true; - }; - Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(SizeLeft), ReadCallback)); - - cpr::Response Response = Session.Put(); - ZEN_DEBUG("PUT {}", Response); + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), + Payload, + ZenContentType::kCompressedBinary); - CloudCacheResult Result = detail::ConvertResponse(Response); - if (!Result.Success) - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::PutCompressedBlob failed PUT. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-ContentType: '{}', " - "ContentSize: {}, " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - "application/x-ue-comp", - NiceBytes(Payload.GetSize()), - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; + return detail::ConvertResponse(Response); } CloudCacheResult @@ -813,49 +246,11 @@ CloudCacheSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBu { ZEN_TRACE_CPU("JupiterClient::PutObject"); - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); + Object.SetContentType(ZenContentType::kCbObject); + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), Object); - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}}); - Session.SetBody(cpr::Body{(const char*)Object.Data(), Object.Size()}); - - cpr::Response Response = Session.Put(); - ZEN_DEBUG("PUT {}", Response); - - CloudCacheResult Result = detail::ConvertResponse(Response); - if (!Result.Success) - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::PutObject failed PUT. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-ContentType: '{}', " - "ContentSize: {}, " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - "application/x-ue-cb", - NiceBytes(Object.GetSize()), - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; + return detail::ConvertResponse(Response); } CloudCacheResult @@ -863,45 +258,10 @@ CloudCacheSession::RefExists(std::string_view Namespace, std::string_view Bucket { ZEN_TRACE_CPU("JupiterClient::RefExists"); - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString())); - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}}); - Session.SetOption(cpr::Body{}); - - cpr::Response Response = Session.Head(); - ZEN_DEBUG("HEAD {}", Response); - - CloudCacheResult Result = detail::ConvertResponse(Response); - if (!Result.Success) - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::RefExists failed PUT. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; + return detail::ConvertResponse(Response); } GetObjectReferencesResult @@ -909,57 +269,20 @@ CloudCacheSession::GetObjectReferences(std::string_view Namespace, const IoHash& { ZEN_TRACE_CPU("JupiterClient::GetObjectReferences"); - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString() << "/references"; - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}}); - Session.SetOption(cpr::Body{}); - - cpr::Response Response = Session.Get(); - ZEN_DEBUG("GET {}", Response); + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/references", Namespace, Key.ToHexString()), + {HttpClient::Accept(ZenContentType::kCbObject)}); GetObjectReferencesResult Result = {detail::ConvertResponse(Response)}; if (Result.Success) { - IoBuffer Buffer = IoBuffer(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size()); - const CbObject ReferencesResponse = LoadCompactBinaryObject(Buffer); + const CbObject ReferencesResponse = Response.AsObject(); for (auto& Item : ReferencesResponse["references"sv]) { Result.References.insert(Item.AsHash()); } } - else - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::GetObjectReferences failed PUT. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-Accept: '{}', " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - "application/x-ue-cb", - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; } @@ -1002,73 +325,23 @@ CloudCacheSession::ObjectExists(std::string_view Namespace, const std::set<IoHas std::vector<IoHash> CloudCacheSession::Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes) { - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl(); - Uri << "/api/v1/s/" << Namespace; + // ExtendableStringBuilder<256> Uri; + // Uri << m_CacheClient->ServiceUrl(); + // Uri << "/api/v1/s/" << Namespace; - ZEN_UNUSED(BucketId, ChunkHashes); + ZEN_UNUSED(Namespace, BucketId, ChunkHashes); return {}; } -cpr::Session& -CloudCacheSession::GetSession() -{ - return m_SessionState->GetSession(); -} - -CloudCacheAccessToken -CloudCacheSession::GetAccessToken(bool RefreshToken) -{ - return m_SessionState->GetAccessToken(RefreshToken); -} - CloudCacheResult CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key) { ZEN_TRACE_CPU("JupiterClient::CacheTypeExists"); - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << Namespace << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}}); - Session.SetOption(cpr::Body{}); - - cpr::Response Response = Session.Head(); - ZEN_DEBUG("HEAD {}", Response); + HttpClient::Response Response = m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/{}/{}/{}", TypeId, Namespace, Key.ToHexString())); - CloudCacheResult Result = detail::ConvertResponse(Response); - if (!Result.Success) - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::CacheTypeExists failed GET. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-Accept: '{}', " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - "application/x-ue-cb", - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; + return detail::ConvertResponse(Response); } CloudCacheExistsResult @@ -1083,58 +356,23 @@ CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view Body << (Body.Size() != 1 ? ",\"" : "\"") << Key.ToHexString() << "\""; } Body << "]"; + IoBuffer Payload = IoBuffer(IoBuffer::Wrap, Body.Data(), Body.Size()); + Payload.SetContentType(ZenContentType::kJSON); - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << Namespace << "/exist"; - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); + HttpClient::Response Response = m_CacheClient->m_HttpClient.Post(fmt::format("/api/v1/{}/{}/exist", TypeId, Namespace), + Payload, + {HttpClient::Accept(ZenContentType::kCbObject)}); - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption( - cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}, {"Content-Type", "application/json"}}); - Session.SetOption(cpr::Body(Body.ToString())); - - cpr::Response Response = Session.Post(); - ZEN_DEBUG("POST {}", Response); CloudCacheExistsResult Result = {detail::ConvertResponse(Response)}; if (Result.Success) { - IoBuffer Buffer = IoBuffer(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size()); - const CbObject ExistsResponse = LoadCompactBinaryObject(Buffer); + const CbObject ExistsResponse = Response.AsObject(); for (auto& Item : ExistsResponse["needs"sv]) { Result.Needs.insert(Item.AsHash()); } } - else - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::CacheTypeExists failed GET. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-Accept: '{}', " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - "application/x-ue-cb", - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; } @@ -1233,77 +471,39 @@ CloudCacheTokenProvider::CreateFromCallback(std::function<CloudCacheAccessToken( return std::make_unique<CallbackTokenProvider>(std::move(Callback)); } +static std::optional<std::function<HttpClientAccessToken()>> +GetHttpClientAccessProvider(CloudCacheTokenProvider* TokenProvider) +{ + if (TokenProvider == nullptr) + { + return {}; + } + auto ProviderFunc = [TokenProvider]() -> HttpClientAccessToken { + CloudCacheAccessToken Token = TokenProvider->AcquireAccessToken(); + return HttpClientAccessToken{.Value = Token.Value, .ExpireTime = Token.ExpireTime}; + }; + return ProviderFunc; +} + CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options, std::unique_ptr<CloudCacheTokenProvider> TokenProvider) : m_Log(zen::logging::Get("jupiter")) -, m_ServiceUrl(Options.ServiceUrl) , m_DefaultDdcNamespace(Options.DdcNamespace) , m_DefaultBlobStoreNamespace(Options.BlobStoreNamespace) , m_ComputeCluster(Options.ComputeCluster) -, m_ConnectTimeout(Options.ConnectTimeout) -, m_Timeout(Options.Timeout) , m_TokenProvider(std::move(TokenProvider)) -, m_AssumeHttp2(Options.AssumeHttp2) +, m_HttpClient(Options.ServiceUrl, + HttpClientSettings{.ConnectTimeout = Options.ConnectTimeout, + .Timeout = Options.Timeout, + .AccessTokenProvider = GetHttpClientAccessProvider(m_TokenProvider.get()), + .AssumeHttp2 = Options.AssumeHttp2, + .AllowResume = Options.AllowResume, + .RetryCount = Options.RetryCount}) { ZEN_ASSERT(m_TokenProvider.get() != nullptr); } CloudCacheClient::~CloudCacheClient() { - RwLock::ExclusiveLockScope _(m_SessionStateLock); - - for (auto State : m_SessionStateCache) - { - delete State; - } -} - -CloudCacheAccessToken -CloudCacheClient::AcquireAccessToken() -{ - ZEN_TRACE_CPU("JupiterClient::AcquireAccessToken"); - - return m_TokenProvider->AcquireAccessToken(); -} - -detail::CloudCacheSessionState* -CloudCacheClient::AllocSessionState() -{ - detail::CloudCacheSessionState* State = nullptr; - - bool IsTokenValid = false; - - { - RwLock::ExclusiveLockScope _(m_SessionStateLock); - - if (m_SessionStateCache.empty() == false) - { - State = m_SessionStateCache.front(); - IsTokenValid = State->m_AccessToken.IsValid(); - - m_SessionStateCache.pop_front(); - } - } - - if (State == nullptr) - { - State = new detail::CloudCacheSessionState(*this); - } - - State->Reset(m_ConnectTimeout, m_Timeout, m_AssumeHttp2); - - if (IsTokenValid == false) - { - State->m_AccessToken = m_TokenProvider->AcquireAccessToken(); - } - - return State; -} - -void -CloudCacheClient::FreeSessionState(detail::CloudCacheSessionState* State) -{ - RwLock::ExclusiveLockScope _(m_SessionStateLock); - m_SessionStateCache.push_front(State); } } // namespace zen diff --git a/src/zenserver/upstream/jupiter.h b/src/zenserver/upstream/jupiter.h index b5aa95ed5..93f2cc883 100644 --- a/src/zenserver/upstream/jupiter.h +++ b/src/zenserver/upstream/jupiter.h @@ -6,6 +6,7 @@ #include <zencore/iohash.h> #include <zencore/logging.h> #include <zencore/thread.h> +#include <zenhttp/httpclient.h> #include <zenhttp/httpserver.h> #include <atomic> @@ -22,9 +23,6 @@ class Session; } namespace zen { -namespace detail { - struct CloudCacheSessionState; -} class CbObjectView; class CloudCacheClient; @@ -96,7 +94,11 @@ public: ~CloudCacheSession(); CloudCacheResult Authenticate(); - CloudCacheResult GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType); + CloudCacheResult GetRef(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + ZenContentType RefType, + std::filesystem::path TempFolderPath = {}); CloudCacheResult GetBlob(std::string_view Namespace, const IoHash& Key); CloudCacheResult GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath = {}); CloudCacheResult GetObject(std::string_view Namespace, const IoHash& Key); @@ -131,17 +133,14 @@ public: CloudCacheClient& Client() { return *m_CacheClient; }; private: - inline LoggerRef Log() { return m_Log; } - cpr::Session& GetSession(); - CloudCacheAccessToken GetAccessToken(bool RefreshToken = false); + inline LoggerRef Log() { return m_Log; } CloudCacheResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key); CloudCacheExistsResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys); - LoggerRef m_Log; - RefPtr<CloudCacheClient> m_CacheClient; - detail::CloudCacheSessionState* m_SessionState; + LoggerRef m_Log; + RefPtr<CloudCacheClient> m_CacheClient; }; /** @@ -178,6 +177,8 @@ struct CloudCacheClientOptions std::chrono::milliseconds ConnectTimeout{5000}; std::chrono::milliseconds Timeout{}; bool AssumeHttp2 = false; + bool AllowResume = false; + uint8_t RetryCount = 0; }; /** @@ -189,30 +190,20 @@ public: CloudCacheClient(const CloudCacheClientOptions& Options, std::unique_ptr<CloudCacheTokenProvider> TokenProvider); ~CloudCacheClient(); - CloudCacheAccessToken AcquireAccessToken(); - std::string_view DefaultDdcNamespace() const { return m_DefaultDdcNamespace; } - std::string_view DefaultBlobStoreNamespace() const { return m_DefaultBlobStoreNamespace; } - std::string_view ComputeCluster() const { return m_ComputeCluster; } - std::string_view ServiceUrl() const { return m_ServiceUrl; } + std::string_view DefaultDdcNamespace() const { return m_DefaultDdcNamespace; } + std::string_view DefaultBlobStoreNamespace() const { return m_DefaultBlobStoreNamespace; } + std::string_view ComputeCluster() const { return m_ComputeCluster; } + std::string_view ServiceUrl() const { return m_HttpClient.GetBaseUri(); } LoggerRef Logger() { return m_Log; } private: - LoggerRef m_Log; - std::string m_ServiceUrl; - std::string m_DefaultDdcNamespace; - std::string m_DefaultBlobStoreNamespace; - std::string m_ComputeCluster; - std::chrono::milliseconds m_ConnectTimeout{}; - std::chrono::milliseconds m_Timeout{}; - std::unique_ptr<CloudCacheTokenProvider> m_TokenProvider; - bool m_AssumeHttp2; - - RwLock m_SessionStateLock; - std::list<detail::CloudCacheSessionState*> m_SessionStateCache; - - detail::CloudCacheSessionState* AllocSessionState(); - void FreeSessionState(detail::CloudCacheSessionState*); + LoggerRef m_Log; + const std::string m_DefaultDdcNamespace; + const std::string m_DefaultBlobStoreNamespace; + const std::string m_ComputeCluster; + const std::unique_ptr<CloudCacheTokenProvider> m_TokenProvider; + HttpClient m_HttpClient; friend class CloudCacheSession; }; diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index 73a8ad538..5bcb7f5b4 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -1158,88 +1158,73 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState, } ZEN_ASSERT(OldBlockFile); - ZEN_INFO("Moving {} chunks from '{}' to new block", KeepChunkIndexes.size(), GetBlockPath(m_BlocksBasePath, BlockIndex)); - uint64_t OldBlockSize = OldBlockFile->FileSize(); - std::vector<uint8_t> Chunk; - for (const size_t& ChunkIndex : KeepChunkIndexes) + if (KeepChunkIndexes.empty()) + { + ZEN_INFO("Dropping all chunks from '{}'", GetBlockPath(m_BlocksBasePath, BlockIndex)); + } + else { - const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex]; - if (ChunkLocation.Offset + ChunkLocation.Size > OldBlockSize) + std::vector<uint8_t> Chunk; + for (const size_t& ChunkIndex : KeepChunkIndexes) { - ZEN_WARN( - "Compact Block skipping chunk outside of block range in '{}', Chunk start {}, Chunk size {} in Block {}, Block " - "size {}", - m_BlocksBasePath, - ChunkLocation.Offset, - ChunkLocation.Size, - OldBlockFile->GetPath(), - OldBlockSize); - continue; - } + const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex]; + if (ChunkLocation.Offset + ChunkLocation.Size > OldBlockSize) + { + ZEN_WARN( + "Compact Block skipping chunk outside of block range in '{}', Chunk start {}, Chunk size {} in Block {}, Block " + "size {}", + m_BlocksBasePath, + ChunkLocation.Offset, + ChunkLocation.Size, + OldBlockFile->GetPath(), + OldBlockSize); + continue; + } - Chunk.resize(ChunkLocation.Size); - OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset); + Chunk.resize(ChunkLocation.Size); + OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset); - if ((WriteOffset + Chunk.size()) > m_MaxBlockSize) - { - if (NewBlockFile) + if ((WriteOffset + Chunk.size()) > m_MaxBlockSize) { - ZEN_ASSERT_SLOW(NewBlockFile->IsOpen()); - NewBlockFile->Flush(); - MovedSize += NewBlockFile->FileSize(); - NewBlockFile = nullptr; + if (NewBlockFile) + { + ZEN_ASSERT_SLOW(NewBlockFile->IsOpen()); + NewBlockFile->Flush(); + MovedSize += NewBlockFile->FileSize(); + NewBlockFile = nullptr; - ZEN_ASSERT(!MovedChunks.empty() || RemovedSize > 0); // We should not have a new block if we haven't moved anything + ZEN_ASSERT(!MovedChunks.empty() || RemovedSize > 0); // We should not have a new block if we haven't moved anything - if (!ReportChanges()) - { - return false; + if (!ReportChanges()) + { + return false; + } } - } - uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed); - { - RwLock::ExclusiveLockScope InsertLock(m_InsertLock); - std::filesystem::path NewBlockPath; - NextBlockIndex = GetFreeBlockIndex(NextBlockIndex, InsertLock, NewBlockPath); - if (NextBlockIndex == (uint32_t)m_MaxBlockCount) + uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed); { - ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded", - m_BlocksBasePath, - static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1); - return false; - } - - NewBlockFile = new BlockStoreFile(NewBlockPath); - m_ChunkBlocks[NextBlockIndex] = NewBlockFile; - } + RwLock::ExclusiveLockScope InsertLock(m_InsertLock); + std::filesystem::path NewBlockPath; + NextBlockIndex = GetFreeBlockIndex(NextBlockIndex, InsertLock, NewBlockPath); + if (NextBlockIndex == (uint32_t)m_MaxBlockCount) + { + ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded", + m_BlocksBasePath, + static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1); + return false; + } - std::error_code Error; - DiskSpace Space = DiskSpaceInfo(m_BlocksBasePath, Error); - if (Error) - { - ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BlocksBasePath, Error.message()); - { - RwLock::ExclusiveLockScope _l(m_InsertLock); - ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile); - m_ChunkBlocks.erase(NextBlockIndex); + NewBlockFile = new BlockStoreFile(NewBlockPath); + m_ChunkBlocks[NextBlockIndex] = NewBlockFile; } - ZEN_ASSERT_SLOW(!NewBlockFile->IsOpen()); - NewBlockFile = nullptr; - return false; - } - if (Space.Free < m_MaxBlockSize) - { - uint64_t ReclaimedSpace = DiskReserveCallback(); - if (Space.Free + ReclaimedSpace < m_MaxBlockSize) + std::error_code Error; + DiskSpace Space = DiskSpaceInfo(m_BlocksBasePath, Error); + if (Error) { - ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}", - m_BlocksBasePath, - m_MaxBlockSize, - NiceBytes(Space.Free + ReclaimedSpace)); + ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BlocksBasePath, Error.message()); { RwLock::ExclusiveLockScope _l(m_InsertLock); ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile); @@ -1250,23 +1235,42 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState, return false; } - ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}", - m_BlocksBasePath, - ReclaimedSpace, - NiceBytes(Space.Free + ReclaimedSpace)); + if (Space.Free < m_MaxBlockSize) + { + uint64_t ReclaimedSpace = DiskReserveCallback(); + if (Space.Free + ReclaimedSpace < m_MaxBlockSize) + { + ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}", + m_BlocksBasePath, + m_MaxBlockSize, + NiceBytes(Space.Free + ReclaimedSpace)); + { + RwLock::ExclusiveLockScope _l(m_InsertLock); + ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile); + m_ChunkBlocks.erase(NextBlockIndex); + } + ZEN_ASSERT_SLOW(!NewBlockFile->IsOpen()); + NewBlockFile = nullptr; + return false; + } + + ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}", + m_BlocksBasePath, + ReclaimedSpace, + NiceBytes(Space.Free + ReclaimedSpace)); + } + NewBlockFile->Create(m_MaxBlockSize); + NewBlockIndex = NextBlockIndex; + WriteOffset = 0; } - NewBlockFile->Create(m_MaxBlockSize); - NewBlockIndex = NextBlockIndex; - WriteOffset = 0; - } - NewBlockFile->Write(Chunk.data(), ChunkLocation.Size, WriteOffset); - MovedChunks.push_back( - {ChunkIndex, {.BlockIndex = NewBlockIndex, .Offset = gsl::narrow<uint32_t>(WriteOffset), .Size = ChunkLocation.Size}}); - WriteOffset = RoundUp(WriteOffset + ChunkLocation.Size, PayloadAlignment); - AddedSize += Chunk.size(); + NewBlockFile->Write(Chunk.data(), ChunkLocation.Size, WriteOffset); + MovedChunks.push_back( + {ChunkIndex, {.BlockIndex = NewBlockIndex, .Offset = gsl::narrow<uint32_t>(WriteOffset), .Size = ChunkLocation.Size}}); + WriteOffset = RoundUp(WriteOffset + ChunkLocation.Size, PayloadAlignment); + AddedSize += Chunk.size(); + } } - Chunk.clear(); if (!ReportChanges()) { diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index f18509758..7839c7132 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -1436,6 +1436,8 @@ public: NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); + ZEN_INFO("GCV2: filecas [COMPACT] '{}': Removing {} files", m_FileCasStrategy.m_RootDirectory, m_ReferencesToClean.size()); + size_t Skipped = 0; for (const IoHash& ChunkHash : m_ReferencesToClean) { |