// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include ZEN_THIRD_PARTY_INCLUDES_END namespace zen { using namespace std::literals; namespace { void ThrowFromJupiterResult(const JupiterResult& Result, std::string_view Prefix) { int Error = Result.ErrorCode < (int)HttpResponseCode::Continue ? Result.ErrorCode : 0; HttpResponseCode Status = Result.ErrorCode >= int(HttpResponseCode::Continue) ? HttpResponseCode(Result.ErrorCode) : HttpResponseCode::ImATeapot; throw HttpClientError(fmt::format("{}: {} ({})", Prefix, Result.Reason, Result.ErrorCode), Error, Status); } } // namespace class JupiterBuildStorage : public BuildStorage { public: JupiterBuildStorage(LoggerRef InLog, HttpClient& InHttpClient, Statistics& Stats, std::string_view Namespace, std::string_view Bucket, bool AllowRedirect, const std::filesystem::path& TempFolderPath) : m_Session(InLog, InHttpClient, AllowRedirect) , m_Stats(Stats) , m_Namespace(Namespace) , m_Bucket(Bucket) , m_TempFolderPath(TempFolderPath) { } virtual ~JupiterBuildStorage() {} virtual CbObject ListNamespaces(bool bRecursive) override { ZEN_TRACE_CPU("Jupiter::ListNamespaces"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); JupiterResult ListResult = m_Session.ListBuildNamespaces(); AddStatistic(ListResult); if (!ListResult.Success) { ThrowFromJupiterResult(ListResult, "Failed listing namespaces"); } CbObject NamespaceResponse = PayloadToCbObject("Failed listing namespaces"sv, ListResult.Response); CbObjectWriter Response; Response.BeginArray("results"sv); for (CbFieldView NamespaceField : NamespaceResponse["namespaces"]) { std::string_view Namespace = NamespaceField.AsString(); if (!Namespace.empty()) { Response.BeginObject(); Response.AddString("name", Namespace); if (bRecursive) { JupiterResult BucketsResult = m_Session.ListBuildBuckets(Namespace); AddStatistic(BucketsResult); if (!BucketsResult.Success) { ThrowFromJupiterResult(BucketsResult, fmt::format("Failed listing buckets in namespace {}", Namespace)); } CbObject BucketResponse = PayloadToCbObject(fmt::format("Failed listing buckets in namespace {}", Namespace), BucketsResult.Response); Response.BeginArray("items"); for (CbFieldView BucketField : BucketResponse["buckets"]) { std::string_view Bucket = BucketField.AsString(); if (!Bucket.empty()) { Response.AddString(Bucket); } } Response.EndArray(); } Response.EndObject(); } } Response.EndArray(); return Response.Save(); } virtual CbObject ListBuilds(CbObject Query) override { ZEN_TRACE_CPU("Jupiter::ListBuilds"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); IoBuffer Payload = Query.GetBuffer().AsIoBuffer(); Payload.SetContentType(ZenContentType::kCbObject); JupiterResult ListResult = m_Session.ListBuilds(m_Namespace, m_Bucket, Payload); AddStatistic(ListResult); if (!ListResult.Success) { ThrowFromJupiterResult(ListResult, "Failed listing builds"sv); } return PayloadToCbObject("Failed listing builds"sv, ListResult.Response); } virtual CbObject PutBuild(const Oid& BuildId, const CbObject& MetaData) override { ZEN_TRACE_CPU("Jupiter::PutBuild"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); IoBuffer Payload = MetaData.GetBuffer().AsIoBuffer(); Payload.SetContentType(ZenContentType::kCbObject); JupiterResult PutResult = m_Session.PutBuild(m_Namespace, m_Bucket, BuildId, Payload); AddStatistic(PutResult); if (!PutResult.Success) { ThrowFromJupiterResult(PutResult, "Failed creating build"sv); } return PayloadToCbObject(fmt::format("Failed creating build: {}", BuildId), PutResult.Response); } virtual CbObject GetBuild(const Oid& BuildId) override { ZEN_TRACE_CPU("Jupiter::GetBuild"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); JupiterResult GetBuildResult = m_Session.GetBuild(m_Namespace, m_Bucket, BuildId); AddStatistic(GetBuildResult); if (!GetBuildResult.Success) { ThrowFromJupiterResult(GetBuildResult, "Failed fetching build"sv); } return PayloadToCbObject(fmt::format("Failed fetching build {}:", BuildId), GetBuildResult.Response); } virtual void FinalizeBuild(const Oid& BuildId) override { ZEN_TRACE_CPU("Jupiter::FinalizeBuild"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); JupiterResult FinalizeBuildResult = m_Session.FinalizeBuild(m_Namespace, m_Bucket, BuildId); AddStatistic(FinalizeBuildResult); if (!FinalizeBuildResult.Success) { ThrowFromJupiterResult(FinalizeBuildResult, "Failed finalizing build"sv); } } virtual std::pair> PutBuildPart(const Oid& BuildId, const Oid& BuildPartId, std::string_view PartName, const CbObject& MetaData) override { ZEN_TRACE_CPU("Jupiter::PutBuildPart"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); IoBuffer Payload = MetaData.GetBuffer().AsIoBuffer(); Payload.SetContentType(ZenContentType::kCbObject); PutBuildPartResult PutPartResult = m_Session.PutBuildPart(m_Namespace, m_Bucket, BuildId, BuildPartId, PartName, Payload); AddStatistic(PutPartResult); if (!PutPartResult.Success) { ThrowFromJupiterResult(PutPartResult, "Failed creating build part"sv); } return std::make_pair(PutPartResult.RawHash, std::move(PutPartResult.Needs)); } virtual CbObject GetBuildPart(const Oid& BuildId, const Oid& BuildPartId) override { ZEN_TRACE_CPU("Jupiter::GetBuildPart"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); JupiterResult GetBuildPartResult = m_Session.GetBuildPart(m_Namespace, m_Bucket, BuildId, BuildPartId); AddStatistic(GetBuildPartResult); if (!GetBuildPartResult.Success) { ThrowFromJupiterResult(GetBuildPartResult, "Failed fetching build part"sv); } return PayloadToCbObject(fmt::format("Failed fetching build part {}:", BuildPartId), GetBuildPartResult.Response); } virtual std::vector FinalizeBuildPart(const Oid& BuildId, const Oid& BuildPartId, const IoHash& PartHash) override { ZEN_TRACE_CPU("Jupiter::FinalizeBuildPart"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); FinalizeBuildPartResult FinalizePartResult = m_Session.FinalizeBuildPart(m_Namespace, m_Bucket, BuildId, BuildPartId, PartHash); AddStatistic(FinalizePartResult); if (!FinalizePartResult.Success) { ThrowFromJupiterResult(FinalizePartResult, "Failed finalizing build part"sv); } return std::move(FinalizePartResult.Needs); } virtual void PutBuildBlob(const Oid& BuildId, const IoHash& RawHash, ZenContentType ContentType, const CompositeBuffer& Payload) override { ZEN_TRACE_CPU("Jupiter::PutBuildBlob"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); JupiterResult PutBlobResult = m_Session.PutBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, ContentType, Payload); AddStatistic(PutBlobResult); if (!PutBlobResult.Success) { ThrowFromJupiterResult(PutBlobResult, "Failed putting build part"sv); } } virtual std::vector> PutLargeBuildBlob(const Oid& BuildId, const IoHash& RawHash, ZenContentType ContentType, uint64_t PayloadSize, std::function&& Transmitter, std::function&& OnSentBytes) override { ZEN_TRACE_CPU("Jupiter::PutLargeBuildBlob"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); std::vector> WorkItems; JupiterResult PutMultipartBlobResult = m_Session.PutMultipartBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, ContentType, PayloadSize, std::move(Transmitter), WorkItems); AddStatistic(PutMultipartBlobResult); if (!PutMultipartBlobResult.Success) { ThrowFromJupiterResult(PutMultipartBlobResult, "Failed putting large build blob"sv); } OnSentBytes(PutMultipartBlobResult.SentBytes, WorkItems.empty()); std::vector> WorkList; for (auto& WorkItem : WorkItems) { WorkList.emplace_back([this, WorkItem = std::move(WorkItem), OnSentBytes]() { Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); bool IsComplete = false; JupiterResult PartResult = WorkItem(IsComplete); AddStatistic(PartResult); if (!PartResult.Success) { ThrowFromJupiterResult(PartResult, "Failed putting large build blob"sv); } OnSentBytes(PartResult.SentBytes, IsComplete); }); } return WorkList; } virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override { ZEN_TRACE_CPU("Jupiter::GetBuildBlob"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); CreateDirectories(m_TempFolderPath); JupiterResult GetBuildBlobResult = m_Session.GetBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, m_TempFolderPath, RangeOffset, RangeBytes); AddStatistic(GetBuildBlobResult); if (!GetBuildBlobResult.Success) { ThrowFromJupiterResult(GetBuildBlobResult, "Failed fetching build blob"sv); } return std::move(GetBuildBlobResult.Response); } virtual std::vector> GetLargeBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t ChunkSize, std::function&& OnReceive, std::function&& OnComplete) override { ZEN_TRACE_CPU("Jupiter::GetLargeBuildBlob"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); std::vector> WorkItems; JupiterResult GetMultipartBlobResult = m_Session.GetMultipartBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, ChunkSize, std::move(OnReceive), std::move(OnComplete), WorkItems); AddStatistic(GetMultipartBlobResult); if (!GetMultipartBlobResult.Success) { ThrowFromJupiterResult(GetMultipartBlobResult, "Failed getting large build part"sv); } std::vector> WorkList; for (auto& WorkItem : WorkItems) { WorkList.emplace_back([this, WorkItem = std::move(WorkItem)]() { Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); JupiterResult PartResult = WorkItem(); AddStatistic(PartResult); if (!PartResult.Success) { ThrowFromJupiterResult(PartResult, "Failed getting large build part"sv); } }); } return WorkList; } virtual bool PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) override { ZEN_TRACE_CPU("Jupiter::PutBlockMetadata"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); IoBuffer Payload = MetaData.GetBuffer().AsIoBuffer(); Payload.SetContentType(ZenContentType::kCbObject); JupiterResult PutMetaResult = m_Session.PutBlockMetadata(m_Namespace, m_Bucket, BuildId, BlockRawHash, Payload); AddStatistic(PutMetaResult); if (!PutMetaResult.Success) { if (PutMetaResult.ErrorCode == int32_t(HttpResponseCode::NotFound)) { return false; } ThrowFromJupiterResult(PutMetaResult, "Failed putting build block metadata"sv); } return true; } virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) override { ZEN_TRACE_CPU("Jupiter::FindBlocks"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); JupiterResult FindResult = m_Session.FindBlocks(m_Namespace, m_Bucket, BuildId, MaxBlockCount); AddStatistic(FindResult); if (!FindResult.Success) { ThrowFromJupiterResult(FindResult, "Failed fetching known blocks"sv); } return PayloadToCbObject("Failed fetching known blocks"sv, FindResult.Response); } virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span BlockHashes) override { ZEN_TRACE_CPU("Jupiter::GetBlockMetadata"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); CbObjectWriter Request; Request.BeginArray("blocks"sv); for (const IoHash& BlockHash : BlockHashes) { Request.AddHash(BlockHash); } Request.EndArray(); IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer(); Payload.SetContentType(ZenContentType::kCbObject); JupiterResult GetBlockMetadataResult = m_Session.GetBlockMetadata(m_Namespace, m_Bucket, BuildId, Payload); AddStatistic(GetBlockMetadataResult); if (!GetBlockMetadataResult.Success) { ThrowFromJupiterResult(GetBlockMetadataResult, "Failed fetching block metadatas"sv); } return PayloadToCbObject("Failed fetching block metadatas", GetBlockMetadataResult.Response); } virtual void PutBuildPartStats(const Oid& BuildId, const Oid& BuildPartId, const tsl::robin_map& FloatStats) override { ZEN_UNUSED(BuildId, BuildPartId, FloatStats); CbObjectWriter Request; Request.BeginObject("floatStats"sv); for (auto It : FloatStats) { Request.AddFloat(It.first, It.second); } Request.EndObject(); IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer(); Payload.SetContentType(ZenContentType::kCbObject); JupiterResult PutBuildPartStatsResult = m_Session.PutBuildPartStats(m_Namespace, m_Bucket, BuildId, BuildPartId, Payload); AddStatistic(PutBuildPartStatsResult); if (!PutBuildPartStatsResult.Success) { ThrowFromJupiterResult(PutBuildPartStatsResult, "Failed posting build part statistics"sv); } } private: static CbObject PayloadToCbObject(std::string_view ErrorContext, const IoBuffer& Payload) { if (Payload.GetContentType() == ZenContentType::kJSON) { std::string_view Json(reinterpret_cast(Payload.GetData()), Payload.GetSize()); return LoadCompactBinaryFromJson(Json).AsObject(); } else if (Payload.GetContentType() == ZenContentType::kCbObject) { return LoadCompactBinaryObject(Payload); } else if (Payload.GetContentType() == ZenContentType::kCompressedBinary) { IoHash RawHash; uint64_t RawSize; return LoadCompactBinaryObject(CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, RawSize)); } else { throw std::runtime_error( fmt::format("{}: {} ({})", "Unsupported response format", ErrorContext, ToString(Payload.GetContentType()))); } } void AddStatistic(const JupiterResult& Result) { m_Stats.TotalBytesWritten += Result.SentBytes; m_Stats.TotalBytesRead += Result.ReceivedBytes; m_Stats.TotalRequestTimeUs += uint64_t(Result.ElapsedSeconds * 1000000.0); m_Stats.TotalRequestCount++; SetAtomicMax(m_Stats.PeakSentBytes, Result.SentBytes); SetAtomicMax(m_Stats.PeakReceivedBytes, Result.ReceivedBytes); if (Result.ElapsedSeconds > 0.0) { uint64_t BytesPerSec = uint64_t((Result.SentBytes + Result.ReceivedBytes) / Result.ElapsedSeconds); SetAtomicMax(m_Stats.PeakBytesPerSec, BytesPerSec); } } JupiterSession m_Session; Statistics& m_Stats; const std::string m_Namespace; const std::string m_Bucket; const std::filesystem::path m_TempFolderPath; }; std::unique_ptr CreateJupiterBuildStorage(LoggerRef InLog, HttpClient& InHttpClient, BuildStorage::Statistics& Stats, std::string_view Namespace, std::string_view Bucket, bool AllowRedirect, const std::filesystem::path& TempFolderPath) { ZEN_TRACE_CPU("CreateJupiterBuildStorage"); return std::make_unique(InLog, InHttpClient, Stats, Namespace, Bucket, AllowRedirect, TempFolderPath); } } // namespace zen