diff options
| author | Dan Engelbrecht <[email protected]> | 2025-10-22 12:25:08 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-22 12:25:08 +0200 |
| commit | 2f0298d3874d0e4cb351e6b7f18566f6dc0df388 (patch) | |
| tree | 664b3d6a2bcb0f7f420a09227cabd1c0b1e5b6a3 /src | |
| parent | tweak worker pools for builds (#595) (diff) | |
| download | zen-2f0298d3874d0e4cb351e6b7f18566f6dc0df388.tar.xz zen-2f0298d3874d0e4cb351e6b7f18566f6dc0df388.zip | |
add `zen builds prime-cache` command (#598)
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 178 | ||||
| -rw-r--r-- | src/zen/cmds/builds_cmd.h | 7 | ||||
| -rw-r--r-- | src/zenremotestore/builds/buildstoragecache.cpp | 4 | ||||
| -rw-r--r-- | src/zenremotestore/builds/buildstorageoperations.cpp | 274 | ||||
| -rw-r--r-- | src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h | 2 | ||||
| -rw-r--r-- | src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h | 42 |
6 files changed, 481 insertions, 26 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index 1f748400a..343702ade 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -3033,6 +3033,39 @@ BuildsCommand::BuildsCommand() m_FetchBlobOptions.parse_positional({"build-id", "blob-hash"}); m_FetchBlobOptions.positional_help("build-id blob-hash"); + // prime-cache + AddSystemOptions(m_PrimeCacheOptions); + AddCloudOptions(m_PrimeCacheOptions); + AddFileOptions(m_PrimeCacheOptions); + AddOutputOptions(m_PrimeCacheOptions); + AddCacheOptions(m_PrimeCacheOptions); + AddZenFolderOptions(m_PrimeCacheOptions); + m_PrimeCacheOptions.add_option("", "", "build-id", "Build Id", cxxopts::value(m_BuildId), "<id>"); + m_PrimeCacheOptions.add_option( + "", + "", + "build-part-id", + "Build part Ids list separated by ',', if no build-part-ids or build-part-names are given all parts will be downloaded", + cxxopts::value(m_BuildPartIds), + "<id>"); + m_PrimeCacheOptions.add_option("", + "", + "build-part-name", + "Name of the build parts list separated by ',', if no build-part-ids or build-part-names are given " + "all parts will be downloaded", + cxxopts::value(m_BuildPartNames), + "<name>"); + + m_PrimeCacheOptions.add_option("", + "", + "force", + "Force download of all blobs by ignoring any existing blobs in cache", + cxxopts::value(m_Force), + "<force>"); + + m_PrimeCacheOptions.parse_positional({"build-id"}); + m_PrimeCacheOptions.positional_help("build-id"); + auto AddZenProcessOptions = [this](cxxopts::Options& Ops) { Ops.add_option("", "", "process-id", "Process id of running process", cxxopts::value(m_ZenProcessId), "<pid>"); }; @@ -3241,7 +3274,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) BuildStorageCache::Statistics& StorageCacheStats, const std::filesystem::path& TempPath, bool RequireNamespace, - bool RequireBucket) -> StorageInstance { + bool RequireBucket, + bool BoostCacheBackgroundWorkerPool) -> StorageInstance { ParseStorageOptions(RequireNamespace, RequireBucket); StorageInstance Result; @@ -3420,21 +3454,22 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { if (ZenCacheEndpointTestResult TestResult = TestZenCacheEndpoint(m_ZenCacheHost, false); TestResult.Success) { - Result.CacheHttp = std::make_unique<HttpClient>(m_ZenCacheHost, + Result.CacheHttp = std::make_unique<HttpClient>(m_ZenCacheHost, HttpClientSettings{.LogCategory = "httpcacheclient", - .ConnectTimeout = std::chrono::milliseconds{3000}, - .Timeout = std::chrono::milliseconds{30000}, - .AssumeHttp2 = CacheAssumeHttp2, - .AllowResume = true, - .RetryCount = 0}, + .ConnectTimeout = std::chrono::milliseconds{3000}, + .Timeout = std::chrono::milliseconds{30000}, + .AssumeHttp2 = CacheAssumeHttp2, + .AllowResume = true, + .RetryCount = 0}, []() { return AbortFlag.load(); }); - Result.BuildCacheStorage = CreateZenBuildStorageCache( - *Result.CacheHttp, - StorageCacheStats, - m_Namespace, - m_Bucket, - TempPath / "zencache", - m_PrimeCacheOnly ? GetSmallWorkerPool(EWorkloadType::Background) : GetTinyWorkerPool(EWorkloadType::Background)); + Result.BuildCacheStorage = + CreateZenBuildStorageCache(*Result.CacheHttp, + StorageCacheStats, + m_Namespace, + m_Bucket, + TempPath / "zencache", + BoostCacheBackgroundWorkerPool ? GetSmallWorkerPool(EWorkloadType::Background) + : GetTinyWorkerPool(EWorkloadType::Background)); CacheDescription = fmt::format("Zen {}{}. SessionId: '{}'", BuildCacheName.empty() ? "" : fmt::format("{}, ", BuildCacheName), m_ZenCacheHost, @@ -3679,7 +3714,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) StorageCacheStats, ZenTempFolderPath(ZenFolderPath), /*RequriesNamespace*/ false, - /*RequireBucket*/ false); + /*RequireBucket*/ false, + /*BoostCacheBackgroundWorkerPool */ false); CbObject Response = Storage.BuildStorage->ListNamespaces(m_ListNamespacesRecursive); ZEN_ASSERT(ValidateCompactBinary(Response.GetView(), CbValidateMode::Default) == CbValidateError::None); @@ -3769,7 +3805,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) StorageCacheStats, ZenTempFolderPath(m_ZenFolderPath), /*RequriesNamespace*/ true, - /*RequireBucket*/ false); + /*RequireBucket*/ false, + /*BoostCacheBackgroundWorkerPool */ false); CbObject Response = Storage.BuildStorage->ListBuilds(QueryObject); ZEN_ASSERT(ValidateCompactBinary(Response.GetView(), CbValidateMode::Default) == CbValidateError::None); @@ -3835,7 +3872,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) StorageCacheStats, ZenTempFolderPath(m_ZenFolderPath), /*RequriesNamespace*/ true, - /*RequireBucket*/ true); + /*RequireBucket*/ true, + /*BoostCacheBackgroundWorkerPool */ false); const Oid BuildId = ParseBuildId(); @@ -3907,7 +3945,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) StorageCacheStats, ZenTempFolderPath(m_ZenFolderPath), /*RequriesNamespace*/ true, - /*RequireBucket*/ true); + /*RequireBucket*/ true, + /*BoostCacheBackgroundWorkerPool */ false); if (m_BuildPartName.empty()) { @@ -4024,7 +4063,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) StorageCacheStats, ZenTempFolderPath(m_ZenFolderPath), /*RequriesNamespace*/ true, - /*RequireBucket*/ true); + /*RequireBucket*/ true, + /*BoostCacheBackgroundWorkerPool*/ m_PrimeCacheOnly); const Oid BuildId = ParseBuildId(); @@ -4102,7 +4142,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) StorageCacheStats, ZenTempFolderPath(m_ZenFolderPath), /*RequriesNamespace*/ true, - /*RequireBucket*/ true); + /*RequireBucket*/ true, + /*BoostCacheBackgroundWorkerPool */ false); const Oid BuildId = ParseBuildId(); @@ -4136,6 +4177,91 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } } + if (SubOption == &m_PrimeCacheOptions) + { + if (!IsQuiet) + { + LogExecutableVersionAndPid(); + } + + InitializeWorkerPools(m_BoostWorkerThreads); + + BuildStorage::Statistics StorageStats; + BuildStorageCache::Statistics StorageCacheStats; + + if (m_ZenFolderPath.empty()) + { + m_ZenFolderPath = std::filesystem::current_path() / ZenFolderName; + } + MakeSafeAbsolutePathÍnPlace(m_ZenFolderPath); + + CreateDirectories(m_ZenFolderPath); + auto _ = MakeGuard([this]() { + if (CleanDirectory(m_ZenFolderPath, {})) + { + std::error_code DummyEc; + RemoveDir(m_ZenFolderPath, DummyEc); + } + }); + + StorageInstance Storage = CreateBuildStorage(StorageStats, + StorageCacheStats, + ZenTempFolderPath(m_ZenFolderPath), + /*RequriesNamespace*/ true, + /*RequireBucket*/ true, + /*BoostCacheBackgroundWorkerPool */ true); + + const Oid BuildId = ParseBuildId(); + + std::vector<Oid> BuildPartIds = ParseBuildPartIds(); + std::vector<std::string> BuildPartNames = ParseBuildPartNames(); + + std::uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u; + + std::vector<std::pair<Oid, std::string>> AllBuildParts = + ResolveBuildPartNames(*Storage.BuildStorage, BuildId, BuildPartIds, BuildPartNames, PreferredMultipartChunkSize); + + std::vector<Oid> AllBuildPartIds; + AllBuildPartIds.reserve(AllBuildParts.size()); + for (const std::pair<Oid, std::string>& BuildPart : AllBuildParts) + { + AllBuildPartIds.push_back(BuildPart.first); + } + + ProgressBar::SetLogOperationName(ProgressMode, "Prime Cache"); + + ConsoleOpLogOutput Output(ProgressMode); + + BuildsOperationPrimeCache PrimeOp(Output, + Storage, + AbortFlag, + PauseFlag, + GetNetworkPool(), + BuildId, + AllBuildPartIds, + BuildsOperationPrimeCache::Options{.IsQuiet = IsQuiet, + .IsVerbose = IsVerbose, + .ZenFolderPath = m_ZenFolderPath, + .LargeAttachmentSize = PreferredMultipartChunkSize * 4u, + .PreferredMultipartChunkSize = PreferredMultipartChunkSize, + .ForceUpload = m_Force}, + StorageCacheStats); + PrimeOp.Execute(); + + if (!IsQuiet) + { + if (Storage.BuildCacheStorage) + { + ZEN_CONSOLE("Uploaded {} ({}) blobs", + StorageCacheStats.PutBlobCount.load(), + NiceBytes(StorageCacheStats.PutBlobByteCount), + Storage.CacheName); + } + } + + return; + } + if (SubOption == &m_FetchBlobOptions) { if (!IsQuiet) @@ -4167,7 +4293,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) StorageCacheStats, ZenTempFolderPath(m_ZenFolderPath), /*RequriesNamespace*/ true, - /*RequireBucket*/ true); + /*RequireBucket*/ true, + /*BoostCacheBackgroundWorkerPool */ false); IoHash BlobHash = ParseBlobHash(); @@ -4223,7 +4350,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) StorageCacheStats, ZenTempFolderPath(m_ZenFolderPath), /*RequriesNamespace*/ true, - /*RequireBucket*/ true); + /*RequireBucket*/ true, + /*BoostCacheBackgroundWorkerPool */ false); Oid BuildId = ParseBuildId(); @@ -4270,7 +4398,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) StorageCacheStats, ZenTempFolderPath(m_ZenFolderPath), /*RequriesNamespace*/ true, - /*RequireBucket*/ true); + /*RequireBucket*/ true, + /*BoostCacheBackgroundWorkerPool */ false); Stopwatch Timer; for (const std::string& BuildIdString : m_BuildIds) @@ -4404,7 +4533,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) StorageCacheStats, ZenTempFolderPath(m_ZenFolderPath), /*RequriesNamespace*/ true, - /*RequireBucket*/ true); + /*RequireBucket*/ true, + /*BoostCacheBackgroundWorkerPool */ false); m_BuildId = Oid::NewOid().ToString(); m_BuildPartName = m_Path.filename().string(); diff --git a/src/zen/cmds/builds_cmd.h b/src/zen/cmds/builds_cmd.h index e909a6155..ef5ac1e10 100644 --- a/src/zen/cmds/builds_cmd.h +++ b/src/zen/cmds/builds_cmd.h @@ -16,7 +16,7 @@ class BuildsCommand : public CacheStoreCommand public: static constexpr char Name[] = "builds"; static constexpr char Description[] = - "Manage builds - list, list-namespaces, ls, upload, download, diff, fetch-blob, validate-part, pause, resume, abort"; + "Manage builds - list, list-namespaces, ls, upload, download, prime-cache, diff, fetch-blob, validate-part, pause, resume, abort"; BuildsCommand(); ~BuildsCommand(); @@ -106,6 +106,8 @@ private: cxxopts::Options m_FetchBlobOptions{"fetch-blob", "Fetch a blob from remote store"}; std::string m_BlobHash; + cxxopts::Options m_PrimeCacheOptions{"prime-cache", "Prime cache from a remote store"}; + cxxopts::Options m_PauseOptions{"pause", "Pause an ongoing zen builds process"}; cxxopts::Options m_ResumeOptions{"resume", "Resume a paused zen builds process"}; cxxopts::Options m_AbortOptions{"abort", "Abort an ongoing zen builds process"}; @@ -119,7 +121,7 @@ private: cxxopts::Options m_MultiTestDownloadOptions{"multi-test-download", "Test multiple sequenced downloads with verify"}; std::vector<std::string> m_BuildIds; - cxxopts::Options* m_SubCommands[14] = {&m_ListNamespacesOptions, + cxxopts::Options* m_SubCommands[15] = {&m_ListNamespacesOptions, &m_ListOptions, &m_ListBlocksOptions, &m_UploadOptions, @@ -130,6 +132,7 @@ private: &m_DiffOptions, &m_LsOptions, &m_FetchBlobOptions, + &m_PrimeCacheOptions, &m_ValidateBuildPartOptions, &m_TestOptions, &m_MultiTestDownloadOptions}; diff --git a/src/zenremotestore/builds/buildstoragecache.cpp b/src/zenremotestore/builds/buildstoragecache.cpp index d36d75480..2e9cf9954 100644 --- a/src/zenremotestore/builds/buildstoragecache.cpp +++ b/src/zenremotestore/builds/buildstoragecache.cpp @@ -104,6 +104,10 @@ public: m_HttpClient.Upload(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()), Payload, ContentType); + + m_Stats.PutBlobCount++; + m_Stats.PutBlobByteCount += Payload.GetSize(); + AddStatistic(CacheResponse); if (!CacheResponse.IsSuccess()) { diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp index c9f142d7a..f106b7b18 100644 --- a/src/zenremotestore/builds/buildstorageoperations.cpp +++ b/src/zenremotestore/builds/buildstorageoperations.cpp @@ -7099,6 +7099,280 @@ BuildsOperationValidateBuildPart::Execute() } } +BuildsOperationPrimeCache::BuildsOperationPrimeCache(BuildOpLogOutput& LogOutput, + StorageInstance& Storage, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag, + WorkerThreadPool& NetworkPool, + const Oid& BuildId, + std::span<const Oid> BuildPartIds, + const Options& Options, + BuildStorageCache::Statistics& StorageCacheStats) +: m_LogOutput(LogOutput) +, m_Storage(Storage) +, m_AbortFlag(AbortFlag) +, m_PauseFlag(PauseFlag) +, m_NetworkPool(NetworkPool) +, m_BuildId(BuildId) +, m_BuildPartIds(BuildPartIds.begin(), BuildPartIds.end()) +, m_Options(Options) +, m_StorageCacheStats(StorageCacheStats) +{ + m_TempPath = m_Options.ZenFolderPath / "tmp"; + CreateDirectories(m_TempPath); +} + +void +BuildsOperationPrimeCache::Execute() +{ + ZEN_TRACE_CPU("BuildsOperationPrimeCache::Execute"); + + Stopwatch PrimeTimer; + + tsl::robin_map<IoHash, uint64_t, IoHash::Hasher> LooseChunkRawSizes; + + tsl::robin_set<IoHash, IoHash::Hasher> BuildBlobs; + + for (const Oid& BuildPartId : m_BuildPartIds) + { + CbObject BuildPart = m_Storage.BuildStorage->GetBuildPart(m_BuildId, BuildPartId); + + CbObjectView BlockAttachmentsView = BuildPart["blockAttachments"sv].AsObjectView(); + std::vector<IoHash> BlockAttachments = compactbinary_helpers::ReadBinaryAttachmentArray("rawHashes"sv, BlockAttachmentsView); + + CbObjectView ChunkAttachmentsView = BuildPart["chunkAttachments"sv].AsObjectView(); + std::vector<IoHash> ChunkAttachments = compactbinary_helpers::ReadBinaryAttachmentArray("rawHashes"sv, ChunkAttachmentsView); + std::vector<uint64_t> ChunkRawSizes = compactbinary_helpers::ReadArray<uint64_t>("chunkRawSizes"sv, ChunkAttachmentsView); + if (ChunkAttachments.size() != ChunkRawSizes.size()) + { + throw std::runtime_error(fmt::format("Mismatch of loose chunk raw size array, expected {}, found {}", + ChunkAttachments.size(), + ChunkRawSizes.size())); + } + + BuildBlobs.reserve(ChunkAttachments.size() + BlockAttachments.size()); + BuildBlobs.insert(BlockAttachments.begin(), BlockAttachments.end()); + BuildBlobs.insert(ChunkAttachments.begin(), ChunkAttachments.end()); + + for (size_t ChunkAttachmentIndex = 0; ChunkAttachmentIndex < ChunkAttachments.size(); ChunkAttachmentIndex++) + { + LooseChunkRawSizes.insert_or_assign(ChunkAttachments[ChunkAttachmentIndex], ChunkRawSizes[ChunkAttachmentIndex]); + } + } + + if (!m_Options.IsQuiet) + { + LOG_OUTPUT(m_LogOutput, "Found {} referenced blobs", BuildBlobs.size()); + } + + if (BuildBlobs.empty()) + { + return; + } + + std::vector<IoHash> BlobsToDownload; + BlobsToDownload.reserve(BuildBlobs.size()); + + if (m_Storage.BuildCacheStorage && !BuildBlobs.empty() && !m_Options.ForceUpload) + { + ZEN_TRACE_CPU("BlobCacheExistCheck"); + Stopwatch Timer; + + const std::vector<IoHash> BlobHashes(BuildBlobs.begin(), BuildBlobs.end()); + const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult = + m_Storage.BuildCacheStorage->BlobsExists(m_BuildId, BlobHashes); + + if (CacheExistsResult.size() == BlobHashes.size()) + { + for (size_t BlobIndex = 0; BlobIndex < BlobHashes.size(); BlobIndex++) + { + if (!CacheExistsResult[BlobIndex].HasBody) + { + BlobsToDownload.push_back(BlobHashes[BlobIndex]); + } + } + size_t FoundCount = BuildBlobs.size() - BlobsToDownload.size(); + + if (FoundCount > 0 && !m_Options.IsQuiet) + { + LOG_OUTPUT(m_LogOutput, + "Remote cache : Found {} out of {} needed blobs in {}", + FoundCount, + BuildBlobs.size(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + } + } + } + else + { + BlobsToDownload.insert(BlobsToDownload.end(), BuildBlobs.begin(), BuildBlobs.end()); + } + + if (BlobsToDownload.empty()) + { + return; + } + + std::atomic<uint64_t> MultipartAttachmentCount; + std::atomic<size_t> CompletedDownloadCount; + FilteredRate FilteredDownloadedBytesPerSecond; + + { + std::unique_ptr<BuildOpLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Downloading")); + BuildOpLogOutput::ProgressBar& Progress(*ProgressBarPtr); + + ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + const size_t BlobCount = BlobsToDownload.size(); + + for (size_t BlobIndex = 0; BlobIndex < BlobCount; BlobIndex++) + { + Work.ScheduleWork( + m_NetworkPool, + [this, + &Work, + &BlobsToDownload, + BlobCount, + &LooseChunkRawSizes, + &CompletedDownloadCount, + &FilteredDownloadedBytesPerSecond, + &MultipartAttachmentCount, + BlobIndex](std::atomic<bool>&) { + if (!m_AbortFlag) + { + const IoHash& BlobHash = BlobsToDownload[BlobIndex]; + + bool IsLargeBlob = false; + + if (auto It = LooseChunkRawSizes.find(BlobHash); It != LooseChunkRawSizes.end()) + { + IsLargeBlob = It->second >= m_Options.LargeAttachmentSize; + } + + FilteredDownloadedBytesPerSecond.Start(); + + if (IsLargeBlob) + { + DownloadLargeBlob(*m_Storage.BuildStorage, + m_TempPath, + m_BuildId, + BlobHash, + m_Options.PreferredMultipartChunkSize, + Work, + m_NetworkPool, + m_DownloadStats.DownloadedChunkByteCount, + MultipartAttachmentCount, + [this, BlobCount, BlobHash, &FilteredDownloadedBytesPerSecond, &CompletedDownloadCount]( + IoBuffer&& Payload) { + m_DownloadStats.DownloadedChunkCount++; + m_DownloadStats.RequestsCompleteCount++; + + if (!m_AbortFlag) + { + if (Payload && m_Storage.BuildCacheStorage) + { + m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, + BlobHash, + ZenContentType::kCompressedBinary, + CompositeBuffer(SharedBuffer(Payload))); + } + } + CompletedDownloadCount++; + if (CompletedDownloadCount == BlobCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + }); + } + else + { + IoBuffer Payload = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlobHash); + m_DownloadStats.DownloadedBlockCount++; + m_DownloadStats.DownloadedBlockByteCount += Payload.GetSize(); + m_DownloadStats.RequestsCompleteCount++; + + if (!m_AbortFlag) + { + if (Payload && m_Storage.BuildCacheStorage) + { + m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, + BlobHash, + ZenContentType::kCompressedBinary, + CompositeBuffer(SharedBuffer(std::move(Payload)))); + } + } + CompletedDownloadCount++; + if (CompletedDownloadCount == BlobCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + } + } + }); + } + + Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); + + uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() + m_DownloadStats.DownloadedBlockByteCount.load(); + FilteredDownloadedBytesPerSecond.Update(DownloadedBytes); + + std::string DownloadRateString = (CompletedDownloadCount == BlobCount) + ? "" + : fmt::format(" {}bits/s", NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8)); + std::string UploadDetails = m_Storage.BuildCacheStorage ? fmt::format(" {} ({}) uploaded.", + m_StorageCacheStats.PutBlobCount.load(), + NiceBytes(m_StorageCacheStats.PutBlobByteCount.load())) + : ""; + + std::string Details = fmt::format("{}/{} ({}{}) downloaded.{}", + CompletedDownloadCount.load(), + BlobCount, + NiceBytes(DownloadedBytes), + DownloadRateString, + UploadDetails); + Progress.UpdateState({.Task = "Downloading", + .Details = Details, + .TotalCount = BlobCount, + .RemainingCount = BlobCount - CompletedDownloadCount.load(), + .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, + false); + }); + + FilteredDownloadedBytesPerSecond.Stop(); + + Progress.Finish(); + } + if (m_AbortFlag) + { + return; + } + + if (m_Storage.BuildCacheStorage) + { + m_Storage.BuildCacheStorage->Flush(m_LogOutput.GetProgressUpdateDelayMS(), [this](intptr_t Remaining) -> bool { + ZEN_UNUSED(Remaining); + if (!m_Options.IsQuiet) + { + LOG_OUTPUT(m_LogOutput, "Waiting for {} blobs to finish upload to '{}'", Remaining, m_Storage.CacheName); + } + return !m_AbortFlag; + }); + } + + if (!m_Options.IsQuiet) + { + uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() + m_DownloadStats.DownloadedBlockByteCount.load(); + LOG_OUTPUT(m_LogOutput, + "Downloaded {} ({}bits/s) in {}. {} as multipart. Completed in {}", + NiceBytes(DownloadedBytes), + NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), DownloadedBytes * 8)), + NiceTimeSpanMs(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS() / 1000), + MultipartAttachmentCount.load(), + NiceTimeSpanMs(PrimeTimer.GetElapsedTimeMs())); + } +} + CompositeBuffer ValidateBlob(std::atomic<bool>& AbortFlag, BuildStorage& Storage, diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h b/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h index e30270848..f1916da10 100644 --- a/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h +++ b/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h @@ -26,6 +26,8 @@ public: std::atomic<uint64_t> PeakSentBytes = 0; std::atomic<uint64_t> PeakReceivedBytes = 0; std::atomic<uint64_t> PeakBytesPerSec = 0; + std::atomic<uint64_t> PutBlobCount = 0; + std::atomic<uint64_t> PutBlobByteCount = 0; }; virtual ~BuildStorageCache() {} diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h index 8b5b63ef9..157435971 100644 --- a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h +++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h @@ -6,6 +6,7 @@ #include <zencore/logging.h> #include <zencore/uid.h> #include <zencore/zencore.h> +#include <zenremotestore/builds/buildstoragecache.h> #include <zenremotestore/chunking/chunkedcontent.h> #include <zenutil/bufferedwritefilecache.h> @@ -755,6 +756,47 @@ private: const Options m_Options; }; +class BuildsOperationPrimeCache +{ +public: + struct Options + { + bool IsQuiet = false; + bool IsVerbose = false; + std::filesystem::path ZenFolderPath; + std::uint64_t LargeAttachmentSize = 32u * 1024u * 1024u * 4u; + std::uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u; + bool ForceUpload = false; + }; + + BuildsOperationPrimeCache(BuildOpLogOutput& LogOutput, + StorageInstance& Storage, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag, + WorkerThreadPool& NetworkPool, + const Oid& BuildId, + std::span<const Oid> BuildPartIds, + const Options& Options, + BuildStorageCache::Statistics& StorageCacheStats); + + void Execute(); + + DownloadStatistics m_DownloadStats; + +private: + BuildOpLogOutput& m_LogOutput; + StorageInstance& m_Storage; + std::atomic<bool>& m_AbortFlag; + std::atomic<bool>& m_PauseFlag; + WorkerThreadPool& m_NetworkPool; + const Oid m_BuildId; + std::vector<Oid> m_BuildPartIds; + Options m_Options; + std::filesystem::path m_TempPath; + + BuildStorageCache::Statistics& m_StorageCacheStats; +}; + CompositeBuffer ValidateBlob(std::atomic<bool>& AbortFlag, BuildStorage& Storage, const Oid& BuildId, |