// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include ZEN_THIRD_PARTY_INCLUDES_END namespace zen { using namespace std::literals; namespace { bool IsExtensionHashCompressable(const tsl::robin_set& NonCompressableExtensionHashes, const uint32_t PathHash) { return !NonCompressableExtensionHashes.contains(PathHash); } bool IsChunkCompressable(const tsl::robin_set& NonCompressableExtensionHashes, const ChunkedContentLookup& Lookup, uint32_t ChunkIndex) { const uint32_t ChunkLocationCount = Lookup.ChunkSequenceLocationCounts[ChunkIndex]; if (ChunkLocationCount == 0) { return false; } const size_t ChunkLocationOffset = Lookup.ChunkSequenceLocationOffset[ChunkIndex]; const uint32_t SequenceIndex = Lookup.ChunkSequenceLocations[ChunkLocationOffset].SequenceIndex; const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex]; const uint32_t ExtensionHash = Lookup.PathExtensionHash[PathIndex]; const bool IsCompressable = IsExtensionHashCompressable(NonCompressableExtensionHashes, ExtensionHash); return IsCompressable; } template std::string FormatArray(std::span Items, std::string_view Prefix) { ExtendableStringBuilder<512> SB; for (const T& Item : Items) { SB.Append(fmt::format("{}{}", Prefix, Item)); } return SB.ToString(); } } // namespace class ReadFileCache { public: // A buffered file reader that provides CompositeBuffer where the buffers are owned and the memory never overwritten ReadFileCache(std::atomic& OpenReadCount, std::atomic& CurrentOpenFileCount, std::atomic& ReadCount, std::atomic& ReadByteCount, const std::filesystem::path& Path, const ChunkedFolderContent& LocalContent, const ChunkedContentLookup& LocalLookup, size_t MaxOpenFileCount) : m_Path(Path) , m_LocalContent(LocalContent) , m_LocalLookup(LocalLookup) , m_OpenReadCount(OpenReadCount) , m_CurrentOpenFileCount(CurrentOpenFileCount) , m_ReadCount(ReadCount) , m_ReadByteCount(ReadByteCount) { m_OpenFiles.reserve(MaxOpenFileCount); } ~ReadFileCache() { m_OpenFiles.clear(); } CompositeBuffer GetRange(uint32_t SequenceIndex, uint64_t Offset, uint64_t Size) { ZEN_TRACE_CPU("ReadFileCache::GetRange"); auto CacheIt = std::find_if(m_OpenFiles.begin(), m_OpenFiles.end(), [SequenceIndex](const auto& Lhs) { return Lhs.first == SequenceIndex; }); if (CacheIt != m_OpenFiles.end()) { if (CacheIt != m_OpenFiles.begin()) { auto CachedFile(std::move(CacheIt->second)); m_OpenFiles.erase(CacheIt); m_OpenFiles.insert(m_OpenFiles.begin(), std::make_pair(SequenceIndex, std::move(CachedFile))); } CompositeBuffer Result = m_OpenFiles.front().second->GetRange(Offset, Size); return Result; } const uint32_t LocalPathIndex = m_LocalLookup.SequenceIndexFirstPathIndex[SequenceIndex]; const std::filesystem::path LocalFilePath = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred(); if (Size == m_LocalContent.RawSizes[LocalPathIndex]) { IoBuffer Result = IoBufferBuilder::MakeFromFile(LocalFilePath); return CompositeBuffer(SharedBuffer(Result)); } if (m_OpenFiles.size() == m_OpenFiles.capacity()) { m_OpenFiles.pop_back(); } m_OpenFiles.insert( m_OpenFiles.begin(), std::make_pair( SequenceIndex, std::make_unique(LocalFilePath, m_OpenReadCount, m_CurrentOpenFileCount, m_ReadCount, m_ReadByteCount))); CompositeBuffer Result = m_OpenFiles.front().second->GetRange(Offset, Size); return Result; } private: const std::filesystem::path m_Path; const ChunkedFolderContent& m_LocalContent; const ChunkedContentLookup& m_LocalLookup; std::vector>> m_OpenFiles; std::atomic& m_OpenReadCount; std::atomic& m_CurrentOpenFileCount; std::atomic& m_ReadCount; std::atomic& m_ReadByteCount; }; BuildsOperationUploadFolder::BuildsOperationUploadFolder(LoggerRef Log, ProgressBase& Progress, StorageInstance& Storage, std::atomic& AbortFlag, std::atomic& PauseFlag, WorkerThreadPool& IOWorkerPool, WorkerThreadPool& NetworkPool, const Oid& BuildId, const std::filesystem::path& Path, bool CreateBuild, const CbObject& MetaData, const Options& Options) : m_Log(Log) , m_Progress(Progress) , m_Storage(Storage) , m_AbortFlag(AbortFlag) , m_PauseFlag(PauseFlag) , m_IOWorkerPool(IOWorkerPool) , m_NetworkPool(NetworkPool) , m_BuildId(BuildId) , m_Path(Path) , m_CreateBuild(CreateBuild) , m_MetaData(MetaData) , m_Options(Options) { m_NonCompressableExtensionHashes.reserve(Options.NonCompressableExtensions.size()); for (const std::string& Extension : Options.NonCompressableExtensions) { m_NonCompressableExtensionHashes.insert(HashStringAsLowerDjb2(Extension)); } } BuildsOperationUploadFolder::PrepareBuildResult BuildsOperationUploadFolder::PrepareBuild() { ZEN_TRACE_CPU("PrepareBuild"); PrepareBuildResult Result; Result.PreferredMultipartChunkSize = m_Options.PreferredMultipartChunkSize; Stopwatch Timer; if (m_CreateBuild) { ZEN_TRACE_CPU("CreateBuild"); Stopwatch PutBuildTimer; CbObject PutBuildResult = m_Storage.BuildStorage->PutBuild(m_BuildId, m_MetaData); Result.PrepareBuildTimeMs = PutBuildTimer.GetElapsedTimeMs(); if (auto ChunkSize = PutBuildResult["chunkSize"sv].AsUInt64(); ChunkSize != 0) { Result.PreferredMultipartChunkSize = ChunkSize; } Result.PayloadSize = m_MetaData.GetSize(); } else { ZEN_TRACE_CPU("PutBuild"); Stopwatch GetBuildTimer; CbObject Build = m_Storage.BuildStorage->GetBuild(m_BuildId); Result.PrepareBuildTimeMs = GetBuildTimer.GetElapsedTimeMs(); Result.PayloadSize = Build.GetSize(); if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0) { Result.PreferredMultipartChunkSize = ChunkSize; } else if (m_Options.AllowMultiparts) { ZEN_WARN("PreferredMultipartChunkSize is unknown. Defaulting to '{}'", NiceBytes(Result.PreferredMultipartChunkSize)); } } if (!m_Options.IgnoreExistingBlocks) { ZEN_TRACE_CPU("FindBlocks"); Stopwatch KnownBlocksTimer; CbObject BlockDescriptionList = m_Storage.BuildStorage->FindBlocks(m_BuildId, m_Options.FindBlockMaxCount); if (BlockDescriptionList) { Result.KnownBlocks = ParseChunkBlockDescriptionList(BlockDescriptionList); } Result.FindBlocksTimeMs = KnownBlocksTimer.GetElapsedTimeMs(); } Result.ElapsedTimeMs = Timer.GetElapsedTimeMs(); return Result; } std::vector BuildsOperationUploadFolder::ReadFolder() { std::vector UploadParts; std::filesystem::path ExcludeManifestPath = m_Path / m_Options.ZenExcludeManifestName; tsl::robin_set ExcludeAssetPaths; if (IsFile(ExcludeManifestPath)) { std::filesystem::path AbsoluteExcludeManifestPath = MakeSafeAbsolutePath(ExcludeManifestPath.is_absolute() ? ExcludeManifestPath : m_Path / ExcludeManifestPath); BuildManifest Manifest = ParseBuildManifest(AbsoluteExcludeManifestPath); const std::vector& AssetPaths = Manifest.Parts.front().Files; ExcludeAssetPaths.reserve(AssetPaths.size()); for (const std::filesystem::path& AssetPath : AssetPaths) { ExcludeAssetPaths.insert(AssetPath.generic_string()); } } UploadParts.resize(1); UploadPart& Part = UploadParts.front(); GetFolderContentStatistics& LocalFolderScanStats = Part.LocalFolderScanStats; Part.Content = GetFolderContent( Part.LocalFolderScanStats, m_Path, [this](const std::string_view& RelativePath) { return IsAcceptedFolder(RelativePath); }, [this, &ExcludeAssetPaths](const std::string_view& RelativePath, uint64_t Size, uint32_t Attributes) -> bool { ZEN_UNUSED(Size, Attributes); if (!IsAcceptedFile(RelativePath)) { return false; } if (ExcludeAssetPaths.contains(std::filesystem::path(RelativePath).generic_string())) { return false; } return true; }, m_IOWorkerPool, m_Progress.GetProgressUpdateDelayMS(), [&](bool, std::ptrdiff_t) { ZEN_INFO("Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), m_Path); }, m_AbortFlag); Part.TotalRawSize = std::accumulate(Part.Content.RawSizes.begin(), Part.Content.RawSizes.end(), std::uint64_t(0)); return UploadParts; } std::vector BuildsOperationUploadFolder::ReadManifestParts(const std::filesystem::path& ManifestPath) { std::vector UploadParts; Stopwatch ManifestParseTimer; std::filesystem::path AbsoluteManifestPath = MakeSafeAbsolutePath(ManifestPath.is_absolute() ? ManifestPath : m_Path / ManifestPath); BuildManifest Manifest = ParseBuildManifest(AbsoluteManifestPath); if (Manifest.Parts.empty()) { throw std::runtime_error(fmt::format("Manifest file at '{}' is invalid", ManifestPath)); } UploadParts.resize(Manifest.Parts.size()); for (size_t PartIndex = 0; PartIndex < Manifest.Parts.size(); PartIndex++) { BuildManifest::Part& PartManifest = Manifest.Parts[PartIndex]; if (ManifestPath.is_relative()) { PartManifest.Files.push_back(ManifestPath); } UploadPart& Part = UploadParts[PartIndex]; FolderContent& Content = Part.Content; GetFolderContentStatistics& LocalFolderScanStats = Part.LocalFolderScanStats; const std::vector& AssetPaths = PartManifest.Files; Content = GetValidFolderContent( m_IOWorkerPool, LocalFolderScanStats, m_Path, AssetPaths, [](uint64_t PathCount, uint64_t CompletedPathCount) { ZEN_UNUSED(PathCount, CompletedPathCount); }, 1000, m_AbortFlag, m_PauseFlag); if (Content.Paths.size() != AssetPaths.size()) { const tsl::robin_set FoundPaths(Content.Paths.begin(), Content.Paths.end()); ExtendableStringBuilder<1024> SB; for (const std::filesystem::path& AssetPath : AssetPaths) { if (!FoundPaths.contains(AssetPath)) { SB << "\n " << AssetPath.generic_string(); } } throw std::runtime_error( fmt::format("Manifest file at '{}' references files that does not exist{}", ManifestPath, SB.ToView())); } Part.PartId = PartManifest.PartId; Part.PartName = PartManifest.PartName; Part.TotalRawSize = std::accumulate(Part.Content.RawSizes.begin(), Part.Content.RawSizes.end(), std::uint64_t(0)); } return UploadParts; } std::vector> BuildsOperationUploadFolder::Execute(const Oid& BuildPartId, const std::string_view BuildPartName, const std::filesystem::path& ManifestPath, ChunkingController& ChunkController, ChunkingCache& ChunkCache) { ZEN_TRACE_CPU("BuildsOperationUploadFolder::Execute"); try { Stopwatch ReadPartsTimer; std::vector UploadParts = ManifestPath.empty() ? ReadFolder() : ReadManifestParts(ManifestPath); for (UploadPart& Part : UploadParts) { if (Part.PartId == Oid::Zero) { if (UploadParts.size() != 1) { throw std::runtime_error(fmt::format("Multi part upload manifest '{}' must contains build part id", ManifestPath)); } if (BuildPartId == Oid::Zero) { Part.PartId = Oid::NewOid(); } else { Part.PartId = BuildPartId; } } if (Part.PartName.empty()) { if (UploadParts.size() != 1) { throw std::runtime_error(fmt::format("Multi part upload manifest '{}' must contains build part name", ManifestPath)); } if (BuildPartName.empty()) { throw std::runtime_error("Build part name must be set"); } Part.PartName = std::string(BuildPartName); } } if (!m_Options.IsQuiet) { ZEN_INFO("Reading {} parts took {}", UploadParts.size(), NiceTimeSpanMs(ReadPartsTimer.GetElapsedTimeMs())); } const uint32_t PartsUploadStepCount = gsl::narrow(uint32_t(PartTaskSteps::StepCount) * UploadParts.size()); const uint32_t PrepareBuildStep = 0; const uint32_t UploadPartsStep = 1; const uint32_t FinalizeBuildStep = UploadPartsStep + PartsUploadStepCount; const uint32_t CleanupStep = FinalizeBuildStep + 1; const uint32_t StepCount = CleanupStep + 1; auto EndProgress = MakeGuard([&]() { m_Progress.SetLogOperationProgress(StepCount, StepCount); }); Stopwatch ProcessTimer; CleanAndRemoveDirectory(m_IOWorkerPool, m_AbortFlag, m_PauseFlag, m_Options.TempDir); CreateDirectories(m_Options.TempDir); auto _ = MakeGuard([&]() { CleanAndRemoveDirectory(m_IOWorkerPool, m_AbortFlag, m_PauseFlag, m_Options.TempDir); }); m_Progress.SetLogOperationProgress(PrepareBuildStep, StepCount); m_PrepBuildResultFuture = m_NetworkPool.EnqueueTask(std::packaged_task{[this] { return PrepareBuild(); }}, WorkerThreadPool::EMode::EnableBacklog); for (uint32_t PartIndex = 0; PartIndex < UploadParts.size(); PartIndex++) { const uint32_t PartStepOffset = UploadPartsStep + (PartIndex * uint32_t(PartTaskSteps::StepCount)); const UploadPart& Part = UploadParts[PartIndex]; UploadBuildPart(ChunkController, ChunkCache, PartIndex, Part, PartStepOffset, StepCount); if (m_AbortFlag) { return {}; } } m_Progress.SetLogOperationProgress(FinalizeBuildStep, StepCount); if (m_CreateBuild && !m_AbortFlag) { Stopwatch FinalizeBuildTimer; m_Storage.BuildStorage->FinalizeBuild(m_BuildId); if (!m_Options.IsQuiet) { ZEN_INFO("FinalizeBuild took {}", NiceTimeSpanMs(FinalizeBuildTimer.GetElapsedTimeMs())); } } m_Progress.SetLogOperationProgress(CleanupStep, StepCount); std::vector> Result; Result.reserve(UploadParts.size()); for (UploadPart& Part : UploadParts) { Result.push_back(std::make_pair(Part.PartId, Part.PartName)); } return Result; } catch (const std::exception&) { m_AbortFlag = true; throw; } } bool BuildsOperationUploadFolder::IsAcceptedFolder(const std::string_view& RelativePath) const { for (const std::string& ExcludeFolder : m_Options.ExcludeFolders) { if (StrCaseStartsWith(RelativePath, ExcludeFolder)) { if (RelativePath.length() == ExcludeFolder.length()) { return false; } else if (RelativePath[ExcludeFolder.length()] == '/') { return false; } } } return true; } bool BuildsOperationUploadFolder::IsAcceptedFile(const std::string_view& RelativePath) const { if (RelativePath == m_Options.ZenExcludeManifestName) { return false; } for (const std::string& ExcludeExtension : m_Options.ExcludeExtensions) { if (StrCaseEndsWith(RelativePath, ExcludeExtension)) { return false; } } return true; } void BuildsOperationUploadFolder::ArrangeChunksIntoBlocks(const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, std::vector& ChunkIndexes, std::vector>& OutBlocks) { ZEN_TRACE_CPU("ArrangeChunksIntoBlocks"); std::sort(ChunkIndexes.begin(), ChunkIndexes.end(), [&Content, &Lookup](uint32_t Lhs, uint32_t Rhs) { const ChunkedContentLookup::ChunkSequenceLocation& LhsLocation = GetChunkSequenceLocations(Lookup, Lhs)[0]; const ChunkedContentLookup::ChunkSequenceLocation& RhsLocation = GetChunkSequenceLocations(Lookup, Rhs)[0]; if (LhsLocation.SequenceIndex < RhsLocation.SequenceIndex) { return true; } else if (LhsLocation.SequenceIndex > RhsLocation.SequenceIndex) { return false; } return LhsLocation.Offset < RhsLocation.Offset; }); uint64_t MaxBlockSizeLowThreshold = m_Options.BlockParameters.MaxBlockSize - (m_Options.BlockParameters.MaxBlockSize / 16); uint64_t BlockSize = 0; uint32_t ChunkIndexStart = 0; for (uint32_t ChunkIndexOffset = 0; ChunkIndexOffset < ChunkIndexes.size();) { const uint32_t ChunkIndex = ChunkIndexes[ChunkIndexOffset]; const uint64_t ChunkSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; if (((BlockSize + ChunkSize) > m_Options.BlockParameters.MaxBlockSize) || (ChunkIndexOffset - ChunkIndexStart) > m_Options.BlockParameters.MaxChunksPerBlock) { // Within the span of MaxBlockSizeLowThreshold and MaxBlockSize, see if there is a break // between source paths for chunks. Break the block at the last such break if any. ZEN_ASSERT(ChunkIndexOffset > ChunkIndexStart); const uint32_t ChunkSequenceIndex = Lookup.ChunkSequenceLocations[Lookup.ChunkSequenceLocationOffset[ChunkIndex]].SequenceIndex; uint64_t ScanBlockSize = BlockSize; uint32_t ScanChunkIndexOffset = ChunkIndexOffset - 1; while (ScanChunkIndexOffset > (ChunkIndexStart + 2)) { const uint32_t TestChunkIndex = ChunkIndexes[ScanChunkIndexOffset]; const uint64_t TestChunkSize = Content.ChunkedContent.ChunkRawSizes[TestChunkIndex]; if ((ScanBlockSize - TestChunkSize) < MaxBlockSizeLowThreshold) { break; } const uint32_t TestSequenceIndex = Lookup.ChunkSequenceLocations[Lookup.ChunkSequenceLocationOffset[TestChunkIndex]].SequenceIndex; if (ChunkSequenceIndex != TestSequenceIndex) { ChunkIndexOffset = ScanChunkIndexOffset + 1; break; } ScanBlockSize -= TestChunkSize; ScanChunkIndexOffset--; } std::vector ChunksInBlock; ChunksInBlock.reserve(ChunkIndexOffset - ChunkIndexStart); for (uint32_t AddIndexOffset = ChunkIndexStart; AddIndexOffset < ChunkIndexOffset; AddIndexOffset++) { const uint32_t AddChunkIndex = ChunkIndexes[AddIndexOffset]; ChunksInBlock.push_back(AddChunkIndex); } OutBlocks.emplace_back(std::move(ChunksInBlock)); BlockSize = 0; ChunkIndexStart = ChunkIndexOffset; } else { ChunkIndexOffset++; BlockSize += ChunkSize; } } if (ChunkIndexStart < ChunkIndexes.size()) { std::vector ChunksInBlock; ChunksInBlock.reserve(ChunkIndexes.size() - ChunkIndexStart); for (uint32_t AddIndexOffset = ChunkIndexStart; AddIndexOffset < ChunkIndexes.size(); AddIndexOffset++) { const uint32_t AddChunkIndex = ChunkIndexes[AddIndexOffset]; ChunksInBlock.push_back(AddChunkIndex); } OutBlocks.emplace_back(std::move(ChunksInBlock)); } } void BuildsOperationUploadFolder::GenerateBuildBlocks(const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, const std::vector>& NewBlockChunks, GeneratedBlocks& OutBlocks, GenerateBlocksStatistics& GenerateBlocksStats, UploadStatistics& UploadStats) { ZEN_TRACE_CPU("GenerateBuildBlocks"); const std::size_t NewBlockCount = NewBlockChunks.size(); if (NewBlockCount == 0) { return; } std::unique_ptr ProgressBar = m_Progress.CreateProgressBar("Generate Blocks"); OutBlocks.BlockDescriptions.resize(NewBlockCount); OutBlocks.BlockSizes.resize(NewBlockCount); OutBlocks.BlockMetaDatas.resize(NewBlockCount); OutBlocks.BlockHeaders.resize(NewBlockCount); OutBlocks.MetaDataHasBeenUploaded.resize(NewBlockCount, 0); OutBlocks.BlockHashToBlockIndex.reserve(NewBlockCount); RwLock Lock; FilteredRate FilteredGeneratedBytesPerSecond; FilteredRate FilteredUploadedBytesPerSecond; ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); std::atomic QueuedPendingBlocksForUpload = 0; GenerateBuildBlocksContext Context{.Work = Work, .GenerateBlobsPool = m_IOWorkerPool, .UploadBlocksPool = m_NetworkPool, .FilteredGeneratedBytesPerSecond = FilteredGeneratedBytesPerSecond, .FilteredUploadedBytesPerSecond = FilteredUploadedBytesPerSecond, .QueuedPendingBlocksForUpload = QueuedPendingBlocksForUpload, .Lock = Lock, .OutBlocks = OutBlocks, .GenerateBlocksStats = GenerateBlocksStats, .UploadStats = UploadStats, .NewBlockCount = NewBlockCount}; ScheduleBlockGeneration(Context, Content, Lookup, NewBlockChunks); Work.Wait(m_Progress.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { ZEN_UNUSED(PendingWork); FilteredGeneratedBytesPerSecond.Update(GenerateBlocksStats.GeneratedBlockByteCount.load()); FilteredUploadedBytesPerSecond.Update(UploadStats.BlocksBytes.load()); std::string Details = fmt::format("Generated {}/{} ({}, {}B/s). Uploaded {}/{} ({}, {}bits/s)", GenerateBlocksStats.GeneratedBlockCount.load(), NewBlockCount, NiceBytes(GenerateBlocksStats.GeneratedBlockByteCount.load()), NiceNum(FilteredGeneratedBytesPerSecond.GetCurrent()), UploadStats.BlockCount.load(), NewBlockCount, NiceBytes(UploadStats.BlocksBytes.load()), NiceNum(FilteredUploadedBytesPerSecond.GetCurrent() * 8)); ProgressBar->UpdateState({.Task = "Generating blocks", .Details = Details, .TotalCount = gsl::narrow(NewBlockCount), .RemainingCount = gsl::narrow(NewBlockCount - GenerateBlocksStats.GeneratedBlockCount.load()), .Status = ProgressBase::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); ZEN_ASSERT(m_AbortFlag || QueuedPendingBlocksForUpload.load() == 0); ProgressBar->Finish(); GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGeneratedBytesPerSecond.GetElapsedTimeUS(); UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS(); } void BuildsOperationUploadFolder::ScheduleBlockGeneration(GenerateBuildBlocksContext& Context, const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, const std::vector>& NewBlockChunks) { for (size_t BlockIndex = 0; BlockIndex < Context.NewBlockCount; BlockIndex++) { if (Context.Work.IsAborted()) { break; } const std::vector& ChunksInBlock = NewBlockChunks[BlockIndex]; Context.Work.ScheduleWork( Context.GenerateBlobsPool, [this, &Context, &Content, &Lookup, ChunksInBlock, BlockIndex](std::atomic&) { if (!m_AbortFlag) { ZEN_TRACE_CPU("GenerateBuildBlocks_Generate"); Context.FilteredGeneratedBytesPerSecond.Start(); Stopwatch GenerateTimer; CompressedBuffer CompressedBlock = GenerateBlock(Content, Lookup, ChunksInBlock, Context.OutBlocks.BlockDescriptions[BlockIndex]); if (m_Options.IsVerbose) { ZEN_INFO("Generated block {} ({}) containing {} chunks in {}", Context.OutBlocks.BlockDescriptions[BlockIndex].BlockHash, NiceBytes(CompressedBlock.GetCompressedSize()), Context.OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size(), NiceTimeSpanMs(GenerateTimer.GetElapsedTimeMs())); } Context.OutBlocks.BlockSizes[BlockIndex] = CompressedBlock.GetCompressedSize(); { CbObjectWriter Writer; Writer.AddString("createdBy", "zen"); Context.OutBlocks.BlockMetaDatas[BlockIndex] = Writer.Save(); } Context.GenerateBlocksStats.GeneratedBlockByteCount += Context.OutBlocks.BlockSizes[BlockIndex]; Context.GenerateBlocksStats.GeneratedBlockCount++; Context.Lock.WithExclusiveLock([&]() { Context.OutBlocks.BlockHashToBlockIndex.insert_or_assign(Context.OutBlocks.BlockDescriptions[BlockIndex].BlockHash, BlockIndex); }); { std::span Segments = CompressedBlock.GetCompressed().GetSegments(); ZEN_ASSERT(Segments.size() >= 2); Context.OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]); } if (Context.GenerateBlocksStats.GeneratedBlockCount == Context.NewBlockCount) { Context.FilteredGeneratedBytesPerSecond.Stop(); } if (Context.QueuedPendingBlocksForUpload.load() > 16) { std::span Segments = CompressedBlock.GetCompressed().GetSegments(); ZEN_ASSERT(Segments.size() >= 2); Context.OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]); } else { if (!m_AbortFlag) { Context.QueuedPendingBlocksForUpload++; Context.Work.ScheduleWork( Context.UploadBlocksPool, [this, &Context, BlockIndex, Payload = std::move(CompressedBlock)](std::atomic&) mutable { UploadGeneratedBlock(Context, BlockIndex, std::move(Payload)); }); } } } }); } } void BuildsOperationUploadFolder::UploadGeneratedBlock(GenerateBuildBlocksContext& Context, size_t BlockIndex, CompressedBuffer Payload) { auto _ = MakeGuard([&Context] { Context.QueuedPendingBlocksForUpload--; }); if (m_AbortFlag) { return; } if (Context.GenerateBlocksStats.GeneratedBlockCount == Context.NewBlockCount) { ZEN_TRACE_CPU("GenerateBuildBlocks_Save"); Context.FilteredUploadedBytesPerSecond.Stop(); std::span Segments = Payload.GetCompressed().GetSegments(); ZEN_ASSERT(Segments.size() >= 2); Context.OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]); return; } ZEN_TRACE_CPU("GenerateBuildBlocks_Upload"); Context.FilteredUploadedBytesPerSecond.Start(); const CbObject BlockMetaData = BuildChunkBlockDescription(Context.OutBlocks.BlockDescriptions[BlockIndex], Context.OutBlocks.BlockMetaDatas[BlockIndex]); const IoHash& BlockHash = Context.OutBlocks.BlockDescriptions[BlockIndex].BlockHash; const uint64_t CompressedBlockSize = Payload.GetCompressedSize(); if (m_Storage.CacheStorage && m_Options.PopulateCache) { m_Storage.CacheStorage->PutBuildBlob(m_BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload.GetCompressed()); } try { m_Storage.BuildStorage->PutBuildBlob(m_BuildId, BlockHash, ZenContentType::kCompressedBinary, std::move(Payload).GetCompressed()); } catch (const std::exception&) { // Silence http errors due to abort if (!m_AbortFlag) { throw; } } if (m_AbortFlag) { return; } Context.UploadStats.BlocksBytes += CompressedBlockSize; if (m_Options.IsVerbose) { ZEN_INFO("Uploaded block {} ({}) containing {} chunks", BlockHash, NiceBytes(CompressedBlockSize), Context.OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); } if (m_Storage.CacheStorage && m_Options.PopulateCache) { m_Storage.CacheStorage->PutBlobMetadatas(m_BuildId, std::vector({BlockHash}), std::vector({BlockMetaData})); } bool MetadataSucceeded = false; try { MetadataSucceeded = m_Storage.BuildStorage->PutBlockMetadata(m_BuildId, BlockHash, BlockMetaData); } catch (const std::exception&) { // Silence http errors due to abort if (!m_AbortFlag) { throw; } } if (m_AbortFlag) { return; } if (MetadataSucceeded) { if (m_Options.IsVerbose) { ZEN_INFO("Uploaded block {} metadata ({})", BlockHash, NiceBytes(BlockMetaData.GetSize())); } Context.OutBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; Context.UploadStats.BlocksBytes += BlockMetaData.GetSize(); } Context.UploadStats.BlockCount++; if (Context.UploadStats.BlockCount == Context.NewBlockCount) { Context.FilteredUploadedBytesPerSecond.Stop(); } } std::vector BuildsOperationUploadFolder::CalculateAbsoluteChunkOrders( const std::span LocalChunkHashes, const std::span LocalChunkOrder, const tsl::robin_map& ChunkHashToLocalChunkIndex, const std::span& LooseChunkIndexes, const std::span& BlockDescriptions) { ZEN_TRACE_CPU("CalculateAbsoluteChunkOrders"); std::vector TmpAbsoluteChunkHashes; if (m_Options.DoExtraContentValidation) { TmpAbsoluteChunkHashes.reserve(LocalChunkHashes.size()); } std::vector LocalChunkIndexToAbsoluteChunkIndex; LocalChunkIndexToAbsoluteChunkIndex.resize(LocalChunkHashes.size(), (uint32_t)-1); std::uint32_t AbsoluteChunkCount = 0; for (uint32_t ChunkIndex : LooseChunkIndexes) { LocalChunkIndexToAbsoluteChunkIndex[ChunkIndex] = AbsoluteChunkCount; if (m_Options.DoExtraContentValidation) { TmpAbsoluteChunkHashes.push_back(LocalChunkHashes[ChunkIndex]); } AbsoluteChunkCount++; } for (const ChunkBlockDescription& Block : BlockDescriptions) { for (const IoHash& ChunkHash : Block.ChunkRawHashes) { if (auto It = ChunkHashToLocalChunkIndex.find(ChunkHash); It != ChunkHashToLocalChunkIndex.end()) { const uint32_t LocalChunkIndex = It->second; ZEN_ASSERT_SLOW(LocalChunkHashes[LocalChunkIndex] == ChunkHash); LocalChunkIndexToAbsoluteChunkIndex[LocalChunkIndex] = AbsoluteChunkCount; } if (m_Options.DoExtraContentValidation) { TmpAbsoluteChunkHashes.push_back(ChunkHash); } AbsoluteChunkCount++; } } std::vector AbsoluteChunkOrder; AbsoluteChunkOrder.reserve(LocalChunkHashes.size()); for (const uint32_t LocalChunkIndex : LocalChunkOrder) { const uint32_t AbsoluteChunkIndex = LocalChunkIndexToAbsoluteChunkIndex[LocalChunkIndex]; if (m_Options.DoExtraContentValidation) { ZEN_ASSERT(LocalChunkHashes[LocalChunkIndex] == TmpAbsoluteChunkHashes[AbsoluteChunkIndex]); } AbsoluteChunkOrder.push_back(AbsoluteChunkIndex); } if (m_Options.DoExtraContentValidation) { uint32_t OrderIndex = 0; while (OrderIndex < LocalChunkOrder.size()) { const uint32_t LocalChunkIndex = LocalChunkOrder[OrderIndex]; const IoHash& LocalChunkHash = LocalChunkHashes[LocalChunkIndex]; const uint32_t AbsoluteChunkIndex = AbsoluteChunkOrder[OrderIndex]; const IoHash& AbsoluteChunkHash = TmpAbsoluteChunkHashes[AbsoluteChunkIndex]; ZEN_ASSERT(LocalChunkHash == AbsoluteChunkHash); OrderIndex++; } } return AbsoluteChunkOrder; } CompositeBuffer BuildsOperationUploadFolder::FetchChunk(const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, const IoHash& ChunkHash, ReadFileCache& OpenFileCache) { ZEN_TRACE_CPU("FetchChunk"); auto It = Lookup.ChunkHashToChunkIndex.find(ChunkHash); ZEN_ASSERT(It != Lookup.ChunkHashToChunkIndex.end()); uint32_t ChunkIndex = It->second; std::span ChunkLocations = GetChunkSequenceLocations(Lookup, ChunkIndex); ZEN_ASSERT(!ChunkLocations.empty()); CompositeBuffer Chunk = OpenFileCache.GetRange(ChunkLocations[0].SequenceIndex, ChunkLocations[0].Offset, Content.ChunkedContent.ChunkRawSizes[ChunkIndex]); if (!Chunk) { throw std::runtime_error(fmt::format("Unable to read chunk at {}, size {} from '{}'", ChunkLocations[0].Offset, Content.ChunkedContent.ChunkRawSizes[ChunkIndex], Content.Paths[Lookup.SequenceIndexFirstPathIndex[ChunkLocations[0].SequenceIndex]])); } ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == ChunkHash); return Chunk; }; CompressedBuffer BuildsOperationUploadFolder::GenerateBlock(const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, const std::vector& ChunksInBlock, ChunkBlockDescription& OutBlockDescription) { ZEN_TRACE_CPU("GenerateBlock"); ReadFileCache OpenFileCache(m_DiskStats.OpenReadCount, m_DiskStats.CurrentOpenFileCount, m_DiskStats.ReadCount, m_DiskStats.ReadByteCount, m_Path, Content, Lookup, 4); std::vector> BlockContent; BlockContent.reserve(ChunksInBlock.size()); for (uint32_t ChunkIndex : ChunksInBlock) { BlockContent.emplace_back(std::make_pair( Content.ChunkedContent.ChunkHashes[ChunkIndex], [this, &Content, &Lookup, &OpenFileCache, ChunkIndex](const IoHash& ChunkHash) -> std::pair { CompositeBuffer Chunk = FetchChunk(Content, Lookup, ChunkHash, OpenFileCache); ZEN_ASSERT(Chunk); uint64_t RawSize = Chunk.GetSize(); const bool ShouldCompressChunk = RawSize >= m_Options.MinimumSizeForCompressInBlock && IsChunkCompressable(m_NonCompressableExtensionHashes, Lookup, ChunkIndex); const OodleCompressionLevel CompressionLevel = ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None; return {RawSize, CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid, CompressionLevel).GetCompressed()}; })); } return GenerateChunkBlock(std::move(BlockContent), OutBlockDescription); }; CompressedBuffer BuildsOperationUploadFolder::RebuildBlock(const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, CompositeBuffer&& HeaderBuffer, const std::vector& ChunksInBlock) { ZEN_TRACE_CPU("RebuildBlock"); ReadFileCache OpenFileCache(m_DiskStats.OpenReadCount, m_DiskStats.CurrentOpenFileCount, m_DiskStats.ReadCount, m_DiskStats.ReadByteCount, m_Path, Content, Lookup, 4); std::vector ResultBuffers; ResultBuffers.reserve(HeaderBuffer.GetSegments().size() + ChunksInBlock.size()); ResultBuffers.insert(ResultBuffers.end(), HeaderBuffer.GetSegments().begin(), HeaderBuffer.GetSegments().end()); for (uint32_t ChunkIndex : ChunksInBlock) { std::span ChunkLocations = GetChunkSequenceLocations(Lookup, ChunkIndex); ZEN_ASSERT(!ChunkLocations.empty()); CompositeBuffer Chunk = OpenFileCache.GetRange(ChunkLocations[0].SequenceIndex, ChunkLocations[0].Offset, Content.ChunkedContent.ChunkRawSizes[ChunkIndex]); ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == Content.ChunkedContent.ChunkHashes[ChunkIndex]); const uint64_t RawSize = Chunk.GetSize(); const bool ShouldCompressChunk = RawSize >= m_Options.MinimumSizeForCompressInBlock && IsChunkCompressable(m_NonCompressableExtensionHashes, Lookup, ChunkIndex); const OodleCompressionLevel CompressionLevel = ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None; CompositeBuffer CompressedChunk = CompressedBuffer::Compress(std::move(Chunk), OodleCompressor::Mermaid, CompressionLevel).GetCompressed(); ResultBuffers.insert(ResultBuffers.end(), CompressedChunk.GetSegments().begin(), CompressedChunk.GetSegments().end()); } return CompressedBuffer::FromCompressedNoValidate(CompositeBuffer(std::move(ResultBuffers))); }; void BuildsOperationUploadFolder::UploadBuildPart(ChunkingController& ChunkController, ChunkingCache& ChunkCache, uint32_t PartIndex, const UploadPart& Part, uint32_t PartStepOffset, uint32_t StepCount) { Stopwatch UploadTimer; ChunkingStatistics ChunkingStats; FindBlocksStatistics FindBlocksStats; ReuseBlocksStatistics ReuseBlocksStats; UploadStatistics UploadStats; GenerateBlocksStatistics GenerateBlocksStats; LooseChunksStatistics LooseChunksStats; m_Progress.SetLogOperationProgress(PartStepOffset + (uint32_t)PartTaskSteps::ChunkPartContent, StepCount); ChunkedFolderContent LocalContent = ScanPartContent(Part, ChunkController, ChunkCache, ChunkingStats); if (m_AbortFlag) { return; } const ChunkedContentLookup LocalLookup = BuildChunkedContentLookup(LocalContent); if (PartIndex == 0) { ConsumePrepareBuildResult(); } ZEN_ASSERT(m_PreferredMultipartChunkSize != 0); ZEN_ASSERT(m_LargeAttachmentSize != 0); m_Progress.SetLogOperationProgress(PartStepOffset + (uint32_t)PartTaskSteps::CalculateDelta, StepCount); Stopwatch BlockArrangeTimer; std::vector LooseChunkIndexes; std::vector NewBlockChunkIndexes; std::vector ReuseBlockIndexes; ClassifyChunksByBlockEligibility(LocalContent, LooseChunkIndexes, NewBlockChunkIndexes, ReuseBlockIndexes, LooseChunksStats, FindBlocksStats, ReuseBlocksStats); std::vector> NewBlockChunks; ArrangeChunksIntoBlocks(LocalContent, LocalLookup, NewBlockChunkIndexes, NewBlockChunks); FindBlocksStats.NewBlocksCount += NewBlockChunks.size(); for (uint32_t ChunkIndex : NewBlockChunkIndexes) { FindBlocksStats.NewBlocksChunkByteCount += LocalContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; } FindBlocksStats.NewBlocksChunkCount += NewBlockChunkIndexes.size(); const double AcceptedByteCountPercent = FindBlocksStats.PotentialChunkByteCount > 0 ? (100.0 * ReuseBlocksStats.AcceptedRawByteCount / FindBlocksStats.PotentialChunkByteCount) : 0.0; const double AcceptedReduntantByteCountPercent = ReuseBlocksStats.AcceptedByteCount > 0 ? (100.0 * ReuseBlocksStats.AcceptedReduntantByteCount) / (ReuseBlocksStats.AcceptedByteCount + ReuseBlocksStats.AcceptedReduntantByteCount) : 0.0; if (!m_Options.IsQuiet) { ZEN_INFO( "Found {} chunks in {} ({}) blocks eligible for reuse in {}\n" " Reusing {} ({}) matching chunks in {} blocks ({:.1f}%)\n" " Accepting {} ({}) redundant chunks ({:.1f}%)\n" " Rejected {} ({}) chunks in {} blocks\n" " Arranged {} ({}) chunks in {} new blocks\n" " Keeping {} ({}) chunks as loose chunks\n" " Discovery completed in {}", FindBlocksStats.FoundBlockChunkCount, FindBlocksStats.FoundBlockCount, NiceBytes(FindBlocksStats.FoundBlockByteCount), NiceTimeSpanMs(FindBlocksStats.FindBlockTimeMS), ReuseBlocksStats.AcceptedChunkCount, NiceBytes(ReuseBlocksStats.AcceptedRawByteCount), FindBlocksStats.AcceptedBlockCount, AcceptedByteCountPercent, ReuseBlocksStats.AcceptedReduntantChunkCount, NiceBytes(ReuseBlocksStats.AcceptedReduntantByteCount), AcceptedReduntantByteCountPercent, ReuseBlocksStats.RejectedChunkCount, NiceBytes(ReuseBlocksStats.RejectedByteCount), ReuseBlocksStats.RejectedBlockCount, FindBlocksStats.NewBlocksChunkCount, NiceBytes(FindBlocksStats.NewBlocksChunkByteCount), FindBlocksStats.NewBlocksCount, LooseChunksStats.ChunkCount, NiceBytes(LooseChunksStats.ChunkByteCount), NiceTimeSpanMs(BlockArrangeTimer.GetElapsedTimeMs())); } m_Progress.SetLogOperationProgress(PartStepOffset + (uint32_t)PartTaskSteps::GenerateBlocks, StepCount); GeneratedBlocks NewBlocks; if (!NewBlockChunks.empty()) { Stopwatch GenerateBuildBlocksTimer; auto __ = MakeGuard([&]() { uint64_t BlockGenerateTimeUs = GenerateBuildBlocksTimer.GetElapsedTimeUs(); if (!m_Options.IsQuiet) { ZEN_INFO("Generated {} ({}) and uploaded {} ({}) blocks in {}. Generate speed: {}B/sec. Transfer speed {}bits/sec.", GenerateBlocksStats.GeneratedBlockCount.load(), NiceBytes(GenerateBlocksStats.GeneratedBlockByteCount), UploadStats.BlockCount.load(), NiceBytes(UploadStats.BlocksBytes.load()), NiceTimeSpanMs(BlockGenerateTimeUs / 1000), NiceNum(GetBytesPerSecond(GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS, GenerateBlocksStats.GeneratedBlockByteCount)), NiceNum(GetBytesPerSecond(UploadStats.ElapsedWallTimeUS, UploadStats.BlocksBytes * 8))); } }); GenerateBuildBlocks(LocalContent, LocalLookup, NewBlockChunks, NewBlocks, GenerateBlocksStats, UploadStats); } m_Progress.SetLogOperationProgress(PartStepOffset + (uint32_t)PartTaskSteps::BuildPartManifest, StepCount); BuiltPartManifest Manifest = BuildPartManifestObject(LocalContent, LocalLookup, ChunkController, ReuseBlockIndexes, NewBlocks, LooseChunkIndexes); m_Progress.SetLogOperationProgress(PartStepOffset + (uint32_t)PartTaskSteps::UploadBuildPart, StepCount); Stopwatch PutBuildPartResultTimer; std::pair> PutBuildPartResult = m_Storage.BuildStorage->PutBuildPart(m_BuildId, Part.PartId, Part.PartName, Manifest.PartManifest); if (!m_Options.IsQuiet) { ZEN_INFO("PutBuildPart took {}, payload size {}. {} attachments are needed.", NiceTimeSpanMs(PutBuildPartResultTimer.GetElapsedTimeMs()), NiceBytes(Manifest.PartManifest.GetSize()), PutBuildPartResult.second.size()); } IoHash PartHash = PutBuildPartResult.first; m_Progress.SetLogOperationProgress(PartStepOffset + (uint32_t)PartTaskSteps::UploadAttachments, StepCount); std::vector UnknownChunks; if (m_Options.IgnoreExistingBlocks) { if (m_Options.IsVerbose) { ZEN_INFO("PutBuildPart uploading all attachments, needs are: {}", FormatArray(PutBuildPartResult.second, "\n "sv)); } std::vector ForceUploadChunkHashes; ForceUploadChunkHashes.reserve(LooseChunkIndexes.size()); for (uint32_t ChunkIndex : LooseChunkIndexes) { ForceUploadChunkHashes.push_back(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]); } for (size_t BlockIndex = 0; BlockIndex < NewBlocks.BlockHeaders.size(); BlockIndex++) { if (NewBlocks.BlockHeaders[BlockIndex]) { // Block was not uploaded during generation ForceUploadChunkHashes.push_back(NewBlocks.BlockDescriptions[BlockIndex].BlockHash); } } UploadAttachmentBatch(ForceUploadChunkHashes, UnknownChunks, LocalContent, LocalLookup, NewBlockChunks, NewBlocks, LooseChunkIndexes, UploadStats, LooseChunksStats); } else if (!PutBuildPartResult.second.empty()) { if (m_Options.IsVerbose) { ZEN_INFO("PutBuildPart needs attachments: {}", FormatArray(PutBuildPartResult.second, "\n "sv)); } UploadAttachmentBatch(PutBuildPartResult.second, UnknownChunks, LocalContent, LocalLookup, NewBlockChunks, NewBlocks, LooseChunkIndexes, UploadStats, LooseChunksStats); } FinalizeBuildPartWithRetries(Part, PartHash, UnknownChunks, LocalContent, LocalLookup, NewBlockChunks, NewBlocks, LooseChunkIndexes, UploadStats, LooseChunksStats); if (!NewBlocks.BlockDescriptions.empty() && !m_AbortFlag) { UploadMissingBlockMetadata(NewBlocks, UploadStats); // The newly generated blocks are now known blocks so the next part upload can use those blocks as well m_KnownBlocks.insert(m_KnownBlocks.end(), NewBlocks.BlockDescriptions.begin(), NewBlocks.BlockDescriptions.end()); } m_Progress.SetLogOperationProgress(PartStepOffset + (uint32_t)PartTaskSteps::PutBuildPartStats, StepCount); m_Storage.BuildStorage->PutBuildPartStats( m_BuildId, Part.PartId, {{"totalSize", double(Part.LocalFolderScanStats.FoundFileByteCount.load())}, {"reusedRatio", AcceptedByteCountPercent / 100.0}, {"reusedBlockCount", double(FindBlocksStats.AcceptedBlockCount)}, {"reusedBlockByteCount", double(ReuseBlocksStats.AcceptedRawByteCount)}, {"newBlockCount", double(FindBlocksStats.NewBlocksCount)}, {"newBlockByteCount", double(FindBlocksStats.NewBlocksChunkByteCount)}, {"uploadedCount", double(UploadStats.BlockCount.load() + UploadStats.ChunkCount.load())}, {"uploadedByteCount", double(UploadStats.BlocksBytes.load() + UploadStats.ChunksBytes.load())}, {"uploadedBytesPerSec", double(GetBytesPerSecond(UploadStats.ElapsedWallTimeUS, UploadStats.ChunksBytes + UploadStats.BlocksBytes))}, {"elapsedTimeSec", double(UploadTimer.GetElapsedTimeMs() / 1000.0)}}); m_LocalFolderScanStats += Part.LocalFolderScanStats; m_ChunkingStats += ChunkingStats; m_FindBlocksStats += FindBlocksStats; m_ReuseBlocksStats += ReuseBlocksStats; m_UploadStats += UploadStats; m_GenerateBlocksStats += GenerateBlocksStats; m_LooseChunksStats += LooseChunksStats; } ChunkedFolderContent BuildsOperationUploadFolder::ScanPartContent(const UploadPart& Part, ChunkingController& ChunkController, ChunkingCache& ChunkCache, ChunkingStatistics& ChunkingStats) { Stopwatch ScanTimer; std::unique_ptr ProgressBar = m_Progress.CreateProgressBar("Scan Folder"); FilteredRate FilteredBytesHashed; FilteredBytesHashed.Start(); ChunkedFolderContent LocalContent = ChunkFolderContent( ChunkingStats, m_IOWorkerPool, m_Path, Part.Content, ChunkController, ChunkCache, m_Progress.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t) { FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load()); std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found", ChunkingStats.FilesProcessed.load(), Part.Content.Paths.size(), NiceBytes(ChunkingStats.BytesHashed.load()), NiceBytes(Part.TotalRawSize), NiceNum(FilteredBytesHashed.GetCurrent()), ChunkingStats.UniqueChunksFound.load(), NiceBytes(ChunkingStats.UniqueBytesFound.load())); ProgressBar->UpdateState({.Task = "Scanning files ", .Details = Details, .TotalCount = Part.TotalRawSize, .RemainingCount = Part.TotalRawSize - ChunkingStats.BytesHashed.load(), .Status = ProgressBase::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }, m_AbortFlag, m_PauseFlag); FilteredBytesHashed.Stop(); ProgressBar->Finish(); if (m_AbortFlag) { return LocalContent; } if (!m_Options.IsQuiet) { ZEN_INFO("Found {} ({}) files divided into {} ({}) unique chunks in '{}' in {}. Average hash rate {}B/sec", Part.Content.Paths.size(), NiceBytes(Part.TotalRawSize), ChunkingStats.UniqueChunksFound.load(), NiceBytes(ChunkingStats.UniqueBytesFound.load()), m_Path, NiceTimeSpanMs(ScanTimer.GetElapsedTimeMs()), NiceNum(GetBytesPerSecond(ChunkingStats.ElapsedWallTimeUS, ChunkingStats.BytesHashed))); } return LocalContent; } void BuildsOperationUploadFolder::ConsumePrepareBuildResult() { const PrepareBuildResult PrepBuildResult = m_PrepBuildResultFuture.get(); m_FindBlocksStats.FindBlockTimeMS = PrepBuildResult.ElapsedTimeMs; m_FindBlocksStats.FoundBlockCount = PrepBuildResult.KnownBlocks.size(); if (!m_Options.IsQuiet) { ZEN_INFO("Build prepare took {}. {} took {}, payload size {}{}", NiceTimeSpanMs(PrepBuildResult.ElapsedTimeMs), m_CreateBuild ? "PutBuild" : "GetBuild", NiceTimeSpanMs(PrepBuildResult.PrepareBuildTimeMs), NiceBytes(PrepBuildResult.PayloadSize), m_Options.IgnoreExistingBlocks ? "" : fmt::format(". Found {} blocks in {}", PrepBuildResult.KnownBlocks.size(), NiceTimeSpanMs(PrepBuildResult.FindBlocksTimeMs))); } m_PreferredMultipartChunkSize = PrepBuildResult.PreferredMultipartChunkSize; m_LargeAttachmentSize = m_Options.AllowMultiparts ? m_PreferredMultipartChunkSize * 4u : (std::uint64_t)-1; m_KnownBlocks = std::move(PrepBuildResult.KnownBlocks); } void BuildsOperationUploadFolder::ClassifyChunksByBlockEligibility(const ChunkedFolderContent& LocalContent, std::vector& OutLooseChunkIndexes, std::vector& OutNewBlockChunkIndexes, std::vector& OutReuseBlockIndexes, LooseChunksStatistics& LooseChunksStats, FindBlocksStatistics& FindBlocksStats, ReuseBlocksStatistics& ReuseBlocksStats) { const bool EnableBlocks = true; std::vector BlockChunkIndexes; for (uint32_t ChunkIndex = 0; ChunkIndex < LocalContent.ChunkedContent.ChunkHashes.size(); ChunkIndex++) { const uint64_t ChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; if (!EnableBlocks || ChunkRawSize == 0 || ChunkRawSize > m_Options.BlockParameters.MaxChunkEmbedSize) { OutLooseChunkIndexes.push_back(ChunkIndex); LooseChunksStats.ChunkByteCount += ChunkRawSize; } else { BlockChunkIndexes.push_back(ChunkIndex); FindBlocksStats.PotentialChunkByteCount += ChunkRawSize; } } FindBlocksStats.PotentialChunkCount += BlockChunkIndexes.size(); LooseChunksStats.ChunkCount = OutLooseChunkIndexes.size(); if (m_Options.IgnoreExistingBlocks) { if (!m_Options.IsQuiet) { ZEN_INFO("Ignoring any existing blocks in store"); } OutNewBlockChunkIndexes = std::move(BlockChunkIndexes); return; } OutReuseBlockIndexes = FindReuseBlocks(Log(), m_Options.BlockReuseMinPercentLimit, m_Options.IsVerbose, ReuseBlocksStats, m_KnownBlocks, LocalContent.ChunkedContent.ChunkHashes, BlockChunkIndexes, OutNewBlockChunkIndexes); FindBlocksStats.AcceptedBlockCount += OutReuseBlockIndexes.size(); for (const ChunkBlockDescription& Description : m_KnownBlocks) { for (uint32_t ChunkRawLength : Description.ChunkRawLengths) { FindBlocksStats.FoundBlockByteCount += ChunkRawLength; } FindBlocksStats.FoundBlockChunkCount += Description.ChunkRawHashes.size(); } } BuildsOperationUploadFolder::BuiltPartManifest BuildsOperationUploadFolder::BuildPartManifestObject(const ChunkedFolderContent& LocalContent, const ChunkedContentLookup& LocalLookup, ChunkingController& ChunkController, std::span ReuseBlockIndexes, const GeneratedBlocks& NewBlocks, std::span LooseChunkIndexes) { BuiltPartManifest Result; CbObjectWriter PartManifestWriter; Stopwatch ManifestGenerationTimer; auto __ = MakeGuard([&]() { if (!m_Options.IsQuiet) { ZEN_INFO("Generated build part manifest in {} ({})", NiceTimeSpanMs(ManifestGenerationTimer.GetElapsedTimeMs()), NiceBytes(PartManifestWriter.GetSaveSize())); } }); PartManifestWriter.BeginObject("chunker"sv); { PartManifestWriter.AddString("name"sv, ChunkController.GetName()); PartManifestWriter.AddObject("parameters"sv, ChunkController.GetParameters()); } PartManifestWriter.EndObject(); // chunker Result.AllChunkBlockHashes.reserve(ReuseBlockIndexes.size() + NewBlocks.BlockDescriptions.size()); Result.AllChunkBlockDescriptions.reserve(ReuseBlockIndexes.size() + NewBlocks.BlockDescriptions.size()); for (size_t ReuseBlockIndex : ReuseBlockIndexes) { Result.AllChunkBlockDescriptions.push_back(m_KnownBlocks[ReuseBlockIndex]); Result.AllChunkBlockHashes.push_back(m_KnownBlocks[ReuseBlockIndex].BlockHash); } Result.AllChunkBlockDescriptions.insert(Result.AllChunkBlockDescriptions.end(), NewBlocks.BlockDescriptions.begin(), NewBlocks.BlockDescriptions.end()); for (const ChunkBlockDescription& BlockDescription : NewBlocks.BlockDescriptions) { Result.AllChunkBlockHashes.push_back(BlockDescription.BlockHash); } std::vector AbsoluteChunkHashes; if (m_Options.DoExtraContentValidation) { tsl::robin_map ChunkHashToAbsoluteChunkIndex; AbsoluteChunkHashes.reserve(LocalContent.ChunkedContent.ChunkHashes.size()); for (uint32_t ChunkIndex : LooseChunkIndexes) { ChunkHashToAbsoluteChunkIndex.insert({LocalContent.ChunkedContent.ChunkHashes[ChunkIndex], AbsoluteChunkHashes.size()}); AbsoluteChunkHashes.push_back(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]); } for (const ChunkBlockDescription& Block : Result.AllChunkBlockDescriptions) { for (const IoHash& ChunkHash : Block.ChunkRawHashes) { ChunkHashToAbsoluteChunkIndex.insert({ChunkHash, AbsoluteChunkHashes.size()}); AbsoluteChunkHashes.push_back(ChunkHash); } } for (const IoHash& ChunkHash : LocalContent.ChunkedContent.ChunkHashes) { ZEN_ASSERT(AbsoluteChunkHashes[ChunkHashToAbsoluteChunkIndex.at(ChunkHash)] == ChunkHash); ZEN_ASSERT(LocalContent.ChunkedContent.ChunkHashes[LocalLookup.ChunkHashToChunkIndex.at(ChunkHash)] == ChunkHash); } for (const uint32_t ChunkIndex : LocalContent.ChunkedContent.ChunkOrders) { ZEN_ASSERT(AbsoluteChunkHashes[ChunkHashToAbsoluteChunkIndex.at(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex])] == LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]); ZEN_ASSERT(LocalLookup.ChunkHashToChunkIndex.at(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]) == ChunkIndex); } } std::vector AbsoluteChunkOrders = CalculateAbsoluteChunkOrders(LocalContent.ChunkedContent.ChunkHashes, LocalContent.ChunkedContent.ChunkOrders, LocalLookup.ChunkHashToChunkIndex, LooseChunkIndexes, Result.AllChunkBlockDescriptions); if (m_Options.DoExtraContentValidation) { for (uint32_t ChunkOrderIndex = 0; ChunkOrderIndex < LocalContent.ChunkedContent.ChunkOrders.size(); ChunkOrderIndex++) { uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[ChunkOrderIndex]; uint32_t AbsoluteChunkIndex = AbsoluteChunkOrders[ChunkOrderIndex]; const IoHash& LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; const IoHash& AbsoluteChunkHash = AbsoluteChunkHashes[AbsoluteChunkIndex]; ZEN_ASSERT(LocalChunkHash == AbsoluteChunkHash); } } WriteBuildContentToCompactBinary(PartManifestWriter, LocalContent.Platform, LocalContent.Paths, LocalContent.RawHashes, LocalContent.RawSizes, LocalContent.Attributes, LocalContent.ChunkedContent.SequenceRawHashes, LocalContent.ChunkedContent.ChunkCounts, LocalContent.ChunkedContent.ChunkHashes, LocalContent.ChunkedContent.ChunkRawSizes, AbsoluteChunkOrders, LooseChunkIndexes, Result.AllChunkBlockHashes); if (m_Options.DoExtraContentValidation) { ChunkedFolderContent VerifyFolderContent; std::vector OutAbsoluteChunkOrders; std::vector OutLooseChunkHashes; std::vector OutLooseChunkRawSizes; std::vector OutBlockRawHashes; ReadBuildContentFromCompactBinary(PartManifestWriter.Save(), VerifyFolderContent.Platform, VerifyFolderContent.Paths, VerifyFolderContent.RawHashes, VerifyFolderContent.RawSizes, VerifyFolderContent.Attributes, VerifyFolderContent.ChunkedContent.SequenceRawHashes, VerifyFolderContent.ChunkedContent.ChunkCounts, OutAbsoluteChunkOrders, OutLooseChunkHashes, OutLooseChunkRawSizes, OutBlockRawHashes); ZEN_ASSERT(OutBlockRawHashes == Result.AllChunkBlockHashes); for (uint32_t OrderIndex = 0; OrderIndex < OutAbsoluteChunkOrders.size(); OrderIndex++) { uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[OrderIndex]; const IoHash LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; uint32_t VerifyChunkIndex = OutAbsoluteChunkOrders[OrderIndex]; const IoHash VerifyChunkHash = AbsoluteChunkHashes[VerifyChunkIndex]; ZEN_ASSERT(LocalChunkHash == VerifyChunkHash); } CalculateLocalChunkOrders(OutAbsoluteChunkOrders, OutLooseChunkHashes, OutLooseChunkRawSizes, Result.AllChunkBlockDescriptions, VerifyFolderContent.ChunkedContent.ChunkHashes, VerifyFolderContent.ChunkedContent.ChunkRawSizes, VerifyFolderContent.ChunkedContent.ChunkOrders, m_Options.DoExtraContentValidation); ZEN_ASSERT(LocalContent.Paths == VerifyFolderContent.Paths); ZEN_ASSERT(LocalContent.RawHashes == VerifyFolderContent.RawHashes); ZEN_ASSERT(LocalContent.RawSizes == VerifyFolderContent.RawSizes); ZEN_ASSERT(LocalContent.Attributes == VerifyFolderContent.Attributes); ZEN_ASSERT(LocalContent.ChunkedContent.SequenceRawHashes == VerifyFolderContent.ChunkedContent.SequenceRawHashes); ZEN_ASSERT(LocalContent.ChunkedContent.ChunkCounts == VerifyFolderContent.ChunkedContent.ChunkCounts); for (uint32_t OrderIndex = 0; OrderIndex < LocalContent.ChunkedContent.ChunkOrders.size(); OrderIndex++) { uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[OrderIndex]; const IoHash LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; uint64_t LocalChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[LocalChunkIndex]; uint32_t VerifyChunkIndex = VerifyFolderContent.ChunkedContent.ChunkOrders[OrderIndex]; const IoHash VerifyChunkHash = VerifyFolderContent.ChunkedContent.ChunkHashes[VerifyChunkIndex]; uint64_t VerifyChunkRawSize = VerifyFolderContent.ChunkedContent.ChunkRawSizes[VerifyChunkIndex]; ZEN_ASSERT(LocalChunkHash == VerifyChunkHash); ZEN_ASSERT(LocalChunkRawSize == VerifyChunkRawSize); } } Result.PartManifest = PartManifestWriter.Save(); return Result; } void BuildsOperationUploadFolder::UploadAttachmentBatch(std::span RawHashes, std::vector& OutUnknownChunks, const ChunkedFolderContent& LocalContent, const ChunkedContentLookup& LocalLookup, const std::vector>& NewBlockChunks, GeneratedBlocks& NewBlocks, std::span LooseChunkIndexes, UploadStatistics& UploadStats, LooseChunksStatistics& LooseChunksStats) { if (m_AbortFlag) { return; } UploadStatistics TempUploadStats; LooseChunksStatistics TempLooseChunksStats; Stopwatch TempUploadTimer; auto __ = MakeGuard([&]() { if (!m_Options.IsQuiet) { uint64_t TempChunkUploadTimeUs = TempUploadTimer.GetElapsedTimeUs(); ZEN_INFO( "Uploaded {} ({}) blocks. " "Compressed {} ({} {}B/s) and uploaded {} ({}) chunks. " "Transferred {} ({}bits/s) in {}", TempUploadStats.BlockCount.load(), NiceBytes(TempUploadStats.BlocksBytes), TempLooseChunksStats.CompressedChunkCount.load(), NiceBytes(TempLooseChunksStats.CompressedChunkBytes.load()), NiceNum(GetBytesPerSecond(TempLooseChunksStats.CompressChunksElapsedWallTimeUS, TempLooseChunksStats.ChunkByteCount)), TempUploadStats.ChunkCount.load(), NiceBytes(TempUploadStats.ChunksBytes), NiceBytes(TempUploadStats.BlocksBytes + TempUploadStats.ChunksBytes), NiceNum(GetBytesPerSecond(TempUploadStats.ElapsedWallTimeUS, TempUploadStats.ChunksBytes * 8)), NiceTimeSpanMs(TempChunkUploadTimeUs / 1000)); } }); UploadPartBlobs(LocalContent, LocalLookup, RawHashes, NewBlockChunks, NewBlocks, LooseChunkIndexes, m_LargeAttachmentSize, TempUploadStats, TempLooseChunksStats, OutUnknownChunks); UploadStats += TempUploadStats; LooseChunksStats += TempLooseChunksStats; } void BuildsOperationUploadFolder::FinalizeBuildPartWithRetries(const UploadPart& Part, const IoHash& PartHash, std::vector& InOutUnknownChunks, const ChunkedFolderContent& LocalContent, const ChunkedContentLookup& LocalLookup, const std::vector>& NewBlockChunks, GeneratedBlocks& NewBlocks, std::span LooseChunkIndexes, UploadStatistics& UploadStats, LooseChunksStatistics& LooseChunksStats) { auto BuildUnkownChunksResponse = [](const std::vector& UnknownChunks, bool WillRetry) { return fmt::format( "The following build blobs was reported as needed for upload but was reported as existing at the start of the " "operation.{}{}", WillRetry ? " Treating this as a transient inconsistency issue and will attempt to retry finalization."sv : ""sv, FormatArray(UnknownChunks, "\n "sv)); }; if (!InOutUnknownChunks.empty()) { ZEN_WARN("{}", BuildUnkownChunksResponse(InOutUnknownChunks, /*WillRetry*/ true)); } uint32_t FinalizeBuildPartRetryCount = 5; while (!m_AbortFlag && (FinalizeBuildPartRetryCount--) > 0) { Stopwatch FinalizeBuildPartTimer; std::vector Needs = m_Storage.BuildStorage->FinalizeBuildPart(m_BuildId, Part.PartId, PartHash); if (!m_Options.IsQuiet) { ZEN_INFO("FinalizeBuildPart took {}. {} attachments are missing.", NiceTimeSpanMs(FinalizeBuildPartTimer.GetElapsedTimeMs()), Needs.size()); } if (Needs.empty()) { break; } if (m_Options.IsVerbose) { ZEN_INFO("FinalizeBuildPart needs attachments: {}", FormatArray(Needs, "\n "sv)); } std::vector RetryUnknownChunks; UploadAttachmentBatch(Needs, RetryUnknownChunks, LocalContent, LocalLookup, NewBlockChunks, NewBlocks, LooseChunkIndexes, UploadStats, LooseChunksStats); if (RetryUnknownChunks == InOutUnknownChunks) { if (FinalizeBuildPartRetryCount > 0) { // Back off a bit Sleep(1000); } } else { InOutUnknownChunks = RetryUnknownChunks; ZEN_WARN("{}", BuildUnkownChunksResponse(InOutUnknownChunks, /*WillRetry*/ FinalizeBuildPartRetryCount != 0)); } } if (!InOutUnknownChunks.empty()) { throw std::runtime_error(BuildUnkownChunksResponse(InOutUnknownChunks, /*WillRetry*/ false)); } } void BuildsOperationUploadFolder::UploadMissingBlockMetadata(GeneratedBlocks& NewBlocks, UploadStatistics& UploadStats) { uint64_t UploadBlockMetadataCount = 0; Stopwatch UploadBlockMetadataTimer; uint32_t FailedMetadataUploadCount = 1; int32_t MetadataUploadRetryCount = 3; while ((MetadataUploadRetryCount-- > 0) && (FailedMetadataUploadCount > 0)) { FailedMetadataUploadCount = 0; for (size_t BlockIndex = 0; BlockIndex < NewBlocks.BlockDescriptions.size(); BlockIndex++) { if (m_AbortFlag) { break; } const IoHash& BlockHash = NewBlocks.BlockDescriptions[BlockIndex].BlockHash; if (!NewBlocks.MetaDataHasBeenUploaded[BlockIndex]) { const CbObject BlockMetaData = BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]); if (m_Storage.CacheStorage && m_Options.PopulateCache) { m_Storage.CacheStorage->PutBlobMetadatas(m_BuildId, std::vector({BlockHash}), std::vector({BlockMetaData})); } bool MetadataSucceeded = m_Storage.BuildStorage->PutBlockMetadata(m_BuildId, BlockHash, BlockMetaData); if (MetadataSucceeded) { UploadStats.BlocksBytes += BlockMetaData.GetSize(); NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; UploadBlockMetadataCount++; } else { FailedMetadataUploadCount++; } } } } if (UploadBlockMetadataCount > 0) { uint64_t ElapsedUS = UploadBlockMetadataTimer.GetElapsedTimeUs(); UploadStats.ElapsedWallTimeUS += ElapsedUS; if (!m_Options.IsQuiet) { ZEN_INFO("Uploaded metadata for {} blocks in {}", UploadBlockMetadataCount, NiceTimeSpanMs(ElapsedUS / 1000)); } } } void BuildsOperationUploadFolder::UploadPartBlobs(const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, std::span RawHashes, const std::vector>& NewBlockChunks, GeneratedBlocks& NewBlocks, std::span LooseChunkIndexes, const std::uint64_t LargeAttachmentSize, UploadStatistics& TempUploadStats, LooseChunksStatistics& TempLooseChunksStats, std::vector& OutUnknownChunks) { ZEN_TRACE_CPU("UploadPartBlobs"); UploadPartClassification Classification = ClassifyUploadRawHashes(RawHashes, Content, Lookup, NewBlocks, LooseChunkIndexes, OutUnknownChunks); if (Classification.BlockIndexes.empty() && Classification.LooseChunkOrderIndexes.empty()) { return; } std::unique_ptr ProgressBar = m_Progress.CreateProgressBar("Upload Blobs"); FilteredRate FilteredGenerateBlockBytesPerSecond; FilteredRate FilteredCompressedBytesPerSecond; FilteredRate FilteredUploadedBytesPerSecond; ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); std::atomic UploadedBlockSize = 0; std::atomic UploadedBlockCount = 0; std::atomic UploadedRawChunkSize = 0; std::atomic UploadedCompressedChunkSize = 0; std::atomic UploadedChunkCount = 0; std::atomic GeneratedBlockCount = 0; std::atomic GeneratedBlockByteCount = 0; std::atomic QueuedPendingInMemoryBlocksForUpload = 0; const size_t UploadBlockCount = Classification.BlockIndexes.size(); const uint32_t UploadChunkCount = gsl::narrow(Classification.LooseChunkOrderIndexes.size()); const uint64_t TotalRawSize = Classification.TotalLooseChunksSize + Classification.TotalBlocksSize; UploadPartBlobsContext Context{.Work = Work, .ReadChunkPool = m_IOWorkerPool, .UploadChunkPool = m_NetworkPool, .FilteredGenerateBlockBytesPerSecond = FilteredGenerateBlockBytesPerSecond, .FilteredCompressedBytesPerSecond = FilteredCompressedBytesPerSecond, .FilteredUploadedBytesPerSecond = FilteredUploadedBytesPerSecond, .UploadedBlockSize = UploadedBlockSize, .UploadedBlockCount = UploadedBlockCount, .UploadedRawChunkSize = UploadedRawChunkSize, .UploadedCompressedChunkSize = UploadedCompressedChunkSize, .UploadedChunkCount = UploadedChunkCount, .GeneratedBlockCount = GeneratedBlockCount, .GeneratedBlockByteCount = GeneratedBlockByteCount, .QueuedPendingInMemoryBlocksForUpload = QueuedPendingInMemoryBlocksForUpload, .UploadBlockCount = UploadBlockCount, .UploadChunkCount = UploadChunkCount, .LargeAttachmentSize = LargeAttachmentSize, .NewBlocks = NewBlocks, .Content = Content, .Lookup = Lookup, .NewBlockChunks = NewBlockChunks, .LooseChunkIndexes = LooseChunkIndexes, .TempUploadStats = TempUploadStats, .TempLooseChunksStats = TempLooseChunksStats}; ScheduleBlockGenerationAndUpload(Context, Classification.BlockIndexes); ScheduleLooseChunkCompressionAndUpload(Context, Classification.LooseChunkOrderIndexes); Work.Wait(m_Progress.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { ZEN_UNUSED(PendingWork); FilteredCompressedBytesPerSecond.Update(TempLooseChunksStats.CompressedChunkRawBytes.load()); FilteredGenerateBlockBytesPerSecond.Update(GeneratedBlockByteCount.load()); FilteredUploadedBytesPerSecond.Update(UploadedCompressedChunkSize.load() + UploadedBlockSize.load()); uint64_t UploadedRawSize = UploadedRawChunkSize.load() + UploadedBlockSize.load(); uint64_t UploadedCompressedSize = UploadedCompressedChunkSize.load() + UploadedBlockSize.load(); std::string Details = fmt::format( "Compressed {}/{} ({}/{}{}) chunks. " "Uploaded {}/{} ({}/{}) blobs " "({}{})", TempLooseChunksStats.CompressedChunkCount.load(), Classification.LooseChunkOrderIndexes.size(), NiceBytes(TempLooseChunksStats.CompressedChunkRawBytes), NiceBytes(Classification.TotalLooseChunksSize), (TempLooseChunksStats.CompressedChunkCount == Classification.LooseChunkOrderIndexes.size()) ? "" : fmt::format(" {}B/s", NiceNum(FilteredCompressedBytesPerSecond.GetCurrent())), UploadedBlockCount.load() + UploadedChunkCount.load(), UploadBlockCount + UploadChunkCount, NiceBytes(UploadedRawSize), NiceBytes(TotalRawSize), NiceBytes(UploadedCompressedSize), (UploadedBlockCount == UploadBlockCount && UploadedChunkCount == UploadChunkCount) ? "" : fmt::format(" {}bits/s", NiceNum(FilteredUploadedBytesPerSecond.GetCurrent()))); ProgressBar->UpdateState({.Task = "Uploading blobs ", .Details = Details, .TotalCount = gsl::narrow(TotalRawSize), .RemainingCount = gsl::narrow(TotalRawSize - UploadedRawSize), .Status = ProgressBase::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); ZEN_ASSERT(m_AbortFlag || QueuedPendingInMemoryBlocksForUpload.load() == 0); ProgressBar->Finish(); TempUploadStats.ElapsedWallTimeUS += FilteredUploadedBytesPerSecond.GetElapsedTimeUS(); TempLooseChunksStats.CompressChunksElapsedWallTimeUS += FilteredCompressedBytesPerSecond.GetElapsedTimeUS(); } BuildsOperationUploadFolder::UploadPartClassification BuildsOperationUploadFolder::ClassifyUploadRawHashes(std::span RawHashes, const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, const GeneratedBlocks& NewBlocks, std::span LooseChunkIndexes, std::vector& OutUnknownChunks) { UploadPartClassification Result; tsl::robin_map ChunkIndexToLooseChunkOrderIndex; ChunkIndexToLooseChunkOrderIndex.reserve(LooseChunkIndexes.size()); for (uint32_t OrderIndex = 0; OrderIndex < LooseChunkIndexes.size(); OrderIndex++) { ChunkIndexToLooseChunkOrderIndex.insert_or_assign(LooseChunkIndexes[OrderIndex], OrderIndex); } for (const IoHash& RawHash : RawHashes) { if (auto It = NewBlocks.BlockHashToBlockIndex.find(RawHash); It != NewBlocks.BlockHashToBlockIndex.end()) { Result.BlockIndexes.push_back(It->second); Result.TotalBlocksSize += NewBlocks.BlockSizes[It->second]; } else if (auto ChunkIndexIt = Lookup.ChunkHashToChunkIndex.find(RawHash); ChunkIndexIt != Lookup.ChunkHashToChunkIndex.end()) { const uint32_t ChunkIndex = ChunkIndexIt->second; if (auto LooseOrderIndexIt = ChunkIndexToLooseChunkOrderIndex.find(ChunkIndex); LooseOrderIndexIt != ChunkIndexToLooseChunkOrderIndex.end()) { Result.LooseChunkOrderIndexes.push_back(LooseOrderIndexIt->second); Result.TotalLooseChunksSize += Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; } } else { OutUnknownChunks.push_back(RawHash); } } return Result; } void BuildsOperationUploadFolder::ScheduleBlockGenerationAndUpload(UploadPartBlobsContext& Context, std::span BlockIndexes) { for (const size_t BlockIndex : BlockIndexes) { const IoHash& BlockHash = Context.NewBlocks.BlockDescriptions[BlockIndex].BlockHash; if (m_AbortFlag) { break; } Context.Work.ScheduleWork( Context.ReadChunkPool, [this, &Context, BlockHash = IoHash(BlockHash), BlockIndex, GenerateBlockCount = BlockIndexes.size()](std::atomic&) { if (m_AbortFlag) { return; } ZEN_TRACE_CPU("UploadPartBlobs_GenerateBlock"); Context.FilteredGenerateBlockBytesPerSecond.Start(); Stopwatch GenerateTimer; CompositeBuffer Payload; if (Context.NewBlocks.BlockHeaders[BlockIndex]) { Payload = RebuildBlock(Context.Content, Context.Lookup, std::move(Context.NewBlocks.BlockHeaders[BlockIndex]), Context.NewBlockChunks[BlockIndex]) .GetCompressed(); } else { ChunkBlockDescription BlockDescription; CompressedBuffer CompressedBlock = GenerateBlock(Context.Content, Context.Lookup, Context.NewBlockChunks[BlockIndex], BlockDescription); if (!CompressedBlock) { throw std::runtime_error(fmt::format("Failed generating block {}", BlockHash)); } ZEN_ASSERT(BlockDescription.BlockHash == BlockHash); Payload = std::move(CompressedBlock).GetCompressed(); } Context.GeneratedBlockByteCount += Context.NewBlocks.BlockSizes[BlockIndex]; if (Context.GeneratedBlockCount.fetch_add(1) + 1 == GenerateBlockCount) { Context.FilteredGenerateBlockBytesPerSecond.Stop(); } if (m_Options.IsVerbose) { ZEN_INFO("{} block {} ({}) containing {} chunks in {}", Context.NewBlocks.BlockHeaders[BlockIndex] ? "Regenerated" : "Generated", Context.NewBlocks.BlockDescriptions[BlockIndex].BlockHash, NiceBytes(Context.NewBlocks.BlockSizes[BlockIndex]), Context.NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size(), NiceTimeSpanMs(GenerateTimer.GetElapsedTimeMs())); } if (!m_AbortFlag) { UploadBlockPayload(Context, BlockIndex, BlockHash, std::move(Payload)); } }); } } void BuildsOperationUploadFolder::UploadBlockPayload(UploadPartBlobsContext& Context, size_t BlockIndex, const IoHash& BlockHash, CompositeBuffer Payload) { bool IsInMemoryBlock = true; if (Context.QueuedPendingInMemoryBlocksForUpload.load() > 16) { ZEN_TRACE_CPU("AsyncUploadBlock_WriteTempBlock"); std::filesystem::path TempFilePath = m_Options.TempDir / (BlockHash.ToHexString()); Payload = CompositeBuffer(WriteToTempFile(std::move(Payload), TempFilePath)); IsInMemoryBlock = false; } else { Context.QueuedPendingInMemoryBlocksForUpload++; } Context.Work.ScheduleWork( Context.UploadChunkPool, [this, &Context, IsInMemoryBlock, BlockIndex, BlockHash = IoHash(BlockHash), Payload = CompositeBuffer(std::move(Payload))]( std::atomic&) { auto _ = MakeGuard([IsInMemoryBlock, &Context] { if (IsInMemoryBlock) { Context.QueuedPendingInMemoryBlocksForUpload--; } }); if (m_AbortFlag) { return; } ZEN_TRACE_CPU("AsyncUploadBlock"); const uint64_t PayloadSize = Payload.GetSize(); Context.FilteredUploadedBytesPerSecond.Start(); const CbObject BlockMetaData = BuildChunkBlockDescription(Context.NewBlocks.BlockDescriptions[BlockIndex], Context.NewBlocks.BlockMetaDatas[BlockIndex]); if (m_Storage.CacheStorage && m_Options.PopulateCache) { m_Storage.CacheStorage->PutBuildBlob(m_BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload); } try { m_Storage.BuildStorage->PutBuildBlob(m_BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload); } catch (const std::exception&) { // Silence http errors due to abort if (!m_AbortFlag) { throw; } } if (m_AbortFlag) { return; } if (m_Options.IsVerbose) { ZEN_INFO("Uploaded block {} ({}) containing {} chunks", BlockHash, NiceBytes(PayloadSize), Context.NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); } Context.UploadedBlockSize += PayloadSize; Context.TempUploadStats.BlocksBytes += PayloadSize; if (m_Storage.CacheStorage && m_Options.PopulateCache) { m_Storage.CacheStorage->PutBlobMetadatas(m_BuildId, std::vector({BlockHash}), std::vector({BlockMetaData})); } bool MetadataSucceeded = false; try { MetadataSucceeded = m_Storage.BuildStorage->PutBlockMetadata(m_BuildId, BlockHash, BlockMetaData); } catch (const std::exception&) { // Silence http errors due to abort if (!m_AbortFlag) { throw; } } if (m_AbortFlag) { return; } if (MetadataSucceeded) { if (m_Options.IsVerbose) { ZEN_INFO("Uploaded block {} metadata ({})", BlockHash, NiceBytes(BlockMetaData.GetSize())); } Context.NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; Context.TempUploadStats.BlocksBytes += BlockMetaData.GetSize(); } Context.TempUploadStats.BlockCount++; if (Context.UploadedBlockCount.fetch_add(1) + 1 == Context.UploadBlockCount && Context.UploadedChunkCount == Context.UploadChunkCount) { Context.FilteredUploadedBytesPerSecond.Stop(); } }); } void BuildsOperationUploadFolder::ScheduleLooseChunkCompressionAndUpload(UploadPartBlobsContext& Context, std::span LooseChunkOrderIndexes) { for (const uint32_t LooseChunkOrderIndex : LooseChunkOrderIndexes) { const uint32_t ChunkIndex = Context.LooseChunkIndexes[LooseChunkOrderIndex]; Context.Work.ScheduleWork(Context.ReadChunkPool, [this, &Context, LooseChunkOrderCount = LooseChunkOrderIndexes.size(), ChunkIndex](std::atomic&) { if (m_AbortFlag) { return; } ZEN_TRACE_CPU("UploadPartBlobs_CompressChunk"); Context.FilteredCompressedBytesPerSecond.Start(); Stopwatch CompressTimer; CompositeBuffer Payload = CompressChunk(Context.Content, Context.Lookup, ChunkIndex, Context.TempLooseChunksStats); if (m_Options.IsVerbose) { ZEN_INFO("Compressed chunk {} ({} -> {}) in {}", Context.Content.ChunkedContent.ChunkHashes[ChunkIndex], NiceBytes(Context.Content.ChunkedContent.ChunkRawSizes[ChunkIndex]), NiceBytes(Payload.GetSize()), NiceTimeSpanMs(CompressTimer.GetElapsedTimeMs())); } const uint64_t ChunkRawSize = Context.Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; Context.TempUploadStats.ReadFromDiskBytes += ChunkRawSize; if (Context.TempLooseChunksStats.CompressedChunkCount == LooseChunkOrderCount) { Context.FilteredCompressedBytesPerSecond.Stop(); } if (!m_AbortFlag) { UploadLooseChunkPayload(Context, Context.Content.ChunkedContent.ChunkHashes[ChunkIndex], ChunkRawSize, std::move(Payload)); } }); } } void BuildsOperationUploadFolder::UploadLooseChunkPayload(UploadPartBlobsContext& Context, const IoHash& RawHash, uint64_t RawSize, CompositeBuffer Payload) { Context.Work.ScheduleWork( Context.UploadChunkPool, [this, &Context, RawHash = IoHash(RawHash), RawSize, Payload = CompositeBuffer(std::move(Payload))](std::atomic&) mutable { if (m_AbortFlag) { return; } ZEN_TRACE_CPU("AsyncUploadLooseChunk"); const uint64_t PayloadSize = Payload.GetSize(); if (m_Storage.CacheStorage && m_Options.PopulateCache) { m_Storage.CacheStorage->PutBuildBlob(m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); } if (PayloadSize >= Context.LargeAttachmentSize) { ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart"); Context.TempUploadStats.MultipartAttachmentCount++; try { std::vector> MultipartWork = m_Storage.BuildStorage->PutLargeBuildBlob( m_BuildId, RawHash, ZenContentType::kCompressedBinary, PayloadSize, [Payload = std::move(Payload), &Context](uint64_t Offset, uint64_t Size) -> IoBuffer { Context.FilteredUploadedBytesPerSecond.Start(); IoBuffer PartPayload = Payload.Mid(Offset, Size).Flatten().AsIoBuffer(); PartPayload.SetContentType(ZenContentType::kBinary); return PartPayload; }, [&Context, RawSize](uint64_t SentBytes, bool IsComplete) { Context.TempUploadStats.ChunksBytes += SentBytes; Context.UploadedCompressedChunkSize += SentBytes; if (IsComplete) { Context.TempUploadStats.ChunkCount++; if (Context.UploadedChunkCount.fetch_add(1) + 1 == Context.UploadChunkCount && Context.UploadedBlockCount == Context.UploadBlockCount) { Context.FilteredUploadedBytesPerSecond.Stop(); } Context.UploadedRawChunkSize += RawSize; } }); for (auto& WorkPart : MultipartWork) { Context.Work.ScheduleWork(Context.UploadChunkPool, [Work = std::move(WorkPart)](std::atomic& AbortFlag) { ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart_Work"); if (!AbortFlag) { Work(); } }); } if (m_Options.IsVerbose) { ZEN_INFO("Uploaded multipart chunk {} ({})", RawHash, NiceBytes(PayloadSize)); } } catch (const std::exception&) { // Silence http errors due to abort if (!m_AbortFlag) { throw; } } return; } ZEN_TRACE_CPU("AsyncUploadLooseChunk_Singlepart"); try { m_Storage.BuildStorage->PutBuildBlob(m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); } catch (const std::exception&) { // Silence http errors due to abort if (!m_AbortFlag) { throw; } } if (m_AbortFlag) { return; } if (m_Options.IsVerbose) { ZEN_INFO("Uploaded chunk {} ({})", RawHash, NiceBytes(PayloadSize)); } Context.TempUploadStats.ChunksBytes += Payload.GetSize(); Context.TempUploadStats.ChunkCount++; Context.UploadedCompressedChunkSize += Payload.GetSize(); Context.UploadedRawChunkSize += RawSize; if (Context.UploadedChunkCount.fetch_add(1) + 1 == Context.UploadChunkCount && Context.UploadedBlockCount == Context.UploadBlockCount) { Context.FilteredUploadedBytesPerSecond.Stop(); } }); } CompositeBuffer BuildsOperationUploadFolder::CompressChunk(const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, uint32_t ChunkIndex, LooseChunksStatistics& TempLooseChunksStats) { ZEN_TRACE_CPU("CompressChunk"); ZEN_ASSERT(!m_Options.TempDir.empty()); const IoHash& ChunkHash = Content.ChunkedContent.ChunkHashes[ChunkIndex]; const uint64_t ChunkSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; const ChunkedContentLookup::ChunkSequenceLocation& Source = GetChunkSequenceLocations(Lookup, ChunkIndex)[0]; const std::uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[Source.SequenceIndex]; IoBuffer RawSource = IoBufferBuilder::MakeFromFile((m_Path / Content.Paths[PathIndex]).make_preferred(), Source.Offset, ChunkSize); if (!RawSource) { throw std::runtime_error(fmt::format("Failed fetching chunk {}", ChunkHash)); } if (RawSource.GetSize() != ChunkSize) { throw std::runtime_error(fmt::format("Fetched chunk {} has invalid size", ChunkHash)); } const bool ShouldCompressChunk = IsChunkCompressable(m_NonCompressableExtensionHashes, Lookup, ChunkIndex); const OodleCompressionLevel CompressionLevel = ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None; if (ShouldCompressChunk) { std::filesystem::path TempFilePath = m_Options.TempDir / ChunkHash.ToHexString(); BasicFile CompressedFile; std::error_code Ec; CompressedFile.Open(TempFilePath, BasicFile::Mode::kTruncateDelete, Ec); if (Ec) { throw std::runtime_error(fmt::format("Failed creating temporary file for compressing blob {}, reason: ({}) {}", ChunkHash, Ec.value(), Ec.message())); } uint64_t StreamRawBytes = 0; uint64_t StreamCompressedBytes = 0; bool CouldCompress = CompressedBuffer::CompressToStream( CompositeBuffer(SharedBuffer(RawSource)), [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { ZEN_UNUSED(SourceOffset); TempLooseChunksStats.CompressedChunkRawBytes += SourceSize; CompressedFile.Write(RangeBuffer, Offset); TempLooseChunksStats.CompressedChunkBytes += RangeBuffer.GetSize(); StreamRawBytes += SourceSize; StreamCompressedBytes += RangeBuffer.GetSize(); }, OodleCompressor::Mermaid, CompressionLevel); if (CouldCompress) { uint64_t CompressedSize = CompressedFile.FileSize(); void* FileHandle = CompressedFile.Detach(); IoBuffer TempPayload = IoBuffer(IoBuffer::File, FileHandle, 0, CompressedSize, /*IsWholeFile*/ true); ZEN_ASSERT(TempPayload); TempPayload.SetDeleteOnClose(true); IoHash RawHash; uint64_t RawSize; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(TempPayload), RawHash, RawSize); ZEN_ASSERT(Compressed); ZEN_ASSERT(RawHash == ChunkHash); ZEN_ASSERT(RawSize == ChunkSize); TempLooseChunksStats.CompressedChunkCount++; return Compressed.GetCompressed(); } else { TempLooseChunksStats.CompressedChunkRawBytes -= StreamRawBytes; TempLooseChunksStats.CompressedChunkBytes -= StreamCompressedBytes; } CompressedFile.Close(); RemoveFile(TempFilePath, Ec); ZEN_UNUSED(Ec); } CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(RawSource)), OodleCompressor::Mermaid, CompressionLevel); if (!CompressedBlob) { throw std::runtime_error(fmt::format("Failed to compress large blob {}", ChunkHash)); } ZEN_ASSERT_SLOW(CompressedBlob.DecodeRawHash() == ChunkHash); ZEN_ASSERT_SLOW(CompressedBlob.DecodeRawSize() == ChunkSize); TempLooseChunksStats.CompressedChunkRawBytes += ChunkSize; TempLooseChunksStats.CompressedChunkBytes += CompressedBlob.GetCompressedSize(); // If we use none-compression, the compressed blob references the data and has 64 kb in memory so we don't need to write it to disk if (ShouldCompressChunk) { std::filesystem::path TempFilePath = m_Options.TempDir / (ChunkHash.ToHexString()); IoBuffer TempPayload = WriteToTempFile(std::move(CompressedBlob).GetCompressed(), TempFilePath); CompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(TempPayload)); } TempLooseChunksStats.CompressedChunkCount++; return std::move(CompressedBlob).GetCompressed(); } std::vector> UploadFolder(LoggerRef Log, ProgressBase& Progress, TransferThreadWorkers& Workers, StorageInstance& Storage, std::atomic& AbortFlag, std::atomic& PauseFlag, const Oid& BuildId, const Oid& BuildPartId, std::string_view BuildPartName, const std::filesystem::path& Path, const std::filesystem::path& ManifestPath, const CbObject& MetaData, ChunkingController& ChunkController, ChunkingCache& ChunkCache, const UploadFolderOptions& Options) { Progress.SetLogOperationName("Upload Folder"); Stopwatch UploadTimer; BuildsOperationUploadFolder UploadOp( Log, Progress, Storage, AbortFlag, PauseFlag, Workers.GetIOWorkerPool(), Workers.GetNetworkPool(), BuildId, Path, Options.CreateBuild, std::move(MetaData), BuildsOperationUploadFolder::Options{.IsQuiet = Options.IsQuiet, .IsVerbose = Options.IsVerbose, .DoExtraContentValidation = Options.DoExtraContentVerify, .FindBlockMaxCount = Options.FindBlockMaxCount, .BlockReuseMinPercentLimit = Options.BlockReuseMinPercentLimit, .AllowMultiparts = Options.AllowMultiparts, .IgnoreExistingBlocks = Options.IgnoreExistingBlocks, .TempDir = Options.TempDir, .ExcludeFolders = Options.ExcludeFolders, .ExcludeExtensions = Options.ExcludeExtensions, .NonCompressableExtensions = DefaultSplitOnlyExtensions, .PopulateCache = Options.UploadToZenCache}); std::vector> UploadedParts = UploadOp.Execute(BuildPartId, BuildPartName, ManifestPath, ChunkController, ChunkCache); if (AbortFlag) { return {}; } if (Options.IsVerbose) { ZEN_CONSOLE( "Folder scanning stats:" "\n FoundFileCount: {}" "\n FoundFileByteCount: {}" "\n AcceptedFileCount: {}" "\n AcceptedFileByteCount: {}" "\n ElapsedWallTimeUS: {}", UploadOp.m_LocalFolderScanStats.FoundFileCount.load(), NiceBytes(UploadOp.m_LocalFolderScanStats.FoundFileByteCount.load()), UploadOp.m_LocalFolderScanStats.AcceptedFileCount.load(), NiceBytes(UploadOp.m_LocalFolderScanStats.AcceptedFileByteCount.load()), NiceLatencyNs(UploadOp.m_LocalFolderScanStats.ElapsedWallTimeUS * 1000)); ZEN_CONSOLE( "Chunking stats:" "\n FilesProcessed: {}" "\n FilesChunked: {}" "\n BytesHashed: {}" "\n UniqueChunksFound: {}" "\n UniqueSequencesFound: {}" "\n UniqueBytesFound: {}" "\n FilesFoundInCache: {}" "\n ChunksFoundInCache: {}" "\n FilesStoredInCache: {}" "\n ChunksStoredInCache: {}" "\n ElapsedWallTimeUS: {}", UploadOp.m_ChunkingStats.FilesProcessed.load(), UploadOp.m_ChunkingStats.FilesChunked.load(), NiceBytes(UploadOp.m_ChunkingStats.BytesHashed.load()), UploadOp.m_ChunkingStats.UniqueChunksFound.load(), UploadOp.m_ChunkingStats.UniqueSequencesFound.load(), NiceBytes(UploadOp.m_ChunkingStats.UniqueBytesFound.load()), UploadOp.m_ChunkingStats.FilesFoundInCache.load(), UploadOp.m_ChunkingStats.ChunksFoundInCache.load(), NiceBytes(UploadOp.m_ChunkingStats.BytesFoundInCache.load()), UploadOp.m_ChunkingStats.FilesStoredInCache.load(), UploadOp.m_ChunkingStats.ChunksStoredInCache.load(), NiceBytes(UploadOp.m_ChunkingStats.BytesStoredInCache.load()), NiceLatencyNs(UploadOp.m_ChunkingStats.ElapsedWallTimeUS * 1000)); ZEN_CONSOLE( "Find block stats:" "\n FindBlockTimeMS: {}" "\n PotentialChunkCount: {}" "\n PotentialChunkByteCount: {}" "\n FoundBlockCount: {}" "\n FoundBlockChunkCount: {}" "\n FoundBlockByteCount: {}" "\n AcceptedBlockCount: {}" "\n NewBlocksCount: {}" "\n NewBlocksChunkCount: {}" "\n NewBlocksChunkByteCount: {}", NiceTimeSpanMs(UploadOp.m_FindBlocksStats.FindBlockTimeMS), UploadOp.m_FindBlocksStats.PotentialChunkCount, NiceBytes(UploadOp.m_FindBlocksStats.PotentialChunkByteCount), UploadOp.m_FindBlocksStats.FoundBlockCount, UploadOp.m_FindBlocksStats.FoundBlockChunkCount, NiceBytes(UploadOp.m_FindBlocksStats.FoundBlockByteCount), UploadOp.m_FindBlocksStats.AcceptedBlockCount, UploadOp.m_FindBlocksStats.NewBlocksCount, UploadOp.m_FindBlocksStats.NewBlocksChunkCount, NiceBytes(UploadOp.m_FindBlocksStats.NewBlocksChunkByteCount)); ZEN_CONSOLE( "Reuse block stats:" "\n AcceptedChunkCount: {}" "\n AcceptedByteCount: {}" "\n AcceptedRawByteCount: {}" "\n RejectedBlockCount: {}" "\n RejectedChunkCount: {}" "\n RejectedByteCount: {}" "\n AcceptedReduntantChunkCount: {}" "\n AcceptedReduntantByteCount: {}", UploadOp.m_ReuseBlocksStats.AcceptedChunkCount, NiceBytes(UploadOp.m_ReuseBlocksStats.AcceptedByteCount), NiceBytes(UploadOp.m_ReuseBlocksStats.AcceptedRawByteCount), UploadOp.m_ReuseBlocksStats.RejectedBlockCount, UploadOp.m_ReuseBlocksStats.RejectedChunkCount, NiceBytes(UploadOp.m_ReuseBlocksStats.RejectedByteCount), UploadOp.m_ReuseBlocksStats.AcceptedReduntantChunkCount, NiceBytes(UploadOp.m_ReuseBlocksStats.AcceptedReduntantByteCount)); ZEN_CONSOLE( "Generate blocks stats:" "\n GeneratedBlockByteCount: {}" "\n GeneratedBlockCount: {}" "\n GenerateBlocksElapsedWallTimeUS: {}", NiceBytes(UploadOp.m_GenerateBlocksStats.GeneratedBlockByteCount.load()), UploadOp.m_GenerateBlocksStats.GeneratedBlockCount.load(), NiceLatencyNs(UploadOp.m_GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS * 1000)); ZEN_CONSOLE( "Loose chunks stats:" "\n ChunkCount: {}" "\n ChunkByteCount: {}" "\n CompressedChunkCount: {}" "\n CompressChunksElapsedWallTimeUS: {}", UploadOp.m_LooseChunksStats.ChunkCount, NiceBytes(UploadOp.m_LooseChunksStats.ChunkByteCount), UploadOp.m_LooseChunksStats.CompressedChunkCount.load(), NiceBytes(UploadOp.m_LooseChunksStats.CompressedChunkBytes.load()), NiceLatencyNs(UploadOp.m_LooseChunksStats.CompressChunksElapsedWallTimeUS * 1000)); ZEN_CONSOLE( "Disk stats:" "\n OpenReadCount: {}" "\n OpenWriteCount: {}" "\n ReadCount: {}" "\n ReadByteCount: {}" "\n WriteCount: {} ({} cloned)" "\n WriteByteCount: {} ({} cloned)" "\n CurrentOpenFileCount: {}", UploadOp.m_DiskStats.OpenReadCount.load(), UploadOp.m_DiskStats.OpenWriteCount.load(), UploadOp.m_DiskStats.ReadCount.load(), NiceBytes(UploadOp.m_DiskStats.ReadByteCount.load()), UploadOp.m_DiskStats.WriteCount.load(), UploadOp.m_DiskStats.CloneCount.load(), NiceBytes(UploadOp.m_DiskStats.WriteByteCount.load()), NiceBytes(UploadOp.m_DiskStats.CloneByteCount.load()), UploadOp.m_DiskStats.CurrentOpenFileCount.load()); ZEN_CONSOLE( "Upload stats:" "\n BlockCount: {}" "\n BlocksBytes: {}" "\n ChunkCount: {}" "\n ChunksBytes: {}" "\n ReadFromDiskBytes: {}" "\n MultipartAttachmentCount: {}" "\n ElapsedWallTimeUS: {}", UploadOp.m_UploadStats.BlockCount.load(), NiceBytes(UploadOp.m_UploadStats.BlocksBytes.load()), UploadOp.m_UploadStats.ChunkCount.load(), NiceBytes(UploadOp.m_UploadStats.ChunksBytes.load()), NiceBytes(UploadOp.m_UploadStats.ReadFromDiskBytes.load()), UploadOp.m_UploadStats.MultipartAttachmentCount.load(), NiceLatencyNs(UploadOp.m_UploadStats.ElapsedWallTimeUS * 1000)); } const double DeltaByteCountPercent = UploadOp.m_ChunkingStats.BytesHashed > 0 ? (100.0 * (UploadOp.m_FindBlocksStats.NewBlocksChunkByteCount + UploadOp.m_LooseChunksStats.CompressedChunkBytes)) / (UploadOp.m_ChunkingStats.BytesHashed) : 0.0; const std::string MultipartAttachmentStats = Options.AllowMultiparts ? fmt::format(" ({} as multipart)", UploadOp.m_UploadStats.MultipartAttachmentCount.load()) : ""; if (!Options.IsQuiet) { ZEN_CONSOLE( "Uploaded part {} ('{}') to build {}, {}\n" " Scanned files: {:>8} ({}), {}B/sec, {}\n" " New data: {:>8} ({}) {:.1f}%\n" " New blocks: {:>8} ({} -> {}), {}B/sec, {}\n" " New chunks: {:>8} ({} -> {}), {}B/sec, {}\n" " Uploaded: {:>8} ({}), {}bits/sec, {}\n" " Blocks: {:>8} ({})\n" " Chunks: {:>8} ({}){}", BuildPartId, BuildPartName, BuildId, NiceTimeSpanMs(UploadTimer.GetElapsedTimeMs()), UploadOp.m_LocalFolderScanStats.FoundFileCount.load(), NiceBytes(UploadOp.m_LocalFolderScanStats.FoundFileByteCount.load()), NiceNum(GetBytesPerSecond(UploadOp.m_ChunkingStats.ElapsedWallTimeUS, UploadOp.m_ChunkingStats.BytesHashed)), NiceTimeSpanMs(UploadOp.m_ChunkingStats.ElapsedWallTimeUS / 1000), UploadOp.m_FindBlocksStats.NewBlocksChunkCount + UploadOp.m_LooseChunksStats.CompressedChunkCount, NiceBytes(UploadOp.m_FindBlocksStats.NewBlocksChunkByteCount + UploadOp.m_LooseChunksStats.CompressedChunkBytes), DeltaByteCountPercent, UploadOp.m_GenerateBlocksStats.GeneratedBlockCount.load(), NiceBytes(UploadOp.m_FindBlocksStats.NewBlocksChunkByteCount), NiceBytes(UploadOp.m_GenerateBlocksStats.GeneratedBlockByteCount.load()), NiceNum(GetBytesPerSecond(UploadOp.m_GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS, UploadOp.m_GenerateBlocksStats.GeneratedBlockByteCount)), NiceTimeSpanMs(UploadOp.m_GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS / 1000), UploadOp.m_LooseChunksStats.CompressedChunkCount.load(), NiceBytes(UploadOp.m_LooseChunksStats.CompressedChunkRawBytes), NiceBytes(UploadOp.m_LooseChunksStats.CompressedChunkBytes.load()), NiceNum(GetBytesPerSecond(UploadOp.m_LooseChunksStats.CompressChunksElapsedWallTimeUS, UploadOp.m_LooseChunksStats.CompressedChunkRawBytes)), NiceTimeSpanMs(UploadOp.m_LooseChunksStats.CompressChunksElapsedWallTimeUS / 1000), UploadOp.m_UploadStats.BlockCount.load() + UploadOp.m_UploadStats.ChunkCount.load(), NiceBytes(UploadOp.m_UploadStats.BlocksBytes + UploadOp.m_UploadStats.ChunksBytes), NiceNum(GetBytesPerSecond(UploadOp.m_UploadStats.ElapsedWallTimeUS, (UploadOp.m_UploadStats.ChunksBytes + UploadOp.m_UploadStats.BlocksBytes) * 8)), NiceTimeSpanMs(UploadOp.m_UploadStats.ElapsedWallTimeUS / 1000), UploadOp.m_UploadStats.BlockCount.load(), NiceBytes(UploadOp.m_UploadStats.BlocksBytes.load()), UploadOp.m_UploadStats.ChunkCount.load(), NiceBytes(UploadOp.m_UploadStats.ChunksBytes.load()), MultipartAttachmentStats); } return UploadedParts; } } // namespace zen