// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include #include namespace zen { using namespace std::literals; static const std::string_view OplogContainerPartName = "oplogcontainer"sv; class BuildsRemoteStore : public RemoteProjectStore { public: BuildsRemoteStore(LoggerRef InLog, const HttpClientSettings& ClientSettings, std::string_view HostUrl, const std::filesystem::path& TempFilePath, std::string_view Namespace, std::string_view Bucket, const Oid& BuildId, const IoBuffer& MetaData, bool ForceDisableBlocks, bool ForceDisableTempBlocks) : m_Log(InLog) , m_BuildStorageHttp(HostUrl, ClientSettings) , m_BuildStorage(CreateJupiterBuildStorage(Log(), m_BuildStorageHttp, m_BuildStorageStats, Namespace, Bucket, /*AllowRedirect*/ false, TempFilePath)) , m_Namespace(Namespace) , m_Bucket(Bucket) , m_BuildId(BuildId) , m_MetaData(MetaData) , m_EnableBlocks(!ForceDisableBlocks) , m_UseTempBlocks(!ForceDisableTempBlocks) { m_MetaData.MakeOwned(); } virtual RemoteStoreInfo GetInfo() const override { return {.CreateBlocks = m_EnableBlocks, .UseTempBlockFiles = m_UseTempBlocks, .AllowChunking = true, .ContainerName = fmt::format("{}/{}/{}", m_Namespace, m_Bucket, m_BuildId), .Description = fmt::format("[cloud] {}. SessionId: {}. {}/{}/{}"sv, m_BuildStorageHttp.GetBaseUri(), m_BuildStorageHttp.GetSessionId(), m_Namespace, m_Bucket, m_BuildId)}; } virtual Stats GetStats() const override { return {.m_SentBytes = m_BuildStorageStats.TotalBytesWritten.load(), .m_ReceivedBytes = m_BuildStorageStats.TotalBytesRead.load(), .m_RequestTimeNS = m_BuildStorageStats.TotalRequestTimeUs.load() * 1000, .m_RequestCount = m_BuildStorageStats.TotalRequestCount.load(), .m_PeakSentBytes = m_BuildStorageStats.PeakSentBytes.load(), .m_PeakReceivedBytes = m_BuildStorageStats.PeakReceivedBytes.load(), .m_PeakBytesPerSec = m_BuildStorageStats.PeakBytesPerSec.load()}; } virtual bool GetExtendedStats(ExtendedStats& OutStats) const override { bool Result = false; BuildStorageBase::ExtendedStatistics StorageStats; if (m_BuildStorage->GetExtendedStatistics(StorageStats)) { for (auto It : StorageStats.ReceivedBytesPerSource) { OutStats.m_ReceivedBytesPerSource.insert_or_assign(It.first, It.second); } Result = true; } return Result; } virtual CreateContainerResult CreateContainer() override { ZEN_ASSERT(m_OplogBuildPartId == Oid::Zero); CreateContainerResult Result; Stopwatch Timer; auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; }); CbObject Payload = LoadCompactBinaryObject(m_MetaData); try { CbObject PutBuildResult = m_BuildStorage->PutBuild(m_BuildId, Payload); ZEN_UNUSED(PutBuildResult); m_OplogBuildPartId = Oid::NewOid(); } catch (const HttpClientError& Ex) { Result.ErrorCode = MakeErrorCode(Ex); Result.Reason = fmt::format("Failed creating oplog build to {}/{}/{}/{}. Reason: '{}'", m_BuildStorageHttp.GetBaseUri(), m_Namespace, m_Bucket, m_BuildId, Ex.what()); } catch (const std::exception& Ex) { Result.ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("Failed creating oplog build to {}/{}/{}/{}. Reason: '{}'", m_BuildStorageHttp.GetBaseUri(), m_Namespace, m_Bucket, m_BuildId, Ex.what()); } return Result; } virtual SaveResult SaveContainer(const IoBuffer& Payload) override { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); SaveResult Result; Stopwatch Timer; auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; }); try { CbObject ObjectPayload = LoadCompactBinaryObject(Payload); std::pair> PutBuildPartResult = m_BuildStorage->PutBuildPart(m_BuildId, m_OplogBuildPartId, OplogContainerPartName, ObjectPayload); Result.RawHash = PutBuildPartResult.first; Result.Needs = std::unordered_set(PutBuildPartResult.second.begin(), PutBuildPartResult.second.end()); } catch (const HttpClientError& Ex) { Result.ErrorCode = MakeErrorCode(Ex); Result.Reason = fmt::format("Failed saving oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'", m_BuildStorageHttp.GetBaseUri(), m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, Ex.what()); } catch (const std::exception& Ex) { Result.ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("Failed saving oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'", m_BuildStorageHttp.GetBaseUri(), m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, Ex.what()); } return Result; } virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&& Block) override { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); SaveAttachmentResult Result; Stopwatch Timer; auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; }); try { m_BuildStorage->PutBuildBlob(m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); if (Block.BlockHash == RawHash) { try { CbObjectWriter BlockMetaData; BlockMetaData.AddString("createdBy", GetRunningExecutablePath().stem().string()); CbObject MetaPayload = BuildChunkBlockDescription(Block, BlockMetaData.Save()); if (!m_BuildStorage->PutBlockMetadata(m_BuildId, RawHash, MetaPayload)) { ZEN_WARN("Failed saving block attachment meta data to {}/{}/{}/{}/{}. Reason: '{}'", m_BuildStorageHttp.GetBaseUri(), m_Namespace, m_Bucket, m_BuildId, RawHash, "not found"); } } catch (const HttpClientError& Ex) { Result.ErrorCode = MakeErrorCode(Ex); Result.Reason = fmt::format("Failed saving block attachment meta data to {}/{}/{}/{}. Reason: '{}'", m_BuildStorageHttp.GetBaseUri(), m_Namespace, m_Bucket, m_BuildId, Ex.what()); } catch (const std::exception& Ex) { Result.ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("Failed saving block attachment meta data to {}/{}/{}/{}. Reason: '{}'", m_BuildStorageHttp.GetBaseUri(), m_Namespace, m_Bucket, m_BuildId, Ex.what()); } } } catch (const HttpClientError& Ex) { Result.ErrorCode = MakeErrorCode(Ex); Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}. Reason: '{}'", m_BuildStorageHttp.GetBaseUri(), m_Namespace, m_Bucket, m_BuildId, Ex.what()); } catch (const std::exception& Ex) { Result.ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}. Reason: '{}'", m_BuildStorageHttp.GetBaseUri(), m_Namespace, m_Bucket, m_BuildId, Ex.what()); } return Result; } virtual SaveAttachmentsResult SaveAttachments(const std::vector& Chunks) override { SaveAttachmentsResult Result; Stopwatch Timer; auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; }); for (const SharedBuffer& Chunk : Chunks) { CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer()); SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash(), {}); if (ChunkResult.ErrorCode) { return SaveAttachmentsResult{ChunkResult}; } } return Result; } virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) override { ZEN_UNUSED(RawHash); ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); FinalizeResult Result; Stopwatch Timer; auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; }); try { std::vector Needs = m_BuildStorage->FinalizeBuildPart(m_BuildId, m_OplogBuildPartId, RawHash); Result.Needs = std::unordered_set(Needs.begin(), Needs.end()); } catch (const HttpClientError& Ex) { Result.ErrorCode = MakeErrorCode(Ex); Result.Reason = fmt::format("Failed finalizing oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'", m_BuildStorageHttp.GetBaseUri(), m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, Ex.what()); } catch (const std::exception& Ex) { Result.ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("Failed finalizing oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'", m_BuildStorageHttp.GetBaseUri(), m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, Ex.what()); } if (!Result.ErrorCode && Result.Needs.empty()) { try { m_BuildStorage->FinalizeBuild(m_BuildId); } catch (const HttpClientError& Ex) { Result.ErrorCode = MakeErrorCode(Ex); Result.Reason = fmt::format("Failed finalizing oplog container build to {}/{}/{}/{}. Reason: '{}'", m_BuildStorageHttp.GetBaseUri(), m_Namespace, m_Bucket, m_BuildId, Ex.what()); } catch (const std::exception& Ex) { Result.ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("Failed finalizing oplog container build to {}/{}/{}/{}. Reason: '{}'", m_BuildStorageHttp.GetBaseUri(), m_Namespace, m_Bucket, m_BuildId, Ex.what()); } } return Result; } virtual LoadContainerResult LoadContainer() override { ZEN_ASSERT(m_OplogBuildPartId == Oid::Zero); LoadContainerResult Result; Stopwatch Timer; auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; }); try { CbObject BuildObject = m_BuildStorage->GetBuild(m_BuildId); CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView(); if (!PartsObject) { throw std::runtime_error(fmt::format("The build {}/{}/{}/{} payload does not contain a 'parts' object"sv, m_BuildStorageHttp.GetBaseUri(), m_Namespace, m_Bucket, m_BuildId)); } m_OplogBuildPartId = PartsObject[OplogContainerPartName].AsObjectId(); if (m_OplogBuildPartId == Oid::Zero) { throw std::runtime_error(fmt::format("The build {}/{}/{}/{} payload 'parts' object does not contain a '{}' entry"sv, m_BuildStorageHttp.GetBaseUri(), m_Namespace, m_Bucket, m_BuildId, OplogContainerPartName)); } Result.ContainerObject = m_BuildStorage->GetBuildPart(m_BuildId, m_OplogBuildPartId); } catch (const HttpClientError& Ex) { Result.ErrorCode = MakeErrorCode(Ex); Result.Reason = fmt::format("Failed fetching oplog container build part to {}/{}/{}/{}. Reason: '{}'", m_BuildStorageHttp.GetBaseUri(), m_Namespace, m_Bucket, m_BuildId, Ex.what()); } catch (const std::exception& Ex) { Result.ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("Failed fetching oplog container build part to {}/{}/{}/{}. Reason: '{}'", m_BuildStorageHttp.GetBaseUri(), m_Namespace, m_Bucket, m_BuildId, Ex.what()); } return Result; } virtual GetKnownBlocksResult GetKnownBlocks() override { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); GetKnownBlocksResult Result; Stopwatch Timer; auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; }); try { CbObject KnownBlocks = m_BuildStorage->FindBlocks(m_BuildId, 10000u); std::optional> Blocks = ParseChunkBlockDescriptionList(KnownBlocks); Result.Blocks = std::move(Blocks.value()); } catch (const HttpClientError& Ex) { Result.ErrorCode = MakeErrorCode(Ex); Result.Reason = fmt::format("Failed listing known blocks for {}/{}/{}/{}. Reason: '{}'", m_BuildStorageHttp.GetBaseUri(), m_Namespace, m_Bucket, m_BuildId, Ex.what()); } catch (const std::exception& Ex) { Result.ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("Failed listing known blocks for {}/{}/{}/{}. Reason: '{}'", m_BuildStorageHttp.GetBaseUri(), m_Namespace, m_Bucket, m_BuildId, Ex.what()); } return Result; } virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span BlockHashes, BuildStorageCache* OptionalCache, const Oid& CacheBuildId) override { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); ZEN_ASSERT(OptionalCache == nullptr || CacheBuildId == m_BuildId); GetBlockDescriptionsResult Result; Stopwatch Timer; auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; }); try { Result.Blocks = zen::GetBlockDescriptions(Log(), *m_BuildStorage, OptionalCache, m_BuildId, BlockHashes, /*AttemptFallback*/ false, /*IsQuiet*/ false, /*IsVerbose)*/ false); } catch (const HttpClientError& Ex) { Result.ErrorCode = MakeErrorCode(Ex); Result.Reason = fmt::format("Failed listing known blocks for {}/{}/{}/{}. Reason: '{}'", m_BuildStorageHttp.GetBaseUri(), m_Namespace, m_Bucket, m_BuildId, Ex.what()); } catch (const std::exception& Ex) { Result.ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("Failed listing known blocks for {}/{}/{}/{}. Reason: '{}'", m_BuildStorageHttp.GetBaseUri(), m_Namespace, m_Bucket, m_BuildId, Ex.what()); } return Result; } virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); LoadAttachmentResult Result; Stopwatch Timer; auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; }); try { Result.Bytes = m_BuildStorage->GetBuildBlob(m_BuildId, RawHash); } catch (const HttpClientError& Ex) { Result.ErrorCode = MakeErrorCode(Ex); Result.Reason = fmt::format("Failed getting blob {}/{}/{}/{}/{}. Reason: '{}'", m_BuildStorageHttp.GetBaseUri(), m_Namespace, m_Bucket, m_BuildId, RawHash, Ex.what()); } catch (const std::exception& Ex) { Result.ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("Failed getting blob {}/{}/{}/{}/{}. Reason: '{}'", m_BuildStorageHttp.GetBaseUri(), m_Namespace, m_Bucket, m_BuildId, RawHash, Ex.what()); } return Result; } virtual LoadAttachmentRangesResult LoadAttachmentRanges(const IoHash& RawHash, std::span> Ranges) override { ZEN_ASSERT(!Ranges.empty()); LoadAttachmentRangesResult Result; Stopwatch Timer; auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; }); try { BuildStorageBase::BuildBlobRanges BlobRanges = m_BuildStorage->GetBuildBlobRanges(m_BuildId, RawHash, Ranges); if (BlobRanges.PayloadBuffer) { Result.Bytes = std::move(BlobRanges.PayloadBuffer); Result.Ranges = std::move(BlobRanges.Ranges); } } catch (const HttpClientError& Ex) { Result.ErrorCode = MakeErrorCode(Ex); Result.Reason = fmt::format("Failed getting {} ranges for blob {}/{}/{}/{}/{}. Reason: '{}'", Ranges.size(), m_BuildStorageHttp.GetBaseUri(), m_Namespace, m_Bucket, m_BuildId, RawHash, Ex.what()); } catch (const std::exception& Ex) { Result.ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("Failed getting {} ranges for blob {}/{}/{}/{}/{}. Reason: '{}'", Ranges.size(), m_BuildStorageHttp.GetBaseUri(), m_Namespace, m_Bucket, m_BuildId, RawHash, Ex.what()); } return Result; } virtual LoadAttachmentsResult LoadAttachments(const std::vector& RawHashes) override { LoadAttachmentsResult Result; Stopwatch Timer; auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; }); std::vector AttachmentsLeftToFind = RawHashes; for (const IoHash& Hash : AttachmentsLeftToFind) { LoadAttachmentResult ChunkResult = LoadAttachment(Hash); if (ChunkResult.ErrorCode) { return LoadAttachmentsResult{ChunkResult}; } ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast(ChunkResult.ElapsedSeconds * 1000))); Result.Chunks.emplace_back( std::pair{Hash, CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult.Bytes))}); } return Result; } private: static int MakeErrorCode(const HttpClientError& Ex) { return Ex.GetInternalErrorCode() != HttpClientErrorCode::kOK ? static_cast(Ex.GetInternalErrorCode()) : Ex.GetHttpResponseCode() != HttpResponseCode::ImATeapot ? static_cast(Ex.GetHttpResponseCode()) : 0; } inline LoggerRef Log() const { return m_Log; } LoggerRef m_Log; BuildStorageBase::Statistics m_BuildStorageStats; HttpClient m_BuildStorageHttp; std::unique_ptr m_BuildStorage; const std::string m_Namespace; const std::string m_Bucket; const Oid m_BuildId; IoBuffer m_MetaData; Oid m_OplogBuildPartId = Oid::Zero; const bool m_EnableBlocks = true; const bool m_UseTempBlocks = true; const bool m_AllowRedirect = false; }; std::shared_ptr CreateJupiterBuildsRemoteStore(LoggerRef InLog, const BuildStorageResolveResult& ResolveResult, std::function&& TokenProvider, const BuildsRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath) { HttpClientSettings ClientSettings{.LogCategory = "httpbuildsclient", .ConnectTimeout = std::chrono::milliseconds(3000), .Timeout = std::chrono::milliseconds(1800000), .AccessTokenProvider = std::move(TokenProvider), .AssumeHttp2 = ResolveResult.Cloud.AssumeHttp2, .AllowResume = true, .RetryCount = 4, .MaximumInMemoryDownloadSize = Options.MaximumInMemoryDownloadSize}; std::shared_ptr RemoteStore = std::make_shared(InLog, ClientSettings, ResolveResult.Cloud.Address, TempFilePath, Options.Namespace, Options.Bucket, Options.BuildId, Options.MetaData, Options.ForceDisableBlocks, Options.ForceDisableTempBlocks); return RemoteStore; } } // namespace zen