diff options
| author | Dan Engelbrecht <[email protected]> | 2023-09-05 18:53:44 -0400 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-09-06 00:53:44 +0200 |
| commit | 832a1b464633ec7a31a8aad386520e1990d0b6cb (patch) | |
| tree | a07ba97f28fbe90e5aac8ea5d086f687e7aa38bd /src/zenserver/upstream/jupiter.cpp | |
| parent | retry file create (#383) (diff) | |
| download | zen-832a1b464633ec7a31a8aad386520e1990d0b6cb.tar.xz zen-832a1b464633ec7a31a8aad386520e1990d0b6cb.zip | |
stream oplog attachments from jupiter (#384)
* stream large downloads from jupiter to temporary file
* rework DeleteOnClose - top level marks file for delete and if lower level parts wants to keep it it clears that flag
* changelog
* log number of attachments to download
* add delay on jupiter request failure when retrying
* make sure we upload all attachments even if Needs are empty when ForceUpload is true
release TempAttachment as soon as it is used
* sort attachments so we get predictable blocks for the same oplog
Diffstat (limited to 'src/zenserver/upstream/jupiter.cpp')
| -rw-r--r-- | src/zenserver/upstream/jupiter.cpp | 164 |
1 files changed, 120 insertions, 44 deletions
diff --git a/src/zenserver/upstream/jupiter.cpp b/src/zenserver/upstream/jupiter.cpp index 2137523b2..61d8a85cc 100644 --- a/src/zenserver/upstream/jupiter.cpp +++ b/src/zenserver/upstream/jupiter.cpp @@ -6,12 +6,14 @@ #include <zencore/compactbinary.h> #include <zencore/compositebuffer.h> +#include <zencore/fmtutils.h> #include <zencore/iobuffer.h> #include <zencore/iohash.h> -#include <zencore/string.h> +#include <zencore/scopeguard.h> #include <zencore/thread.h> #include <zencore/trace.h> #include <zenhttp/formatters.h> +#include <zenutil/basicfile.h> ZEN_THIRD_PARTY_INCLUDES_START #include <cpr/cpr.h> @@ -79,7 +81,7 @@ namespace detail { { return {.ElapsedSeconds = Response.elapsed, .ErrorCode = static_cast<int32_t>(Response.status_code), - .Reason = Response.reason, + .Reason = Response.reason.empty() ? Response.text : Response.reason, .Success = false}; } return {.Bytes = Response.downloaded_bytes, @@ -89,6 +91,72 @@ namespace detail { .Success = true}; } + cpr::Response GetWithStreaming(cpr::Session& Session, std::filesystem::path TempFolderPath, std::string_view Name, IoBuffer& OutBuffer) + { + if (TempFolderPath.empty()) + { + return Session.Get(); + } + + std::string PayloadString; + std::shared_ptr<BasicFile> PayloadFile; + + auto _ = MakeGuard([&]() { + if (PayloadFile) + { + PayloadFile.reset(); + std::filesystem::path TempPath = TempFolderPath / Name; + std::error_code Ec; + std::filesystem::remove(TempPath, Ec); + } + }); + + uint64_t Offset = 0; + Session.SetWriteCallback(cpr::WriteCallback{[&](std::string data, intptr_t) { + if (!PayloadFile && (PayloadString.length() + data.length()) > (1024 * 1024)) + { + std::filesystem::path TempPath = TempFolderPath / Name; + PayloadFile = std::make_shared<BasicFile>(); + PayloadFile->Open(TempPath, BasicFile::Mode::kTruncateDelete); + PayloadFile->Write(PayloadString.data(), PayloadString.size(), Offset); + Offset += PayloadString.size(); + PayloadString.clear(); + } + if (PayloadFile) + { + PayloadFile->Write(data.data(), data.size(), Offset); + Offset += data.size(); + } + else + { + PayloadString.append(data); + } + return true; + }}); + + cpr::Response Response = Session.Get(); + + if (!Response.error && IsHttpSuccessCode(Response.status_code)) + { + if (PayloadFile) + { + uint64_t PayloadSize = PayloadFile->FileSize(); + void* FileHandle = PayloadFile->Detach(); + PayloadFile.reset(); + OutBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, PayloadSize, /*IsWholeFile*/ true); + OutBuffer.SetDeleteOnClose(true); + } + else + { + OutBuffer = IoBufferBuilder::MakeCloneFromMemory(PayloadString.data(), PayloadString.size()); + } + return Response; + } + + Response.text.swap(PayloadString); + return Response; + } + } // namespace detail CloudCacheSession::CloudCacheSession(CloudCacheClient* CacheClient) : m_Log(CacheClient->Logger()), m_CacheClient(CacheClient) @@ -145,8 +213,8 @@ CloudCacheSession::GetRef(std::string_view Namespace, std::string_view BucketId, "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " - "Resonse.raw_header: '{}'" - "Resonse.text: '{}'", + "Response.raw_header: '{}'" + "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), @@ -194,8 +262,8 @@ CloudCacheSession::GetBlob(std::string_view Namespace, const IoHash& Key) "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " - "Resonse.raw_header: '{}'" - "Resonse.text: '{}'", + "Response.raw_header: '{}'" + "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), @@ -211,12 +279,13 @@ CloudCacheSession::GetBlob(std::string_view Namespace, const IoHash& Key) } CloudCacheResult -CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& Key) +CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath) { ZEN_TRACE_CPU("HordeClient::GetCompressedBlob"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString(); + std::string KeyString = Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << KeyString; cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -225,13 +294,14 @@ CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& K Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-comp"}}); Session.SetOption(cpr::Body{}); - cpr::Response Response = Session.Get(); + IoBuffer Payload; + cpr::Response Response = detail::GetWithStreaming(Session, TempFolderPath, KeyString, Payload); ZEN_DEBUG("GET {}", Response); CloudCacheResult Result = detail::ConvertResponse(Response); if (Result.Success) { - Result.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()); + Result.Response = std::move(Payload); } else { @@ -245,8 +315,8 @@ CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& K "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " - "Resonse.raw_header: '{}'" - "Resonse.text: '{}'", + "Response.raw_header: '{}'" + "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), @@ -262,12 +332,17 @@ CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& K } CloudCacheResult -CloudCacheSession::GetInlineBlob(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoHash& OutPayloadHash) +CloudCacheSession::GetInlineBlob(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + IoHash& OutPayloadHash, + std::filesystem::path TempFolderPath) { ZEN_TRACE_CPU("HordeClient::GetInlineBlob"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString(); + std::string KeyString = Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << KeyString; cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -276,13 +351,14 @@ CloudCacheSession::GetInlineBlob(std::string_view Namespace, std::string_view Bu Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-jupiter-inline"}}); Session.SetOption(cpr::Body{}); - cpr::Response Response = Session.Get(); + IoBuffer Payload; + cpr::Response Response = detail::GetWithStreaming(Session, TempFolderPath, KeyString, Payload); ZEN_DEBUG("GET {}", Response); CloudCacheResult Result = detail::ConvertResponse(Response); if (Result.Success) { - Result.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()); + Result.Response = std::move(Payload); } else { @@ -296,8 +372,8 @@ CloudCacheSession::GetInlineBlob(std::string_view Namespace, std::string_view Bu "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " - "Resonse.raw_header: '{}'" - "Resonse.text: '{}'", + "Response.raw_header: '{}'" + "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), @@ -357,8 +433,8 @@ CloudCacheSession::GetObject(std::string_view Namespace, const IoHash& Key) "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " - "Resonse.raw_header: '{}'" - "Resonse.text: '{}'", + "Response.raw_header: '{}'" + "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), @@ -426,8 +502,8 @@ CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId, "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " - "Resonse.raw_header: '{}'" - "Resonse.text: '{}'", + "Response.raw_header: '{}'" + "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), @@ -495,8 +571,8 @@ CloudCacheSession::FinalizeRef(std::string_view Namespace, std::string_view Buck "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " - "Resonse.raw_header: '{}'" - "Resonse.text: '{}'", + "Response.raw_header: '{}'" + "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), @@ -546,8 +622,8 @@ CloudCacheSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuff "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " - "Resonse.raw_header: '{}'" - "Resonse.text: '{}'", + "Response.raw_header: '{}'" + "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), @@ -612,8 +688,8 @@ CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& K "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " - "Resonse.raw_header: '{}'" - "Resonse.text: '{}'", + "Response.raw_header: '{}'" + "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), @@ -670,8 +746,8 @@ CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& K "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " - "Resonse.raw_header: '{}'" - "Resonse.text: '{}'", + "Response.raw_header: '{}'" + "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), @@ -719,8 +795,8 @@ CloudCacheSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBu "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " - "Resonse.raw_header: '{}'" - "Resonse.text: '{}'", + "Response.raw_header: '{}'" + "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), @@ -766,8 +842,8 @@ CloudCacheSession::RefExists(std::string_view Namespace, std::string_view Bucket "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " - "Resonse.raw_header: '{}'" - "Resonse.text: '{}'", + "Response.raw_header: '{}'" + "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), @@ -822,8 +898,8 @@ CloudCacheSession::GetObjectReferences(std::string_view Namespace, const IoHash& "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " - "Resonse.raw_header: '{}'" - "Resonse.text: '{}'", + "Response.raw_header: '{}'" + "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), @@ -906,8 +982,8 @@ CloudCacheSession::PostComputeTasks(IoBuffer TasksData) "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " - "Resonse.raw_header: '{}'" - "Resonse.text: '{}'", + "Response.raw_header: '{}'" + "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), @@ -959,8 +1035,8 @@ CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " - "Resonse.raw_header: '{}'" - "Resonse.text: '{}'", + "Response.raw_header: '{}'" + "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), @@ -1031,8 +1107,8 @@ CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " - "Resonse.raw_header: '{}'" - "Resonse.text: '{}'", + "Response.raw_header: '{}'" + "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), @@ -1096,8 +1172,8 @@ CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view "Response.reason: '{}', " "Response.error.code: {}, " "Response.error.message: '{}', " - "Resonse.raw_header: '{}'" - "Resonse.text: '{}'", + "Response.raw_header: '{}'" + "Response.text: '{}'", Response.elapsed, Uri, AccessToken.Value.substr(0, 6), |