diff options
| author | Dan Engelbrecht <[email protected]> | 2025-10-03 09:59:41 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-03 09:59:41 +0200 |
| commit | 1f27c826bd36cd065e0fdd160f60c1f887138f33 (patch) | |
| tree | 000528b3f672d0126ca92438378d9007bfdcd0d8 /src/zenutil/jupiter | |
| parent | move remoteproject to remotestorelib (#542) (diff) | |
| download | zen-1f27c826bd36cd065e0fdd160f60c1f887138f33.tar.xz zen-1f27c826bd36cd065e0fdd160f60c1f887138f33.zip | |
move zenutil builds code to zenremotestore (#543)
* move buildstorage implementations to zenremotestore lib
* move builds storage to zenremotelib
Diffstat (limited to 'src/zenutil/jupiter')
| -rw-r--r-- | src/zenutil/jupiter/jupiterbuildstorage.cpp | 561 |
1 files changed, 0 insertions, 561 deletions
diff --git a/src/zenutil/jupiter/jupiterbuildstorage.cpp b/src/zenutil/jupiter/jupiterbuildstorage.cpp deleted file mode 100644 index 6eb3489dc..000000000 --- a/src/zenutil/jupiter/jupiterbuildstorage.cpp +++ /dev/null @@ -1,561 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include <zenutil/jupiter/jupiterbuildstorage.h> - -#include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinaryutil.h> -#include <zencore/fmtutils.h> -#include <zencore/scopeguard.h> -#include <zencore/timer.h> -#include <zencore/trace.h> -#include <zenutil/jupiter/jupitersession.h> - -ZEN_THIRD_PARTY_INCLUDES_START -#include <tsl/robin_map.h> -ZEN_THIRD_PARTY_INCLUDES_END - -#include <regex> - -namespace zen { - -using namespace std::literals; - -namespace { - void ThrowFromJupiterResult(const JupiterResult& Result, std::string_view Prefix) - { - int Error = Result.ErrorCode < (int)HttpResponseCode::Continue ? Result.ErrorCode : 0; - HttpResponseCode Status = - Result.ErrorCode >= int(HttpResponseCode::Continue) ? HttpResponseCode(Result.ErrorCode) : HttpResponseCode::ImATeapot; - throw HttpClientError(fmt::format("{}: {} ({})", Prefix, Result.Reason, Result.ErrorCode), Error, Status); - } -} // namespace - -class JupiterBuildStorage : public BuildStorage -{ -public: - JupiterBuildStorage(LoggerRef InLog, - HttpClient& InHttpClient, - Statistics& Stats, - std::string_view Namespace, - std::string_view Bucket, - bool AllowRedirect, - const std::filesystem::path& TempFolderPath) - : m_Session(InLog, InHttpClient, AllowRedirect) - , m_Stats(Stats) - , m_Namespace(Namespace) - , m_Bucket(Bucket) - , m_TempFolderPath(TempFolderPath) - { - } - virtual ~JupiterBuildStorage() {} - - virtual CbObject ListNamespaces(bool bRecursive) override - { - ZEN_TRACE_CPU("Jupiter::ListNamespaces"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - JupiterResult ListResult = m_Session.ListBuildNamespaces(); - AddStatistic(ListResult); - if (!ListResult.Success) - { - ThrowFromJupiterResult(ListResult, "Failed listing namespaces"); - } - CbObject NamespaceResponse = PayloadToCbObject("Failed listing namespaces"sv, ListResult.Response); - - CbObjectWriter Response; - Response.BeginArray("results"sv); - for (CbFieldView NamespaceField : NamespaceResponse["namespaces"]) - { - std::string_view Namespace = NamespaceField.AsString(); - if (!Namespace.empty()) - { - Response.BeginObject(); - Response.AddString("name", Namespace); - - if (bRecursive) - { - JupiterResult BucketsResult = m_Session.ListBuildBuckets(Namespace); - AddStatistic(BucketsResult); - if (!BucketsResult.Success) - { - ThrowFromJupiterResult(BucketsResult, fmt::format("Failed listing buckets in namespace {}", Namespace)); - } - CbObject BucketResponse = - PayloadToCbObject(fmt::format("Failed listing buckets in namespace {}", Namespace), BucketsResult.Response); - - Response.BeginArray("items"); - for (CbFieldView BucketField : BucketResponse["buckets"]) - { - std::string_view Bucket = BucketField.AsString(); - if (!Bucket.empty()) - { - Response.AddString(Bucket); - } - } - Response.EndArray(); - } - - Response.EndObject(); - } - } - Response.EndArray(); - - return Response.Save(); - } - - virtual CbObject ListBuilds(CbObject Query) override - { - ZEN_TRACE_CPU("Jupiter::ListBuilds"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - IoBuffer Payload = Query.GetBuffer().AsIoBuffer(); - Payload.SetContentType(ZenContentType::kCbObject); - JupiterResult ListResult = m_Session.ListBuilds(m_Namespace, m_Bucket, Payload); - AddStatistic(ListResult); - if (!ListResult.Success) - { - ThrowFromJupiterResult(ListResult, "Failed listing builds"sv); - } - return PayloadToCbObject("Failed listing builds"sv, ListResult.Response); - } - - virtual CbObject PutBuild(const Oid& BuildId, const CbObject& MetaData) override - { - ZEN_TRACE_CPU("Jupiter::PutBuild"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - IoBuffer Payload = MetaData.GetBuffer().AsIoBuffer(); - Payload.SetContentType(ZenContentType::kCbObject); - JupiterResult PutResult = m_Session.PutBuild(m_Namespace, m_Bucket, BuildId, Payload); - AddStatistic(PutResult); - if (!PutResult.Success) - { - ThrowFromJupiterResult(PutResult, "Failed creating build"sv); - } - return PayloadToCbObject(fmt::format("Failed creating build: {}", BuildId), PutResult.Response); - } - - virtual CbObject GetBuild(const Oid& BuildId) override - { - ZEN_TRACE_CPU("Jupiter::GetBuild"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - JupiterResult GetBuildResult = m_Session.GetBuild(m_Namespace, m_Bucket, BuildId); - AddStatistic(GetBuildResult); - if (!GetBuildResult.Success) - { - ThrowFromJupiterResult(GetBuildResult, "Failed fetching build"sv); - } - return PayloadToCbObject(fmt::format("Failed fetching build {}:", BuildId), GetBuildResult.Response); - } - - virtual void FinalizeBuild(const Oid& BuildId) override - { - ZEN_TRACE_CPU("Jupiter::FinalizeBuild"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - JupiterResult FinalizeBuildResult = m_Session.FinalizeBuild(m_Namespace, m_Bucket, BuildId); - AddStatistic(FinalizeBuildResult); - if (!FinalizeBuildResult.Success) - { - ThrowFromJupiterResult(FinalizeBuildResult, "Failed finalizing build"sv); - } - } - - virtual std::pair<IoHash, std::vector<IoHash>> PutBuildPart(const Oid& BuildId, - const Oid& BuildPartId, - std::string_view PartName, - const CbObject& MetaData) override - { - ZEN_TRACE_CPU("Jupiter::PutBuildPart"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - IoBuffer Payload = MetaData.GetBuffer().AsIoBuffer(); - Payload.SetContentType(ZenContentType::kCbObject); - PutBuildPartResult PutPartResult = m_Session.PutBuildPart(m_Namespace, m_Bucket, BuildId, BuildPartId, PartName, Payload); - AddStatistic(PutPartResult); - if (!PutPartResult.Success) - { - ThrowFromJupiterResult(PutPartResult, "Failed creating build part"sv); - } - return std::make_pair(PutPartResult.RawHash, std::move(PutPartResult.Needs)); - } - - virtual CbObject GetBuildPart(const Oid& BuildId, const Oid& BuildPartId) override - { - ZEN_TRACE_CPU("Jupiter::GetBuildPart"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - JupiterResult GetBuildPartResult = m_Session.GetBuildPart(m_Namespace, m_Bucket, BuildId, BuildPartId); - AddStatistic(GetBuildPartResult); - if (!GetBuildPartResult.Success) - { - ThrowFromJupiterResult(GetBuildPartResult, "Failed fetching build part"sv); - } - return PayloadToCbObject(fmt::format("Failed fetching build part {}:", BuildPartId), GetBuildPartResult.Response); - } - - virtual std::vector<IoHash> FinalizeBuildPart(const Oid& BuildId, const Oid& BuildPartId, const IoHash& PartHash) override - { - ZEN_TRACE_CPU("Jupiter::FinalizeBuildPart"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - FinalizeBuildPartResult FinalizePartResult = m_Session.FinalizeBuildPart(m_Namespace, m_Bucket, BuildId, BuildPartId, PartHash); - AddStatistic(FinalizePartResult); - if (!FinalizePartResult.Success) - { - ThrowFromJupiterResult(FinalizePartResult, "Failed finalizing build part"sv); - } - return std::move(FinalizePartResult.Needs); - } - - virtual void PutBuildBlob(const Oid& BuildId, - const IoHash& RawHash, - ZenContentType ContentType, - const CompositeBuffer& Payload) override - { - ZEN_TRACE_CPU("Jupiter::PutBuildBlob"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - JupiterResult PutBlobResult = m_Session.PutBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, ContentType, Payload); - AddStatistic(PutBlobResult); - if (!PutBlobResult.Success) - { - ThrowFromJupiterResult(PutBlobResult, "Failed putting build part"sv); - } - } - - virtual std::vector<std::function<void()>> PutLargeBuildBlob(const Oid& BuildId, - const IoHash& RawHash, - ZenContentType ContentType, - uint64_t PayloadSize, - std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter, - std::function<void(uint64_t, bool)>&& OnSentBytes) override - { - ZEN_TRACE_CPU("Jupiter::PutLargeBuildBlob"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - std::vector<std::function<JupiterResult(bool&)>> WorkItems; - JupiterResult PutMultipartBlobResult = m_Session.PutMultipartBuildBlob(m_Namespace, - m_Bucket, - BuildId, - RawHash, - ContentType, - PayloadSize, - std::move(Transmitter), - WorkItems); - AddStatistic(PutMultipartBlobResult); - if (!PutMultipartBlobResult.Success) - { - ThrowFromJupiterResult(PutMultipartBlobResult, "Failed putting large build blob"sv); - } - OnSentBytes(PutMultipartBlobResult.SentBytes, WorkItems.empty()); - - std::vector<std::function<void()>> WorkList; - for (auto& WorkItem : WorkItems) - { - WorkList.emplace_back([this, WorkItem = std::move(WorkItem), OnSentBytes]() { - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - bool IsComplete = false; - JupiterResult PartResult = WorkItem(IsComplete); - AddStatistic(PartResult); - if (!PartResult.Success) - { - ThrowFromJupiterResult(PartResult, "Failed putting large build blob"sv); - } - OnSentBytes(PartResult.SentBytes, IsComplete); - }); - } - return WorkList; - } - - virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override - { - ZEN_TRACE_CPU("Jupiter::GetBuildBlob"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - CreateDirectories(m_TempFolderPath); - JupiterResult GetBuildBlobResult = - m_Session.GetBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, m_TempFolderPath, RangeOffset, RangeBytes); - AddStatistic(GetBuildBlobResult); - if (!GetBuildBlobResult.Success) - { - ThrowFromJupiterResult(GetBuildBlobResult, "Failed fetching build blob"sv); - } - return std::move(GetBuildBlobResult.Response); - } - - virtual std::vector<std::function<void()>> GetLargeBuildBlob(const Oid& BuildId, - const IoHash& RawHash, - uint64_t ChunkSize, - std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive, - std::function<void()>&& OnComplete) override - { - ZEN_TRACE_CPU("Jupiter::GetLargeBuildBlob"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - std::vector<std::function<JupiterResult()>> WorkItems; - JupiterResult GetMultipartBlobResult = m_Session.GetMultipartBuildBlob(m_Namespace, - m_Bucket, - BuildId, - RawHash, - ChunkSize, - std::move(OnReceive), - std::move(OnComplete), - WorkItems); - - AddStatistic(GetMultipartBlobResult); - if (!GetMultipartBlobResult.Success) - { - ThrowFromJupiterResult(GetMultipartBlobResult, "Failed getting large build part"sv); - } - std::vector<std::function<void()>> WorkList; - for (auto& WorkItem : WorkItems) - { - WorkList.emplace_back([this, WorkItem = std::move(WorkItem)]() { - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - JupiterResult PartResult = WorkItem(); - AddStatistic(PartResult); - if (!PartResult.Success) - { - ThrowFromJupiterResult(PartResult, "Failed getting large build part"sv); - } - }); - } - return WorkList; - } - - virtual bool PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) override - { - ZEN_TRACE_CPU("Jupiter::PutBlockMetadata"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - IoBuffer Payload = MetaData.GetBuffer().AsIoBuffer(); - Payload.SetContentType(ZenContentType::kCbObject); - JupiterResult PutMetaResult = m_Session.PutBlockMetadata(m_Namespace, m_Bucket, BuildId, BlockRawHash, Payload); - AddStatistic(PutMetaResult); - if (!PutMetaResult.Success) - { - if (PutMetaResult.ErrorCode == int32_t(HttpResponseCode::NotFound)) - { - return false; - } - ThrowFromJupiterResult(PutMetaResult, "Failed putting build block metadata"sv); - } - return true; - } - - virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) override - { - ZEN_TRACE_CPU("Jupiter::FindBlocks"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - JupiterResult FindResult = m_Session.FindBlocks(m_Namespace, m_Bucket, BuildId, MaxBlockCount); - AddStatistic(FindResult); - if (!FindResult.Success) - { - ThrowFromJupiterResult(FindResult, "Failed fetching known blocks"sv); - } - return PayloadToCbObject("Failed fetching known blocks"sv, FindResult.Response); - } - - virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) override - { - ZEN_TRACE_CPU("Jupiter::GetBlockMetadata"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - CbObjectWriter Request; - - Request.BeginArray("blocks"sv); - for (const IoHash& BlockHash : BlockHashes) - { - Request.AddHash(BlockHash); - } - Request.EndArray(); - - IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer(); - Payload.SetContentType(ZenContentType::kCbObject); - JupiterResult GetBlockMetadataResult = m_Session.GetBlockMetadata(m_Namespace, m_Bucket, BuildId, Payload); - AddStatistic(GetBlockMetadataResult); - if (!GetBlockMetadataResult.Success) - { - ThrowFromJupiterResult(GetBlockMetadataResult, "Failed fetching block metadatas"sv); - } - return PayloadToCbObject("Failed fetching block metadatas", GetBlockMetadataResult.Response); - } - - virtual void PutBuildPartStats(const Oid& BuildId, - const Oid& BuildPartId, - const tsl::robin_map<std::string, double>& FloatStats) override - { - ZEN_UNUSED(BuildId, BuildPartId, FloatStats); - CbObjectWriter Request; - Request.BeginObject("floatStats"sv); - for (auto It : FloatStats) - { - Request.AddFloat(It.first, It.second); - } - Request.EndObject(); - IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer(); - Payload.SetContentType(ZenContentType::kCbObject); - JupiterResult PutBuildPartStatsResult = m_Session.PutBuildPartStats(m_Namespace, m_Bucket, BuildId, BuildPartId, Payload); - AddStatistic(PutBuildPartStatsResult); - if (!PutBuildPartStatsResult.Success) - { - ThrowFromJupiterResult(PutBuildPartStatsResult, "Failed posting build part statistics"sv); - } - } - -private: - static CbObject PayloadToCbObject(std::string_view ErrorContext, const IoBuffer& Payload) - { - if (Payload.GetContentType() == ZenContentType::kJSON) - { - std::string_view Json(reinterpret_cast<const char*>(Payload.GetData()), Payload.GetSize()); - return LoadCompactBinaryFromJson(Json).AsObject(); - } - else if (Payload.GetContentType() == ZenContentType::kCbObject) - { - CbValidateError ValidateResult = CbValidateError::None; - if (CbObject Object = ValidateAndReadCompactBinaryObject(IoBuffer(Payload), ValidateResult); - ValidateResult == CbValidateError::None) - { - return Object; - } - else - { - throw std::runtime_error(fmt::format("{}: {} ({})", - "Invalid compact binary object: '{}'", - ErrorContext, - ToString(Payload.GetContentType()), - ToString(ValidateResult))); - } - } - else if (Payload.GetContentType() == ZenContentType::kCompressedBinary) - { - IoHash RawHash; - uint64_t RawSize; - CbValidateError ValidateResult = CbValidateError::None; - if (CbObject Object = - ValidateAndReadCompactBinaryObject(CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, RawSize), - ValidateResult); - ValidateResult == CbValidateError::None) - { - return Object; - } - else - { - throw std::runtime_error(fmt::format("{}: {} ({})", - "Invalid compresed compact binary object: '{}'", - ErrorContext, - ToString(Payload.GetContentType()), - ToString(ValidateResult))); - } - } - else - { - throw std::runtime_error( - fmt::format("{}: {} ({})", "Unsupported response format", ErrorContext, ToString(Payload.GetContentType()))); - } - } - - void AddStatistic(const JupiterResult& Result) - { - m_Stats.TotalBytesWritten += Result.SentBytes; - m_Stats.TotalBytesRead += Result.ReceivedBytes; - m_Stats.TotalRequestTimeUs += uint64_t(Result.ElapsedSeconds * 1000000.0); - m_Stats.TotalRequestCount++; - - SetAtomicMax(m_Stats.PeakSentBytes, Result.SentBytes); - SetAtomicMax(m_Stats.PeakReceivedBytes, Result.ReceivedBytes); - if (Result.ElapsedSeconds > 0.0) - { - uint64_t BytesPerSec = uint64_t((Result.SentBytes + Result.ReceivedBytes) / Result.ElapsedSeconds); - SetAtomicMax(m_Stats.PeakBytesPerSec, BytesPerSec); - } - } - - JupiterSession m_Session; - Statistics& m_Stats; - const std::string m_Namespace; - const std::string m_Bucket; - const std::filesystem::path m_TempFolderPath; -}; - -std::unique_ptr<BuildStorage> -CreateJupiterBuildStorage(LoggerRef InLog, - HttpClient& InHttpClient, - BuildStorage::Statistics& Stats, - std::string_view Namespace, - std::string_view Bucket, - bool AllowRedirect, - const std::filesystem::path& TempFolderPath) -{ - ZEN_TRACE_CPU("CreateJupiterBuildStorage"); - - return std::make_unique<JupiterBuildStorage>(InLog, InHttpClient, Stats, Namespace, Bucket, AllowRedirect, TempFolderPath); -} - -bool -ParseBuildStorageUrl(std::string_view InUrl, - std::string& OutHost, - std::string& OutNamespace, - std::string& OutBucket, - std::string& OutBuildId) -{ - std::string Url(InUrl); - const std::string_view ExtendedApiString = "api/v2/builds/"; - if (auto ApiString = ToLower(Url).find(ExtendedApiString); ApiString != std::string::npos) - { - Url.erase(ApiString, ExtendedApiString.length()); - } - - const std::string ArtifactURLRegExString = R"((http[s]?:\/\/.*?)\/(.*?)\/(.*?)\/(.*))"; - const std::regex ArtifactURLRegEx(ArtifactURLRegExString, std::regex::ECMAScript | std::regex::icase); - std::match_results<std::string_view::const_iterator> MatchResults; - std::string_view UrlToParse(Url); - if (regex_match(begin(UrlToParse), end(UrlToParse), MatchResults, ArtifactURLRegEx) && MatchResults.size() == 5) - { - auto GetMatch = [&MatchResults](uint32_t Index) -> std::string_view { - ZEN_ASSERT(Index < MatchResults.size()); - - const auto& Match = MatchResults[Index]; - - return std::string_view(&*Match.first, Match.second - Match.first); - }; - - const std::string_view Host = GetMatch(1); - const std::string_view Namespace = GetMatch(2); - const std::string_view Bucket = GetMatch(3); - const std::string_view BuildId = GetMatch(4); - - OutHost = Host; - OutNamespace = Namespace; - OutBucket = Bucket; - OutBuildId = BuildId; - return true; - } - else - { - return false; - } -} - -} // namespace zen |