diff options
| author | Dan Engelbrecht <[email protected]> | 2024-01-24 11:41:18 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-01-24 11:41:18 +0100 |
| commit | 0e63573fbe9973f6b922656a785817a711581b78 (patch) | |
| tree | 48e18f0b4aea958a536ba50f72f589a580c4b798 | |
| parent | oplog import/export improvements (#634) (diff) | |
| download | zen-0e63573fbe9973f6b922656a785817a711581b78.tar.xz zen-0e63573fbe9973f6b922656a785817a711581b78.zip | |
Add retry with optional resume logic to HttpClient::Download (#639)
- Improvement: Refactored Jupiter upstream to use HttpClient
- Improvement: Added retry and resume logic to HttpClient
- Improvement: Added authentication support to HttpClient
- Improvement: Clearer logging in GCV2 compact of FileCas/BlockStore
- Improvement: Size details in oplog import logging
| -rw-r--r-- | CHANGELOG.md | 5 | ||||
| -rw-r--r-- | src/zenhttp/httpclient.cpp | 530 | ||||
| -rw-r--r-- | src/zenhttp/include/zenhttp/httpclient.h | 35 | ||||
| -rw-r--r-- | src/zenserver/projectstore/jupiterremoteprojectstore.cpp | 82 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 12 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 58 | ||||
| -rw-r--r-- | src/zenserver/upstream/jupiter.cpp | 1012 | ||||
| -rw-r--r-- | src/zenserver/upstream/jupiter.h | 51 | ||||
| -rw-r--r-- | src/zenstore/blockstore.cpp | 164 | ||||
| -rw-r--r-- | src/zenstore/filecas.cpp | 2 |
10 files changed, 693 insertions, 1258 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index de0977f95..de449e164 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,11 @@ - Improvement: Get more detailed information on Jupiter upstream errors - Improvement: Improved performance when saving oplog via oplog import command - Improvement: Add more feedback and progress information when executing oplog import/export +- Improvement: Refactored Jupiter upstream to use HttpClient +- Improvement: Added retry and resume logic to HttpClient +- Improvement: Added authentication support to HttpClient +- Improvement: Clearer logging in GCV2 compact of FileCas/BlockStore +- Improvement: Size details in oplog import logging - Bugfix: RPC recording would not release memory as early as intended which resulted in memory buildup during long recording sessions. Previously certain memory was only released when recording stopped, now it gets released immediately when a segment is complete and written to disk. - Bugfix: File log format now contains dates again (PR #631) - Bugfix: Jobqueue - Allow multiple threads to report progress/messages (oplog import/export) diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp index a29a08a3c..3b2a3baec 100644 --- a/src/zenhttp/httpclient.cpp +++ b/src/zenhttp/httpclient.cpp @@ -13,6 +13,7 @@ #include <zencore/session.h> #include <zencore/sharedbuffer.h> #include <zencore/stream.h> +#include <zencore/string.h> #include <zencore/testing.h> #include <zencore/trace.h> #include <zenhttp/formatters.h> @@ -38,19 +39,19 @@ using namespace std::literals; // // CPR helpers -cpr::Body +static cpr::Body AsCprBody(const CbObject& Obj) { return cpr::Body((const char*)Obj.GetBuffer().GetData(), Obj.GetBuffer().GetSize()); } -cpr::Body +static cpr::Body AsCprBody(const IoBuffer& Obj) { return cpr::Body((const char*)Obj.GetData(), Obj.GetSize()); } -cpr::Body +static cpr::Body AsCprBody(const CompositeBuffer& Buffers) { SharedBuffer Buffer = Buffers.Flatten(); @@ -62,7 +63,7 @@ AsCprBody(const CompositeBuffer& Buffers) ////////////////////////////////////////////////////////////////////////// -HttpClient::Response +static HttpClient::Response ResponseWithPayload(cpr::Response& HttpResponse, const HttpResponseCode WorkResponseCode, IoBuffer&& Payload) { // This ends up doing a memcpy, would be good to get rid of it by streaming results @@ -89,7 +90,7 @@ ResponseWithPayload(cpr::Response& HttpResponse, const HttpResponseCode WorkResp .ElapsedSeconds = HttpResponse.elapsed}; } -HttpClient::Response +static HttpClient::Response CommonResponse(cpr::Response&& HttpResponse, IoBuffer&& Payload = {}) { const HttpResponseCode WorkResponseCode = HttpResponseCode(HttpResponse.status_code); @@ -123,6 +124,51 @@ CommonResponse(cpr::Response&& HttpResponse, IoBuffer&& Payload = {}) } } +static bool +ShouldRetry(const cpr::Response& Response) +{ + switch (Response.error.code) + { + case cpr::ErrorCode::OK: + break; + case cpr::ErrorCode::OPERATION_TIMEDOUT: + case cpr::ErrorCode::NETWORK_RECEIVE_ERROR: + case cpr::ErrorCode::NETWORK_SEND_FAILURE: + return true; + default: + return false; + } + switch ((HttpResponseCode)Response.status_code) + { + case HttpResponseCode::GatewayTimeout: + case HttpResponseCode::RequestTimeout: + return true; + default: + return false; + } +}; + +static cpr::Response +DoWithRetry(std::function<cpr::Response()>&& Func, uint8_t RetryCount) +{ + uint8_t Attempt = 0; + cpr::Response Result = Func(); + while (Attempt < RetryCount && ShouldRetry(Result)) + { + Sleep(100 * (Attempt + 1)); + Attempt++; + ZEN_INFO("{} Attempt {}/{}", CommonResponse(std::move(Result)).ErrorMessage("Retry"), Attempt, RetryCount + 1); + Result = Func(); + } + return Result; +} + +static std::pair<std::string, std::string> +HeaderContentType(ZenContentType ContentType) +{ + return std::make_pair("Content-Type", std::string(MapContentTypeToString(ContentType))); +} + ////////////////////////////////////////////////////////////////////////// struct HttpClient::Impl : public RefCounted @@ -144,9 +190,13 @@ struct HttpClient::Impl : public RefCounted ZEN_TRACE("GET {}", Result); return Result; } - inline cpr::Response Download(cpr::WriteCallback&& write) + inline cpr::Response Download(cpr::WriteCallback&& Write, std::optional<cpr::HeaderCallback>&& Header = {}) { - cpr::Response Result = CprSession->Download(write); + if (Header) + { + CprSession->SetHeaderCallback(std::move(Header.value())); + } + cpr::Response Result = CprSession->Download(Write); ZEN_TRACE("GET {}", Result); return Result; } @@ -185,11 +235,13 @@ struct HttpClient::Impl : public RefCounted Session& operator=(Session&&) = delete; }; - Session AllocSession(const std::string_view BaseUrl, - const std::string_view Url, - const HttpClientSettings& ConnectionSettings, - const KeyValueMap& AdditionalHeader, - const KeyValueMap& Parameters); + Session AllocSession(const std::string_view BaseUrl, + const std::string_view Url, + const HttpClientSettings& ConnectionSettings, + const KeyValueMap& AdditionalHeader, + const KeyValueMap& Parameters, + const std::string_view SessionId, + std::optional<HttpClientAccessToken> AccessToken); LoggerRef Logger() { return m_Log; } @@ -217,29 +269,26 @@ HttpClient::Impl::~Impl() } HttpClient::Impl::Session -HttpClient::Impl::AllocSession(const std::string_view BaseUrl, - const std::string_view ResourcePath, - const HttpClientSettings& ConnectionSettings, - const KeyValueMap& AdditionalHeader, - const KeyValueMap& Parameters) +HttpClient::Impl::AllocSession(const std::string_view BaseUrl, + const std::string_view ResourcePath, + const HttpClientSettings& ConnectionSettings, + const KeyValueMap& AdditionalHeader, + const KeyValueMap& Parameters, + const std::string_view SessionId, + std::optional<HttpClientAccessToken> AccessToken) { - bool IsNew = false; cpr::Session* CprSession = nullptr; m_SessionLock.WithExclusiveLock([&] { - if (m_Sessions.empty()) - { - CprSession = new cpr::Session(); - IsNew = true; - } - else + if (!m_Sessions.empty()) { CprSession = m_Sessions.back(); m_Sessions.pop_back(); } }); - if (IsNew) + if (CprSession == nullptr) { + CprSession = new cpr::Session(); CprSession->SetConnectTimeout(ConnectionSettings.ConnectTimeout); CprSession->SetTimeout(ConnectionSettings.Timeout); if (ConnectionSettings.AssumeHttp2) @@ -252,9 +301,13 @@ HttpClient::Impl::AllocSession(const std::string_view BaseUrl, { CprSession->SetHeader(cpr::Header(AdditionalHeader->begin(), AdditionalHeader->end())); } - else + if (!SessionId.empty()) { - CprSession->SetHeader({}); + CprSession->UpdateHeader({{"UE-Session", std::string(SessionId)}}); + } + if (AccessToken) + { + CprSession->UpdateHeader({{"Authorization", AccessToken->Value}}); } if (!Parameters->empty()) { @@ -430,6 +483,9 @@ public: return Buffer; } + uint64_t GetSize() const { return m_WriteOffset; } + void ResetWritePos(uint64_t WriteOffset) { m_WriteOffset = WriteOffset; } + private: void* m_FileHandle; std::uint64_t m_WriteOffset; @@ -452,12 +508,46 @@ HttpClient::~HttpClient() { } +bool +HttpClient::Authenticate() +{ + std::optional<HttpClientAccessToken> Token = GetAccessToken(); + if (!Token) + { + return false; + } + return Token->IsValid(); +} + +const std::optional<HttpClientAccessToken> +HttpClient::GetAccessToken() +{ + if (!m_ConnectionSettings.AccessTokenProvider.has_value()) + { + return {}; + } + { + RwLock::SharedLockScope _(m_AccessTokenLock); + if (m_CachedAccessToken.IsValid()) + { + return m_CachedAccessToken; + } + } + RwLock::ExclusiveLockScope _(m_AccessTokenLock); + if (m_CachedAccessToken.IsValid()) + { + return m_CachedAccessToken; + } + m_CachedAccessToken = m_ConnectionSettings.AccessTokenProvider.value()(); + return m_CachedAccessToken; +} + HttpClient::Response HttpClient::TransactPackage(std::string_view Url, CbPackage Package, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("HttpClient::TransactPackage"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); + Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); // First, list of offered chunks for filtering on the server end @@ -482,7 +572,7 @@ HttpClient::TransactPackage(std::string_view Url, CbPackage Package, const KeyVa BinaryWriter MemWriter; Writer.Save(MemWriter); - Sess->UpdateHeader({{"Content-Type", "application/x-ue-offer"}, {"UE-Session", m_SessionId}, {"UE-Request", RequestIdString}}); + Sess->UpdateHeader({HeaderContentType(HttpContentType::kCbPackageOffer), {"UE-Request", RequestIdString}}); Sess->SetBody(cpr::Body{(const char*)MemWriter.Data(), MemWriter.Size()}); cpr::Response FilterResponse = Sess.Post(); @@ -524,7 +614,7 @@ HttpClient::TransactPackage(std::string_view Url, CbPackage Package, const KeyVa CompositeBuffer Message = FormatPackageMessageBuffer(SendPackage); SharedBuffer FlatMessage = Message.Flatten(); - Sess->UpdateHeader({{"Content-Type", "application/x-ue-cbpkg"}, {"UE-Session", m_SessionId}, {"UE-Request", RequestIdString}}); + Sess->UpdateHeader({HeaderContentType(HttpContentType::kCbPackage), {"UE-Request", RequestIdString}}); Sess->SetBody(cpr::Body{(const char*)FlatMessage.GetData(), FlatMessage.GetSize()}); cpr::Response FilterResponse = Sess.Post(); @@ -556,20 +646,28 @@ HttpClient::Put(std::string_view Url, const IoBuffer& Payload, const KeyValueMap { ZEN_TRACE_CPU("HttpClient::Put"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); - Sess->SetBody(AsCprBody(Payload)); - Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(Payload.GetContentType()))}}); - - return CommonResponse(Sess.Put()); + return CommonResponse(DoWithRetry( + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + Sess->SetBody(AsCprBody(Payload)); + Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())}); + return Sess.Put(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response HttpClient::Get(std::string_view Url, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters) { ZEN_TRACE_CPU("HttpClient::Get"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters); - - return CommonResponse(Sess.Get()); + return CommonResponse(DoWithRetry( + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters, m_SessionId, GetAccessToken()); + return Sess.Get(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -577,9 +675,13 @@ HttpClient::Head(std::string_view Url, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("HttpClient::Head"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); - - return CommonResponse(Sess.Head()); + return CommonResponse(DoWithRetry( + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + return Sess.Head(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -587,9 +689,13 @@ HttpClient::Delete(std::string_view Url, const KeyValueMap& AdditionalHeader) { ZEN_TRACE_CPU("HttpClient::Delete"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); - - return CommonResponse(Sess.Delete()); + return CommonResponse(DoWithRetry( + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + return Sess.Delete(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -597,9 +703,13 @@ HttpClient::Post(std::string_view Url, const KeyValueMap& AdditionalHeader, cons { ZEN_TRACE_CPU("HttpClient::PostNoPayload"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters); - - return CommonResponse(Sess.Post()); + return CommonResponse(DoWithRetry( + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, Parameters, m_SessionId, GetAccessToken()); + return Sess.Post(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -607,12 +717,16 @@ HttpClient::Post(std::string_view Url, const IoBuffer& Payload, const KeyValueMa { ZEN_TRACE_CPU("HttpClient::PostWithPayload"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); + return CommonResponse(DoWithRetry( + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); - Sess->SetBody(AsCprBody(Payload)); - Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(Payload.GetContentType()))}}); - - return CommonResponse(Sess.Post()); + Sess->SetBody(AsCprBody(Payload)); + Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())}); + return Sess.Post(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -620,12 +734,16 @@ HttpClient::Post(std::string_view Url, CbObject Payload, const KeyValueMap& Addi { ZEN_TRACE_CPU("HttpClient::PostObjectPayload"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); - - Sess->SetBody(AsCprBody(Payload)); - Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(ZenContentType::kCbObject))}}); + return CommonResponse(DoWithRetry( + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); - return CommonResponse(Sess.Post()); + Sess->SetBody(AsCprBody(Payload)); + Sess->UpdateHeader({HeaderContentType(ZenContentType::kCbObject)}); + return Sess.Post(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -633,13 +751,17 @@ HttpClient::Post(std::string_view Url, CbPackage Pkg, const KeyValueMap& Additio { ZEN_TRACE_CPU("HttpClient::PostPackage"); - CompositeBuffer Message = zen::FormatPackageMessageBuffer(Pkg); - - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); - Sess->SetBody(AsCprBody(Message)); - Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(ZenContentType::kCbPackage))}}); - - return CommonResponse(Sess.Post()); + return CommonResponse(DoWithRetry( + [&]() { + CompositeBuffer Message = zen::FormatPackageMessageBuffer(Pkg); + + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + Sess->SetBody(AsCprBody(Message)); + Sess->UpdateHeader({HeaderContentType(ZenContentType::kCbPackage)}); + return Sess.Post(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -647,27 +769,32 @@ HttpClient::Upload(std::string_view Url, const IoBuffer& Payload, const KeyValue { ZEN_TRACE_CPU("HttpClient::Upload"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); - Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(Payload.GetContentType()))}}); + return CommonResponse(DoWithRetry( + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + Sess->UpdateHeader({HeaderContentType(Payload.GetContentType())}); - uint64_t Offset = 0; - if (Payload.IsWholeFile()) - { - auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, Payload.GetSize() - Offset); - IoBuffer PayloadRange = IoBuffer(Payload, Offset, size); - MutableMemoryView Data(buffer, size); - Data.CopyFrom(PayloadRange.GetView()); - Offset += size; - return true; - }; - Sess->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); - } - else - { - Sess->SetBody(AsCprBody(Payload)); - } - return CommonResponse(Sess.Put()); + uint64_t Offset = 0; + if (Payload.IsWholeFile()) + { + auto ReadCallback = [&Payload, &Offset](char* buffer, size_t& size, intptr_t) { + size = Min<size_t>(size, Payload.GetSize() - Offset); + IoBuffer PayloadRange = IoBuffer(Payload, Offset, size); + MutableMemoryView Data(buffer, size); + Data.CopyFrom(PayloadRange.GetView()); + Offset += size; + return true; + }; + Sess->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); + } + else + { + Sess->SetBody(AsCprBody(Payload)); + } + return Sess.Put(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -675,21 +802,26 @@ HttpClient::Upload(std::string_view Url, const CompositeBuffer& Payload, ZenCont { ZEN_TRACE_CPU("HttpClient::Upload"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); - Sess->UpdateHeader(cpr::Header{{"Content-Type", std::string(MapContentTypeToString(ContentType))}}); - - uint64_t SizeLeft = Payload.GetSize(); - CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); - auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, SizeLeft); - MutableMemoryView Data(buffer, size); - Payload.CopyTo(Data, BufferIt); - SizeLeft -= size; - return true; - }; - Sess->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); - - return CommonResponse(Sess.Put()); + return CommonResponse(DoWithRetry( + [&]() { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + Sess->UpdateHeader({HeaderContentType(ContentType)}); + + uint64_t SizeLeft = Payload.GetSize(); + CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); + auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { + size = Min<size_t>(size, SizeLeft); + MutableMemoryView Data(buffer, size); + Payload.CopyTo(Data, BufferIt); + SizeLeft -= size; + return true; + }; + Sess->SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Payload.GetSize()), ReadCallback)); + + return Sess.Put(); + }, + m_ConnectionSettings.RetryCount)); } HttpClient::Response @@ -697,46 +829,170 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold { ZEN_TRACE_CPU("HttpClient::Download"); - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}); - std::string PayloadString; std::unique_ptr<TempPayloadFile> PayloadFile; - - cpr::Response Response = Sess.Download(cpr::WriteCallback{[&](std::string data, intptr_t) { - if (!PayloadFile && (PayloadString.length() + data.length()) > (1024 * 1024)) - { - PayloadFile = std::make_unique<TempPayloadFile>(); - std::error_code Ec = PayloadFile->Open(TempFolderPath); - if (Ec) - { - ZEN_WARN("Failed to create temp file in '{}' for HttpClient::Download. Reason: {}", TempFolderPath.string(), Ec.message()); - return false; - } - PayloadFile->Write(PayloadString); - PayloadString.clear(); - } - if (PayloadFile) - { - std::error_code Ec = PayloadFile->Write(data); - if (Ec) - { - ZEN_WARN("Failed to write to temp file in '{}' for HttpClient::Download. Reason: {}", - TempFolderPath.string(), - Ec.message()); - return false; - } - } - else - { - PayloadString.append(data); - } - return true; - }}); - - if (!PayloadString.empty()) - { - Response.text = std::move(PayloadString); - } + cpr::Response Response = DoWithRetry( + [&]() { + auto DownloadCallback = [&](std::string data, intptr_t) { + if (!PayloadFile && (PayloadString.length() + data.length()) > (1024 * 1024)) + { + PayloadFile = std::make_unique<TempPayloadFile>(); + std::error_code Ec = PayloadFile->Open(TempFolderPath); + if (Ec) + { + ZEN_WARN("Failed to create temp file in '{}' for HttpClient::Download. Reason: {}", + TempFolderPath.string(), + Ec.message()); + return false; + } + PayloadFile->Write(PayloadString); + PayloadString.clear(); + } + if (PayloadFile) + { + std::error_code Ec = PayloadFile->Write(data); + if (Ec) + { + ZEN_WARN("Failed to write to temp file in '{}' for HttpClient::Download. Reason: {}", + TempFolderPath.string(), + Ec.message()); + return false; + } + } + else + { + PayloadString.append(data); + } + return true; + }; + + cpr::Response Response; + { + Impl::Session Sess = + m_Impl->AllocSession(m_BaseUri, Url, m_ConnectionSettings, AdditionalHeader, {}, m_SessionId, GetAccessToken()); + Response = Sess.Download(cpr::WriteCallback{DownloadCallback}); + } + + if (m_ConnectionSettings.AllowResume) + { + auto SupportsRanges = [](const cpr::Response& Response) -> bool { + if (Response.header.find("Content-Range") != Response.header.end()) + { + return true; + } + if (auto It = Response.header.find("Accept-Ranges"); It != Response.header.end()) + { + return It->second == "bytes"; + } + return false; + }; + + auto ShouldResume = [&SupportsRanges](const cpr::Response& Response) -> bool { + if (ShouldRetry(Response)) + { + return SupportsRanges(Response); + } + return false; + }; + + if (ShouldResume(Response)) + { + auto It = Response.header.find("Content-Length"); + if (It != Response.header.end()) + { + std::optional<int64_t> ContentLength = ParseInt<int64_t>(It->second); + if (ContentLength) + { + auto HeaderCallback = [&](std::string header, intptr_t) { + size_t DelimiterPos = header.find(':'); + if (DelimiterPos != std::string::npos) + { + std::string Key = header.substr(0, DelimiterPos); + constexpr AsciiSet WhitespaceCharacters(" \v\f\t\r\n"); + Key = AsciiSet::TrimSuffixWith(Key, WhitespaceCharacters); + Key = AsciiSet::TrimPrefixWith(Key, WhitespaceCharacters); + + std::string Value = header.substr(DelimiterPos + 1); + Value = AsciiSet::TrimSuffixWith(Value, WhitespaceCharacters); + Value = AsciiSet::TrimPrefixWith(Value, WhitespaceCharacters); + + Response.header.insert_or_assign(Key, Value); + + if (Key == "Content-Range"sv) + { + if (Value.starts_with("bytes ")) + { + size_t RangeStartEnd = Value.find('-', 6); + if (RangeStartEnd != std::string::npos) + { + const auto Start = ParseInt<uint64_t>(Value.substr(6, RangeStartEnd - 6)); + if (Start) + { + uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length(); + if (Start.value() == DownloadedSize) + { + return 1; + } + else if (Start.value() > DownloadedSize) + { + return 0; + } + if (PayloadFile) + { + PayloadFile->ResetWritePos(Start.value()); + } + else + { + PayloadString = PayloadString.substr(0, Start.value()); + } + return 1; + } + } + } + return 0; + } + } + return 1; + }; + + KeyValueMap HeadersWithRange(AdditionalHeader); + do + { + uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length(); + + std::string Range = fmt::format("{}-{}", DownloadedSize, ContentLength.value()); + if (auto RangeIt = HeadersWithRange.Entries.find("Range"); RangeIt != HeadersWithRange.Entries.end()) + { + if (RangeIt->second == Range) + { + // If we didn't make any progress, abort + break; + } + } + HeadersWithRange.Entries.insert_or_assign("Range", Range); + + Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, + Url, + m_ConnectionSettings, + HeadersWithRange, + {}, + m_SessionId, + GetAccessToken()); + Response.header.clear(); + Response = Sess.Download(cpr::WriteCallback{DownloadCallback}, cpr::HeaderCallback{HeaderCallback}); + } while (ShouldResume(Response)); + } + } + } + } + + if (!PayloadString.empty()) + { + Response.text = std::move(PayloadString); + } + return Response; + }, + m_ConnectionSettings.RetryCount); return CommonResponse(std::move(Response), PayloadFile ? PayloadFile->DetachToIoBuffer() : IoBuffer{}); } diff --git a/src/zenhttp/include/zenhttp/httpclient.h b/src/zenhttp/include/zenhttp/httpclient.h index 9de5c7cce..f3559f214 100644 --- a/src/zenhttp/include/zenhttp/httpclient.h +++ b/src/zenhttp/include/zenhttp/httpclient.h @@ -6,9 +6,11 @@ #include <zencore/iobuffer.h> #include <zencore/logbase.h> +#include <zencore/thread.h> #include <zencore/uid.h> #include <zenhttp/httpcommon.h> +#include <functional> #include <optional> #include <unordered_map> @@ -27,12 +29,32 @@ class CompositeBuffer; */ +struct HttpClientAccessToken +{ + using Clock = std::chrono::system_clock; + using TimePoint = Clock::time_point; + + static constexpr int64_t ExpireMarginInSeconds = 30; + + std::string Value; + TimePoint ExpireTime; + + bool IsValid() const + { + return Value.empty() == false && + ExpireMarginInSeconds < std::chrono::duration_cast<std::chrono::seconds>(ExpireTime - Clock::now()).count(); + } +}; + struct HttpClientSettings { - std::string LogCategory = "httpclient"; - std::chrono::milliseconds ConnectTimeout{3000}; - std::chrono::milliseconds Timeout{}; - bool AssumeHttp2 = false; + std::string LogCategory = "httpclient"; + std::chrono::milliseconds ConnectTimeout{3000}; + std::chrono::milliseconds Timeout{}; + std::optional<std::function<HttpClientAccessToken()>> AccessTokenProvider; + bool AssumeHttp2 = false; + bool AllowResume = false; + uint8_t RetryCount = 0; }; class HttpClient @@ -134,6 +156,7 @@ public: const CompositeBuffer& Payload, ZenContentType ContentType, const KeyValueMap& AdditionalHeader = {}); + [[nodiscard]] Response Download(std::string_view Url, const std::filesystem::path& TempFolderPath, const KeyValueMap& AdditionalHeader = {}); @@ -147,14 +170,18 @@ public: LoggerRef Logger() { return m_Log; } std::string_view GetBaseUri() const { return m_BaseUri; } + bool Authenticate(); private: + const std::optional<HttpClientAccessToken> GetAccessToken(); struct Impl; LoggerRef m_Log; std::string m_BaseUri; std::string m_SessionId; const HttpClientSettings m_ConnectionSettings; + RwLock m_AccessTokenLock; + HttpClientAccessToken m_CachedAccessToken; Ref<Impl> m_Impl; }; diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp index 9d8f6c17b..c9f1f5f6f 100644 --- a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp +++ b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp @@ -54,19 +54,8 @@ public: virtual SaveResult SaveContainer(const IoBuffer& Payload) override { - const int32_t MaxAttempts = 3; - PutRefResult PutResult; - { - CloudCacheSession Session(m_CloudClient.Get()); - for (int32_t Attempt = 0; Attempt < MaxAttempts && !PutResult.Success; Attempt++) - { - PutResult = Session.PutRef(m_Namespace, m_Bucket, m_Key, Payload, ZenContentType::kCbObject); - if (!PutResult.Success) - { - Sleep(100 * (Attempt + 1)); - } - } - } + CloudCacheSession Session(m_CloudClient.Get()); + PutRefResult PutResult = Session.PutRef(m_Namespace, m_Bucket, m_Key, Payload, ZenContentType::kCbObject); SaveResult Result{ConvertResult(PutResult), {PutResult.Needs.begin(), PutResult.Needs.end()}, PutResult.RawHash}; if (Result.ErrorCode) @@ -83,19 +72,8 @@ public: virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override { - const int32_t MaxAttempts = 3; - CloudCacheResult PutResult; - { - CloudCacheSession Session(m_CloudClient.Get()); - for (int32_t Attempt = 0; Attempt < MaxAttempts && !PutResult.Success; Attempt++) - { - PutResult = Session.PutCompressedBlob(m_Namespace, RawHash, Payload); - if (!PutResult.Success) - { - Sleep(100 * (Attempt + 1)); - } - } - } + CloudCacheSession Session(m_CloudClient.Get()); + CloudCacheResult PutResult = Session.PutCompressedBlob(m_Namespace, RawHash, Payload); SaveAttachmentResult Result{ConvertResult(PutResult)}; if (Result.ErrorCode) @@ -126,20 +104,9 @@ public: virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) override { - const int32_t MaxAttempts = 3; - FinalizeRefResult FinalizeRefResult; - { - CloudCacheSession Session(m_CloudClient.Get()); - for (int32_t Attempt = 0; Attempt < MaxAttempts && !FinalizeRefResult.Success; Attempt++) - { - FinalizeRefResult = Session.FinalizeRef(m_Namespace, m_Bucket, m_Key, RawHash); - if (!FinalizeRefResult.Success) - { - Sleep(100 * (Attempt + 1)); - } - } - } - FinalizeResult Result{ConvertResult(FinalizeRefResult), {FinalizeRefResult.Needs.begin(), FinalizeRefResult.Needs.end()}}; + CloudCacheSession Session(m_CloudClient.Get()); + FinalizeRefResult FinalizeRefResult = Session.FinalizeRef(m_Namespace, m_Bucket, m_Key, RawHash); + FinalizeResult Result{ConvertResult(FinalizeRefResult), {FinalizeRefResult.Needs.begin(), FinalizeRefResult.Needs.end()}}; if (Result.ErrorCode) { Result.Reason = fmt::format("Failed finalizing oplog container to {}/{}/{}/{}. Reason: '{}'", @@ -165,19 +132,8 @@ public: virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override { - const int32_t MaxAttempts = 3; - CloudCacheResult GetResult; - { - CloudCacheSession Session(m_CloudClient.Get()); - for (int32_t Attempt = 0; Attempt < MaxAttempts && !GetResult.Success; Attempt++) - { - GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath); - if (!GetResult.Success) - { - Sleep(100 * (Attempt + 1)); - } - } - } + CloudCacheSession Session(m_CloudClient.Get()); + CloudCacheResult GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath); LoadAttachmentResult Result{ConvertResult(GetResult), std::move(GetResult.Response)}; if (GetResult.ErrorCode) { @@ -210,20 +166,8 @@ public: private: LoadContainerResult LoadContainer(const IoHash& Key) { - const int32_t MaxAttempts = 3; - CloudCacheResult GetResult; - { - CloudCacheSession Session(m_CloudClient.Get()); - for (int32_t Attempt = 0; Attempt < MaxAttempts && !GetResult.Success; Attempt++) - { - GetResult = Session.GetRef(m_Namespace, m_Bucket, Key, ZenContentType::kCbObject); - if (!GetResult.Success) - { - Sleep(100 * (Attempt + 1)); - } - } - } - + CloudCacheSession Session(m_CloudClient.Get()); + CloudCacheResult GetResult = Session.GetRef(m_Namespace, m_Bucket, Key, ZenContentType::kCbObject, m_TempFilePath); if (GetResult.ErrorCode || !GetResult.Success) { LoadContainerResult Result{ConvertResult(GetResult)}; @@ -312,7 +256,9 @@ CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, const std::fi .ServiceUrl = Url, .ConnectTimeout = std::chrono::milliseconds(2000), .Timeout = std::chrono::milliseconds(1800000), - .AssumeHttp2 = Options.AssumeHttp2}; + .AssumeHttp2 = Options.AssumeHttp2, + .AllowResume = true, + .RetryCount = 2}; // 1) Access token as parameter in request // 2) Environment variable (different win vs linux/mac) // 3) openid-provider (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 42af9b79b..f117a4203 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -3593,12 +3593,16 @@ ProjectStore::CreateReferenceCheckers(GcCtx& Ctx) Checkers.reserve(OpLogs.size()); for (const std::string& OpLogId : OpLogs) { - ProjectStore::Oplog* Oplog = Project->OpenOplog(OpLogId); - GcClock::TimePoint Now = GcClock::Now(); - bool TryPreCache = Project->LastOplogAccessTime(OpLogId) < (Now - std::chrono::minutes(5)); + ProjectStore::Oplog* Oplog = Project->OpenOplog(OpLogId); + if (Oplog == nullptr) + { + continue; + } + GcClock::TimePoint Now = GcClock::Now(); + bool TryPreCache = Project->LastOplogAccessTime(OpLogId) < (Now - std::chrono::minutes(5)); Checkers.emplace_back(new ProjectStoreReferenceChecker(*Oplog, TryPreCache)); + OplogCount++; } - OplogCount += OpLogs.size(); } } catch (std::exception&) diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index ddab7432d..83cec4725 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -560,7 +560,7 @@ BuildContainer(CidStore& ChunkStore, RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), Sb.ToString(), {}); ReportMessage(OptionalContext, - fmt::format("Failed to build container ({}): '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + fmt::format("Failed to build container ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); BlockCreateLatch.CountDown(); while (!BlockCreateLatch.Wait(1000)) @@ -893,11 +893,9 @@ UploadAttachments(WorkerThreadPool& WorkerPool, RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), "Invalid attachment", fmt::format("Upload requested of unknown attachment '{}'", Needed)); - ReportMessage(OptionalContext, - fmt::format("Failed to upload attachment '{}'. ({}): '{}'", - Needed, - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); + ReportMessage( + OptionalContext, + fmt::format("Failed to upload attachment '{}'. ({}): {}", Needed, RemoteResult.GetError(), RemoteResult.GetErrorReason())); return; } } @@ -969,7 +967,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool, RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::NotFound), fmt::format("Failed to find attachment {}", RawHash), {}); - ZEN_WARN("Failed to save attachment '{}' ({}): '{}'", RawHash, RemoteResult.GetError(), RemoteResult.GetErrorReason()); + ZEN_WARN("Failed to save attachment '{}' ({}): {}", RawHash, RemoteResult.GetError(), RemoteResult.GetErrorReason()); return; } @@ -978,7 +976,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool, { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); ReportMessage(OptionalContext, - fmt::format("Failed to save attachment '{}', {} ({}): '{}'", + fmt::format("Failed to save attachment '{}', {} ({}): {}", RawHash, NiceBytes(Payload.GetSize()), RemoteResult.GetError(), @@ -1031,7 +1029,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool, { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); ReportMessage(OptionalContext, - fmt::format("Failed to save attachment '{}', {} ({}): '{}'", + fmt::format("Failed to save attachment '{}', {} ({}): {}", RawHash, NiceBytes(Payload.GetSize()), RemoteResult.GetError(), @@ -1108,7 +1106,7 @@ UploadAttachments(WorkerThreadPool& WorkerPool, { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); ReportMessage(OptionalContext, - fmt::format("Failed to save attachments with {} chunks ({}): '{}'", + fmt::format("Failed to save attachments with {} chunks ({}): {}", Chunks.size(), RemoteResult.GetError(), RemoteResult.GetErrorReason())); @@ -1230,7 +1228,7 @@ SaveOplog(CidStore& ChunkStore, { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); ReportMessage(OptionalContext, - fmt::format("Failed to save attachment ({}): '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + fmt::format("Failed to save attachment ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); return; } ZEN_DEBUG("Saved block {}, {}", BlockHash, NiceBytes(CompressedBlock.GetCompressedSize())); @@ -1270,10 +1268,9 @@ SaveOplog(CidStore& ChunkStore, { if (BaseContainerResult.ErrorCode) { - ReportMessage(OptionalContext, - fmt::format("Failed to load oplog base container: '{}', error code: {}", - BaseContainerResult.Reason, - BaseContainerResult.ErrorCode)); + ReportMessage( + OptionalContext, + fmt::format("Failed to load oplog base container ({}): {}", BaseContainerResult.ErrorCode, BaseContainerResult.Reason)); } else { @@ -1337,7 +1334,7 @@ SaveOplog(CidStore& ChunkStore, { RemoteResult.SetError(ContainerSaveResult.ErrorCode, ContainerSaveResult.Reason, "Failed to save oplog container"); ReportMessage(OptionalContext, - fmt::format("Failed to save oplog container ({}): '{}'", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + fmt::format("Failed to save oplog container ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); } else { @@ -1372,7 +1369,7 @@ SaveOplog(CidStore& ChunkStore, { RemoteResult.SetError(ContainerFinalizeResult.ErrorCode, ContainerFinalizeResult.Reason, ContainerFinalizeResult.Text); ReportMessage(OptionalContext, - fmt::format("Failed to finalize oplog container {} ({}): '{}'", + fmt::format("Failed to finalize oplog container {} ({}): {}", ContainerSaveResult.RawHash, RemoteResult.GetError(), RemoteResult.GetErrorReason())); @@ -1635,7 +1632,7 @@ LoadOplog(CidStore& ChunkStore, if (Result.ErrorCode) { ReportMessage(OptionalContext, - fmt::format("Failed to load attachments with {} chunks ({}): '{}'", + fmt::format("Failed to load attachments with {} chunks ({}): {}", Chunks.size(), RemoteResult.GetError(), RemoteResult.GetErrorReason())); @@ -1691,7 +1688,7 @@ LoadOplog(CidStore& ChunkStore, if (BlockResult.ErrorCode) { ReportMessage(OptionalContext, - fmt::format("Failed to download block attachment {} ({}): '{}'", + fmt::format("Failed to download block attachment {} ({}): {}", BlockHash, RemoteResult.GetError(), RemoteResult.GetErrorReason())); @@ -1706,18 +1703,20 @@ LoadOplog(CidStore& ChunkStore, return; } Info.AttachmentBlocksDownloaded.fetch_add(1); - ZEN_INFO("Loaded block attachment '{}' in {}", + uint64_t BlockSize = BlockResult.Bytes.GetSize(); + ZEN_INFO("Loaded block attachment '{}' in {} ({})", BlockHash, - NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000))); + NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)), + NiceBytes(BlockSize)); if (RemoteResult.IsError()) { return; } + Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize); bool StoreChunksOK = IterateBlock(std::move(BlockResult.Bytes), [&ChunkStore, &Info](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { - uint64_t ChunkSize = Chunk.GetCompressedSize(); - Info.AttachmentBlockBytesDownloaded.fetch_add(ChunkSize); + uint64_t ChunkSize = Chunk.GetCompressedSize(); CidStore::InsertResult InsertResult = ChunkStore.AddChunk(Chunk.GetCompressed().Flatten().AsIoBuffer(), AttachmentRawHash); if (InsertResult.New) @@ -1730,7 +1729,7 @@ LoadOplog(CidStore& ChunkStore, if (!StoreChunksOK) { ReportMessage(OptionalContext, - fmt::format("Block attachment {} has invalid format ({}): '{}'", + fmt::format("Block attachment {} has invalid format ({}): {}", BlockHash, RemoteResult.GetError(), RemoteResult.GetErrorReason())); @@ -1788,20 +1787,21 @@ LoadOplog(CidStore& ChunkStore, } return; } - ZEN_INFO("Loaded large attachment '{}' in {}", + uint64_t AttachmentSize = AttachmentResult.Bytes.GetSize(); + ZEN_INFO("Loaded large attachment '{}' in {} ({})", RawHash, - NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000))); + NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)), + NiceBytes(AttachmentSize)); Info.AttachmentsDownloaded.fetch_add(1); if (RemoteResult.IsError()) { return; } - uint64_t ChunkSize = AttachmentResult.Bytes.GetSize(); - Info.AttachmentBytesDownloaded.fetch_add(ChunkSize); + Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize); CidStore::InsertResult InsertResult = ChunkStore.AddChunk(AttachmentResult.Bytes, RawHash); if (InsertResult.New) { - Info.AttachmentBytesStored.fetch_add(ChunkSize); + Info.AttachmentBytesStored.fetch_add(AttachmentSize); Info.AttachmentsStored.fetch_add(1); } }); diff --git a/src/zenserver/upstream/jupiter.cpp b/src/zenserver/upstream/jupiter.cpp index e4d45e316..bf2538908 100644 --- a/src/zenserver/upstream/jupiter.cpp +++ b/src/zenserver/upstream/jupiter.cpp @@ -16,7 +16,6 @@ #include <zenutil/basicfile.h> ZEN_THIRD_PARTY_INCLUDES_START -#include <cpr/cpr.h> #include <fmt/format.h> ZEN_THIRD_PARTY_INCLUDES_END @@ -32,277 +31,70 @@ using namespace std::literals; namespace zen { namespace detail { - struct CloudCacheSessionState + CloudCacheResult ConvertResponse(const HttpClient::Response& Response, const std::string_view ErrorPrefix = ""sv) { - CloudCacheSessionState(CloudCacheClient& Client) : m_Client(Client) {} - - const CloudCacheAccessToken& GetAccessToken(bool RefreshToken) - { - if (RefreshToken) - { - m_AccessToken = m_Client.AcquireAccessToken(); - } - - return m_AccessToken; - } - - cpr::Session& GetSession() { return m_Session; } - - void Reset(std::chrono::milliseconds ConnectTimeout, std::chrono::milliseconds Timeout, bool AssumeHttp2) + if (Response.Error) { - m_Session.SetBody({}); - m_Session.SetHeader({}); - m_Session.SetConnectTimeout(ConnectTimeout); - m_Session.SetTimeout(Timeout); - if (AssumeHttp2) - { - m_Session.SetHttpVersion(cpr::HttpVersion{cpr::HttpVersionCode::VERSION_2_0_PRIOR_KNOWLEDGE}); - } - } - - private: - friend class zen::CloudCacheClient; - - CloudCacheClient& m_Client; - CloudCacheAccessToken m_AccessToken; - cpr::Session m_Session; - }; - - CloudCacheResult ConvertResponse(const cpr::Response& Response) - { - if (Response.error) - { - return {.ElapsedSeconds = Response.elapsed, - .ErrorCode = static_cast<int32_t>(Response.error.code), - .Reason = Response.error.message, + return {.ElapsedSeconds = Response.ElapsedSeconds, + .ErrorCode = Response.Error.value().ErrorCode, + .Reason = Response.ErrorMessage(ErrorPrefix), .Success = false}; } - if (!IsHttpSuccessCode(Response.status_code)) + if (!Response.IsSuccess()) { - return {.ElapsedSeconds = Response.elapsed, - .ErrorCode = static_cast<int32_t>(Response.status_code), - .Reason = Response.reason.empty() ? Response.text : Response.reason, + return {.ElapsedSeconds = Response.ElapsedSeconds, + .ErrorCode = static_cast<int32_t>(Response.StatusCode), + .Reason = Response.ErrorMessage(ErrorPrefix), .Success = false}; } - return {.Bytes = Response.downloaded_bytes, - .ElapsedSeconds = Response.elapsed, + return {.Response = Response.ResponsePayload, + .Bytes = Response.DownloadedBytes, + .ElapsedSeconds = Response.ElapsedSeconds, .ErrorCode = 0, - .Reason = Response.reason, .Success = true}; } - - cpr::Response GetWithStreaming(cpr::Session& Session, std::filesystem::path TempFolderPath, std::string_view Name, IoBuffer& OutBuffer) - { - if (TempFolderPath.empty()) - { - return Session.Get(); - } - - std::string PayloadString; - std::shared_ptr<BasicFile> PayloadFile; - - auto _ = MakeGuard([&]() { - if (PayloadFile) - { - PayloadFile.reset(); - std::filesystem::path TempPath = TempFolderPath / Name; - std::error_code Ec; - std::filesystem::remove(TempPath, Ec); - } - }); - - uint64_t Offset = 0; - Session.SetWriteCallback(cpr::WriteCallback{[&](std::string data, intptr_t) { - if (!PayloadFile && (PayloadString.length() + data.length()) > (1024 * 1024)) - { - std::filesystem::path TempPath = TempFolderPath / Name; - PayloadFile = std::make_shared<BasicFile>(); - PayloadFile->Open(TempPath, BasicFile::Mode::kTruncateDelete); - PayloadFile->Write(PayloadString.data(), PayloadString.size(), Offset); - Offset += PayloadString.size(); - PayloadString.clear(); - } - if (PayloadFile) - { - PayloadFile->Write(data.data(), data.size(), Offset); - Offset += data.size(); - } - else - { - PayloadString.append(data); - } - return true; - }}); - - cpr::Response Response = Session.Get(); - - if (!Response.error && IsHttpSuccessCode(Response.status_code)) - { - if (PayloadFile) - { - uint64_t PayloadSize = PayloadFile->FileSize(); - void* FileHandle = PayloadFile->Detach(); - PayloadFile.reset(); - OutBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, PayloadSize, /*IsWholeFile*/ true); - OutBuffer.SetDeleteOnClose(true); - } - else - { - OutBuffer = IoBufferBuilder::MakeCloneFromMemory(PayloadString.data(), PayloadString.size()); - } - return Response; - } - - Response.text.swap(PayloadString); - return Response; - } - - static std::optional<zen::HttpContentType> TryGetContentType(const cpr::Response& Response) - { - if (auto It = Response.header.find("Content-Type"); It != Response.header.end()) - { - zen::HttpContentType ContentType = zen::ParseContentType(It->second); - if (ContentType != zen::HttpContentType::kUnknownContentType) - { - return ContentType; - } - } - return {}; - } - - static IoBuffer MakeBufferFromResponseIfKnownFormat(const cpr::Response& Response) - { - std::optional<zen::HttpContentType> ContentType = TryGetContentType(Response); - if (ContentType) - { - IoBuffer Buffer = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()); - Buffer.SetContentType(ContentType.value()); - return Buffer; - } - return {}; - } - } // namespace detail CloudCacheSession::CloudCacheSession(CloudCacheClient* CacheClient) : m_Log(CacheClient->Logger()), m_CacheClient(CacheClient) { - m_SessionState = m_CacheClient->AllocSessionState(); } CloudCacheSession::~CloudCacheSession() { - m_CacheClient->FreeSessionState(m_SessionState); } CloudCacheResult CloudCacheSession::Authenticate() { - const bool RefreshToken = true; - const CloudCacheAccessToken& AccessToken = GetAccessToken(RefreshToken); - - return {.Success = AccessToken.IsValid()}; + bool OK = m_CacheClient->m_HttpClient.Authenticate(); + return {.Success = OK}; } CloudCacheResult -CloudCacheSession::GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType) +CloudCacheSession::GetRef(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + ZenContentType RefType, + std::filesystem::path TempFolderPath) { - const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"; - - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", ContentType}}); - Session.SetOption(cpr::Body{}); + ZEN_TRACE_CPU("JupiterClient::GetRef"); - cpr::Response Response = Session.Get(); - ZEN_DEBUG("GET {}", Response); + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), + TempFolderPath, + {HttpClient::Accept(RefType)}); - CloudCacheResult Result = detail::ConvertResponse(Response); - if (Result.Success) - { - Result.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()); - } - else - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::GetRef failed GET. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-Accept: '{}', " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - ContentType, - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; + return detail::ConvertResponse(Response, "CloudCacheSession::GetRef"sv); } CloudCacheResult CloudCacheSession::GetBlob(std::string_view Namespace, const IoHash& Key) { - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << Namespace << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/octet-stream"}}); - Session.SetOption(cpr::Body{}); - - cpr::Response Response = Session.Get(); - ZEN_DEBUG("GET {}", Response); + ZEN_TRACE_CPU("JupiterClient::GetBlob"); + HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), + {HttpClient::Accept(ZenContentType::kBinary)}); - CloudCacheResult Result = detail::ConvertResponse(Response); - if (Result.Success) - { - Result.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()); - } - else - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::GetBlob failed GET. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-Accept: '{}', " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - "application/octet-stream", - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; + return detail::ConvertResponse(Response); } CloudCacheResult @@ -310,58 +102,12 @@ CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& K { ZEN_TRACE_CPU("JupiterClient::GetCompressedBlob"); - ExtendableStringBuilder<256> Uri; - std::string KeyString = Key.ToHexString(); - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << KeyString; + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), + TempFolderPath, + {HttpClient::Accept(ZenContentType::kCompressedBinary)}); - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-comp"}}); - Session.SetOption(cpr::Body{}); - - IoBuffer Payload; - cpr::Response Response = detail::GetWithStreaming(Session, TempFolderPath, KeyString, Payload); - ZEN_DEBUG("GET {}", Response); - - CloudCacheResult Result = detail::ConvertResponse(Response); - if (Result.Success) - { - Result.Response = std::move(Payload); - } - else - { - std::optional<zen::HttpContentType> ContentType = detail::TryGetContentType(Response); - if (ContentType.has_value()) - { - Result.Response = std::move(Payload); - Result.Response.SetContentType(ContentType.value()); - } - ZEN_WARN( - "CloudCacheSession::GetCompressedBlob failed GET. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-Accept: '{}', " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - "application/x-ue-comp", - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; + return detail::ConvertResponse(Response); } CloudCacheResult @@ -373,59 +119,14 @@ CloudCacheSession::GetInlineBlob(std::string_view Namespace, { ZEN_TRACE_CPU("JupiterClient::GetInlineBlob"); - ExtendableStringBuilder<256> Uri; - std::string KeyString = Key.ToHexString(); - Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << KeyString; - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-jupiter-inline"}}); - Session.SetOption(cpr::Body{}); - - IoBuffer Payload; - cpr::Response Response = detail::GetWithStreaming(Session, TempFolderPath, KeyString, Payload); - ZEN_DEBUG("GET {}", Response); + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), + TempFolderPath, + {{"Accept", "application/x-jupiter-inline"}}); CloudCacheResult Result = detail::ConvertResponse(Response); - if (Result.Success) - { - Result.Response = std::move(Payload); - } - else - { - std::optional<zen::HttpContentType> ContentType = detail::TryGetContentType(Response); - if (ContentType.has_value()) - { - Result.Response = std::move(Payload); - Result.Response.SetContentType(ContentType.value()); - } - ZEN_WARN( - "CloudCacheSession::GetInlineBlob failed GET. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-Accept: '{}', " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - "application/x-jupiter-inline", - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - if (auto It = Response.header.find("X-Jupiter-InlinePayloadHash"); It != Response.header.end()) + if (auto It = Response.Header->find("X-Jupiter-InlinePayloadHash"); It != Response.Header->end()) { const std::string& PayloadHashHeader = It->second; if (PayloadHashHeader.length() == IoHash::StringLength) @@ -442,52 +143,10 @@ CloudCacheSession::GetObject(std::string_view Namespace, const IoHash& Key) { ZEN_TRACE_CPU("JupiterClient::GetObject"); - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}}); - Session.SetOption(cpr::Body{}); - - cpr::Response Response = Session.Get(); - ZEN_DEBUG("GET {}", Response); - - CloudCacheResult Result = detail::ConvertResponse(Response); - if (Result.Success) - { - Result.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()); - } - else - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::GetObject failed GET. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-Accept: '{}', " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - "application/x-ue-cb", - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } + HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), + {HttpClient::Accept(ZenContentType::kCbObject)}); - return Result; + return detail::ConvertResponse(Response); } PutRefResult @@ -495,29 +154,18 @@ CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId, { ZEN_TRACE_CPU("JupiterClient::PutRef"); - IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size()); - - const std::string ContentType = RefType == ZenContentType::kCbObject ? "application/x-ue-cb" : "application/octet-stream"; + Ref.SetContentType(RefType); - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption( - cpr::Header{{"Authorization", AccessToken.Value}, {"X-Jupiter-IoHash", Hash.ToHexString()}, {"Content-Type", ContentType}}); - Session.SetBody(cpr::Body{(const char*)Ref.Data(), Ref.Size()}); + IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size()); - cpr::Response Response = Session.Put(); - ZEN_DEBUG("PUT {}", Response); + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Put(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), Ref); PutRefResult Result = {detail::ConvertResponse(Response)}; if (Result.Success) { std::string JsonError; - json11::Json Json = json11::Json::parse(Response.text, JsonError); + json11::Json Json = json11::Json::parse(Response.ToText(), JsonError); if (JsonError.empty()) { json11::Json::array Needs = Json["needs"].array_items(); @@ -528,37 +176,6 @@ CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId, } Result.RawHash = Hash; } - else - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::PutRef failed PUT. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-X-Jupiter-IoHash: '{}', " - "Header-ContentType: '{}', " - "ContentSize: {}, " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - Hash.ToHexString(), - ContentType, - NiceBytes(Ref.Size()), - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; } @@ -567,28 +184,16 @@ CloudCacheSession::FinalizeRef(std::string_view Namespace, std::string_view Buck { ZEN_TRACE_CPU("JupiterClient::FinalizeRef"); - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString() << "/finalize/" - << RefHash.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, - {"X-Jupiter-IoHash", RefHash.ToHexString()}, - {"Content-Type", "application/x-ue-cb"}}); - Session.SetBody(cpr::Body{}); - - cpr::Response Response = Session.Post(); - ZEN_DEBUG("POST {}", Response); + HttpClient::Response Response = m_CacheClient->m_HttpClient.Post( + fmt::format("/api/v1/refs/{}/{}/{}/finalize/{}", Namespace, BucketId, Key.ToHexString(), RefHash.ToHexString()), + {{"X-Jupiter-IoHash", RefHash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}}); FinalizeRefResult Result = {detail::ConvertResponse(Response)}; if (Result.Success) { std::string JsonError; - json11::Json Json = json11::Json::parse(Response.text, JsonError); + json11::Json Json = json11::Json::parse(std::string(Response.AsText()), JsonError); if (JsonError.empty()) { json11::Json::array Needs = Json["needs"].array_items(); @@ -598,37 +203,6 @@ CloudCacheSession::FinalizeRef(std::string_view Namespace, std::string_view Buck } } } - else - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::FinalizeRef failed PUT. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-X-Jupiter-IoHash: '{}', " - "Header-ContentType: '{}', " - "ContentSize: {}, " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - RefHash.ToHexString(), - "application/x-ue-cb", - NiceBytes(0), - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; } @@ -637,49 +211,9 @@ CloudCacheSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuff { ZEN_TRACE_CPU("JupiterClient::PutBlob"); - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << Namespace << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); + HttpClient::Response Response = m_CacheClient->m_HttpClient.Put(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), Blob); - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/octet-stream"}}); - Session.SetBody(cpr::Body{(const char*)Blob.Data(), Blob.Size()}); - - cpr::Response Response = Session.Put(); - ZEN_DEBUG("PUT {}", Response); - - CloudCacheResult Result = detail::ConvertResponse(Response); - if (!Result.Success) - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::PutBlob failed PUT. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-ContentType: '{}', " - "ContentSize: {}, " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - "application/octet-stream", - NiceBytes(Blob.Size()), - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; + return detail::ConvertResponse(Response); } CloudCacheResult @@ -687,66 +221,11 @@ CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& K { ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob"); - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-comp"}}); - - uint64_t Offset = 0; - if (Blob.IsWholeFile()) - { - auto ReadCallback = [&Blob, &Offset](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, Blob.GetSize() - Offset); - IoBuffer PayloadRange = IoBuffer(Blob, Offset, size); - MutableMemoryView Data(buffer, size); - Data.CopyFrom(PayloadRange.GetView()); - Offset += size; - return true; - }; - Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(Blob.GetSize()), ReadCallback)); - } - else - { - Session.SetBody(cpr::Body{(const char*)Blob.Data(), Blob.Size()}); - } - - cpr::Response Response = Session.Put(); - ZEN_DEBUG("PUT {}", Response); + Blob.SetContentType(ZenContentType::kCompressedBinary); + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), Blob); - CloudCacheResult Result = detail::ConvertResponse(Response); - if (!Result.Success) - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::PutCompressedBlob failed PUT. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-ContentType: '{}', " - "ContentSize: {}, " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - "application/x-ue-comp", - NiceBytes(Blob.Size()), - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; + return detail::ConvertResponse(Response); } CloudCacheResult @@ -754,58 +233,12 @@ CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& K { ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob"); - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-comp"}}); - uint64_t SizeLeft = Payload.GetSize(); - CompositeBuffer::Iterator BufferIt = Payload.GetIterator(0); - auto ReadCallback = [&Payload, &BufferIt, &SizeLeft](char* buffer, size_t& size, intptr_t) { - size = Min<size_t>(size, SizeLeft); - MutableMemoryView Data(buffer, size); - Payload.CopyTo(Data, BufferIt); - SizeLeft -= size; - return true; - }; - Session.SetReadCallback(cpr::ReadCallback(gsl::narrow<cpr::cpr_off_t>(SizeLeft), ReadCallback)); - - cpr::Response Response = Session.Put(); - ZEN_DEBUG("PUT {}", Response); + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), + Payload, + ZenContentType::kCompressedBinary); - CloudCacheResult Result = detail::ConvertResponse(Response); - if (!Result.Success) - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::PutCompressedBlob failed PUT. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-ContentType: '{}', " - "ContentSize: {}, " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - "application/x-ue-comp", - NiceBytes(Payload.GetSize()), - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; + return detail::ConvertResponse(Response); } CloudCacheResult @@ -813,49 +246,11 @@ CloudCacheSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBu { ZEN_TRACE_CPU("JupiterClient::PutObject"); - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); + Object.SetContentType(ZenContentType::kCbObject); + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), Object); - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}}); - Session.SetBody(cpr::Body{(const char*)Object.Data(), Object.Size()}); - - cpr::Response Response = Session.Put(); - ZEN_DEBUG("PUT {}", Response); - - CloudCacheResult Result = detail::ConvertResponse(Response); - if (!Result.Success) - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::PutObject failed PUT. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-ContentType: '{}', " - "ContentSize: {}, " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - "application/x-ue-cb", - NiceBytes(Object.GetSize()), - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; + return detail::ConvertResponse(Response); } CloudCacheResult @@ -863,45 +258,10 @@ CloudCacheSession::RefExists(std::string_view Namespace, std::string_view Bucket { ZEN_TRACE_CPU("JupiterClient::RefExists"); - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString())); - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}}); - Session.SetOption(cpr::Body{}); - - cpr::Response Response = Session.Head(); - ZEN_DEBUG("HEAD {}", Response); - - CloudCacheResult Result = detail::ConvertResponse(Response); - if (!Result.Success) - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::RefExists failed PUT. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; + return detail::ConvertResponse(Response); } GetObjectReferencesResult @@ -909,57 +269,20 @@ CloudCacheSession::GetObjectReferences(std::string_view Namespace, const IoHash& { ZEN_TRACE_CPU("JupiterClient::GetObjectReferences"); - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString() << "/references"; - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}}); - Session.SetOption(cpr::Body{}); - - cpr::Response Response = Session.Get(); - ZEN_DEBUG("GET {}", Response); + HttpClient::Response Response = + m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/references", Namespace, Key.ToHexString()), + {HttpClient::Accept(ZenContentType::kCbObject)}); GetObjectReferencesResult Result = {detail::ConvertResponse(Response)}; if (Result.Success) { - IoBuffer Buffer = IoBuffer(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size()); - const CbObject ReferencesResponse = LoadCompactBinaryObject(Buffer); + const CbObject ReferencesResponse = Response.AsObject(); for (auto& Item : ReferencesResponse["references"sv]) { Result.References.insert(Item.AsHash()); } } - else - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::GetObjectReferences failed PUT. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-Accept: '{}', " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - "application/x-ue-cb", - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; } @@ -1002,73 +325,23 @@ CloudCacheSession::ObjectExists(std::string_view Namespace, const std::set<IoHas std::vector<IoHash> CloudCacheSession::Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes) { - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl(); - Uri << "/api/v1/s/" << Namespace; + // ExtendableStringBuilder<256> Uri; + // Uri << m_CacheClient->ServiceUrl(); + // Uri << "/api/v1/s/" << Namespace; - ZEN_UNUSED(BucketId, ChunkHashes); + ZEN_UNUSED(Namespace, BucketId, ChunkHashes); return {}; } -cpr::Session& -CloudCacheSession::GetSession() -{ - return m_SessionState->GetSession(); -} - -CloudCacheAccessToken -CloudCacheSession::GetAccessToken(bool RefreshToken) -{ - return m_SessionState->GetAccessToken(RefreshToken); -} - CloudCacheResult CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key) { ZEN_TRACE_CPU("JupiterClient::CacheTypeExists"); - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << Namespace << "/" << Key.ToHexString(); - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}}); - Session.SetOption(cpr::Body{}); - - cpr::Response Response = Session.Head(); - ZEN_DEBUG("HEAD {}", Response); + HttpClient::Response Response = m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/{}/{}/{}", TypeId, Namespace, Key.ToHexString())); - CloudCacheResult Result = detail::ConvertResponse(Response); - if (!Result.Success) - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::CacheTypeExists failed GET. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-Accept: '{}', " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - "application/x-ue-cb", - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; + return detail::ConvertResponse(Response); } CloudCacheExistsResult @@ -1083,58 +356,23 @@ CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view Body << (Body.Size() != 1 ? ",\"" : "\"") << Key.ToHexString() << "\""; } Body << "]"; + IoBuffer Payload = IoBuffer(IoBuffer::Wrap, Body.Data(), Body.Size()); + Payload.SetContentType(ZenContentType::kJSON); - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << Namespace << "/exist"; - - cpr::Session& Session = GetSession(); - const CloudCacheAccessToken& AccessToken = GetAccessToken(); + HttpClient::Response Response = m_CacheClient->m_HttpClient.Post(fmt::format("/api/v1/{}/{}/exist", TypeId, Namespace), + Payload, + {HttpClient::Accept(ZenContentType::kCbObject)}); - Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption( - cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}, {"Content-Type", "application/json"}}); - Session.SetOption(cpr::Body(Body.ToString())); - - cpr::Response Response = Session.Post(); - ZEN_DEBUG("POST {}", Response); CloudCacheExistsResult Result = {detail::ConvertResponse(Response)}; if (Result.Success) { - IoBuffer Buffer = IoBuffer(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size()); - const CbObject ExistsResponse = LoadCompactBinaryObject(Buffer); + const CbObject ExistsResponse = Response.AsObject(); for (auto& Item : ExistsResponse["needs"sv]) { Result.Needs.insert(Item.AsHash()); } } - else - { - Result.Response = detail::MakeBufferFromResponseIfKnownFormat(Response); - ZEN_WARN( - "CloudCacheSession::CacheTypeExists failed GET. " - "Elapsed: {} s, " - "Uri: '{}', " - "Header-Authorization: '{} <redacted>', " - "Header-Accept: '{}', " - "Response.status_code: {}, " - "Response.reason: '{}', " - "Response.error.code: {}, " - "Response.error.message: '{}', " - "Response.raw_header: '{}'" - "Response.text: '{}'", - Response.elapsed, - Uri, - AccessToken.Value.substr(0, 6), - "application/x-ue-cb", - Response.status_code, - Response.reason, - gsl::narrow<int>(Response.error.code), - Response.error.message, - Response.raw_header, - Response.text); - } - return Result; } @@ -1233,77 +471,39 @@ CloudCacheTokenProvider::CreateFromCallback(std::function<CloudCacheAccessToken( return std::make_unique<CallbackTokenProvider>(std::move(Callback)); } +static std::optional<std::function<HttpClientAccessToken()>> +GetHttpClientAccessProvider(CloudCacheTokenProvider* TokenProvider) +{ + if (TokenProvider == nullptr) + { + return {}; + } + auto ProviderFunc = [TokenProvider]() -> HttpClientAccessToken { + CloudCacheAccessToken Token = TokenProvider->AcquireAccessToken(); + return HttpClientAccessToken{.Value = Token.Value, .ExpireTime = Token.ExpireTime}; + }; + return ProviderFunc; +} + CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options, std::unique_ptr<CloudCacheTokenProvider> TokenProvider) : m_Log(zen::logging::Get("jupiter")) -, m_ServiceUrl(Options.ServiceUrl) , m_DefaultDdcNamespace(Options.DdcNamespace) , m_DefaultBlobStoreNamespace(Options.BlobStoreNamespace) , m_ComputeCluster(Options.ComputeCluster) -, m_ConnectTimeout(Options.ConnectTimeout) -, m_Timeout(Options.Timeout) , m_TokenProvider(std::move(TokenProvider)) -, m_AssumeHttp2(Options.AssumeHttp2) +, m_HttpClient(Options.ServiceUrl, + HttpClientSettings{.ConnectTimeout = Options.ConnectTimeout, + .Timeout = Options.Timeout, + .AccessTokenProvider = GetHttpClientAccessProvider(m_TokenProvider.get()), + .AssumeHttp2 = Options.AssumeHttp2, + .AllowResume = Options.AllowResume, + .RetryCount = Options.RetryCount}) { ZEN_ASSERT(m_TokenProvider.get() != nullptr); } CloudCacheClient::~CloudCacheClient() { - RwLock::ExclusiveLockScope _(m_SessionStateLock); - - for (auto State : m_SessionStateCache) - { - delete State; - } -} - -CloudCacheAccessToken -CloudCacheClient::AcquireAccessToken() -{ - ZEN_TRACE_CPU("JupiterClient::AcquireAccessToken"); - - return m_TokenProvider->AcquireAccessToken(); -} - -detail::CloudCacheSessionState* -CloudCacheClient::AllocSessionState() -{ - detail::CloudCacheSessionState* State = nullptr; - - bool IsTokenValid = false; - - { - RwLock::ExclusiveLockScope _(m_SessionStateLock); - - if (m_SessionStateCache.empty() == false) - { - State = m_SessionStateCache.front(); - IsTokenValid = State->m_AccessToken.IsValid(); - - m_SessionStateCache.pop_front(); - } - } - - if (State == nullptr) - { - State = new detail::CloudCacheSessionState(*this); - } - - State->Reset(m_ConnectTimeout, m_Timeout, m_AssumeHttp2); - - if (IsTokenValid == false) - { - State->m_AccessToken = m_TokenProvider->AcquireAccessToken(); - } - - return State; -} - -void -CloudCacheClient::FreeSessionState(detail::CloudCacheSessionState* State) -{ - RwLock::ExclusiveLockScope _(m_SessionStateLock); - m_SessionStateCache.push_front(State); } } // namespace zen diff --git a/src/zenserver/upstream/jupiter.h b/src/zenserver/upstream/jupiter.h index b5aa95ed5..93f2cc883 100644 --- a/src/zenserver/upstream/jupiter.h +++ b/src/zenserver/upstream/jupiter.h @@ -6,6 +6,7 @@ #include <zencore/iohash.h> #include <zencore/logging.h> #include <zencore/thread.h> +#include <zenhttp/httpclient.h> #include <zenhttp/httpserver.h> #include <atomic> @@ -22,9 +23,6 @@ class Session; } namespace zen { -namespace detail { - struct CloudCacheSessionState; -} class CbObjectView; class CloudCacheClient; @@ -96,7 +94,11 @@ public: ~CloudCacheSession(); CloudCacheResult Authenticate(); - CloudCacheResult GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType); + CloudCacheResult GetRef(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + ZenContentType RefType, + std::filesystem::path TempFolderPath = {}); CloudCacheResult GetBlob(std::string_view Namespace, const IoHash& Key); CloudCacheResult GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath = {}); CloudCacheResult GetObject(std::string_view Namespace, const IoHash& Key); @@ -131,17 +133,14 @@ public: CloudCacheClient& Client() { return *m_CacheClient; }; private: - inline LoggerRef Log() { return m_Log; } - cpr::Session& GetSession(); - CloudCacheAccessToken GetAccessToken(bool RefreshToken = false); + inline LoggerRef Log() { return m_Log; } CloudCacheResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key); CloudCacheExistsResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys); - LoggerRef m_Log; - RefPtr<CloudCacheClient> m_CacheClient; - detail::CloudCacheSessionState* m_SessionState; + LoggerRef m_Log; + RefPtr<CloudCacheClient> m_CacheClient; }; /** @@ -178,6 +177,8 @@ struct CloudCacheClientOptions std::chrono::milliseconds ConnectTimeout{5000}; std::chrono::milliseconds Timeout{}; bool AssumeHttp2 = false; + bool AllowResume = false; + uint8_t RetryCount = 0; }; /** @@ -189,30 +190,20 @@ public: CloudCacheClient(const CloudCacheClientOptions& Options, std::unique_ptr<CloudCacheTokenProvider> TokenProvider); ~CloudCacheClient(); - CloudCacheAccessToken AcquireAccessToken(); - std::string_view DefaultDdcNamespace() const { return m_DefaultDdcNamespace; } - std::string_view DefaultBlobStoreNamespace() const { return m_DefaultBlobStoreNamespace; } - std::string_view ComputeCluster() const { return m_ComputeCluster; } - std::string_view ServiceUrl() const { return m_ServiceUrl; } + std::string_view DefaultDdcNamespace() const { return m_DefaultDdcNamespace; } + std::string_view DefaultBlobStoreNamespace() const { return m_DefaultBlobStoreNamespace; } + std::string_view ComputeCluster() const { return m_ComputeCluster; } + std::string_view ServiceUrl() const { return m_HttpClient.GetBaseUri(); } LoggerRef Logger() { return m_Log; } private: - LoggerRef m_Log; - std::string m_ServiceUrl; - std::string m_DefaultDdcNamespace; - std::string m_DefaultBlobStoreNamespace; - std::string m_ComputeCluster; - std::chrono::milliseconds m_ConnectTimeout{}; - std::chrono::milliseconds m_Timeout{}; - std::unique_ptr<CloudCacheTokenProvider> m_TokenProvider; - bool m_AssumeHttp2; - - RwLock m_SessionStateLock; - std::list<detail::CloudCacheSessionState*> m_SessionStateCache; - - detail::CloudCacheSessionState* AllocSessionState(); - void FreeSessionState(detail::CloudCacheSessionState*); + LoggerRef m_Log; + const std::string m_DefaultDdcNamespace; + const std::string m_DefaultBlobStoreNamespace; + const std::string m_ComputeCluster; + const std::unique_ptr<CloudCacheTokenProvider> m_TokenProvider; + HttpClient m_HttpClient; friend class CloudCacheSession; }; diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index 73a8ad538..5bcb7f5b4 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -1158,88 +1158,73 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState, } ZEN_ASSERT(OldBlockFile); - ZEN_INFO("Moving {} chunks from '{}' to new block", KeepChunkIndexes.size(), GetBlockPath(m_BlocksBasePath, BlockIndex)); - uint64_t OldBlockSize = OldBlockFile->FileSize(); - std::vector<uint8_t> Chunk; - for (const size_t& ChunkIndex : KeepChunkIndexes) + if (KeepChunkIndexes.empty()) + { + ZEN_INFO("Dropping all chunks from '{}'", GetBlockPath(m_BlocksBasePath, BlockIndex)); + } + else { - const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex]; - if (ChunkLocation.Offset + ChunkLocation.Size > OldBlockSize) + std::vector<uint8_t> Chunk; + for (const size_t& ChunkIndex : KeepChunkIndexes) { - ZEN_WARN( - "Compact Block skipping chunk outside of block range in '{}', Chunk start {}, Chunk size {} in Block {}, Block " - "size {}", - m_BlocksBasePath, - ChunkLocation.Offset, - ChunkLocation.Size, - OldBlockFile->GetPath(), - OldBlockSize); - continue; - } + const BlockStoreLocation ChunkLocation = ChunkLocations[ChunkIndex]; + if (ChunkLocation.Offset + ChunkLocation.Size > OldBlockSize) + { + ZEN_WARN( + "Compact Block skipping chunk outside of block range in '{}', Chunk start {}, Chunk size {} in Block {}, Block " + "size {}", + m_BlocksBasePath, + ChunkLocation.Offset, + ChunkLocation.Size, + OldBlockFile->GetPath(), + OldBlockSize); + continue; + } - Chunk.resize(ChunkLocation.Size); - OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset); + Chunk.resize(ChunkLocation.Size); + OldBlockFile->Read(Chunk.data(), Chunk.size(), ChunkLocation.Offset); - if ((WriteOffset + Chunk.size()) > m_MaxBlockSize) - { - if (NewBlockFile) + if ((WriteOffset + Chunk.size()) > m_MaxBlockSize) { - ZEN_ASSERT_SLOW(NewBlockFile->IsOpen()); - NewBlockFile->Flush(); - MovedSize += NewBlockFile->FileSize(); - NewBlockFile = nullptr; + if (NewBlockFile) + { + ZEN_ASSERT_SLOW(NewBlockFile->IsOpen()); + NewBlockFile->Flush(); + MovedSize += NewBlockFile->FileSize(); + NewBlockFile = nullptr; - ZEN_ASSERT(!MovedChunks.empty() || RemovedSize > 0); // We should not have a new block if we haven't moved anything + ZEN_ASSERT(!MovedChunks.empty() || RemovedSize > 0); // We should not have a new block if we haven't moved anything - if (!ReportChanges()) - { - return false; + if (!ReportChanges()) + { + return false; + } } - } - uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed); - { - RwLock::ExclusiveLockScope InsertLock(m_InsertLock); - std::filesystem::path NewBlockPath; - NextBlockIndex = GetFreeBlockIndex(NextBlockIndex, InsertLock, NewBlockPath); - if (NextBlockIndex == (uint32_t)m_MaxBlockCount) + uint32_t NextBlockIndex = m_WriteBlockIndex.load(std::memory_order_relaxed); { - ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded", - m_BlocksBasePath, - static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1); - return false; - } - - NewBlockFile = new BlockStoreFile(NewBlockPath); - m_ChunkBlocks[NextBlockIndex] = NewBlockFile; - } + RwLock::ExclusiveLockScope InsertLock(m_InsertLock); + std::filesystem::path NewBlockPath; + NextBlockIndex = GetFreeBlockIndex(NextBlockIndex, InsertLock, NewBlockPath); + if (NextBlockIndex == (uint32_t)m_MaxBlockCount) + { + ZEN_ERROR("unable to allocate a new block in '{}', count limit {} exeeded", + m_BlocksBasePath, + static_cast<uint64_t>(std::numeric_limits<uint32_t>::max()) + 1); + return false; + } - std::error_code Error; - DiskSpace Space = DiskSpaceInfo(m_BlocksBasePath, Error); - if (Error) - { - ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BlocksBasePath, Error.message()); - { - RwLock::ExclusiveLockScope _l(m_InsertLock); - ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile); - m_ChunkBlocks.erase(NextBlockIndex); + NewBlockFile = new BlockStoreFile(NewBlockPath); + m_ChunkBlocks[NextBlockIndex] = NewBlockFile; } - ZEN_ASSERT_SLOW(!NewBlockFile->IsOpen()); - NewBlockFile = nullptr; - return false; - } - if (Space.Free < m_MaxBlockSize) - { - uint64_t ReclaimedSpace = DiskReserveCallback(); - if (Space.Free + ReclaimedSpace < m_MaxBlockSize) + std::error_code Error; + DiskSpace Space = DiskSpaceInfo(m_BlocksBasePath, Error); + if (Error) { - ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}", - m_BlocksBasePath, - m_MaxBlockSize, - NiceBytes(Space.Free + ReclaimedSpace)); + ZEN_ERROR("get disk space in '{}' FAILED, reason: '{}'", m_BlocksBasePath, Error.message()); { RwLock::ExclusiveLockScope _l(m_InsertLock); ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile); @@ -1250,23 +1235,42 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState, return false; } - ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}", - m_BlocksBasePath, - ReclaimedSpace, - NiceBytes(Space.Free + ReclaimedSpace)); + if (Space.Free < m_MaxBlockSize) + { + uint64_t ReclaimedSpace = DiskReserveCallback(); + if (Space.Free + ReclaimedSpace < m_MaxBlockSize) + { + ZEN_WARN("garbage collect for '{}' FAILED, required disk space {}, free {}", + m_BlocksBasePath, + m_MaxBlockSize, + NiceBytes(Space.Free + ReclaimedSpace)); + { + RwLock::ExclusiveLockScope _l(m_InsertLock); + ZEN_ASSERT(m_ChunkBlocks[NextBlockIndex] == NewBlockFile); + m_ChunkBlocks.erase(NextBlockIndex); + } + ZEN_ASSERT_SLOW(!NewBlockFile->IsOpen()); + NewBlockFile = nullptr; + return false; + } + + ZEN_INFO("using gc reserve for '{}', reclaimed {}, disk free {}", + m_BlocksBasePath, + ReclaimedSpace, + NiceBytes(Space.Free + ReclaimedSpace)); + } + NewBlockFile->Create(m_MaxBlockSize); + NewBlockIndex = NextBlockIndex; + WriteOffset = 0; } - NewBlockFile->Create(m_MaxBlockSize); - NewBlockIndex = NextBlockIndex; - WriteOffset = 0; - } - NewBlockFile->Write(Chunk.data(), ChunkLocation.Size, WriteOffset); - MovedChunks.push_back( - {ChunkIndex, {.BlockIndex = NewBlockIndex, .Offset = gsl::narrow<uint32_t>(WriteOffset), .Size = ChunkLocation.Size}}); - WriteOffset = RoundUp(WriteOffset + ChunkLocation.Size, PayloadAlignment); - AddedSize += Chunk.size(); + NewBlockFile->Write(Chunk.data(), ChunkLocation.Size, WriteOffset); + MovedChunks.push_back( + {ChunkIndex, {.BlockIndex = NewBlockIndex, .Offset = gsl::narrow<uint32_t>(WriteOffset), .Size = ChunkLocation.Size}}); + WriteOffset = RoundUp(WriteOffset + ChunkLocation.Size, PayloadAlignment); + AddedSize += Chunk.size(); + } } - Chunk.clear(); if (!ReportChanges()) { diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index f18509758..7839c7132 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -1436,6 +1436,8 @@ public: NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }); + ZEN_INFO("GCV2: filecas [COMPACT] '{}': Removing {} files", m_FileCasStrategy.m_RootDirectory, m_ReferencesToClean.size()); + size_t Skipped = 0; for (const IoHash& ChunkHash : m_ReferencesToClean) { |