// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include ZEN_THIRD_PARTY_INCLUDES_END namespace zen { using namespace std::literals; class JupiterBuildStorage : public BuildStorage { public: JupiterBuildStorage(LoggerRef InLog, HttpClient& InHttpClient, Statistics& Stats, std::string_view Namespace, std::string_view Bucket, const std::filesystem::path& TempFolderPath) : m_Session(InLog, InHttpClient) , m_Stats(Stats) , m_Namespace(Namespace) , m_Bucket(Bucket) , m_TempFolderPath(TempFolderPath) { } virtual ~JupiterBuildStorage() {} virtual CbObject ListBuilds(CbObject Query) override { ZEN_TRACE_CPU("Jupiter::ListBuilds"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); IoBuffer Payload = Query.GetBuffer().AsIoBuffer(); Payload.SetContentType(ZenContentType::kCbObject); JupiterResult ListResult = m_Session.ListBuilds(m_Namespace, m_Bucket, Payload); AddStatistic(ListResult); if (!ListResult.Success) { throw std::runtime_error(fmt::format("Failed listing builds: {} ({})", ListResult.Reason, ListResult.ErrorCode)); } return PayloadToCbObject("Failed listing builds"sv, ListResult.Response); } virtual CbObject PutBuild(const Oid& BuildId, const CbObject& MetaData) override { ZEN_TRACE_CPU("Jupiter::PutBuild"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); IoBuffer Payload = MetaData.GetBuffer().AsIoBuffer(); Payload.SetContentType(ZenContentType::kCbObject); JupiterResult PutResult = m_Session.PutBuild(m_Namespace, m_Bucket, BuildId, Payload); AddStatistic(PutResult); if (!PutResult.Success) { throw std::runtime_error(fmt::format("Failed creating build: {} ({})", PutResult.Reason, PutResult.ErrorCode)); } return PayloadToCbObject(fmt::format("Failed creating build: {}", BuildId), PutResult.Response); } virtual CbObject GetBuild(const Oid& BuildId) override { ZEN_TRACE_CPU("Jupiter::GetBuild"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); JupiterResult GetBuildResult = m_Session.GetBuild(m_Namespace, m_Bucket, BuildId); AddStatistic(GetBuildResult); if (!GetBuildResult.Success) { throw std::runtime_error(fmt::format("Failed fetching build: {} ({})", GetBuildResult.Reason, GetBuildResult.ErrorCode)); } return PayloadToCbObject(fmt::format("Failed fetching build {}:", BuildId), GetBuildResult.Response); } virtual void FinalizeBuild(const Oid& BuildId) override { ZEN_TRACE_CPU("Jupiter::FinalizeBuild"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); JupiterResult FinalizeBuildResult = m_Session.FinalizeBuild(m_Namespace, m_Bucket, BuildId); AddStatistic(FinalizeBuildResult); if (!FinalizeBuildResult.Success) { throw std::runtime_error( fmt::format("Failed finalizing build part: {} ({})", FinalizeBuildResult.Reason, FinalizeBuildResult.ErrorCode)); } } virtual std::pair> PutBuildPart(const Oid& BuildId, const Oid& BuildPartId, std::string_view PartName, const CbObject& MetaData) override { ZEN_TRACE_CPU("Jupiter::PutBuildPart"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); IoBuffer Payload = MetaData.GetBuffer().AsIoBuffer(); Payload.SetContentType(ZenContentType::kCbObject); PutBuildPartResult PutPartResult = m_Session.PutBuildPart(m_Namespace, m_Bucket, BuildId, BuildPartId, PartName, Payload); AddStatistic(PutPartResult); if (!PutPartResult.Success) { throw std::runtime_error(fmt::format("Failed creating build part: {} ({})", PutPartResult.Reason, PutPartResult.ErrorCode)); } return std::make_pair(PutPartResult.RawHash, std::move(PutPartResult.Needs)); } virtual CbObject GetBuildPart(const Oid& BuildId, const Oid& BuildPartId) override { ZEN_TRACE_CPU("Jupiter::GetBuildPart"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); JupiterResult GetBuildPartResult = m_Session.GetBuildPart(m_Namespace, m_Bucket, BuildId, BuildPartId); AddStatistic(GetBuildPartResult); if (!GetBuildPartResult.Success) { throw std::runtime_error(fmt::format("Failed fetching build part {}: {} ({})", BuildPartId, GetBuildPartResult.Reason, GetBuildPartResult.ErrorCode)); } return PayloadToCbObject(fmt::format("Failed fetching build part {}:", BuildPartId), GetBuildPartResult.Response); } virtual std::vector FinalizeBuildPart(const Oid& BuildId, const Oid& BuildPartId, const IoHash& PartHash) override { ZEN_TRACE_CPU("Jupiter::FinalizeBuildPart"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); FinalizeBuildPartResult FinalizePartResult = m_Session.FinalizeBuildPart(m_Namespace, m_Bucket, BuildId, BuildPartId, PartHash); AddStatistic(FinalizePartResult); if (!FinalizePartResult.Success) { throw std::runtime_error( fmt::format("Failed finalizing build part: {} ({})", FinalizePartResult.Reason, FinalizePartResult.ErrorCode)); } return std::move(FinalizePartResult.Needs); } virtual void PutBuildBlob(const Oid& BuildId, const IoHash& RawHash, ZenContentType ContentType, const CompositeBuffer& Payload) override { ZEN_TRACE_CPU("Jupiter::PutBuildBlob"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); JupiterResult PutBlobResult = m_Session.PutBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, ContentType, Payload); AddStatistic(PutBlobResult); if (!PutBlobResult.Success) { throw std::runtime_error(fmt::format("Failed putting build part: {} ({})", PutBlobResult.Reason, PutBlobResult.ErrorCode)); } } virtual std::vector> PutLargeBuildBlob(const Oid& BuildId, const IoHash& RawHash, ZenContentType ContentType, uint64_t PayloadSize, std::function&& Transmitter, std::function&& OnSentBytes) override { ZEN_TRACE_CPU("Jupiter::PutLargeBuildBlob"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); std::vector> WorkItems; JupiterResult PutMultipartBlobResult = m_Session.PutMultipartBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, ContentType, PayloadSize, std::move(Transmitter), WorkItems); AddStatistic(PutMultipartBlobResult); if (!PutMultipartBlobResult.Success) { throw std::runtime_error( fmt::format("Failed putting build part: {} ({})", PutMultipartBlobResult.Reason, PutMultipartBlobResult.ErrorCode)); } OnSentBytes(PutMultipartBlobResult.SentBytes, WorkItems.empty()); std::vector> WorkList; for (auto& WorkItem : WorkItems) { WorkList.emplace_back([this, WorkItem = std::move(WorkItem), OnSentBytes]() { Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); bool IsComplete = false; JupiterResult PartResult = WorkItem(IsComplete); AddStatistic(PartResult); if (!PartResult.Success) { throw std::runtime_error(fmt::format("Failed putting build part: {} ({})", PartResult.Reason, PartResult.ErrorCode)); } OnSentBytes(PartResult.SentBytes, IsComplete); }); } return WorkList; } virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override { ZEN_TRACE_CPU("Jupiter::GetBuildBlob"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); CreateDirectories(m_TempFolderPath); JupiterResult GetBuildBlobResult = m_Session.GetBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, m_TempFolderPath, RangeOffset, RangeBytes); AddStatistic(GetBuildBlobResult); if (!GetBuildBlobResult.Success) { throw std::runtime_error( fmt::format("Failed fetching build blob {}: {} ({})", RawHash, GetBuildBlobResult.Reason, GetBuildBlobResult.ErrorCode)); } return std::move(GetBuildBlobResult.Response); } virtual std::vector> GetLargeBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t ChunkSize, std::function&& OnReceive, std::function&& OnComplete) override { ZEN_TRACE_CPU("Jupiter::GetLargeBuildBlob"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); std::vector> WorkItems; JupiterResult GetMultipartBlobResult = m_Session.GetMultipartBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, ChunkSize, std::move(OnReceive), std::move(OnComplete), WorkItems); AddStatistic(GetMultipartBlobResult); if (!GetMultipartBlobResult.Success) { throw std::runtime_error( fmt::format("Failed getting build part: {} ({})", GetMultipartBlobResult.Reason, GetMultipartBlobResult.ErrorCode)); } std::vector> WorkList; for (auto& WorkItem : WorkItems) { WorkList.emplace_back([this, WorkItem = std::move(WorkItem)]() { Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); JupiterResult PartResult = WorkItem(); AddStatistic(PartResult); if (!PartResult.Success) { throw std::runtime_error(fmt::format("Failed getting build part: {} ({})", PartResult.Reason, PartResult.ErrorCode)); } }); } return WorkList; } virtual void PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) override { ZEN_TRACE_CPU("Jupiter::PutBlockMetadata"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); IoBuffer Payload = MetaData.GetBuffer().AsIoBuffer(); Payload.SetContentType(ZenContentType::kCbObject); JupiterResult PutMetaResult = m_Session.PutBlockMetadata(m_Namespace, m_Bucket, BuildId, BlockRawHash, Payload); AddStatistic(PutMetaResult); if (!PutMetaResult.Success) { throw std::runtime_error( fmt::format("Failed putting build block metadata: {} ({})", PutMetaResult.Reason, PutMetaResult.ErrorCode)); } } virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) override { ZEN_TRACE_CPU("Jupiter::FindBlocks"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); JupiterResult FindResult = m_Session.FindBlocks(m_Namespace, m_Bucket, BuildId, MaxBlockCount); AddStatistic(FindResult); if (!FindResult.Success) { throw std::runtime_error(fmt::format("Failed fetching known blocks: {} ({})", FindResult.Reason, FindResult.ErrorCode)); } return PayloadToCbObject("Failed fetching known blocks"sv, FindResult.Response); } virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span BlockHashes) override { ZEN_TRACE_CPU("Jupiter::GetBlockMetadata"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); CbObjectWriter Request; Request.BeginArray("blocks"sv); for (const IoHash& BlockHash : BlockHashes) { Request.AddHash(BlockHash); } Request.EndArray(); IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer(); Payload.SetContentType(ZenContentType::kCbObject); JupiterResult GetBlockMetadataResult = m_Session.GetBlockMetadata(m_Namespace, m_Bucket, BuildId, Payload); AddStatistic(GetBlockMetadataResult); if (!GetBlockMetadataResult.Success) { throw std::runtime_error( fmt::format("Failed fetching block metadatas: {} ({})", GetBlockMetadataResult.Reason, GetBlockMetadataResult.ErrorCode)); } return PayloadToCbObject("Failed fetching block metadatas", GetBlockMetadataResult.Response); } virtual void PutBuildPartStats(const Oid& BuildId, const Oid& BuildPartId, const tsl::robin_map& FloatStats) override { ZEN_UNUSED(BuildId, BuildPartId, FloatStats); CbObjectWriter Request; Request.BeginObject("floatStats"sv); for (auto It : FloatStats) { Request.AddFloat(It.first, It.second); } Request.EndObject(); IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer(); Payload.SetContentType(ZenContentType::kCbObject); JupiterResult PutBuildPartStatsResult = m_Session.PutBuildPartStats(m_Namespace, m_Bucket, BuildId, BuildPartId, Payload); AddStatistic(PutBuildPartStatsResult); if (!PutBuildPartStatsResult.Success) { throw std::runtime_error(fmt::format("Failed posting build part statistics: {} ({})", PutBuildPartStatsResult.Reason, PutBuildPartStatsResult.ErrorCode)); } } private: static CbObject PayloadToCbObject(std::string_view Context, const IoBuffer& Payload) { if (Payload.GetContentType() == ZenContentType::kJSON) { std::string_view Json(reinterpret_cast(Payload.GetData()), Payload.GetSize()); return LoadCompactBinaryFromJson(Json).AsObject(); } else if (Payload.GetContentType() == ZenContentType::kCbObject) { return LoadCompactBinaryObject(Payload); } else if (Payload.GetContentType() == ZenContentType::kCompressedBinary) { IoHash RawHash; uint64_t RawSize; return LoadCompactBinaryObject(CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, RawSize)); } else { throw std::runtime_error( fmt::format("{}: {} ({})", "Unsupported response format", Context, ToString(Payload.GetContentType()))); } } void AddStatistic(const JupiterResult& Result) { m_Stats.TotalBytesWritten += Result.SentBytes; m_Stats.TotalBytesRead += Result.ReceivedBytes; m_Stats.TotalRequestTimeUs += uint64_t(Result.ElapsedSeconds * 1000000.0); m_Stats.TotalRequestCount++; } JupiterSession m_Session; Statistics& m_Stats; const std::string m_Namespace; const std::string m_Bucket; const std::filesystem::path m_TempFolderPath; }; std::unique_ptr CreateJupiterBuildStorage(LoggerRef InLog, HttpClient& InHttpClient, BuildStorage::Statistics& Stats, std::string_view Namespace, std::string_view Bucket, const std::filesystem::path& TempFolderPath) { ZEN_TRACE_CPU("CreateJupiterBuildStorage"); return std::make_unique(InLog, InHttpClient, Stats, Namespace, Bucket, TempFolderPath); } } // namespace zen