// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include #include #include #include #include namespace zen { using namespace std::literals; BuildsOperationPrimeCache::BuildsOperationPrimeCache(LoggerRef Log, ProgressBase& Progress, StorageInstance& Storage, std::atomic& AbortFlag, std::atomic& PauseFlag, WorkerThreadPool& NetworkPool, const Oid& BuildId, std::span BuildPartIds, const Options& Options, BuildStorageCache::Statistics& StorageCacheStats) : m_Log(Log) , m_Progress(Progress) , m_Storage(Storage) , m_AbortFlag(AbortFlag) , m_PauseFlag(PauseFlag) , m_NetworkPool(NetworkPool) , m_BuildId(BuildId) , m_BuildPartIds(BuildPartIds.begin(), BuildPartIds.end()) , m_Options(Options) , m_StorageCacheStats(StorageCacheStats) { m_TempPath = m_Options.ZenFolderPath / "tmp"; CreateDirectories(m_TempPath); } void BuildsOperationPrimeCache::Execute() { ZEN_TRACE_CPU("BuildsOperationPrimeCache::Execute"); Stopwatch PrimeTimer; tsl::robin_map LooseChunkRawSizes; tsl::robin_set BuildBlobs; CollectReferencedBlobs(BuildBlobs, LooseChunkRawSizes); if (!m_Options.IsQuiet) { ZEN_INFO("Found {} referenced blobs", BuildBlobs.size()); } if (BuildBlobs.empty()) { return; } std::vector BlobsToDownload = FilterAlreadyCachedBlobs(BuildBlobs); if (BlobsToDownload.empty()) { return; } std::atomic MultipartAttachmentCount; std::atomic CompletedDownloadCount; FilteredRate FilteredDownloadedBytesPerSecond; ScheduleBlobDownloads(BlobsToDownload, LooseChunkRawSizes, MultipartAttachmentCount, CompletedDownloadCount, FilteredDownloadedBytesPerSecond); if (m_AbortFlag) { return; } if (m_Storage.CacheStorage) { m_Storage.CacheStorage->Flush(m_Progress.GetProgressUpdateDelayMS(), [this](intptr_t Remaining) -> bool { ZEN_UNUSED(Remaining); if (!m_Options.IsQuiet) { ZEN_INFO("Waiting for {} blobs to finish upload to '{}'", Remaining, m_Storage.CacheHost.Name); } return !m_AbortFlag; }); } if (!m_Options.IsQuiet) { uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() + m_DownloadStats.DownloadedBlockByteCount.load(); ZEN_INFO("Downloaded {} ({}bits/s) in {}. {} as multipart. Completed in {}", NiceBytes(DownloadedBytes), NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), DownloadedBytes * 8)), NiceTimeSpanMs(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS() / 1000), MultipartAttachmentCount.load(), NiceTimeSpanMs(PrimeTimer.GetElapsedTimeMs())); } } void BuildsOperationPrimeCache::CollectReferencedBlobs(tsl::robin_set& OutBuildBlobs, tsl::robin_map& OutLooseChunkRawSizes) { for (const Oid& BuildPartId : m_BuildPartIds) { CbObject BuildPart = m_Storage.BuildStorage->GetBuildPart(m_BuildId, BuildPartId); CbObjectView BlockAttachmentsView = BuildPart["blockAttachments"sv].AsObjectView(); std::vector BlockAttachments = compactbinary_helpers::ReadBinaryAttachmentArray("rawHashes"sv, BlockAttachmentsView); CbObjectView ChunkAttachmentsView = BuildPart["chunkAttachments"sv].AsObjectView(); std::vector ChunkAttachments = compactbinary_helpers::ReadBinaryAttachmentArray("rawHashes"sv, ChunkAttachmentsView); std::vector ChunkRawSizes = compactbinary_helpers::ReadArray("chunkRawSizes"sv, ChunkAttachmentsView); if (ChunkAttachments.size() != ChunkRawSizes.size()) { throw std::runtime_error(fmt::format("Mismatch of loose chunk raw size array, expected {}, found {}", ChunkAttachments.size(), ChunkRawSizes.size())); } OutBuildBlobs.reserve(ChunkAttachments.size() + BlockAttachments.size()); OutBuildBlobs.insert(BlockAttachments.begin(), BlockAttachments.end()); OutBuildBlobs.insert(ChunkAttachments.begin(), ChunkAttachments.end()); for (size_t ChunkAttachmentIndex = 0; ChunkAttachmentIndex < ChunkAttachments.size(); ChunkAttachmentIndex++) { OutLooseChunkRawSizes.insert_or_assign(ChunkAttachments[ChunkAttachmentIndex], ChunkRawSizes[ChunkAttachmentIndex]); } } } std::vector BuildsOperationPrimeCache::FilterAlreadyCachedBlobs(const tsl::robin_set& BuildBlobs) { std::vector BlobsToDownload; BlobsToDownload.reserve(BuildBlobs.size()); if (m_Storage.CacheStorage && !BuildBlobs.empty() && !m_Options.ForceUpload) { ZEN_TRACE_CPU("BlobCacheExistCheck"); Stopwatch Timer; const std::vector BlobHashes(BuildBlobs.begin(), BuildBlobs.end()); const std::vector CacheExistsResult = m_Storage.CacheStorage->BlobsExists(m_BuildId, BlobHashes); if (CacheExistsResult.size() == BlobHashes.size()) { for (size_t BlobIndex = 0; BlobIndex < BlobHashes.size(); BlobIndex++) { if (!CacheExistsResult[BlobIndex].HasBody) { BlobsToDownload.push_back(BlobHashes[BlobIndex]); } } size_t FoundCount = BuildBlobs.size() - BlobsToDownload.size(); if (FoundCount > 0 && !m_Options.IsQuiet) { ZEN_INFO("Remote cache : Found {} out of {} needed blobs in {}", FoundCount, BuildBlobs.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } } } else { BlobsToDownload.insert(BlobsToDownload.end(), BuildBlobs.begin(), BuildBlobs.end()); } return BlobsToDownload; } void BuildsOperationPrimeCache::ScheduleBlobDownloads(std::span BlobsToDownload, const tsl::robin_map& LooseChunkRawSizes, std::atomic& MultipartAttachmentCount, std::atomic& CompletedDownloadCount, FilteredRate& FilteredDownloadedBytesPerSecond) { std::unique_ptr ProgressBar = m_Progress.CreateProgressBar("Downloading"); ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); const size_t BlobCount = BlobsToDownload.size(); for (size_t BlobIndex = 0; BlobIndex < BlobCount; BlobIndex++) { Work.ScheduleWork( m_NetworkPool, [this, &Work, BlobsToDownload, BlobCount, &LooseChunkRawSizes, &CompletedDownloadCount, &FilteredDownloadedBytesPerSecond, &MultipartAttachmentCount, BlobIndex](std::atomic&) { if (!m_AbortFlag) { const IoHash& BlobHash = BlobsToDownload[BlobIndex]; bool IsLargeBlob = false; if (auto It = LooseChunkRawSizes.find(BlobHash); It != LooseChunkRawSizes.end()) { IsLargeBlob = It->second >= m_Options.LargeAttachmentSize; } FilteredDownloadedBytesPerSecond.Start(); if (IsLargeBlob) { DownloadLargeBlobForCache(Work, BlobHash, BlobCount, CompletedDownloadCount, MultipartAttachmentCount, FilteredDownloadedBytesPerSecond); } else { DownloadSingleBlobForCache(BlobHash, BlobCount, CompletedDownloadCount, FilteredDownloadedBytesPerSecond); } } }); } Work.Wait(m_Progress.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { ZEN_UNUSED(PendingWork); uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() + m_DownloadStats.DownloadedBlockByteCount.load(); FilteredDownloadedBytesPerSecond.Update(DownloadedBytes); std::string DownloadRateString = (CompletedDownloadCount == BlobCount) ? "" : fmt::format(" {}bits/s", NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8)); std::string UploadDetails = m_Storage.CacheStorage ? fmt::format(" {} ({}) uploaded.", m_StorageCacheStats.PutBlobCount.load(), NiceBytes(m_StorageCacheStats.PutBlobByteCount.load())) : ""; std::string Details = fmt::format("{}/{} ({}{}) downloaded.{}", CompletedDownloadCount.load(), BlobCount, NiceBytes(DownloadedBytes), DownloadRateString, UploadDetails); ProgressBar->UpdateState({.Task = "Downloading", .Details = Details, .TotalCount = BlobCount, .RemainingCount = BlobCount - CompletedDownloadCount.load(), .Status = ProgressBase::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); FilteredDownloadedBytesPerSecond.Stop(); ProgressBar->Finish(); } void BuildsOperationPrimeCache::DownloadLargeBlobForCache(ParallelWork& Work, const IoHash& BlobHash, size_t BlobCount, std::atomic& CompletedDownloadCount, std::atomic& MultipartAttachmentCount, FilteredRate& FilteredDownloadedBytesPerSecond) { DownloadLargeBlob(*m_Storage.BuildStorage, m_TempPath, m_BuildId, BlobHash, m_Options.PreferredMultipartChunkSize, Work, m_NetworkPool, m_DownloadStats.DownloadedChunkByteCount, MultipartAttachmentCount, [this, BlobCount, BlobHash, &FilteredDownloadedBytesPerSecond, &CompletedDownloadCount](IoBuffer&& Payload) { m_DownloadStats.DownloadedChunkCount++; m_DownloadStats.RequestsCompleteCount++; if (!m_AbortFlag) { if (Payload && m_Storage.CacheStorage) { m_Storage.CacheStorage->PutBuildBlob(m_BuildId, BlobHash, ZenContentType::kCompressedBinary, CompositeBuffer(SharedBuffer(Payload))); } } if (CompletedDownloadCount.fetch_add(1) + 1 == BlobCount) { FilteredDownloadedBytesPerSecond.Stop(); } }); } void BuildsOperationPrimeCache::DownloadSingleBlobForCache(const IoHash& BlobHash, size_t BlobCount, std::atomic& CompletedDownloadCount, FilteredRate& FilteredDownloadedBytesPerSecond) { IoBuffer Payload; try { Payload = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlobHash); m_DownloadStats.DownloadedBlockCount++; m_DownloadStats.DownloadedBlockByteCount += Payload.GetSize(); m_DownloadStats.RequestsCompleteCount++; } catch (const std::exception&) { // Silence http errors due to abort if (!m_AbortFlag) { throw; } } if (!m_AbortFlag) { if (Payload && m_Storage.CacheStorage) { m_Storage.CacheStorage->PutBuildBlob(m_BuildId, BlobHash, ZenContentType::kCompressedBinary, CompositeBuffer(SharedBuffer(std::move(Payload)))); } if (CompletedDownloadCount.fetch_add(1) + 1 == BlobCount) { FilteredDownloadedBytesPerSecond.Stop(); } } } } // namespace zen