aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-02-12 08:58:52 +0100
committerGitHub Enterprise <[email protected]>2025-02-12 08:58:52 +0100
commitd39724eb644cab4ec5bbf19a703cb770b34e68c4 (patch)
treee7263163d6bed8ba7f5cd05822f6993a26907b01 /src/zenutil
parentFix workspace shares reply array (#280) (diff)
downloadzen-d39724eb644cab4ec5bbf19a703cb770b34e68c4.tar.xz
zen-d39724eb644cab4ec5bbf19a703cb770b34e68c4.zip
improved builds api interface in jupiter (#281)
* multipart upload/download iterface in jupiter * review fixes
Diffstat (limited to 'src/zenutil')
-rw-r--r--src/zenutil/include/zenutil/jupiter/jupitersession.h70
-rw-r--r--src/zenutil/jupiter/jupitersession.cpp293
2 files changed, 340 insertions, 23 deletions
diff --git a/src/zenutil/include/zenutil/jupiter/jupitersession.h b/src/zenutil/include/zenutil/jupiter/jupitersession.h
index 6a80332f4..075c35b40 100644
--- a/src/zenutil/include/zenutil/jupiter/jupitersession.h
+++ b/src/zenutil/include/zenutil/jupiter/jupitersession.h
@@ -102,29 +102,48 @@ public:
std::vector<IoHash> Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes);
- JupiterResult PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload);
- JupiterResult GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId);
- JupiterResult FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId);
- PutBuildPartResult PutBuildPart(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& PartId,
- std::string_view PartName,
- const IoBuffer& Payload);
- JupiterResult GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId);
- JupiterResult PutBuildBlob(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& PartId,
- const IoHash& Hash,
- ZenContentType ContentType,
- const CompositeBuffer& Payload);
- JupiterResult GetBuildBlob(std::string_view Namespace,
- std::string_view BucketId,
- const Oid& BuildId,
- const Oid& PartId,
- const IoHash& Hash,
- std::filesystem::path TempFolderPath);
+ JupiterResult ListBuilds(std::string_view Namespace, std::string_view BucketId, const IoBuffer& Payload);
+ JupiterResult PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload);
+ JupiterResult GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId);
+ JupiterResult FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId);
+ PutBuildPartResult PutBuildPart(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ std::string_view PartName,
+ const IoBuffer& Payload);
+ JupiterResult GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId);
+ JupiterResult PutBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ ZenContentType ContentType,
+ const CompositeBuffer& Payload);
+ JupiterResult GetBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ std::filesystem::path TempFolderPath);
+
+ JupiterResult PutMultipartBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ ZenContentType ContentType,
+ uint64_t PayloadSize,
+ std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter,
+ std::vector<std::function<JupiterResult(bool& OutIsComplete)>>& OutWorkItems);
+ JupiterResult GetMultipartBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ uint64_t ChunkSize,
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver,
+ std::vector<std::function<JupiterResult()>>& OutWorkItems);
JupiterResult PutBlockMetadata(std::string_view Namespace,
std::string_view BucketId,
const Oid& BuildId,
@@ -137,6 +156,11 @@ public:
const Oid& PartId,
const IoHash& RawHash);
JupiterResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId);
+ JupiterResult GetBlockMetadata(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ IoBuffer Payload);
private:
inline LoggerRef Log() { return m_Log; }
diff --git a/src/zenutil/jupiter/jupitersession.cpp b/src/zenutil/jupiter/jupitersession.cpp
index f706a7efc..d56927b44 100644
--- a/src/zenutil/jupiter/jupitersession.cpp
+++ b/src/zenutil/jupiter/jupitersession.cpp
@@ -3,6 +3,8 @@
#include <zenutil/jupiter/jupitersession.h>
#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compositebuffer.h>
#include <zencore/fmtutils.h>
#include <zencore/trace.h>
@@ -355,6 +357,16 @@ JupiterSession::CacheTypeExists(std::string_view Namespace, std::string_view Typ
}
JupiterResult
+JupiterSession::ListBuilds(std::string_view Namespace, std::string_view BucketId, const IoBuffer& Payload)
+{
+ ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
+ HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/search", Namespace, BucketId),
+ Payload,
+ {HttpClient::Accept(ZenContentType::kCbObject)});
+ return detail::ConvertResponse(Response, "JupiterSession::ListBuilds"sv);
+}
+
+JupiterResult
JupiterSession::PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload)
{
ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
@@ -437,6 +449,271 @@ JupiterSession::PutBuildBlob(std::string_view Namespace,
}
JupiterResult
+JupiterSession::PutMultipartBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ ZenContentType ContentType,
+ uint64_t PayloadSize,
+ std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter,
+ std::vector<std::function<JupiterResult(bool& OutIsComplete)>>& OutWorkItems)
+{
+ struct MultipartUploadResponse
+ {
+ struct Part
+ {
+ uint64_t FirstByte;
+ uint64_t LastByte;
+ std::string PartId;
+ std::string QueryString;
+ };
+
+ std::string UploadId;
+ std::string BlobName;
+ std::vector<Part> Parts;
+
+ static MultipartUploadResponse Parse(CbObject& Payload)
+ {
+ MultipartUploadResponse Result;
+ Result.UploadId = Payload["uploadId"sv].AsString();
+ Result.BlobName = Payload["blobName"sv].AsString();
+ CbArrayView PartsArray = Payload["parts"sv].AsArrayView();
+ Result.Parts.reserve(PartsArray.Num());
+ for (CbFieldView PartView : PartsArray)
+ {
+ CbObjectView PartObject = PartView.AsObjectView();
+ Result.Parts.emplace_back(Part{
+ .FirstByte = PartObject["firstByte"sv].AsUInt64(),
+ .LastByte = PartObject["lastByte"sv].AsUInt64(),
+ .PartId = std::string(PartObject["partId"sv].AsString()),
+ .QueryString = std::string(PartObject["queryString"sv].AsString()),
+ });
+ }
+ return Result;
+ }
+ };
+
+ CbObjectWriter StartMultipartPayloadWriter;
+ StartMultipartPayloadWriter.AddInteger("blobLength"sv, PayloadSize);
+ CbObject StartMultipartPayload = StartMultipartPayloadWriter.Save();
+
+ std::string StartMultipartResponseRequestString = fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}/startMultipartUpload",
+ Namespace,
+ BucketId,
+ BuildId,
+ PartId,
+ Hash.ToHexString());
+ // ZEN_INFO("POST: {}", StartMultipartResponseRequestString);
+ HttpClient::Response StartMultipartResponse =
+ m_HttpClient.Post(StartMultipartResponseRequestString, StartMultipartPayload, HttpClient::Accept(ZenContentType::kCbObject));
+ if (!StartMultipartResponse.IsSuccess())
+ {
+ ZEN_WARN("{}", StartMultipartResponse.ErrorMessage("startMultipartUpload: "));
+ return detail::ConvertResponse(StartMultipartResponse, "JupiterSession::PutMultipartBuildBlob"sv);
+ }
+ CbObject ResponseObject = LoadCompactBinaryObject(StartMultipartResponse.ResponsePayload);
+
+ struct WorkloadData
+ {
+ MultipartUploadResponse PartDescription;
+ std::function<IoBuffer(uint64_t Offset, uint64_t Size)> Transmitter;
+ std::atomic<size_t> PartsLeft;
+ };
+
+ std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>());
+
+ Workload->PartDescription = MultipartUploadResponse::Parse(ResponseObject);
+ Workload->Transmitter = std::move(Transmitter);
+ Workload->PartsLeft = Workload->PartDescription.Parts.size();
+
+ for (size_t PartIndex = 0; PartIndex < Workload->PartDescription.Parts.size(); PartIndex++)
+ {
+ OutWorkItems.emplace_back([this, Namespace, BucketId, BuildId, PartId, Hash, ContentType, Workload, PartIndex](
+ bool& OutIsComplete) -> JupiterResult {
+ const MultipartUploadResponse::Part& Part = Workload->PartDescription.Parts[PartIndex];
+ IoBuffer PartPayload = Workload->Transmitter(Part.FirstByte, Part.LastByte - Part.FirstByte);
+ std::string MultipartUploadResponseRequestString = fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}/uploadMultipart{}",
+ Namespace,
+ BucketId,
+ BuildId,
+ PartId,
+ Hash.ToHexString(),
+ Part.QueryString);
+ // ZEN_INFO("PUT: {}", MultipartUploadResponseRequestString);
+ HttpClient::Response MultipartUploadResponse = m_HttpClient.Put(MultipartUploadResponseRequestString, PartPayload);
+ if (!MultipartUploadResponse.IsSuccess())
+ {
+ ZEN_WARN("{}", MultipartUploadResponse.ErrorMessage(MultipartUploadResponseRequestString));
+ }
+ OutIsComplete = Workload->PartsLeft.fetch_sub(1) == 1;
+ if (OutIsComplete)
+ {
+ int64_t TotalUploadedBytes = MultipartUploadResponse.UploadedBytes;
+ int64_t TotalDownloadedBytes = MultipartUploadResponse.DownloadedBytes;
+ double TotalElapsedSeconds = MultipartUploadResponse.ElapsedSeconds;
+ HttpClient::Response MultipartEndResponse = MultipartUploadResponse;
+ while (MultipartEndResponse.IsSuccess())
+ {
+ CbObjectWriter CompletePayloadWriter;
+ CompletePayloadWriter.AddString("blobName"sv, Workload->PartDescription.BlobName);
+ CompletePayloadWriter.AddString("uploadId"sv, Workload->PartDescription.UploadId);
+ CompletePayloadWriter.AddBool("isCompressed"sv, ContentType == ZenContentType::kCompressedBinary);
+ CompletePayloadWriter.BeginArray("partIds"sv);
+ std::unordered_map<std::string, size_t> PartNameToIndex;
+ for (size_t UploadPartIndex = 0; UploadPartIndex < Workload->PartDescription.Parts.size(); UploadPartIndex++)
+ {
+ const MultipartUploadResponse::Part& PartDescription = Workload->PartDescription.Parts[UploadPartIndex];
+ PartNameToIndex.insert({PartDescription.PartId, UploadPartIndex});
+ CompletePayloadWriter.AddString(PartDescription.PartId);
+ }
+ CompletePayloadWriter.EndArray(); // "partIds"
+ CbObject CompletePayload = CompletePayloadWriter.Save();
+
+ std::string MultipartEndResponseRequestString =
+ fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}/completeMultipart",
+ Namespace,
+ BucketId,
+ BuildId,
+ PartId,
+ Hash.ToHexString());
+
+ MultipartEndResponse = m_HttpClient.Post(MultipartEndResponseRequestString,
+ CompletePayload,
+ HttpClient::Accept(ZenContentType::kCbObject));
+ TotalUploadedBytes += MultipartEndResponse.UploadedBytes;
+ TotalDownloadedBytes += MultipartEndResponse.DownloadedBytes;
+ TotalElapsedSeconds += MultipartEndResponse.ElapsedSeconds;
+ if (MultipartEndResponse.IsSuccess())
+ {
+ CbObject ResponseObject = MultipartEndResponse.AsObject();
+ CbArrayView MissingPartsArrayView = ResponseObject["missingParts"sv].AsArrayView();
+ if (MissingPartsArrayView.Num() == 0)
+ {
+ break;
+ }
+ else
+ {
+ for (CbFieldView PartIdView : MissingPartsArrayView)
+ {
+ std::string RetryPartId(PartIdView.AsString());
+ size_t RetryPartIndex = PartNameToIndex.at(RetryPartId);
+ const MultipartUploadResponse::Part& RetryPart = Workload->PartDescription.Parts[RetryPartIndex];
+ IoBuffer RetryPartPayload =
+ Workload->Transmitter(RetryPart.FirstByte, RetryPart.LastByte - RetryPart.FirstByte);
+ std::string RetryMultipartUploadResponseRequestString =
+ fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}/uploadMultipart{}",
+ Namespace,
+ BucketId,
+ BuildId,
+ RetryPartId,
+ Hash.ToHexString(),
+ RetryPart.QueryString);
+
+ MultipartUploadResponse = m_HttpClient.Put(RetryMultipartUploadResponseRequestString, RetryPartPayload);
+ TotalUploadedBytes = MultipartUploadResponse.UploadedBytes;
+ TotalDownloadedBytes = MultipartUploadResponse.DownloadedBytes;
+ TotalElapsedSeconds = MultipartUploadResponse.ElapsedSeconds;
+ if (!MultipartUploadResponse.IsSuccess())
+ {
+ ZEN_WARN("{}", MultipartUploadResponse.ErrorMessage(RetryMultipartUploadResponseRequestString));
+ MultipartEndResponse = MultipartUploadResponse;
+ }
+ }
+ }
+ }
+ else
+ {
+ ZEN_WARN("{}", MultipartEndResponse.ErrorMessage(MultipartEndResponseRequestString));
+ }
+ }
+ MultipartEndResponse.UploadedBytes = TotalUploadedBytes;
+ MultipartEndResponse.DownloadedBytes = TotalDownloadedBytes;
+ MultipartEndResponse.ElapsedSeconds = TotalElapsedSeconds;
+ return detail::ConvertResponse(MultipartEndResponse, "JupiterSession::PutMultipartBuildBlob"sv);
+ }
+ return detail::ConvertResponse(MultipartUploadResponse, "JupiterSession::PutMultipartBuildBlob"sv);
+ });
+ }
+ return detail::ConvertResponse(StartMultipartResponse, "JupiterSession::PutMultipartBuildBlob"sv);
+}
+
+JupiterResult
+JupiterSession::GetMultipartBuildBlob(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ const IoHash& Hash,
+ uint64_t ChunkSize,
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver,
+ std::vector<std::function<JupiterResult()>>& OutWorkItems)
+{
+ std::string RequestUrl =
+ fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString());
+ HttpClient::Response Response =
+ m_HttpClient.Get(RequestUrl, HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", 0, ChunkSize - 1)}}));
+ if (Response.IsSuccess())
+ {
+ if (std::string_view ContentRange = Response.Header.Entries["Content-Range"]; !ContentRange.empty())
+ {
+ if (std::string_view::size_type SizeDelimiterPos = ContentRange.find('/'); SizeDelimiterPos != std::string_view::npos)
+ {
+ if (std::optional<uint64_t> TotalSizeMaybe = ParseInt<uint64_t>(ContentRange.substr(SizeDelimiterPos + 1));
+ TotalSizeMaybe.has_value())
+ {
+ uint64_t TotalSize = TotalSizeMaybe.value();
+ uint64_t PayloadSize = Response.ResponsePayload.GetSize();
+
+ Receiver(0, Response.ResponsePayload, TotalSize);
+
+ if (TotalSize > PayloadSize)
+ {
+ struct WorkloadData
+ {
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)> Receiver;
+ std::atomic<uint64_t> BytesRemaining;
+ };
+
+ std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>());
+ Workload->Receiver = std::move(Receiver);
+ Workload->BytesRemaining = TotalSize - PayloadSize;
+
+ uint64_t Offset = PayloadSize;
+ while (Offset < TotalSize)
+ {
+ uint64_t PartSize = Min(ChunkSize, TotalSize - Offset);
+ OutWorkItems.emplace_back(
+ [this, Namespace, BucketId, BuildId, PartId, Hash, TotalSize, Workload, Offset, PartSize]()
+ -> JupiterResult {
+ std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}",
+ Namespace,
+ BucketId,
+ BuildId,
+ PartId,
+ Hash.ToHexString());
+ HttpClient::Response Response = m_HttpClient.Get(
+ RequestUrl,
+ HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", Offset, Offset + PartSize - 1)}}));
+ if (Response.IsSuccess())
+ {
+ uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Response.ResponsePayload.GetSize());
+ Workload->Receiver(Offset, Response.ResponsePayload, ByteRemaning);
+ }
+ return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv);
+ });
+ Offset += PartSize;
+ }
+ }
+ return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv);
+ }
+ }
+ }
+ Receiver(0, Response.ResponsePayload, Response.ResponsePayload.GetSize());
+ }
+ return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv);
+}
+
+JupiterResult
JupiterSession::GetBuildBlob(std::string_view Namespace,
std::string_view BucketId,
const Oid& BuildId,
@@ -447,6 +724,7 @@ JupiterSession::GetBuildBlob(std::string_view Namespace,
HttpClient::Response Response = m_HttpClient.Download(
fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()),
TempFolderPath);
+
return detail::ConvertResponse(Response, "JupiterSession::GetBuildBlob"sv);
}
@@ -502,4 +780,19 @@ JupiterSession::FindBlocks(std::string_view Namespace, std::string_view BucketId
return detail::ConvertResponse(Response, "JupiterSession::FindBlocks"sv);
}
+JupiterResult
+JupiterSession::GetBlockMetadata(std::string_view Namespace,
+ std::string_view BucketId,
+ const Oid& BuildId,
+ const Oid& PartId,
+ IoBuffer Payload)
+{
+ ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject);
+ HttpClient::Response Response =
+ m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/getBlockMetadata", Namespace, BucketId, BuildId, PartId),
+ Payload,
+ HttpClient::Accept(ZenContentType::kCbObject));
+ return detail::ConvertResponse(Response, "JupiterSession::GetBlockMetadata"sv);
+}
+
} // namespace zen