aboutsummaryrefslogtreecommitdiff
path: root/src/zenremotestore/builds/buildprimecache.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenremotestore/builds/buildprimecache.cpp')
-rw-r--r-- src/zenremotestore/builds/buildprimecache.cpp 350
1 files changed, 350 insertions, 0 deletions
diff --git a/src/zenremotestore/builds/buildprimecache.cpp b/src/zenremotestore/builds/buildprimecache.cpp
new file mode 100644
index 000000000..12791f718
--- /dev/null
+++ b/src/zenremotestore/builds/buildprimecache.cpp
@@ -0,0 +1,350 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenremotestore/builds/buildprimecache.h>
+
+#include <zencore/compactbinaryutil.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/parallelwork.h>
+#include <zencore/timer.h>
+#include <zencore/trace.h>
+#include <zenremotestore/builds/buildstorageutil.h>
+#include <zenremotestore/builds/builduploadfolder.h>
+#include <zenutil/filteredrate.h>
+#include <zenutil/progress.h>
+
+namespace zen {
+
+using namespace std::literals;
+
+// Builds a prime-cache operation for the parts `BuildPartIds` of build `BuildId`.
+//
+// Every argument is forwarded to the member of the matching name; the part ids are
+// copied out of the span into m_BuildPartIds. The constructor also derives the
+// temporary directory ("tmp" below m_Options.ZenFolderPath) and creates it.
+BuildsOperationPrimeCache::BuildsOperationPrimeCache(
+    LoggerRef                      Log,
+    ProgressBase&                  Progress,
+    StorageInstance&               Storage,
+    std::atomic<bool>&             AbortFlag,
+    std::atomic<bool>&             PauseFlag,
+    WorkerThreadPool&              NetworkPool,
+    const Oid&                     BuildId,
+    std::span<const Oid>           BuildPartIds,
+    const Options&                 Options,
+    BuildStorageCache::Statistics& StorageCacheStats)
+: m_Log(Log)
+, m_Progress(Progress)
+, m_Storage(Storage)
+, m_AbortFlag(AbortFlag)
+, m_PauseFlag(PauseFlag)
+, m_NetworkPool(NetworkPool)
+, m_BuildId(BuildId)
+, m_BuildPartIds(BuildPartIds.begin(), BuildPartIds.end())
+, m_Options(Options)
+, m_StorageCacheStats(StorageCacheStats)
+{
+    // The temp path is later handed to DownloadLargeBlob, so it has to exist up front.
+    m_TempPath = m_Options.ZenFolderPath;
+    m_TempPath /= "tmp";
+    CreateDirectories(m_TempPath);
+}
+
+// Runs the complete prime-cache operation:
+//   1. collects all blobs referenced by the configured build parts,
+//   2. removes the blobs that FilterAlreadyCachedBlobs reports as already cached,
+//   3. downloads the remaining blobs (see ScheduleBlobDownloads),
+//   4. flushes the cache storage, if there is one, and logs a summary.
+// Returns early when no blobs are referenced, when nothing needs downloading, or
+// when m_AbortFlag is set once the downloads have been waited for.
+// All informational logging is suppressed when m_Options.IsQuiet is set.
+void
+BuildsOperationPrimeCache::Execute()
+{
+    ZEN_TRACE_CPU("BuildsOperationPrimeCache::Execute");
+
+    Stopwatch PrimeTimer;
+
+    tsl::robin_map<IoHash, uint64_t, IoHash::Hasher> LooseChunkRawSizes;
+    tsl::robin_set<IoHash, IoHash::Hasher>           BuildBlobs;
+    CollectReferencedBlobs(BuildBlobs, LooseChunkRawSizes);
+
+    if (!m_Options.IsQuiet)
+    {
+        ZEN_INFO("Found {} referenced blobs", BuildBlobs.size());
+    }
+
+    if (BuildBlobs.empty())
+    {
+        return;
+    }
+
+    std::vector<IoHash> BlobsToDownload = FilterAlreadyCachedBlobs(BuildBlobs);
+
+    if (BlobsToDownload.empty())
+    {
+        return;
+    }
+
+    // Zero the counters explicitly. A default-constructed std::atomic only holds a
+    // defined value with standard libraries that implement C++20's P0883, and these
+    // counters are incremented and read by the download code.
+    std::atomic<uint64_t> MultipartAttachmentCount{0};
+    std::atomic<size_t>   CompletedDownloadCount{0};
+    FilteredRate          FilteredDownloadedBytesPerSecond;
+
+    ScheduleBlobDownloads(BlobsToDownload,
+                          LooseChunkRawSizes,
+                          MultipartAttachmentCount,
+                          CompletedDownloadCount,
+                          FilteredDownloadedBytesPerSecond);
+
+    if (m_AbortFlag)
+    {
+        return;
+    }
+
+    if (m_Storage.CacheStorage)
+    {
+        // The callback reports the remaining blob count and returns false once an
+        // abort has been requested.
+        m_Storage.CacheStorage->Flush(m_Progress.GetProgressUpdateDelayMS(), [this](intptr_t Remaining) -> bool {
+            ZEN_UNUSED(Remaining);
+            if (!m_Options.IsQuiet)
+            {
+                ZEN_INFO("Waiting for {} blobs to finish upload to '{}'", Remaining, m_Storage.CacheHost.Name);
+            }
+            return !m_AbortFlag;
+        });
+    }
+
+    if (!m_Options.IsQuiet)
+    {
+        // Chunk and block byte counters are tracked separately; the summary reports their sum.
+        uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() + m_DownloadStats.DownloadedBlockByteCount.load();
+        ZEN_INFO("Downloaded {} ({}bits/s) in {}. {} as multipart. Completed in {}",
+                 NiceBytes(DownloadedBytes),
+                 NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), DownloadedBytes * 8)),
+                 NiceTimeSpanMs(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS() / 1000),
+                 MultipartAttachmentCount.load(),
+                 NiceTimeSpanMs(PrimeTimer.GetElapsedTimeMs()));
+    }
+}
+
+// Fetches every build part in m_BuildPartIds and gathers the blobs it references.
+//
+// OutBuildBlobs         receives the raw hashes listed under "blockAttachments" and
+//                       "chunkAttachments" of each part.
+// OutLooseChunkRawSizes maps each chunk attachment hash to its entry in "chunkRawSizes";
+//                       a hash seen in several parts keeps the last size read.
+//
+// Throws std::runtime_error when a part's chunk hash array and chunk raw size array
+// differ in length.
+void
+BuildsOperationPrimeCache::CollectReferencedBlobs(tsl::robin_set<IoHash, IoHash::Hasher>&           OutBuildBlobs,
+                                                  tsl::robin_map<IoHash, uint64_t, IoHash::Hasher>& OutLooseChunkRawSizes)
+{
+    for (const Oid& BuildPartId : m_BuildPartIds)
+    {
+        CbObject BuildPart = m_Storage.BuildStorage->GetBuildPart(m_BuildId, BuildPartId);
+
+        CbObjectView        BlockAttachmentsView = BuildPart["blockAttachments"sv].AsObjectView();
+        std::vector<IoHash> BlockAttachments = compactbinary_helpers::ReadBinaryAttachmentArray("rawHashes"sv, BlockAttachmentsView);
+
+        CbObjectView          ChunkAttachmentsView = BuildPart["chunkAttachments"sv].AsObjectView();
+        std::vector<IoHash>   ChunkAttachments = compactbinary_helpers::ReadBinaryAttachmentArray("rawHashes"sv, ChunkAttachmentsView);
+        std::vector<uint64_t> ChunkRawSizes    = compactbinary_helpers::ReadArray<uint64_t>("chunkRawSizes"sv, ChunkAttachmentsView);
+        if (ChunkAttachments.size() != ChunkRawSizes.size())
+        {
+            throw std::runtime_error(fmt::format("Mismatch of loose chunk raw size array, expected {}, found {}",
+                                                 ChunkAttachments.size(),
+                                                 ChunkRawSizes.size()));
+        }
+
+        // reserve() expects the total element count, so add what earlier parts already
+        // contributed; passing only this part's count would not pre-size the containers.
+        OutBuildBlobs.reserve(OutBuildBlobs.size() + ChunkAttachments.size() + BlockAttachments.size());
+        OutLooseChunkRawSizes.reserve(OutLooseChunkRawSizes.size() + ChunkAttachments.size());
+
+        OutBuildBlobs.insert(BlockAttachments.begin(), BlockAttachments.end());
+        OutBuildBlobs.insert(ChunkAttachments.begin(), ChunkAttachments.end());
+
+        for (size_t ChunkAttachmentIndex = 0; ChunkAttachmentIndex < ChunkAttachments.size(); ChunkAttachmentIndex++)
+        {
+            OutLooseChunkRawSizes.insert_or_assign(ChunkAttachments[ChunkAttachmentIndex], ChunkRawSizes[ChunkAttachmentIndex]);
+        }
+    }
+}
+
+// Returns the subset of BuildBlobs that still has to be downloaded.
+//
+// When a cache storage is configured and m_Options.ForceUpload is not set, the cache
+// is asked which blobs it already holds and only blobs reported without a body are
+// returned. In every other case all of BuildBlobs is returned. That includes the case
+// where the cache returns a result list whose size does not match the query, so the
+// answers cannot be matched to hashes.
+std::vector<IoHash>
+BuildsOperationPrimeCache::FilterAlreadyCachedBlobs(const tsl::robin_set<IoHash, IoHash::Hasher>& BuildBlobs)
+{
+    std::vector<IoHash> BlobsToDownload;
+    BlobsToDownload.reserve(BuildBlobs.size());
+
+    // Set once the cache has produced a usable per-blob answer.
+    bool HasCacheAnswer = false;
+
+    if (m_Storage.CacheStorage && !BuildBlobs.empty() && !m_Options.ForceUpload)
+    {
+        ZEN_TRACE_CPU("BlobCacheExistCheck");
+        Stopwatch Timer;
+
+        // The query takes a contiguous list, and the results are matched to it by index.
+        const std::vector<IoHash>                              BlobHashes(BuildBlobs.begin(), BuildBlobs.end());
+        const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult =
+            m_Storage.CacheStorage->BlobsExists(m_BuildId, BlobHashes);
+
+        if (CacheExistsResult.size() == BlobHashes.size())
+        {
+            HasCacheAnswer = true;
+
+            for (size_t BlobIndex = 0; BlobIndex < BlobHashes.size(); BlobIndex++)
+            {
+                if (!CacheExistsResult[BlobIndex].HasBody)
+                {
+                    BlobsToDownload.push_back(BlobHashes[BlobIndex]);
+                }
+            }
+            size_t FoundCount = BuildBlobs.size() - BlobsToDownload.size();
+
+            if (FoundCount > 0 && !m_Options.IsQuiet)
+            {
+                ZEN_INFO("Remote cache : Found {} out of {} needed blobs in {}",
+                         FoundCount,
+                         BuildBlobs.size(),
+                         NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+            }
+        }
+        else if (!m_Options.IsQuiet)
+        {
+            ZEN_INFO("Remote cache : Exists check returned {} results for {} blobs, assuming none are cached",
+                     CacheExistsResult.size(),
+                     BlobHashes.size());
+        }
+    }
+
+    if (!HasCacheAnswer)
+    {
+        // No usable information about the cache content: download everything rather
+        // than silently priming nothing.
+        BlobsToDownload.insert(BlobsToDownload.end(), BuildBlobs.begin(), BuildBlobs.end());
+    }
+    return BlobsToDownload;
+}
+
+// Queues one download task per entry of BlobsToDownload on m_NetworkPool and waits
+// for all of them while keeping the "Downloading" progress bar up to date.
+//
+// A blob goes through DownloadLargeBlobForCache when LooseChunkRawSizes has an entry
+// for it that is at least m_Options.LargeAttachmentSize; every other blob goes through
+// DownloadSingleBlobForCache. The counters and the rate tracker are owned by the
+// caller and updated by the tasks.
+void
+BuildsOperationPrimeCache::ScheduleBlobDownloads(
+    std::span<const IoHash>                                 BlobsToDownload,
+    const tsl::robin_map<IoHash, uint64_t, IoHash::Hasher>& LooseChunkRawSizes,
+    std::atomic<uint64_t>&                                  MultipartAttachmentCount,
+    std::atomic<size_t>&                                    CompletedDownloadCount,
+    FilteredRate&                                           FilteredDownloadedBytesPerSecond)
+{
+    const size_t BlobCount = BlobsToDownload.size();
+
+    std::unique_ptr<ProgressBase::ProgressBar> ProgressBar = m_Progress.CreateProgressBar("Downloading");
+
+    ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+    for (size_t BlobIndex = 0; BlobIndex != BlobCount; ++BlobIndex)
+    {
+        auto DownloadTask = [this,
+                             &Work,
+                             BlobsToDownload,
+                             BlobCount,
+                             &LooseChunkRawSizes,
+                             &CompletedDownloadCount,
+                             &FilteredDownloadedBytesPerSecond,
+                             &MultipartAttachmentCount,
+                             BlobIndex](std::atomic<bool>&) {
+            if (m_AbortFlag)
+            {
+                return;
+            }
+
+            const IoHash& Hash = BlobsToDownload[BlobIndex];
+
+            // Only blobs with a known raw size can be classified as large.
+            bool       UseMultipart = false;
+            const auto SizeIt       = LooseChunkRawSizes.find(Hash);
+            if (SizeIt != LooseChunkRawSizes.end())
+            {
+                UseMultipart = SizeIt->second >= m_Options.LargeAttachmentSize;
+            }
+
+            FilteredDownloadedBytesPerSecond.Start();
+
+            if (!UseMultipart)
+            {
+                DownloadSingleBlobForCache(Hash, BlobCount, CompletedDownloadCount, FilteredDownloadedBytesPerSecond);
+                return;
+            }
+
+            DownloadLargeBlobForCache(Work,
+                                      Hash,
+                                      BlobCount,
+                                      CompletedDownloadCount,
+                                      MultipartAttachmentCount,
+                                      FilteredDownloadedBytesPerSecond);
+        };
+
+        Work.ScheduleWork(m_NetworkPool, DownloadTask);
+    }
+
+    Work.Wait(m_Progress.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+        ZEN_UNUSED(PendingWork);
+
+        const uint64_t TotalBytes = m_DownloadStats.DownloadedChunkByteCount.load() + m_DownloadStats.DownloadedBlockByteCount.load();
+        FilteredDownloadedBytesPerSecond.Update(TotalBytes);
+
+        // The rate is left out once every blob has completed.
+        std::string RateText;
+        if (CompletedDownloadCount.load() != BlobCount)
+        {
+            RateText = fmt::format(" {}bits/s", NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8));
+        }
+
+        // Upload figures are only shown when there is a cache storage to upload to.
+        std::string UploadText;
+        if (m_Storage.CacheStorage)
+        {
+            UploadText = fmt::format(" {} ({}) uploaded.",
+                                     m_StorageCacheStats.PutBlobCount.load(),
+                                     NiceBytes(m_StorageCacheStats.PutBlobByteCount.load()));
+        }
+
+        std::string Details = fmt::format("{}/{} ({}{}) downloaded.{}",
+                                          CompletedDownloadCount.load(),
+                                          BlobCount,
+                                          NiceBytes(TotalBytes),
+                                          RateText,
+                                          UploadText);
+
+        ProgressBar->UpdateState({.Task           = "Downloading",
+                                  .Details        = Details,
+                                  .TotalCount     = BlobCount,
+                                  .RemainingCount = BlobCount - CompletedDownloadCount.load(),
+                                  .Status         = ProgressBase::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+                                 false);
+    });
+
+    FilteredDownloadedBytesPerSecond.Stop();
+    ProgressBar->Finish();
+}
+
+// Downloads one large blob through DownloadLargeBlob and, once the payload is
+// available, stores it in the cache storage.
+//
+// Work                             parallel work set handed on to DownloadLargeBlob.
+// BlobHash                         hash of the blob; copied into the completion callback.
+// BlobCount                        total number of blobs in this run; used to detect the last completion.
+// CompletedDownloadCount           incremented once when the payload callback runs.
+// MultipartAttachmentCount         passed through to DownloadLargeBlob.
+// FilteredDownloadedBytesPerSecond stopped when the last blob of the run completes.
+void
+BuildsOperationPrimeCache::DownloadLargeBlobForCache(ParallelWork&          Work,
+                                                     const IoHash&          BlobHash,
+                                                     size_t                 BlobCount,
+                                                     std::atomic<size_t>&   CompletedDownloadCount,
+                                                     std::atomic<uint64_t>& MultipartAttachmentCount,
+                                                     FilteredRate&          FilteredDownloadedBytesPerSecond)
+{
+    DownloadLargeBlob(*m_Storage.BuildStorage,
+                      m_TempPath,
+                      m_BuildId,
+                      BlobHash,
+                      m_Options.PreferredMultipartChunkSize,
+                      Work,
+                      m_NetworkPool,
+                      m_DownloadStats.DownloadedChunkByteCount,
+                      MultipartAttachmentCount,
+                      [this, BlobCount, BlobHash, &FilteredDownloadedBytesPerSecond, &CompletedDownloadCount](IoBuffer&& Payload) {
+                          m_DownloadStats.DownloadedChunkCount++;
+                          m_DownloadStats.RequestsCompleteCount++;
+
+                          if (!m_AbortFlag)
+                          {
+                              if (Payload && m_Storage.CacheStorage)
+                              {
+                                  // Payload arrives as an rvalue and is not used afterwards, so hand it
+                                  // over instead of copying it (same as DownloadSingleBlobForCache).
+                                  m_Storage.CacheStorage->PutBuildBlob(m_BuildId,
+                                                                       BlobHash,
+                                                                       ZenContentType::kCompressedBinary,
+                                                                       CompositeBuffer(SharedBuffer(std::move(Payload))));
+                              }
+                          }
+                          // The blob counts as completed even when the run was aborted.
+                          if (CompletedDownloadCount.fetch_add(1) + 1 == BlobCount)
+                          {
+                              FilteredDownloadedBytesPerSecond.Stop();
+                          }
+                      });
+}
+
+// Fetches one blob with a single request and forwards it to the cache storage.
+//
+// BlobHash                         hash of the blob to fetch from the build storage.
+// BlobCount                        total number of blobs in this run; used to detect the last completion.
+// CompletedDownloadCount           incremented once per blob unless the run has been aborted.
+// FilteredDownloadedBytesPerSecond stopped when the last blob of the run completes.
+//
+// Exceptions from the fetch are rethrown unless m_AbortFlag is set.
+void
+BuildsOperationPrimeCache::DownloadSingleBlobForCache(const IoHash&        BlobHash,
+                                                      size_t               BlobCount,
+                                                      std::atomic<size_t>& CompletedDownloadCount,
+                                                      FilteredRate&        FilteredDownloadedBytesPerSecond)
+{
+    IoBuffer Blob;
+    try
+    {
+        Blob = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlobHash);
+
+        m_DownloadStats.DownloadedBlockCount++;
+        m_DownloadStats.DownloadedBlockByteCount += Blob.GetSize();
+        m_DownloadStats.RequestsCompleteCount++;
+    }
+    catch (const std::exception&)
+    {
+        // Silence http errors due to abort
+        if (!m_AbortFlag)
+        {
+            throw;
+        }
+    }
+
+    // Nothing is stored or counted once an abort has been requested.
+    if (m_AbortFlag)
+    {
+        return;
+    }
+
+    if (Blob && m_Storage.CacheStorage)
+    {
+        m_Storage.CacheStorage->PutBuildBlob(m_BuildId,
+                                             BlobHash,
+                                             ZenContentType::kCompressedBinary,
+                                             CompositeBuffer(SharedBuffer(std::move(Blob))));
+    }
+
+    const size_t CompletedSoFar = CompletedDownloadCount.fetch_add(1) + 1;
+    if (CompletedSoFar == BlobCount)
+    {
+        FilteredDownloadedBytesPerSecond.Stop();
+    }
+}
+
+} // namespace zen