diff options
Diffstat (limited to 'src/zenutil/jupiter')
| -rw-r--r-- | src/zenutil/jupiter/jupiterclient.cpp | 28 | ||||
| -rw-r--r-- | src/zenutil/jupiter/jupiterhost.cpp | 66 | ||||
| -rw-r--r-- | src/zenutil/jupiter/jupitersession.cpp | 871 |
3 files changed, 0 insertions, 965 deletions
diff --git a/src/zenutil/jupiter/jupiterclient.cpp b/src/zenutil/jupiter/jupiterclient.cpp deleted file mode 100644 index dbac218a4..000000000 --- a/src/zenutil/jupiter/jupiterclient.cpp +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include <zenutil/jupiter/jupiterclient.h> - -namespace zen { - -using namespace std::literals; - -JupiterClient::JupiterClient(const JupiterClientOptions& Options, std::function<HttpClientAccessToken()>&& TokenProvider) -: m_Log(zen::logging::Get("jupiter"sv)) -, m_DefaultDdcNamespace(Options.DdcNamespace) -, m_DefaultBlobStoreNamespace(Options.BlobStoreNamespace) -, m_ComputeCluster(Options.ComputeCluster) -, m_HttpClient(Options.ServiceUrl, - HttpClientSettings{.ConnectTimeout = Options.ConnectTimeout, - .Timeout = Options.Timeout, - .AccessTokenProvider = std::move(TokenProvider), - .AssumeHttp2 = Options.AssumeHttp2, - .AllowResume = Options.AllowResume, - .RetryCount = Options.RetryCount}) -{ -} - -JupiterClient::~JupiterClient() -{ -} - -} // namespace zen diff --git a/src/zenutil/jupiter/jupiterhost.cpp b/src/zenutil/jupiter/jupiterhost.cpp deleted file mode 100644 index d06229cbf..000000000 --- a/src/zenutil/jupiter/jupiterhost.cpp +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include <zenutil/jupiter/jupiterhost.h> - -#include <zencore/compactbinary.h> -#include <zencore/fmtutils.h> -#include <zenhttp/httpclient.h> - -namespace zen { - -JupiterServerDiscovery -DiscoverJupiterEndpoints(std::string_view Host, const HttpClientSettings& ClientSettings) -{ - JupiterServerDiscovery Result; - - HttpClient DiscoveryHttpClient(Host, ClientSettings); - HttpClient::Response ServerInfoResponse = DiscoveryHttpClient.Get("/api/v1/status/servers", HttpClient::Accept(HttpContentType::kJSON)); - if (!ServerInfoResponse.IsSuccess()) - { - ServerInfoResponse.ThrowError(fmt::format("Failed to get list of servers from discovery url '{}'", Host)); - } - std::string_view JsonResponse = ServerInfoResponse.AsText(); - CbObject CbPayload = LoadCompactBinaryFromJson(JsonResponse).AsObject(); - CbArrayView ServerEndpoints = CbPayload["serverEndpoints"].AsArrayView(); - Result.ServerEndPoints.reserve(ServerEndpoints.Num()); - - auto ParseEndPoints = [](CbArrayView ServerEndpoints) -> std::vector<JupiterServerDiscovery::EndPoint> { - std::vector<JupiterServerDiscovery::EndPoint> Result; - - Result.reserve(ServerEndpoints.Num()); - for (CbFieldView ServerEndpointView : ServerEndpoints) - { - CbObjectView ServerEndPoint = ServerEndpointView.AsObjectView(); - Result.push_back(JupiterServerDiscovery::EndPoint{.Name = std::string(ServerEndPoint["name"].AsString()), - .BaseUrl = std::string(ServerEndPoint["baseUrl"].AsString()), - .AssumeHttp2 = ServerEndPoint["baseUrl"].AsBool(false)}); - } - return Result; - }; - - Result.ServerEndPoints = ParseEndPoints(CbPayload["serverEndpoints"].AsArrayView()); - Result.CacheEndPoints = ParseEndPoints(CbPayload["cacheEndpoints"].AsArrayView()); - - return Result; -} - -JupiterEndpointTestResult -TestJupiterEndpoint(std::string_view BaseUrl, const bool AssumeHttp2) -{ - HttpClientSettings TestClientSettings{.LogCategory = "httpbuildsclient", - .ConnectTimeout = std::chrono::milliseconds{1000}, - .Timeout = std::chrono::milliseconds{2000}, - .AssumeHttp2 = AssumeHttp2, - .AllowResume = true, - .RetryCount = 0}; - - HttpClient TestHttpClient(BaseUrl, TestClientSettings); - HttpClient::Response TestResponse = TestHttpClient.Get("/health/live"); - if (TestResponse.IsSuccess()) - { - return {.Success = true}; - } - return {.Success = false, .FailureReason = TestResponse.ErrorMessage("")}; -} - -} // namespace zen diff --git a/src/zenutil/jupiter/jupitersession.cpp b/src/zenutil/jupiter/jupitersession.cpp deleted file mode 100644 index 4b594239e..000000000 --- a/src/zenutil/jupiter/jupitersession.cpp +++ /dev/null @@ -1,871 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include <zenutil/jupiter/jupitersession.h> - -#include <zencore/compactbinary.h> -#include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinaryutil.h> -#include <zencore/compositebuffer.h> -#include <zencore/compress.h> -#include <zencore/fmtutils.h> -#include <zencore/trace.h> - -ZEN_THIRD_PARTY_INCLUDES_START -#include <json11.hpp> -ZEN_THIRD_PARTY_INCLUDES_END - -using namespace std::literals; - -namespace zen { - -namespace detail { - JupiterResult ConvertResponse(const HttpClient::Response& Response, const std::string_view ErrorPrefix = ""sv) - { - if (Response.Error) - { - return {.SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes), - .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes), - .ElapsedSeconds = Response.ElapsedSeconds, - .ErrorCode = Response.Error.value().ErrorCode, - .Reason = Response.ErrorMessage(ErrorPrefix), - .Success = false}; - } - if (!Response.IsSuccess()) - { - return {.SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes), - .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes), - .ElapsedSeconds = Response.ElapsedSeconds, - .ErrorCode = static_cast<int32_t>(Response.StatusCode), - .Reason = Response.ErrorMessage(ErrorPrefix), - .Success = false}; - } - return {.Response = Response.ResponsePayload, - .SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes), - .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes), - .ElapsedSeconds = Response.ElapsedSeconds, - .ErrorCode = 0, - .Success = true}; - } -} // namespace detail - -JupiterSession::JupiterSession(LoggerRef InLog, HttpClient& InHttpClient, bool AllowRedirect) -: m_Log(InLog) -, m_HttpClient(InHttpClient) -, m_AllowRedirect(AllowRedirect) -{ -} - -JupiterSession::~JupiterSession() -{ -} - -JupiterResult -JupiterSession::Authenticate() -{ - bool OK = m_HttpClient.Authenticate(); - return {.Success = OK}; -} - -JupiterResult -JupiterSession::GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType) -{ - ZEN_TRACE_CPU("JupiterClient::GetRef"); - - HttpClient::Response Response = - m_HttpClient.Get(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), {HttpClient::Accept(RefType)}); - - return detail::ConvertResponse(Response, "JupiterSession::GetRef"sv); -} - -JupiterResult -JupiterSession::GetBlob(std::string_view Namespace, const IoHash& Key) -{ - ZEN_TRACE_CPU("JupiterClient::GetBlob"); - HttpClient::Response Response = - m_HttpClient.Get(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), {HttpClient::Accept(ZenContentType::kBinary)}); - - return detail::ConvertResponse(Response); -} - -JupiterResult -JupiterSession::GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath) -{ - ZEN_TRACE_CPU("JupiterClient::GetCompressedBlob"); - - HttpClient::Response Response = m_HttpClient.Download(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), - TempFolderPath, - {HttpClient::Accept(ZenContentType::kCompressedBinary)}); - - return detail::ConvertResponse(Response); -} - -JupiterResult -JupiterSession::GetInlineBlob(std::string_view Namespace, - std::string_view BucketId, - const IoHash& Key, - IoHash& OutPayloadHash, - std::filesystem::path TempFolderPath) -{ - ZEN_TRACE_CPU("JupiterClient::GetInlineBlob"); - - HttpClient::Response Response = m_HttpClient.Download(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), - TempFolderPath, - {{"Accept", "application/x-jupiter-inline"}}); - - JupiterResult Result = detail::ConvertResponse(Response); - - if (auto It = Response.Header->find("X-Jupiter-InlinePayloadHash"); It != Response.Header->end()) - { - const std::string& PayloadHashHeader = It->second; - if (PayloadHashHeader.length() == IoHash::StringLength) - { - OutPayloadHash = IoHash::FromHexString(PayloadHashHeader); - } - } - - return Result; -} - -JupiterResult -JupiterSession::GetObject(std::string_view Namespace, const IoHash& Key) -{ - ZEN_TRACE_CPU("JupiterClient::GetObject"); - - HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), - {HttpClient::Accept(ZenContentType::kCbObject)}); - - return detail::ConvertResponse(Response); -} - -PutRefResult -JupiterSession::PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType) -{ - ZEN_TRACE_CPU("JupiterClient::PutRef"); - - Ref.SetContentType(RefType); - - IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size()); - - HttpClient::Response Response = m_HttpClient.Put(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), - Ref, - {{"X-Jupiter-IoHash", Hash.ToHexString()}}); - - PutRefResult Result = {detail::ConvertResponse(Response)}; - if (Result.Success) - { - std::string JsonError; - json11::Json Json = json11::Json::parse(Response.ToText(), JsonError); - if (JsonError.empty()) - { - json11::Json::array Needs = Json["needs"].array_items(); - for (const auto& Need : Needs) - { - Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); - } - } - Result.RawHash = Hash; - } - return Result; -} - -FinalizeRefResult -JupiterSession::FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHash) -{ - ZEN_TRACE_CPU("JupiterClient::FinalizeRef"); - - HttpClient::Response Response = - m_HttpClient.Post(fmt::format("/api/v1/refs/{}/{}/{}/finalize/{}", Namespace, BucketId, Key.ToHexString(), RefHash.ToHexString()), - {{"X-Jupiter-IoHash", RefHash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}}); - - FinalizeRefResult Result = {detail::ConvertResponse(Response)}; - - if (Result.Success) - { - std::string JsonError; - json11::Json Json = json11::Json::parse(std::string(Response.ToText()), JsonError); - if (JsonError.empty()) - { - json11::Json::array Needs = Json["needs"].array_items(); - for (const auto& Need : Needs) - { - Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); - } - } - } - return Result; -} - -JupiterResult -JupiterSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob) -{ - ZEN_TRACE_CPU("JupiterClient::PutBlob"); - - HttpClient::Response Response = m_HttpClient.Put(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), Blob); - - return detail::ConvertResponse(Response); -} - -JupiterResult -JupiterSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob) -{ - ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob"); - - Blob.SetContentType(ZenContentType::kCompressedBinary); - HttpClient::Response Response = m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), Blob); - - return detail::ConvertResponse(Response); -} - -JupiterResult -JupiterSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Payload) -{ - ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob"); - - HttpClient::Response Response = m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), - Payload, - ZenContentType::kCompressedBinary); - - return detail::ConvertResponse(Response); -} - -JupiterResult -JupiterSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object) -{ - ZEN_TRACE_CPU("JupiterClient::PutObject"); - - Object.SetContentType(ZenContentType::kCbObject); - HttpClient::Response Response = m_HttpClient.Upload(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), Object); - - return detail::ConvertResponse(Response); -} - -JupiterResult -JupiterSession::RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key) -{ - ZEN_TRACE_CPU("JupiterClient::RefExists"); - - HttpClient::Response Response = m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString())); - - return detail::ConvertResponse(Response); -} - -GetObjectReferencesResult -JupiterSession::GetObjectReferences(std::string_view Namespace, const IoHash& Key) -{ - ZEN_TRACE_CPU("JupiterClient::GetObjectReferences"); - - HttpClient::Response Response = m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/references", Namespace, Key.ToHexString()), - {HttpClient::Accept(ZenContentType::kCbObject)}); - - GetObjectReferencesResult Result = {detail::ConvertResponse(Response)}; - - if (Result.Success) - { - const CbObject ReferencesResponse = Response.AsObject(); - for (auto& Item : ReferencesResponse["references"sv]) - { - Result.References.insert(Item.AsHash()); - } - } - return Result; -} - -JupiterResult -JupiterSession::BlobExists(std::string_view Namespace, const IoHash& Key) -{ - return CacheTypeExists(Namespace, "blobs"sv, Key); -} - -JupiterResult -JupiterSession::CompressedBlobExists(std::string_view Namespace, const IoHash& Key) -{ - return CacheTypeExists(Namespace, "compressed-blobs"sv, Key); -} - -JupiterResult -JupiterSession::ObjectExists(std::string_view Namespace, const IoHash& Key) -{ - return CacheTypeExists(Namespace, "objects"sv, Key); -} - -JupiterExistsResult -JupiterSession::BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys) -{ - return CacheTypeExists(Namespace, "blobs"sv, Keys); -} - -JupiterExistsResult -JupiterSession::CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys) -{ - return CacheTypeExists(Namespace, "compressed-blobs"sv, Keys); -} - -JupiterExistsResult -JupiterSession::ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys) -{ - return CacheTypeExists(Namespace, "objects"sv, Keys); -} - -std::vector<IoHash> -JupiterSession::Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes) -{ - // ExtendableStringBuilder<256> Uri; - // Uri << m_CacheClient->ServiceUrl(); - // Uri << "/api/v1/s/" << Namespace; - - ZEN_UNUSED(Namespace, BucketId, ChunkHashes); - - return {}; -} - -JupiterResult -JupiterSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key) -{ - ZEN_TRACE_CPU("JupiterClient::CacheTypeExists"); - - HttpClient::Response Response = m_HttpClient.Head(fmt::format("/api/v1/{}/{}/{}", TypeId, Namespace, Key.ToHexString())); - - return detail::ConvertResponse(Response); -} - -JupiterExistsResult -JupiterSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys) -{ - ZEN_TRACE_CPU("JupiterClient::CacheTypeExists"); - - ExtendableStringBuilder<256> Body; - Body << "["; - for (const auto& Key : Keys) - { - Body << (Body.Size() != 1 ? ",\"" : "\"") << Key.ToHexString() << "\""; - } - Body << "]"; - IoBuffer Payload = IoBuffer(IoBuffer::Wrap, Body.Data(), Body.Size()); - Payload.SetContentType(ZenContentType::kJSON); - - HttpClient::Response Response = - m_HttpClient.Post(fmt::format("/api/v1/{}/{}/exist", TypeId, Namespace), Payload, {HttpClient::Accept(ZenContentType::kCbObject)}); - - JupiterExistsResult Result = {detail::ConvertResponse(Response)}; - - if (Result.Success) - { - const CbObject ExistsResponse = Response.AsObject(); - for (auto& Item : ExistsResponse["needs"sv]) - { - Result.Needs.insert(Item.AsHash()); - } - } - return Result; -} - -JupiterResult -JupiterSession::ListBuildNamespaces() -{ - HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds"), {HttpClient::Accept(ZenContentType::kJSON)}); - return detail::ConvertResponse(Response, "JupiterSession::ListBuildNamespaces"sv); -} - -JupiterResult -JupiterSession::ListBuildBuckets(std::string_view Namespace) -{ - HttpClient::Response Response = - m_HttpClient.Get(fmt::format("/api/v2/builds/{}", Namespace), {HttpClient::Accept(ZenContentType::kJSON)}); - return detail::ConvertResponse(Response, "JupiterSession::ListBuildBuckets"sv); -} - -JupiterResult -JupiterSession::ListBuilds(std::string_view Namespace, std::string_view BucketId, const IoBuffer& Payload) -{ - ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); - std::string OptionalBucketPath = BucketId.empty() ? "" : fmt::format("/{}", BucketId); - HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v2/builds/{}{}/search", Namespace, OptionalBucketPath), - Payload, - {HttpClient::Accept(ZenContentType::kCbObject)}); - return detail::ConvertResponse(Response, "JupiterSession::ListBuilds"sv); -} - -JupiterResult -JupiterSession::PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload) -{ - ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); - HttpClient::Response Response = m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId), Payload); - return detail::ConvertResponse(Response, "JupiterSession::PutBuild"sv); -} - -JupiterResult -JupiterSession::GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId) -{ - HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId), - HttpClient::Accept(ZenContentType::kCbObject)); - return detail::ConvertResponse(Response, "JupiterSession::GetBuild"sv); -} - -JupiterResult -JupiterSession::FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId) -{ - HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/finalize", Namespace, BucketId, BuildId)); - return detail::ConvertResponse(Response, "JupiterSession::FinalizeBuild"sv); -} - -PutBuildPartResult -JupiterSession::PutBuildPart(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const Oid& PartId, - std::string_view PartName, - const IoBuffer& Payload) -{ - ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); - - IoHash Hash = IoHash::HashBuffer(Payload.Data(), Payload.Size()); - - HttpClient::Response Response = - m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/{}", Namespace, BucketId, BuildId, PartId, PartName), - Payload, - {{"X-Jupiter-IoHash", Hash.ToHexString()}}); - - PutBuildPartResult Result = {detail::ConvertResponse(Response, "JupiterSession::PutBuildPart"sv)}; - if (Result.Success) - { - std::string JsonError; - json11::Json Json = json11::Json::parse(Response.ToText(), JsonError); - if (JsonError.empty()) - { - json11::Json::array Needs = Json["needs"].array_items(); - for (const auto& Need : Needs) - { - Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); - } - } - Result.RawHash = Hash; - } - return Result; -} - -JupiterResult -JupiterSession::GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId) -{ - HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}", Namespace, BucketId, BuildId, PartId), - HttpClient::Accept(ZenContentType::kCbObject)); - return detail::ConvertResponse(Response, "JupiterSession::GetBuildPart"sv); -} - -JupiterResult -JupiterSession::PutBuildBlob(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const IoHash& Hash, - ZenContentType ContentType, - const CompositeBuffer& Payload) -{ - HttpClient::Response Response = - m_HttpClient.Upload(fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString()), - Payload, - ContentType); - return detail::ConvertResponse(Response, "JupiterSession::PutBuildBlob"sv); -} - -JupiterResult -JupiterSession::PutMultipartBuildBlob(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const IoHash& Hash, - ZenContentType ContentType, - uint64_t PayloadSize, - std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter, - std::vector<std::function<JupiterResult(bool& OutIsComplete)>>& OutWorkItems) -{ - struct MultipartUploadResponse - { - struct Part - { - uint64_t FirstByte; - uint64_t LastByte; - std::string PartId; - std::string QueryString; - }; - - std::string UploadId; - std::string BlobName; - std::vector<Part> Parts; - - static MultipartUploadResponse Parse(CbObject& Payload) - { - MultipartUploadResponse Result; - Result.UploadId = Payload["uploadId"sv].AsString(); - Result.BlobName = Payload["blobName"sv].AsString(); - CbArrayView PartsArray = Payload["parts"sv].AsArrayView(); - Result.Parts.reserve(PartsArray.Num()); - for (CbFieldView PartView : PartsArray) - { - CbObjectView PartObject = PartView.AsObjectView(); - Result.Parts.emplace_back(Part{ - .FirstByte = PartObject["firstByte"sv].AsUInt64(), - .LastByte = PartObject["lastByte"sv].AsUInt64(), - .PartId = std::string(PartObject["partId"sv].AsString()), - .QueryString = std::string(PartObject["queryString"sv].AsString()), - }); - } - return Result; - } - }; - - CbObjectWriter StartMultipartPayloadWriter; - StartMultipartPayloadWriter.AddInteger("blobLength"sv, PayloadSize); - CbObject StartMultipartPayload = StartMultipartPayloadWriter.Save(); - - std::string StartMultipartResponseRequestString = - fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/startMultipartUpload", Namespace, BucketId, BuildId, Hash.ToHexString()); - // ZEN_INFO("POST: {}", StartMultipartResponseRequestString); - HttpClient::Response StartMultipartResponse = - m_HttpClient.Post(StartMultipartResponseRequestString, StartMultipartPayload, HttpClient::Accept(ZenContentType::kCbObject)); - if (!StartMultipartResponse.IsSuccess()) - { - ZEN_WARN("{}", StartMultipartResponse.ErrorMessage("startMultipartUpload: ")); - return detail::ConvertResponse(StartMultipartResponse, "JupiterSession::PutMultipartBuildBlob"sv); - } - CbValidateError ValidateResult = CbValidateError::None; - CbObject ResponseObject = ValidateAndReadCompactBinaryObject(IoBuffer(StartMultipartResponse.ResponsePayload), ValidateResult); - if (ValidateResult != CbValidateError::None) - { - JupiterResult Result = detail::ConvertResponse(StartMultipartResponse, "JupiterSession::PutMultipartBuildBlob"sv); - Result.ErrorCode = (int32)HttpResponseCode::UnsupportedMediaType; - Result.Reason = fmt::format("Invalid multipart response object format: '{}'", ToString(ValidateResult)); - return Result; - } - - struct WorkloadData - { - MultipartUploadResponse PartDescription; - std::function<IoBuffer(uint64_t Offset, uint64_t Size)> Transmitter; - std::atomic<size_t> PartsLeft; - }; - - std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>()); - - Workload->PartDescription = MultipartUploadResponse::Parse(ResponseObject); - Workload->Transmitter = std::move(Transmitter); - Workload->PartsLeft = Workload->PartDescription.Parts.size(); - - for (size_t PartIndex = 0; PartIndex < Workload->PartDescription.Parts.size(); PartIndex++) - { - OutWorkItems.emplace_back([this, Namespace, BucketId, BuildId, Hash, ContentType, Workload, PartIndex]( - bool& OutIsComplete) -> JupiterResult { - const MultipartUploadResponse::Part& Part = Workload->PartDescription.Parts[PartIndex]; - IoBuffer PartPayload = Workload->Transmitter(Part.FirstByte, Part.LastByte - Part.FirstByte); - std::string MultipartUploadResponseRequestString = - fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}&supportsRedirect={}", - Namespace, - BucketId, - BuildId, - Hash.ToHexString(), - Part.QueryString, - m_AllowRedirect ? "true"sv : "false"sv); - // ZEN_INFO("PUT: {}", MultipartUploadResponseRequestString); - HttpClient::Response MultipartUploadResponse = m_HttpClient.Put(MultipartUploadResponseRequestString, PartPayload); - if (!MultipartUploadResponse.IsSuccess()) - { - ZEN_WARN("{}", MultipartUploadResponse.ErrorMessage(MultipartUploadResponseRequestString)); - } - OutIsComplete = Workload->PartsLeft.fetch_sub(1) == 1; - if (OutIsComplete) - { - int64_t TotalUploadedBytes = MultipartUploadResponse.UploadedBytes; - int64_t TotalDownloadedBytes = MultipartUploadResponse.DownloadedBytes; - double TotalElapsedSeconds = MultipartUploadResponse.ElapsedSeconds; - HttpClient::Response MultipartEndResponse = MultipartUploadResponse; - while (MultipartEndResponse.IsSuccess()) - { - CbObjectWriter CompletePayloadWriter; - CompletePayloadWriter.AddString("blobName"sv, Workload->PartDescription.BlobName); - CompletePayloadWriter.AddString("uploadId"sv, Workload->PartDescription.UploadId); - CompletePayloadWriter.AddBool("isCompressed"sv, ContentType == ZenContentType::kCompressedBinary); - CompletePayloadWriter.BeginArray("partIds"sv); - std::unordered_map<std::string, size_t> PartNameToIndex; - for (size_t UploadPartIndex = 0; UploadPartIndex < Workload->PartDescription.Parts.size(); UploadPartIndex++) - { - const MultipartUploadResponse::Part& PartDescription = Workload->PartDescription.Parts[UploadPartIndex]; - PartNameToIndex.insert({PartDescription.PartId, UploadPartIndex}); - CompletePayloadWriter.AddString(PartDescription.PartId); - } - CompletePayloadWriter.EndArray(); // "partIds" - CbObject CompletePayload = CompletePayloadWriter.Save(); - - std::string MultipartEndResponseRequestString = - fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/completeMultipart", Namespace, BucketId, BuildId, Hash.ToHexString()); - - MultipartEndResponse = m_HttpClient.Post(MultipartEndResponseRequestString, - CompletePayload, - HttpClient::Accept(ZenContentType::kCbObject)); - TotalUploadedBytes += MultipartEndResponse.UploadedBytes; - TotalDownloadedBytes += MultipartEndResponse.DownloadedBytes; - TotalElapsedSeconds += MultipartEndResponse.ElapsedSeconds; - if (MultipartEndResponse.IsSuccess()) - { - CbObject ResponseObject = MultipartEndResponse.AsObject(); - CbArrayView MissingPartsArrayView = ResponseObject["missingParts"sv].AsArrayView(); - if (MissingPartsArrayView.Num() == 0) - { - break; - } - else - { - for (CbFieldView PartIdView : MissingPartsArrayView) - { - std::string RetryPartId(PartIdView.AsString()); - size_t RetryPartIndex = PartNameToIndex.at(RetryPartId); - const MultipartUploadResponse::Part& RetryPart = Workload->PartDescription.Parts[RetryPartIndex]; - IoBuffer RetryPartPayload = - Workload->Transmitter(RetryPart.FirstByte, RetryPart.LastByte - RetryPart.FirstByte - 1); - std::string RetryMultipartUploadResponseRequestString = - fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}&supportsRedirect={}", - Namespace, - BucketId, - BuildId, - Hash.ToHexString(), - RetryPart.QueryString, - m_AllowRedirect ? "true"sv : "false"sv); - - MultipartUploadResponse = m_HttpClient.Put(RetryMultipartUploadResponseRequestString, RetryPartPayload); - TotalUploadedBytes = MultipartUploadResponse.UploadedBytes; - TotalDownloadedBytes = MultipartUploadResponse.DownloadedBytes; - TotalElapsedSeconds = MultipartUploadResponse.ElapsedSeconds; - if (!MultipartUploadResponse.IsSuccess()) - { - ZEN_WARN("{}", MultipartUploadResponse.ErrorMessage(RetryMultipartUploadResponseRequestString)); - MultipartEndResponse = MultipartUploadResponse; - } - } - } - } - else - { - ZEN_WARN("{}", MultipartEndResponse.ErrorMessage(MultipartEndResponseRequestString)); - } - } - MultipartEndResponse.UploadedBytes = TotalUploadedBytes; - MultipartEndResponse.DownloadedBytes = TotalDownloadedBytes; - MultipartEndResponse.ElapsedSeconds = TotalElapsedSeconds; - return detail::ConvertResponse(MultipartEndResponse, "JupiterSession::PutMultipartBuildBlob"sv); - } - return detail::ConvertResponse(MultipartUploadResponse, "JupiterSession::PutMultipartBuildBlob"sv); - }); - } - return detail::ConvertResponse(StartMultipartResponse, "JupiterSession::PutMultipartBuildBlob"sv); -} - -JupiterResult -JupiterSession::GetMultipartBuildBlob(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const IoHash& Hash, - uint64_t ChunkSize, - std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive, - std::function<void()>&& OnComplete, - std::vector<std::function<JupiterResult()>>& OutWorkItems) -{ - std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}", - Namespace, - BucketId, - BuildId, - Hash.ToHexString(), - m_AllowRedirect ? "true"sv : "false"sv); - HttpClient::Response Response = - m_HttpClient.Get(RequestUrl, HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", 0, ChunkSize - 1)}})); - if (Response.IsSuccess()) - { - if (std::string_view ContentRange = Response.Header.Entries["Content-Range"]; !ContentRange.empty()) - { - if (std::string_view::size_type SizeDelimiterPos = ContentRange.find('/'); SizeDelimiterPos != std::string_view::npos) - { - if (std::optional<uint64_t> TotalSizeMaybe = ParseInt<uint64_t>(ContentRange.substr(SizeDelimiterPos + 1)); - TotalSizeMaybe.has_value()) - { - uint64_t TotalSize = TotalSizeMaybe.value(); - uint64_t PayloadSize = Response.ResponsePayload.GetSize(); - - OnReceive(0, Response.ResponsePayload); - - if (TotalSize > PayloadSize) - { - struct WorkloadData - { - std::function<void(uint64_t Offset, const IoBuffer& Chunk)> OnReceive; - std::function<void()> OnComplete; - std::atomic<uint64_t> BytesRemaining; - }; - - std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>()); - Workload->OnReceive = std::move(OnReceive); - Workload->OnComplete = std::move(OnComplete); - Workload->BytesRemaining = TotalSize - PayloadSize; - - uint64_t Offset = PayloadSize; - while (Offset < TotalSize) - { - uint64_t PartSize = Min(ChunkSize, TotalSize - Offset); - OutWorkItems.emplace_back([this, - Namespace = std::string(Namespace), - BucketId = std::string(BucketId), - BuildId = Oid(BuildId), - Hash = IoHash(Hash), - TotalSize, - Workload, - Offset, - PartSize]() -> JupiterResult { - std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}", - Namespace, - BucketId, - BuildId, - Hash.ToHexString(), - m_AllowRedirect ? "true"sv : "false"sv); - HttpClient::Response Response = m_HttpClient.Get( - RequestUrl, - HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", Offset, Offset + PartSize - 1)}})); - if (Response.IsSuccess()) - { - Workload->OnReceive(Offset, Response.ResponsePayload); - uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Response.ResponsePayload.GetSize()); - if (ByteRemaning == Response.ResponsePayload.GetSize()) - { - Workload->OnComplete(); - } - } - return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv); - }); - Offset += PartSize; - } - } - else - { - OnComplete(); - } - return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv); - } - } - } - OnReceive(0, Response.ResponsePayload); - OnComplete(); - } - return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv); -} - -JupiterResult -JupiterSession::GetBuildBlob(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const IoHash& Hash, - std::filesystem::path TempFolderPath, - uint64_t Offset, - uint64_t Size) -{ - HttpClient::KeyValueMap Headers; - if (Offset != 0 || Size != (uint64_t)-1) - { - Headers.Entries.insert({"Range", fmt::format("bytes={}-{}", Offset, Offset + Size - 1)}); - } - HttpClient::Response Response = m_HttpClient.Download(fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}", - Namespace, - BucketId, - BuildId, - Hash.ToHexString(), - m_AllowRedirect ? "true"sv : "false"sv), - TempFolderPath, - Headers); - if (Response.IsSuccess()) - { - // If we get a redirect to S3 or a non-Jupiter endpoint the content type will not be correct, validate it and set it - if (m_AllowRedirect && (Response.ResponsePayload.GetContentType() == HttpContentType::kBinary)) - { - IoHash ValidateRawHash; - uint64_t ValidateRawSize = 0; - ZEN_ASSERT_SLOW(CompressedBuffer::ValidateCompressedHeader(Response.ResponsePayload, ValidateRawHash, ValidateRawSize)); - ZEN_ASSERT_SLOW(ValidateRawHash == Hash); - ZEN_ASSERT_SLOW(ValidateRawSize > 0); - ZEN_UNUSED(ValidateRawHash, ValidateRawSize); - Response.ResponsePayload.SetContentType(ZenContentType::kCompressedBinary); - } - } - return detail::ConvertResponse(Response, "JupiterSession::GetBuildBlob"sv); -} - -JupiterResult -JupiterSession::PutBlockMetadata(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const IoHash& Hash, - const IoBuffer& Payload) -{ - ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); - HttpClient::Response Response = - m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/blocks/{}/metadata", Namespace, BucketId, BuildId, Hash.ToHexString()), - Payload); - return detail::ConvertResponse(Response, "JupiterSession::PutBlockMetadata"sv); -} - -FinalizeBuildPartResult -JupiterSession::FinalizeBuildPart(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const Oid& PartId, - const IoHash& RawHash) -{ - HttpClient::Response Response = m_HttpClient.Post( - fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/finalize/{}", Namespace, BucketId, BuildId, PartId, RawHash.ToHexString()), - HttpClient::Accept(ZenContentType::kCbObject)); - - FinalizeBuildPartResult Result = {detail::ConvertResponse(Response, "JupiterSession::FinalizeBuildPart"sv)}; - if (Result.Success) - { - std::string JsonError; - json11::Json Json = json11::Json::parse(Response.ToText(), JsonError); - if (JsonError.empty()) - { - json11::Json::array Needs = Json["needs"].array_items(); - for (const auto& Need : Needs) - { - Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); - } - } - } - return Result; -} - -JupiterResult -JupiterSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, uint64_t MaxBlockCount) -{ - const std::string Parameters = MaxBlockCount == (uint64_t)-1 ? "" : fmt::format("?count={}", MaxBlockCount); - HttpClient::Response Response = - m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/blocks/listBlocks{}", Namespace, BucketId, BuildId, Parameters), - HttpClient::Accept(ZenContentType::kCbObject)); - return detail::ConvertResponse(Response, "JupiterSession::FindBlocks"sv); -} - -JupiterResult -JupiterSession::GetBlockMetadata(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, IoBuffer Payload) -{ - ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); - HttpClient::Response Response = - m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/blocks/getBlockMetadata", Namespace, BucketId, BuildId), - Payload, - HttpClient::Accept(ZenContentType::kCbObject)); - return detail::ConvertResponse(Response, "JupiterSession::GetBlockMetadata"sv); -} - -JupiterResult -JupiterSession::PutBuildPartStats(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const Oid& BuildPartId, - IoBuffer Payload) -{ - ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); - HttpClient::Response Response = - m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/stats", Namespace, BucketId, BuildId, BuildPartId), - Payload, - HttpClient::Accept(ZenContentType::kCbObject)); - return detail::ConvertResponse(Response, "JupiterSession::PutBuildPartStats"sv); -} - -} // namespace zen |