diff options
| author | Dan Engelbrecht <[email protected]> | 2025-03-27 16:08:47 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-03-27 16:08:47 +0100 |
| commit | 013ac818cd09c1d31bf9411e00b2bbbf02defa3f (patch) | |
| tree | cdc94b8fe80f0c5db20f0417d76e5351fe480f4a | |
| parent | Merge pull request #317 from ue-foundation/zs/ui-show-cook-artifacts (diff) | |
| download | zen-013ac818cd09c1d31bf9411e00b2bbbf02defa3f.tar.xz zen-013ac818cd09c1d31bf9411e00b2bbbf02defa3f.zip | |
build cache prime (#327)
- Feature: zen `--boost-workers` option to builds `upload`, `download` and `validate-part` that will increase the number of worker threads, may cause computer to be less responsive
- Feature: zen `--cache-prime-only` that uploads referenced data from a part to `--zen-cache-host` if it is not already present. Target folder will be untouched.
| -rw-r--r-- | CHANGELOG.md | 2 | ||||
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 1087 | ||||
| -rw-r--r-- | src/zen/cmds/builds_cmd.h | 6 | ||||
| -rw-r--r-- | src/zenutil/buildstoragecache.cpp | 59 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/buildstoragecache.h | 7 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/workerpools.h | 3 | ||||
| -rw-r--r-- | src/zenutil/workerpools.cpp | 14 |
7 files changed, 724 insertions, 454 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 7c6dccb44..155654ca1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ - `/builds/{namespace}/{bucket}/{buildid}/blobs/exists` `POST` method for checking existance of blobs - Feature: zen: `--zen-cache-host` option for `upload` and `download` operations to use a zenserver host `/builds` endpoint for storing build blob and blob metadata - Feature: zenserver: Add command line option `--gc-buildstore-duration-seconds` to control GC life time of build store data + - Feature: zen `--boost-workers` option to builds `upload`, `download` and `validate-part` that will increase the number of worker threads, may cause computer to be less responsive + - Feature: zen `--cache-prime-only` that uploads referenced data from a part to `--zen-cache-host` if it is not already present. Target folder will be untouched. - Improvement: Do partial requests of blocks if not all of the block is needed - Improvement: Better progress/statistics on upload and download - Improvement: Scavenge .zen temp folders for existing data (downloaded, decompressed or written) from previous failed run diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index 08d30948b..3a54de935 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -91,7 +91,16 @@ namespace { const double DefaultLatency = 0; // .0010; const double DefaultDelayPerKBSec = 0; // 0.00005; - const bool SingleThreaded = false; + const bool SingleThreaded = false; + bool BoostWorkerThreads = false; + + WorkerThreadPool& GetIOWorkerPool() + { + return SingleThreaded ? GetSyncWorkerPool() + : BoostWorkerThreads ? GetLargeWorkerPool(EWorkloadType::Burst) + : GetMediumWorkerPool(EWorkloadType::Burst); + } + WorkerThreadPool& GetNetworkPool() { return SingleThreaded ? GetSyncWorkerPool() : GetSmallWorkerPool(EWorkloadType::Burst); } const uint64_t MinimumSizeForCompressInBlock = 2u * 1024u; @@ -438,7 +447,7 @@ namespace { Path, std::move(IsAcceptedFolder), std::move(IsAcceptedFile), - SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst), + GetIOWorkerPool(), UsePlainProgress ? 5000 : 200, [](bool, std::ptrdiff_t) {}, AbortFlag); @@ -452,7 +461,7 @@ namespace { FilteredBytesHashed.Start(); ChunkedFolderContent FolderContent = ChunkFolderContent( ChunkingStats, - SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst), + GetIOWorkerPool(), Path, Content, ChunkController, @@ -1563,8 +1572,8 @@ namespace { BlockAttachments.size() - VerifyBlockDescriptions.size())); } - WorkerThreadPool& NetworkPool = SingleThreaded ? GetSyncWorkerPool() : GetSmallWorkerPool(EWorkloadType::Burst); - WorkerThreadPool& VerifyPool = SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst); + WorkerThreadPool& NetworkPool = GetNetworkPool(); + WorkerThreadPool& VerifyPool = GetIOWorkerPool(); ParallellWork Work(AbortFlag); const std::filesystem::path TempFolder = ".zen-tmp"; @@ -1946,8 +1955,8 @@ namespace { RwLock Lock; - WorkerThreadPool& GenerateBlobsPool = SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst); - WorkerThreadPool& UploadBlocksPool = SingleThreaded ? GetSyncWorkerPool() : GetSmallWorkerPool(EWorkloadType::Burst); + WorkerThreadPool& GenerateBlobsPool = GetIOWorkerPool(); + WorkerThreadPool& UploadBlocksPool = GetNetworkPool(); FilteredRate FilteredGeneratedBytesPerSecond; FilteredRate FilteredUploadedBytesPerSecond; @@ -2147,8 +2156,8 @@ namespace { { ProgressBar ProgressBar(UsePlainProgress); - WorkerThreadPool& ReadChunkPool = SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst); - WorkerThreadPool& UploadChunkPool = SingleThreaded ? GetSyncWorkerPool() : GetSmallWorkerPool(EWorkloadType::Burst); + WorkerThreadPool& ReadChunkPool = GetIOWorkerPool(); + WorkerThreadPool& UploadChunkPool = GetNetworkPool(); FilteredRate FilteredGenerateBlockBytesPerSecond; FilteredRate FilteredCompressedBytesPerSecond; @@ -2363,7 +2372,7 @@ namespace { if (!AbortFlag) { Work.ScheduleWork( - SingleThreaded ? GetSyncWorkerPool() : ReadChunkPool, + ReadChunkPool, [&, BlockIndex](std::atomic<bool>&) { if (!AbortFlag) { @@ -2420,7 +2429,7 @@ namespace { { const uint32_t ChunkIndex = LooseChunkIndexes[LooseChunkOrderIndex]; Work.ScheduleWork( - SingleThreaded ? GetSyncWorkerPool() : ReadChunkPool, + ReadChunkPool, [&, ChunkIndex](std::atomic<bool>&) { if (!AbortFlag) { @@ -2697,54 +2706,52 @@ namespace { FindBlocksStatistics FindBlocksStats; - std::future<PrepareBuildResult> PrepBuildResultFuture = - GetSmallWorkerPool(EWorkloadType::Burst) - .EnqueueTask(std::packaged_task<PrepareBuildResult()>{ - [&Storage, BuildId, &MetaData, CreateBuild, AllowMultiparts, IgnoreExistingBlocks, &FindBlocksStats] { - ZEN_TRACE_CPU("PrepareBuild"); + std::future<PrepareBuildResult> PrepBuildResultFuture = GetNetworkPool().EnqueueTask(std::packaged_task<PrepareBuildResult()>{ + [&Storage, BuildId, &MetaData, CreateBuild, AllowMultiparts, IgnoreExistingBlocks, &FindBlocksStats] { + ZEN_TRACE_CPU("PrepareBuild"); - PrepareBuildResult Result; - Stopwatch Timer; - if (CreateBuild) - { - ZEN_TRACE_CPU("CreateBuild"); + PrepareBuildResult Result; + Stopwatch Timer; + if (CreateBuild) + { + ZEN_TRACE_CPU("CreateBuild"); - Stopwatch PutBuildTimer; - CbObject PutBuildResult = Storage.BuildStorage->PutBuild(BuildId, MetaData); - Result.PrepareBuildTimeMs = PutBuildTimer.GetElapsedTimeMs(); - Result.PreferredMultipartChunkSize = PutBuildResult["chunkSize"sv].AsUInt64(Result.PreferredMultipartChunkSize); - Result.PayloadSize = MetaData.GetSize(); - } - else - { - ZEN_TRACE_CPU("PutBuild"); - Stopwatch GetBuildTimer; - CbObject Build = Storage.BuildStorage->GetBuild(BuildId); - Result.PrepareBuildTimeMs = GetBuildTimer.GetElapsedTimeMs(); - Result.PayloadSize = Build.GetSize(); - if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0) - { - Result.PreferredMultipartChunkSize = ChunkSize; - } - else if (AllowMultiparts) - { - ZEN_WARN("PreferredMultipartChunkSize is unknown. Defaulting to '{}'", - NiceBytes(Result.PreferredMultipartChunkSize)); - } - } + Stopwatch PutBuildTimer; + CbObject PutBuildResult = Storage.BuildStorage->PutBuild(BuildId, MetaData); + Result.PrepareBuildTimeMs = PutBuildTimer.GetElapsedTimeMs(); + Result.PreferredMultipartChunkSize = PutBuildResult["chunkSize"sv].AsUInt64(Result.PreferredMultipartChunkSize); + Result.PayloadSize = MetaData.GetSize(); + } + else + { + ZEN_TRACE_CPU("PutBuild"); + Stopwatch GetBuildTimer; + CbObject Build = Storage.BuildStorage->GetBuild(BuildId); + Result.PrepareBuildTimeMs = GetBuildTimer.GetElapsedTimeMs(); + Result.PayloadSize = Build.GetSize(); + if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0) + { + Result.PreferredMultipartChunkSize = ChunkSize; + } + else if (AllowMultiparts) + { + ZEN_WARN("PreferredMultipartChunkSize is unknown. Defaulting to '{}'", + NiceBytes(Result.PreferredMultipartChunkSize)); + } + } - if (!IgnoreExistingBlocks) - { - ZEN_TRACE_CPU("FindBlocks"); - Stopwatch KnownBlocksTimer; - Result.KnownBlocks = ParseChunkBlockDescriptionList(Storage.BuildStorage->FindBlocks(BuildId)); - FindBlocksStats.FindBlockTimeMS = KnownBlocksTimer.GetElapsedTimeMs(); - FindBlocksStats.FoundBlockCount = Result.KnownBlocks.size(); - Result.FindBlocksTimeMs = KnownBlocksTimer.GetElapsedTimeMs(); - } - Result.ElapsedTimeMs = Timer.GetElapsedTimeMs(); - return Result; - }}); + if (!IgnoreExistingBlocks) + { + ZEN_TRACE_CPU("FindBlocks"); + Stopwatch KnownBlocksTimer; + Result.KnownBlocks = ParseChunkBlockDescriptionList(Storage.BuildStorage->FindBlocks(BuildId)); + FindBlocksStats.FindBlockTimeMS = KnownBlocksTimer.GetElapsedTimeMs(); + FindBlocksStats.FoundBlockCount = Result.KnownBlocks.size(); + Result.FindBlocksTimeMs = KnownBlocksTimer.GetElapsedTimeMs(); + } + Result.ElapsedTimeMs = Timer.GetElapsedTimeMs(); + return Result; + }}); ChunkedFolderContent LocalContent; @@ -2852,7 +2859,7 @@ namespace { } return true; }, - SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst), + GetIOWorkerPool(), UsePlainProgress ? 5000 : 200, [&](bool, std::ptrdiff_t) { ZEN_CONSOLE_VERBOSE("Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), Path); @@ -2911,7 +2918,7 @@ namespace { FilteredBytesHashed.Start(); LocalContent = ChunkFolderContent( ChunkingStats, - SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst), + GetIOWorkerPool(), Path, Content, *ChunkController, @@ -3663,7 +3670,7 @@ namespace { ProgressBar ProgressBar(UsePlainProgress); - WorkerThreadPool& VerifyPool = SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst); + WorkerThreadPool& VerifyPool = GetIOWorkerPool(); ParallellWork Work(AbortFlag); @@ -4712,6 +4719,7 @@ namespace { const std::vector<IoHash>& LooseChunkHashes, bool AllowPartialBlockRequests, bool WipeTargetFolder, + bool PrimeCacheOnly, FolderContent& OutLocalFolderState, DiskStatistics& DiskStats, CacheMappingStatistics& CacheMappingStats, @@ -4721,7 +4729,7 @@ namespace { { ZEN_TRACE_CPU("UpdateFolder"); - ZEN_UNUSED(WipeTargetFolder); + ZEN_ASSERT((!PrimeCacheOnly) || (PrimeCacheOnly && (!AllowPartialBlockRequests))); Stopwatch IndexTimer; @@ -4741,6 +4749,7 @@ namespace { tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedChunkHashesFound; tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedSequenceHashesFound; + if (!PrimeCacheOnly) { ZEN_TRACE_CPU("UpdateFolder_CheckChunkCache"); @@ -4786,6 +4795,7 @@ namespace { } tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedBlocksFound; + if (!PrimeCacheOnly) { ZEN_TRACE_CPU("UpdateFolder_CheckBlockCache"); @@ -4831,8 +4841,10 @@ namespace { } std::vector<uint32_t> LocalPathIndexesMatchingSequenceIndexes; - // Pick up all whole files we can use from current local state + + if (!PrimeCacheOnly) { + // Pick up all whole files we can use from current local state ZEN_TRACE_CPU("UpdateFolder_CheckLocalChunks"); for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < RemoteContent.ChunkedContent.SequenceRawHashes.size(); RemoteSequenceIndex++) @@ -4870,6 +4882,15 @@ namespace { } } } + else + { + for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < RemoteContent.ChunkedContent.SequenceRawHashes.size(); + RemoteSequenceIndex++) + { + const uint32_t ChunkCount = RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]; + SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = ChunkCount; + } + } // Pick up all chunks in current local state struct CacheCopyData { @@ -4887,6 +4908,7 @@ namespace { tsl::robin_map<IoHash, size_t, IoHash::Hasher> RawHashToCacheCopyDataIndex; std::vector<CacheCopyData> CacheCopyDatas; + if (!PrimeCacheOnly) { ZEN_TRACE_CPU("UpdateFolder_GetLocalChunks"); @@ -5004,8 +5026,8 @@ namespace { FilteredRate FilteredDownloadedBytesPerSecond; FilteredRate FilteredWrittenBytesPerSecond; - WorkerThreadPool& NetworkPool = SingleThreaded ? GetSyncWorkerPool() : GetSmallWorkerPool(EWorkloadType::Burst); - WorkerThreadPool& WritePool = SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst); + WorkerThreadPool& NetworkPool = GetNetworkPool(); + WorkerThreadPool& WritePool = GetIOWorkerPool(); ProgressBar WriteProgressBar(UsePlainProgress); ParallellWork Work(AbortFlag); @@ -5097,191 +5119,205 @@ namespace { const std::vector<uint32_t> BlockChunkIndexNeeded = GetNeededChunkBlockIndexes(BlockDescription); if (!BlockChunkIndexNeeded.empty()) { - bool UsingCachedBlock = false; - if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end()) + if (PrimeCacheOnly) { - ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_CacheGet"); - + TotalRequestCount++; TotalPartWriteCount++; - std::filesystem::path BlockPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString(); - if (std::filesystem::exists(BlockPath)) + FullBlockWorks.push_back(BlockIndex); + } + else + { + bool UsingCachedBlock = false; + if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end()) { - CachedChunkBlockIndexes.push_back(BlockIndex); - UsingCachedBlock = true; + ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_CacheGet"); + + TotalPartWriteCount++; + + std::filesystem::path BlockPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString(); + if (std::filesystem::exists(BlockPath)) + { + CachedChunkBlockIndexes.push_back(BlockIndex); + UsingCachedBlock = true; + } } - } - if (!UsingCachedBlock) - { - bool WantsToDoPartialBlockDownload = BlockChunkIndexNeeded.size() < BlockDescription.ChunkRawHashes.size(); - bool CanDoPartialBlockDownload = - (BlockDescription.HeaderSize > 0) && - (BlockDescription.ChunkCompressedLengths.size() == BlockDescription.ChunkRawHashes.size()); - if (AllowPartialBlockRequests && WantsToDoPartialBlockDownload && CanDoPartialBlockDownload) + if (!UsingCachedBlock) { - std::vector<BlockRangeDescriptor> BlockRanges; + bool WantsToDoPartialBlockDownload = BlockChunkIndexNeeded.size() < BlockDescription.ChunkRawHashes.size(); + bool CanDoPartialBlockDownload = + (BlockDescription.HeaderSize > 0) && + (BlockDescription.ChunkCompressedLengths.size() == BlockDescription.ChunkRawHashes.size()); + if (AllowPartialBlockRequests && WantsToDoPartialBlockDownload && CanDoPartialBlockDownload) + { + std::vector<BlockRangeDescriptor> BlockRanges; - ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_PartialAnalysis"); + ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_PartialAnalysis"); - uint32_t NeedBlockChunkIndexOffset = 0; - uint32_t ChunkBlockIndex = 0; - uint32_t CurrentOffset = - gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); + uint32_t NeedBlockChunkIndexOffset = 0; + uint32_t ChunkBlockIndex = 0; + uint32_t CurrentOffset = + gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); - const uint64_t TotalBlockSize = std::accumulate(BlockDescription.ChunkCompressedLengths.begin(), - BlockDescription.ChunkCompressedLengths.end(), - std::uint64_t(CurrentOffset)); + const uint64_t TotalBlockSize = std::accumulate(BlockDescription.ChunkCompressedLengths.begin(), + BlockDescription.ChunkCompressedLengths.end(), + std::uint64_t(CurrentOffset)); - BlockRangeDescriptor NextRange{.BlockIndex = BlockIndex}; - while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() && - ChunkBlockIndex < BlockDescription.ChunkRawHashes.size()) - { - const uint32_t ChunkCompressedLength = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex]; - if (ChunkBlockIndex < BlockChunkIndexNeeded[NeedBlockChunkIndexOffset]) + BlockRangeDescriptor NextRange{.BlockIndex = BlockIndex}; + while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() && + ChunkBlockIndex < BlockDescription.ChunkRawHashes.size()) { - if (NextRange.RangeLength > 0) + const uint32_t ChunkCompressedLength = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex]; + if (ChunkBlockIndex < BlockChunkIndexNeeded[NeedBlockChunkIndexOffset]) + { + if (NextRange.RangeLength > 0) + { + BlockRanges.push_back(NextRange); + NextRange = {.BlockIndex = BlockIndex}; + } + ChunkBlockIndex++; + CurrentOffset += ChunkCompressedLength; + } + else if (ChunkBlockIndex == BlockChunkIndexNeeded[NeedBlockChunkIndexOffset]) { - BlockRanges.push_back(NextRange); - NextRange = {.BlockIndex = BlockIndex}; + if (NextRange.RangeLength == 0) + { + NextRange.RangeStart = CurrentOffset; + NextRange.ChunkBlockIndexStart = ChunkBlockIndex; + } + NextRange.RangeLength += ChunkCompressedLength; + NextRange.ChunkBlockIndexCount++; + ChunkBlockIndex++; + CurrentOffset += ChunkCompressedLength; + NeedBlockChunkIndexOffset++; } - ChunkBlockIndex++; - CurrentOffset += ChunkCompressedLength; - } - else if (ChunkBlockIndex == BlockChunkIndexNeeded[NeedBlockChunkIndexOffset]) - { - if (NextRange.RangeLength == 0) + else { - NextRange.RangeStart = CurrentOffset; - NextRange.ChunkBlockIndexStart = ChunkBlockIndex; + ZEN_ASSERT(false); } - NextRange.RangeLength += ChunkCompressedLength; - NextRange.ChunkBlockIndexCount++; - ChunkBlockIndex++; - CurrentOffset += ChunkCompressedLength; - NeedBlockChunkIndexOffset++; } - else + if (NextRange.RangeLength > 0) { - ZEN_ASSERT(false); + BlockRanges.push_back(NextRange); } - } - if (NextRange.RangeLength > 0) - { - BlockRanges.push_back(NextRange); - } - ZEN_ASSERT(!BlockRanges.empty()); + ZEN_ASSERT(!BlockRanges.empty()); - std::vector<BlockRangeDescriptor> CollapsedBlockRanges; - auto It = BlockRanges.begin(); - CollapsedBlockRanges.push_back(*It++); - while (It != BlockRanges.end()) - { - BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back(); - uint64_t Slack = It->RangeStart - (LastRange.RangeStart + LastRange.RangeLength); - uint64_t BothRangeSize = It->RangeLength + LastRange.RangeLength; - if (Slack <= Max(BothRangeSize / 8, 64u * 1024u)) // Made up heuristic - we'll see how it pans out - { - LastRange.ChunkBlockIndexCount = - (It->ChunkBlockIndexStart + It->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart; - LastRange.RangeLength = (It->RangeStart + It->RangeLength) - LastRange.RangeStart; - } - else + std::vector<BlockRangeDescriptor> CollapsedBlockRanges; + auto It = BlockRanges.begin(); + CollapsedBlockRanges.push_back(*It++); + while (It != BlockRanges.end()) { - CollapsedBlockRanges.push_back(*It); + BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back(); + uint64_t Slack = It->RangeStart - (LastRange.RangeStart + LastRange.RangeLength); + uint64_t BothRangeSize = It->RangeLength + LastRange.RangeLength; + if (Slack <= Max(BothRangeSize / 8, 64u * 1024u)) // Made up heuristic - we'll see how it pans out + { + LastRange.ChunkBlockIndexCount = + (It->ChunkBlockIndexStart + It->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart; + LastRange.RangeLength = (It->RangeStart + It->RangeLength) - LastRange.RangeStart; + } + else + { + CollapsedBlockRanges.push_back(*It); + } + ++It; } - ++It; - } - - const std::uint64_t WantedSize = std::accumulate( - CollapsedBlockRanges.begin(), - CollapsedBlockRanges.end(), - uint64_t(0), - [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; }); - ZEN_ASSERT(WantedSize <= TotalBlockSize); - if (WantedSize > ((TotalBlockSize * 95) / 100)) - { - ZEN_CONSOLE_VERBOSE("Using more than 95% ({}) of block {} ({}), requesting full block", - NiceBytes(WantedSize), - BlockDescription.BlockHash, - NiceBytes(TotalBlockSize)); - TotalRequestCount++; - TotalPartWriteCount++; - - FullBlockWorks.push_back(BlockIndex); - } - else if ((WantedSize > ((TotalBlockSize * 9) / 10)) && CollapsedBlockRanges.size() > 1) - { - ZEN_CONSOLE_VERBOSE("Using more than 90% ({}) of block {} ({}) using {} requests, requesting full block", - NiceBytes(WantedSize), - BlockDescription.BlockHash, - NiceBytes(TotalBlockSize), - CollapsedBlockRanges.size()); - TotalRequestCount++; - TotalPartWriteCount++; - FullBlockWorks.push_back(BlockIndex); - } - else if ((WantedSize > ((TotalBlockSize * 8) / 10)) && (CollapsedBlockRanges.size() > 16)) - { - ZEN_CONSOLE_VERBOSE("Using more than 80% ({}) of block {} ({}) using {} requests, requesting full block", - NiceBytes(WantedSize), - BlockDescription.BlockHash, - NiceBytes(TotalBlockSize), - CollapsedBlockRanges.size()); - TotalRequestCount++; - TotalPartWriteCount++; + const std::uint64_t WantedSize = std::accumulate( + CollapsedBlockRanges.begin(), + CollapsedBlockRanges.end(), + uint64_t(0), + [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; }); + ZEN_ASSERT(WantedSize <= TotalBlockSize); + if (WantedSize > ((TotalBlockSize * 95) / 100)) + { + ZEN_CONSOLE_VERBOSE("Using more than 95% ({}) of block {} ({}), requesting full block", + NiceBytes(WantedSize), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize)); + TotalRequestCount++; + TotalPartWriteCount++; - FullBlockWorks.push_back(BlockIndex); - } - else if ((WantedSize > ((TotalBlockSize * 7) / 10)) && (CollapsedBlockRanges.size() > 48)) - { - ZEN_CONSOLE_VERBOSE("Using more than 70% ({}) of block {} ({}) using {} requests, requesting full block", - NiceBytes(WantedSize), - BlockDescription.BlockHash, - NiceBytes(TotalBlockSize), - CollapsedBlockRanges.size()); - TotalRequestCount++; - TotalPartWriteCount++; + FullBlockWorks.push_back(BlockIndex); + } + else if ((WantedSize > ((TotalBlockSize * 9) / 10)) && CollapsedBlockRanges.size() > 1) + { + ZEN_CONSOLE_VERBOSE( + "Using more than 90% ({}) of block {} ({}) using {} requests, requesting full block", + NiceBytes(WantedSize), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + CollapsedBlockRanges.size()); + TotalRequestCount++; + TotalPartWriteCount++; + + FullBlockWorks.push_back(BlockIndex); + } + else if ((WantedSize > ((TotalBlockSize * 8) / 10)) && (CollapsedBlockRanges.size() > 16)) + { + ZEN_CONSOLE_VERBOSE( + "Using more than 80% ({}) of block {} ({}) using {} requests, requesting full block", + NiceBytes(WantedSize), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + CollapsedBlockRanges.size()); + TotalRequestCount++; + TotalPartWriteCount++; + + FullBlockWorks.push_back(BlockIndex); + } + else if ((WantedSize > ((TotalBlockSize * 7) / 10)) && (CollapsedBlockRanges.size() > 48)) + { + ZEN_CONSOLE_VERBOSE( + "Using more than 70% ({}) of block {} ({}) using {} requests, requesting full block", + NiceBytes(WantedSize), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + CollapsedBlockRanges.size()); + TotalRequestCount++; + TotalPartWriteCount++; + + FullBlockWorks.push_back(BlockIndex); + } + else if ((WantedSize > ((TotalBlockSize * 6) / 10)) && (CollapsedBlockRanges.size() > 64)) + { + ZEN_CONSOLE_VERBOSE( + "Using more than 60% ({}) of block {} ({}) using {} requests, requesting full block", + NiceBytes(WantedSize), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + CollapsedBlockRanges.size()); + TotalRequestCount++; + TotalPartWriteCount++; + + FullBlockWorks.push_back(BlockIndex); + } + else + { + if (WantedSize > ((TotalBlockSize * 5) / 10)) + { + ZEN_CONSOLE_VERBOSE("Using {}% ({}) of block {} ({}) using {} requests, requesting partial block", + (WantedSize * 100) / TotalBlockSize, + NiceBytes(WantedSize), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + CollapsedBlockRanges.size()); + } + TotalRequestCount += CollapsedBlockRanges.size(); + TotalPartWriteCount += CollapsedBlockRanges.size(); - FullBlockWorks.push_back(BlockIndex); + BlockRangeWorks.insert(BlockRangeWorks.end(), CollapsedBlockRanges.begin(), CollapsedBlockRanges.end()); + } } - else if ((WantedSize > ((TotalBlockSize * 6) / 10)) && (CollapsedBlockRanges.size() > 64)) + else { - ZEN_CONSOLE_VERBOSE("Using more than 60% ({}) of block {} ({}) using {} requests, requesting full block", - NiceBytes(WantedSize), - BlockDescription.BlockHash, - NiceBytes(TotalBlockSize), - CollapsedBlockRanges.size()); TotalRequestCount++; TotalPartWriteCount++; FullBlockWorks.push_back(BlockIndex); } - else - { - if (WantedSize > ((TotalBlockSize * 5) / 10)) - { - ZEN_CONSOLE_VERBOSE("Using {}% ({}) of block {} ({}) using {} requests, requesting partial block", - (WantedSize * 100) / TotalBlockSize, - NiceBytes(WantedSize), - BlockDescription.BlockHash, - NiceBytes(TotalBlockSize), - CollapsedBlockRanges.size()); - } - TotalRequestCount += CollapsedBlockRanges.size(); - TotalPartWriteCount += CollapsedBlockRanges.size(); - - BlockRangeWorks.insert(BlockRangeWorks.end(), CollapsedBlockRanges.begin(), CollapsedBlockRanges.end()); - } - } - else - { - TotalRequestCount++; - TotalPartWriteCount++; - - FullBlockWorks.push_back(BlockIndex); } } } @@ -5363,6 +5399,12 @@ namespace { std::move(LooseChunkHashWork.ChunkTargetPtrs); const uint32_t RemoteChunkIndex = LooseChunkHashWork.RemoteChunkIndex; + if (PrimeCacheOnly && ExistsResult.ExistingBlobs.contains(RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex])) + { + DownloadStats.RequestsCompleteCount++; + continue; + } + Work.ScheduleWork( WritePool, [&, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) mutable { @@ -5370,6 +5412,7 @@ namespace { { ZEN_TRACE_CPU("UpdateFolder_ReadPreDownloaded"); std::filesystem::path ExistingCompressedChunkPath; + if (!PrimeCacheOnly) { const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; std::filesystem::path CompressedChunkPath = Path / ZenTempDownloadFolderName / ChunkHash.ToHexString(); @@ -5481,6 +5524,7 @@ namespace { NetworkPool, [&Path, &Storage, + PrimeCacheOnly, BuildId, &RemoteContent, &RemoteLookup, @@ -5503,54 +5547,67 @@ namespace { &DownloadStats](std::atomic<bool>&) mutable { if (!AbortFlag) { - FilteredDownloadedBytesPerSecond.Start(); const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; - if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize) + FilteredDownloadedBytesPerSecond.Start(); + IoBuffer BuildBlob; + const bool ExistsInCache = + Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash); + if (ExistsInCache) { - ZEN_TRACE_CPU("UpdateFolder_GetLargeChunk"); - DownloadLargeBlob(*Storage.BuildStorage, - Path / ZenTempDownloadFolderName, - BuildId, - ChunkHash, - PreferredMultipartChunkSize, - Work, - NetworkPool, - DownloadStats, - [&, RemoteChunkIndex, ChunkTargetPtrs](IoBuffer&& Payload) mutable { - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - if (!AbortFlag) - { - AsyncWriteDownloadedChunk( - Path, - RemoteContent, - RemoteLookup, - RemoteChunkIndex, - std::move(ChunkTargetPtrs), - Work, - WritePool, - std::move(Payload), - SequenceIndexChunksLeftToWriteCounters, - WritePartsComplete, - TotalPartWriteCount, - FilteredWrittenBytesPerSecond, - DiskStats); - } - }); + BuildBlob = Storage.BuildCacheStorage->GetBuildBlob(BuildId, ChunkHash); } - else + if (!BuildBlob) { - ZEN_TRACE_CPU("UpdateFolder_GetChunk"); - IoBuffer BuildBlob; - if (Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash)) + if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize) { - BuildBlob = Storage.BuildCacheStorage->GetBuildBlob(BuildId, ChunkHash); + ZEN_TRACE_CPU("UpdateFolder_GetLargeChunk"); + DownloadLargeBlob( + *Storage.BuildStorage, + Path / ZenTempDownloadFolderName, + BuildId, + ChunkHash, + PreferredMultipartChunkSize, + Work, + NetworkPool, + DownloadStats, + [&, RemoteChunkIndex, ChunkTargetPtrs](IoBuffer&& Payload) mutable { + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + if (Payload && Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->PutBuildBlob( + BuildId, + ChunkHash, + ZenContentType::kCompressedBinary, + CompositeBuffer(SharedBuffer(Payload))); + } + if (!PrimeCacheOnly) + { + if (!AbortFlag) + { + AsyncWriteDownloadedChunk(Path, + RemoteContent, + RemoteLookup, + RemoteChunkIndex, + std::move(ChunkTargetPtrs), + Work, + WritePool, + std::move(Payload), + SequenceIndexChunksLeftToWriteCounters, + WritePartsComplete, + TotalPartWriteCount, + FilteredWrittenBytesPerSecond, + DiskStats); + } + } + }); } - if (!BuildBlob) + else { + ZEN_TRACE_CPU("UpdateFolder_GetChunk"); BuildBlob = Storage.BuildStorage->GetBuildBlob(BuildId, ChunkHash); if (BuildBlob && Storage.BuildCacheStorage) { @@ -5560,34 +5617,37 @@ namespace { BuildBlob.GetContentType(), CompositeBuffer(SharedBuffer(BuildBlob))); } - } - if (!BuildBlob) - { - throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash)); - } - if (!AbortFlag) - { - uint64_t BlobSize = BuildBlob.GetSize(); - DownloadStats.DownloadedChunkCount++; - DownloadStats.DownloadedChunkByteCount += BlobSize; - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + if (!BuildBlob) { - FilteredDownloadedBytesPerSecond.Stop(); + throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash)); + } + if (!PrimeCacheOnly) + { + if (!AbortFlag) + { + uint64_t BlobSize = BuildBlob.GetSize(); + DownloadStats.DownloadedChunkCount++; + DownloadStats.DownloadedChunkByteCount += BlobSize; + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + AsyncWriteDownloadedChunk(Path, + RemoteContent, + RemoteLookup, + RemoteChunkIndex, + std::move(ChunkTargetPtrs), + Work, + WritePool, + std::move(BuildBlob), + SequenceIndexChunksLeftToWriteCounters, + WritePartsComplete, + TotalPartWriteCount, + FilteredWrittenBytesPerSecond, + DiskStats); + } } - AsyncWriteDownloadedChunk(Path, - RemoteContent, - RemoteLookup, - RemoteChunkIndex, - std::move(ChunkTargetPtrs), - Work, - WritePool, - std::move(BuildBlob), - SequenceIndexChunksLeftToWriteCounters, - WritePartsComplete, - TotalPartWriteCount, - FilteredWrittenBytesPerSecond, - DiskStats); } } } @@ -5602,6 +5662,7 @@ namespace { for (size_t CopyDataIndex = 0; CopyDataIndex < CacheCopyDatas.size(); CopyDataIndex++) { + ZEN_ASSERT(!PrimeCacheOnly); if (AbortFlag) { break; @@ -5775,6 +5836,7 @@ namespace { for (uint32_t BlockIndex : CachedChunkBlockIndexes) { + ZEN_ASSERT(!PrimeCacheOnly); if (AbortFlag) { break; @@ -5829,6 +5891,7 @@ namespace { for (size_t BlockRangeIndex = 0; BlockRangeIndex < BlockRangeWorks.size(); BlockRangeIndex++) { + ZEN_ASSERT(!PrimeCacheOnly); if (AbortFlag) { break; @@ -6000,6 +6063,13 @@ namespace { { break; } + + if (PrimeCacheOnly && ExistsResult.ExistingBlobs.contains(BlockDescriptions[BlockIndex].BlockHash)) + { + DownloadStats.RequestsCompleteCount++; + continue; + } + Work.ScheduleWork( NetworkPool, [&, BlockIndex](std::atomic<bool>&) { @@ -6011,8 +6081,10 @@ namespace { FilteredDownloadedBytesPerSecond.Start(); - IoBuffer BlockBuffer; - if (Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash)) + IoBuffer BlockBuffer; + const bool ExistsInCache = + Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash); + if (ExistsInCache) { BlockBuffer = Storage.BuildCacheStorage->GetBuildBlob(BuildId, BlockDescription.BlockHash); } @@ -6042,119 +6114,123 @@ namespace { FilteredDownloadedBytesPerSecond.Stop(); } - std::filesystem::path BlockChunkPath; - - // Check if the dowloaded block is file based and we can move it directly without rewriting it + if (!PrimeCacheOnly) { - IoBufferFileReference FileRef; - if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && - (FileRef.FileChunkSize == BlockSize)) + std::filesystem::path BlockChunkPath; + + // Check if the dowloaded block is file based and we can move it directly without rewriting it { - ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock"); - std::error_code Ec; - std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); - if (!Ec) + IoBufferFileReference FileRef; + if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && + (FileRef.FileChunkSize == BlockSize)) { - BlockBuffer.SetDeleteOnClose(false); - BlockBuffer = {}; - BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString(); - std::filesystem::rename(TempBlobPath, BlockChunkPath, Ec); - if (Ec) + ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock"); + std::error_code Ec; + std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); + if (!Ec) { - BlockChunkPath = std::filesystem::path{}; + BlockBuffer.SetDeleteOnClose(false); + BlockBuffer = {}; + BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString(); + std::filesystem::rename(TempBlobPath, BlockChunkPath, Ec); + if (Ec) + { + BlockChunkPath = std::filesystem::path{}; - // Re-open the temp file again - BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); - BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); - BlockBuffer.SetDeleteOnClose(true); + // Re-open the temp file again + BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); + BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); + BlockBuffer.SetDeleteOnClose(true); + } } } } - } - if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u)) - { - ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock"); - // Could not be moved and rather large, lets store it on disk - BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString(); - TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); - BlockBuffer = {}; - } + if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u)) + { + ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock"); + // Could not be moved and rather large, lets store it on disk + BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString(); + TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); + BlockBuffer = {}; + } - if (!AbortFlag) - { - Work.ScheduleWork( - WritePool, - [&Work, - &WritePool, - &RemoteContent, - &RemoteLookup, - CacheFolderPath, - &RemoteChunkIndexNeedsCopyFromSourceFlags, - &SequenceIndexChunksLeftToWriteCounters, - BlockIndex, - &BlockDescriptions, - &WriteChunkStats, - &DiskStats, - &WritePartsComplete, - TotalPartWriteCount, - &FilteredWrittenBytesPerSecond, - BlockChunkPath, - BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_WriteFullBlock"); + if (!AbortFlag) + { + Work.ScheduleWork( + WritePool, + [&Work, + &WritePool, + &RemoteContent, + &RemoteLookup, + CacheFolderPath, + &RemoteChunkIndexNeedsCopyFromSourceFlags, + &SequenceIndexChunksLeftToWriteCounters, + BlockIndex, + &BlockDescriptions, + &WriteChunkStats, + &DiskStats, + &WritePartsComplete, + TotalPartWriteCount, + &FilteredWrittenBytesPerSecond, + BlockChunkPath, + BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable { + if (!AbortFlag) + { + ZEN_TRACE_CPU("UpdateFolder_WriteFullBlock"); - const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; - if (BlockChunkPath.empty()) - { - ZEN_ASSERT(BlockBuffer); - } - else - { - ZEN_ASSERT(!BlockBuffer); - BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); - if (!BlockBuffer) + if (BlockChunkPath.empty()) { - throw std::runtime_error(fmt::format("Could not open dowloaded block {} from {}", - BlockDescription.BlockHash, - BlockChunkPath)); + ZEN_ASSERT(BlockBuffer); + } + else + { + ZEN_ASSERT(!BlockBuffer); + BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); + if (!BlockBuffer) + { + throw std::runtime_error( + fmt::format("Could not open dowloaded block {} from {}", + BlockDescription.BlockHash, + BlockChunkPath)); + } } - } - FilteredWrittenBytesPerSecond.Start(); - if (!WriteBlockToDisk(CacheFolderPath, - RemoteContent, - BlockDescription, - SequenceIndexChunksLeftToWriteCounters, - Work, - WritePool, - CompositeBuffer(std::move(BlockBuffer)), - RemoteLookup, - RemoteChunkIndexNeedsCopyFromSourceFlags, - DiskStats)) - { - std::error_code DummyEc; - std::filesystem::remove(BlockChunkPath, DummyEc); - throw std::runtime_error( - fmt::format("Block {} is malformed", BlockDescription.BlockHash)); - } + FilteredWrittenBytesPerSecond.Start(); + if (!WriteBlockToDisk(CacheFolderPath, + RemoteContent, + BlockDescription, + SequenceIndexChunksLeftToWriteCounters, + Work, + WritePool, + CompositeBuffer(std::move(BlockBuffer)), + RemoteLookup, + RemoteChunkIndexNeedsCopyFromSourceFlags, + DiskStats)) + { + std::error_code DummyEc; + std::filesystem::remove(BlockChunkPath, DummyEc); + throw std::runtime_error( + fmt::format("Block {} is malformed", BlockDescription.BlockHash)); + } - if (!BlockChunkPath.empty()) - { - std::filesystem::remove(BlockChunkPath); - } + if (!BlockChunkPath.empty()) + { + std::filesystem::remove(BlockChunkPath); + } - WritePartsComplete++; + WritePartsComplete++; - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } } - } - }, - Work.DefaultErrorFunction()); + }, + Work.DefaultErrorFunction()); + } } } } @@ -6176,19 +6252,24 @@ namespace { (DownloadStats.RequestsCompleteCount == TotalRequestCount) ? "" : fmt::format(" {}bits/s", NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8)); - std::string Details = fmt::format("{}/{} ({}{}) downloaded. {}/{} ({}B/s) written.", - DownloadStats.RequestsCompleteCount.load(), - TotalRequestCount, - NiceBytes(DownloadedBytes), - DownloadRateString, - NiceBytes(DiskStats.WriteByteCount.load()), - NiceBytes(BytesToWrite), - NiceNum(FilteredWrittenBytesPerSecond.GetCurrent())); - WriteProgressBar.UpdateState({.Task = "Writing chunks ", - .Details = Details, - .TotalCount = gsl::narrow<uint64_t>(BytesToWrite), - .RemainingCount = gsl::narrow<uint64_t>(BytesToWrite - DiskStats.WriteByteCount.load())}, - false); + std::string WriteDetails = PrimeCacheOnly ? "" + : fmt::format(" {}/{} ({}B/s) written.", + NiceBytes(DiskStats.WriteByteCount.load()), + NiceBytes(BytesToWrite), + NiceNum(FilteredWrittenBytesPerSecond.GetCurrent())); + std::string Details = fmt::format("{}/{} ({}{}) downloaded.{}", + DownloadStats.RequestsCompleteCount.load(), + TotalRequestCount, + NiceBytes(DownloadedBytes), + DownloadRateString, + WriteDetails); + WriteProgressBar.UpdateState( + {.Task = PrimeCacheOnly ? "Downloading " : "Writing chunks ", + .Details = Details, + .TotalCount = PrimeCacheOnly ? TotalRequestCount : BytesToWrite, + .RemainingCount = PrimeCacheOnly ? (TotalRequestCount - DownloadStats.RequestsCompleteCount.load()) + : (BytesToWrite - DiskStats.WriteByteCount.load())}, + false); }); } @@ -6202,21 +6283,24 @@ namespace { WriteProgressBar.Finish(); - uint32_t RawSequencesMissingWriteCount = 0; - for (uint32_t SequenceIndex = 0; SequenceIndex < SequenceIndexChunksLeftToWriteCounters.size(); SequenceIndex++) + if (!PrimeCacheOnly) { - const auto& SequenceIndexChunksLeftToWriteCounter = SequenceIndexChunksLeftToWriteCounters[SequenceIndex]; - if (SequenceIndexChunksLeftToWriteCounter.load() != 0) + uint32_t RawSequencesMissingWriteCount = 0; + for (uint32_t SequenceIndex = 0; SequenceIndex < SequenceIndexChunksLeftToWriteCounters.size(); SequenceIndex++) { - RawSequencesMissingWriteCount++; - const uint32_t PathIndex = RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; - const std::filesystem::path& IncompletePath = RemoteContent.Paths[PathIndex]; - ZEN_ASSERT(!IncompletePath.empty()); - const uint32_t ExpectedSequenceCount = RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]; - ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounter.load() <= ExpectedSequenceCount); + const auto& SequenceIndexChunksLeftToWriteCounter = SequenceIndexChunksLeftToWriteCounters[SequenceIndex]; + if (SequenceIndexChunksLeftToWriteCounter.load() != 0) + { + RawSequencesMissingWriteCount++; + const uint32_t PathIndex = RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; + const std::filesystem::path& IncompletePath = RemoteContent.Paths[PathIndex]; + ZEN_ASSERT(!IncompletePath.empty()); + const uint32_t ExpectedSequenceCount = RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]; + ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounter.load() <= ExpectedSequenceCount); + } } + ZEN_ASSERT(RawSequencesMissingWriteCount == 0); } - ZEN_ASSERT(RawSequencesMissingWriteCount == 0); const uint64_t DownloadedBytes = DownloadStats.DownloadedChunkByteCount.load() + DownloadStats.DownloadedBlockByteCount.load() + +DownloadStats.DownloadedPartialBlockByteCount.load(); @@ -6234,6 +6318,11 @@ namespace { WriteChunkStats.WriteTimeUs = FilteredWrittenBytesPerSecond.GetElapsedTimeUS(); } + if (PrimeCacheOnly) + { + return; + } + // Move all files we will reuse to cache folder // TODO: If WipeTargetFolder is false we could check which files are already correct and leave them in place if (!LocalPathIndexesMatchingSequenceIndexes.empty()) @@ -6319,7 +6408,7 @@ namespace { ZEN_TRACE_CPU("UpdateFolder_FinalizeTree"); Stopwatch Timer; - WorkerThreadPool& WritePool = SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst); + WorkerThreadPool& WritePool = GetIOWorkerPool(); ProgressBar RebuildProgressBar(UsePlainProgress); ParallellWork Work(AbortFlag); @@ -6930,7 +7019,7 @@ namespace { Path, std::move(IsAcceptedFolder), std::move(IsAcceptedFile), - SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst), + GetIOWorkerPool(), UsePlainProgress ? 5000 : 200, [&](bool, std::ptrdiff_t) { ZEN_CONSOLE_VERBOSE("Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), Path); @@ -7003,7 +7092,7 @@ namespace { FilteredBytesHashed.Start(); ChunkedFolderContent UpdatedLocalContent = ChunkFolderContent( ChunkingStats, - SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst), + GetIOWorkerPool(), Path, UpdatedContent, ChunkController, @@ -7080,7 +7169,7 @@ namespace { FilteredBytesHashed.Start(); ChunkedFolderContent UpdatedLocalContent = ChunkFolderContent( ChunkingStats, - SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst), + GetIOWorkerPool(), Path, CurrentLocalFolderContent, ChunkController, @@ -7122,10 +7211,13 @@ namespace { bool AllowMultiparts, bool AllowPartialBlockRequests, bool WipeTargetFolder, - bool PostDownloadVerify) + bool PostDownloadVerify, + bool PrimeCacheOnly) { ZEN_TRACE_CPU("DownloadFolder"); + ZEN_ASSERT((!PrimeCacheOnly) || (PrimeCacheOnly && (!AllowPartialBlockRequests))); + Stopwatch DownloadTimer; const std::filesystem::path ZenTempFolder = Path / ZenTempFolderName; @@ -7150,27 +7242,30 @@ namespace { ChunkedFolderContent RemoteContent = GetRemoteContent(Storage, BuildId, AllBuildParts, ChunkController, PartContents, BlockDescriptions, LooseChunkHashes); - const std::uint64_t LargeAttachmentSize = AllowMultiparts ? PreferredMultipartChunkSize * 4u : (std::uint64_t)-1; - if (!ChunkController) - { - ZEN_CONSOLE("Warning: Unspecified chunking algorith, using default"); - ChunkController = CreateBasicChunkingController(); - } - + const std::uint64_t LargeAttachmentSize = AllowMultiparts ? PreferredMultipartChunkSize * 4u : (std::uint64_t)-1; GetFolderContentStatistics LocalFolderScanStats; ChunkingStatistics ChunkingStats; ChunkedFolderContent LocalContent; - if (std::filesystem::is_directory(Path)) + if (!PrimeCacheOnly) { - if (!WipeTargetFolder) + if (std::filesystem::is_directory(Path)) + { + if (!WipeTargetFolder) + { + if (!ChunkController) + { + ZEN_CONSOLE("Warning: Unspecified chunking algorith, using default"); + ChunkController = CreateBasicChunkingController(); + } + + LocalContent = GetLocalContent(LocalFolderScanStats, ChunkingStats, Path, *ChunkController); + } + } + else { - LocalContent = GetLocalContent(LocalFolderScanStats, ChunkingStats, Path, *ChunkController); + CreateDirectories(Path); } } - else - { - CreateDirectories(Path); - } if (AbortFlag) { return; @@ -7251,6 +7346,7 @@ namespace { LooseChunkHashes, AllowPartialBlockRequests, WipeTargetFolder, + PrimeCacheOnly, LocalFolderState, DiskStats, CacheMappingStats, @@ -7260,20 +7356,23 @@ namespace { if (!AbortFlag) { - VerifyFolder(RemoteContent, Path, PostDownloadVerify, VerifyFolderStats); + if (!PrimeCacheOnly) + { + VerifyFolder(RemoteContent, Path, PostDownloadVerify, VerifyFolderStats); - Stopwatch WriteStateTimer; - CbObject StateObject = CreateStateObject(BuildId, AllBuildParts, PartContents, LocalFolderState); + Stopwatch WriteStateTimer; + CbObject StateObject = CreateStateObject(BuildId, AllBuildParts, PartContents, LocalFolderState); - CreateDirectories((Path / ZenStateFilePath).parent_path()); - TemporaryFile::SafeWriteFile(Path / ZenStateFilePath, StateObject.GetView()); - ZEN_CONSOLE("Wrote local state in {}", NiceTimeSpanMs(WriteStateTimer.GetElapsedTimeMs())); + CreateDirectories((Path / ZenStateFilePath).parent_path()); + TemporaryFile::SafeWriteFile(Path / ZenStateFilePath, StateObject.GetView()); + ZEN_CONSOLE("Wrote local state in {}", NiceTimeSpanMs(WriteStateTimer.GetElapsedTimeMs())); #if 0 - ExtendableStringBuilder<1024> SB; - CompactBinaryToJson(StateObject, SB); - WriteFile(Path / ZenStateFileJsonPath, IoBuffer(IoBuffer::Wrap, SB.Data(), SB.Size())); + ExtendableStringBuilder<1024> SB; + CompactBinaryToJson(StateObject, SB); + WriteFile(Path / ZenStateFileJsonPath, IoBuffer(IoBuffer::Wrap, SB.Data(), SB.Size())); #endif // 0 + } const uint64_t DownloadCount = DownloadStats.DownloadedChunkCount.load() + DownloadStats.DownloadedBlockCount.load() + DownloadStats.DownloadedPartialBlockCount.load(); const uint64_t DownloadByteCount = DownloadStats.DownloadedChunkByteCount.load() + @@ -7307,6 +7406,23 @@ namespace { NiceTimeSpanMs(VerifyFolderStats.VerifyElapsedWallTimeUs / 1000)); } } + if (PrimeCacheOnly) + { + if (Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->Flush(5000, [](intptr_t Remaining) { + if (Remaining == 0) + { + ZEN_CONSOLE("Build cache upload complete"); + } + else + { + ZEN_CONSOLE("Waiting for build cache to complete uploading. {} blobs remaining", Remaining); + } + return !AbortFlag; + }); + } + } if (CleanDirectory(ZenTempFolder, {})) { std::filesystem::remove(ZenTempFolder); @@ -7584,6 +7700,15 @@ BuildsCommand::BuildsCommand() Ops.add_option("output", "", "verbose", "Enable verbose console output", cxxopts::value(m_Verbose), "<verbose>"); }; + auto AddWorkerOptions = [this](cxxopts::Options& Ops) { + Ops.add_option("", + "", + "boost-workers", + "Increase the number of worker threads - may cause computer to less responsive", + cxxopts::value(m_BoostWorkerThreads), + "<boostworkers>"); + }; + m_Options.add_option("", "v", "verb", @@ -7618,6 +7743,7 @@ BuildsCommand::BuildsCommand() AddFileOptions(m_UploadOptions); AddOutputOptions(m_UploadOptions); AddCacheOptions(m_UploadOptions); + AddWorkerOptions(m_UploadOptions); m_UploadOptions.add_options()("h,help", "Print help"); m_UploadOptions.add_option("", "l", "local-path", "Root file system folder for build", cxxopts::value(m_Path), "<local-path>"); m_UploadOptions.add_option("", @@ -7682,6 +7808,15 @@ BuildsCommand::BuildsCommand() AddFileOptions(m_DownloadOptions); AddOutputOptions(m_DownloadOptions); AddCacheOptions(m_DownloadOptions); + + m_DownloadOptions.add_option("cache", + "", + "cache-prime-only", + "Only download blobs missing in cache and upload to cache", + cxxopts::value(m_PrimeCacheOnly), + "<cacheprimeonly>"); + + AddWorkerOptions(m_DownloadOptions); m_DownloadOptions.add_options()("h,help", "Print help"); m_DownloadOptions.add_option("", "l", "local-path", "Root file system folder for build", cxxopts::value(m_Path), "<local-path>"); m_DownloadOptions.add_option("", "", "build-id", "Build Id", cxxopts::value(m_BuildId), "<id>"); @@ -7719,6 +7854,7 @@ BuildsCommand::BuildsCommand() m_DownloadOptions.positional_help("local-path build-id build-part-name"); AddOutputOptions(m_DiffOptions); + AddWorkerOptions(m_DiffOptions); m_DiffOptions.add_options()("h,help", "Print help"); m_DiffOptions.add_option("", "l", "local-path", "Root file system folder used as base", cxxopts::value(m_Path), "<local-path>"); m_DiffOptions.add_option("", "c", "compare-path", "Root file system folder used as diff", cxxopts::value(m_DiffPath), "<diff-path>"); @@ -7735,6 +7871,7 @@ BuildsCommand::BuildsCommand() AddFileOptions(m_TestOptions); AddOutputOptions(m_TestOptions); AddCacheOptions(m_TestOptions); + AddWorkerOptions(m_TestOptions); m_TestOptions.add_options()("h,help", "Print help"); m_TestOptions.add_option("", "l", "local-path", "Root file system folder used as base", cxxopts::value(m_Path), "<local-path>"); m_TestOptions.add_option("", @@ -7765,6 +7902,7 @@ BuildsCommand::BuildsCommand() AddCloudOptions(m_ValidateBuildPartOptions); AddFileOptions(m_ValidateBuildPartOptions); AddOutputOptions(m_ValidateBuildPartOptions); + AddWorkerOptions(m_ValidateBuildPartOptions); m_ValidateBuildPartOptions.add_option("", "", "build-id", "Build Id", cxxopts::value(m_BuildId), "<id>"); m_ValidateBuildPartOptions.add_option("", "", @@ -7786,6 +7924,7 @@ BuildsCommand::BuildsCommand() AddFileOptions(m_MultiTestDownloadOptions); AddOutputOptions(m_MultiTestDownloadOptions); AddCacheOptions(m_MultiTestDownloadOptions); + AddWorkerOptions(m_MultiTestDownloadOptions); m_MultiTestDownloadOptions .add_option("", "l", "local-path", "Root file system folder used as base", cxxopts::value(m_Path), "<local-path>"); m_MultiTestDownloadOptions.add_option("", "", "build-ids", "Build Ids list separated by ','", cxxopts::value(m_BuildIds), "<ids>"); @@ -7991,9 +8130,13 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) .RetryCount = 0}); if (Result.CacheHttp->Get("/health").IsSuccess()) { - Result.BuildCacheStorage = - CreateZenBuildStorageCache(*Result.CacheHttp, StorageCacheStats, m_Namespace, m_Bucket, TempPath / "zencache"); - CacheDescription = fmt::format("zen cache {}. SessionId: '{}'", m_ZenCacheHost, Result.CacheHttp->GetSessionId()); + Result.BuildCacheStorage = CreateZenBuildStorageCache(*Result.CacheHttp, + StorageCacheStats, + m_Namespace, + m_Bucket, + TempPath / "zencache", + m_PrimeCacheOnly); + CacheDescription = fmt::format("zen cache {}. SessionId: '{}'", m_ZenCacheHost, Result.CacheHttp->GetSessionId()); if (!m_Namespace.empty()) { CacheDescription += fmt::format(" {}.", m_Namespace); @@ -8014,6 +8157,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) return Result; }; + BoostWorkerThreads = m_BoostWorkerThreads; + try { if (SubOption == &m_ListOptions) @@ -8243,6 +8388,22 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) throw zen::OptionParseException(fmt::format("build-part-id conflicts with build-part-name\n{}", m_DownloadOptions.help())); } + if (m_PostDownloadVerify && m_PrimeCacheOnly) + { + throw zen::OptionParseException( + fmt::format("'cache-prime-only' option is not compatible with 'verify' option\n{}", m_DownloadOptions.help())); + } + + if (m_Clean && m_PrimeCacheOnly) + { + ZEN_WARN("ignoring 'clean' option when 'cache-prime-only' is enabled"); + } + + if (m_AllowPartialBlockRequests && m_PrimeCacheOnly) + { + ZEN_WARN("ignoring 'allow-partial-block-requests' option when 'cache-prime-only' is enabled"); + } + std::vector<Oid> BuildPartIds; for (const std::string& BuildPartId : m_BuildPartIds) { @@ -8267,9 +8428,10 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) m_BuildPartNames, Path, m_AllowMultiparts, - m_AllowPartialBlockRequests, + m_AllowPartialBlockRequests && !m_PrimeCacheOnly, m_Clean, - m_PostDownloadVerify); + m_PostDownloadVerify, + m_PrimeCacheOnly); if (false) { @@ -8350,7 +8512,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) m_AllowMultiparts, m_AllowPartialBlockRequests, BuildIdString == m_BuildIds.front(), - true); + true, + false); if (AbortFlag) { ZEN_CONSOLE("Download cancelled"); @@ -8441,7 +8604,16 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) const std::filesystem::path DownloadPath = Path.parent_path() / (m_BuildPartName + "_download"); ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}'", BuildId, BuildPartId, m_BuildPartName, DownloadPath); - DownloadFolder(Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, true, true); + DownloadFolder(Storage, + BuildId, + {BuildPartId}, + {}, + DownloadPath, + m_AllowMultiparts, + m_AllowPartialBlockRequests, + true, + true, + false); if (AbortFlag) { ZEN_CONSOLE("Download failed."); @@ -8453,7 +8625,16 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) BuildPartId, m_BuildPartName, DownloadPath); - DownloadFolder(Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true); + DownloadFolder(Storage, + BuildId, + {BuildPartId}, + {}, + DownloadPath, + m_AllowMultiparts, + m_AllowPartialBlockRequests, + false, + true, + false); if (AbortFlag) { ZEN_CONSOLE("Re-download failed. (identical target)"); @@ -8504,7 +8685,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (SourceSize > 256) { Work.ScheduleWork( - SingleThreaded ? GetSyncWorkerPool() : GetMediumWorkerPool(EWorkloadType::Burst), + GetIOWorkerPool(), [SourceSize, FilePath](std::atomic<bool>&) { if (!AbortFlag) { @@ -8557,7 +8738,16 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) BuildPartId, m_BuildPartName, DownloadPath); - DownloadFolder(Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true); + DownloadFolder(Storage, + BuildId, + {BuildPartId}, + {}, + DownloadPath, + m_AllowMultiparts, + m_AllowPartialBlockRequests, + false, + true, + false); if (AbortFlag) { ZEN_CONSOLE("Re-download failed. (scrambled target)"); @@ -8595,7 +8785,16 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (original)", BuildId, BuildPartId, m_BuildPartName, DownloadPath); - DownloadFolder(Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true); + DownloadFolder(Storage, + BuildId, + {BuildPartId}, + {}, + DownloadPath, + m_AllowMultiparts, + m_AllowPartialBlockRequests, + false, + true, + false); if (AbortFlag) { ZEN_CONSOLE("Re-download failed."); @@ -8611,7 +8810,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) m_AllowMultiparts, m_AllowPartialBlockRequests, false, - true); + true, + false); if (AbortFlag) { ZEN_CONSOLE("Re-download failed."); @@ -8627,7 +8827,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) m_AllowMultiparts, m_AllowPartialBlockRequests, false, - true); + true, + false); if (AbortFlag) { ZEN_CONSOLE("Re-download failed."); diff --git a/src/zen/cmds/builds_cmd.h b/src/zen/cmds/builds_cmd.h index b5af236e1..46257a567 100644 --- a/src/zen/cmds/builds_cmd.h +++ b/src/zen/cmds/builds_cmd.h @@ -27,8 +27,9 @@ private: std::string m_SystemRootDir; - bool m_PlainProgress = false; - bool m_Verbose = false; + bool m_PlainProgress = false; + bool m_Verbose = false; + bool m_BoostWorkerThreads = false; // cloud builds std::string m_BuildsUrl; @@ -42,6 +43,7 @@ private: // cache std::string m_ZenCacheHost; + bool m_PrimeCacheOnly = false; std::string m_BuildId; bool m_CreateBuild = false; diff --git a/src/zenutil/buildstoragecache.cpp b/src/zenutil/buildstoragecache.cpp index c95215889..f273ac699 100644 --- a/src/zenutil/buildstoragecache.cpp +++ b/src/zenutil/buildstoragecache.cpp @@ -11,6 +11,7 @@ #include <zencore/workthreadpool.h> #include <zenhttp/httpclient.h> #include <zenhttp/packageformat.h> +#include <zenutil/workerpools.h> ZEN_THIRD_PARTY_INCLUDES_START #include <tsl/robin_set.h> @@ -27,13 +28,16 @@ public: BuildStorageCache::Statistics& Stats, std::string_view Namespace, std::string_view Bucket, - const std::filesystem::path& TempFolderPath) + const std::filesystem::path& TempFolderPath, + bool BoostBackgroundThreadCount) : m_HttpClient(HttpClient) , m_Stats(Stats) , m_Namespace(Namespace.empty() ? "none" : Namespace) , m_Bucket(Bucket.empty() ? "none" : Bucket) , m_TempFolderPath(std::filesystem::path(TempFolderPath).make_preferred()) - , m_BackgroundWorkPool(1) + , m_BoostBackgroundThreadCount(BoostBackgroundThreadCount) + , m_BackgroundWorkPool(m_BoostBackgroundThreadCount ? GetSmallWorkerPool(EWorkloadType::Background) + : GetTinyWorkerPool(EWorkloadType::Background)) , m_PendingBackgroundWorkCount(1) , m_CancelBackgroundWork(false) { @@ -44,8 +48,11 @@ public: try { m_CancelBackgroundWork.store(true); - m_PendingBackgroundWorkCount.CountDown(); - m_PendingBackgroundWorkCount.Wait(); + if (!IsFlushed) + { + m_PendingBackgroundWorkCount.CountDown(); + m_PendingBackgroundWorkCount.Wait(); + } } catch (const std::exception& Ex) { @@ -86,6 +93,7 @@ public: ZenContentType ContentType, const CompositeBuffer& Payload) override { + ZEN_ASSERT(!IsFlushed); ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary); ScheduleBackgroundWork( [this, BuildId = Oid(BuildId), RawHash = IoHash(RawHash), ContentType, Payload = CompositeBuffer(Payload)]() { @@ -132,6 +140,7 @@ public: virtual void PutBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes, std::span<const CbObject> MetaDatas) override { + ZEN_ASSERT(!IsFlushed); ScheduleBackgroundWork([this, BuildId = Oid(BuildId), BlobRawHashes = std::vector<IoHash>(BlobHashes.begin(), BlobHashes.end()), @@ -329,6 +338,39 @@ public: return {}; } + virtual void Flush(int32_t UpdateInteralMS, std::function<bool(intptr_t Remaining)>&& UpdateCallback) override + { + if (IsFlushed) + { + return; + } + if (!IsFlushed) + { + m_PendingBackgroundWorkCount.CountDown(); + IsFlushed = true; + } + if (m_PendingBackgroundWorkCount.Wait(100)) + { + return; + } + while (true) + { + intptr_t Remaining = m_PendingBackgroundWorkCount.Remaining(); + if (UpdateCallback(Remaining)) + { + if (m_PendingBackgroundWorkCount.Wait(UpdateInteralMS)) + { + UpdateCallback(0); + return; + } + } + else + { + m_CancelBackgroundWork.store(true); + } + } + } + private: void AddStatistic(const HttpClient::Response& Result) { @@ -343,8 +385,10 @@ private: const std::string m_Namespace; const std::string m_Bucket; const std::filesystem::path m_TempFolderPath; + const bool m_BoostBackgroundThreadCount; + bool IsFlushed = false; - WorkerThreadPool m_BackgroundWorkPool; + WorkerThreadPool& m_BackgroundWorkPool; Latch m_PendingBackgroundWorkCount; std::atomic<bool> m_CancelBackgroundWork; }; @@ -354,9 +398,10 @@ CreateZenBuildStorageCache(HttpClient& HttpClient, BuildStorageCache::Statistics& Stats, std::string_view Namespace, std::string_view Bucket, - const std::filesystem::path& TempFolderPath) + const std::filesystem::path& TempFolderPath, + bool BoostBackgroundThreadCount) { - return std::make_unique<ZenBuildStorageCache>(HttpClient, Stats, Namespace, Bucket, TempFolderPath); + return std::make_unique<ZenBuildStorageCache>(HttpClient, Stats, Namespace, Bucket, TempFolderPath, BoostBackgroundThreadCount); } } // namespace zen diff --git a/src/zenutil/include/zenutil/buildstoragecache.h b/src/zenutil/include/zenutil/buildstoragecache.h index 08c936bf5..cab35328d 100644 --- a/src/zenutil/include/zenutil/buildstoragecache.h +++ b/src/zenutil/include/zenutil/buildstoragecache.h @@ -42,11 +42,16 @@ public: }; virtual std::vector<BlobExistsResult> BlobsExists(const Oid& BuildId, std::span<const IoHash> BlobHashes) = 0; + + virtual void Flush( + int32_t UpdateInteralMS, + std::function<bool(intptr_t Remaining)>&& UpdateCallback = [](intptr_t) { return true; }) = 0; }; std::unique_ptr<BuildStorageCache> CreateZenBuildStorageCache(HttpClient& HttpClient, BuildStorageCache::Statistics& Stats, std::string_view Namespace, std::string_view Bucket, - const std::filesystem::path& TempFolderPath); + const std::filesystem::path& TempFolderPath, + bool BoostBackgroundThreadCount); } // namespace zen diff --git a/src/zenutil/include/zenutil/workerpools.h b/src/zenutil/include/zenutil/workerpools.h index 9683ad720..df2033bca 100644 --- a/src/zenutil/include/zenutil/workerpools.h +++ b/src/zenutil/include/zenutil/workerpools.h @@ -21,6 +21,9 @@ WorkerThreadPool& GetMediumWorkerPool(EWorkloadType WorkloadType); // Worker pool with std::thread::hardware_concurrency() / 8 worker threads, but at least one thread WorkerThreadPool& GetSmallWorkerPool(EWorkloadType WorkloadType); +// Worker pool with minimum number of worker threads, but at least one thread +WorkerThreadPool& GetTinyWorkerPool(EWorkloadType WorkloadType); + // Special worker pool that does not use worker thread but issues all scheduled work on the calling thread // This is useful for debugging when multiple async thread can make stepping in debugger complicated WorkerThreadPool& GetSyncWorkerPool(); diff --git a/src/zenutil/workerpools.cpp b/src/zenutil/workerpools.cpp index e3165e838..797034978 100644 --- a/src/zenutil/workerpools.cpp +++ b/src/zenutil/workerpools.cpp @@ -11,9 +11,10 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { namespace { - const int LargeWorkerThreadPoolTreadCount = gsl::narrow<int>(std::thread::hardware_concurrency()); + const int LargeWorkerThreadPoolTreadCount = gsl::narrow<int>(Max(std::thread::hardware_concurrency() - 1u, 2u)); const int MediumWorkerThreadPoolTreadCount = gsl::narrow<int>(Max((std::thread::hardware_concurrency() / 4u), 2u)); const int SmallWorkerThreadPoolTreadCount = gsl::narrow<int>(Max((std::thread::hardware_concurrency() / 8u), 1u)); + const int TinyWorkerThreadPoolTreadCount = 1; static bool IsShutDown = false; @@ -35,6 +36,9 @@ namespace { WorkerPool BurstSmallWorkerPool = {.TreadCount = SmallWorkerThreadPoolTreadCount, .Name = "SmallThreadPool(burst)"}; WorkerPool BackgroundSmallWorkerPool = {.TreadCount = SmallWorkerThreadPoolTreadCount, .Name = "SmallThreadPool(bkg)"}; + WorkerPool BurstTinyWorkerPool = {.TreadCount = TinyWorkerThreadPoolTreadCount, .Name = "TinyThreadPool(burst)"}; + WorkerPool BackgroundTinyWorkerPool = {.TreadCount = TinyWorkerThreadPoolTreadCount, .Name = "TinyThreadPool(bkg)"}; + WorkerPool SyncWorkerPool = {.TreadCount = 0, .Name = "SyncThreadPool"}; WorkerThreadPool& EnsurePoolPtr(WorkerPool& Pool) @@ -75,6 +79,12 @@ GetSmallWorkerPool(EWorkloadType WorkloadType) } WorkerThreadPool& +GetTinyWorkerPool(EWorkloadType WorkloadType) +{ + return EnsurePoolPtr(WorkloadType == EWorkloadType::Burst ? BurstTinyWorkerPool : BackgroundTinyWorkerPool); +} + +WorkerThreadPool& GetSyncWorkerPool() { return EnsurePoolPtr(SyncWorkerPool); @@ -91,6 +101,8 @@ ShutdownWorkerPools() BackgroundMediumWorkerPool.Pool.reset(); BurstSmallWorkerPool.Pool.reset(); BackgroundSmallWorkerPool.Pool.reset(); + BurstTinyWorkerPool.Pool.reset(); + BackgroundTinyWorkerPool.Pool.reset(); SyncWorkerPool.Pool.reset(); } } // namespace zen |