diff options
| author | zousar <[email protected]> | 2025-06-24 16:26:29 -0600 |
|---|---|---|
| committer | zousar <[email protected]> | 2025-06-24 16:26:29 -0600 |
| commit | bb298631ba35a323827dda0b8cd6158e276b5f61 (patch) | |
| tree | 7ba8db91c44ce83f2c518f80f80ab14910eefa6f /src/zenutil/jupiter | |
| parent | Change to PutResult structure (diff) | |
| parent | 5.6.14 (diff) | |
| download | zen-bb298631ba35a323827dda0b8cd6158e276b5f61.tar.xz zen-bb298631ba35a323827dda0b8cd6158e276b5f61.zip | |
Merge branch 'main' into zs/put-overwrite-policy
Diffstat (limited to 'src/zenutil/jupiter')
| -rw-r--r-- | src/zenutil/jupiter/jupiterbuildstorage.cpp | 479 | ||||
| -rw-r--r-- | src/zenutil/jupiter/jupiterclient.cpp | 1 | ||||
| -rw-r--r-- | src/zenutil/jupiter/jupitersession.cpp | 393 |
3 files changed, 855 insertions, 18 deletions
diff --git a/src/zenutil/jupiter/jupiterbuildstorage.cpp b/src/zenutil/jupiter/jupiterbuildstorage.cpp new file mode 100644 index 000000000..9974725ff --- /dev/null +++ b/src/zenutil/jupiter/jupiterbuildstorage.cpp @@ -0,0 +1,479 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenutil/jupiter/jupiterbuildstorage.h> + +#include <zencore/compactbinarybuilder.h> +#include <zencore/fmtutils.h> +#include <zencore/scopeguard.h> +#include <zencore/timer.h> +#include <zencore/trace.h> +#include <zenutil/jupiter/jupitersession.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <tsl/robin_map.h> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +using namespace std::literals; + +class JupiterBuildStorage : public BuildStorage +{ +public: + JupiterBuildStorage(LoggerRef InLog, + HttpClient& InHttpClient, + Statistics& Stats, + std::string_view Namespace, + std::string_view Bucket, + bool AllowRedirect, + const std::filesystem::path& TempFolderPath) + : m_Session(InLog, InHttpClient, AllowRedirect) + , m_Stats(Stats) + , m_Namespace(Namespace) + , m_Bucket(Bucket) + , m_TempFolderPath(TempFolderPath) + { + } + virtual ~JupiterBuildStorage() {} + + virtual CbObject ListNamespaces(bool bRecursive) override + { + ZEN_TRACE_CPU("Jupiter::ListNamespaces"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + JupiterResult ListResult = m_Session.ListBuildNamespaces(); + AddStatistic(ListResult); + if (!ListResult.Success) + { + throw std::runtime_error(fmt::format("Failed listing namespaces: {} ({})", ListResult.Reason, ListResult.ErrorCode)); + } + CbObject NamespaceResponse = PayloadToCbObject("Failed listing namespaces"sv, ListResult.Response); + + CbObjectWriter Response; + Response.BeginArray("results"sv); + for (CbFieldView NamespaceField : NamespaceResponse["namespaces"]) + { + std::string_view Namespace = NamespaceField.AsString(); + if (!Namespace.empty()) + { + Response.BeginObject(); + Response.AddString("name", Namespace); + + if (bRecursive) + { + JupiterResult BucketsResult = m_Session.ListBuildBuckets(Namespace); + AddStatistic(BucketsResult); + if (!BucketsResult.Success) + { + throw std::runtime_error( + fmt::format("Failed listing namespaces: {} ({})", BucketsResult.Reason, BucketsResult.ErrorCode)); + } + CbObject BucketResponse = PayloadToCbObject("Failed listing namespaces"sv, BucketsResult.Response); + + Response.BeginArray("items"); + for (CbFieldView BucketField : BucketResponse["buckets"]) + { + std::string_view Bucket = BucketField.AsString(); + if (!Bucket.empty()) + { + Response.AddString(Bucket); + } + } + Response.EndArray(); + } + + Response.EndObject(); + } + } + Response.EndArray(); + + return Response.Save(); + } + + virtual CbObject ListBuilds(CbObject Query) override + { + ZEN_TRACE_CPU("Jupiter::ListBuilds"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + IoBuffer Payload = Query.GetBuffer().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCbObject); + JupiterResult ListResult = m_Session.ListBuilds(m_Namespace, m_Bucket, Payload); + AddStatistic(ListResult); + if (!ListResult.Success) + { + throw std::runtime_error(fmt::format("Failed listing builds: {} ({})", ListResult.Reason, ListResult.ErrorCode)); + } + return PayloadToCbObject("Failed listing builds"sv, ListResult.Response); + } + + virtual CbObject PutBuild(const Oid& BuildId, const CbObject& MetaData) override + { + ZEN_TRACE_CPU("Jupiter::PutBuild"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + IoBuffer Payload = MetaData.GetBuffer().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCbObject); + JupiterResult PutResult = m_Session.PutBuild(m_Namespace, m_Bucket, BuildId, Payload); + AddStatistic(PutResult); + if (!PutResult.Success) + { + throw std::runtime_error(fmt::format("Failed creating build: {} ({})", PutResult.Reason, PutResult.ErrorCode)); + } + return PayloadToCbObject(fmt::format("Failed creating build: {}", BuildId), PutResult.Response); + } + + virtual CbObject GetBuild(const Oid& BuildId) override + { + ZEN_TRACE_CPU("Jupiter::GetBuild"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + JupiterResult GetBuildResult = m_Session.GetBuild(m_Namespace, m_Bucket, BuildId); + AddStatistic(GetBuildResult); + if (!GetBuildResult.Success) + { + throw std::runtime_error(fmt::format("Failed fetching build: {} ({})", GetBuildResult.Reason, GetBuildResult.ErrorCode)); + } + return PayloadToCbObject(fmt::format("Failed fetching build {}:", BuildId), GetBuildResult.Response); + } + + virtual void FinalizeBuild(const Oid& BuildId) override + { + ZEN_TRACE_CPU("Jupiter::FinalizeBuild"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + JupiterResult FinalizeBuildResult = m_Session.FinalizeBuild(m_Namespace, m_Bucket, BuildId); + AddStatistic(FinalizeBuildResult); + if (!FinalizeBuildResult.Success) + { + throw std::runtime_error( + fmt::format("Failed finalizing build part: {} ({})", FinalizeBuildResult.Reason, FinalizeBuildResult.ErrorCode)); + } + } + + virtual std::pair<IoHash, std::vector<IoHash>> PutBuildPart(const Oid& BuildId, + const Oid& BuildPartId, + std::string_view PartName, + const CbObject& MetaData) override + { + ZEN_TRACE_CPU("Jupiter::PutBuildPart"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + IoBuffer Payload = MetaData.GetBuffer().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCbObject); + PutBuildPartResult PutPartResult = m_Session.PutBuildPart(m_Namespace, m_Bucket, BuildId, BuildPartId, PartName, Payload); + AddStatistic(PutPartResult); + if (!PutPartResult.Success) + { + throw std::runtime_error(fmt::format("Failed creating build part: {} ({})", PutPartResult.Reason, PutPartResult.ErrorCode)); + } + return std::make_pair(PutPartResult.RawHash, std::move(PutPartResult.Needs)); + } + + virtual CbObject GetBuildPart(const Oid& BuildId, const Oid& BuildPartId) override + { + ZEN_TRACE_CPU("Jupiter::GetBuildPart"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + JupiterResult GetBuildPartResult = m_Session.GetBuildPart(m_Namespace, m_Bucket, BuildId, BuildPartId); + AddStatistic(GetBuildPartResult); + if (!GetBuildPartResult.Success) + { + throw std::runtime_error(fmt::format("Failed fetching build part {}: {} ({})", + BuildPartId, + GetBuildPartResult.Reason, + GetBuildPartResult.ErrorCode)); + } + return PayloadToCbObject(fmt::format("Failed fetching build part {}:", BuildPartId), GetBuildPartResult.Response); + } + + virtual std::vector<IoHash> FinalizeBuildPart(const Oid& BuildId, const Oid& BuildPartId, const IoHash& PartHash) override + { + ZEN_TRACE_CPU("Jupiter::FinalizeBuildPart"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + FinalizeBuildPartResult FinalizePartResult = m_Session.FinalizeBuildPart(m_Namespace, m_Bucket, BuildId, BuildPartId, PartHash); + AddStatistic(FinalizePartResult); + if (!FinalizePartResult.Success) + { + throw std::runtime_error( + fmt::format("Failed finalizing build part: {} ({})", FinalizePartResult.Reason, FinalizePartResult.ErrorCode)); + } + return std::move(FinalizePartResult.Needs); + } + + virtual void PutBuildBlob(const Oid& BuildId, + const IoHash& RawHash, + ZenContentType ContentType, + const CompositeBuffer& Payload) override + { + ZEN_TRACE_CPU("Jupiter::PutBuildBlob"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + JupiterResult PutBlobResult = m_Session.PutBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, ContentType, Payload); + AddStatistic(PutBlobResult); + if (!PutBlobResult.Success) + { + throw std::runtime_error(fmt::format("Failed putting build part: {} ({})", PutBlobResult.Reason, PutBlobResult.ErrorCode)); + } + } + + virtual std::vector<std::function<void()>> PutLargeBuildBlob(const Oid& BuildId, + const IoHash& RawHash, + ZenContentType ContentType, + uint64_t PayloadSize, + std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter, + std::function<void(uint64_t, bool)>&& OnSentBytes) override + { + ZEN_TRACE_CPU("Jupiter::PutLargeBuildBlob"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + std::vector<std::function<JupiterResult(bool&)>> WorkItems; + JupiterResult PutMultipartBlobResult = m_Session.PutMultipartBuildBlob(m_Namespace, + m_Bucket, + BuildId, + RawHash, + ContentType, + PayloadSize, + std::move(Transmitter), + WorkItems); + AddStatistic(PutMultipartBlobResult); + if (!PutMultipartBlobResult.Success) + { + throw std::runtime_error( + fmt::format("Failed putting build part: {} ({})", PutMultipartBlobResult.Reason, PutMultipartBlobResult.ErrorCode)); + } + OnSentBytes(PutMultipartBlobResult.SentBytes, WorkItems.empty()); + + std::vector<std::function<void()>> WorkList; + for (auto& WorkItem : WorkItems) + { + WorkList.emplace_back([this, WorkItem = std::move(WorkItem), OnSentBytes]() { + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + bool IsComplete = false; + JupiterResult PartResult = WorkItem(IsComplete); + AddStatistic(PartResult); + if (!PartResult.Success) + { + throw std::runtime_error(fmt::format("Failed putting build part: {} ({})", PartResult.Reason, PartResult.ErrorCode)); + } + OnSentBytes(PartResult.SentBytes, IsComplete); + }); + } + return WorkList; + } + + virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override + { + ZEN_TRACE_CPU("Jupiter::GetBuildBlob"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + CreateDirectories(m_TempFolderPath); + JupiterResult GetBuildBlobResult = + m_Session.GetBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, m_TempFolderPath, RangeOffset, RangeBytes); + AddStatistic(GetBuildBlobResult); + if (!GetBuildBlobResult.Success) + { + throw std::runtime_error( + fmt::format("Failed fetching build blob {}: {} ({})", RawHash, GetBuildBlobResult.Reason, GetBuildBlobResult.ErrorCode)); + } + return std::move(GetBuildBlobResult.Response); + } + + virtual std::vector<std::function<void()>> GetLargeBuildBlob(const Oid& BuildId, + const IoHash& RawHash, + uint64_t ChunkSize, + std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive, + std::function<void()>&& OnComplete) override + { + ZEN_TRACE_CPU("Jupiter::GetLargeBuildBlob"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + std::vector<std::function<JupiterResult()>> WorkItems; + JupiterResult GetMultipartBlobResult = m_Session.GetMultipartBuildBlob(m_Namespace, + m_Bucket, + BuildId, + RawHash, + ChunkSize, + std::move(OnReceive), + std::move(OnComplete), + WorkItems); + + AddStatistic(GetMultipartBlobResult); + if (!GetMultipartBlobResult.Success) + { + throw std::runtime_error( + fmt::format("Failed getting build part: {} ({})", GetMultipartBlobResult.Reason, GetMultipartBlobResult.ErrorCode)); + } + std::vector<std::function<void()>> WorkList; + for (auto& WorkItem : WorkItems) + { + WorkList.emplace_back([this, WorkItem = std::move(WorkItem)]() { + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + JupiterResult PartResult = WorkItem(); + AddStatistic(PartResult); + if (!PartResult.Success) + { + throw std::runtime_error(fmt::format("Failed getting build part: {} ({})", PartResult.Reason, PartResult.ErrorCode)); + } + }); + } + return WorkList; + } + + virtual bool PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) override + { + ZEN_TRACE_CPU("Jupiter::PutBlockMetadata"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + IoBuffer Payload = MetaData.GetBuffer().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCbObject); + JupiterResult PutMetaResult = m_Session.PutBlockMetadata(m_Namespace, m_Bucket, BuildId, BlockRawHash, Payload); + AddStatistic(PutMetaResult); + if (!PutMetaResult.Success) + { + if (PutMetaResult.ErrorCode == int32_t(HttpResponseCode::NotFound)) + { + return false; + } + throw std::runtime_error( + fmt::format("Failed putting build block metadata: {} ({})", PutMetaResult.Reason, PutMetaResult.ErrorCode)); + } + return true; + } + + virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) override + { + ZEN_TRACE_CPU("Jupiter::FindBlocks"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + JupiterResult FindResult = m_Session.FindBlocks(m_Namespace, m_Bucket, BuildId, MaxBlockCount); + AddStatistic(FindResult); + if (!FindResult.Success) + { + throw std::runtime_error(fmt::format("Failed fetching known blocks: {} ({})", FindResult.Reason, FindResult.ErrorCode)); + } + return PayloadToCbObject("Failed fetching known blocks"sv, FindResult.Response); + } + + virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) override + { + ZEN_TRACE_CPU("Jupiter::GetBlockMetadata"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + CbObjectWriter Request; + + Request.BeginArray("blocks"sv); + for (const IoHash& BlockHash : BlockHashes) + { + Request.AddHash(BlockHash); + } + Request.EndArray(); + + IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCbObject); + JupiterResult GetBlockMetadataResult = m_Session.GetBlockMetadata(m_Namespace, m_Bucket, BuildId, Payload); + AddStatistic(GetBlockMetadataResult); + if (!GetBlockMetadataResult.Success) + { + throw std::runtime_error( + fmt::format("Failed fetching block metadatas: {} ({})", GetBlockMetadataResult.Reason, GetBlockMetadataResult.ErrorCode)); + } + return PayloadToCbObject("Failed fetching block metadatas", GetBlockMetadataResult.Response); + } + + virtual void PutBuildPartStats(const Oid& BuildId, + const Oid& BuildPartId, + const tsl::robin_map<std::string, double>& FloatStats) override + { + ZEN_UNUSED(BuildId, BuildPartId, FloatStats); + CbObjectWriter Request; + Request.BeginObject("floatStats"sv); + for (auto It : FloatStats) + { + Request.AddFloat(It.first, It.second); + } + Request.EndObject(); + IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCbObject); + JupiterResult PutBuildPartStatsResult = m_Session.PutBuildPartStats(m_Namespace, m_Bucket, BuildId, BuildPartId, Payload); + AddStatistic(PutBuildPartStatsResult); + if (!PutBuildPartStatsResult.Success) + { + throw std::runtime_error(fmt::format("Failed posting build part statistics: {} ({})", + PutBuildPartStatsResult.Reason, + PutBuildPartStatsResult.ErrorCode)); + } + } + +private: + static CbObject PayloadToCbObject(std::string_view Context, const IoBuffer& Payload) + { + if (Payload.GetContentType() == ZenContentType::kJSON) + { + std::string_view Json(reinterpret_cast<const char*>(Payload.GetData()), Payload.GetSize()); + return LoadCompactBinaryFromJson(Json).AsObject(); + } + else if (Payload.GetContentType() == ZenContentType::kCbObject) + { + return LoadCompactBinaryObject(Payload); + } + else if (Payload.GetContentType() == ZenContentType::kCompressedBinary) + { + IoHash RawHash; + uint64_t RawSize; + return LoadCompactBinaryObject(CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, RawSize)); + } + else + { + throw std::runtime_error( + fmt::format("{}: {} ({})", "Unsupported response format", Context, ToString(Payload.GetContentType()))); + } + } + + void AddStatistic(const JupiterResult& Result) + { + m_Stats.TotalBytesWritten += Result.SentBytes; + m_Stats.TotalBytesRead += Result.ReceivedBytes; + m_Stats.TotalRequestTimeUs += uint64_t(Result.ElapsedSeconds * 1000000.0); + m_Stats.TotalRequestCount++; + } + + JupiterSession m_Session; + Statistics& m_Stats; + const std::string m_Namespace; + const std::string m_Bucket; + const std::filesystem::path m_TempFolderPath; +}; + +std::unique_ptr<BuildStorage> +CreateJupiterBuildStorage(LoggerRef InLog, + HttpClient& InHttpClient, + BuildStorage::Statistics& Stats, + std::string_view Namespace, + std::string_view Bucket, + bool AllowRedirect, + const std::filesystem::path& TempFolderPath) +{ + ZEN_TRACE_CPU("CreateJupiterBuildStorage"); + + return std::make_unique<JupiterBuildStorage>(InLog, InHttpClient, Stats, Namespace, Bucket, AllowRedirect, TempFolderPath); +} + +} // namespace zen diff --git a/src/zenutil/jupiter/jupiterclient.cpp b/src/zenutil/jupiter/jupiterclient.cpp index 5e5da3750..dbac218a4 100644 --- a/src/zenutil/jupiter/jupiterclient.cpp +++ b/src/zenutil/jupiter/jupiterclient.cpp @@ -11,7 +11,6 @@ JupiterClient::JupiterClient(const JupiterClientOptions& Options, std::function< , m_DefaultDdcNamespace(Options.DdcNamespace) , m_DefaultBlobStoreNamespace(Options.BlobStoreNamespace) , m_ComputeCluster(Options.ComputeCluster) -, m_TokenProvider(std::move(TokenProvider)) , m_HttpClient(Options.ServiceUrl, HttpClientSettings{.ConnectTimeout = Options.ConnectTimeout, .Timeout = Options.Timeout, diff --git a/src/zenutil/jupiter/jupitersession.cpp b/src/zenutil/jupiter/jupitersession.cpp index f706a7efc..1fd59acdf 100644 --- a/src/zenutil/jupiter/jupitersession.cpp +++ b/src/zenutil/jupiter/jupitersession.cpp @@ -3,6 +3,9 @@ #include <zenutil/jupiter/jupitersession.h> #include <zencore/compactbinary.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/compositebuffer.h> +#include <zencore/compress.h> #include <zencore/fmtutils.h> #include <zencore/trace.h> @@ -46,7 +49,10 @@ namespace detail { } } // namespace detail -JupiterSession::JupiterSession(LoggerRef InLog, HttpClient& InHttpClient) : m_Log(InLog), m_HttpClient(InHttpClient) +JupiterSession::JupiterSession(LoggerRef InLog, HttpClient& InHttpClient, bool AllowRedirect) +: m_Log(InLog) +, m_HttpClient(InHttpClient) +, m_AllowRedirect(AllowRedirect) { } @@ -355,6 +361,32 @@ JupiterSession::CacheTypeExists(std::string_view Namespace, std::string_view Typ } JupiterResult +JupiterSession::ListBuildNamespaces() +{ + HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds"), {HttpClient::Accept(ZenContentType::kJSON)}); + return detail::ConvertResponse(Response, "JupiterSession::ListBuildNamespaces"sv); +} + +JupiterResult +JupiterSession::ListBuildBuckets(std::string_view Namespace) +{ + HttpClient::Response Response = + m_HttpClient.Get(fmt::format("/api/v2/builds/{}", Namespace), {HttpClient::Accept(ZenContentType::kJSON)}); + return detail::ConvertResponse(Response, "JupiterSession::ListBuildBuckets"sv); +} + +JupiterResult +JupiterSession::ListBuilds(std::string_view Namespace, std::string_view BucketId, const IoBuffer& Payload) +{ + ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); + std::string OptionalBucketPath = BucketId.empty() ? "" : fmt::format("/{}", BucketId); + HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v2/builds/{}{}/search", Namespace, OptionalBucketPath), + Payload, + {HttpClient::Accept(ZenContentType::kCbObject)}); + return detail::ConvertResponse(Response, "JupiterSession::ListBuilds"sv); +} + +JupiterResult JupiterSession::PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload) { ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); @@ -424,29 +456,330 @@ JupiterResult JupiterSession::PutBuildBlob(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, - const Oid& PartId, const IoHash& Hash, ZenContentType ContentType, const CompositeBuffer& Payload) { - HttpClient::Response Response = m_HttpClient.Upload( - fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()), - Payload, - ContentType); + HttpClient::Response Response = + m_HttpClient.Upload(fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString()), + Payload, + ContentType); return detail::ConvertResponse(Response, "JupiterSession::PutBuildBlob"sv); } JupiterResult +JupiterSession::PutMultipartBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const IoHash& Hash, + ZenContentType ContentType, + uint64_t PayloadSize, + std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter, + std::vector<std::function<JupiterResult(bool& OutIsComplete)>>& OutWorkItems) +{ + struct MultipartUploadResponse + { + struct Part + { + uint64_t FirstByte; + uint64_t LastByte; + std::string PartId; + std::string QueryString; + }; + + std::string UploadId; + std::string BlobName; + std::vector<Part> Parts; + + static MultipartUploadResponse Parse(CbObject& Payload) + { + MultipartUploadResponse Result; + Result.UploadId = Payload["uploadId"sv].AsString(); + Result.BlobName = Payload["blobName"sv].AsString(); + CbArrayView PartsArray = Payload["parts"sv].AsArrayView(); + Result.Parts.reserve(PartsArray.Num()); + for (CbFieldView PartView : PartsArray) + { + CbObjectView PartObject = PartView.AsObjectView(); + Result.Parts.emplace_back(Part{ + .FirstByte = PartObject["firstByte"sv].AsUInt64(), + .LastByte = PartObject["lastByte"sv].AsUInt64(), + .PartId = std::string(PartObject["partId"sv].AsString()), + .QueryString = std::string(PartObject["queryString"sv].AsString()), + }); + } + return Result; + } + }; + + CbObjectWriter StartMultipartPayloadWriter; + StartMultipartPayloadWriter.AddInteger("blobLength"sv, PayloadSize); + CbObject StartMultipartPayload = StartMultipartPayloadWriter.Save(); + + std::string StartMultipartResponseRequestString = + fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/startMultipartUpload", Namespace, BucketId, BuildId, Hash.ToHexString()); + // ZEN_INFO("POST: {}", StartMultipartResponseRequestString); + HttpClient::Response StartMultipartResponse = + m_HttpClient.Post(StartMultipartResponseRequestString, StartMultipartPayload, HttpClient::Accept(ZenContentType::kCbObject)); + if (!StartMultipartResponse.IsSuccess()) + { + ZEN_WARN("{}", StartMultipartResponse.ErrorMessage("startMultipartUpload: ")); + return detail::ConvertResponse(StartMultipartResponse, "JupiterSession::PutMultipartBuildBlob"sv); + } + CbObject ResponseObject = LoadCompactBinaryObject(StartMultipartResponse.ResponsePayload); + + struct WorkloadData + { + MultipartUploadResponse PartDescription; + std::function<IoBuffer(uint64_t Offset, uint64_t Size)> Transmitter; + std::atomic<size_t> PartsLeft; + }; + + std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>()); + + Workload->PartDescription = MultipartUploadResponse::Parse(ResponseObject); + Workload->Transmitter = std::move(Transmitter); + Workload->PartsLeft = Workload->PartDescription.Parts.size(); + + for (size_t PartIndex = 0; PartIndex < Workload->PartDescription.Parts.size(); PartIndex++) + { + OutWorkItems.emplace_back([this, Namespace, BucketId, BuildId, Hash, ContentType, Workload, PartIndex]( + bool& OutIsComplete) -> JupiterResult { + const MultipartUploadResponse::Part& Part = Workload->PartDescription.Parts[PartIndex]; + IoBuffer PartPayload = Workload->Transmitter(Part.FirstByte, Part.LastByte - Part.FirstByte); + std::string MultipartUploadResponseRequestString = + fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}&supportsRedirect={}", + Namespace, + BucketId, + BuildId, + Hash.ToHexString(), + Part.QueryString, + m_AllowRedirect ? "true"sv : "false"sv); + // ZEN_INFO("PUT: {}", MultipartUploadResponseRequestString); + HttpClient::Response MultipartUploadResponse = m_HttpClient.Put(MultipartUploadResponseRequestString, PartPayload); + if (!MultipartUploadResponse.IsSuccess()) + { + ZEN_WARN("{}", MultipartUploadResponse.ErrorMessage(MultipartUploadResponseRequestString)); + } + OutIsComplete = Workload->PartsLeft.fetch_sub(1) == 1; + if (OutIsComplete) + { + int64_t TotalUploadedBytes = MultipartUploadResponse.UploadedBytes; + int64_t TotalDownloadedBytes = MultipartUploadResponse.DownloadedBytes; + double TotalElapsedSeconds = MultipartUploadResponse.ElapsedSeconds; + HttpClient::Response MultipartEndResponse = MultipartUploadResponse; + while (MultipartEndResponse.IsSuccess()) + { + CbObjectWriter CompletePayloadWriter; + CompletePayloadWriter.AddString("blobName"sv, Workload->PartDescription.BlobName); + CompletePayloadWriter.AddString("uploadId"sv, Workload->PartDescription.UploadId); + CompletePayloadWriter.AddBool("isCompressed"sv, ContentType == ZenContentType::kCompressedBinary); + CompletePayloadWriter.BeginArray("partIds"sv); + std::unordered_map<std::string, size_t> PartNameToIndex; + for (size_t UploadPartIndex = 0; UploadPartIndex < Workload->PartDescription.Parts.size(); UploadPartIndex++) + { + const MultipartUploadResponse::Part& PartDescription = Workload->PartDescription.Parts[UploadPartIndex]; + PartNameToIndex.insert({PartDescription.PartId, UploadPartIndex}); + CompletePayloadWriter.AddString(PartDescription.PartId); + } + CompletePayloadWriter.EndArray(); // "partIds" + CbObject CompletePayload = CompletePayloadWriter.Save(); + + std::string MultipartEndResponseRequestString = + fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/completeMultipart", Namespace, BucketId, BuildId, Hash.ToHexString()); + + MultipartEndResponse = m_HttpClient.Post(MultipartEndResponseRequestString, + CompletePayload, + HttpClient::Accept(ZenContentType::kCbObject)); + TotalUploadedBytes += MultipartEndResponse.UploadedBytes; + TotalDownloadedBytes += MultipartEndResponse.DownloadedBytes; + TotalElapsedSeconds += MultipartEndResponse.ElapsedSeconds; + if (MultipartEndResponse.IsSuccess()) + { + CbObject ResponseObject = MultipartEndResponse.AsObject(); + CbArrayView MissingPartsArrayView = ResponseObject["missingParts"sv].AsArrayView(); + if (MissingPartsArrayView.Num() == 0) + { + break; + } + else + { + for (CbFieldView PartIdView : MissingPartsArrayView) + { + std::string RetryPartId(PartIdView.AsString()); + size_t RetryPartIndex = PartNameToIndex.at(RetryPartId); + const MultipartUploadResponse::Part& RetryPart = Workload->PartDescription.Parts[RetryPartIndex]; + IoBuffer RetryPartPayload = + Workload->Transmitter(RetryPart.FirstByte, RetryPart.LastByte - RetryPart.FirstByte - 1); + std::string RetryMultipartUploadResponseRequestString = + fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}/uploadMultipart{}&supportsRedirect={}", + Namespace, + BucketId, + BuildId, + Hash.ToHexString(), + RetryPart.QueryString, + m_AllowRedirect ? "true"sv : "false"sv); + + MultipartUploadResponse = m_HttpClient.Put(RetryMultipartUploadResponseRequestString, RetryPartPayload); + TotalUploadedBytes = MultipartUploadResponse.UploadedBytes; + TotalDownloadedBytes = MultipartUploadResponse.DownloadedBytes; + TotalElapsedSeconds = MultipartUploadResponse.ElapsedSeconds; + if (!MultipartUploadResponse.IsSuccess()) + { + ZEN_WARN("{}", MultipartUploadResponse.ErrorMessage(RetryMultipartUploadResponseRequestString)); + MultipartEndResponse = MultipartUploadResponse; + } + } + } + } + else + { + ZEN_WARN("{}", MultipartEndResponse.ErrorMessage(MultipartEndResponseRequestString)); + } + } + MultipartEndResponse.UploadedBytes = TotalUploadedBytes; + MultipartEndResponse.DownloadedBytes = TotalDownloadedBytes; + MultipartEndResponse.ElapsedSeconds = TotalElapsedSeconds; + return detail::ConvertResponse(MultipartEndResponse, "JupiterSession::PutMultipartBuildBlob"sv); + } + return detail::ConvertResponse(MultipartUploadResponse, "JupiterSession::PutMultipartBuildBlob"sv); + }); + } + return detail::ConvertResponse(StartMultipartResponse, "JupiterSession::PutMultipartBuildBlob"sv); +} + +JupiterResult +JupiterSession::GetMultipartBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const IoHash& Hash, + uint64_t ChunkSize, + std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive, + std::function<void()>&& OnComplete, + std::vector<std::function<JupiterResult()>>& OutWorkItems) +{ + std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}", + Namespace, + BucketId, + BuildId, + Hash.ToHexString(), + m_AllowRedirect ? "true"sv : "false"sv); + HttpClient::Response Response = + m_HttpClient.Get(RequestUrl, HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", 0, ChunkSize - 1)}})); + if (Response.IsSuccess()) + { + if (std::string_view ContentRange = Response.Header.Entries["Content-Range"]; !ContentRange.empty()) + { + if (std::string_view::size_type SizeDelimiterPos = ContentRange.find('/'); SizeDelimiterPos != std::string_view::npos) + { + if (std::optional<uint64_t> TotalSizeMaybe = ParseInt<uint64_t>(ContentRange.substr(SizeDelimiterPos + 1)); + TotalSizeMaybe.has_value()) + { + uint64_t TotalSize = TotalSizeMaybe.value(); + uint64_t PayloadSize = Response.ResponsePayload.GetSize(); + + OnReceive(0, Response.ResponsePayload); + + if (TotalSize > PayloadSize) + { + struct WorkloadData + { + std::function<void(uint64_t Offset, const IoBuffer& Chunk)> OnReceive; + std::function<void()> OnComplete; + std::atomic<uint64_t> BytesRemaining; + }; + + std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>()); + Workload->OnReceive = std::move(OnReceive); + Workload->OnComplete = std::move(OnComplete); + Workload->BytesRemaining = TotalSize - PayloadSize; + + uint64_t Offset = PayloadSize; + while (Offset < TotalSize) + { + uint64_t PartSize = Min(ChunkSize, TotalSize - Offset); + OutWorkItems.emplace_back([this, + Namespace = std::string(Namespace), + BucketId = std::string(BucketId), + BuildId = Oid(BuildId), + Hash = IoHash(Hash), + TotalSize, + Workload, + Offset, + PartSize]() -> JupiterResult { + std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}", + Namespace, + BucketId, + BuildId, + Hash.ToHexString(), + m_AllowRedirect ? "true"sv : "false"sv); + HttpClient::Response Response = m_HttpClient.Get( + RequestUrl, + HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", Offset, Offset + PartSize - 1)}})); + if (Response.IsSuccess()) + { + Workload->OnReceive(Offset, Response.ResponsePayload); + uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Response.ResponsePayload.GetSize()); + if (ByteRemaning == Response.ResponsePayload.GetSize()) + { + Workload->OnComplete(); + } + } + return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv); + }); + Offset += PartSize; + } + } + else + { + OnComplete(); + } + return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv); + } + } + } + OnReceive(0, Response.ResponsePayload); + OnComplete(); + } + return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv); +} + +JupiterResult JupiterSession::GetBuildBlob(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, - const Oid& PartId, const IoHash& Hash, - std::filesystem::path TempFolderPath) + std::filesystem::path TempFolderPath, + uint64_t Offset, + uint64_t Size) { - HttpClient::Response Response = m_HttpClient.Download( - fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()), - TempFolderPath); + HttpClient::KeyValueMap Headers; + if (Offset != 0 || Size != (uint64_t)-1) + { + Headers.Entries.insert({"Range", fmt::format("bytes={}-{}", Offset, Offset + Size - 1)}); + } + HttpClient::Response Response = m_HttpClient.Download(fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}", + Namespace, + BucketId, + BuildId, + Hash.ToHexString(), + m_AllowRedirect ? "true"sv : "false"sv), + TempFolderPath, + Headers); + if (Response.IsSuccess()) + { + // If we get a redirect to S3 or a non-Jupiter endpoint the content type will not be correct, validate it and set it + if (m_AllowRedirect && (Response.ResponsePayload.GetContentType() == HttpContentType::kBinary)) + { + IoHash ValidateRawHash; + uint64_t ValidateRawSize = 0; + ZEN_ASSERT_SLOW(CompressedBuffer::ValidateCompressedHeader(Response.ResponsePayload, ValidateRawHash, ValidateRawSize)); + ZEN_ASSERT_SLOW(ValidateRawHash == Hash); + ZEN_ASSERT_SLOW(ValidateRawSize > 0); + ZEN_UNUSED(ValidateRawHash, ValidateRawSize); + Response.ResponsePayload.SetContentType(ZenContentType::kCompressedBinary); + } + } return detail::ConvertResponse(Response, "JupiterSession::GetBuildBlob"sv); } @@ -454,14 +787,13 @@ JupiterResult JupiterSession::PutBlockMetadata(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, - const Oid& PartId, const IoHash& Hash, const IoBuffer& Payload) { ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); - HttpClient::Response Response = m_HttpClient.Put( - fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/{}/metadata", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()), - Payload); + HttpClient::Response Response = + m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/blocks/{}/metadata", Namespace, BucketId, BuildId, Hash.ToHexString()), + Payload); return detail::ConvertResponse(Response, "JupiterSession::PutBlockMetadata"sv); } @@ -494,12 +826,39 @@ JupiterSession::FinalizeBuildPart(std::string_view Namespace, } JupiterResult -JupiterSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId) +JupiterSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, uint64_t MaxBlockCount) { + const std::string Parameters = MaxBlockCount == (uint64_t)-1 ? "" : fmt::format("?count={}", MaxBlockCount); HttpClient::Response Response = - m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/listBlocks", Namespace, BucketId, BuildId, PartId), + m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/blocks/listBlocks{}", Namespace, BucketId, BuildId, Parameters), HttpClient::Accept(ZenContentType::kCbObject)); return detail::ConvertResponse(Response, "JupiterSession::FindBlocks"sv); } +JupiterResult +JupiterSession::GetBlockMetadata(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, IoBuffer Payload) +{ + ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); + HttpClient::Response Response = + m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/blocks/getBlockMetadata", Namespace, BucketId, BuildId), + Payload, + HttpClient::Accept(ZenContentType::kCbObject)); + return detail::ConvertResponse(Response, "JupiterSession::GetBlockMetadata"sv); +} + +JupiterResult +JupiterSession::PutBuildPartStats(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& BuildPartId, + IoBuffer Payload) +{ + ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); + HttpClient::Response Response = + m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/stats", Namespace, BucketId, BuildId, BuildPartId), + Payload, + HttpClient::Accept(ZenContentType::kCbObject)); + return detail::ConvertResponse(Response, "JupiterSession::PutBuildPartStats"sv); +} + } // namespace zen |