// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include ZEN_THIRD_PARTY_INCLUDES_END #include namespace zen { using namespace std::literals; namespace { void ThrowFromJupiterResult(const JupiterResult& Result, std::string_view Prefix) { int Error = Result.ErrorCode < (int)HttpResponseCode::Continue ? Result.ErrorCode : 0; HttpResponseCode Status = Result.ErrorCode >= int(HttpResponseCode::Continue) ? HttpResponseCode(Result.ErrorCode) : HttpResponseCode::ImATeapot; throw HttpClientError(fmt::format("{}: {} ({})", Prefix, Result.Reason, Result.ErrorCode), Error, Status); } } // namespace class JupiterBuildStorage : public BuildStorageBase { public: JupiterBuildStorage(LoggerRef InLog, HttpClient& InHttpClient, Statistics& Stats, std::string_view Namespace, std::string_view Bucket, bool AllowRedirect, const std::filesystem::path& TempFolderPath) : m_Session(InLog, InHttpClient, AllowRedirect) , m_Stats(Stats) , m_Namespace(Namespace) , m_Bucket(Bucket) , m_TempFolderPath(TempFolderPath) { } virtual ~JupiterBuildStorage() {} virtual CbObject ListNamespaces(bool bRecursive) override { ZEN_TRACE_CPU("Jupiter::ListNamespaces"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); JupiterResult ListResult = m_Session.ListBuildNamespaces(); AddStatistic(ListResult); if (!ListResult.Success) { ThrowFromJupiterResult(ListResult, "Failed listing namespaces"); } CbObject NamespaceResponse = PayloadToCbObject("Failed listing namespaces"sv, ListResult.Response); CbObjectWriter Response; Response.BeginArray("results"sv); for (CbFieldView NamespaceField : NamespaceResponse["namespaces"]) { std::string_view Namespace = NamespaceField.AsString(); if (!Namespace.empty()) { Response.BeginObject(); Response.AddString("name", Namespace); if (bRecursive) { JupiterResult BucketsResult = m_Session.ListBuildBuckets(Namespace); AddStatistic(BucketsResult); if (!BucketsResult.Success) { ThrowFromJupiterResult(BucketsResult, fmt::format("Failed listing buckets in namespace {}", Namespace)); } CbObject BucketResponse = PayloadToCbObject(fmt::format("Failed listing buckets in namespace {}", Namespace), BucketsResult.Response); Response.BeginArray("items"); for (CbFieldView BucketField : BucketResponse["buckets"]) { std::string_view Bucket = BucketField.AsString(); if (!Bucket.empty()) { Response.AddString(Bucket); } } Response.EndArray(); } Response.EndObject(); } } Response.EndArray(); return Response.Save(); } virtual CbObject ListBuilds(std::string_view JsonQuery) override { ZEN_TRACE_CPU("Jupiter::ListBuilds"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); JupiterResult ListResult = m_Session.ListBuilds(m_Namespace, m_Bucket, JsonQuery); AddStatistic(ListResult); if (!ListResult.Success) { ThrowFromJupiterResult(ListResult, "Failed listing builds"sv); } return PayloadToCbObject("Failed listing builds"sv, ListResult.Response); } virtual CbObject PutBuild(const Oid& BuildId, const CbObject& MetaData) override { ZEN_TRACE_CPU("Jupiter::PutBuild"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); IoBuffer Payload = MetaData.GetBuffer().AsIoBuffer(); Payload.SetContentType(ZenContentType::kCbObject); JupiterResult PutResult = m_Session.PutBuild(m_Namespace, m_Bucket, BuildId, Payload); AddStatistic(PutResult); if (!PutResult.Success) { ThrowFromJupiterResult(PutResult, "Failed creating build"sv); } return PayloadToCbObject(fmt::format("Failed creating build: {}", BuildId), PutResult.Response); } virtual CbObject GetBuild(const Oid& BuildId) override { ZEN_TRACE_CPU("Jupiter::GetBuild"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); JupiterResult GetBuildResult = m_Session.GetBuild(m_Namespace, m_Bucket, BuildId); AddStatistic(GetBuildResult); if (!GetBuildResult.Success) { ThrowFromJupiterResult(GetBuildResult, "Failed fetching build"sv); } return PayloadToCbObject(fmt::format("Failed fetching build {}:", BuildId), GetBuildResult.Response); } virtual void FinalizeBuild(const Oid& BuildId) override { ZEN_TRACE_CPU("Jupiter::FinalizeBuild"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); JupiterResult FinalizeBuildResult = m_Session.FinalizeBuild(m_Namespace, m_Bucket, BuildId); AddStatistic(FinalizeBuildResult); if (!FinalizeBuildResult.Success) { ThrowFromJupiterResult(FinalizeBuildResult, "Failed finalizing build"sv); } } virtual std::pair> PutBuildPart(const Oid& BuildId, const Oid& BuildPartId, std::string_view PartName, const CbObject& MetaData) override { ZEN_TRACE_CPU("Jupiter::PutBuildPart"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); IoBuffer Payload = MetaData.GetBuffer().AsIoBuffer(); Payload.SetContentType(ZenContentType::kCbObject); PutBuildPartResult PutPartResult = m_Session.PutBuildPart(m_Namespace, m_Bucket, BuildId, BuildPartId, PartName, Payload); AddStatistic(PutPartResult); if (!PutPartResult.Success) { ThrowFromJupiterResult(PutPartResult, "Failed creating build part"sv); } return std::make_pair(PutPartResult.RawHash, std::move(PutPartResult.Needs)); } virtual CbObject GetBuildPart(const Oid& BuildId, const Oid& BuildPartId) override { ZEN_TRACE_CPU("Jupiter::GetBuildPart"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); JupiterResult GetBuildPartResult = m_Session.GetBuildPart(m_Namespace, m_Bucket, BuildId, BuildPartId); AddStatistic(GetBuildPartResult); if (!GetBuildPartResult.Success) { ThrowFromJupiterResult(GetBuildPartResult, "Failed fetching build part"sv); } return PayloadToCbObject(fmt::format("Failed fetching build part {}:", BuildPartId), GetBuildPartResult.Response); } virtual std::vector FinalizeBuildPart(const Oid& BuildId, const Oid& BuildPartId, const IoHash& PartHash) override { ZEN_TRACE_CPU("Jupiter::FinalizeBuildPart"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); FinalizeBuildPartResult FinalizePartResult = m_Session.FinalizeBuildPart(m_Namespace, m_Bucket, BuildId, BuildPartId, PartHash); AddStatistic(FinalizePartResult); if (!FinalizePartResult.Success) { ThrowFromJupiterResult(FinalizePartResult, "Failed finalizing build part"sv); } return std::move(FinalizePartResult.Needs); } virtual void PutBuildBlob(const Oid& BuildId, const IoHash& RawHash, ZenContentType ContentType, const CompositeBuffer& Payload) override { ZEN_TRACE_CPU("Jupiter::PutBuildBlob"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); JupiterResult PutBlobResult = m_Session.PutBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, ContentType, Payload); AddStatistic(PutBlobResult); if (!PutBlobResult.Success) { ThrowFromJupiterResult(PutBlobResult, "Failed putting build part"sv); } } virtual std::vector> PutLargeBuildBlob(const Oid& BuildId, const IoHash& RawHash, ZenContentType ContentType, uint64_t PayloadSize, std::function&& Transmitter, std::function&& OnSentBytes) override { ZEN_TRACE_CPU("Jupiter::PutLargeBuildBlob"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); std::vector> WorkItems; JupiterResult PutMultipartBlobResult = m_Session.PutMultipartBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, ContentType, PayloadSize, std::move(Transmitter), WorkItems); AddStatistic(PutMultipartBlobResult); if (!PutMultipartBlobResult.Success) { ThrowFromJupiterResult(PutMultipartBlobResult, "Failed putting large build blob"sv); } OnSentBytes(PutMultipartBlobResult.SentBytes, WorkItems.empty()); std::vector> WorkList; for (auto& WorkItem : WorkItems) { WorkList.emplace_back([this, WorkItem = std::move(WorkItem), OnSentBytes]() { Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); bool IsComplete = false; JupiterResult PartResult = WorkItem(IsComplete); AddStatistic(PartResult); if (!PartResult.Success) { ThrowFromJupiterResult(PartResult, "Failed putting large build blob"sv); } OnSentBytes(PartResult.SentBytes, IsComplete); }); } return WorkList; } virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override { ZEN_TRACE_CPU("Jupiter::GetBuildBlob"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); CreateDirectories(m_TempFolderPath); JupiterResult GetBuildBlobResult = m_Session.GetBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, m_TempFolderPath, RangeOffset, RangeBytes); AddStatistic(GetBuildBlobResult); if (!GetBuildBlobResult.Success) { ThrowFromJupiterResult(GetBuildBlobResult, "Failed fetching build blob"sv); } return std::move(GetBuildBlobResult.Response); } virtual std::vector> GetLargeBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t ChunkSize, std::function&& OnReceive, std::function&& OnComplete) override { ZEN_TRACE_CPU("Jupiter::GetLargeBuildBlob"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); std::vector> WorkItems; JupiterResult GetMultipartBlobResult = m_Session.GetMultipartBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, ChunkSize, std::move(OnReceive), std::move(OnComplete), WorkItems); AddStatistic(GetMultipartBlobResult); if (!GetMultipartBlobResult.Success) { ThrowFromJupiterResult(GetMultipartBlobResult, "Failed getting large build part"sv); } std::vector> WorkList; for (auto& WorkItem : WorkItems) { WorkList.emplace_back([this, WorkItem = std::move(WorkItem)]() { Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); JupiterResult PartResult = WorkItem(); AddStatistic(PartResult); if (!PartResult.Success) { ThrowFromJupiterResult(PartResult, "Failed getting large build part"sv); } }); } return WorkList; } virtual bool PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) override { ZEN_TRACE_CPU("Jupiter::PutBlockMetadata"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); IoBuffer Payload = MetaData.GetBuffer().AsIoBuffer(); Payload.SetContentType(ZenContentType::kCbObject); JupiterResult PutMetaResult = m_Session.PutBlockMetadata(m_Namespace, m_Bucket, BuildId, BlockRawHash, Payload); AddStatistic(PutMetaResult); if (!PutMetaResult.Success) { if (PutMetaResult.ErrorCode == int32_t(HttpResponseCode::NotFound)) { return false; } ThrowFromJupiterResult(PutMetaResult, "Failed putting build block metadata"sv); } return true; } virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) override { ZEN_TRACE_CPU("Jupiter::FindBlocks"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); JupiterResult FindResult = m_Session.FindBlocks(m_Namespace, m_Bucket, BuildId, MaxBlockCount); AddStatistic(FindResult); if (!FindResult.Success) { ThrowFromJupiterResult(FindResult, "Failed fetching known blocks"sv); } return PayloadToCbObject("Failed fetching known blocks"sv, FindResult.Response); } virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span BlockHashes) override { ZEN_TRACE_CPU("Jupiter::GetBlockMetadata"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); CbObjectWriter Request; Request.BeginArray("blocks"sv); for (const IoHash& BlockHash : BlockHashes) { Request.AddHash(BlockHash); } Request.EndArray(); IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer(); Payload.SetContentType(ZenContentType::kCbObject); JupiterResult GetBlockMetadataResult = m_Session.GetBlockMetadata(m_Namespace, m_Bucket, BuildId, Payload); AddStatistic(GetBlockMetadataResult); if (!GetBlockMetadataResult.Success) { ThrowFromJupiterResult(GetBlockMetadataResult, "Failed fetching block metadatas"sv); } return PayloadToCbObject("Failed fetching block metadatas", GetBlockMetadataResult.Response); } virtual void PutBuildPartStats(const Oid& BuildId, const Oid& BuildPartId, const tsl::robin_map& FloatStats) override { ZEN_UNUSED(BuildId, BuildPartId, FloatStats); CbObjectWriter Request; Request.BeginObject("floatStats"sv); for (auto It : FloatStats) { Request.AddFloat(It.first, It.second); } Request.EndObject(); IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer(); Payload.SetContentType(ZenContentType::kCbObject); JupiterResult PutBuildPartStatsResult = m_Session.PutBuildPartStats(m_Namespace, m_Bucket, BuildId, BuildPartId, Payload); AddStatistic(PutBuildPartStatsResult); if (!PutBuildPartStatsResult.Success) { ThrowFromJupiterResult(PutBuildPartStatsResult, "Failed posting build part statistics"sv); } } virtual bool GetExtendedStatistics(ExtendedStatistics& OutStats) override { OutStats.ReceivedBytesPerSource.reserve(m_ReceivedBytesPerSource.size()); for (auto& It : m_ReceivedBytesPerSource) { OutStats.ReceivedBytesPerSource.insert_or_assign(It.first, m_SourceBytes[It.second]); } return true; } private: static CbObject PayloadToCbObject(std::string_view ErrorContext, const IoBuffer& Payload) { if (Payload.GetContentType() == ZenContentType::kJSON) { std::string_view Json(reinterpret_cast(Payload.GetData()), Payload.GetSize()); return LoadCompactBinaryFromJson(Json).AsObject(); } else if (Payload.GetContentType() == ZenContentType::kCbObject) { CbValidateError ValidateResult = CbValidateError::None; if (CbObject Object = ValidateAndReadCompactBinaryObject(IoBuffer(Payload), ValidateResult); ValidateResult == CbValidateError::None) { return Object; } else { throw std::runtime_error(fmt::format("{}: {} ({})", "Invalid compact binary object: '{}'", ErrorContext, ToString(Payload.GetContentType()), ToString(ValidateResult))); } } else if (Payload.GetContentType() == ZenContentType::kCompressedBinary) { IoHash RawHash; uint64_t RawSize; CbValidateError ValidateResult = CbValidateError::None; if (CbObject Object = ValidateAndReadCompactBinaryObject(CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, RawSize), ValidateResult); ValidateResult == CbValidateError::None) { return Object; } else { throw std::runtime_error(fmt::format("{}: {} ({})", "Invalid compresed compact binary object: '{}'", ErrorContext, ToString(Payload.GetContentType()), ToString(ValidateResult))); } } else { throw std::runtime_error( fmt::format("{}: {} ({})", "Unsupported response format", ErrorContext, ToString(Payload.GetContentType()))); } } void AddStatistic(const JupiterResult& Result) { m_Stats.TotalBytesWritten += Result.SentBytes; m_Stats.TotalBytesRead += Result.ReceivedBytes; m_Stats.TotalRequestTimeUs += uint64_t(Result.ElapsedSeconds * 1000000.0); m_Stats.TotalRequestCount++; SetAtomicMax(m_Stats.PeakSentBytes, Result.SentBytes); SetAtomicMax(m_Stats.PeakReceivedBytes, Result.ReceivedBytes); if (Result.ElapsedSeconds > 0.0) { uint64_t BytesPerSec = uint64_t((Result.SentBytes + Result.ReceivedBytes) / Result.ElapsedSeconds); SetAtomicMax(m_Stats.PeakBytesPerSec, BytesPerSec); } if (!Result.Source.empty()) { if (tsl::robin_map::const_iterator It = m_ReceivedBytesPerSource.find(Result.Source); It != m_ReceivedBytesPerSource.end()) { m_SourceBytes[It->second] += Result.ReceivedBytes; } else { m_ReceivedBytesPerSource.insert_or_assign(Result.Source, m_SourceBytes.size()); m_SourceBytes.push_back(Result.ReceivedBytes); } } } JupiterSession m_Session; Statistics& m_Stats; const std::string m_Namespace; const std::string m_Bucket; const std::filesystem::path m_TempFolderPath; tsl::robin_map m_ReceivedBytesPerSource; std::vector m_SourceBytes; }; std::unique_ptr CreateJupiterBuildStorage(LoggerRef InLog, HttpClient& InHttpClient, BuildStorageBase::Statistics& Stats, std::string_view Namespace, std::string_view Bucket, bool AllowRedirect, const std::filesystem::path& TempFolderPath) { ZEN_TRACE_CPU("CreateJupiterBuildStorage"); return std::make_unique(InLog, InHttpClient, Stats, Namespace, Bucket, AllowRedirect, TempFolderPath); } bool ParseBuildStorageUrl(std::string_view InUrl, std::string& OutHost, std::string& OutNamespace, std::string& OutBucket, std::string& OutBuildId) { std::string Url(InUrl); const std::string_view ExtendedApiString = "api/v2/builds/"; if (auto ApiString = ToLower(Url).find(ExtendedApiString); ApiString != std::string::npos) { Url.erase(ApiString, ExtendedApiString.length()); } const std::string ArtifactURLRegExString = R"((http[s]?:\/\/.*?)\/(.*?)\/(.*?)\/(.*))"; const std::regex ArtifactURLRegEx(ArtifactURLRegExString, std::regex::ECMAScript | std::regex::icase); std::match_results MatchResults; std::string_view UrlToParse(Url); if (regex_match(begin(UrlToParse), end(UrlToParse), MatchResults, ArtifactURLRegEx) && MatchResults.size() == 5) { auto GetMatch = [&MatchResults](uint32_t Index) -> std::string_view { ZEN_ASSERT(Index < MatchResults.size()); const auto& Match = MatchResults[Index]; return std::string_view(&*Match.first, Match.second - Match.first); }; const std::string_view Host = GetMatch(1); const std::string_view Namespace = GetMatch(2); const std::string_view Bucket = GetMatch(3); const std::string_view BuildId = GetMatch(4); OutHost = Host; OutNamespace = Namespace; OutBucket = Bucket; OutBuildId = BuildId; return true; } else { return false; } } } // namespace zen