diff options
Diffstat (limited to 'src/zenutil/jupiter/jupitersession.cpp')
| -rw-r--r-- | src/zenutil/jupiter/jupitersession.cpp | 177 |
1 files changed, 131 insertions, 46 deletions
diff --git a/src/zenutil/jupiter/jupitersession.cpp b/src/zenutil/jupiter/jupitersession.cpp index 68f214c06..1fd59acdf 100644 --- a/src/zenutil/jupiter/jupitersession.cpp +++ b/src/zenutil/jupiter/jupitersession.cpp @@ -5,6 +5,7 @@ #include <zencore/compactbinary.h> #include <zencore/compactbinarybuilder.h> #include <zencore/compositebuffer.h> +#include <zencore/compress.h> #include <zencore/fmtutils.h> #include <zencore/trace.h> @@ -48,7 +49,10 @@ namespace detail { } } // namespace detail -JupiterSession::JupiterSession(LoggerRef InLog, HttpClient& InHttpClient) : m_Log(InLog), m_HttpClient(InHttpClient) +JupiterSession::JupiterSession(LoggerRef InLog, HttpClient& InHttpClient, bool AllowRedirect) +: m_Log(InLog) +, m_HttpClient(InHttpClient) +, m_AllowRedirect(AllowRedirect) { } @@ -357,12 +361,28 @@ JupiterSession::CacheTypeExists(std::string_view Namespace, std::string_view Typ } JupiterResult +JupiterSession::ListBuildNamespaces() +{ + HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds"), {HttpClient::Accept(ZenContentType::kJSON)}); + return detail::ConvertResponse(Response, "JupiterSession::ListBuildNamespaces"sv); +} + +JupiterResult +JupiterSession::ListBuildBuckets(std::string_view Namespace) +{ + HttpClient::Response Response = + m_HttpClient.Get(fmt::format("/api/v2/builds/{}", Namespace), {HttpClient::Accept(ZenContentType::kJSON)}); + return detail::ConvertResponse(Response, "JupiterSession::ListBuildBuckets"sv); +} + +JupiterResult JupiterSession::ListBuilds(std::string_view Namespace, std::string_view BucketId, const IoBuffer& Payload) { ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); - HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/search", Namespace, BucketId), - Payload, - {HttpClient::Accept(ZenContentType::kCbObject)}); + std::string OptionalBucketPath = BucketId.empty() ? "" : fmt::format("/{}", BucketId); + HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v2/builds/{}{}/search", Namespace, OptionalBucketPath), + Payload, + {HttpClient::Accept(ZenContentType::kCbObject)}); return detail::ConvertResponse(Response, "JupiterSession::ListBuilds"sv); } @@ -527,12 +547,14 @@ JupiterSession::PutMultipartBuildBlob(std::string_view Namespace, bool& OutIsComplete) -> JupiterResult { const MultipartUploadResponse::Part& Part = Workload->PartDescription.Parts[PartIndex]; IoBuffer PartPayload = Workload->Transmitter(Part.FirstByte, Part.LastByte - Part.FirstByte); - std::string MultipartUploadResponseRequestString = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}", - Namespace, - BucketId, - BuildId, - Hash.ToHexString(), - Part.QueryString); + std::string MultipartUploadResponseRequestString = + fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}&supportsRedirect={}", + Namespace, + BucketId, + BuildId, + Hash.ToHexString(), + Part.QueryString, + m_AllowRedirect ? "true"sv : "false"sv); // ZEN_INFO("PUT: {}", MultipartUploadResponseRequestString); HttpClient::Response MultipartUploadResponse = m_HttpClient.Put(MultipartUploadResponseRequestString, PartPayload); if (!MultipartUploadResponse.IsSuccess()) @@ -590,12 +612,13 @@ JupiterSession::PutMultipartBuildBlob(std::string_view Namespace, IoBuffer RetryPartPayload = Workload->Transmitter(RetryPart.FirstByte, RetryPart.LastByte - RetryPart.FirstByte - 1); std::string RetryMultipartUploadResponseRequestString = - fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}", + fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}&supportsRedirect={}", Namespace, BucketId, BuildId, Hash.ToHexString(), - RetryPart.QueryString); + RetryPart.QueryString, + m_AllowRedirect ? "true"sv : "false"sv); MultipartUploadResponse = m_HttpClient.Put(RetryMultipartUploadResponseRequestString, RetryPartPayload); TotalUploadedBytes = MultipartUploadResponse.UploadedBytes; @@ -626,15 +649,21 @@ JupiterSession::PutMultipartBuildBlob(std::string_view Namespace, } JupiterResult -JupiterSession::GetMultipartBuildBlob(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const IoHash& Hash, - uint64_t ChunkSize, - std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver, - std::vector<std::function<JupiterResult()>>& OutWorkItems) -{ - std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString()); +JupiterSession::GetMultipartBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const IoHash& Hash, + uint64_t ChunkSize, + std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive, + std::function<void()>&& OnComplete, + std::vector<std::function<JupiterResult()>>& OutWorkItems) +{ + std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}", + Namespace, + BucketId, + BuildId, + Hash.ToHexString(), + m_AllowRedirect ? "true"sv : "false"sv); HttpClient::Response Response = m_HttpClient.Get(RequestUrl, HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", 0, ChunkSize - 1)}})); if (Response.IsSuccess()) @@ -649,46 +678,68 @@ JupiterSession::GetMultipartBuildBlob(std::string_view Namespa uint64_t TotalSize = TotalSizeMaybe.value(); uint64_t PayloadSize = Response.ResponsePayload.GetSize(); - Receiver(0, Response.ResponsePayload, TotalSize); + OnReceive(0, Response.ResponsePayload); if (TotalSize > PayloadSize) { struct WorkloadData { - std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)> Receiver; - std::atomic<uint64_t> BytesRemaining; + std::function<void(uint64_t Offset, const IoBuffer& Chunk)> OnReceive; + std::function<void()> OnComplete; + std::atomic<uint64_t> BytesRemaining; }; std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>()); - Workload->Receiver = std::move(Receiver); + Workload->OnReceive = std::move(OnReceive); + Workload->OnComplete = std::move(OnComplete); Workload->BytesRemaining = TotalSize - PayloadSize; uint64_t Offset = PayloadSize; while (Offset < TotalSize) { uint64_t PartSize = Min(ChunkSize, TotalSize - Offset); - OutWorkItems.emplace_back( - [this, Namespace, BucketId, BuildId, Hash, TotalSize, Workload, Offset, PartSize]() -> JupiterResult { - std::string RequestUrl = - fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString()); - HttpClient::Response Response = m_HttpClient.Get( - RequestUrl, - HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", Offset, Offset + PartSize - 1)}})); - if (Response.IsSuccess()) + OutWorkItems.emplace_back([this, + Namespace = std::string(Namespace), + BucketId = std::string(BucketId), + BuildId = Oid(BuildId), + Hash = IoHash(Hash), + TotalSize, + Workload, + Offset, + PartSize]() -> JupiterResult { + std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}", + Namespace, + BucketId, + BuildId, + Hash.ToHexString(), + m_AllowRedirect ? "true"sv : "false"sv); + HttpClient::Response Response = m_HttpClient.Get( + RequestUrl, + HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", Offset, Offset + PartSize - 1)}})); + if (Response.IsSuccess()) + { + Workload->OnReceive(Offset, Response.ResponsePayload); + uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Response.ResponsePayload.GetSize()); + if (ByteRemaning == Response.ResponsePayload.GetSize()) { - uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Response.ResponsePayload.GetSize()); - Workload->Receiver(Offset, Response.ResponsePayload, ByteRemaning); + Workload->OnComplete(); } - return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv); - }); + } + return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv); + }); Offset += PartSize; } } + else + { + OnComplete(); + } return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv); } } } - Receiver(0, Response.ResponsePayload, Response.ResponsePayload.GetSize()); + OnReceive(0, Response.ResponsePayload); + OnComplete(); } return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv); } @@ -707,11 +758,28 @@ JupiterSession::GetBuildBlob(std::string_view Namespace, { Headers.Entries.insert({"Range", fmt::format("bytes={}-{}", Offset, Offset + Size - 1)}); } - HttpClient::Response Response = - m_HttpClient.Download(fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString()), - TempFolderPath, - Headers); - + HttpClient::Response Response = m_HttpClient.Download(fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}", + Namespace, + BucketId, + BuildId, + Hash.ToHexString(), + m_AllowRedirect ? "true"sv : "false"sv), + TempFolderPath, + Headers); + if (Response.IsSuccess()) + { + // If we get a redirect to S3 or a non-Jupiter endpoint the content type will not be correct, validate it and set it + if (m_AllowRedirect && (Response.ResponsePayload.GetContentType() == HttpContentType::kBinary)) + { + IoHash ValidateRawHash; + uint64_t ValidateRawSize = 0; + ZEN_ASSERT_SLOW(CompressedBuffer::ValidateCompressedHeader(Response.ResponsePayload, ValidateRawHash, ValidateRawSize)); + ZEN_ASSERT_SLOW(ValidateRawHash == Hash); + ZEN_ASSERT_SLOW(ValidateRawSize > 0); + ZEN_UNUSED(ValidateRawHash, ValidateRawSize); + Response.ResponsePayload.SetContentType(ZenContentType::kCompressedBinary); + } + } return detail::ConvertResponse(Response, "JupiterSession::GetBuildBlob"sv); } @@ -758,10 +826,12 @@ JupiterSession::FinalizeBuildPart(std::string_view Namespace, } JupiterResult -JupiterSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId) +JupiterSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, uint64_t MaxBlockCount) { - HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/blocks/listBlocks", Namespace, BucketId, BuildId), - HttpClient::Accept(ZenContentType::kCbObject)); + const std::string Parameters = MaxBlockCount == (uint64_t)-1 ? "" : fmt::format("?count={}", MaxBlockCount); + HttpClient::Response Response = + m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/blocks/listBlocks{}", Namespace, BucketId, BuildId, Parameters), + HttpClient::Accept(ZenContentType::kCbObject)); return detail::ConvertResponse(Response, "JupiterSession::FindBlocks"sv); } @@ -776,4 +846,19 @@ JupiterSession::GetBlockMetadata(std::string_view Namespace, std::string_view Bu return detail::ConvertResponse(Response, "JupiterSession::GetBlockMetadata"sv); } +JupiterResult +JupiterSession::PutBuildPartStats(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& BuildPartId, + IoBuffer Payload) +{ + ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); + HttpClient::Response Response = + m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/stats", Namespace, BucketId, BuildId, BuildPartId), + Payload, + HttpClient::Accept(ZenContentType::kCbObject)); + return detail::ConvertResponse(Response, "JupiterSession::PutBuildPartStats"sv); +} + } // namespace zen |