diff options
| author | Dan Engelbrecht <[email protected]> | 2025-10-22 12:25:08 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-22 12:25:08 +0200 |
| commit | 2f0298d3874d0e4cb351e6b7f18566f6dc0df388 (patch) | |
| tree | 664b3d6a2bcb0f7f420a09227cabd1c0b1e5b6a3 /src/zenremotestore/builds/buildstorageoperations.cpp | |
| parent | tweak worker pools for builds (#595) (diff) | |
| download | zen-2f0298d3874d0e4cb351e6b7f18566f6dc0df388.tar.xz zen-2f0298d3874d0e4cb351e6b7f18566f6dc0df388.zip | |
add `zen builds prime-cache` command (#598)
Diffstat (limited to 'src/zenremotestore/builds/buildstorageoperations.cpp')
| -rw-r--r-- | src/zenremotestore/builds/buildstorageoperations.cpp | 274 |
1 file changed, 274 insertions, 0 deletions
diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp index c9f142d7a..f106b7b18 100644 --- a/src/zenremotestore/builds/buildstorageoperations.cpp +++ b/src/zenremotestore/builds/buildstorageoperations.cpp @@ -7099,6 +7099,280 @@ BuildsOperationValidateBuildPart::Execute() } } +BuildsOperationPrimeCache::BuildsOperationPrimeCache(BuildOpLogOutput& LogOutput, + StorageInstance& Storage, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag, + WorkerThreadPool& NetworkPool, + const Oid& BuildId, + std::span<const Oid> BuildPartIds, + const Options& Options, + BuildStorageCache::Statistics& StorageCacheStats) +: m_LogOutput(LogOutput) +, m_Storage(Storage) +, m_AbortFlag(AbortFlag) +, m_PauseFlag(PauseFlag) +, m_NetworkPool(NetworkPool) +, m_BuildId(BuildId) +, m_BuildPartIds(BuildPartIds.begin(), BuildPartIds.end()) +, m_Options(Options) +, m_StorageCacheStats(StorageCacheStats) +{ + m_TempPath = m_Options.ZenFolderPath / "tmp"; + CreateDirectories(m_TempPath); +} + +void +BuildsOperationPrimeCache::Execute() +{ + ZEN_TRACE_CPU("BuildsOperationPrimeCache::Execute"); + + Stopwatch PrimeTimer; + + tsl::robin_map<IoHash, uint64_t, IoHash::Hasher> LooseChunkRawSizes; + + tsl::robin_set<IoHash, IoHash::Hasher> BuildBlobs; + + for (const Oid& BuildPartId : m_BuildPartIds) + { + CbObject BuildPart = m_Storage.BuildStorage->GetBuildPart(m_BuildId, BuildPartId); + + CbObjectView BlockAttachmentsView = BuildPart["blockAttachments"sv].AsObjectView(); + std::vector<IoHash> BlockAttachments = compactbinary_helpers::ReadBinaryAttachmentArray("rawHashes"sv, BlockAttachmentsView); + + CbObjectView ChunkAttachmentsView = BuildPart["chunkAttachments"sv].AsObjectView(); + std::vector<IoHash> ChunkAttachments = compactbinary_helpers::ReadBinaryAttachmentArray("rawHashes"sv, ChunkAttachmentsView); + std::vector<uint64_t> ChunkRawSizes = compactbinary_helpers::ReadArray<uint64_t>("chunkRawSizes"sv, 
ChunkAttachmentsView); + if (ChunkAttachments.size() != ChunkRawSizes.size()) + { + throw std::runtime_error(fmt::format("Mismatch of loose chunk raw size array, expected {}, found {}", + ChunkAttachments.size(), + ChunkRawSizes.size())); + } + + BuildBlobs.reserve(ChunkAttachments.size() + BlockAttachments.size()); + BuildBlobs.insert(BlockAttachments.begin(), BlockAttachments.end()); + BuildBlobs.insert(ChunkAttachments.begin(), ChunkAttachments.end()); + + for (size_t ChunkAttachmentIndex = 0; ChunkAttachmentIndex < ChunkAttachments.size(); ChunkAttachmentIndex++) + { + LooseChunkRawSizes.insert_or_assign(ChunkAttachments[ChunkAttachmentIndex], ChunkRawSizes[ChunkAttachmentIndex]); + } + } + + if (!m_Options.IsQuiet) + { + LOG_OUTPUT(m_LogOutput, "Found {} referenced blobs", BuildBlobs.size()); + } + + if (BuildBlobs.empty()) + { + return; + } + + std::vector<IoHash> BlobsToDownload; + BlobsToDownload.reserve(BuildBlobs.size()); + + if (m_Storage.BuildCacheStorage && !BuildBlobs.empty() && !m_Options.ForceUpload) + { + ZEN_TRACE_CPU("BlobCacheExistCheck"); + Stopwatch Timer; + + const std::vector<IoHash> BlobHashes(BuildBlobs.begin(), BuildBlobs.end()); + const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult = + m_Storage.BuildCacheStorage->BlobsExists(m_BuildId, BlobHashes); + + if (CacheExistsResult.size() == BlobHashes.size()) + { + for (size_t BlobIndex = 0; BlobIndex < BlobHashes.size(); BlobIndex++) + { + if (!CacheExistsResult[BlobIndex].HasBody) + { + BlobsToDownload.push_back(BlobHashes[BlobIndex]); + } + } + size_t FoundCount = BuildBlobs.size() - BlobsToDownload.size(); + + if (FoundCount > 0 && !m_Options.IsQuiet) + { + LOG_OUTPUT(m_LogOutput, + "Remote cache : Found {} out of {} needed blobs in {}", + FoundCount, + BuildBlobs.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + } + } + } + else + { + BlobsToDownload.insert(BlobsToDownload.end(), BuildBlobs.begin(), BuildBlobs.end()); + } + + if (BlobsToDownload.empty()) + { + 
return; + } + + std::atomic<uint64_t> MultipartAttachmentCount; + std::atomic<size_t> CompletedDownloadCount; + FilteredRate FilteredDownloadedBytesPerSecond; + + { + std::unique_ptr<BuildOpLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Downloading")); + BuildOpLogOutput::ProgressBar& Progress(*ProgressBarPtr); + + ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + const size_t BlobCount = BlobsToDownload.size(); + + for (size_t BlobIndex = 0; BlobIndex < BlobCount; BlobIndex++) + { + Work.ScheduleWork( + m_NetworkPool, + [this, + &Work, + &BlobsToDownload, + BlobCount, + &LooseChunkRawSizes, + &CompletedDownloadCount, + &FilteredDownloadedBytesPerSecond, + &MultipartAttachmentCount, + BlobIndex](std::atomic<bool>&) { + if (!m_AbortFlag) + { + const IoHash& BlobHash = BlobsToDownload[BlobIndex]; + + bool IsLargeBlob = false; + + if (auto It = LooseChunkRawSizes.find(BlobHash); It != LooseChunkRawSizes.end()) + { + IsLargeBlob = It->second >= m_Options.LargeAttachmentSize; + } + + FilteredDownloadedBytesPerSecond.Start(); + + if (IsLargeBlob) + { + DownloadLargeBlob(*m_Storage.BuildStorage, + m_TempPath, + m_BuildId, + BlobHash, + m_Options.PreferredMultipartChunkSize, + Work, + m_NetworkPool, + m_DownloadStats.DownloadedChunkByteCount, + MultipartAttachmentCount, + [this, BlobCount, BlobHash, &FilteredDownloadedBytesPerSecond, &CompletedDownloadCount]( + IoBuffer&& Payload) { + m_DownloadStats.DownloadedChunkCount++; + m_DownloadStats.RequestsCompleteCount++; + + if (!m_AbortFlag) + { + if (Payload && m_Storage.BuildCacheStorage) + { + m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, + BlobHash, + ZenContentType::kCompressedBinary, + CompositeBuffer(SharedBuffer(Payload))); + } + } + CompletedDownloadCount++; + if (CompletedDownloadCount == BlobCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + }); + } + else + { + IoBuffer Payload = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlobHash); 
+ m_DownloadStats.DownloadedBlockCount++; + m_DownloadStats.DownloadedBlockByteCount += Payload.GetSize(); + m_DownloadStats.RequestsCompleteCount++; + + if (!m_AbortFlag) + { + if (Payload && m_Storage.BuildCacheStorage) + { + m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, + BlobHash, + ZenContentType::kCompressedBinary, + CompositeBuffer(SharedBuffer(std::move(Payload)))); + } + } + CompletedDownloadCount++; + if (CompletedDownloadCount == BlobCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + } + } + }); + } + + Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); + + uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() + m_DownloadStats.DownloadedBlockByteCount.load(); + FilteredDownloadedBytesPerSecond.Update(DownloadedBytes); + + std::string DownloadRateString = (CompletedDownloadCount == BlobCount) + ? "" + : fmt::format(" {}bits/s", NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8)); + std::string UploadDetails = m_Storage.BuildCacheStorage ? 
fmt::format(" {} ({}) uploaded.", + m_StorageCacheStats.PutBlobCount.load(), + NiceBytes(m_StorageCacheStats.PutBlobByteCount.load())) + : ""; + + std::string Details = fmt::format("{}/{} ({}{}) downloaded.{}", + CompletedDownloadCount.load(), + BlobCount, + NiceBytes(DownloadedBytes), + DownloadRateString, + UploadDetails); + Progress.UpdateState({.Task = "Downloading", + .Details = Details, + .TotalCount = BlobCount, + .RemainingCount = BlobCount - CompletedDownloadCount.load(), + .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, + false); + }); + + FilteredDownloadedBytesPerSecond.Stop(); + + Progress.Finish(); + } + if (m_AbortFlag) + { + return; + } + + if (m_Storage.BuildCacheStorage) + { + m_Storage.BuildCacheStorage->Flush(m_LogOutput.GetProgressUpdateDelayMS(), [this](intptr_t Remaining) -> bool { + ZEN_UNUSED(Remaining); + if (!m_Options.IsQuiet) + { + LOG_OUTPUT(m_LogOutput, "Waiting for {} blobs to finish upload to '{}'", Remaining, m_Storage.CacheName); + } + return !m_AbortFlag; + }); + } + + if (!m_Options.IsQuiet) + { + uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() + m_DownloadStats.DownloadedBlockByteCount.load(); + LOG_OUTPUT(m_LogOutput, + "Downloaded {} ({}bits/s) in {}. {} as multipart. Completed in {}", + NiceBytes(DownloadedBytes), + NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), DownloadedBytes * 8)), + NiceTimeSpanMs(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS() / 1000), + MultipartAttachmentCount.load(), + NiceTimeSpanMs(PrimeTimer.GetElapsedTimeMs())); + } +} + CompositeBuffer ValidateBlob(std::atomic<bool>& AbortFlag, BuildStorage& Storage, |