aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/jupiter/jupitersession.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenutil/jupiter/jupitersession.cpp')
-rw-r--r--src/zenutil/jupiter/jupitersession.cpp177
1 files changed, 131 insertions, 46 deletions
diff --git a/src/zenutil/jupiter/jupitersession.cpp b/src/zenutil/jupiter/jupitersession.cpp
index 68f214c06..1fd59acdf 100644
--- a/src/zenutil/jupiter/jupitersession.cpp
+++ b/src/zenutil/jupiter/jupitersession.cpp
@@ -5,6 +5,7 @@
#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
#include <zencore/compositebuffer.h>
+#include <zencore/compress.h>
#include <zencore/fmtutils.h>
#include <zencore/trace.h>
@@ -48,7 +49,10 @@ namespace detail {
}
} // namespace detail
-JupiterSession::JupiterSession(LoggerRef InLog, HttpClient& InHttpClient) : m_Log(InLog), m_HttpClient(InHttpClient)
+JupiterSession::JupiterSession(LoggerRef InLog, HttpClient& InHttpClient, bool AllowRedirect)
+: m_Log(InLog)
+, m_HttpClient(InHttpClient)
+, m_AllowRedirect(AllowRedirect)
{
}
@@ -357,12 +361,28 @@ JupiterSession::CacheTypeExists(std::string_view Namespace, std::string_view Typ
}
JupiterResult
+JupiterSession::ListBuildNamespaces()
+{
+ HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds"), {HttpClient::Accept(ZenContentType::kJSON)});
+ return detail::ConvertResponse(Response, "JupiterSession::ListBuildNamespaces"sv);
+}
+
+JupiterResult
+JupiterSession::ListBuildBuckets(std::string_view Namespace)
+{
+ HttpClient::Response Response =
+ m_HttpClient.Get(fmt::format("/api/v2/builds/{}", Namespace), {HttpClient::Accept(ZenContentType::kJSON)});
+ return detail::ConvertResponse(Response, "JupiterSession::ListBuildBuckets"sv);
+}
+
+JupiterResult
JupiterSession::ListBuilds(std::string_view Namespace, std::string_view BucketId, const IoBuffer& Payload)
{
ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
- HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/search", Namespace, BucketId),
- Payload,
- {HttpClient::Accept(ZenContentType::kCbObject)});
+ std::string OptionalBucketPath = BucketId.empty() ? "" : fmt::format("/{}", BucketId);
+ HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v2/builds/{}{}/search", Namespace, OptionalBucketPath),
+ Payload,
+ {HttpClient::Accept(ZenContentType::kCbObject)});
return detail::ConvertResponse(Response, "JupiterSession::ListBuilds"sv);
}
@@ -527,12 +547,14 @@ JupiterSession::PutMultipartBuildBlob(std::string_view Namespace,
bool& OutIsComplete) -> JupiterResult {
const MultipartUploadResponse::Part& Part = Workload->PartDescription.Parts[PartIndex];
IoBuffer PartPayload = Workload->Transmitter(Part.FirstByte, Part.LastByte - Part.FirstByte);
- std::string MultipartUploadResponseRequestString = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}",
- Namespace,
- BucketId,
- BuildId,
- Hash.ToHexString(),
- Part.QueryString);
+ std::string MultipartUploadResponseRequestString =
+ fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}&supportsRedirect={}",
+ Namespace,
+ BucketId,
+ BuildId,
+ Hash.ToHexString(),
+ Part.QueryString,
+ m_AllowRedirect ? "true"sv : "false"sv);
// ZEN_INFO("PUT: {}", MultipartUploadResponseRequestString);
HttpClient::Response MultipartUploadResponse = m_HttpClient.Put(MultipartUploadResponseRequestString, PartPayload);
if (!MultipartUploadResponse.IsSuccess())
@@ -590,12 +612,13 @@ JupiterSession::PutMultipartBuildBlob(std::string_view Namespace,
IoBuffer RetryPartPayload =
Workload->Transmitter(RetryPart.FirstByte, RetryPart.LastByte - RetryPart.FirstByte - 1);
std::string RetryMultipartUploadResponseRequestString =
- fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}",
+ fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}&supportsRedirect={}",
Namespace,
BucketId,
BuildId,
Hash.ToHexString(),
- RetryPart.QueryString);
+ RetryPart.QueryString,
+ m_AllowRedirect ? "true"sv : "false"sv);
MultipartUploadResponse = m_HttpClient.Put(RetryMultipartUploadResponseRequestString, RetryPartPayload);
TotalUploadedBytes = MultipartUploadResponse.UploadedBytes;
@@ -626,15 +649,21 @@ JupiterSession::PutMultipartBuildBlob(std::string_view Namespace,
}
JupiterResult
-JupiterSession::GetMultipartBuildBlob(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const IoHash& Hash,
- uint64_t ChunkSize,
- std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver,
- std::vector<std::function<JupiterResult()>>& OutWorkItems)
-{
- std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString());
+JupiterSession::GetMultipartBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const IoHash& Hash,
+ uint64_t ChunkSize,
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive,
+ std::function<void()>&& OnComplete,
+ std::vector<std::function<JupiterResult()>>& OutWorkItems)
+{
+ std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}",
+ Namespace,
+ BucketId,
+ BuildId,
+ Hash.ToHexString(),
+ m_AllowRedirect ? "true"sv : "false"sv);
HttpClient::Response Response =
m_HttpClient.Get(RequestUrl, HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", 0, ChunkSize - 1)}}));
if (Response.IsSuccess())
@@ -649,46 +678,68 @@ JupiterSession::GetMultipartBuildBlob(std::string_view Namespa
uint64_t TotalSize = TotalSizeMaybe.value();
uint64_t PayloadSize = Response.ResponsePayload.GetSize();
- Receiver(0, Response.ResponsePayload, TotalSize);
+ OnReceive(0, Response.ResponsePayload);
if (TotalSize > PayloadSize)
{
struct WorkloadData
{
- std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)> Receiver;
- std::atomic<uint64_t> BytesRemaining;
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk)> OnReceive;
+ std::function<void()> OnComplete;
+ std::atomic<uint64_t> BytesRemaining;
};
std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>());
- Workload->Receiver = std::move(Receiver);
+ Workload->OnReceive = std::move(OnReceive);
+ Workload->OnComplete = std::move(OnComplete);
Workload->BytesRemaining = TotalSize - PayloadSize;
uint64_t Offset = PayloadSize;
while (Offset < TotalSize)
{
uint64_t PartSize = Min(ChunkSize, TotalSize - Offset);
- OutWorkItems.emplace_back(
- [this, Namespace, BucketId, BuildId, Hash, TotalSize, Workload, Offset, PartSize]() -> JupiterResult {
- std::string RequestUrl =
- fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString());
- HttpClient::Response Response = m_HttpClient.Get(
- RequestUrl,
- HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", Offset, Offset + PartSize - 1)}}));
- if (Response.IsSuccess())
+ OutWorkItems.emplace_back([this,
+ Namespace = std::string(Namespace),
+ BucketId = std::string(BucketId),
+ BuildId = Oid(BuildId),
+ Hash = IoHash(Hash),
+ TotalSize,
+ Workload,
+ Offset,
+ PartSize]() -> JupiterResult {
+ std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}",
+ Namespace,
+ BucketId,
+ BuildId,
+ Hash.ToHexString(),
+ m_AllowRedirect ? "true"sv : "false"sv);
+ HttpClient::Response Response = m_HttpClient.Get(
+ RequestUrl,
+ HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", Offset, Offset + PartSize - 1)}}));
+ if (Response.IsSuccess())
+ {
+ Workload->OnReceive(Offset, Response.ResponsePayload);
+ uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Response.ResponsePayload.GetSize());
+ if (ByteRemaning == Response.ResponsePayload.GetSize())
{
- uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Response.ResponsePayload.GetSize());
- Workload->Receiver(Offset, Response.ResponsePayload, ByteRemaning);
+ Workload->OnComplete();
}
- return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv);
- });
+ }
+ return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv);
+ });
Offset += PartSize;
}
}
+ else
+ {
+ OnComplete();
+ }
return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv);
}
}
}
- Receiver(0, Response.ResponsePayload, Response.ResponsePayload.GetSize());
+ OnReceive(0, Response.ResponsePayload);
+ OnComplete();
}
return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv);
}
@@ -707,11 +758,28 @@ JupiterSession::GetBuildBlob(std::string_view Namespace,
{
Headers.Entries.insert({"Range", fmt::format("bytes={}-{}", Offset, Offset + Size - 1)});
}
- HttpClient::Response Response =
- m_HttpClient.Download(fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString()),
- TempFolderPath,
- Headers);
-
+ HttpClient::Response Response = m_HttpClient.Download(fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}",
+ Namespace,
+ BucketId,
+ BuildId,
+ Hash.ToHexString(),
+ m_AllowRedirect ? "true"sv : "false"sv),
+ TempFolderPath,
+ Headers);
+ if (Response.IsSuccess())
+ {
+ // If we get a redirect to S3 or a non-Jupiter endpoint the content type will not be correct, validate it and set it
+ if (m_AllowRedirect && (Response.ResponsePayload.GetContentType() == HttpContentType::kBinary))
+ {
+ IoHash ValidateRawHash;
+ uint64_t ValidateRawSize = 0;
+ ZEN_ASSERT_SLOW(CompressedBuffer::ValidateCompressedHeader(Response.ResponsePayload, ValidateRawHash, ValidateRawSize));
+ ZEN_ASSERT_SLOW(ValidateRawHash == Hash);
+ ZEN_ASSERT_SLOW(ValidateRawSize > 0);
+ ZEN_UNUSED(ValidateRawHash, ValidateRawSize);
+ Response.ResponsePayload.SetContentType(ZenContentType::kCompressedBinary);
+ }
+ }
return detail::ConvertResponse(Response, "JupiterSession::GetBuildBlob"sv);
}
@@ -758,10 +826,12 @@ JupiterSession::FinalizeBuildPart(std::string_view Namespace,
}
JupiterResult
-JupiterSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId)
+JupiterSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, uint64_t MaxBlockCount)
{
- HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/blocks/listBlocks", Namespace, BucketId, BuildId),
- HttpClient::Accept(ZenContentType::kCbObject));
+ const std::string Parameters = MaxBlockCount == (uint64_t)-1 ? "" : fmt::format("?count={}", MaxBlockCount);
+ HttpClient::Response Response =
+ m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/blocks/listBlocks{}", Namespace, BucketId, BuildId, Parameters),
+ HttpClient::Accept(ZenContentType::kCbObject));
return detail::ConvertResponse(Response, "JupiterSession::FindBlocks"sv);
}
@@ -776,4 +846,19 @@ JupiterSession::GetBlockMetadata(std::string_view Namespace, std::string_view Bu
return detail::ConvertResponse(Response, "JupiterSession::GetBlockMetadata"sv);
}
+JupiterResult
+JupiterSession::PutBuildPartStats(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& BuildPartId,
+ IoBuffer Payload)
+{
+ ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
+ HttpClient::Response Response =
+ m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/stats", Namespace, BucketId, BuildId, BuildPartId),
+ Payload,
+ HttpClient::Accept(ZenContentType::kCbObject));
+ return detail::ConvertResponse(Response, "JupiterSession::PutBuildPartStats"sv);
+}
+
} // namespace zen