diff options
| author | Dan Engelbrecht <[email protected]> | 2025-02-12 08:58:52 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-02-12 08:58:52 +0100 |
| commit | d39724eb644cab4ec5bbf19a703cb770b34e68c4 (patch) | |
| tree | e7263163d6bed8ba7f5cd05822f6993a26907b01 /src/zenutil | |
| parent | Fix workspace shares reply array (#280) (diff) | |
| download | zen-d39724eb644cab4ec5bbf19a703cb770b34e68c4.tar.xz zen-d39724eb644cab4ec5bbf19a703cb770b34e68c4.zip | |
improved builds api interface in jupiter (#281)
* multipart upload/download iterface in jupiter
* review fixes
Diffstat (limited to 'src/zenutil')
| -rw-r--r-- | src/zenutil/include/zenutil/jupiter/jupitersession.h | 70 | ||||
| -rw-r--r-- | src/zenutil/jupiter/jupitersession.cpp | 293 |
2 files changed, 340 insertions, 23 deletions
diff --git a/src/zenutil/include/zenutil/jupiter/jupitersession.h b/src/zenutil/include/zenutil/jupiter/jupitersession.h index 6a80332f4..075c35b40 100644 --- a/src/zenutil/include/zenutil/jupiter/jupitersession.h +++ b/src/zenutil/include/zenutil/jupiter/jupitersession.h @@ -102,29 +102,48 @@ public: std::vector<IoHash> Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes); - JupiterResult PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload); - JupiterResult GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId); - JupiterResult FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId); - PutBuildPartResult PutBuildPart(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const Oid& PartId, - std::string_view PartName, - const IoBuffer& Payload); - JupiterResult GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId); - JupiterResult PutBuildBlob(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const Oid& PartId, - const IoHash& Hash, - ZenContentType ContentType, - const CompositeBuffer& Payload); - JupiterResult GetBuildBlob(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const Oid& PartId, - const IoHash& Hash, - std::filesystem::path TempFolderPath); + JupiterResult ListBuilds(std::string_view Namespace, std::string_view BucketId, const IoBuffer& Payload); + JupiterResult PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload); + JupiterResult GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId); + JupiterResult FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId); + PutBuildPartResult PutBuildPart(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + std::string_view PartName, + const IoBuffer& Payload); + JupiterResult GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId); + JupiterResult PutBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& Hash, + ZenContentType ContentType, + const CompositeBuffer& Payload); + JupiterResult GetBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& Hash, + std::filesystem::path TempFolderPath); + + JupiterResult PutMultipartBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& Hash, + ZenContentType ContentType, + uint64_t PayloadSize, + std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter, + std::vector<std::function<JupiterResult(bool& OutIsComplete)>>& OutWorkItems); + JupiterResult GetMultipartBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& Hash, + uint64_t ChunkSize, + std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver, + std::vector<std::function<JupiterResult()>>& OutWorkItems); JupiterResult PutBlockMetadata(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, @@ -137,6 +156,11 @@ public: const Oid& PartId, const IoHash& RawHash); JupiterResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId); + JupiterResult GetBlockMetadata(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + IoBuffer Payload); private: inline LoggerRef Log() { return m_Log; } diff --git a/src/zenutil/jupiter/jupitersession.cpp b/src/zenutil/jupiter/jupitersession.cpp index f706a7efc..d56927b44 100644 --- a/src/zenutil/jupiter/jupitersession.cpp +++ b/src/zenutil/jupiter/jupitersession.cpp @@ -3,6 +3,8 @@ #include <zenutil/jupiter/jupitersession.h> #include <zencore/compactbinary.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/compositebuffer.h> #include <zencore/fmtutils.h> #include <zencore/trace.h> @@ -355,6 +357,16 @@ JupiterSession::CacheTypeExists(std::string_view Namespace, std::string_view Typ } JupiterResult +JupiterSession::ListBuilds(std::string_view Namespace, std::string_view BucketId, const IoBuffer& Payload) +{ + ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); + HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/search", Namespace, BucketId), + Payload, + {HttpClient::Accept(ZenContentType::kCbObject)}); + return detail::ConvertResponse(Response, "JupiterSession::ListBuilds"sv); +} + +JupiterResult JupiterSession::PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload) { ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); @@ -437,6 +449,271 @@ JupiterSession::PutBuildBlob(std::string_view Namespace, } JupiterResult +JupiterSession::PutMultipartBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& Hash, + ZenContentType ContentType, + uint64_t PayloadSize, + std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter, + std::vector<std::function<JupiterResult(bool& OutIsComplete)>>& OutWorkItems) +{ + struct MultipartUploadResponse + { + struct Part + { + uint64_t FirstByte; + uint64_t LastByte; + std::string PartId; + std::string QueryString; + }; + + std::string UploadId; + std::string BlobName; + std::vector<Part> Parts; + + static MultipartUploadResponse Parse(CbObject& Payload) + { + MultipartUploadResponse Result; + Result.UploadId = Payload["uploadId"sv].AsString(); + Result.BlobName = Payload["blobName"sv].AsString(); + CbArrayView PartsArray = Payload["parts"sv].AsArrayView(); + Result.Parts.reserve(PartsArray.Num()); + for (CbFieldView PartView : PartsArray) + { + CbObjectView PartObject = PartView.AsObjectView(); + Result.Parts.emplace_back(Part{ + .FirstByte = PartObject["firstByte"sv].AsUInt64(), + .LastByte = PartObject["lastByte"sv].AsUInt64(), + .PartId = std::string(PartObject["partId"sv].AsString()), + .QueryString = std::string(PartObject["queryString"sv].AsString()), + }); + } + return Result; + } + }; + + CbObjectWriter StartMultipartPayloadWriter; + StartMultipartPayloadWriter.AddInteger("blobLength"sv, PayloadSize); + CbObject StartMultipartPayload = StartMultipartPayloadWriter.Save(); + + std::string StartMultipartResponseRequestString = fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}/startMultipartUpload", + Namespace, + BucketId, + BuildId, + PartId, + Hash.ToHexString()); + // ZEN_INFO("POST: {}", StartMultipartResponseRequestString); + HttpClient::Response StartMultipartResponse = + m_HttpClient.Post(StartMultipartResponseRequestString, StartMultipartPayload, HttpClient::Accept(ZenContentType::kCbObject)); + if (!StartMultipartResponse.IsSuccess()) + { + ZEN_WARN("{}", StartMultipartResponse.ErrorMessage("startMultipartUpload: ")); + return detail::ConvertResponse(StartMultipartResponse, "JupiterSession::PutMultipartBuildBlob"sv); + } + CbObject ResponseObject = LoadCompactBinaryObject(StartMultipartResponse.ResponsePayload); + + struct WorkloadData + { + MultipartUploadResponse PartDescription; + std::function<IoBuffer(uint64_t Offset, uint64_t Size)> Transmitter; + std::atomic<size_t> PartsLeft; + }; + + std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>()); + + Workload->PartDescription = MultipartUploadResponse::Parse(ResponseObject); + Workload->Transmitter = std::move(Transmitter); + Workload->PartsLeft = Workload->PartDescription.Parts.size(); + + for (size_t PartIndex = 0; PartIndex < Workload->PartDescription.Parts.size(); PartIndex++) + { + OutWorkItems.emplace_back([this, Namespace, BucketId, BuildId, PartId, Hash, ContentType, Workload, PartIndex]( + bool& OutIsComplete) -> JupiterResult { + const MultipartUploadResponse::Part& Part = Workload->PartDescription.Parts[PartIndex]; + IoBuffer PartPayload = Workload->Transmitter(Part.FirstByte, Part.LastByte - Part.FirstByte); + std::string MultipartUploadResponseRequestString = fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}/uploadMultipart{}", + Namespace, + BucketId, + BuildId, + PartId, + Hash.ToHexString(), + Part.QueryString); + // ZEN_INFO("PUT: {}", MultipartUploadResponseRequestString); + HttpClient::Response MultipartUploadResponse = m_HttpClient.Put(MultipartUploadResponseRequestString, PartPayload); + if (!MultipartUploadResponse.IsSuccess()) + { + ZEN_WARN("{}", MultipartUploadResponse.ErrorMessage(MultipartUploadResponseRequestString)); + } + OutIsComplete = Workload->PartsLeft.fetch_sub(1) == 1; + if (OutIsComplete) + { + int64_t TotalUploadedBytes = MultipartUploadResponse.UploadedBytes; + int64_t TotalDownloadedBytes = MultipartUploadResponse.DownloadedBytes; + double TotalElapsedSeconds = MultipartUploadResponse.ElapsedSeconds; + HttpClient::Response MultipartEndResponse = MultipartUploadResponse; + while (MultipartEndResponse.IsSuccess()) + { + CbObjectWriter CompletePayloadWriter; + CompletePayloadWriter.AddString("blobName"sv, Workload->PartDescription.BlobName); + CompletePayloadWriter.AddString("uploadId"sv, Workload->PartDescription.UploadId); + CompletePayloadWriter.AddBool("isCompressed"sv, ContentType == ZenContentType::kCompressedBinary); + CompletePayloadWriter.BeginArray("partIds"sv); + std::unordered_map<std::string, size_t> PartNameToIndex; + for (size_t UploadPartIndex = 0; UploadPartIndex < Workload->PartDescription.Parts.size(); UploadPartIndex++) + { + const MultipartUploadResponse::Part& PartDescription = Workload->PartDescription.Parts[UploadPartIndex]; + PartNameToIndex.insert({PartDescription.PartId, UploadPartIndex}); + CompletePayloadWriter.AddString(PartDescription.PartId); + } + CompletePayloadWriter.EndArray(); // "partIds" + CbObject CompletePayload = CompletePayloadWriter.Save(); + + std::string MultipartEndResponseRequestString = + fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}/completeMultipart", + Namespace, + BucketId, + BuildId, + PartId, + Hash.ToHexString()); + + MultipartEndResponse = m_HttpClient.Post(MultipartEndResponseRequestString, + CompletePayload, + HttpClient::Accept(ZenContentType::kCbObject)); + TotalUploadedBytes += MultipartEndResponse.UploadedBytes; + TotalDownloadedBytes += MultipartEndResponse.DownloadedBytes; + TotalElapsedSeconds += MultipartEndResponse.ElapsedSeconds; + if (MultipartEndResponse.IsSuccess()) + { + CbObject ResponseObject = MultipartEndResponse.AsObject(); + CbArrayView MissingPartsArrayView = ResponseObject["missingParts"sv].AsArrayView(); + if (MissingPartsArrayView.Num() == 0) + { + break; + } + else + { + for (CbFieldView PartIdView : MissingPartsArrayView) + { + std::string RetryPartId(PartIdView.AsString()); + size_t RetryPartIndex = PartNameToIndex.at(RetryPartId); + const MultipartUploadResponse::Part& RetryPart = Workload->PartDescription.Parts[RetryPartIndex]; + IoBuffer RetryPartPayload = + Workload->Transmitter(RetryPart.FirstByte, RetryPart.LastByte - RetryPart.FirstByte); + std::string RetryMultipartUploadResponseRequestString = + fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}/uploadMultipart{}", + Namespace, + BucketId, + BuildId, + RetryPartId, + Hash.ToHexString(), + RetryPart.QueryString); + + MultipartUploadResponse = m_HttpClient.Put(RetryMultipartUploadResponseRequestString, RetryPartPayload); + TotalUploadedBytes = MultipartUploadResponse.UploadedBytes; + TotalDownloadedBytes = MultipartUploadResponse.DownloadedBytes; + TotalElapsedSeconds = MultipartUploadResponse.ElapsedSeconds; + if (!MultipartUploadResponse.IsSuccess()) + { + ZEN_WARN("{}", MultipartUploadResponse.ErrorMessage(RetryMultipartUploadResponseRequestString)); + MultipartEndResponse = MultipartUploadResponse; + } + } + } + } + else + { + ZEN_WARN("{}", MultipartEndResponse.ErrorMessage(MultipartEndResponseRequestString)); + } + } + MultipartEndResponse.UploadedBytes = TotalUploadedBytes; + MultipartEndResponse.DownloadedBytes = TotalDownloadedBytes; + MultipartEndResponse.ElapsedSeconds = TotalElapsedSeconds; + return detail::ConvertResponse(MultipartEndResponse, "JupiterSession::PutMultipartBuildBlob"sv); + } + return detail::ConvertResponse(MultipartUploadResponse, "JupiterSession::PutMultipartBuildBlob"sv); + }); + } + return detail::ConvertResponse(StartMultipartResponse, "JupiterSession::PutMultipartBuildBlob"sv); +} + +JupiterResult +JupiterSession::GetMultipartBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& Hash, + uint64_t ChunkSize, + std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver, + std::vector<std::function<JupiterResult()>>& OutWorkItems) +{ + std::string RequestUrl = + fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()); + HttpClient::Response Response = + m_HttpClient.Get(RequestUrl, HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", 0, ChunkSize - 1)}})); + if (Response.IsSuccess()) + { + if (std::string_view ContentRange = Response.Header.Entries["Content-Range"]; !ContentRange.empty()) + { + if (std::string_view::size_type SizeDelimiterPos = ContentRange.find('/'); SizeDelimiterPos != std::string_view::npos) + { + if (std::optional<uint64_t> TotalSizeMaybe = ParseInt<uint64_t>(ContentRange.substr(SizeDelimiterPos + 1)); + TotalSizeMaybe.has_value()) + { + uint64_t TotalSize = TotalSizeMaybe.value(); + uint64_t PayloadSize = Response.ResponsePayload.GetSize(); + + Receiver(0, Response.ResponsePayload, TotalSize); + + if (TotalSize > PayloadSize) + { + struct WorkloadData + { + std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)> Receiver; + std::atomic<uint64_t> BytesRemaining; + }; + + std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>()); + Workload->Receiver = std::move(Receiver); + Workload->BytesRemaining = TotalSize - PayloadSize; + + uint64_t Offset = PayloadSize; + while (Offset < TotalSize) + { + uint64_t PartSize = Min(ChunkSize, TotalSize - Offset); + OutWorkItems.emplace_back( + [this, Namespace, BucketId, BuildId, PartId, Hash, TotalSize, Workload, Offset, PartSize]() + -> JupiterResult { + std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", + Namespace, + BucketId, + BuildId, + PartId, + Hash.ToHexString()); + HttpClient::Response Response = m_HttpClient.Get( + RequestUrl, + HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", Offset, Offset + PartSize - 1)}})); + if (Response.IsSuccess()) + { + uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Response.ResponsePayload.GetSize()); + Workload->Receiver(Offset, Response.ResponsePayload, ByteRemaning); + } + return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv); + }); + Offset += PartSize; + } + } + return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv); + } + } + } + Receiver(0, Response.ResponsePayload, Response.ResponsePayload.GetSize()); + } + return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv); +} + +JupiterResult JupiterSession::GetBuildBlob(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, @@ -447,6 +724,7 @@ JupiterSession::GetBuildBlob(std::string_view Namespace, HttpClient::Response Response = m_HttpClient.Download( fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()), TempFolderPath); + return detail::ConvertResponse(Response, "JupiterSession::GetBuildBlob"sv); } @@ -502,4 +780,19 @@ JupiterSession::FindBlocks(std::string_view Namespace, std::string_view BucketId return detail::ConvertResponse(Response, "JupiterSession::FindBlocks"sv); } +JupiterResult +JupiterSession::GetBlockMetadata(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + IoBuffer Payload) +{ + ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); + HttpClient::Response Response = + m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/getBlockMetadata", Namespace, BucketId, BuildId, PartId), + Payload, + HttpClient::Accept(ZenContentType::kCbObject)); + return detail::ConvertResponse(Response, "JupiterSession::GetBlockMetadata"sv); +} + } // namespace zen |