// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START //#include //#include #include ZEN_THIRD_PARTY_INCLUDES_END using namespace std::literals; namespace zen { namespace detail { JupiterResult ConvertResponse(const HttpClient::Response& Response, const std::string_view ErrorPrefix = ""sv) { if (Response.Error) { return {.SentBytes = gsl::narrow(Response.UploadedBytes), .ReceivedBytes = gsl::narrow(Response.DownloadedBytes), .ElapsedSeconds = Response.ElapsedSeconds, .ErrorCode = Response.Error.value().ErrorCode, .Reason = Response.ErrorMessage(ErrorPrefix), .Success = false}; } if (!Response.IsSuccess()) { return {.SentBytes = gsl::narrow(Response.UploadedBytes), .ReceivedBytes = gsl::narrow(Response.DownloadedBytes), .ElapsedSeconds = Response.ElapsedSeconds, .ErrorCode = static_cast(Response.StatusCode), .Reason = Response.ErrorMessage(ErrorPrefix), .Success = false}; } return {.Response = Response.ResponsePayload, .SentBytes = gsl::narrow(Response.UploadedBytes), .ReceivedBytes = gsl::narrow(Response.DownloadedBytes), .ElapsedSeconds = Response.ElapsedSeconds, .ErrorCode = 0, .Success = true}; } } // namespace detail JupiterSession::JupiterSession(LoggerRef InLog, HttpClient& InHttpClient) : m_Log(InLog), m_HttpClient(InHttpClient) { } JupiterSession::~JupiterSession() { } JupiterResult JupiterSession::Authenticate() { bool OK = m_HttpClient.Authenticate(); return {.Success = OK}; } JupiterResult JupiterSession::GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType) { ZEN_TRACE_CPU("JupiterClient::GetRef"); HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), {HttpClient::Accept(RefType)}); return detail::ConvertResponse(Response, "JupiterSession::GetRef"sv); } JupiterResult JupiterSession::GetBlob(std::string_view Namespace, const IoHash& Key) { ZEN_TRACE_CPU("JupiterClient::GetBlob"); HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), {HttpClient::Accept(ZenContentType::kBinary)}); return detail::ConvertResponse(Response); } JupiterResult JupiterSession::GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath) { ZEN_TRACE_CPU("JupiterClient::GetCompressedBlob"); HttpClient::Response Response = m_HttpClient.Download(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), TempFolderPath, {HttpClient::Accept(ZenContentType::kCompressedBinary)}); return detail::ConvertResponse(Response); } JupiterResult JupiterSession::GetInlineBlob(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoHash& OutPayloadHash, std::filesystem::path TempFolderPath) { ZEN_TRACE_CPU("JupiterClient::GetInlineBlob"); HttpClient::Response Response = m_HttpClient.Download(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), TempFolderPath, {{"Accept", "application/x-jupiter-inline"}}); JupiterResult Result = detail::ConvertResponse(Response); if (auto It = Response.Header->find("X-Jupiter-InlinePayloadHash"); It != Response.Header->end()) { const std::string& PayloadHashHeader = It->second; if (PayloadHashHeader.length() == IoHash::StringLength) { OutPayloadHash = IoHash::FromHexString(PayloadHashHeader); } } return Result; } JupiterResult JupiterSession::GetObject(std::string_view Namespace, const IoHash& Key) { ZEN_TRACE_CPU("JupiterClient::GetObject"); HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), {HttpClient::Accept(ZenContentType::kCbObject)}); return detail::ConvertResponse(Response); } PutRefResult JupiterSession::PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType) { ZEN_TRACE_CPU("JupiterClient::PutRef"); Ref.SetContentType(RefType); IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size()); HttpClient::Response Response = m_HttpClient.Put(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), Ref, {{"X-Jupiter-IoHash", Hash.ToHexString()}}); PutRefResult Result = {detail::ConvertResponse(Response)}; if (Result.Success) { std::string JsonError; json11::Json Json = json11::Json::parse(Response.ToText(), JsonError); if (JsonError.empty()) { json11::Json::array Needs = Json["needs"].array_items(); for (const auto& Need : Needs) { Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); } } Result.RawHash = Hash; } return Result; } FinalizeRefResult JupiterSession::FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHash) { ZEN_TRACE_CPU("JupiterClient::FinalizeRef"); HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v1/refs/{}/{}/{}/finalize/{}", Namespace, BucketId, Key.ToHexString(), RefHash.ToHexString()), {{"X-Jupiter-IoHash", RefHash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}}); FinalizeRefResult Result = {detail::ConvertResponse(Response)}; if (Result.Success) { std::string JsonError; json11::Json Json = json11::Json::parse(std::string(Response.ToText()), JsonError); if (JsonError.empty()) { json11::Json::array Needs = Json["needs"].array_items(); for (const auto& Need : Needs) { Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); } } } return Result; } JupiterResult JupiterSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob) { ZEN_TRACE_CPU("JupiterClient::PutBlob"); HttpClient::Response Response = m_HttpClient.Put(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), Blob); return detail::ConvertResponse(Response); } JupiterResult JupiterSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob) { ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob"); Blob.SetContentType(ZenContentType::kCompressedBinary); HttpClient::Response Response = m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), Blob); return detail::ConvertResponse(Response); } JupiterResult JupiterSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Payload) { ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob"); HttpClient::Response Response = m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), Payload, ZenContentType::kCompressedBinary); return detail::ConvertResponse(Response); } JupiterResult JupiterSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object) { ZEN_TRACE_CPU("JupiterClient::PutObject"); Object.SetContentType(ZenContentType::kCbObject); HttpClient::Response Response = m_HttpClient.Upload(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), Object); return detail::ConvertResponse(Response); } JupiterResult JupiterSession::RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key) { ZEN_TRACE_CPU("JupiterClient::RefExists"); HttpClient::Response Response = m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString())); return detail::ConvertResponse(Response); } GetObjectReferencesResult JupiterSession::GetObjectReferences(std::string_view Namespace, const IoHash& Key) { ZEN_TRACE_CPU("JupiterClient::GetObjectReferences"); HttpClient::Response Response = m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/references", Namespace, Key.ToHexString()), {HttpClient::Accept(ZenContentType::kCbObject)}); GetObjectReferencesResult Result = {detail::ConvertResponse(Response)}; if (Result.Success) { const CbObject ReferencesResponse = Response.AsObject(); for (auto& Item : ReferencesResponse["references"sv]) { Result.References.insert(Item.AsHash()); } } return Result; } JupiterResult JupiterSession::BlobExists(std::string_view Namespace, const IoHash& Key) { return CacheTypeExists(Namespace, "blobs"sv, Key); } JupiterResult JupiterSession::CompressedBlobExists(std::string_view Namespace, const IoHash& Key) { return CacheTypeExists(Namespace, "compressed-blobs"sv, Key); } JupiterResult JupiterSession::ObjectExists(std::string_view Namespace, const IoHash& Key) { return CacheTypeExists(Namespace, "objects"sv, Key); } JupiterExistsResult JupiterSession::BlobExists(std::string_view Namespace, const std::set& Keys) { return CacheTypeExists(Namespace, "blobs"sv, Keys); } JupiterExistsResult JupiterSession::CompressedBlobExists(std::string_view Namespace, const std::set& Keys) { return CacheTypeExists(Namespace, "compressed-blobs"sv, Keys); } JupiterExistsResult JupiterSession::ObjectExists(std::string_view Namespace, const std::set& Keys) { return CacheTypeExists(Namespace, "objects"sv, Keys); } std::vector JupiterSession::Filter(std::string_view Namespace, std::string_view BucketId, const std::vector& ChunkHashes) { // ExtendableStringBuilder<256> Uri; // Uri << m_CacheClient->ServiceUrl(); // Uri << "/api/v1/s/" << Namespace; ZEN_UNUSED(Namespace, BucketId, ChunkHashes); return {}; } JupiterResult JupiterSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key) { ZEN_TRACE_CPU("JupiterClient::CacheTypeExists"); HttpClient::Response Response = m_HttpClient.Head(fmt::format("/api/v1/{}/{}/{}", TypeId, Namespace, Key.ToHexString())); return detail::ConvertResponse(Response); } JupiterExistsResult JupiterSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set& Keys) { ZEN_TRACE_CPU("JupiterClient::CacheTypeExists"); ExtendableStringBuilder<256> Body; Body << "["; for (const auto& Key : Keys) { Body << (Body.Size() != 1 ? ",\"" : "\"") << Key.ToHexString() << "\""; } Body << "]"; IoBuffer Payload = IoBuffer(IoBuffer::Wrap, Body.Data(), Body.Size()); Payload.SetContentType(ZenContentType::kJSON); HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v1/{}/{}/exist", TypeId, Namespace), Payload, {HttpClient::Accept(ZenContentType::kCbObject)}); JupiterExistsResult Result = {detail::ConvertResponse(Response)}; if (Result.Success) { const CbObject ExistsResponse = Response.AsObject(); for (auto& Item : ExistsResponse["needs"sv]) { Result.Needs.insert(Item.AsHash()); } } return Result; } JupiterResult JupiterSession::ListBuilds(std::string_view Namespace, std::string_view BucketId, const IoBuffer& Payload) { ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/search", Namespace, BucketId), Payload, {HttpClient::Accept(ZenContentType::kCbObject)}); return detail::ConvertResponse(Response, "JupiterSession::ListBuilds"sv); } JupiterResult JupiterSession::PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload) { ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); HttpClient::Response Response = m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId), Payload); return detail::ConvertResponse(Response, "JupiterSession::PutBuild"sv); } JupiterResult JupiterSession::GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId) { HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId), HttpClient::Accept(ZenContentType::kCbObject)); return detail::ConvertResponse(Response, "JupiterSession::GetBuild"sv); } JupiterResult JupiterSession::FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId) { HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/finalize", Namespace, BucketId, BuildId)); return detail::ConvertResponse(Response, "JupiterSession::FinalizeBuild"sv); } PutBuildPartResult JupiterSession::PutBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId, std::string_view PartName, const IoBuffer& Payload) { ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); IoHash Hash = IoHash::HashBuffer(Payload.Data(), Payload.Size()); HttpClient::Response Response = m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/{}", Namespace, BucketId, BuildId, PartId, PartName), Payload, {{"X-Jupiter-IoHash", Hash.ToHexString()}}); PutBuildPartResult Result = {detail::ConvertResponse(Response, "JupiterSession::PutBuildPart"sv)}; if (Result.Success) { std::string JsonError; json11::Json Json = json11::Json::parse(Response.ToText(), JsonError); if (JsonError.empty()) { json11::Json::array Needs = Json["needs"].array_items(); for (const auto& Need : Needs) { Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); } } Result.RawHash = Hash; } return Result; } JupiterResult JupiterSession::GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId) { HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}", Namespace, BucketId, BuildId, PartId), HttpClient::Accept(ZenContentType::kCbObject)); return detail::ConvertResponse(Response, "JupiterSession::GetBuildPart"sv); } JupiterResult JupiterSession::PutBuildBlob(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoHash& Hash, ZenContentType ContentType, const CompositeBuffer& Payload) { HttpClient::Response Response = m_HttpClient.Upload(fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString()), Payload, ContentType); return detail::ConvertResponse(Response, "JupiterSession::PutBuildBlob"sv); } JupiterResult JupiterSession::PutMultipartBuildBlob(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoHash& Hash, ZenContentType ContentType, uint64_t PayloadSize, std::function&& Transmitter, std::vector>& OutWorkItems) { struct MultipartUploadResponse { struct Part { uint64_t FirstByte; uint64_t LastByte; std::string PartId; std::string QueryString; }; std::string UploadId; std::string BlobName; std::vector Parts; static MultipartUploadResponse Parse(CbObject& Payload) { MultipartUploadResponse Result; Result.UploadId = Payload["uploadId"sv].AsString(); Result.BlobName = Payload["blobName"sv].AsString(); CbArrayView PartsArray = Payload["parts"sv].AsArrayView(); Result.Parts.reserve(PartsArray.Num()); for (CbFieldView PartView : PartsArray) { CbObjectView PartObject = PartView.AsObjectView(); Result.Parts.emplace_back(Part{ .FirstByte = PartObject["firstByte"sv].AsUInt64(), .LastByte = PartObject["lastByte"sv].AsUInt64(), .PartId = std::string(PartObject["partId"sv].AsString()), .QueryString = std::string(PartObject["queryString"sv].AsString()), }); } return Result; } }; CbObjectWriter StartMultipartPayloadWriter; StartMultipartPayloadWriter.AddInteger("blobLength"sv, PayloadSize); CbObject StartMultipartPayload = StartMultipartPayloadWriter.Save(); std::string StartMultipartResponseRequestString = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/startMultipartUpload", Namespace, BucketId, BuildId, Hash.ToHexString()); // ZEN_INFO("POST: {}", StartMultipartResponseRequestString); HttpClient::Response StartMultipartResponse = m_HttpClient.Post(StartMultipartResponseRequestString, StartMultipartPayload, HttpClient::Accept(ZenContentType::kCbObject)); if (!StartMultipartResponse.IsSuccess()) { ZEN_WARN("{}", StartMultipartResponse.ErrorMessage("startMultipartUpload: ")); return detail::ConvertResponse(StartMultipartResponse, "JupiterSession::PutMultipartBuildBlob"sv); } CbObject ResponseObject = LoadCompactBinaryObject(StartMultipartResponse.ResponsePayload); struct WorkloadData { MultipartUploadResponse PartDescription; std::function Transmitter; std::atomic PartsLeft; }; std::shared_ptr Workload(std::make_shared()); Workload->PartDescription = MultipartUploadResponse::Parse(ResponseObject); Workload->Transmitter = std::move(Transmitter); Workload->PartsLeft = Workload->PartDescription.Parts.size(); for (size_t PartIndex = 0; PartIndex < Workload->PartDescription.Parts.size(); PartIndex++) { OutWorkItems.emplace_back([this, Namespace, BucketId, BuildId, Hash, ContentType, Workload, PartIndex]( bool& OutIsComplete) -> JupiterResult { const MultipartUploadResponse::Part& Part = Workload->PartDescription.Parts[PartIndex]; IoBuffer PartPayload = Workload->Transmitter(Part.FirstByte, Part.LastByte - Part.FirstByte); std::string MultipartUploadResponseRequestString = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}", Namespace, BucketId, BuildId, Hash.ToHexString(), Part.QueryString); // ZEN_INFO("PUT: {}", MultipartUploadResponseRequestString); HttpClient::Response MultipartUploadResponse = m_HttpClient.Put(MultipartUploadResponseRequestString, PartPayload); if (!MultipartUploadResponse.IsSuccess()) { ZEN_WARN("{}", MultipartUploadResponse.ErrorMessage(MultipartUploadResponseRequestString)); } OutIsComplete = Workload->PartsLeft.fetch_sub(1) == 1; if (OutIsComplete) { int64_t TotalUploadedBytes = MultipartUploadResponse.UploadedBytes; int64_t TotalDownloadedBytes = MultipartUploadResponse.DownloadedBytes; double TotalElapsedSeconds = MultipartUploadResponse.ElapsedSeconds; HttpClient::Response MultipartEndResponse = MultipartUploadResponse; while (MultipartEndResponse.IsSuccess()) { CbObjectWriter CompletePayloadWriter; CompletePayloadWriter.AddString("blobName"sv, Workload->PartDescription.BlobName); CompletePayloadWriter.AddString("uploadId"sv, Workload->PartDescription.UploadId); CompletePayloadWriter.AddBool("isCompressed"sv, ContentType == ZenContentType::kCompressedBinary); CompletePayloadWriter.BeginArray("partIds"sv); std::unordered_map PartNameToIndex; for (size_t UploadPartIndex = 0; UploadPartIndex < Workload->PartDescription.Parts.size(); UploadPartIndex++) { const MultipartUploadResponse::Part& PartDescription = Workload->PartDescription.Parts[UploadPartIndex]; PartNameToIndex.insert({PartDescription.PartId, UploadPartIndex}); CompletePayloadWriter.AddString(PartDescription.PartId); } CompletePayloadWriter.EndArray(); // "partIds" CbObject CompletePayload = CompletePayloadWriter.Save(); std::string MultipartEndResponseRequestString = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/completeMultipart", Namespace, BucketId, BuildId, Hash.ToHexString()); MultipartEndResponse = m_HttpClient.Post(MultipartEndResponseRequestString, CompletePayload, HttpClient::Accept(ZenContentType::kCbObject)); TotalUploadedBytes += MultipartEndResponse.UploadedBytes; TotalDownloadedBytes += MultipartEndResponse.DownloadedBytes; TotalElapsedSeconds += MultipartEndResponse.ElapsedSeconds; if (MultipartEndResponse.IsSuccess()) { CbObject ResponseObject = MultipartEndResponse.AsObject(); CbArrayView MissingPartsArrayView = ResponseObject["missingParts"sv].AsArrayView(); if (MissingPartsArrayView.Num() == 0) { break; } else { for (CbFieldView PartIdView : MissingPartsArrayView) { std::string RetryPartId(PartIdView.AsString()); size_t RetryPartIndex = PartNameToIndex.at(RetryPartId); const MultipartUploadResponse::Part& RetryPart = Workload->PartDescription.Parts[RetryPartIndex]; IoBuffer RetryPartPayload = Workload->Transmitter(RetryPart.FirstByte, RetryPart.LastByte - RetryPart.FirstByte - 1); std::string RetryMultipartUploadResponseRequestString = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}", Namespace, BucketId, BuildId, Hash.ToHexString(), RetryPart.QueryString); MultipartUploadResponse = m_HttpClient.Put(RetryMultipartUploadResponseRequestString, RetryPartPayload); TotalUploadedBytes = MultipartUploadResponse.UploadedBytes; TotalDownloadedBytes = MultipartUploadResponse.DownloadedBytes; TotalElapsedSeconds = MultipartUploadResponse.ElapsedSeconds; if (!MultipartUploadResponse.IsSuccess()) { ZEN_WARN("{}", MultipartUploadResponse.ErrorMessage(RetryMultipartUploadResponseRequestString)); MultipartEndResponse = MultipartUploadResponse; } } } } else { ZEN_WARN("{}", MultipartEndResponse.ErrorMessage(MultipartEndResponseRequestString)); } } MultipartEndResponse.UploadedBytes = TotalUploadedBytes; MultipartEndResponse.DownloadedBytes = TotalDownloadedBytes; MultipartEndResponse.ElapsedSeconds = TotalElapsedSeconds; return detail::ConvertResponse(MultipartEndResponse, "JupiterSession::PutMultipartBuildBlob"sv); } return detail::ConvertResponse(MultipartUploadResponse, "JupiterSession::PutMultipartBuildBlob"sv); }); } return detail::ConvertResponse(StartMultipartResponse, "JupiterSession::PutMultipartBuildBlob"sv); } JupiterResult JupiterSession::GetMultipartBuildBlob(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoHash& Hash, uint64_t ChunkSize, std::function&& OnReceive, std::function&& OnComplete, std::vector>& OutWorkItems) { std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString()); HttpClient::Response Response = m_HttpClient.Get(RequestUrl, HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", 0, ChunkSize - 1)}})); if (Response.IsSuccess()) { if (std::string_view ContentRange = Response.Header.Entries["Content-Range"]; !ContentRange.empty()) { if (std::string_view::size_type SizeDelimiterPos = ContentRange.find('/'); SizeDelimiterPos != std::string_view::npos) { if (std::optional TotalSizeMaybe = ParseInt(ContentRange.substr(SizeDelimiterPos + 1)); TotalSizeMaybe.has_value()) { uint64_t TotalSize = TotalSizeMaybe.value(); uint64_t PayloadSize = Response.ResponsePayload.GetSize(); OnReceive(0, Response.ResponsePayload); if (TotalSize > PayloadSize) { struct WorkloadData { std::function OnReceive; std::function OnComplete; std::atomic BytesRemaining; }; std::shared_ptr Workload(std::make_shared()); Workload->OnReceive = std::move(OnReceive); Workload->OnComplete = std::move(OnComplete); Workload->BytesRemaining = TotalSize - PayloadSize; uint64_t Offset = PayloadSize; while (Offset < TotalSize) { uint64_t PartSize = Min(ChunkSize, TotalSize - Offset); OutWorkItems.emplace_back( [this, Namespace, BucketId, BuildId, Hash, TotalSize, Workload, Offset, PartSize]() -> JupiterResult { std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString()); HttpClient::Response Response = m_HttpClient.Get( RequestUrl, HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", Offset, Offset + PartSize - 1)}})); if (Response.IsSuccess()) { Workload->OnReceive(Offset, Response.ResponsePayload); uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Response.ResponsePayload.GetSize()); if (ByteRemaning == Response.ResponsePayload.GetSize()) { Workload->OnComplete(); } } return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv); }); Offset += PartSize; } } else { OnComplete(); } return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv); } } } OnReceive(0, Response.ResponsePayload); OnComplete(); } return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv); } JupiterResult JupiterSession::GetBuildBlob(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoHash& Hash, std::filesystem::path TempFolderPath, uint64_t Offset, uint64_t Size) { HttpClient::KeyValueMap Headers; if (Offset != 0 || Size != (uint64_t)-1) { Headers.Entries.insert({"Range", fmt::format("bytes={}-{}", Offset, Offset + Size - 1)}); } HttpClient::Response Response = m_HttpClient.Download(fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString()), TempFolderPath, Headers); return detail::ConvertResponse(Response, "JupiterSession::GetBuildBlob"sv); } JupiterResult JupiterSession::PutBlockMetadata(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoHash& Hash, const IoBuffer& Payload) { ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); HttpClient::Response Response = m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/blocks/{}/metadata", Namespace, BucketId, BuildId, Hash.ToHexString()), Payload); return detail::ConvertResponse(Response, "JupiterSession::PutBlockMetadata"sv); } FinalizeBuildPartResult JupiterSession::FinalizeBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId, const IoHash& RawHash) { HttpClient::Response Response = m_HttpClient.Post( fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/finalize/{}", Namespace, BucketId, BuildId, PartId, RawHash.ToHexString()), HttpClient::Accept(ZenContentType::kCbObject)); FinalizeBuildPartResult Result = {detail::ConvertResponse(Response, "JupiterSession::FinalizeBuildPart"sv)}; if (Result.Success) { std::string JsonError; json11::Json Json = json11::Json::parse(Response.ToText(), JsonError); if (JsonError.empty()) { json11::Json::array Needs = Json["needs"].array_items(); for (const auto& Need : Needs) { Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); } } } return Result; } JupiterResult JupiterSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, uint64_t MaxBlockCount) { const std::string Parameters = MaxBlockCount == (uint64_t)-1 ? "" : fmt::format("?count={}", MaxBlockCount); HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/blocks/listBlocks{}", Namespace, BucketId, BuildId, Parameters), HttpClient::Accept(ZenContentType::kCbObject)); return detail::ConvertResponse(Response, "JupiterSession::FindBlocks"sv); } JupiterResult JupiterSession::GetBlockMetadata(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, IoBuffer Payload) { ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/blocks/getBlockMetadata", Namespace, BucketId, BuildId), Payload, HttpClient::Accept(ZenContentType::kCbObject)); return detail::ConvertResponse(Response, "JupiterSession::GetBlockMetadata"sv); } JupiterResult JupiterSession::PutBuildPartStats(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& BuildPartId, IoBuffer Payload) { ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); HttpClient::Response Response = m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/stats", Namespace, BucketId, BuildId, BuildPartId), Payload, HttpClient::Accept(ZenContentType::kCbObject)); return detail::ConvertResponse(Response, "JupiterSession::PutBuildPartStats"sv); } } // namespace zen