aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/upstream/jupiter.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-09-05 18:53:44 -0400
committerGitHub <[email protected]>2023-09-06 00:53:44 +0200
commit832a1b464633ec7a31a8aad386520e1990d0b6cb (patch)
treea07ba97f28fbe90e5aac8ea5d086f687e7aa38bd /src/zenserver/upstream/jupiter.cpp
parentretry file create (#383) (diff)
downloadzen-832a1b464633ec7a31a8aad386520e1990d0b6cb.tar.xz
zen-832a1b464633ec7a31a8aad386520e1990d0b6cb.zip
stream oplog attachments from jupiter (#384)
* stream large downloads from jupiter to temporary file * rework DeleteOnClose - top level marks file for delete and if lower level parts wants to keep it it clears that flag * changelog * log number of attachments to download * add delay on jupiter request failure when retrying * make sure we upload all attachments even if Needs are empty when ForceUpload is true release TempAttachment as soon as it is used * sort attachments so we get predictable blocks for the same oplog
Diffstat (limited to 'src/zenserver/upstream/jupiter.cpp')
-rw-r--r--src/zenserver/upstream/jupiter.cpp164
1 files changed, 120 insertions, 44 deletions
diff --git a/src/zenserver/upstream/jupiter.cpp b/src/zenserver/upstream/jupiter.cpp
index 2137523b2..61d8a85cc 100644
--- a/src/zenserver/upstream/jupiter.cpp
+++ b/src/zenserver/upstream/jupiter.cpp
@@ -6,12 +6,14 @@
#include <zencore/compactbinary.h>
#include <zencore/compositebuffer.h>
+#include <zencore/fmtutils.h>
#include <zencore/iobuffer.h>
#include <zencore/iohash.h>
-#include <zencore/string.h>
+#include <zencore/scopeguard.h>
#include <zencore/thread.h>
#include <zencore/trace.h>
#include <zenhttp/formatters.h>
+#include <zenutil/basicfile.h>
ZEN_THIRD_PARTY_INCLUDES_START
#include <cpr/cpr.h>
@@ -79,7 +81,7 @@ namespace detail {
{
return {.ElapsedSeconds = Response.elapsed,
.ErrorCode = static_cast<int32_t>(Response.status_code),
- .Reason = Response.reason,
+ .Reason = Response.reason.empty() ? Response.text : Response.reason,
.Success = false};
}
return {.Bytes = Response.downloaded_bytes,
@@ -89,6 +91,72 @@ namespace detail {
.Success = true};
}
+ cpr::Response GetWithStreaming(cpr::Session& Session, std::filesystem::path TempFolderPath, std::string_view Name, IoBuffer& OutBuffer)
+ {
+ if (TempFolderPath.empty())
+ {
+ return Session.Get();
+ }
+
+ std::string PayloadString;
+ std::shared_ptr<BasicFile> PayloadFile;
+
+ auto _ = MakeGuard([&]() {
+ if (PayloadFile)
+ {
+ PayloadFile.reset();
+ std::filesystem::path TempPath = TempFolderPath / Name;
+ std::error_code Ec;
+ std::filesystem::remove(TempPath, Ec);
+ }
+ });
+
+ uint64_t Offset = 0;
+ Session.SetWriteCallback(cpr::WriteCallback{[&](std::string data, intptr_t) {
+ if (!PayloadFile && (PayloadString.length() + data.length()) > (1024 * 1024))
+ {
+ std::filesystem::path TempPath = TempFolderPath / Name;
+ PayloadFile = std::make_shared<BasicFile>();
+ PayloadFile->Open(TempPath, BasicFile::Mode::kTruncateDelete);
+ PayloadFile->Write(PayloadString.data(), PayloadString.size(), Offset);
+ Offset += PayloadString.size();
+ PayloadString.clear();
+ }
+ if (PayloadFile)
+ {
+ PayloadFile->Write(data.data(), data.size(), Offset);
+ Offset += data.size();
+ }
+ else
+ {
+ PayloadString.append(data);
+ }
+ return true;
+ }});
+
+ cpr::Response Response = Session.Get();
+
+ if (!Response.error && IsHttpSuccessCode(Response.status_code))
+ {
+ if (PayloadFile)
+ {
+ uint64_t PayloadSize = PayloadFile->FileSize();
+ void* FileHandle = PayloadFile->Detach();
+ PayloadFile.reset();
+ OutBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, PayloadSize, /*IsWholeFile*/ true);
+ OutBuffer.SetDeleteOnClose(true);
+ }
+ else
+ {
+ OutBuffer = IoBufferBuilder::MakeCloneFromMemory(PayloadString.data(), PayloadString.size());
+ }
+ return Response;
+ }
+
+ Response.text.swap(PayloadString);
+ return Response;
+ }
+
} // namespace detail
CloudCacheSession::CloudCacheSession(CloudCacheClient* CacheClient) : m_Log(CacheClient->Logger()), m_CacheClient(CacheClient)
@@ -145,8 +213,8 @@ CloudCacheSession::GetRef(std::string_view Namespace, std::string_view BucketId,
"Response.reason: '{}', "
"Response.error.code: {}, "
"Response.error.message: '{}', "
- "Resonse.raw_header: '{}'"
- "Resonse.text: '{}'",
+ "Response.raw_header: '{}'"
+ "Response.text: '{}'",
Response.elapsed,
Uri,
AccessToken.Value.substr(0, 6),
@@ -194,8 +262,8 @@ CloudCacheSession::GetBlob(std::string_view Namespace, const IoHash& Key)
"Response.reason: '{}', "
"Response.error.code: {}, "
"Response.error.message: '{}', "
- "Resonse.raw_header: '{}'"
- "Resonse.text: '{}'",
+ "Response.raw_header: '{}'"
+ "Response.text: '{}'",
Response.elapsed,
Uri,
AccessToken.Value.substr(0, 6),
@@ -211,12 +279,13 @@ CloudCacheSession::GetBlob(std::string_view Namespace, const IoHash& Key)
}
CloudCacheResult
-CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& Key)
+CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath)
{
ZEN_TRACE_CPU("HordeClient::GetCompressedBlob");
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString();
+ std::string KeyString = Key.ToHexString();
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << KeyString;
cpr::Session& Session = GetSession();
const CloudCacheAccessToken& AccessToken = GetAccessToken();
@@ -225,13 +294,14 @@ CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& K
Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-comp"}});
Session.SetOption(cpr::Body{});
- cpr::Response Response = Session.Get();
+ IoBuffer Payload;
+ cpr::Response Response = detail::GetWithStreaming(Session, TempFolderPath, KeyString, Payload);
ZEN_DEBUG("GET {}", Response);
CloudCacheResult Result = detail::ConvertResponse(Response);
if (Result.Success)
{
- Result.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size());
+ Result.Response = std::move(Payload);
}
else
{
@@ -245,8 +315,8 @@ CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& K
"Response.reason: '{}', "
"Response.error.code: {}, "
"Response.error.message: '{}', "
- "Resonse.raw_header: '{}'"
- "Resonse.text: '{}'",
+ "Response.raw_header: '{}'"
+ "Response.text: '{}'",
Response.elapsed,
Uri,
AccessToken.Value.substr(0, 6),
@@ -262,12 +332,17 @@ CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& K
}
CloudCacheResult
-CloudCacheSession::GetInlineBlob(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoHash& OutPayloadHash)
+CloudCacheSession::GetInlineBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const IoHash& Key,
+ IoHash& OutPayloadHash,
+ std::filesystem::path TempFolderPath)
{
ZEN_TRACE_CPU("HordeClient::GetInlineBlob");
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString();
+ std::string KeyString = Key.ToHexString();
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << KeyString;
cpr::Session& Session = GetSession();
const CloudCacheAccessToken& AccessToken = GetAccessToken();
@@ -276,13 +351,14 @@ CloudCacheSession::GetInlineBlob(std::string_view Namespace, std::string_view Bu
Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-jupiter-inline"}});
Session.SetOption(cpr::Body{});
- cpr::Response Response = Session.Get();
+ IoBuffer Payload;
+ cpr::Response Response = detail::GetWithStreaming(Session, TempFolderPath, KeyString, Payload);
ZEN_DEBUG("GET {}", Response);
CloudCacheResult Result = detail::ConvertResponse(Response);
if (Result.Success)
{
- Result.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size());
+ Result.Response = std::move(Payload);
}
else
{
@@ -296,8 +372,8 @@ CloudCacheSession::GetInlineBlob(std::string_view Namespace, std::string_view Bu
"Response.reason: '{}', "
"Response.error.code: {}, "
"Response.error.message: '{}', "
- "Resonse.raw_header: '{}'"
- "Resonse.text: '{}'",
+ "Response.raw_header: '{}'"
+ "Response.text: '{}'",
Response.elapsed,
Uri,
AccessToken.Value.substr(0, 6),
@@ -357,8 +433,8 @@ CloudCacheSession::GetObject(std::string_view Namespace, const IoHash& Key)
"Response.reason: '{}', "
"Response.error.code: {}, "
"Response.error.message: '{}', "
- "Resonse.raw_header: '{}'"
- "Resonse.text: '{}'",
+ "Response.raw_header: '{}'"
+ "Response.text: '{}'",
Response.elapsed,
Uri,
AccessToken.Value.substr(0, 6),
@@ -426,8 +502,8 @@ CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId,
"Response.reason: '{}', "
"Response.error.code: {}, "
"Response.error.message: '{}', "
- "Resonse.raw_header: '{}'"
- "Resonse.text: '{}'",
+ "Response.raw_header: '{}'"
+ "Response.text: '{}'",
Response.elapsed,
Uri,
AccessToken.Value.substr(0, 6),
@@ -495,8 +571,8 @@ CloudCacheSession::FinalizeRef(std::string_view Namespace, std::string_view Buck
"Response.reason: '{}', "
"Response.error.code: {}, "
"Response.error.message: '{}', "
- "Resonse.raw_header: '{}'"
- "Resonse.text: '{}'",
+ "Response.raw_header: '{}'"
+ "Response.text: '{}'",
Response.elapsed,
Uri,
AccessToken.Value.substr(0, 6),
@@ -546,8 +622,8 @@ CloudCacheSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuff
"Response.reason: '{}', "
"Response.error.code: {}, "
"Response.error.message: '{}', "
- "Resonse.raw_header: '{}'"
- "Resonse.text: '{}'",
+ "Response.raw_header: '{}'"
+ "Response.text: '{}'",
Response.elapsed,
Uri,
AccessToken.Value.substr(0, 6),
@@ -612,8 +688,8 @@ CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& K
"Response.reason: '{}', "
"Response.error.code: {}, "
"Response.error.message: '{}', "
- "Resonse.raw_header: '{}'"
- "Resonse.text: '{}'",
+ "Response.raw_header: '{}'"
+ "Response.text: '{}'",
Response.elapsed,
Uri,
AccessToken.Value.substr(0, 6),
@@ -670,8 +746,8 @@ CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& K
"Response.reason: '{}', "
"Response.error.code: {}, "
"Response.error.message: '{}', "
- "Resonse.raw_header: '{}'"
- "Resonse.text: '{}'",
+ "Response.raw_header: '{}'"
+ "Response.text: '{}'",
Response.elapsed,
Uri,
AccessToken.Value.substr(0, 6),
@@ -719,8 +795,8 @@ CloudCacheSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBu
"Response.reason: '{}', "
"Response.error.code: {}, "
"Response.error.message: '{}', "
- "Resonse.raw_header: '{}'"
- "Resonse.text: '{}'",
+ "Response.raw_header: '{}'"
+ "Response.text: '{}'",
Response.elapsed,
Uri,
AccessToken.Value.substr(0, 6),
@@ -766,8 +842,8 @@ CloudCacheSession::RefExists(std::string_view Namespace, std::string_view Bucket
"Response.reason: '{}', "
"Response.error.code: {}, "
"Response.error.message: '{}', "
- "Resonse.raw_header: '{}'"
- "Resonse.text: '{}'",
+ "Response.raw_header: '{}'"
+ "Response.text: '{}'",
Response.elapsed,
Uri,
AccessToken.Value.substr(0, 6),
@@ -822,8 +898,8 @@ CloudCacheSession::GetObjectReferences(std::string_view Namespace, const IoHash&
"Response.reason: '{}', "
"Response.error.code: {}, "
"Response.error.message: '{}', "
- "Resonse.raw_header: '{}'"
- "Resonse.text: '{}'",
+ "Response.raw_header: '{}'"
+ "Response.text: '{}'",
Response.elapsed,
Uri,
AccessToken.Value.substr(0, 6),
@@ -906,8 +982,8 @@ CloudCacheSession::PostComputeTasks(IoBuffer TasksData)
"Response.reason: '{}', "
"Response.error.code: {}, "
"Response.error.message: '{}', "
- "Resonse.raw_header: '{}'"
- "Resonse.text: '{}'",
+ "Response.raw_header: '{}'"
+ "Response.text: '{}'",
Response.elapsed,
Uri,
AccessToken.Value.substr(0, 6),
@@ -959,8 +1035,8 @@ CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t
"Response.reason: '{}', "
"Response.error.code: {}, "
"Response.error.message: '{}', "
- "Resonse.raw_header: '{}'"
- "Resonse.text: '{}'",
+ "Response.raw_header: '{}'"
+ "Response.text: '{}'",
Response.elapsed,
Uri,
AccessToken.Value.substr(0, 6),
@@ -1031,8 +1107,8 @@ CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view
"Response.reason: '{}', "
"Response.error.code: {}, "
"Response.error.message: '{}', "
- "Resonse.raw_header: '{}'"
- "Resonse.text: '{}'",
+ "Response.raw_header: '{}'"
+ "Response.text: '{}'",
Response.elapsed,
Uri,
AccessToken.Value.substr(0, 6),
@@ -1096,8 +1172,8 @@ CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view
"Response.reason: '{}', "
"Response.error.code: {}, "
"Response.error.message: '{}', "
- "Resonse.raw_header: '{}'"
- "Resonse.text: '{}'",
+ "Response.raw_header: '{}'"
+ "Response.text: '{}'",
Response.elapsed,
Uri,
AccessToken.Value.substr(0, 6),