// Copyright Epic Games, Inc. All Rights Reserved. #include "buildsremoteprojectstore.h" #include #include #include #include #include #include #include namespace zen { using namespace std::literals; static const std::string_view OplogContainerPartName = "oplogcontainer"sv; class BuildsRemoteStore : public RemoteProjectStore { public: BuildsRemoteStore(Ref&& InJupiterClient, std::string_view Namespace, std::string_view Bucket, const Oid& BuildId, const IoBuffer& MetaData, bool ForceDisableBlocks, bool ForceDisableTempBlocks, const std::filesystem::path& TempFilePath) : m_JupiterClient(std::move(InJupiterClient)) , m_Namespace(Namespace) , m_Bucket(Bucket) , m_BuildId(BuildId) , m_MetaData(MetaData) , m_TempFilePath(TempFilePath) { m_MetaData.MakeOwned(); if (ForceDisableBlocks) { m_EnableBlocks = false; } if (ForceDisableTempBlocks) { m_UseTempBlocks = false; } } virtual RemoteStoreInfo GetInfo() const override { return {.CreateBlocks = m_EnableBlocks, .UseTempBlockFiles = m_UseTempBlocks, .AllowChunking = true, .ContainerName = fmt::format("{}/{}/{}", m_Namespace, m_Bucket, m_BuildId), .Description = fmt::format("[cloud] {} as {}/{}/{}"sv, m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId)}; } virtual Stats GetStats() const override { return {.m_SentBytes = m_SentBytes.load(), .m_ReceivedBytes = m_ReceivedBytes.load(), .m_RequestTimeNS = m_RequestTimeNS.load(), .m_RequestCount = m_RequestCount.load(), .m_PeakSentBytes = m_PeakSentBytes.load(), .m_PeakReceivedBytes = m_PeakReceivedBytes.load(), .m_PeakBytesPerSec = m_PeakBytesPerSec.load()}; } virtual CreateContainerResult CreateContainer() override { ZEN_ASSERT(m_OplogBuildPartId == Oid::Zero); JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); IoBuffer Payload = m_MetaData; Payload.SetContentType(ZenContentType::kCbObject); JupiterResult PutResult = Session.PutBuild(m_Namespace, m_Bucket, m_BuildId, Payload); AddStats(PutResult); CreateContainerResult Result{ConvertResult(PutResult)}; if (Result.ErrorCode) { Result.Reason = fmt::format("Failed creating oplog build to {}/{}/{}/{}. Reason: '{}'", m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, Result.Reason); } m_OplogBuildPartId = Oid::NewOid(); return Result; } virtual SaveResult SaveContainer(const IoBuffer& Payload) override { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); PutBuildPartResult PutResult = Session.PutBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, OplogContainerPartName, Payload); AddStats(PutResult); SaveResult Result{ConvertResult(PutResult), {PutResult.Needs.begin(), PutResult.Needs.end()}, PutResult.RawHash}; if (Result.ErrorCode) { Result.Reason = fmt::format("Failed saving oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'", m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, Result.Reason); } return Result; } virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&& Block) override { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); JupiterResult PutResult = Session.PutBuildBlob(m_Namespace, m_Bucket, m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); AddStats(PutResult); SaveAttachmentResult Result{ConvertResult(PutResult)}; if (Result.ErrorCode) { Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}/{}. Reason: '{}'", m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, RawHash, Result.Reason); return Result; } if (Block.BlockHash == RawHash) { CbObjectWriter BlockMetaData; BlockMetaData.AddString("createdBy", GetRunningExecutablePath().stem().string()); IoBuffer MetaPayload = BuildChunkBlockDescription(Block, BlockMetaData.Save()).GetBuffer().AsIoBuffer(); MetaPayload.SetContentType(ZenContentType::kCbObject); JupiterResult PutMetaResult = Session.PutBlockMetadata(m_Namespace, m_Bucket, m_BuildId, RawHash, MetaPayload); AddStats(PutMetaResult); RemoteProjectStore::Result MetaDataResult = ConvertResult(PutMetaResult); if (MetaDataResult.ErrorCode) { ZEN_WARN("Failed saving block attachment meta data to {}/{}/{}/{}/{}. Reason: '{}'", m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, RawHash, MetaDataResult.Reason); } } return Result; } virtual SaveAttachmentsResult SaveAttachments(const std::vector& Chunks) override { SaveAttachmentsResult Result; for (const SharedBuffer& Chunk : Chunks) { CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer()); SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash(), {}); if (ChunkResult.ErrorCode) { return SaveAttachmentsResult{ChunkResult}; } } return Result; } virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) override { ZEN_UNUSED(RawHash); ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); FinalizeBuildPartResult FinalizeRefResult = Session.FinalizeBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash); AddStats(FinalizeRefResult); FinalizeResult Result{ConvertResult(FinalizeRefResult), {FinalizeRefResult.Needs.begin(), FinalizeRefResult.Needs.end()}}; if (Result.ErrorCode) { Result.Reason = fmt::format("Failed finalizing oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'", m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, Result.Reason); } else if (Result.Needs.empty()) { JupiterResult FinalizeBuildResult = Session.FinalizeBuild(m_Namespace, m_Bucket, m_BuildId); AddStats(FinalizeBuildResult); FinalizeBuildResult.ElapsedSeconds += FinalizeRefResult.ElapsedSeconds; Result = {ConvertResult(FinalizeBuildResult)}; if (Result.ErrorCode) { Result.Reason = fmt::format("Failed finalizing oplog container build to {}/{}/{}/{}. Reason: '{}'", m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, FinalizeBuildResult.Reason); } } return Result; } virtual LoadContainerResult LoadContainer() override { ZEN_ASSERT(m_OplogBuildPartId == Oid::Zero); JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); JupiterResult GetBuildResult = Session.GetBuild(m_Namespace, m_Bucket, m_BuildId); AddStats(GetBuildResult); LoadContainerResult Result{ConvertResult(GetBuildResult)}; if (Result.ErrorCode) { Result.Reason = fmt::format("Failed fetching oplog container build from {}/{}/{}/{}. Reason: '{}'", m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, Result.Reason); return Result; } CbObject BuildObject = LoadCompactBinaryObject(GetBuildResult.Response); if (!BuildObject) { Result.ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("The build {}/{}/{}/{} payload is not formatted as a compact binary object"sv, m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId); return Result; } CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView(); if (!PartsObject) { Result.ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("The build {}/{}/{}/{} payload does not contain a 'parts' object"sv, m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId); return Result; } m_OplogBuildPartId = PartsObject[OplogContainerPartName].AsObjectId(); if (m_OplogBuildPartId == Oid::Zero) { Result.ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("The build {}/{}/{}/{} payload 'parts' object does not contain a '{}' entry"sv, m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, OplogContainerPartName); return Result; } JupiterResult GetBuildPartResult = Session.GetBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId); AddStats(GetBuildPartResult); Result = {ConvertResult(GetBuildResult)}; Result.ElapsedSeconds += GetBuildResult.ElapsedSeconds; if (Result.ErrorCode) { Result.ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("Failed fetching oplog build part from {}/{}/{}/{}/{}. Reason: '{}'", m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, Result.Reason); return Result; } CbObject ContainerObject = LoadCompactBinaryObject(GetBuildPartResult.Response); if (!ContainerObject) { Result.ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("The build part for oplog container {}/{}/{}/{}/{} is not formatted as a compact binary object"sv, m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId); return Result; } Result.ContainerObject = std::move(ContainerObject); return Result; } virtual GetKnownBlocksResult GetKnownBlocks() override { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); JupiterResult FindResult = Session.FindBlocks(m_Namespace, m_Bucket, m_BuildId, (uint64_t)-1); AddStats(FindResult); GetKnownBlocksResult Result{ConvertResult(FindResult)}; if (Result.ErrorCode) { Result.ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'", m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, Result.Reason); return Result; } if (ValidateCompactBinary(FindResult.Response.GetView(), CbValidateMode::Default) != CbValidateError::None) { Result.ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("The block list {}/{}/{} is not formatted as a compact binary object"sv, m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId); return Result; } std::optional> Blocks = ParseChunkBlockDescriptionList(LoadCompactBinaryObject(FindResult.Response)); if (!Blocks) { Result.ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("The block list {}/{}/{} is not formatted as a list of blocks"sv, m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId); return Result; } Result.Blocks.reserve(Blocks.value().size()); for (ChunkBlockDescription& BlockDescription : Blocks.value()) { Result.Blocks.push_back(ThinChunkBlockDescription{.BlockHash = BlockDescription.BlockHash, .ChunkRawHashes = std::move(BlockDescription.ChunkRawHashes)}); } return Result; } virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); JupiterResult GetResult = Session.GetBuildBlob(m_Namespace, m_Bucket, m_BuildId, RawHash, m_TempFilePath); AddStats(GetResult); LoadAttachmentResult Result{ConvertResult(GetResult), std::move(GetResult.Response)}; if (GetResult.ErrorCode) { Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}/{}/{}. Reason: '{}'", m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, RawHash, Result.Reason); } return Result; } virtual LoadAttachmentsResult LoadAttachments(const std::vector& RawHashes) override { LoadAttachmentsResult Result; for (const IoHash& Hash : RawHashes) { LoadAttachmentResult ChunkResult = LoadAttachment(Hash); if (ChunkResult.ErrorCode) { return LoadAttachmentsResult{ChunkResult}; } ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast(ChunkResult.ElapsedSeconds * 1000))); Result.Chunks.emplace_back( std::pair{Hash, CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult.Bytes))}); } return Result; } private: void AddStats(const JupiterResult& Result) { m_SentBytes.fetch_add(gsl::narrow(Result.SentBytes)); m_ReceivedBytes.fetch_add(gsl::narrow(Result.ReceivedBytes)); m_RequestTimeNS.fetch_add(static_cast(Result.ElapsedSeconds * 1000000000)); SetAtomicMax(m_PeakSentBytes, Result.SentBytes); SetAtomicMax(m_PeakReceivedBytes, Result.ReceivedBytes); if (Result.ElapsedSeconds > 0.0) { uint64_t BytesPerSec = static_cast((Result.SentBytes + Result.ReceivedBytes) / Result.ElapsedSeconds); SetAtomicMax(m_PeakBytesPerSec, BytesPerSec); } m_RequestCount.fetch_add(1); } static Result ConvertResult(const JupiterResult& Response) { std::string Text; int32_t ErrorCode = 0; if (Response.ErrorCode != 0 || !Response.Success) { if (Response.Response) { HttpContentType ContentType = Response.Response.GetContentType(); if (ContentType == ZenContentType::kText || ContentType == ZenContentType::kJSON) { ExtendableStringBuilder<256> SB; SB.Append("\n"); SB.Append(std::string_view(reinterpret_cast(Response.Response.GetData()), Response.Response.GetSize())); Text = SB.ToString(); } else if (ContentType == ZenContentType::kCbObject) { ExtendableStringBuilder<256> SB; SB.Append("\n"); CompactBinaryToJson(Response.Response.GetView(), SB); Text = SB.ToString(); } } } if (Response.ErrorCode != 0) { ErrorCode = Response.ErrorCode; } else if (!Response.Success) { ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); } return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.ElapsedSeconds, .Reason = Response.Reason, .Text = Text}; } Ref m_JupiterClient; const std::string m_Namespace; const std::string m_Bucket; const Oid m_BuildId; IoBuffer m_MetaData; Oid m_OplogBuildPartId = Oid::Zero; std::filesystem::path m_TempFilePath; bool m_EnableBlocks = true; bool m_UseTempBlocks = true; std::atomic_uint64_t m_SentBytes = {}; std::atomic_uint64_t m_ReceivedBytes = {}; std::atomic_uint64_t m_RequestTimeNS = {}; std::atomic_uint64_t m_RequestCount = {}; std::atomic_uint64_t m_PeakSentBytes = {}; std::atomic_uint64_t m_PeakReceivedBytes = {}; std::atomic_uint64_t m_PeakBytesPerSec = {}; }; std::shared_ptr CreateBuildsRemoteStore(const BuildsRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath) { std::string Url = Options.Url; if (Url.find("://"sv) == std::string::npos) { // Assume https URL Url = fmt::format("https://{}"sv, Url); } JupiterClientOptions ClientOptions{.Name = "Remote store"sv, .ServiceUrl = Url, .ConnectTimeout = std::chrono::milliseconds(2000), .Timeout = std::chrono::milliseconds(1800000), .AssumeHttp2 = Options.AssumeHttp2, .AllowResume = true, .RetryCount = 4}; // 1) openid-provider if given (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider // 2) Access token as parameter in request // 3) Environment variable (different win vs linux/mac) // 4) Default openid-provider (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider std::function TokenProvider; if (!Options.OpenIdProvider.empty()) { TokenProvider = httpclientauth::CreateFromOpenIdProvider(Options.AuthManager, Options.OpenIdProvider); } else if (!Options.AccessToken.empty()) { TokenProvider = httpclientauth::CreateFromStaticToken(Options.AccessToken); } else if (!Options.OidcExePath.empty()) { if (auto TokenProviderMaybe = httpclientauth::CreateFromOidcTokenExecutable(Options.OidcExePath, Url); TokenProviderMaybe) { TokenProvider = TokenProviderMaybe.value(); } } if (!TokenProvider) { TokenProvider = httpclientauth::CreateFromDefaultOpenIdProvider(Options.AuthManager); } Ref Client(new JupiterClient(ClientOptions, std::move(TokenProvider))); std::shared_ptr RemoteStore = std::make_shared(std::move(Client), Options.Namespace, Options.Bucket, Options.BuildId, Options.MetaData, Options.ForceDisableBlocks, Options.ForceDisableTempBlocks, TempFilePath); return RemoteStore; } } // namespace zen