diff options
| author | Dan Engelbrecht <[email protected]> | 2025-10-22 11:08:39 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-22 11:08:39 +0200 |
| commit | ab9face3992228d1bf632a79bce1240d61eb431f (patch) | |
| tree | ac3100761b50f7a9d652884d2603aad5fe0da859 /src | |
| parent | fix error log when using TryCloneFile on Mac/Linux (#597) (diff) | |
| download | zen-ab9face3992228d1bf632a79bce1240d61eb431f.tar.xz zen-ab9face3992228d1bf632a79bce1240d61eb431f.zip | |
make validation of completed sequences in builds download optional (#596)
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 1 | ||||
| -rw-r--r-- | src/zenremotestore/builds/buildstorageoperations.cpp | 5100 | ||||
| -rw-r--r-- | src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h | 3 |
3 files changed, 2577 insertions, 2527 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index 9c6fd17ab..d220eefb2 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -2143,6 +2143,7 @@ namespace { .PrimeCacheOnly = Options.PrimeCacheOnly, .EnableOtherDownloadsScavenging = Options.EnableOtherDownloadsScavenging, .EnableTargetFolderScavenging = Options.EnableTargetFolderScavenging, + .ValidateCompletedSequences = Options.PostDownloadVerify, .ExcludeFolders = DefaultExcludeFolders, .ExcludeExtensions = DefaultExcludeExtensions}); { diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp index ebb876ed9..c9f142d7a 100644 --- a/src/zenremotestore/builds/buildstorageoperations.cpp +++ b/src/zenremotestore/builds/buildstorageoperations.cpp @@ -402,95 +402,6 @@ MakeBufferMemoryBased(const CompositeBuffer& PartialBlockBuffer) } } -void -StreamDecompress(std::atomic<bool>& AbortFlag, - const std::filesystem::path& CacheFolderPath, - const IoHash& SequenceRawHash, - CompositeBuffer&& CompressedPart, - std::atomic<uint64_t>& ReadCount, - std::atomic<uint64_t>& ReadByteCount, - std::atomic<uint64_t>& WriteCount, - std::atomic<uint64_t>& WriteByteCount, - std::atomic<uint64_t>& WrittenChunkByteCount, - std::atomic<uint64_t>& ValidatedChunkByteCount) -{ - ZEN_TRACE_CPU("StreamDecompress"); - const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash); - TemporaryFile DecompressedTemp; - std::error_code Ec; - DecompressedTemp.CreateTemporary(TempChunkSequenceFileName.parent_path(), Ec); - if (Ec) - { - throw std::runtime_error(fmt::format("Failed creating temporary file for decompressing large blob {}, reason: ({}) {}", - SequenceRawHash, - Ec.value(), - Ec.message())); - } - IoHash RawHash; - uint64_t RawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompressedPart, RawHash, RawSize); - if (!Compressed) - { - throw std::runtime_error(fmt::format("Failed to parse header of compressed large blob {}", SequenceRawHash)); - } - if (RawHash != SequenceRawHash) - { - throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, SequenceRawHash)); - } - PrepareFileForScatteredWrite(DecompressedTemp.Handle(), RawSize); - - IoHashStream Hash; - bool CouldDecompress = - Compressed.DecompressToStream(0, - (uint64_t)-1, - [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { - ZEN_UNUSED(SourceOffset); - ZEN_TRACE_CPU("StreamDecompress_Write"); - ReadCount++; - ReadByteCount += SourceSize; - if (!AbortFlag) - { - for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) - { - Hash.Append(Segment.GetView()); - ValidatedChunkByteCount += Segment.GetSize(); - DecompressedTemp.Write(Segment, Offset); - Offset += Segment.GetSize(); - WriteByteCount += Segment.GetSize(); - WriteCount++; - WrittenChunkByteCount += Segment.GetSize(); - } - return true; - } - return false; - }); - - if (AbortFlag) - { - return; - } - - if (!CouldDecompress) - { - throw std::runtime_error(fmt::format("Failed to decompress large blob {}", SequenceRawHash)); - } - const IoHash VerifyHash = Hash.GetHash(); - if (VerifyHash != SequenceRawHash) - { - throw std::runtime_error( - fmt::format("Decompressed blob payload hash {} does not match expected hash {}", VerifyHash, SequenceRawHash)); - } - DecompressedTemp.MoveTemporaryIntoPlace(TempChunkSequenceFileName, Ec); - if (Ec) - { - throw std::runtime_error(fmt::format("Failed moving temporary file for decompressing large blob {}, reason: ({}) {}", - SequenceRawHash, - Ec.value(), - Ec.message())); - } - // WriteChunkStats.ChunkCountWritten++; -} - class FilteredRate { public: @@ -645,346 +556,259 @@ void BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) { ZEN_TRACE_CPU("BuildsOperationUpdateFolder::Execute"); - - enum TaskSteps : uint32_t + try { - ScanExistingData, - WriteChunks, - PrepareTarget, - FinalizeTarget, - Cleanup, - StepCount - }; + enum TaskSteps : uint32_t + { + ScanExistingData, + WriteChunks, + PrepareTarget, + FinalizeTarget, + Cleanup, + StepCount + }; - auto EndProgress = MakeGuard([&]() { m_LogOutput.SetLogOperationProgress(TaskSteps::StepCount, TaskSteps::StepCount); }); + auto EndProgress = MakeGuard([&]() { m_LogOutput.SetLogOperationProgress(TaskSteps::StepCount, TaskSteps::StepCount); }); - ZEN_ASSERT((!m_Options.PrimeCacheOnly) || - (m_Options.PrimeCacheOnly && (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Off))); + ZEN_ASSERT((!m_Options.PrimeCacheOnly) || + (m_Options.PrimeCacheOnly && (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Off))); - m_LogOutput.SetLogOperationProgress(TaskSteps::ScanExistingData, TaskSteps::StepCount); + m_LogOutput.SetLogOperationProgress(TaskSteps::ScanExistingData, TaskSteps::StepCount); - CreateDirectories(m_CacheFolderPath); - CreateDirectories(m_TempDownloadFolderPath); - CreateDirectories(m_TempBlockFolderPath); + CreateDirectories(m_CacheFolderPath); + CreateDirectories(m_TempDownloadFolderPath); + CreateDirectories(m_TempBlockFolderPath); - Stopwatch IndexTimer; + Stopwatch IndexTimer; - if (!m_Options.IsQuiet) - { - LOG_OUTPUT(m_LogOutput, "Indexed local and remote content in {}", NiceTimeSpanMs(IndexTimer.GetElapsedTimeMs())); - } + if (!m_Options.IsQuiet) + { + LOG_OUTPUT(m_LogOutput, "Indexed local and remote content in {}", NiceTimeSpanMs(IndexTimer.GetElapsedTimeMs())); + } - Stopwatch CacheMappingTimer; + Stopwatch CacheMappingTimer; - std::vector<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters(m_RemoteContent.ChunkedContent.SequenceRawHashes.size()); - std::vector<bool> RemoteChunkIndexNeedsCopyFromLocalFileFlags(m_RemoteContent.ChunkedContent.ChunkHashes.size()); - std::vector<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags(m_RemoteContent.ChunkedContent.ChunkHashes.size()); + std::vector<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters(m_RemoteContent.ChunkedContent.SequenceRawHashes.size()); + std::vector<bool> RemoteChunkIndexNeedsCopyFromLocalFileFlags(m_RemoteContent.ChunkedContent.ChunkHashes.size()); + std::vector<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags(m_RemoteContent.ChunkedContent.ChunkHashes.size()); - tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedChunkHashesFound; - tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedSequenceHashesFound; - if (!m_Options.PrimeCacheOnly) - { - ScanCacheFolder(CachedChunkHashesFound, CachedSequenceHashesFound); - } + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedChunkHashesFound; + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedSequenceHashesFound; + if (!m_Options.PrimeCacheOnly) + { + ScanCacheFolder(CachedChunkHashesFound, CachedSequenceHashesFound); + } - tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedBlocksFound; - if (!m_Options.PrimeCacheOnly) - { - ScanTempBlocksFolder(CachedBlocksFound); - } + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedBlocksFound; + if (!m_Options.PrimeCacheOnly) + { + ScanTempBlocksFolder(CachedBlocksFound); + } - tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> SequenceIndexesLeftToFindToRemoteIndex; + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> SequenceIndexesLeftToFindToRemoteIndex; - if (!m_Options.PrimeCacheOnly && m_Options.EnableTargetFolderScavenging) - { - // Pick up all whole files we can use from current local state - ZEN_TRACE_CPU("GetLocalSequences"); + if (!m_Options.PrimeCacheOnly && m_Options.EnableTargetFolderScavenging) + { + // Pick up all whole files we can use from current local state + ZEN_TRACE_CPU("GetLocalSequences"); - Stopwatch LocalTimer; + Stopwatch LocalTimer; - std::vector<uint32_t> MissingSequenceIndexes = ScanTargetFolder(CachedChunkHashesFound, CachedSequenceHashesFound); + std::vector<uint32_t> MissingSequenceIndexes = ScanTargetFolder(CachedChunkHashesFound, CachedSequenceHashesFound); - for (uint32_t RemoteSequenceIndex : MissingSequenceIndexes) - { - // We must write the sequence - const uint32_t ChunkCount = m_RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]; - const IoHash& RemoteSequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; - SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = ChunkCount; - SequenceIndexesLeftToFindToRemoteIndex.insert({RemoteSequenceRawHash, RemoteSequenceIndex}); + for (uint32_t RemoteSequenceIndex : MissingSequenceIndexes) + { + // We must write the sequence + const uint32_t ChunkCount = m_RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]; + const IoHash& RemoteSequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = ChunkCount; + SequenceIndexesLeftToFindToRemoteIndex.insert({RemoteSequenceRawHash, RemoteSequenceIndex}); + } } - } - else - { - for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < m_RemoteContent.ChunkedContent.SequenceRawHashes.size(); - RemoteSequenceIndex++) + else { - const uint32_t ChunkCount = m_RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]; - SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = ChunkCount; + for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < m_RemoteContent.ChunkedContent.SequenceRawHashes.size(); + RemoteSequenceIndex++) + { + const uint32_t ChunkCount = m_RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]; + SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = ChunkCount; + } } - } - - std::vector<ChunkedFolderContent> ScavengedContents; - std::vector<ChunkedContentLookup> ScavengedLookups; - std::vector<std::filesystem::path> ScavengedPaths; - std::vector<ScavengedSequenceCopyOperation> ScavengedSequenceCopyOperations; - uint64_t ScavengedPathsCount = 0; - - if (!m_Options.PrimeCacheOnly && m_Options.EnableOtherDownloadsScavenging) - { - ZEN_TRACE_CPU("GetScavengedSequences"); + std::vector<ChunkedFolderContent> ScavengedContents; + std::vector<ChunkedContentLookup> ScavengedLookups; + std::vector<std::filesystem::path> ScavengedPaths; - Stopwatch ScavengeTimer; + std::vector<ScavengedSequenceCopyOperation> ScavengedSequenceCopyOperations; + uint64_t ScavengedPathsCount = 0; - if (!SequenceIndexesLeftToFindToRemoteIndex.empty()) + if (!m_Options.PrimeCacheOnly && m_Options.EnableOtherDownloadsScavenging) { - std::vector<ScavengeSource> ScavengeSources = FindScavengeSources(); + ZEN_TRACE_CPU("GetScavengedSequences"); - const size_t ScavengePathCount = ScavengeSources.size(); + Stopwatch ScavengeTimer; - ScavengedContents.resize(ScavengePathCount); - ScavengedLookups.resize(ScavengePathCount); - ScavengedPaths.resize(ScavengePathCount); + if (!SequenceIndexesLeftToFindToRemoteIndex.empty()) + { + std::vector<ScavengeSource> ScavengeSources = FindScavengeSources(); - std::unique_ptr<BuildOpLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Scavenging")); - BuildOpLogOutput::ProgressBar& ScavengeProgressBar(*ProgressBarPtr); + const size_t ScavengePathCount = ScavengeSources.size(); - ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + ScavengedContents.resize(ScavengePathCount); + ScavengedLookups.resize(ScavengePathCount); + ScavengedPaths.resize(ScavengePathCount); - std::atomic<uint64_t> PathsFound(0); - std::atomic<uint64_t> ChunksFound(0); - std::atomic<uint64_t> PathsScavenged(0); + std::unique_ptr<BuildOpLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Scavenging")); + BuildOpLogOutput::ProgressBar& ScavengeProgressBar(*ProgressBarPtr); - for (size_t ScavengeIndex = 0; ScavengeIndex < ScavengePathCount; ScavengeIndex++) - { - Work.ScheduleWork(m_IOWorkerPool, - [this, - &ScavengeSources, - &ScavengedContents, - &ScavengedPaths, - &ScavengedLookups, - &PathsFound, - &ChunksFound, - &PathsScavenged, - ScavengeIndex](std::atomic<bool>&) { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Async_FindScavengeContent"); + ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - const ScavengeSource& Source = ScavengeSources[ScavengeIndex]; - ChunkedFolderContent& ScavengedLocalContent = ScavengedContents[ScavengeIndex]; - ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengeIndex]; + std::atomic<uint64_t> PathsFound(0); + std::atomic<uint64_t> ChunksFound(0); + std::atomic<uint64_t> PathsScavenged(0); - if (FindScavengeContent(Source, ScavengedLocalContent, ScavengedLookup)) - { - ScavengedPaths[ScavengeIndex] = Source.Path; - PathsFound += ScavengedLocalContent.Paths.size(); - ChunksFound += ScavengedLocalContent.ChunkedContent.ChunkHashes.size(); - } - else + for (size_t ScavengeIndex = 0; ScavengeIndex < ScavengePathCount; ScavengeIndex++) + { + Work.ScheduleWork(m_IOWorkerPool, + [this, + &ScavengeSources, + &ScavengedContents, + &ScavengedPaths, + &ScavengedLookups, + &PathsFound, + &ChunksFound, + &PathsScavenged, + ScavengeIndex](std::atomic<bool>&) { + if (!m_AbortFlag) { - ScavengedPaths[ScavengeIndex].clear(); - } - PathsScavenged++; - } - }); - } - { - ZEN_TRACE_CPU("ScavengeScan_Wait"); + ZEN_TRACE_CPU("Async_FindScavengeContent"); - Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(PendingWork); - std::string Details = fmt::format("{}/{} scanned. {} paths and {} chunks found for scavenging", - PathsScavenged.load(), - ScavengePathCount, - PathsFound.load(), - ChunksFound.load()); - ScavengeProgressBar.UpdateState({.Task = "Scavenging ", - .Details = Details, - .TotalCount = ScavengePathCount, - .RemainingCount = ScavengePathCount - PathsScavenged.load(), - .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, - false); - }); - } + const ScavengeSource& Source = ScavengeSources[ScavengeIndex]; + ChunkedFolderContent& ScavengedLocalContent = ScavengedContents[ScavengeIndex]; + ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengeIndex]; - ScavengeProgressBar.Finish(); - if (m_AbortFlag) - { - return; - } + if (FindScavengeContent(Source, ScavengedLocalContent, ScavengedLookup)) + { + ScavengedPaths[ScavengeIndex] = Source.Path; + PathsFound += ScavengedLocalContent.Paths.size(); + ChunksFound += ScavengedLocalContent.ChunkedContent.ChunkHashes.size(); + } + else + { + ScavengedPaths[ScavengeIndex].clear(); + } + PathsScavenged++; + } + }); + } + { + ZEN_TRACE_CPU("ScavengeScan_Wait"); + + Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); + std::string Details = fmt::format("{}/{} scanned. {} paths and {} chunks found for scavenging", + PathsScavenged.load(), + ScavengePathCount, + PathsFound.load(), + ChunksFound.load()); + ScavengeProgressBar.UpdateState( + {.Task = "Scavenging ", + .Details = Details, + .TotalCount = ScavengePathCount, + .RemainingCount = ScavengePathCount - PathsScavenged.load(), + .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, + false); + }); + } - for (uint32_t ScavengedContentIndex = 0; - ScavengedContentIndex < ScavengedContents.size() && (!SequenceIndexesLeftToFindToRemoteIndex.empty()); - ScavengedContentIndex++) - { - const std::filesystem::path& ScavengePath = ScavengedPaths[ScavengedContentIndex]; - if (!ScavengePath.empty()) + ScavengeProgressBar.Finish(); + if (m_AbortFlag) { - const ChunkedFolderContent& ScavengedLocalContent = ScavengedContents[ScavengedContentIndex]; - const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengedContentIndex]; + return; + } - for (uint32_t ScavengedSequenceIndex = 0; - ScavengedSequenceIndex < ScavengedLocalContent.ChunkedContent.SequenceRawHashes.size(); - ScavengedSequenceIndex++) + for (uint32_t ScavengedContentIndex = 0; + ScavengedContentIndex < ScavengedContents.size() && (!SequenceIndexesLeftToFindToRemoteIndex.empty()); + ScavengedContentIndex++) + { + const std::filesystem::path& ScavengePath = ScavengedPaths[ScavengedContentIndex]; + if (!ScavengePath.empty()) { - const IoHash& SequenceRawHash = ScavengedLocalContent.ChunkedContent.SequenceRawHashes[ScavengedSequenceIndex]; - if (auto It = SequenceIndexesLeftToFindToRemoteIndex.find(SequenceRawHash); - It != SequenceIndexesLeftToFindToRemoteIndex.end()) + const ChunkedFolderContent& ScavengedLocalContent = ScavengedContents[ScavengedContentIndex]; + const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengedContentIndex]; + + for (uint32_t ScavengedSequenceIndex = 0; + ScavengedSequenceIndex < ScavengedLocalContent.ChunkedContent.SequenceRawHashes.size(); + ScavengedSequenceIndex++) { - const uint32_t RemoteSequenceIndex = It->second; - const uint64_t RawSize = - m_RemoteContent.RawSizes[m_RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]]; - ZEN_ASSERT(RawSize > 0); + const IoHash& SequenceRawHash = ScavengedLocalContent.ChunkedContent.SequenceRawHashes[ScavengedSequenceIndex]; + if (auto It = SequenceIndexesLeftToFindToRemoteIndex.find(SequenceRawHash); + It != SequenceIndexesLeftToFindToRemoteIndex.end()) + { + const uint32_t RemoteSequenceIndex = It->second; + const uint64_t RawSize = + m_RemoteContent.RawSizes[m_RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]]; + ZEN_ASSERT(RawSize > 0); - const uint32_t ScavengedPathIndex = ScavengedLookup.SequenceIndexFirstPathIndex[ScavengedSequenceIndex]; - ZEN_ASSERT_SLOW(IsFile((ScavengePath / ScavengedLocalContent.Paths[ScavengedPathIndex]).make_preferred())); + const uint32_t ScavengedPathIndex = ScavengedLookup.SequenceIndexFirstPathIndex[ScavengedSequenceIndex]; + ZEN_ASSERT_SLOW(IsFile((ScavengePath / ScavengedLocalContent.Paths[ScavengedPathIndex]).make_preferred())); - ScavengedSequenceCopyOperations.push_back({.ScavengedContentIndex = ScavengedContentIndex, - .ScavengedPathIndex = ScavengedPathIndex, - .RemoteSequenceIndex = RemoteSequenceIndex, - .RawSize = RawSize}); + ScavengedSequenceCopyOperations.push_back({.ScavengedContentIndex = ScavengedContentIndex, + .ScavengedPathIndex = ScavengedPathIndex, + .RemoteSequenceIndex = RemoteSequenceIndex, + .RawSize = RawSize}); - SequenceIndexesLeftToFindToRemoteIndex.erase(SequenceRawHash); - SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = 0; + SequenceIndexesLeftToFindToRemoteIndex.erase(SequenceRawHash); + SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = 0; - m_CacheMappingStats.ScavengedPathsMatchingSequencesCount++; - m_CacheMappingStats.ScavengedPathsMatchingSequencesByteCount += RawSize; + m_CacheMappingStats.ScavengedPathsMatchingSequencesCount++; + m_CacheMappingStats.ScavengedPathsMatchingSequencesByteCount += RawSize; + } } + ScavengedPathsCount++; } - ScavengedPathsCount++; } } + m_CacheMappingStats.ScavengeElapsedWallTimeUs += ScavengeTimer.GetElapsedTimeUs(); } - m_CacheMappingStats.ScavengeElapsedWallTimeUs += ScavengeTimer.GetElapsedTimeUs(); - } - - uint32_t RemainingChunkCount = 0; - for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < m_RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++) - { - uint64_t ChunkWriteCount = GetChunkWriteCount(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); - if (ChunkWriteCount > 0) - { - RemainingChunkCount++; - } - } - - // Pick up all chunks in current local state - tsl::robin_map<IoHash, size_t, IoHash::Hasher> RawHashToCopyChunkDataIndex; - std::vector<CopyChunkData> CopyChunkDatas; - - if (!m_Options.PrimeCacheOnly && m_Options.EnableTargetFolderScavenging) - { - ZEN_TRACE_CPU("GetLocalChunks"); - Stopwatch LocalTimer; - - for (uint32_t LocalSequenceIndex = 0; - LocalSequenceIndex < m_LocalContent.ChunkedContent.SequenceRawHashes.size() && (RemainingChunkCount > 0); - LocalSequenceIndex++) + uint32_t RemainingChunkCount = 0; + for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < m_RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++) { - const IoHash& LocalSequenceRawHash = m_LocalContent.ChunkedContent.SequenceRawHashes[LocalSequenceIndex]; - const uint32_t LocalOrderOffset = m_LocalLookup.SequenceIndexChunkOrderOffset[LocalSequenceIndex]; - + uint64_t ChunkWriteCount = GetChunkWriteCount(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); + if (ChunkWriteCount > 0) { - uint64_t SourceOffset = 0; - const uint32_t LocalChunkCount = m_LocalContent.ChunkedContent.ChunkCounts[LocalSequenceIndex]; - for (uint32_t LocalOrderIndex = 0; LocalOrderIndex < LocalChunkCount; LocalOrderIndex++) - { - const uint32_t LocalChunkIndex = m_LocalContent.ChunkedContent.ChunkOrders[LocalOrderOffset + LocalOrderIndex]; - const IoHash& LocalChunkHash = m_LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; - const uint64_t LocalChunkRawSize = m_LocalContent.ChunkedContent.ChunkRawSizes[LocalChunkIndex]; - - if (auto RemoteChunkIt = m_RemoteLookup.ChunkHashToChunkIndex.find(LocalChunkHash); - RemoteChunkIt != m_RemoteLookup.ChunkHashToChunkIndex.end()) - { - const uint32_t RemoteChunkIndex = RemoteChunkIt->second; - if (!RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) - { - std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = - GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); - - if (!ChunkTargetPtrs.empty()) - { - CopyChunkData::ChunkTarget Target = { - .TargetChunkLocationCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()), - .RemoteChunkIndex = RemoteChunkIndex, - .CacheFileOffset = SourceOffset}; - if (auto CopySourceIt = RawHashToCopyChunkDataIndex.find(LocalSequenceRawHash); - CopySourceIt != RawHashToCopyChunkDataIndex.end()) - { - CopyChunkData& Data = CopyChunkDatas[CopySourceIt->second]; - if (Data.TargetChunkLocationPtrs.size() > 1024) - { - RawHashToCopyChunkDataIndex.insert_or_assign(LocalSequenceRawHash, CopyChunkDatas.size()); - CopyChunkDatas.push_back( - CopyChunkData{.ScavengeSourceIndex = (uint32_t)-1, - .SourceSequenceIndex = LocalSequenceIndex, - .TargetChunkLocationPtrs = ChunkTargetPtrs, - .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}}); - } - else - { - Data.TargetChunkLocationPtrs.insert(Data.TargetChunkLocationPtrs.end(), - ChunkTargetPtrs.begin(), - ChunkTargetPtrs.end()); - Data.ChunkTargets.push_back(Target); - } - } - else - { - RawHashToCopyChunkDataIndex.insert_or_assign(LocalSequenceRawHash, CopyChunkDatas.size()); - CopyChunkDatas.push_back( - CopyChunkData{.ScavengeSourceIndex = (uint32_t)-1, - .SourceSequenceIndex = LocalSequenceIndex, - .TargetChunkLocationPtrs = ChunkTargetPtrs, - .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}}); - } - m_CacheMappingStats.LocalChunkMatchingRemoteCount++; - m_CacheMappingStats.LocalChunkMatchingRemoteByteCount += LocalChunkRawSize; - RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex] = true; - RemainingChunkCount--; - } - } - } - SourceOffset += LocalChunkRawSize; - } + RemainingChunkCount++; } } - m_CacheMappingStats.LocalScanElapsedWallTimeUs += LocalTimer.GetElapsedTimeUs(); - } - - if (!m_Options.PrimeCacheOnly && m_Options.EnableOtherDownloadsScavenging) - { - ZEN_TRACE_CPU("GetScavengeChunks"); - Stopwatch ScavengeTimer; + // Pick up all chunks in current local state + tsl::robin_map<IoHash, size_t, IoHash::Hasher> RawHashToCopyChunkDataIndex; + std::vector<CopyChunkData> CopyChunkDatas; - for (uint32_t ScavengedContentIndex = 0; ScavengedContentIndex < ScavengedContents.size() && (RemainingChunkCount > 0); - ScavengedContentIndex++) + if (!m_Options.PrimeCacheOnly && m_Options.EnableTargetFolderScavenging) { - const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengedContentIndex]; - // const std::filesystem::path& ScavengedPath = ScavengedPaths[ScavengedContentIndex]; - const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengedContentIndex]; + ZEN_TRACE_CPU("GetLocalChunks"); + + Stopwatch LocalTimer; - for (uint32_t ScavengedSequenceIndex = 0; - ScavengedSequenceIndex < ScavengedContent.ChunkedContent.SequenceRawHashes.size() && (RemainingChunkCount > 0); - ScavengedSequenceIndex++) + for (uint32_t LocalSequenceIndex = 0; + LocalSequenceIndex < m_LocalContent.ChunkedContent.SequenceRawHashes.size() && (RemainingChunkCount > 0); + LocalSequenceIndex++) { - const IoHash& ScavengedSequenceRawHash = ScavengedContent.ChunkedContent.SequenceRawHashes[ScavengedSequenceIndex]; - const uint32_t ScavengedOrderOffset = ScavengedLookup.SequenceIndexChunkOrderOffset[ScavengedSequenceIndex]; + const IoHash& LocalSequenceRawHash = m_LocalContent.ChunkedContent.SequenceRawHashes[LocalSequenceIndex]; + const uint32_t LocalOrderOffset = m_LocalLookup.SequenceIndexChunkOrderOffset[LocalSequenceIndex]; { - uint64_t SourceOffset = 0; - const uint32_t ScavengedChunkCount = ScavengedContent.ChunkedContent.ChunkCounts[ScavengedSequenceIndex]; - for (uint32_t ScavengedOrderIndex = 0; ScavengedOrderIndex < ScavengedChunkCount; ScavengedOrderIndex++) + uint64_t SourceOffset = 0; + const uint32_t LocalChunkCount = m_LocalContent.ChunkedContent.ChunkCounts[LocalSequenceIndex]; + for (uint32_t LocalOrderIndex = 0; LocalOrderIndex < LocalChunkCount; LocalOrderIndex++) { - const uint32_t ScavengedChunkIndex = - ScavengedContent.ChunkedContent.ChunkOrders[ScavengedOrderOffset + ScavengedOrderIndex]; - const IoHash& ScavengedChunkHash = ScavengedContent.ChunkedContent.ChunkHashes[ScavengedChunkIndex]; - const uint64_t ScavengedChunkRawSize = ScavengedContent.ChunkedContent.ChunkRawSizes[ScavengedChunkIndex]; + const uint32_t LocalChunkIndex = m_LocalContent.ChunkedContent.ChunkOrders[LocalOrderOffset + LocalOrderIndex]; + const IoHash& LocalChunkHash = m_LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; + const uint64_t LocalChunkRawSize = m_LocalContent.ChunkedContent.ChunkRawSizes[LocalChunkIndex]; - if (auto RemoteChunkIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ScavengedChunkHash); + if (auto RemoteChunkIt = m_RemoteLookup.ChunkHashToChunkIndex.find(LocalChunkHash); RemoteChunkIt != m_RemoteLookup.ChunkHashToChunkIndex.end()) { const uint32_t RemoteChunkIndex = RemoteChunkIt->second; @@ -999,16 +823,16 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) .TargetChunkLocationCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()), .RemoteChunkIndex = RemoteChunkIndex, .CacheFileOffset = SourceOffset}; - if (auto CopySourceIt = RawHashToCopyChunkDataIndex.find(ScavengedSequenceRawHash); + if (auto CopySourceIt = RawHashToCopyChunkDataIndex.find(LocalSequenceRawHash); CopySourceIt != RawHashToCopyChunkDataIndex.end()) { CopyChunkData& Data = CopyChunkDatas[CopySourceIt->second]; if (Data.TargetChunkLocationPtrs.size() > 1024) { - RawHashToCopyChunkDataIndex.insert_or_assign(ScavengedSequenceRawHash, CopyChunkDatas.size()); + RawHashToCopyChunkDataIndex.insert_or_assign(LocalSequenceRawHash, CopyChunkDatas.size()); CopyChunkDatas.push_back( - CopyChunkData{.ScavengeSourceIndex = ScavengedContentIndex, - .SourceSequenceIndex = ScavengedSequenceIndex, + CopyChunkData{.ScavengeSourceIndex = (uint32_t)-1, + .SourceSequenceIndex = LocalSequenceIndex, .TargetChunkLocationPtrs = ChunkTargetPtrs, .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}}); } @@ -1022,158 +846,248 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) } else { - RawHashToCopyChunkDataIndex.insert_or_assign(ScavengedSequenceRawHash, CopyChunkDatas.size()); + RawHashToCopyChunkDataIndex.insert_or_assign(LocalSequenceRawHash, CopyChunkDatas.size()); CopyChunkDatas.push_back( - CopyChunkData{.ScavengeSourceIndex = ScavengedContentIndex, - .SourceSequenceIndex = ScavengedSequenceIndex, + CopyChunkData{.ScavengeSourceIndex = (uint32_t)-1, + .SourceSequenceIndex = LocalSequenceIndex, .TargetChunkLocationPtrs = ChunkTargetPtrs, .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}}); } - m_CacheMappingStats.ScavengedChunkMatchingRemoteCount++; - m_CacheMappingStats.ScavengedChunkMatchingRemoteByteCount += ScavengedChunkRawSize; + m_CacheMappingStats.LocalChunkMatchingRemoteCount++; + m_CacheMappingStats.LocalChunkMatchingRemoteByteCount += LocalChunkRawSize; RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex] = true; RemainingChunkCount--; } } } - SourceOffset += ScavengedChunkRawSize; + SourceOffset += LocalChunkRawSize; } } } - } - m_CacheMappingStats.ScavengeElapsedWallTimeUs += ScavengeTimer.GetElapsedTimeUs(); - } - if (!m_Options.IsQuiet) - { - if (m_CacheMappingStats.CacheSequenceHashesCount > 0 || m_CacheMappingStats.CacheChunkCount > 0 || - m_CacheMappingStats.CacheBlockCount > 0) - { - LOG_OUTPUT(m_LogOutput, - "Download cache: Found {} ({}) chunk sequences, {} ({}) chunks, {} ({}) blocks in {}", - m_CacheMappingStats.CacheSequenceHashesCount, - NiceBytes(m_CacheMappingStats.CacheSequenceHashesByteCount), - m_CacheMappingStats.CacheChunkCount, - NiceBytes(m_CacheMappingStats.CacheChunkByteCount), - m_CacheMappingStats.CacheBlockCount, - NiceBytes(m_CacheMappingStats.CacheBlocksByteCount), - NiceTimeSpanMs(m_CacheMappingStats.CacheScanElapsedWallTimeUs / 1000)); + m_CacheMappingStats.LocalScanElapsedWallTimeUs += LocalTimer.GetElapsedTimeUs(); } - if (m_CacheMappingStats.LocalPathsMatchingSequencesCount > 0 || m_CacheMappingStats.LocalChunkMatchingRemoteCount > 0) + if (!m_Options.PrimeCacheOnly && m_Options.EnableOtherDownloadsScavenging) { - LOG_OUTPUT(m_LogOutput, - "Local state : Found {} ({}) chunk sequences, {} ({}) chunks in {}", - m_CacheMappingStats.LocalPathsMatchingSequencesCount, - NiceBytes(m_CacheMappingStats.LocalPathsMatchingSequencesByteCount), - m_CacheMappingStats.LocalChunkMatchingRemoteCount, - NiceBytes(m_CacheMappingStats.LocalChunkMatchingRemoteByteCount), - NiceTimeSpanMs(m_CacheMappingStats.LocalScanElapsedWallTimeUs / 1000)); + ZEN_TRACE_CPU("GetScavengeChunks"); + + Stopwatch ScavengeTimer; + + for (uint32_t ScavengedContentIndex = 0; ScavengedContentIndex < ScavengedContents.size() && (RemainingChunkCount > 0); + ScavengedContentIndex++) + { + const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengedContentIndex]; + // const std::filesystem::path& ScavengedPath = ScavengedPaths[ScavengedContentIndex]; + const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengedContentIndex]; + + for (uint32_t ScavengedSequenceIndex = 0; + ScavengedSequenceIndex < ScavengedContent.ChunkedContent.SequenceRawHashes.size() && (RemainingChunkCount > 0); + ScavengedSequenceIndex++) + { + const IoHash& ScavengedSequenceRawHash = ScavengedContent.ChunkedContent.SequenceRawHashes[ScavengedSequenceIndex]; + const uint32_t ScavengedOrderOffset = ScavengedLookup.SequenceIndexChunkOrderOffset[ScavengedSequenceIndex]; + + { + uint64_t SourceOffset = 0; + const uint32_t ScavengedChunkCount = ScavengedContent.ChunkedContent.ChunkCounts[ScavengedSequenceIndex]; + for (uint32_t ScavengedOrderIndex = 0; ScavengedOrderIndex < ScavengedChunkCount; ScavengedOrderIndex++) + { + const uint32_t ScavengedChunkIndex = + ScavengedContent.ChunkedContent.ChunkOrders[ScavengedOrderOffset + ScavengedOrderIndex]; + const IoHash& ScavengedChunkHash = ScavengedContent.ChunkedContent.ChunkHashes[ScavengedChunkIndex]; + const uint64_t ScavengedChunkRawSize = ScavengedContent.ChunkedContent.ChunkRawSizes[ScavengedChunkIndex]; + + if (auto RemoteChunkIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ScavengedChunkHash); + RemoteChunkIt != m_RemoteLookup.ChunkHashToChunkIndex.end()) + { + const uint32_t RemoteChunkIndex = RemoteChunkIt->second; + if (!RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) + { + std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = + GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); + + if (!ChunkTargetPtrs.empty()) + { + CopyChunkData::ChunkTarget Target = { + .TargetChunkLocationCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()), + .RemoteChunkIndex = RemoteChunkIndex, + .CacheFileOffset = SourceOffset}; + if (auto CopySourceIt = RawHashToCopyChunkDataIndex.find(ScavengedSequenceRawHash); + CopySourceIt != RawHashToCopyChunkDataIndex.end()) + { + CopyChunkData& Data = CopyChunkDatas[CopySourceIt->second]; + if (Data.TargetChunkLocationPtrs.size() > 1024) + { + RawHashToCopyChunkDataIndex.insert_or_assign(ScavengedSequenceRawHash, + CopyChunkDatas.size()); + CopyChunkDatas.push_back( + CopyChunkData{.ScavengeSourceIndex = ScavengedContentIndex, + .SourceSequenceIndex = ScavengedSequenceIndex, + .TargetChunkLocationPtrs = ChunkTargetPtrs, + .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}}); + } + else + { + Data.TargetChunkLocationPtrs.insert(Data.TargetChunkLocationPtrs.end(), + ChunkTargetPtrs.begin(), + ChunkTargetPtrs.end()); + Data.ChunkTargets.push_back(Target); + } + } + else + { + RawHashToCopyChunkDataIndex.insert_or_assign(ScavengedSequenceRawHash, CopyChunkDatas.size()); + CopyChunkDatas.push_back( + CopyChunkData{.ScavengeSourceIndex = ScavengedContentIndex, + .SourceSequenceIndex = ScavengedSequenceIndex, + .TargetChunkLocationPtrs = ChunkTargetPtrs, + .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}}); + } + m_CacheMappingStats.ScavengedChunkMatchingRemoteCount++; + m_CacheMappingStats.ScavengedChunkMatchingRemoteByteCount += ScavengedChunkRawSize; + RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex] = true; + RemainingChunkCount--; + } + } + } + SourceOffset += ScavengedChunkRawSize; + } + } + } + } + m_CacheMappingStats.ScavengeElapsedWallTimeUs += ScavengeTimer.GetElapsedTimeUs(); } - if (m_CacheMappingStats.ScavengedPathsMatchingSequencesCount > 0 || m_CacheMappingStats.ScavengedChunkMatchingRemoteCount > 0) + if (!m_Options.IsQuiet) { - LOG_OUTPUT(m_LogOutput, - "Scavenge of {} paths, found {} ({}) chunk sequences, {} ({}) chunks in {}", - ScavengedPathsCount, - m_CacheMappingStats.ScavengedPathsMatchingSequencesCount, - NiceBytes(m_CacheMappingStats.ScavengedPathsMatchingSequencesByteCount), - m_CacheMappingStats.ScavengedChunkMatchingRemoteCount, - NiceBytes(m_CacheMappingStats.ScavengedChunkMatchingRemoteByteCount), - NiceTimeSpanMs(m_CacheMappingStats.ScavengeElapsedWallTimeUs / 1000)); + if (m_CacheMappingStats.CacheSequenceHashesCount > 0 || m_CacheMappingStats.CacheChunkCount > 0 || + m_CacheMappingStats.CacheBlockCount > 0) + { + LOG_OUTPUT(m_LogOutput, + "Download cache: Found {} ({}) chunk sequences, {} ({}) chunks, {} ({}) blocks in {}", + m_CacheMappingStats.CacheSequenceHashesCount, + NiceBytes(m_CacheMappingStats.CacheSequenceHashesByteCount), + m_CacheMappingStats.CacheChunkCount, + NiceBytes(m_CacheMappingStats.CacheChunkByteCount), + m_CacheMappingStats.CacheBlockCount, + NiceBytes(m_CacheMappingStats.CacheBlocksByteCount), + NiceTimeSpanMs(m_CacheMappingStats.CacheScanElapsedWallTimeUs / 1000)); + } + + if (m_CacheMappingStats.LocalPathsMatchingSequencesCount > 0 || m_CacheMappingStats.LocalChunkMatchingRemoteCount > 0) + { + LOG_OUTPUT(m_LogOutput, + "Local state : Found {} ({}) chunk sequences, {} ({}) chunks in {}", + m_CacheMappingStats.LocalPathsMatchingSequencesCount, + NiceBytes(m_CacheMappingStats.LocalPathsMatchingSequencesByteCount), + m_CacheMappingStats.LocalChunkMatchingRemoteCount, + NiceBytes(m_CacheMappingStats.LocalChunkMatchingRemoteByteCount), + NiceTimeSpanMs(m_CacheMappingStats.LocalScanElapsedWallTimeUs / 1000)); + } + if (m_CacheMappingStats.ScavengedPathsMatchingSequencesCount > 0 || m_CacheMappingStats.ScavengedChunkMatchingRemoteCount > 0) + { + LOG_OUTPUT(m_LogOutput, + "Scavenge of {} paths, found {} ({}) chunk sequences, {} ({}) chunks in {}", + ScavengedPathsCount, + m_CacheMappingStats.ScavengedPathsMatchingSequencesCount, + NiceBytes(m_CacheMappingStats.ScavengedPathsMatchingSequencesByteCount), + m_CacheMappingStats.ScavengedChunkMatchingRemoteCount, + NiceBytes(m_CacheMappingStats.ScavengedChunkMatchingRemoteByteCount), + NiceTimeSpanMs(m_CacheMappingStats.ScavengeElapsedWallTimeUs / 1000)); + } } - } - uint64_t BytesToWrite = 0; + uint64_t BytesToWrite = 0; - for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < m_RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++) - { - uint64_t ChunkWriteCount = GetChunkWriteCount(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); - if (ChunkWriteCount > 0) + for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < m_RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++) { - BytesToWrite += m_RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] * ChunkWriteCount; - if (!RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) + uint64_t ChunkWriteCount = GetChunkWriteCount(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); + if (ChunkWriteCount > 0) { - RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex] = true; + BytesToWrite += m_RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] * ChunkWriteCount; + if (!RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) + { + RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex] = true; + } } } - } - for (const ScavengedSequenceCopyOperation& ScavengeCopyOp : ScavengedSequenceCopyOperations) - { - BytesToWrite += ScavengeCopyOp.RawSize; - } + for (const ScavengedSequenceCopyOperation& ScavengeCopyOp : ScavengedSequenceCopyOperations) + { + BytesToWrite += ScavengeCopyOp.RawSize; + } - uint64_t BytesToValidate = BytesToWrite; + uint64_t BytesToValidate = m_Options.ValidateCompletedSequences ? BytesToWrite : 0; - uint64_t TotalRequestCount = 0; - uint64_t TotalPartWriteCount = 0; - std::atomic<uint64_t> WritePartsComplete = 0; + uint64_t TotalRequestCount = 0; + uint64_t TotalPartWriteCount = 0; + std::atomic<uint64_t> WritePartsComplete = 0; - { - ZEN_TRACE_CPU("WriteChunks"); + { + ZEN_TRACE_CPU("WriteChunks"); - m_LogOutput.SetLogOperationProgress(TaskSteps::WriteChunks, TaskSteps::StepCount); + m_LogOutput.SetLogOperationProgress(TaskSteps::WriteChunks, TaskSteps::StepCount); - Stopwatch WriteTimer; + Stopwatch WriteTimer; - FilteredRate FilteredDownloadedBytesPerSecond; - FilteredRate FilteredWrittenBytesPerSecond; + FilteredRate FilteredDownloadedBytesPerSecond; + FilteredRate FilteredWrittenBytesPerSecond; - std::unique_ptr<BuildOpLogOutput::ProgressBar> WriteProgressBarPtr( - m_LogOutput.CreateProgressBar(m_Options.PrimeCacheOnly ? "Downloading" : "Writing")); - BuildOpLogOutput::ProgressBar& WriteProgressBar(*WriteProgressBarPtr); - ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + std::unique_ptr<BuildOpLogOutput::ProgressBar> WriteProgressBarPtr( + m_LogOutput.CreateProgressBar(m_Options.PrimeCacheOnly ? "Downloading" : "Writing")); + BuildOpLogOutput::ProgressBar& WriteProgressBar(*WriteProgressBarPtr); + ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - struct LooseChunkHashWorkData - { - std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs; - uint32_t RemoteChunkIndex = (uint32_t)-1; - }; + struct LooseChunkHashWorkData + { + std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs; + uint32_t RemoteChunkIndex = (uint32_t)-1; + }; - std::vector<LooseChunkHashWorkData> LooseChunkHashWorks; - TotalPartWriteCount += CopyChunkDatas.size(); - TotalPartWriteCount += ScavengedSequenceCopyOperations.size(); + std::vector<LooseChunkHashWorkData> LooseChunkHashWorks; + TotalPartWriteCount += CopyChunkDatas.size(); + TotalPartWriteCount += ScavengedSequenceCopyOperations.size(); - for (const IoHash ChunkHash : m_LooseChunkHashes) - { - auto RemoteChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); - ZEN_ASSERT(RemoteChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end()); - const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second; - if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) + for (const IoHash ChunkHash : m_LooseChunkHashes) { - if (m_Options.IsVerbose) - { - LOG_OUTPUT(m_LogOutput, "Skipping chunk {} due to cache reuse", ChunkHash); - } - continue; - } - bool NeedsCopy = true; - if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex].compare_exchange_strong(NeedsCopy, false)) - { - std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = - GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); - - if (ChunkTargetPtrs.empty()) + auto RemoteChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); + ZEN_ASSERT(RemoteChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end()); + const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second; + if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) { if (m_Options.IsVerbose) { LOG_OUTPUT(m_LogOutput, "Skipping chunk {} due to cache reuse", ChunkHash); } + continue; } - else + bool NeedsCopy = true; + if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex].compare_exchange_strong(NeedsCopy, false)) { - TotalRequestCount++; - TotalPartWriteCount++; - LooseChunkHashWorks.push_back( - LooseChunkHashWorkData{.ChunkTargetPtrs = ChunkTargetPtrs, .RemoteChunkIndex = RemoteChunkIndex}); + std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = + GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); + + if (ChunkTargetPtrs.empty()) + { + if (m_Options.IsVerbose) + { + LOG_OUTPUT(m_LogOutput, "Skipping chunk {} due to cache reuse", ChunkHash); + } + } + else + { + TotalRequestCount++; + TotalPartWriteCount++; + LooseChunkHashWorks.push_back( + LooseChunkHashWorkData{.ChunkTargetPtrs = ChunkTargetPtrs, .RemoteChunkIndex = RemoteChunkIndex}); + } } } - } - uint32_t BlockCount = gsl::narrow<uint32_t>(m_BlockDescriptions.size()); + uint32_t BlockCount = gsl::narrow<uint32_t>(m_BlockDescriptions.size()); - std::vector<bool> ChunkIsPickedUpByBlock(m_RemoteContent.ChunkedContent.ChunkHashes.size(), false); - auto GetNeededChunkBlockIndexes = - [this, &RemoteChunkIndexNeedsCopyFromSourceFlags, &ChunkIsPickedUpByBlock](const ChunkBlockDescription& BlockDescription) { + std::vector<bool> ChunkIsPickedUpByBlock(m_RemoteContent.ChunkedContent.ChunkHashes.size(), false); + auto GetNeededChunkBlockIndexes = [this, &RemoteChunkIndexNeedsCopyFromSourceFlags, &ChunkIsPickedUpByBlock]( + const ChunkBlockDescription& BlockDescription) { ZEN_TRACE_CPU("GetNeededChunkBlockIndexes"); std::vector<uint32_t> NeededBlockChunkIndexes; for (uint32_t ChunkBlockIndex = 0; ChunkBlockIndex < BlockDescription.ChunkRawHashes.size(); ChunkBlockIndex++) @@ -1195,181 +1109,190 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) return NeededBlockChunkIndexes; }; - std::vector<uint32_t> CachedChunkBlockIndexes; - std::vector<uint32_t> FetchBlockIndexes; - std::vector<std::vector<uint32_t>> AllBlockChunkIndexNeeded; - - for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; BlockIndex++) - { - const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + std::vector<uint32_t> CachedChunkBlockIndexes; + std::vector<uint32_t> FetchBlockIndexes; + std::vector<std::vector<uint32_t>> AllBlockChunkIndexNeeded; - std::vector<uint32_t> BlockChunkIndexNeeded = GetNeededChunkBlockIndexes(BlockDescription); - if (!BlockChunkIndexNeeded.empty()) + for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; BlockIndex++) { - if (m_Options.PrimeCacheOnly) - { - FetchBlockIndexes.push_back(BlockIndex); - } - else + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + + std::vector<uint32_t> BlockChunkIndexNeeded = GetNeededChunkBlockIndexes(BlockDescription); + if (!BlockChunkIndexNeeded.empty()) { - bool UsingCachedBlock = false; - if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end()) + if (m_Options.PrimeCacheOnly) { - TotalPartWriteCount++; + FetchBlockIndexes.push_back(BlockIndex); + } + else + { + bool UsingCachedBlock = false; + if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end()) + { + TotalPartWriteCount++; - std::filesystem::path BlockPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); - if (IsFile(BlockPath)) + std::filesystem::path BlockPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); + if (IsFile(BlockPath)) + { + CachedChunkBlockIndexes.push_back(BlockIndex); + UsingCachedBlock = true; + } + } + if (!UsingCachedBlock) { - CachedChunkBlockIndexes.push_back(BlockIndex); - UsingCachedBlock = true; + FetchBlockIndexes.push_back(BlockIndex); } } - if (!UsingCachedBlock) - { - FetchBlockIndexes.push_back(BlockIndex); - } } + AllBlockChunkIndexNeeded.emplace_back(std::move(BlockChunkIndexNeeded)); } - AllBlockChunkIndexNeeded.emplace_back(std::move(BlockChunkIndexNeeded)); - } - BlobsExistsResult ExistsResult; + BlobsExistsResult ExistsResult; - if (m_Storage.BuildCacheStorage) - { - ZEN_TRACE_CPU("BlobCacheExistCheck"); - Stopwatch Timer; - - tsl::robin_set<IoHash> BlobHashesSet; - - BlobHashesSet.reserve(LooseChunkHashWorks.size() + FetchBlockIndexes.size()); - for (LooseChunkHashWorkData& LooseChunkHashWork : LooseChunkHashWorks) + if (m_Storage.BuildCacheStorage) { - BlobHashesSet.insert(m_RemoteContent.ChunkedContent.ChunkHashes[LooseChunkHashWork.RemoteChunkIndex]); - } - for (uint32_t BlockIndex : FetchBlockIndexes) - { - const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; - BlobHashesSet.insert(BlockDescription.BlockHash); - } + ZEN_TRACE_CPU("BlobCacheExistCheck"); + Stopwatch Timer; - if (!BlobHashesSet.empty()) - { - const std::vector<IoHash> BlobHashes(BlobHashesSet.begin(), BlobHashesSet.end()); - const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult = - m_Storage.BuildCacheStorage->BlobsExists(m_BuildId, BlobHashes); + tsl::robin_set<IoHash> BlobHashesSet; - if (CacheExistsResult.size() == BlobHashes.size()) + BlobHashesSet.reserve(LooseChunkHashWorks.size() + FetchBlockIndexes.size()); + for (LooseChunkHashWorkData& LooseChunkHashWork : LooseChunkHashWorks) + { + BlobHashesSet.insert(m_RemoteContent.ChunkedContent.ChunkHashes[LooseChunkHashWork.RemoteChunkIndex]); + } + for (uint32_t BlockIndex : FetchBlockIndexes) { - ExistsResult.ExistingBlobs.reserve(CacheExistsResult.size()); - for (size_t BlobIndex = 0; BlobIndex < BlobHashes.size(); BlobIndex++) + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + BlobHashesSet.insert(BlockDescription.BlockHash); + } + + if (!BlobHashesSet.empty()) + { + const std::vector<IoHash> BlobHashes(BlobHashesSet.begin(), BlobHashesSet.end()); + const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult = + m_Storage.BuildCacheStorage->BlobsExists(m_BuildId, BlobHashes); + + if (CacheExistsResult.size() == BlobHashes.size()) { - if (CacheExistsResult[BlobIndex].HasBody) + ExistsResult.ExistingBlobs.reserve(CacheExistsResult.size()); + for (size_t BlobIndex = 0; BlobIndex < BlobHashes.size(); BlobIndex++) { - ExistsResult.ExistingBlobs.insert(BlobHashes[BlobIndex]); + if (CacheExistsResult[BlobIndex].HasBody) + { + ExistsResult.ExistingBlobs.insert(BlobHashes[BlobIndex]); + } } } - } - ExistsResult.ElapsedTimeMs = Timer.GetElapsedTimeMs(); - if (!ExistsResult.ExistingBlobs.empty() && !m_Options.IsQuiet) - { - LOG_OUTPUT(m_LogOutput, - "Remote cache : Found {} out of {} needed blobs in {}", - ExistsResult.ExistingBlobs.size(), - BlobHashes.size(), - NiceTimeSpanMs(ExistsResult.ElapsedTimeMs)); + ExistsResult.ElapsedTimeMs = Timer.GetElapsedTimeMs(); + if (!ExistsResult.ExistingBlobs.empty() && !m_Options.IsQuiet) + { + LOG_OUTPUT(m_LogOutput, + "Remote cache : Found {} out of {} needed blobs in {}", + ExistsResult.ExistingBlobs.size(), + BlobHashes.size(), + NiceTimeSpanMs(ExistsResult.ElapsedTimeMs)); + } } } - } - std::vector<BlockRangeDescriptor> BlockRangeWorks; - std::vector<uint32_t> FullBlockWorks; - { - Stopwatch Timer; - - std::vector<uint32_t> PartialBlockIndexes; - - for (uint32_t BlockIndex : FetchBlockIndexes) + std::vector<BlockRangeDescriptor> BlockRangeWorks; + std::vector<uint32_t> FullBlockWorks; { - const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + Stopwatch Timer; - const std::vector<uint32_t> BlockChunkIndexNeeded = std::move(AllBlockChunkIndexNeeded[BlockIndex]); - if (!BlockChunkIndexNeeded.empty()) + std::vector<uint32_t> PartialBlockIndexes; + + for (uint32_t BlockIndex : FetchBlockIndexes) { - bool WantsToDoPartialBlockDownload = BlockChunkIndexNeeded.size() < BlockDescription.ChunkRawHashes.size(); - bool CanDoPartialBlockDownload = (BlockDescription.HeaderSize > 0) && (BlockDescription.ChunkCompressedLengths.size() == - BlockDescription.ChunkRawHashes.size()); + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; - bool AllowedToDoPartialRequest = false; - bool BlockExistInCache = ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash); - switch (m_Options.PartialBlockRequestMode) + const std::vector<uint32_t> BlockChunkIndexNeeded = std::move(AllBlockChunkIndexNeeded[BlockIndex]); + if (!BlockChunkIndexNeeded.empty()) { - case EPartialBlockRequestMode::Off: - break; - case EPartialBlockRequestMode::ZenCacheOnly: - AllowedToDoPartialRequest = BlockExistInCache; - break; - case EPartialBlockRequestMode::Mixed: - case EPartialBlockRequestMode::All: - AllowedToDoPartialRequest = true; - break; - default: - ZEN_ASSERT(false); - break; - } + bool WantsToDoPartialBlockDownload = BlockChunkIndexNeeded.size() < BlockDescription.ChunkRawHashes.size(); + bool CanDoPartialBlockDownload = + (BlockDescription.HeaderSize > 0) && + (BlockDescription.ChunkCompressedLengths.size() == BlockDescription.ChunkRawHashes.size()); + + bool AllowedToDoPartialRequest = false; + bool BlockExistInCache = ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash); + switch (m_Options.PartialBlockRequestMode) + { + case EPartialBlockRequestMode::Off: + break; + case EPartialBlockRequestMode::ZenCacheOnly: + AllowedToDoPartialRequest = BlockExistInCache; + break; + case EPartialBlockRequestMode::Mixed: + case EPartialBlockRequestMode::All: + AllowedToDoPartialRequest = true; + break; + default: + ZEN_ASSERT(false); + break; + } - const uint32_t ChunkStartOffsetInBlock = - gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); + const uint32_t ChunkStartOffsetInBlock = + gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); - const uint64_t TotalBlockSize = std::accumulate(BlockDescription.ChunkCompressedLengths.begin(), - BlockDescription.ChunkCompressedLengths.end(), - std::uint64_t(ChunkStartOffsetInBlock)); + const uint64_t TotalBlockSize = std::accumulate(BlockDescription.ChunkCompressedLengths.begin(), + BlockDescription.ChunkCompressedLengths.end(), + std::uint64_t(ChunkStartOffsetInBlock)); - if (AllowedToDoPartialRequest && WantsToDoPartialBlockDownload && CanDoPartialBlockDownload) - { - ZEN_TRACE_CPU("PartialBlockAnalysis"); - - bool LimitToSingleRange = - BlockExistInCache ? false : m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Mixed; - uint64_t TotalWantedChunksSize = 0; - std::optional<std::vector<BlockRangeDescriptor>> MaybeBlockRanges = CalculateBlockRanges(BlockIndex, - BlockDescription, - BlockChunkIndexNeeded, - LimitToSingleRange, - ChunkStartOffsetInBlock, - TotalBlockSize, - TotalWantedChunksSize); - ZEN_ASSERT(TotalWantedChunksSize <= TotalBlockSize); - - if (MaybeBlockRanges.has_value()) + if (AllowedToDoPartialRequest && WantsToDoPartialBlockDownload && CanDoPartialBlockDownload) { - const std::vector<BlockRangeDescriptor>& BlockRanges = MaybeBlockRanges.value(); - ZEN_ASSERT(!BlockRanges.empty()); - BlockRangeWorks.insert(BlockRangeWorks.end(), BlockRanges.begin(), BlockRanges.end()); - TotalRequestCount += BlockRanges.size(); - TotalPartWriteCount += BlockRanges.size(); - - uint64_t RequestedSize = std::accumulate( - BlockRanges.begin(), - BlockRanges.end(), - uint64_t(0), - [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; }); - PartialBlockIndexes.push_back(BlockIndex); - - if (RequestedSize > TotalWantedChunksSize) + ZEN_TRACE_CPU("PartialBlockAnalysis"); + + bool LimitToSingleRange = + BlockExistInCache ? false : m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Mixed; + uint64_t TotalWantedChunksSize = 0; + std::optional<std::vector<BlockRangeDescriptor>> MaybeBlockRanges = + CalculateBlockRanges(BlockIndex, + BlockDescription, + BlockChunkIndexNeeded, + LimitToSingleRange, + ChunkStartOffsetInBlock, + TotalBlockSize, + TotalWantedChunksSize); + ZEN_ASSERT(TotalWantedChunksSize <= TotalBlockSize); + + if (MaybeBlockRanges.has_value()) { - if (m_Options.IsVerbose) + const std::vector<BlockRangeDescriptor>& BlockRanges = MaybeBlockRanges.value(); + ZEN_ASSERT(!BlockRanges.empty()); + BlockRangeWorks.insert(BlockRangeWorks.end(), BlockRanges.begin(), BlockRanges.end()); + TotalRequestCount += BlockRanges.size(); + TotalPartWriteCount += BlockRanges.size(); + + uint64_t RequestedSize = std::accumulate( + BlockRanges.begin(), + BlockRanges.end(), + uint64_t(0), + [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; }); + PartialBlockIndexes.push_back(BlockIndex); + + if (RequestedSize > TotalWantedChunksSize) { - LOG_OUTPUT(m_LogOutput, - "Requesting {} chunks ({}) from block {} ({}) using {} requests (extra bytes {})", - BlockChunkIndexNeeded.size(), - NiceBytes(RequestedSize), - BlockDescription.BlockHash, - NiceBytes(TotalBlockSize), - BlockRanges.size(), - NiceBytes(RequestedSize - TotalWantedChunksSize)); + if (m_Options.IsVerbose) + { + LOG_OUTPUT(m_LogOutput, + "Requesting {} chunks ({}) from block {} ({}) using {} requests (extra bytes {})", + BlockChunkIndexNeeded.size(), + NiceBytes(RequestedSize), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + BlockRanges.size(), + NiceBytes(RequestedSize - TotalWantedChunksSize)); + } } } + else + { + FullBlockWorks.push_back(BlockIndex); + TotalRequestCount++; + TotalPartWriteCount++; + } } else { @@ -1378,584 +1301,393 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) TotalPartWriteCount++; } } - else - { - FullBlockWorks.push_back(BlockIndex); - TotalRequestCount++; - TotalPartWriteCount++; - } } - } - if (!PartialBlockIndexes.empty()) - { - uint64_t TotalFullBlockRequestBytes = 0; - for (uint32_t BlockIndex : FullBlockWorks) + if (!PartialBlockIndexes.empty()) { - const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; - uint32_t CurrentOffset = - gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); + uint64_t TotalFullBlockRequestBytes = 0; + for (uint32_t BlockIndex : FullBlockWorks) + { + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + uint32_t CurrentOffset = + gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); - TotalFullBlockRequestBytes += std::accumulate(BlockDescription.ChunkCompressedLengths.begin(), - BlockDescription.ChunkCompressedLengths.end(), - std::uint64_t(CurrentOffset)); - } + TotalFullBlockRequestBytes += std::accumulate(BlockDescription.ChunkCompressedLengths.begin(), + BlockDescription.ChunkCompressedLengths.end(), + std::uint64_t(CurrentOffset)); + } - uint64_t TotalPartialBlockBytes = 0; - for (uint32_t BlockIndex : PartialBlockIndexes) - { - const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; - uint32_t CurrentOffset = - gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); + uint64_t TotalPartialBlockBytes = 0; + for (uint32_t BlockIndex : PartialBlockIndexes) + { + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + uint32_t CurrentOffset = + gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); - TotalPartialBlockBytes += std::accumulate(BlockDescription.ChunkCompressedLengths.begin(), - BlockDescription.ChunkCompressedLengths.end(), - std::uint64_t(CurrentOffset)); - } + TotalPartialBlockBytes += std::accumulate(BlockDescription.ChunkCompressedLengths.begin(), + BlockDescription.ChunkCompressedLengths.end(), + std::uint64_t(CurrentOffset)); + } - uint64_t NonPartialTotalBlockBytes = TotalFullBlockRequestBytes + TotalPartialBlockBytes; + uint64_t NonPartialTotalBlockBytes = TotalFullBlockRequestBytes + TotalPartialBlockBytes; - const uint64_t TotalPartialBlockRequestBytes = - std::accumulate(BlockRangeWorks.begin(), - BlockRangeWorks.end(), - uint64_t(0), - [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; }); - uint64_t TotalExtraPartialBlocksRequests = BlockRangeWorks.size() - PartialBlockIndexes.size(); + const uint64_t TotalPartialBlockRequestBytes = + std::accumulate(BlockRangeWorks.begin(), + BlockRangeWorks.end(), + uint64_t(0), + [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; }); + uint64_t TotalExtraPartialBlocksRequests = BlockRangeWorks.size() - PartialBlockIndexes.size(); - uint64_t TotalSavedBlocksSize = TotalPartialBlockBytes - TotalPartialBlockRequestBytes; - double SavedSizePercent = (TotalSavedBlocksSize * 100.0) / NonPartialTotalBlockBytes; + uint64_t TotalSavedBlocksSize = TotalPartialBlockBytes - TotalPartialBlockRequestBytes; + double SavedSizePercent = (TotalSavedBlocksSize * 100.0) / NonPartialTotalBlockBytes; - if (!m_Options.IsQuiet) - { - LOG_OUTPUT(m_LogOutput, - "Analisys of partial block requests saves download of {} out of {} ({:.1f}%) using {} extra " - "requests. Completed in {}", - NiceBytes(TotalSavedBlocksSize), - NiceBytes(NonPartialTotalBlockBytes), - SavedSizePercent, - TotalExtraPartialBlocksRequests, - NiceTimeSpanMs(ExistsResult.ElapsedTimeMs)); + if (!m_Options.IsQuiet) + { + LOG_OUTPUT(m_LogOutput, + "Analisys of partial block requests saves download of {} out of {} ({:.1f}%) using {} extra " + "requests. Completed in {}", + NiceBytes(TotalSavedBlocksSize), + NiceBytes(NonPartialTotalBlockBytes), + SavedSizePercent, + TotalExtraPartialBlocksRequests, + NiceTimeSpanMs(ExistsResult.ElapsedTimeMs)); + } } } - } - BufferedWriteFileCache WriteCache; + BufferedWriteFileCache WriteCache; - for (uint32_t ScavengeOpIndex = 0; ScavengeOpIndex < ScavengedSequenceCopyOperations.size(); ScavengeOpIndex++) - { - if (m_AbortFlag) - { - break; - } - if (!m_Options.PrimeCacheOnly) + for (uint32_t ScavengeOpIndex = 0; ScavengeOpIndex < ScavengedSequenceCopyOperations.size(); ScavengeOpIndex++) { - Work.ScheduleWork( - m_IOWorkerPool, - [this, - &ScavengedPaths, - &ScavengedSequenceCopyOperations, - &ScavengedContents, - &FilteredWrittenBytesPerSecond, - ScavengeOpIndex, - &WritePartsComplete, - TotalPartWriteCount](std::atomic<bool>&) mutable { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Async_WriteScavenged"); + if (m_AbortFlag) + { + break; + } + if (!m_Options.PrimeCacheOnly) + { + Work.ScheduleWork( + m_IOWorkerPool, + [this, + &ScavengedPaths, + &ScavengedSequenceCopyOperations, + &ScavengedContents, + &FilteredWrittenBytesPerSecond, + ScavengeOpIndex, + &WritePartsComplete, + TotalPartWriteCount](std::atomic<bool>&) mutable { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_WriteScavenged"); - FilteredWrittenBytesPerSecond.Start(); + FilteredWrittenBytesPerSecond.Start(); - const ScavengedSequenceCopyOperation& ScavengeOp = ScavengedSequenceCopyOperations[ScavengeOpIndex]; - const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengeOp.ScavengedContentIndex]; - const std::filesystem::path& ScavengeRootPath = ScavengedPaths[ScavengeOp.ScavengedContentIndex]; + const ScavengedSequenceCopyOperation& ScavengeOp = ScavengedSequenceCopyOperations[ScavengeOpIndex]; + const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengeOp.ScavengedContentIndex]; + const std::filesystem::path& ScavengeRootPath = ScavengedPaths[ScavengeOp.ScavengedContentIndex]; - WriteScavengedSequenceToCache(ScavengeRootPath, ScavengedContent, ScavengeOp); + WriteScavengedSequenceToCache(ScavengeRootPath, ScavengedContent, ScavengeOp); - WritePartsComplete++; - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); + WritePartsComplete++; + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } } - } - }); - } - } - - for (uint32_t LooseChunkHashWorkIndex = 0; LooseChunkHashWorkIndex < LooseChunkHashWorks.size(); LooseChunkHashWorkIndex++) - { - if (m_AbortFlag) - { - break; + }); + } } - if (m_Options.PrimeCacheOnly) + for (uint32_t LooseChunkHashWorkIndex = 0; LooseChunkHashWorkIndex < LooseChunkHashWorks.size(); LooseChunkHashWorkIndex++) { - const uint32_t RemoteChunkIndex = LooseChunkHashWorks[LooseChunkHashWorkIndex].RemoteChunkIndex; - if (ExistsResult.ExistingBlobs.contains(m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex])) + if (m_AbortFlag) { - m_DownloadStats.RequestsCompleteCount++; - continue; + break; } - } - - Work.ScheduleWork(m_IOWorkerPool, - [this, - &SequenceIndexChunksLeftToWriteCounters, - &Work, - &ExistsResult, - &WritePartsComplete, - &LooseChunkHashWorks, - LooseChunkHashWorkIndex, - TotalRequestCount, - TotalPartWriteCount, - &WriteCache, - &FilteredDownloadedBytesPerSecond, - &FilteredWrittenBytesPerSecond](std::atomic<bool>&) mutable { - ZEN_TRACE_CPU("Async_ReadPreDownloadedChunk"); - if (!m_AbortFlag) - { - LooseChunkHashWorkData& LooseChunkHashWork = LooseChunkHashWorks[LooseChunkHashWorkIndex]; - const uint32_t RemoteChunkIndex = LooseChunkHashWorks[LooseChunkHashWorkIndex].RemoteChunkIndex; - WriteLooseChunk(RemoteChunkIndex, - ExistsResult, - SequenceIndexChunksLeftToWriteCounters, - WritePartsComplete, - std::move(LooseChunkHashWork.ChunkTargetPtrs), - WriteCache, - Work, - TotalRequestCount, - TotalPartWriteCount, - FilteredDownloadedBytesPerSecond, - FilteredWrittenBytesPerSecond); - } - }); - } - - std::unique_ptr<CloneQueryInterface> CloneQuery; - if (m_Options.AllowFileClone) - { - CloneQuery = GetCloneQueryInterface(m_CacheFolderPath); - } - for (size_t CopyDataIndex = 0; CopyDataIndex < CopyChunkDatas.size(); CopyDataIndex++) - { - ZEN_ASSERT(!m_Options.PrimeCacheOnly); - if (m_AbortFlag) - { - break; - } + if (m_Options.PrimeCacheOnly) + { + const uint32_t RemoteChunkIndex = LooseChunkHashWorks[LooseChunkHashWorkIndex].RemoteChunkIndex; + if (ExistsResult.ExistingBlobs.contains(m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex])) + { + m_DownloadStats.RequestsCompleteCount++; + continue; + } + } - Work.ScheduleWork(m_IOWorkerPool, - [this, - &CloneQuery, - &SequenceIndexChunksLeftToWriteCounters, - &WriteCache, - &Work, - &FilteredWrittenBytesPerSecond, - &CopyChunkDatas, - &ScavengedContents, - &ScavengedLookups, - &ScavengedPaths, - &WritePartsComplete, - TotalPartWriteCount, - CopyDataIndex](std::atomic<bool>&) { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Async_CopyLocal"); - - FilteredWrittenBytesPerSecond.Start(); - const CopyChunkData& CopyData = CopyChunkDatas[CopyDataIndex]; - - std::vector<uint32_t> WrittenSequenceIndexes = WriteLocalChunkToCache(CloneQuery.get(), - CopyData, - ScavengedContents, - ScavengedLookups, - ScavengedPaths, - WriteCache); - WritePartsComplete++; + Work.ScheduleWork(m_IOWorkerPool, + [this, + &SequenceIndexChunksLeftToWriteCounters, + &Work, + &ExistsResult, + &WritePartsComplete, + &LooseChunkHashWorks, + LooseChunkHashWorkIndex, + TotalRequestCount, + TotalPartWriteCount, + &WriteCache, + &FilteredDownloadedBytesPerSecond, + &FilteredWrittenBytesPerSecond](std::atomic<bool>&) mutable { + ZEN_TRACE_CPU("Async_ReadPreDownloadedChunk"); if (!m_AbortFlag) { - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); - } - - // Write tracking, updating this must be done without any files open - std::vector<uint32_t> CompletedChunkSequences; - for (uint32_t RemoteSequenceIndex : WrittenSequenceIndexes) - { - if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters)) - { - CompletedChunkSequences.push_back(RemoteSequenceIndex); - } - } - WriteCache.Close(CompletedChunkSequences); - VerifyAndCompleteChunkSequencesAsync(CompletedChunkSequences, Work); + LooseChunkHashWorkData& LooseChunkHashWork = LooseChunkHashWorks[LooseChunkHashWorkIndex]; + const uint32_t RemoteChunkIndex = LooseChunkHashWorks[LooseChunkHashWorkIndex].RemoteChunkIndex; + WriteLooseChunk(RemoteChunkIndex, + ExistsResult, + SequenceIndexChunksLeftToWriteCounters, + WritePartsComplete, + std::move(LooseChunkHashWork.ChunkTargetPtrs), + WriteCache, + Work, + TotalRequestCount, + TotalPartWriteCount, + FilteredDownloadedBytesPerSecond, + FilteredWrittenBytesPerSecond); } - } - }); - } + }); + } - for (uint32_t BlockIndex : CachedChunkBlockIndexes) - { - ZEN_ASSERT(!m_Options.PrimeCacheOnly); - if (m_AbortFlag) + std::unique_ptr<CloneQueryInterface> CloneQuery; + if (m_Options.AllowFileClone) { - break; + CloneQuery = GetCloneQueryInterface(m_CacheFolderPath); } - Work.ScheduleWork(m_IOWorkerPool, - [this, - &RemoteChunkIndexNeedsCopyFromSourceFlags, - &SequenceIndexChunksLeftToWriteCounters, - &WriteCache, - &Work, - &FilteredWrittenBytesPerSecond, - &WritePartsComplete, - TotalPartWriteCount, - BlockIndex](std::atomic<bool>&) mutable { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Async_WriteCachedBlock"); - - const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; - FilteredWrittenBytesPerSecond.Start(); - - std::filesystem::path BlockChunkPath = - m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); - IoBuffer BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); - if (!BlockBuffer) - { - throw std::runtime_error( - fmt::format("Can not read block {} at {}", BlockDescription.BlockHash, BlockChunkPath)); - } + for (size_t CopyDataIndex = 0; CopyDataIndex < CopyChunkDatas.size(); CopyDataIndex++) + { + ZEN_ASSERT(!m_Options.PrimeCacheOnly); + if (m_AbortFlag) + { + break; + } + Work.ScheduleWork(m_IOWorkerPool, + [this, + &CloneQuery, + &SequenceIndexChunksLeftToWriteCounters, + &WriteCache, + &Work, + &FilteredWrittenBytesPerSecond, + &CopyChunkDatas, + &ScavengedContents, + &ScavengedLookups, + &ScavengedPaths, + &WritePartsComplete, + TotalPartWriteCount, + CopyDataIndex](std::atomic<bool>&) { if (!m_AbortFlag) { - if (!WriteChunksBlockToCache(BlockDescription, - SequenceIndexChunksLeftToWriteCounters, - Work, - CompositeBuffer(std::move(BlockBuffer)), - RemoteChunkIndexNeedsCopyFromSourceFlags, - WriteCache)) - { - std::error_code DummyEc; - RemoveFile(BlockChunkPath, DummyEc); - throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash)); - } + ZEN_TRACE_CPU("Async_CopyLocal"); - std::error_code Ec = TryRemoveFile(BlockChunkPath); - if (Ec) - { - LOG_OUTPUT_DEBUG(m_LogOutput, - "Failed removing file '{}', reason: ({}) {}", - BlockChunkPath, - Ec.value(), - Ec.message()); - } + FilteredWrittenBytesPerSecond.Start(); + const CopyChunkData& CopyData = CopyChunkDatas[CopyDataIndex]; + std::vector<uint32_t> WrittenSequenceIndexes = WriteLocalChunkToCache(CloneQuery.get(), + CopyData, + ScavengedContents, + ScavengedLookups, + ScavengedPaths, + WriteCache); WritePartsComplete++; - - if (WritePartsComplete == TotalPartWriteCount) + if (!m_AbortFlag) { - FilteredWrittenBytesPerSecond.Stop(); + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } + + // Write tracking, updating this must be done without any files open + std::vector<uint32_t> CompletedChunkSequences; + for (uint32_t RemoteSequenceIndex : WrittenSequenceIndexes) + { + if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters)) + { + CompletedChunkSequences.push_back(RemoteSequenceIndex); + } + } + WriteCache.Close(CompletedChunkSequences); + VerifyAndCompleteChunkSequencesAsync(CompletedChunkSequences, Work); } } - } - }); - } + }); + } - for (size_t BlockRangeIndex = 0; BlockRangeIndex < BlockRangeWorks.size(); BlockRangeIndex++) - { - ZEN_ASSERT(!m_Options.PrimeCacheOnly); - if (m_AbortFlag) + for (uint32_t BlockIndex : CachedChunkBlockIndexes) { - break; - } - Work.ScheduleWork( - m_NetworkPool, - [this, - &RemoteChunkIndexNeedsCopyFromSourceFlags, - &SequenceIndexChunksLeftToWriteCounters, - &ExistsResult, - &WriteCache, - &FilteredDownloadedBytesPerSecond, - TotalRequestCount, - &WritePartsComplete, - TotalPartWriteCount, - &FilteredWrittenBytesPerSecond, - &Work, - &BlockRangeWorks, - BlockRangeIndex](std::atomic<bool>&) { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Async_GetPartialBlock"); + ZEN_ASSERT(!m_Options.PrimeCacheOnly); + if (m_AbortFlag) + { + break; + } - const BlockRangeDescriptor& BlockRange = BlockRangeWorks[BlockRangeIndex]; + Work.ScheduleWork( + m_IOWorkerPool, + [this, + &RemoteChunkIndexNeedsCopyFromSourceFlags, + &SequenceIndexChunksLeftToWriteCounters, + &WriteCache, + &Work, + &FilteredWrittenBytesPerSecond, + &WritePartsComplete, + TotalPartWriteCount, + BlockIndex](std::atomic<bool>&) mutable { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_WriteCachedBlock"); - FilteredDownloadedBytesPerSecond.Start(); + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + FilteredWrittenBytesPerSecond.Start(); - DownloadPartialBlock( - BlockRange, - ExistsResult, - [this, - &RemoteChunkIndexNeedsCopyFromSourceFlags, - &SequenceIndexChunksLeftToWriteCounters, - &WritePartsComplete, - &WriteCache, - &Work, - TotalRequestCount, - TotalPartWriteCount, - &FilteredDownloadedBytesPerSecond, - &FilteredWrittenBytesPerSecond, - &BlockRange](IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath) { - if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) + std::filesystem::path BlockChunkPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); + IoBuffer BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); + if (!BlockBuffer) + { + throw std::runtime_error( + fmt::format("Can not read block {} at {}", BlockDescription.BlockHash, BlockChunkPath)); + } + + if (!m_AbortFlag) + { + if (!WriteChunksBlockToCache(BlockDescription, + SequenceIndexChunksLeftToWriteCounters, + Work, + CompositeBuffer(std::move(BlockBuffer)), + RemoteChunkIndexNeedsCopyFromSourceFlags, + WriteCache)) { - FilteredDownloadedBytesPerSecond.Stop(); + std::error_code DummyEc; + RemoveFile(BlockChunkPath, DummyEc); + throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash)); } - if (!m_AbortFlag) + std::error_code Ec = TryRemoveFile(BlockChunkPath); + if (Ec) { - Work.ScheduleWork( - m_IOWorkerPool, - [this, - &RemoteChunkIndexNeedsCopyFromSourceFlags, - &SequenceIndexChunksLeftToWriteCounters, - &WritePartsComplete, - &WriteCache, - &Work, - TotalPartWriteCount, - &FilteredWrittenBytesPerSecond, - &BlockRange, - BlockChunkPath = std::move(OnDiskPath), - BlockPartialBuffer = std::move(InMemoryBuffer)](std::atomic<bool>&) mutable { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Async_WritePartialBlock"); - - const uint32_t BlockIndex = BlockRange.BlockIndex; - - const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; - - if (BlockChunkPath.empty()) - { - ZEN_ASSERT(BlockPartialBuffer); - } - else - { - ZEN_ASSERT(!BlockPartialBuffer); - BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); - if (!BlockPartialBuffer) - { - throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}", - BlockDescription.BlockHash, - BlockChunkPath)); - } - } - - FilteredWrittenBytesPerSecond.Start(); - - if (!WritePartialBlockChunksToCache( - BlockDescription, - SequenceIndexChunksLeftToWriteCounters, - Work, - CompositeBuffer(std::move(BlockPartialBuffer)), - BlockRange.ChunkBlockIndexStart, - BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1, - RemoteChunkIndexNeedsCopyFromSourceFlags, - WriteCache)) - { - std::error_code DummyEc; - RemoveFile(BlockChunkPath, DummyEc); - throw std::runtime_error( - fmt::format("Partial block {} is malformed", BlockDescription.BlockHash)); - } + LOG_OUTPUT_DEBUG(m_LogOutput, + "Failed removing file '{}', reason: ({}) {}", + BlockChunkPath, + Ec.value(), + Ec.message()); + } - std::error_code Ec = TryRemoveFile(BlockChunkPath); - if (Ec) - { - LOG_OUTPUT_DEBUG(m_LogOutput, - "Failed removing file '{}', reason: ({}) {}", - BlockChunkPath, - Ec.value(), - Ec.message()); - } + WritePartsComplete++; - WritePartsComplete++; - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); - } - } - }); + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); } - }); - } - }); - } - - for (uint32_t BlockIndex : FullBlockWorks) - { - if (m_AbortFlag) - { - break; + } + } + }); } - if (m_Options.PrimeCacheOnly && ExistsResult.ExistingBlobs.contains(m_BlockDescriptions[BlockIndex].BlockHash)) + for (size_t BlockRangeIndex = 0; BlockRangeIndex < BlockRangeWorks.size(); BlockRangeIndex++) { - m_DownloadStats.RequestsCompleteCount++; - continue; - } - - Work.ScheduleWork( - m_NetworkPool, - [this, - &WritePartsComplete, - TotalPartWriteCount, - &FilteredWrittenBytesPerSecond, - &ExistsResult, - &Work, - &WriteCache, - &RemoteChunkIndexNeedsCopyFromSourceFlags, - &SequenceIndexChunksLeftToWriteCounters, - &FilteredDownloadedBytesPerSecond, - TotalRequestCount, - BlockIndex](std::atomic<bool>&) { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Async_GetFullBlock"); - - const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; - - FilteredDownloadedBytesPerSecond.Start(); - - IoBuffer BlockBuffer; - const bool ExistsInCache = - m_Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash); - if (ExistsInCache) - { - BlockBuffer = m_Storage.BuildCacheStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash); - } - if (!BlockBuffer) - { - BlockBuffer = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash); - if (BlockBuffer && m_Storage.BuildCacheStorage) - { - m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, - BlockDescription.BlockHash, - ZenContentType::kCompressedBinary, - CompositeBuffer(SharedBuffer(BlockBuffer))); - } - } - if (!BlockBuffer) - { - throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); - } + ZEN_ASSERT(!m_Options.PrimeCacheOnly); + if (m_AbortFlag) + { + break; + } + Work.ScheduleWork( + m_NetworkPool, + [this, + &RemoteChunkIndexNeedsCopyFromSourceFlags, + &SequenceIndexChunksLeftToWriteCounters, + &ExistsResult, + &WriteCache, + &FilteredDownloadedBytesPerSecond, + TotalRequestCount, + &WritePartsComplete, + TotalPartWriteCount, + &FilteredWrittenBytesPerSecond, + &Work, + &BlockRangeWorks, + BlockRangeIndex](std::atomic<bool>&) { if (!m_AbortFlag) { - uint64_t BlockSize = BlockBuffer.GetSize(); - m_DownloadStats.DownloadedBlockCount++; - m_DownloadStats.DownloadedBlockByteCount += BlockSize; - m_DownloadStats.RequestsCompleteCount++; - if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } + ZEN_TRACE_CPU("Async_GetPartialBlock"); - if (!m_Options.PrimeCacheOnly) - { - std::filesystem::path BlockChunkPath; + const BlockRangeDescriptor& BlockRange = BlockRangeWorks[BlockRangeIndex]; - // Check if the dowloaded block is file based and we can move it directly without rewriting it - { - IoBufferFileReference FileRef; - if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && - (FileRef.FileChunkSize == BlockSize)) - { - ZEN_TRACE_CPU("MoveTempFullBlock"); - std::error_code Ec; - std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); - if (!Ec) - { - BlockBuffer.SetDeleteOnClose(false); - BlockBuffer = {}; - BlockChunkPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); - RenameFile(TempBlobPath, BlockChunkPath, Ec); - if (Ec) - { - BlockChunkPath = std::filesystem::path{}; + FilteredDownloadedBytesPerSecond.Start(); - // Re-open the temp file again - BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); - BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); - BlockBuffer.SetDeleteOnClose(true); - } - } + DownloadPartialBlock( + BlockRange, + ExistsResult, + [this, + &RemoteChunkIndexNeedsCopyFromSourceFlags, + &SequenceIndexChunksLeftToWriteCounters, + &WritePartsComplete, + &WriteCache, + &Work, + TotalRequestCount, + TotalPartWriteCount, + &FilteredDownloadedBytesPerSecond, + &FilteredWrittenBytesPerSecond, + &BlockRange](IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath) { + if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); } - } - if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u)) - { - ZEN_TRACE_CPU("WriteTempFullBlock"); - // Could not be moved and rather large, lets store it on disk - BlockChunkPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); - TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); - BlockBuffer = {}; - } + if (!m_AbortFlag) + { + Work.ScheduleWork( + m_IOWorkerPool, + [this, + &RemoteChunkIndexNeedsCopyFromSourceFlags, + &SequenceIndexChunksLeftToWriteCounters, + &WritePartsComplete, + &WriteCache, + &Work, + TotalPartWriteCount, + &FilteredWrittenBytesPerSecond, + &BlockRange, + BlockChunkPath = std::move(OnDiskPath), + BlockPartialBuffer = std::move(InMemoryBuffer)](std::atomic<bool>&) mutable { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_WritePartialBlock"); - if (!m_AbortFlag) - { - Work.ScheduleWork( - m_IOWorkerPool, - [this, - &Work, - &RemoteChunkIndexNeedsCopyFromSourceFlags, - &SequenceIndexChunksLeftToWriteCounters, - BlockIndex, - &WriteCache, - &WritePartsComplete, - TotalPartWriteCount, - &FilteredWrittenBytesPerSecond, - BlockChunkPath, - BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Async_WriteFullBlock"); + const uint32_t BlockIndex = BlockRange.BlockIndex; - const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; - if (BlockChunkPath.empty()) - { - ZEN_ASSERT(BlockBuffer); - } - else - { - ZEN_ASSERT(!BlockBuffer); - BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); - if (!BlockBuffer) + if (BlockChunkPath.empty()) { - throw std::runtime_error(fmt::format("Could not open dowloaded block {} from {}", - BlockDescription.BlockHash, - BlockChunkPath)); + ZEN_ASSERT(BlockPartialBuffer); + } + else + { + ZEN_ASSERT(!BlockPartialBuffer); + BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); + if (!BlockPartialBuffer) + { + throw std::runtime_error( + fmt::format("Could not open downloaded block {} from {}", + BlockDescription.BlockHash, + BlockChunkPath)); + } } - } - FilteredWrittenBytesPerSecond.Start(); - if (!WriteChunksBlockToCache(BlockDescription, - SequenceIndexChunksLeftToWriteCounters, - Work, - CompositeBuffer(std::move(BlockBuffer)), - RemoteChunkIndexNeedsCopyFromSourceFlags, - WriteCache)) - { - std::error_code DummyEc; - RemoveFile(BlockChunkPath, DummyEc); - throw std::runtime_error( - fmt::format("Block {} is malformed", BlockDescription.BlockHash)); - } + FilteredWrittenBytesPerSecond.Start(); + + if (!WritePartialBlockChunksToCache( + BlockDescription, + SequenceIndexChunksLeftToWriteCounters, + Work, + CompositeBuffer(std::move(BlockPartialBuffer)), + BlockRange.ChunkBlockIndexStart, + BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1, + RemoteChunkIndexNeedsCopyFromSourceFlags, + WriteCache)) + { + std::error_code DummyEc; + RemoveFile(BlockChunkPath, DummyEc); + throw std::runtime_error( + fmt::format("Partial block {} is malformed", BlockDescription.BlockHash)); + } - if (!BlockChunkPath.empty()) - { std::error_code Ec = TryRemoveFile(BlockChunkPath); if (Ec) { @@ -1965,672 +1697,870 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) Ec.value(), Ec.message()); } + + WritePartsComplete++; + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } } + }); + } + }); + } + }); + } + + for (uint32_t BlockIndex : FullBlockWorks) + { + if (m_AbortFlag) + { + break; + } + + if (m_Options.PrimeCacheOnly && ExistsResult.ExistingBlobs.contains(m_BlockDescriptions[BlockIndex].BlockHash)) + { + m_DownloadStats.RequestsCompleteCount++; + continue; + } - WritePartsComplete++; + Work.ScheduleWork( + m_NetworkPool, + [this, + &WritePartsComplete, + TotalPartWriteCount, + &FilteredWrittenBytesPerSecond, + &ExistsResult, + &Work, + &WriteCache, + &RemoteChunkIndexNeedsCopyFromSourceFlags, + &SequenceIndexChunksLeftToWriteCounters, + &FilteredDownloadedBytesPerSecond, + TotalRequestCount, + BlockIndex](std::atomic<bool>&) { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_GetFullBlock"); - if (WritePartsComplete == TotalPartWriteCount) + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + + FilteredDownloadedBytesPerSecond.Start(); + + IoBuffer BlockBuffer; + const bool ExistsInCache = + m_Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash); + if (ExistsInCache) + { + BlockBuffer = m_Storage.BuildCacheStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash); + } + if (!BlockBuffer) + { + BlockBuffer = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash); + if (BlockBuffer && m_Storage.BuildCacheStorage) + { + m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, + BlockDescription.BlockHash, + ZenContentType::kCompressedBinary, + CompositeBuffer(SharedBuffer(BlockBuffer))); + } + } + if (!BlockBuffer) + { + throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); + } + if (!m_AbortFlag) + { + uint64_t BlockSize = BlockBuffer.GetSize(); + m_DownloadStats.DownloadedBlockCount++; + m_DownloadStats.DownloadedBlockByteCount += BlockSize; + m_DownloadStats.RequestsCompleteCount++; + if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + + if (!m_Options.PrimeCacheOnly) + { + std::filesystem::path BlockChunkPath; + + // Check if the dowloaded block is file based and we can move it directly without rewriting it + { + IoBufferFileReference FileRef; + if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && + (FileRef.FileChunkSize == BlockSize)) + { + ZEN_TRACE_CPU("MoveTempFullBlock"); + std::error_code Ec; + std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); + if (!Ec) + { + BlockBuffer.SetDeleteOnClose(false); + BlockBuffer = {}; + BlockChunkPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); + RenameFile(TempBlobPath, BlockChunkPath, Ec); + if (Ec) { - FilteredWrittenBytesPerSecond.Stop(); + BlockChunkPath = std::filesystem::path{}; + + // Re-open the temp file again + BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); + BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); + BlockBuffer.SetDeleteOnClose(true); } } - }); + } + } + + if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u)) + { + ZEN_TRACE_CPU("WriteTempFullBlock"); + // Could not be moved and rather large, lets store it on disk + BlockChunkPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); + TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); + BlockBuffer = {}; + } + + if (!m_AbortFlag) + { + Work.ScheduleWork(m_IOWorkerPool, + [this, + &Work, + &RemoteChunkIndexNeedsCopyFromSourceFlags, + &SequenceIndexChunksLeftToWriteCounters, + BlockIndex, + &WriteCache, + &WritePartsComplete, + TotalPartWriteCount, + &FilteredWrittenBytesPerSecond, + BlockChunkPath, + BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_WriteFullBlock"); + + const ChunkBlockDescription& BlockDescription = + m_BlockDescriptions[BlockIndex]; + + if (BlockChunkPath.empty()) + { + ZEN_ASSERT(BlockBuffer); + } + else + { + ZEN_ASSERT(!BlockBuffer); + BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); + if (!BlockBuffer) + { + throw std::runtime_error( + fmt::format("Could not open dowloaded block {} from {}", + BlockDescription.BlockHash, + BlockChunkPath)); + } + } + + FilteredWrittenBytesPerSecond.Start(); + if (!WriteChunksBlockToCache(BlockDescription, + SequenceIndexChunksLeftToWriteCounters, + Work, + CompositeBuffer(std::move(BlockBuffer)), + RemoteChunkIndexNeedsCopyFromSourceFlags, + WriteCache)) + { + std::error_code DummyEc; + RemoveFile(BlockChunkPath, DummyEc); + throw std::runtime_error( + fmt::format("Block {} is malformed", BlockDescription.BlockHash)); + } + + if (!BlockChunkPath.empty()) + { + std::error_code Ec = TryRemoveFile(BlockChunkPath); + if (Ec) + { + LOG_OUTPUT_DEBUG(m_LogOutput, + "Failed removing file '{}', reason: ({}) {}", + BlockChunkPath, + Ec.value(), + Ec.message()); + } + } + + WritePartsComplete++; + + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } + } + }); + } } } } - } - }); - } + }); + } - { - ZEN_TRACE_CPU("WriteChunks_Wait"); + { + ZEN_TRACE_CPU("WriteChunks_Wait"); - Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(PendingWork); - uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() + - m_DownloadStats.DownloadedBlockByteCount.load() + - +m_DownloadStats.DownloadedPartialBlockByteCount.load(); - FilteredWrittenBytesPerSecond.Update(m_DiskStats.WriteByteCount.load()); - FilteredDownloadedBytesPerSecond.Update(DownloadedBytes); - std::string DownloadRateString = (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) - ? "" - : fmt::format(" {}bits/s", NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8)); - std::string CloneDetails; - if (m_DiskStats.CloneCount.load() > 0) - { - CloneDetails = fmt::format(" ({} cloned)", NiceBytes(m_DiskStats.CloneByteCount.load())); - } - std::string WriteDetails = m_Options.PrimeCacheOnly ? "" - : fmt::format(" {}/{} ({}B/s) written{}", - NiceBytes(m_WrittenChunkByteCount.load()), - NiceBytes(BytesToWrite), - NiceNum(FilteredWrittenBytesPerSecond.GetCurrent()), - CloneDetails); - - std::string Details = fmt::format("{}/{} ({}{}) downloaded.{}", - m_DownloadStats.RequestsCompleteCount.load(), - TotalRequestCount, - NiceBytes(DownloadedBytes), - DownloadRateString, - WriteDetails); - - std::string Task; - if (m_Options.PrimeCacheOnly) - { - Task = "Downloading "; - } - else if (m_WrittenChunkByteCount < BytesToWrite) - { - Task = "Writing chunks "; - } - else - { - Task = "Verifying chunks "; - } + Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); + uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() + + m_DownloadStats.DownloadedBlockByteCount.load() + + +m_DownloadStats.DownloadedPartialBlockByteCount.load(); + FilteredWrittenBytesPerSecond.Update(m_DiskStats.WriteByteCount.load()); + FilteredDownloadedBytesPerSecond.Update(DownloadedBytes); + std::string DownloadRateString = + (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) + ? "" + : fmt::format(" {}bits/s", NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8)); + std::string CloneDetails; + if (m_DiskStats.CloneCount.load() > 0) + { + CloneDetails = fmt::format(" ({} cloned)", NiceBytes(m_DiskStats.CloneByteCount.load())); + } + std::string WriteDetails = m_Options.PrimeCacheOnly ? "" + : fmt::format(" {}/{} ({}B/s) written{}", + NiceBytes(m_WrittenChunkByteCount.load()), + NiceBytes(BytesToWrite), + NiceNum(FilteredWrittenBytesPerSecond.GetCurrent()), + CloneDetails); + + std::string Details = fmt::format("{}/{} ({}{}) downloaded.{}", + m_DownloadStats.RequestsCompleteCount.load(), + TotalRequestCount, + NiceBytes(DownloadedBytes), + DownloadRateString, + WriteDetails); - WriteProgressBar.UpdateState( - {.Task = Task, - .Details = Details, - .TotalCount = m_Options.PrimeCacheOnly ? TotalRequestCount : (BytesToWrite + BytesToValidate), - .RemainingCount = m_Options.PrimeCacheOnly ? (TotalRequestCount - m_DownloadStats.RequestsCompleteCount.load()) - : ((BytesToWrite + BytesToValidate) - - (m_WrittenChunkByteCount.load() + m_ValidatedChunkByteCount.load())), - .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, - false); - }); - } + std::string Task; + if (m_Options.PrimeCacheOnly) + { + Task = "Downloading "; + } + else if (m_WrittenChunkByteCount < BytesToWrite) + { + Task = "Writing chunks "; + } + else + { + Task = "Verifying chunks "; + } - CloneQuery.reset(); + WriteProgressBar.UpdateState( + {.Task = Task, + .Details = Details, + .TotalCount = m_Options.PrimeCacheOnly ? TotalRequestCount : (BytesToWrite + BytesToValidate), + .RemainingCount = m_Options.PrimeCacheOnly ? (TotalRequestCount - m_DownloadStats.RequestsCompleteCount.load()) + : ((BytesToWrite + BytesToValidate) - + (m_WrittenChunkByteCount.load() + m_ValidatedChunkByteCount.load())), + .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, + false); + }); + } - FilteredWrittenBytesPerSecond.Stop(); - FilteredDownloadedBytesPerSecond.Stop(); + CloneQuery.reset(); - WriteProgressBar.Finish(); - if (m_AbortFlag) - { - return; - } + FilteredWrittenBytesPerSecond.Stop(); + FilteredDownloadedBytesPerSecond.Stop(); - if (!m_Options.PrimeCacheOnly) - { - ZEN_ASSERT(m_WrittenChunkByteCount == BytesToWrite); - ZEN_ASSERT(m_ValidatedChunkByteCount == BytesToValidate); + WriteProgressBar.Finish(); + if (m_AbortFlag) + { + return; + } - uint32_t RawSequencesMissingWriteCount = 0; - for (uint32_t SequenceIndex = 0; SequenceIndex < SequenceIndexChunksLeftToWriteCounters.size(); SequenceIndex++) + if (!m_Options.PrimeCacheOnly) { - const auto& SequenceIndexChunksLeftToWriteCounter = SequenceIndexChunksLeftToWriteCounters[SequenceIndex]; - if (SequenceIndexChunksLeftToWriteCounter.load() != 0) + ZEN_ASSERT(m_WrittenChunkByteCount == BytesToWrite); + ZEN_ASSERT(m_ValidatedChunkByteCount == BytesToValidate); + + uint32_t RawSequencesMissingWriteCount = 0; + for (uint32_t SequenceIndex = 0; SequenceIndex < SequenceIndexChunksLeftToWriteCounters.size(); SequenceIndex++) { - RawSequencesMissingWriteCount++; - const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; - const std::filesystem::path& IncompletePath = m_RemoteContent.Paths[PathIndex]; - ZEN_ASSERT(!IncompletePath.empty()); - const uint32_t ExpectedSequenceCount = m_RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]; - if (!m_Options.IsQuiet) + const auto& SequenceIndexChunksLeftToWriteCounter = SequenceIndexChunksLeftToWriteCounters[SequenceIndex]; + if (SequenceIndexChunksLeftToWriteCounter.load() != 0) { - LOG_OUTPUT(m_LogOutput, - "{}: Max count {}, Current count {}", - IncompletePath, - ExpectedSequenceCount, - SequenceIndexChunksLeftToWriteCounter.load()); + RawSequencesMissingWriteCount++; + const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; + const std::filesystem::path& IncompletePath = m_RemoteContent.Paths[PathIndex]; + ZEN_ASSERT(!IncompletePath.empty()); + const uint32_t ExpectedSequenceCount = m_RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]; + if (!m_Options.IsQuiet) + { + LOG_OUTPUT(m_LogOutput, + "{}: Max count {}, Current count {}", + IncompletePath, + ExpectedSequenceCount, + SequenceIndexChunksLeftToWriteCounter.load()); + } + ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounter.load() <= ExpectedSequenceCount); } - ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounter.load() <= ExpectedSequenceCount); } + ZEN_ASSERT(RawSequencesMissingWriteCount == 0); } - ZEN_ASSERT(RawSequencesMissingWriteCount == 0); - } - const uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() + m_DownloadStats.DownloadedBlockByteCount.load() + - m_DownloadStats.DownloadedPartialBlockByteCount.load(); - if (!m_Options.IsQuiet) - { - std::string CloneDetails; - if (m_DiskStats.CloneCount.load() > 0) + const uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() + + m_DownloadStats.DownloadedBlockByteCount.load() + + m_DownloadStats.DownloadedPartialBlockByteCount.load(); + if (!m_Options.IsQuiet) { - CloneDetails = fmt::format(" ({} cloned)", NiceBytes(m_DiskStats.CloneByteCount.load())); + std::string CloneDetails; + if (m_DiskStats.CloneCount.load() > 0) + { + CloneDetails = fmt::format(" ({} cloned)", NiceBytes(m_DiskStats.CloneByteCount.load())); + } + LOG_OUTPUT(m_LogOutput, + "Downloaded {} ({}bits/s) in {}. Wrote {} ({}B/s){} in {}. Completed in {}", + NiceBytes(DownloadedBytes), + NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), DownloadedBytes * 8)), + NiceTimeSpanMs(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS() / 1000), + NiceBytes(m_WrittenChunkByteCount.load()), + NiceNum(GetBytesPerSecond(FilteredWrittenBytesPerSecond.GetElapsedTimeUS(), m_DiskStats.WriteByteCount.load())), + CloneDetails, + NiceTimeSpanMs(FilteredWrittenBytesPerSecond.GetElapsedTimeUS() / 1000), + NiceTimeSpanMs(WriteTimer.GetElapsedTimeMs())); } - LOG_OUTPUT(m_LogOutput, - "Downloaded {} ({}bits/s) in {}. Wrote {} ({}B/s){} in {}. Completed in {}", - NiceBytes(DownloadedBytes), - NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), DownloadedBytes * 8)), - NiceTimeSpanMs(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS() / 1000), - NiceBytes(m_WrittenChunkByteCount.load()), - NiceNum(GetBytesPerSecond(FilteredWrittenBytesPerSecond.GetElapsedTimeUS(), m_DiskStats.WriteByteCount.load())), - CloneDetails, - NiceTimeSpanMs(FilteredWrittenBytesPerSecond.GetElapsedTimeUS() / 1000), - NiceTimeSpanMs(WriteTimer.GetElapsedTimeMs())); - } - - m_WriteChunkStats.WriteChunksElapsedWallTimeUs = WriteTimer.GetElapsedTimeUs(); - m_WriteChunkStats.DownloadTimeUs = FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(); - m_WriteChunkStats.WriteTimeUs = FilteredWrittenBytesPerSecond.GetElapsedTimeUS(); - } - if (m_Options.PrimeCacheOnly) - { - return; - } + m_WriteChunkStats.WriteChunksElapsedWallTimeUs = WriteTimer.GetElapsedTimeUs(); + m_WriteChunkStats.DownloadTimeUs = FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(); + m_WriteChunkStats.WriteTimeUs = FilteredWrittenBytesPerSecond.GetElapsedTimeUS(); + } - m_LogOutput.SetLogOperationProgress(TaskSteps::PrepareTarget, TaskSteps::StepCount); + if (m_Options.PrimeCacheOnly) + { + return; + } - tsl::robin_map<uint32_t, uint32_t> RemotePathIndexToLocalPathIndex; - RemotePathIndexToLocalPathIndex.reserve(m_RemoteContent.Paths.size()); + m_LogOutput.SetLogOperationProgress(TaskSteps::PrepareTarget, TaskSteps::StepCount); - tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> SequenceHashToLocalPathIndex; - std::vector<uint32_t> RemoveLocalPathIndexes; + tsl::robin_map<uint32_t, uint32_t> RemotePathIndexToLocalPathIndex; + RemotePathIndexToLocalPathIndex.reserve(m_RemoteContent.Paths.size()); - if (m_AbortFlag) - { - return; - } + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> SequenceHashToLocalPathIndex; + std::vector<uint32_t> RemoveLocalPathIndexes; - { - ZEN_TRACE_CPU("PrepareTarget"); - - tsl::robin_set<IoHash, IoHash::Hasher> CachedRemoteSequences; - tsl::robin_map<std::string, uint32_t> RemotePathToRemoteIndex; - RemotePathToRemoteIndex.reserve(m_RemoteContent.Paths.size()); - for (uint32_t RemotePathIndex = 0; RemotePathIndex < m_RemoteContent.Paths.size(); RemotePathIndex++) + if (m_AbortFlag) { - RemotePathToRemoteIndex.insert({m_RemoteContent.Paths[RemotePathIndex].generic_string(), RemotePathIndex}); + return; } - std::vector<uint32_t> FilesToCache; - - uint64_t MatchCount = 0; - uint64_t PathMismatchCount = 0; - uint64_t HashMismatchCount = 0; - std::atomic<uint64_t> CachedCount = 0; - std::atomic<uint64_t> CachedByteCount = 0; - uint64_t SkippedCount = 0; - uint64_t DeleteCount = 0; - for (uint32_t LocalPathIndex = 0; LocalPathIndex < m_LocalContent.Paths.size(); LocalPathIndex++) { - if (m_AbortFlag) + ZEN_TRACE_CPU("PrepareTarget"); + + tsl::robin_set<IoHash, IoHash::Hasher> CachedRemoteSequences; + tsl::robin_map<std::string, uint32_t> RemotePathToRemoteIndex; + RemotePathToRemoteIndex.reserve(m_RemoteContent.Paths.size()); + for (uint32_t RemotePathIndex = 0; RemotePathIndex < m_RemoteContent.Paths.size(); RemotePathIndex++) { - break; + RemotePathToRemoteIndex.insert({m_RemoteContent.Paths[RemotePathIndex].generic_string(), RemotePathIndex}); } - const IoHash& RawHash = m_LocalContent.RawHashes[LocalPathIndex]; - const std::filesystem::path& LocalPath = m_LocalContent.Paths[LocalPathIndex]; - ZEN_ASSERT_SLOW(IsFile((m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred())); + std::vector<uint32_t> FilesToCache; - if (m_Options.EnableTargetFolderScavenging) + uint64_t MatchCount = 0; + uint64_t PathMismatchCount = 0; + uint64_t HashMismatchCount = 0; + std::atomic<uint64_t> CachedCount = 0; + std::atomic<uint64_t> CachedByteCount = 0; + uint64_t SkippedCount = 0; + uint64_t DeleteCount = 0; + for (uint32_t LocalPathIndex = 0; LocalPathIndex < m_LocalContent.Paths.size(); LocalPathIndex++) { - if (!m_Options.WipeTargetFolder) + if (m_AbortFlag) + { + break; + } + const IoHash& RawHash = m_LocalContent.RawHashes[LocalPathIndex]; + const std::filesystem::path& LocalPath = m_LocalContent.Paths[LocalPathIndex]; + + ZEN_ASSERT_SLOW(IsFile((m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred())); + + if (m_Options.EnableTargetFolderScavenging) { - if (auto RemotePathIt = RemotePathToRemoteIndex.find(LocalPath.generic_string()); - RemotePathIt != RemotePathToRemoteIndex.end()) + if (!m_Options.WipeTargetFolder) { - const uint32_t RemotePathIndex = RemotePathIt->second; - if (m_RemoteContent.RawHashes[RemotePathIndex] == RawHash) + if (auto RemotePathIt = RemotePathToRemoteIndex.find(LocalPath.generic_string()); + RemotePathIt != RemotePathToRemoteIndex.end()) { - // It is already in it's desired place - RemotePathIndexToLocalPathIndex[RemotePathIndex] = LocalPathIndex; - SequenceHashToLocalPathIndex.insert({RawHash, LocalPathIndex}); - MatchCount++; - continue; + const uint32_t RemotePathIndex = RemotePathIt->second; + if (m_RemoteContent.RawHashes[RemotePathIndex] == RawHash) + { + // It is already in it's desired place + RemotePathIndexToLocalPathIndex[RemotePathIndex] = LocalPathIndex; + SequenceHashToLocalPathIndex.insert({RawHash, LocalPathIndex}); + MatchCount++; + continue; + } + else + { + HashMismatchCount++; + } } else { - HashMismatchCount++; + PathMismatchCount++; } } - else - { - PathMismatchCount++; - } - } - if (m_RemoteLookup.RawHashToSequenceIndex.contains(RawHash)) - { - if (!CachedRemoteSequences.contains(RawHash)) + if (m_RemoteLookup.RawHashToSequenceIndex.contains(RawHash)) { - ZEN_TRACE_CPU("MoveToCache"); - // We need it - FilesToCache.push_back(LocalPathIndex); - CachedRemoteSequences.insert(RawHash); + if (!CachedRemoteSequences.contains(RawHash)) + { + ZEN_TRACE_CPU("MoveToCache"); + // We need it + FilesToCache.push_back(LocalPathIndex); + CachedRemoteSequences.insert(RawHash); + } + else + { + // We already have it + SkippedCount++; + } } - else + else if (!m_Options.WipeTargetFolder) { - // We already have it - SkippedCount++; + // We don't need it + RemoveLocalPathIndexes.push_back(LocalPathIndex); + DeleteCount++; } } - else if (!m_Options.WipeTargetFolder) + else { - // We don't need it + // Delete local file as we did not scavenge the folder RemoveLocalPathIndexes.push_back(LocalPathIndex); DeleteCount++; } } - else + + if (m_AbortFlag) { - // Delete local file as we did not scavenge the folder - RemoveLocalPathIndexes.push_back(LocalPathIndex); - DeleteCount++; + return; } - } - if (m_AbortFlag) - { - return; - } - - { - ZEN_TRACE_CPU("CopyToCache"); + { + ZEN_TRACE_CPU("CopyToCache"); - Stopwatch Timer; + Stopwatch Timer; - std::unique_ptr<BuildOpLogOutput::ProgressBar> CacheLocalProgressBarPtr(m_LogOutput.CreateProgressBar("Cache Local Data")); - BuildOpLogOutput::ProgressBar& CacheLocalProgressBar(*CacheLocalProgressBarPtr); - ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + std::unique_ptr<BuildOpLogOutput::ProgressBar> CacheLocalProgressBarPtr(m_LogOutput.CreateProgressBar("Cache Local Data")); + BuildOpLogOutput::ProgressBar& CacheLocalProgressBar(*CacheLocalProgressBarPtr); + ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - for (uint32_t LocalPathIndex : FilesToCache) - { - if (m_AbortFlag) + for (uint32_t LocalPathIndex : FilesToCache) { - break; - } - Work.ScheduleWork(m_IOWorkerPool, [this, &CachedCount, &CachedByteCount, LocalPathIndex](std::atomic<bool>&) { - if (!m_AbortFlag) + if (m_AbortFlag) { - ZEN_TRACE_CPU("Async_CopyToCache"); + break; + } + Work.ScheduleWork(m_IOWorkerPool, [this, &CachedCount, &CachedByteCount, LocalPathIndex](std::atomic<bool>&) { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_CopyToCache"); - const IoHash& RawHash = m_LocalContent.RawHashes[LocalPathIndex]; - const std::filesystem::path& LocalPath = m_LocalContent.Paths[LocalPathIndex]; - const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RawHash); - ZEN_ASSERT_SLOW(!IsFileWithRetry(CacheFilePath)); - const std::filesystem::path LocalFilePath = (m_Path / LocalPath).make_preferred(); + const IoHash& RawHash = m_LocalContent.RawHashes[LocalPathIndex]; + const std::filesystem::path& LocalPath = m_LocalContent.Paths[LocalPathIndex]; + const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RawHash); + ZEN_ASSERT_SLOW(!IsFileWithRetry(CacheFilePath)); + const std::filesystem::path LocalFilePath = (m_Path / LocalPath).make_preferred(); - std::error_code Ec = RenameFileWithRetry(LocalFilePath, CacheFilePath); - if (Ec) - { - LOG_OUTPUT_WARN(m_LogOutput, - "Failed to move file from '{}' to '{}', reason: ({}) {}, retrying...", - LocalFilePath, - CacheFilePath, - Ec.value(), - Ec.message()); - Ec = RenameFileWithRetry(LocalFilePath, CacheFilePath); + std::error_code Ec = RenameFileWithRetry(LocalFilePath, CacheFilePath); if (Ec) { - throw std::system_error(std::error_code(Ec.value(), std::system_category()), - fmt::format("Failed to file from '{}' to '{}', reason: ({}) {}", - LocalFilePath, - CacheFilePath, - Ec.value(), - Ec.message())); + LOG_OUTPUT_WARN(m_LogOutput, + "Failed to move file from '{}' to '{}', reason: ({}) {}, retrying...", + LocalFilePath, + CacheFilePath, + Ec.value(), + Ec.message()); + Ec = RenameFileWithRetry(LocalFilePath, CacheFilePath); + if (Ec) + { + throw std::system_error(std::error_code(Ec.value(), std::system_category()), + fmt::format("Failed to file from '{}' to '{}', reason: ({}) {}", + LocalFilePath, + CacheFilePath, + Ec.value(), + Ec.message())); + } } + + CachedCount++; + CachedByteCount += m_LocalContent.RawSizes[LocalPathIndex]; } + }); + } - CachedCount++; - CachedByteCount += m_LocalContent.RawSizes[LocalPathIndex]; - } - }); - } + { + ZEN_TRACE_CPU("CopyToCache_Wait"); + + Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); + const uint64_t WorkTotal = FilesToCache.size(); + const uint64_t WorkComplete = CachedCount.load(); + std::string Details = fmt::format("{}/{} ({}) files", WorkComplete, WorkTotal, NiceBytes(CachedByteCount)); + CacheLocalProgressBar.UpdateState( + {.Task = "Caching local ", + .Details = Details, + .TotalCount = gsl::narrow<uint64_t>(WorkTotal), + .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete), + .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, + false); + }); + } - { - ZEN_TRACE_CPU("CopyToCache_Wait"); + CacheLocalProgressBar.Finish(); + if (m_AbortFlag) + { + return; + } - Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(PendingWork); - const uint64_t WorkTotal = FilesToCache.size(); - const uint64_t WorkComplete = CachedCount.load(); - std::string Details = fmt::format("{}/{} ({}) files", WorkComplete, WorkTotal, NiceBytes(CachedByteCount)); - CacheLocalProgressBar.UpdateState( - {.Task = "Caching local ", - .Details = Details, - .TotalCount = gsl::narrow<uint64_t>(WorkTotal), - .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete), - .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, - false); - }); + LOG_OUTPUT_DEBUG(m_LogOutput, + "Local state prep: Match: {}, PathMismatch: {}, HashMismatch: {}, Cached: {} ({}), Skipped: {}, " + "Delete: {}", + MatchCount, + PathMismatchCount, + HashMismatchCount, + CachedCount.load(), + NiceBytes(CachedByteCount.load()), + SkippedCount, + DeleteCount); } + } - CacheLocalProgressBar.Finish(); - if (m_AbortFlag) + m_LogOutput.SetLogOperationProgress(TaskSteps::FinalizeTarget, TaskSteps::StepCount); + + if (m_Options.WipeTargetFolder) + { + ZEN_TRACE_CPU("WipeTarget"); + Stopwatch Timer; + + // Clean target folder + if (!CleanDirectory(m_LogOutput, m_IOWorkerPool, m_AbortFlag, m_PauseFlag, m_Options.IsQuiet, m_Path, m_Options.ExcludeFolders)) { - return; + LOG_OUTPUT_WARN(m_LogOutput, "Some files in {} could not be removed", m_Path); } - - LOG_OUTPUT_DEBUG(m_LogOutput, - "Local state prep: Match: {}, PathMismatch: {}, HashMismatch: {}, Cached: {} ({}), Skipped: {}, " - "Delete: {}", - MatchCount, - PathMismatchCount, - HashMismatchCount, - CachedCount.load(), - NiceBytes(CachedByteCount.load()), - SkippedCount, - DeleteCount); + m_RebuildFolderStateStats.CleanFolderElapsedWallTimeUs = Timer.GetElapsedTimeUs(); } - } - m_LogOutput.SetLogOperationProgress(TaskSteps::FinalizeTarget, TaskSteps::StepCount); - - if (m_Options.WipeTargetFolder) - { - ZEN_TRACE_CPU("WipeTarget"); - Stopwatch Timer; - - // Clean target folder - if (!CleanDirectory(m_LogOutput, m_IOWorkerPool, m_AbortFlag, m_PauseFlag, m_Options.IsQuiet, m_Path, m_Options.ExcludeFolders)) + if (m_AbortFlag) { - LOG_OUTPUT_WARN(m_LogOutput, "Some files in {} could not be removed", m_Path); + return; } - m_RebuildFolderStateStats.CleanFolderElapsedWallTimeUs = Timer.GetElapsedTimeUs(); - } - - if (m_AbortFlag) - { - return; - } - { - ZEN_TRACE_CPU("FinalizeTree"); + { + ZEN_TRACE_CPU("FinalizeTree"); - Stopwatch Timer; + Stopwatch Timer; - std::unique_ptr<BuildOpLogOutput::ProgressBar> RebuildProgressBarPtr(m_LogOutput.CreateProgressBar("Rebuild State")); - BuildOpLogOutput::ProgressBar& RebuildProgressBar(*RebuildProgressBarPtr); - ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + std::unique_ptr<BuildOpLogOutput::ProgressBar> RebuildProgressBarPtr(m_LogOutput.CreateProgressBar("Rebuild State")); + BuildOpLogOutput::ProgressBar& RebuildProgressBar(*RebuildProgressBarPtr); + ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - OutLocalFolderState.Paths.resize(m_RemoteContent.Paths.size()); - OutLocalFolderState.RawSizes.resize(m_RemoteContent.Paths.size()); - OutLocalFolderState.Attributes.resize(m_RemoteContent.Paths.size()); - OutLocalFolderState.ModificationTicks.resize(m_RemoteContent.Paths.size()); + OutLocalFolderState.Paths.resize(m_RemoteContent.Paths.size()); + OutLocalFolderState.RawSizes.resize(m_RemoteContent.Paths.size()); + OutLocalFolderState.Attributes.resize(m_RemoteContent.Paths.size()); + OutLocalFolderState.ModificationTicks.resize(m_RemoteContent.Paths.size()); - std::atomic<uint64_t> DeletedCount = 0; + std::atomic<uint64_t> DeletedCount = 0; - for (uint32_t LocalPathIndex : RemoveLocalPathIndexes) - { - if (m_AbortFlag) + for (uint32_t LocalPathIndex : RemoveLocalPathIndexes) { - break; - } - Work.ScheduleWork(m_IOWorkerPool, [this, &DeletedCount, LocalPathIndex](std::atomic<bool>&) { - if (!m_AbortFlag) + if (m_AbortFlag) { - ZEN_TRACE_CPU("Async_RemoveFile"); - - const std::filesystem::path LocalFilePath = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred(); - SetFileReadOnlyWithRetry(LocalFilePath, false); - RemoveFileWithRetry(LocalFilePath); - DeletedCount++; + break; } - }); - } + Work.ScheduleWork(m_IOWorkerPool, [this, &DeletedCount, LocalPathIndex](std::atomic<bool>&) { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_RemoveFile"); - std::atomic<uint64_t> TargetsComplete = 0; + const std::filesystem::path LocalFilePath = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred(); + SetFileReadOnlyWithRetry(LocalFilePath, false); + RemoveFileWithRetry(LocalFilePath); + DeletedCount++; + } + }); + } - struct FinalizeTarget - { - IoHash RawHash; - uint32_t RemotePathIndex; - }; + std::atomic<uint64_t> TargetsComplete = 0; - std::vector<FinalizeTarget> Targets; - Targets.reserve(m_RemoteContent.Paths.size()); - for (uint32_t RemotePathIndex = 0; RemotePathIndex < m_RemoteContent.Paths.size(); RemotePathIndex++) - { - Targets.push_back(FinalizeTarget{.RawHash = m_RemoteContent.RawHashes[RemotePathIndex], .RemotePathIndex = RemotePathIndex}); - } - std::sort(Targets.begin(), Targets.end(), [](const FinalizeTarget& Lhs, const FinalizeTarget& Rhs) { - if (Lhs.RawHash < Rhs.RawHash) - { - return true; - } - else if (Lhs.RawHash > Rhs.RawHash) + struct FinalizeTarget { - return false; - } - return Lhs.RemotePathIndex < Rhs.RemotePathIndex; - }); + IoHash RawHash; + uint32_t RemotePathIndex; + }; - size_t TargetOffset = 0; - while (TargetOffset < Targets.size()) - { - if (m_AbortFlag) + std::vector<FinalizeTarget> Targets; + Targets.reserve(m_RemoteContent.Paths.size()); + for (uint32_t RemotePathIndex = 0; RemotePathIndex < m_RemoteContent.Paths.size(); RemotePathIndex++) { - break; + Targets.push_back( + FinalizeTarget{.RawHash = m_RemoteContent.RawHashes[RemotePathIndex], .RemotePathIndex = RemotePathIndex}); } + std::sort(Targets.begin(), Targets.end(), [](const FinalizeTarget& Lhs, const FinalizeTarget& Rhs) { + if (Lhs.RawHash < Rhs.RawHash) + { + return true; + } + else if (Lhs.RawHash > Rhs.RawHash) + { + return false; + } + return Lhs.RemotePathIndex < Rhs.RemotePathIndex; + }); - size_t TargetCount = 1; - while ((TargetOffset + TargetCount) < Targets.size() && - (Targets[TargetOffset + TargetCount].RawHash == Targets[TargetOffset].RawHash)) + size_t TargetOffset = 0; + while (TargetOffset < Targets.size()) { - TargetCount++; - } - - Work.ScheduleWork( - m_IOWorkerPool, - [this, - &SequenceHashToLocalPathIndex, - &Targets, - &RemotePathIndexToLocalPathIndex, - &OutLocalFolderState, - BaseTargetOffset = TargetOffset, - TargetCount, - &TargetsComplete](std::atomic<bool>&) { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Async_FinalizeChunkSequence"); + if (m_AbortFlag) + { + break; + } - size_t TargetOffset = BaseTargetOffset; - const IoHash& RawHash = Targets[TargetOffset].RawHash; + size_t TargetCount = 1; + while ((TargetOffset + TargetCount) < Targets.size() && + (Targets[TargetOffset + TargetCount].RawHash == Targets[TargetOffset].RawHash)) + { + TargetCount++; + } - if (RawHash == IoHash::Zero) + Work.ScheduleWork( + m_IOWorkerPool, + [this, + &SequenceHashToLocalPathIndex, + &Targets, + &RemotePathIndexToLocalPathIndex, + &OutLocalFolderState, + BaseTargetOffset = TargetOffset, + TargetCount, + &TargetsComplete](std::atomic<bool>&) { + if (!m_AbortFlag) { - ZEN_TRACE_CPU("CreateEmptyFiles"); - while (TargetOffset < (BaseTargetOffset + TargetCount)) + ZEN_TRACE_CPU("Async_FinalizeChunkSequence"); + + size_t TargetOffset = BaseTargetOffset; + const IoHash& RawHash = Targets[TargetOffset].RawHash; + + if (RawHash == IoHash::Zero) { - const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex; - ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash); - const std::filesystem::path& TargetPath = m_RemoteContent.Paths[RemotePathIndex]; - std::filesystem::path TargetFilePath = (m_Path / TargetPath).make_preferred(); - if (!RemotePathIndexToLocalPathIndex[RemotePathIndex]) + ZEN_TRACE_CPU("CreateEmptyFiles"); + while (TargetOffset < (BaseTargetOffset + TargetCount)) { - if (IsFileWithRetry(TargetFilePath)) - { - SetFileReadOnlyWithRetry(TargetFilePath, false); - } - else + const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex; + ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash); + const std::filesystem::path& TargetPath = m_RemoteContent.Paths[RemotePathIndex]; + std::filesystem::path TargetFilePath = (m_Path / TargetPath).make_preferred(); + if (!RemotePathIndexToLocalPathIndex[RemotePathIndex]) { - CreateDirectories(TargetFilePath.parent_path()); + if (IsFileWithRetry(TargetFilePath)) + { + SetFileReadOnlyWithRetry(TargetFilePath, false); + } + else + { + CreateDirectories(TargetFilePath.parent_path()); + } + BasicFile OutputFile; + OutputFile.Open(TargetFilePath, BasicFile::Mode::kTruncate); } - BasicFile OutputFile; - OutputFile.Open(TargetFilePath, BasicFile::Mode::kTruncate); + OutLocalFolderState.Paths[RemotePathIndex] = TargetPath; + OutLocalFolderState.RawSizes[RemotePathIndex] = m_RemoteContent.RawSizes[RemotePathIndex]; + + OutLocalFolderState.Attributes[RemotePathIndex] = + m_RemoteContent.Attributes.empty() + ? GetNativeFileAttributes(TargetFilePath) + : SetNativeFileAttributes(TargetFilePath, + m_RemoteContent.Platform, + m_RemoteContent.Attributes[RemotePathIndex]); + OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath); + + TargetOffset++; + TargetsComplete++; } - OutLocalFolderState.Paths[RemotePathIndex] = TargetPath; - OutLocalFolderState.RawSizes[RemotePathIndex] = m_RemoteContent.RawSizes[RemotePathIndex]; - - OutLocalFolderState.Attributes[RemotePathIndex] = - m_RemoteContent.Attributes.empty() - ? GetNativeFileAttributes(TargetFilePath) - : SetNativeFileAttributes(TargetFilePath, - m_RemoteContent.Platform, - m_RemoteContent.Attributes[RemotePathIndex]); - OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath); - - TargetOffset++; - TargetsComplete++; - } - } - else - { - ZEN_TRACE_CPU("FinalizeFile"); - ZEN_ASSERT(m_RemoteLookup.RawHashToSequenceIndex.contains(RawHash)); - const uint32_t FirstRemotePathIndex = Targets[TargetOffset].RemotePathIndex; - const std::filesystem::path& FirstTargetPath = m_RemoteContent.Paths[FirstRemotePathIndex]; - std::filesystem::path FirstTargetFilePath = (m_Path / FirstTargetPath).make_preferred(); - - if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(FirstRemotePathIndex); - InPlaceIt != RemotePathIndexToLocalPathIndex.end()) - { - ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath)); } else { - if (IsFileWithRetry(FirstTargetFilePath)) - { - SetFileReadOnlyWithRetry(FirstTargetFilePath, false); - } - else - { - CreateDirectories(FirstTargetFilePath.parent_path()); - } + ZEN_TRACE_CPU("FinalizeFile"); + ZEN_ASSERT(m_RemoteLookup.RawHashToSequenceIndex.contains(RawHash)); + const uint32_t FirstRemotePathIndex = Targets[TargetOffset].RemotePathIndex; + const std::filesystem::path& FirstTargetPath = m_RemoteContent.Paths[FirstRemotePathIndex]; + std::filesystem::path FirstTargetFilePath = (m_Path / FirstTargetPath).make_preferred(); - if (auto InplaceIt = SequenceHashToLocalPathIndex.find(RawHash); - InplaceIt != SequenceHashToLocalPathIndex.end()) + if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(FirstRemotePathIndex); + InPlaceIt != RemotePathIndexToLocalPathIndex.end()) { - ZEN_TRACE_CPU("Copy"); - const uint32_t LocalPathIndex = InplaceIt->second; - const std::filesystem::path& SourcePath = m_LocalContent.Paths[LocalPathIndex]; - std::filesystem::path SourceFilePath = (m_Path / SourcePath).make_preferred(); - ZEN_ASSERT_SLOW(IsFileWithRetry(SourceFilePath)); - - LOG_OUTPUT_DEBUG(m_LogOutput, "Copying from '{}' -> '{}'", SourceFilePath, FirstTargetFilePath); - const uint64_t RawSize = m_LocalContent.RawSizes[LocalPathIndex]; - FastCopyFile(m_Options.AllowFileClone, - m_Options.UseSparseFiles, - SourceFilePath, - FirstTargetFilePath, - RawSize, - m_DiskStats.WriteCount, - m_DiskStats.WriteByteCount, - m_DiskStats.CloneCount, - m_DiskStats.CloneByteCount); - - m_RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++; + ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath)); } else { - ZEN_TRACE_CPU("Rename"); - const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RawHash); - ZEN_ASSERT_SLOW(IsFileWithRetry(CacheFilePath)); + if (IsFileWithRetry(FirstTargetFilePath)) + { + SetFileReadOnlyWithRetry(FirstTargetFilePath, false); + } + else + { + CreateDirectories(FirstTargetFilePath.parent_path()); + } - std::error_code Ec = RenameFileWithRetry(CacheFilePath, FirstTargetFilePath); - if (Ec) + if (auto InplaceIt = SequenceHashToLocalPathIndex.find(RawHash); + InplaceIt != SequenceHashToLocalPathIndex.end()) + { + ZEN_TRACE_CPU("Copy"); + const uint32_t LocalPathIndex = InplaceIt->second; + const std::filesystem::path& SourcePath = m_LocalContent.Paths[LocalPathIndex]; + std::filesystem::path SourceFilePath = (m_Path / SourcePath).make_preferred(); + ZEN_ASSERT_SLOW(IsFileWithRetry(SourceFilePath)); + + LOG_OUTPUT_DEBUG(m_LogOutput, "Copying from '{}' -> '{}'", SourceFilePath, FirstTargetFilePath); + const uint64_t RawSize = m_LocalContent.RawSizes[LocalPathIndex]; + FastCopyFile(m_Options.AllowFileClone, + m_Options.UseSparseFiles, + SourceFilePath, + FirstTargetFilePath, + RawSize, + m_DiskStats.WriteCount, + m_DiskStats.WriteByteCount, + m_DiskStats.CloneCount, + m_DiskStats.CloneByteCount); + + m_RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++; + } + else { - LOG_OUTPUT_WARN(m_LogOutput, - "Failed to move file from '{}' to '{}', reason: ({}) {}, retrying...", - CacheFilePath, - FirstTargetFilePath, - Ec.value(), - Ec.message()); - Ec = RenameFileWithRetry(CacheFilePath, FirstTargetFilePath); + ZEN_TRACE_CPU("Rename"); + const std::filesystem::path CacheFilePath = + GetFinalChunkedSequenceFileName(m_CacheFolderPath, RawHash); + ZEN_ASSERT_SLOW(IsFileWithRetry(CacheFilePath)); + + std::error_code Ec = RenameFileWithRetry(CacheFilePath, FirstTargetFilePath); if (Ec) { - throw std::system_error(std::error_code(Ec.value(), std::system_category()), - fmt::format("Failed to move file from '{}' to '{}', reason: ({}) {}", - CacheFilePath, - FirstTargetFilePath, - Ec.value(), - Ec.message())); + LOG_OUTPUT_WARN(m_LogOutput, + "Failed to move file from '{}' to '{}', reason: ({}) {}, retrying...", + CacheFilePath, + FirstTargetFilePath, + Ec.value(), + Ec.message()); + Ec = RenameFileWithRetry(CacheFilePath, FirstTargetFilePath); + if (Ec) + { + throw std::system_error( + std::error_code(Ec.value(), std::system_category()), + fmt::format("Failed to move file from '{}' to '{}', reason: ({}) {}", + CacheFilePath, + FirstTargetFilePath, + Ec.value(), + Ec.message())); + } } - } - m_RebuildFolderStateStats.FinalizeTreeFilesMovedCount++; + m_RebuildFolderStateStats.FinalizeTreeFilesMovedCount++; + } } - } - OutLocalFolderState.Paths[FirstRemotePathIndex] = FirstTargetPath; - OutLocalFolderState.RawSizes[FirstRemotePathIndex] = m_RemoteContent.RawSizes[FirstRemotePathIndex]; + OutLocalFolderState.Paths[FirstRemotePathIndex] = FirstTargetPath; + OutLocalFolderState.RawSizes[FirstRemotePathIndex] = m_RemoteContent.RawSizes[FirstRemotePathIndex]; - OutLocalFolderState.Attributes[FirstRemotePathIndex] = - m_RemoteContent.Attributes.empty() - ? GetNativeFileAttributes(FirstTargetFilePath) - : SetNativeFileAttributes(FirstTargetFilePath, - m_RemoteContent.Platform, - m_RemoteContent.Attributes[FirstRemotePathIndex]); - OutLocalFolderState.ModificationTicks[FirstRemotePathIndex] = GetModificationTickFromPath(FirstTargetFilePath); - - TargetOffset++; - TargetsComplete++; + OutLocalFolderState.Attributes[FirstRemotePathIndex] = + m_RemoteContent.Attributes.empty() + ? GetNativeFileAttributes(FirstTargetFilePath) + : SetNativeFileAttributes(FirstTargetFilePath, + m_RemoteContent.Platform, + m_RemoteContent.Attributes[FirstRemotePathIndex]); + OutLocalFolderState.ModificationTicks[FirstRemotePathIndex] = + GetModificationTickFromPath(FirstTargetFilePath); - while (TargetOffset < (BaseTargetOffset + TargetCount)) - { - const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex; - ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash); - const std::filesystem::path& TargetPath = m_RemoteContent.Paths[RemotePathIndex]; - std::filesystem::path TargetFilePath = (m_Path / TargetPath).make_preferred(); + TargetOffset++; + TargetsComplete++; - if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(RemotePathIndex); - InPlaceIt != RemotePathIndexToLocalPathIndex.end()) - { - ZEN_ASSERT_SLOW(IsFileWithRetry(TargetFilePath)); - } - else + while (TargetOffset < (BaseTargetOffset + TargetCount)) { - ZEN_TRACE_CPU("Copy"); - if (IsFileWithRetry(TargetFilePath)) + const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex; + ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash); + const std::filesystem::path& TargetPath = m_RemoteContent.Paths[RemotePathIndex]; + std::filesystem::path TargetFilePath = (m_Path / TargetPath).make_preferred(); + + if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(RemotePathIndex); + InPlaceIt != RemotePathIndexToLocalPathIndex.end()) { - SetFileReadOnlyWithRetry(TargetFilePath, false); + ZEN_ASSERT_SLOW(IsFileWithRetry(TargetFilePath)); } else { - CreateDirectories(TargetFilePath.parent_path()); - } + ZEN_TRACE_CPU("Copy"); + if (IsFileWithRetry(TargetFilePath)) + { + SetFileReadOnlyWithRetry(TargetFilePath, false); + } + else + { + CreateDirectories(TargetFilePath.parent_path()); + } - ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath)); - LOG_OUTPUT_DEBUG(m_LogOutput, "Copying from '{}' -> '{}'", FirstTargetFilePath, TargetFilePath); - const uint64_t RawSize = m_RemoteContent.RawSizes[RemotePathIndex]; - FastCopyFile(m_Options.AllowFileClone, - m_Options.UseSparseFiles, - FirstTargetFilePath, - TargetFilePath, - RawSize, - m_DiskStats.WriteCount, - m_DiskStats.WriteByteCount, - m_DiskStats.CloneCount, - m_DiskStats.CloneByteCount); - - m_RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++; - } + ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath)); + LOG_OUTPUT_DEBUG(m_LogOutput, "Copying from '{}' -> '{}'", FirstTargetFilePath, TargetFilePath); + const uint64_t RawSize = m_RemoteContent.RawSizes[RemotePathIndex]; + FastCopyFile(m_Options.AllowFileClone, + m_Options.UseSparseFiles, + FirstTargetFilePath, + TargetFilePath, + RawSize, + m_DiskStats.WriteCount, + m_DiskStats.WriteByteCount, + m_DiskStats.CloneCount, + m_DiskStats.CloneByteCount); + + m_RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++; + } - OutLocalFolderState.Paths[RemotePathIndex] = TargetPath; - OutLocalFolderState.RawSizes[RemotePathIndex] = m_RemoteContent.RawSizes[RemotePathIndex]; + OutLocalFolderState.Paths[RemotePathIndex] = TargetPath; + OutLocalFolderState.RawSizes[RemotePathIndex] = m_RemoteContent.RawSizes[RemotePathIndex]; - OutLocalFolderState.Attributes[RemotePathIndex] = - m_RemoteContent.Attributes.empty() - ? GetNativeFileAttributes(TargetFilePath) - : SetNativeFileAttributes(TargetFilePath, - m_RemoteContent.Platform, - m_RemoteContent.Attributes[RemotePathIndex]); - OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath); + OutLocalFolderState.Attributes[RemotePathIndex] = + m_RemoteContent.Attributes.empty() + ? GetNativeFileAttributes(TargetFilePath) + : SetNativeFileAttributes(TargetFilePath, + m_RemoteContent.Platform, + m_RemoteContent.Attributes[RemotePathIndex]); + OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath); - TargetOffset++; - TargetsComplete++; + TargetOffset++; + TargetsComplete++; + } } } - } - }); + }); - TargetOffset += TargetCount; - } + TargetOffset += TargetCount; + } - { - ZEN_TRACE_CPU("FinalizeTree_Wait"); + { + ZEN_TRACE_CPU("FinalizeTree_Wait"); - Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(PendingWork); - const uint64_t WorkTotal = Targets.size() + RemoveLocalPathIndexes.size(); - const uint64_t WorkComplete = TargetsComplete.load() + DeletedCount.load(); - std::string Details = fmt::format("{}/{} files", WorkComplete, WorkTotal); - RebuildProgressBar.UpdateState({.Task = "Rebuilding state ", - .Details = Details, - .TotalCount = gsl::narrow<uint64_t>(WorkTotal), - .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete), - .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, - false); - }); - } + Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); + const uint64_t WorkTotal = Targets.size() + RemoveLocalPathIndexes.size(); + const uint64_t WorkComplete = TargetsComplete.load() + DeletedCount.load(); + std::string Details = fmt::format("{}/{} files", WorkComplete, WorkTotal); + RebuildProgressBar.UpdateState({.Task = "Rebuilding state ", + .Details = Details, + .TotalCount = gsl::narrow<uint64_t>(WorkTotal), + .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete), + .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, + false); + }); + } - m_RebuildFolderStateStats.FinalizeTreeElapsedWallTimeUs = Timer.GetElapsedTimeUs(); - RebuildProgressBar.Finish(); + m_RebuildFolderStateStats.FinalizeTreeElapsedWallTimeUs = Timer.GetElapsedTimeUs(); + RebuildProgressBar.Finish(); + } + m_LogOutput.SetLogOperationProgress(TaskSteps::Cleanup, TaskSteps::StepCount); + } + catch (const std::exception&) + { + m_AbortFlag = true; + throw; } - m_LogOutput.SetLogOperationProgress(TaskSteps::Cleanup, TaskSteps::StepCount); } void @@ -3152,7 +3082,10 @@ BuildsOperationUpdateFolder::WriteScavengedSequenceToCache(const std::filesystem RenameFile(TempFilePath, CacheFilePath); m_WrittenChunkByteCount += RawSize; - m_ValidatedChunkByteCount += RawSize; + if (m_Options.ValidateCompletedSequences) + { + m_ValidatedChunkByteCount += RawSize; + } } void @@ -3971,16 +3904,7 @@ BuildsOperationUpdateFolder::WriteCompressedChunkToCache( { const std::uint32_t SequenceIndex = ChunkTargetPtrs.front()->SequenceIndex; const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]; - StreamDecompress(m_AbortFlag, - m_CacheFolderPath, - SequenceRawHash, - CompositeBuffer(std::move(CompressedPart)), - m_DiskStats.ReadCount, - m_DiskStats.ReadByteCount, - m_DiskStats.WriteCount, - m_DiskStats.WriteByteCount, - m_WrittenChunkByteCount, - m_ValidatedChunkByteCount); + StreamDecompress(SequenceRawHash, CompositeBuffer(std::move(CompressedPart))); return false; } else @@ -4039,6 +3963,92 @@ BuildsOperationUpdateFolder::WriteCompressedChunkToCache( } void +BuildsOperationUpdateFolder::StreamDecompress(const IoHash& SequenceRawHash, CompositeBuffer&& CompressedPart) +{ + ZEN_TRACE_CPU("StreamDecompress"); + const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash); + TemporaryFile DecompressedTemp; + std::error_code Ec; + DecompressedTemp.CreateTemporary(TempChunkSequenceFileName.parent_path(), Ec); + if (Ec) + { + throw std::runtime_error(fmt::format("Failed creating temporary file for decompressing large blob {}, reason: ({}) {}", + SequenceRawHash, + Ec.value(), + Ec.message())); + } + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompressedPart, RawHash, RawSize); + if (!Compressed) + { + throw std::runtime_error(fmt::format("Failed to parse header of compressed large blob {}", SequenceRawHash)); + } + if (RawHash != SequenceRawHash) + { + throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, SequenceRawHash)); + } + PrepareFileForScatteredWrite(DecompressedTemp.Handle(), RawSize); + + IoHashStream Hash; + bool CouldDecompress = + Compressed.DecompressToStream(0, + (uint64_t)-1, + [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { + ZEN_UNUSED(SourceOffset); + ZEN_TRACE_CPU("StreamDecompress_Write"); + m_DiskStats.ReadCount++; + m_DiskStats.ReadByteCount += SourceSize; + if (!m_AbortFlag) + { + for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) + { + if (m_Options.ValidateCompletedSequences) + { + Hash.Append(Segment.GetView()); + m_ValidatedChunkByteCount += Segment.GetSize(); + } + DecompressedTemp.Write(Segment, Offset); + Offset += Segment.GetSize(); + m_DiskStats.WriteByteCount += Segment.GetSize(); + m_DiskStats.WriteCount++; + m_WrittenChunkByteCount += Segment.GetSize(); + } + return true; + } + return false; + }); + + if (m_AbortFlag) + { + return; + } + + if (!CouldDecompress) + { + throw std::runtime_error(fmt::format("Failed to decompress large blob {}", SequenceRawHash)); + } + if (m_Options.ValidateCompletedSequences) + { + const IoHash VerifyHash = Hash.GetHash(); + if (VerifyHash != SequenceRawHash) + { + throw std::runtime_error( + fmt::format("Decompressed blob payload hash {} does not match expected hash {}", VerifyHash, SequenceRawHash)); + } + } + DecompressedTemp.MoveTemporaryIntoPlace(TempChunkSequenceFileName, Ec); + if (Ec) + { + throw std::runtime_error(fmt::format("Failed moving temporary file for decompressing large blob {}, reason: ({}) {}", + SequenceRawHash, + Ec.value(), + Ec.message())); + } + // WriteChunkStats.ChunkCountWritten++; +} + +void BuildsOperationUpdateFolder::WriteSequenceChunkToCache(BufferedWriteFileCache::Local& LocalWriter, const CompositeBuffer& Chunk, const uint32_t SequenceIndex, @@ -4458,28 +4468,40 @@ BuildsOperationUpdateFolder::VerifyAndCompleteChunkSequencesAsync(std::span<cons return; } ZEN_TRACE_CPU("VerifyAndCompleteChunkSequence"); - for (uint32_t RemoteSequenceIndexOffset = 1; RemoteSequenceIndexOffset < RemoteSequenceIndexes.size(); RemoteSequenceIndexOffset++) + if (m_Options.ValidateCompletedSequences) { - const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset]; - Work.ScheduleWork(m_IOWorkerPool, [this, RemoteSequenceIndex](std::atomic<bool>&) { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Async_VerifyAndFinalizeSequence"); - - VerifySequence(RemoteSequenceIndex); + for (uint32_t RemoteSequenceIndexOffset = 1; RemoteSequenceIndexOffset < RemoteSequenceIndexes.size(); RemoteSequenceIndexOffset++) + { + const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset]; + Work.ScheduleWork(m_IOWorkerPool, [this, RemoteSequenceIndex](std::atomic<bool>&) { if (!m_AbortFlag) { - const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; - FinalizeChunkSequence(SequenceRawHash); + ZEN_TRACE_CPU("Async_VerifyAndFinalizeSequence"); + + VerifySequence(RemoteSequenceIndex); + if (!m_AbortFlag) + { + const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + FinalizeChunkSequence(SequenceRawHash); + } } - } - }); - } - const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[0]; + }); + } + const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[0]; - VerifySequence(RemoteSequenceIndex); - const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; - FinalizeChunkSequence(SequenceRawHash); + VerifySequence(RemoteSequenceIndex); + const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + FinalizeChunkSequence(SequenceRawHash); + } + else + { + for (uint32_t RemoteSequenceIndexOffset = 0; RemoteSequenceIndexOffset < RemoteSequenceIndexes.size(); RemoteSequenceIndexOffset++) + { + const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset]; + const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + FinalizeChunkSequence(SequenceRawHash); + } + } } bool @@ -4542,6 +4564,8 @@ BuildsOperationUpdateFolder::VerifySequence(uint32_t RemoteSequenceIndex) { ZEN_TRACE_CPU("VerifySequence"); + ZEN_ASSERT(m_Options.ValidateCompletedSequences); + const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; { ZEN_TRACE_CPU("HashSequence"); @@ -4607,763 +4631,773 @@ BuildsOperationUploadFolder::BuildsOperationUploadFolder(BuildOpLogOutput& L void BuildsOperationUploadFolder::Execute() { - enum TaskSteps : uint32_t - { - PrepareBuild, - CalculateDelta, - GenerateBlocks, - BuildPartManifest, - Upload, - FinalizeBuild, - Cleanup, - StepCount - }; + ZEN_TRACE_CPU("BuildsOperationUploadFolder::Execute"); + try + { + enum TaskSteps : uint32_t + { + PrepareBuild, + CalculateDelta, + GenerateBlocks, + BuildPartManifest, + Upload, + FinalizeBuild, + Cleanup, + StepCount + }; - auto EndProgress = MakeGuard([&]() { m_LogOutput.SetLogOperationProgress(TaskSteps::StepCount, TaskSteps::StepCount); }); + auto EndProgress = MakeGuard([&]() { m_LogOutput.SetLogOperationProgress(TaskSteps::StepCount, TaskSteps::StepCount); }); - Stopwatch ProcessTimer; + Stopwatch ProcessTimer; - CreateDirectories(m_Options.TempDir); - CleanDirectory(m_Options.TempDir, {}); - auto _ = MakeGuard([&]() { - if (CleanDirectory(m_Options.TempDir, {})) - { - std::error_code DummyEc; - RemoveDir(m_Options.TempDir, DummyEc); - } - }); - - m_LogOutput.SetLogOperationProgress(TaskSteps::PrepareBuild, TaskSteps::StepCount); + CreateDirectories(m_Options.TempDir); + CleanDirectory(m_Options.TempDir, {}); + auto _ = MakeGuard([&]() { + if (CleanDirectory(m_Options.TempDir, {})) + { + std::error_code DummyEc; + RemoveDir(m_Options.TempDir, DummyEc); + } + }); - std::uint64_t TotalRawSize = 0; + m_LogOutput.SetLogOperationProgress(TaskSteps::PrepareBuild, TaskSteps::StepCount); - CbObject ChunkerParameters; + std::uint64_t TotalRawSize = 0; - struct PrepareBuildResult - { - std::vector<ChunkBlockDescription> KnownBlocks; - uint64_t PreferredMultipartChunkSize = 0; - uint64_t PayloadSize = 0; - uint64_t PrepareBuildTimeMs = 0; - uint64_t FindBlocksTimeMs = 0; - uint64_t ElapsedTimeMs = 0; - }; + CbObject ChunkerParameters; - std::future<PrepareBuildResult> PrepBuildResultFuture = m_NetworkPool.EnqueueTask( - std::packaged_task<PrepareBuildResult()>{[this] { - ZEN_TRACE_CPU("PrepareBuild"); + struct PrepareBuildResult + { + std::vector<ChunkBlockDescription> KnownBlocks; + uint64_t PreferredMultipartChunkSize = 0; + uint64_t PayloadSize = 0; + uint64_t PrepareBuildTimeMs = 0; + uint64_t FindBlocksTimeMs = 0; + uint64_t ElapsedTimeMs = 0; + }; - PrepareBuildResult Result; - Result.PreferredMultipartChunkSize = m_Options.PreferredMultipartChunkSize; - Stopwatch Timer; - if (m_CreateBuild) - { - ZEN_TRACE_CPU("CreateBuild"); + std::future<PrepareBuildResult> PrepBuildResultFuture = m_NetworkPool.EnqueueTask( + std::packaged_task<PrepareBuildResult()>{[this] { + ZEN_TRACE_CPU("PrepareBuild"); - Stopwatch PutBuildTimer; - CbObject PutBuildResult = m_Storage.BuildStorage->PutBuild(m_BuildId, m_MetaData); - Result.PrepareBuildTimeMs = PutBuildTimer.GetElapsedTimeMs(); - Result.PreferredMultipartChunkSize = PutBuildResult["chunkSize"sv].AsUInt64(Result.PreferredMultipartChunkSize); - Result.PayloadSize = m_MetaData.GetSize(); - } - else - { - ZEN_TRACE_CPU("PutBuild"); - Stopwatch GetBuildTimer; - CbObject Build = m_Storage.BuildStorage->GetBuild(m_BuildId); - Result.PrepareBuildTimeMs = GetBuildTimer.GetElapsedTimeMs(); - Result.PayloadSize = Build.GetSize(); - if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0) + PrepareBuildResult Result; + Result.PreferredMultipartChunkSize = m_Options.PreferredMultipartChunkSize; + Stopwatch Timer; + if (m_CreateBuild) { - Result.PreferredMultipartChunkSize = ChunkSize; + ZEN_TRACE_CPU("CreateBuild"); + + Stopwatch PutBuildTimer; + CbObject PutBuildResult = m_Storage.BuildStorage->PutBuild(m_BuildId, m_MetaData); + Result.PrepareBuildTimeMs = PutBuildTimer.GetElapsedTimeMs(); + Result.PreferredMultipartChunkSize = PutBuildResult["chunkSize"sv].AsUInt64(Result.PreferredMultipartChunkSize); + Result.PayloadSize = m_MetaData.GetSize(); } - else if (m_Options.AllowMultiparts) + else { - LOG_OUTPUT_WARN(m_LogOutput, - "PreferredMultipartChunkSize is unknown. Defaulting to '{}'", - NiceBytes(Result.PreferredMultipartChunkSize)); + ZEN_TRACE_CPU("PutBuild"); + Stopwatch GetBuildTimer; + CbObject Build = m_Storage.BuildStorage->GetBuild(m_BuildId); + Result.PrepareBuildTimeMs = GetBuildTimer.GetElapsedTimeMs(); + Result.PayloadSize = Build.GetSize(); + if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0) + { + Result.PreferredMultipartChunkSize = ChunkSize; + } + else if (m_Options.AllowMultiparts) + { + LOG_OUTPUT_WARN(m_LogOutput, + "PreferredMultipartChunkSize is unknown. Defaulting to '{}'", + NiceBytes(Result.PreferredMultipartChunkSize)); + } } - } - if (!m_Options.IgnoreExistingBlocks) - { - ZEN_TRACE_CPU("FindBlocks"); - Stopwatch KnownBlocksTimer; - CbObject BlockDescriptionList = m_Storage.BuildStorage->FindBlocks(m_BuildId, m_Options.FindBlockMaxCount); - if (BlockDescriptionList) + if (!m_Options.IgnoreExistingBlocks) { - Result.KnownBlocks = ParseChunkBlockDescriptionList(BlockDescriptionList); + ZEN_TRACE_CPU("FindBlocks"); + Stopwatch KnownBlocksTimer; + CbObject BlockDescriptionList = m_Storage.BuildStorage->FindBlocks(m_BuildId, m_Options.FindBlockMaxCount); + if (BlockDescriptionList) + { + Result.KnownBlocks = ParseChunkBlockDescriptionList(BlockDescriptionList); + } + m_FindBlocksStats.FindBlockTimeMS = KnownBlocksTimer.GetElapsedTimeMs(); + m_FindBlocksStats.FoundBlockCount = Result.KnownBlocks.size(); + Result.FindBlocksTimeMs = KnownBlocksTimer.GetElapsedTimeMs(); } - m_FindBlocksStats.FindBlockTimeMS = KnownBlocksTimer.GetElapsedTimeMs(); - m_FindBlocksStats.FoundBlockCount = Result.KnownBlocks.size(); - Result.FindBlocksTimeMs = KnownBlocksTimer.GetElapsedTimeMs(); - } - Result.ElapsedTimeMs = Timer.GetElapsedTimeMs(); - return Result; - }}, - WorkerThreadPool::EMode::EnableBacklog); + Result.ElapsedTimeMs = Timer.GetElapsedTimeMs(); + return Result; + }}, + WorkerThreadPool::EMode::EnableBacklog); - ChunkedFolderContent LocalContent; + ChunkedFolderContent LocalContent; - { - Stopwatch ScanTimer; - FolderContent Content; - if (m_ManifestPath.empty()) { - std::filesystem::path ExcludeManifestPath = m_Path / m_Options.ZenExcludeManifestName; - tsl::robin_set<std::string> ExcludeAssetPaths; - if (IsFile(ExcludeManifestPath)) + Stopwatch ScanTimer; + FolderContent Content; + if (m_ManifestPath.empty()) { - std::vector<std::filesystem::path> AssetPaths = ParseManifest(m_Path, ExcludeManifestPath); - ExcludeAssetPaths.reserve(AssetPaths.size()); - for (const std::filesystem::path& AssetPath : AssetPaths) + std::filesystem::path ExcludeManifestPath = m_Path / m_Options.ZenExcludeManifestName; + tsl::robin_set<std::string> ExcludeAssetPaths; + if (IsFile(ExcludeManifestPath)) { - ExcludeAssetPaths.insert(AssetPath.generic_string()); - } - } - Content = GetFolderContent( - m_LocalFolderScanStats, - m_Path, - [this](const std::string_view& RelativePath) { return IsAcceptedFolder(RelativePath); }, - [this, &ExcludeAssetPaths](const std::string_view& RelativePath, uint64_t Size, uint32_t Attributes) -> bool { - ZEN_UNUSED(Size, Attributes); - if (!IsAcceptedFile(RelativePath)) - { - return false; - } - if (ExcludeAssetPaths.contains(std::filesystem::path(RelativePath).generic_string())) + std::vector<std::filesystem::path> AssetPaths = ParseManifest(m_Path, ExcludeManifestPath); + ExcludeAssetPaths.reserve(AssetPaths.size()); + for (const std::filesystem::path& AssetPath : AssetPaths) { - return false; + ExcludeAssetPaths.insert(AssetPath.generic_string()); } - return true; - }, - m_IOWorkerPool, - m_LogOutput.GetProgressUpdateDelayMS(), - [&](bool, std::ptrdiff_t) { - LOG_OUTPUT(m_LogOutput, "Found {} files in '{}'...", m_LocalFolderScanStats.AcceptedFileCount.load(), m_Path); - }, - m_AbortFlag); - } - else - { - Stopwatch ManifestParseTimer; - std::vector<std::filesystem::path> AssetPaths = ParseManifest(m_Path, m_ManifestPath); - for (const std::filesystem::path& AssetPath : AssetPaths) + } + Content = GetFolderContent( + m_LocalFolderScanStats, + m_Path, + [this](const std::string_view& RelativePath) { return IsAcceptedFolder(RelativePath); }, + [this, &ExcludeAssetPaths](const std::string_view& RelativePath, uint64_t Size, uint32_t Attributes) -> bool { + ZEN_UNUSED(Size, Attributes); + if (!IsAcceptedFile(RelativePath)) + { + return false; + } + if (ExcludeAssetPaths.contains(std::filesystem::path(RelativePath).generic_string())) + { + return false; + } + return true; + }, + m_IOWorkerPool, + m_LogOutput.GetProgressUpdateDelayMS(), + [&](bool, std::ptrdiff_t) { + LOG_OUTPUT(m_LogOutput, "Found {} files in '{}'...", m_LocalFolderScanStats.AcceptedFileCount.load(), m_Path); + }, + m_AbortFlag); + } + else { - Content.Paths.push_back(AssetPath); - const std::filesystem::path AssetFilePath = (m_Path / AssetPath).make_preferred(); - Content.RawSizes.push_back(FileSizeFromPath(AssetFilePath)); + Stopwatch ManifestParseTimer; + std::vector<std::filesystem::path> AssetPaths = ParseManifest(m_Path, m_ManifestPath); + for (const std::filesystem::path& AssetPath : AssetPaths) + { + Content.Paths.push_back(AssetPath); + const std::filesystem::path AssetFilePath = (m_Path / AssetPath).make_preferred(); + Content.RawSizes.push_back(FileSizeFromPath(AssetFilePath)); #if ZEN_PLATFORM_WINDOWS - Content.Attributes.push_back(GetFileAttributesFromPath(AssetFilePath)); + Content.Attributes.push_back(GetFileAttributesFromPath(AssetFilePath)); #endif // ZEN_PLATFORM_WINDOWS #if ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX - Content.Attributes.push_back(GetFileMode(AssetFilePath)); + Content.Attributes.push_back(GetFileMode(AssetFilePath)); #endif // ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX - m_LocalFolderScanStats.AcceptedFileByteCount += Content.RawSizes.back(); - m_LocalFolderScanStats.AcceptedFileCount++; - } - if (m_ManifestPath.is_relative()) - { - Content.Paths.push_back(m_ManifestPath); - const std::filesystem::path ManifestFilePath = (m_Path / m_ManifestPath).make_preferred(); - Content.RawSizes.push_back(FileSizeFromPath(ManifestFilePath)); + m_LocalFolderScanStats.AcceptedFileByteCount += Content.RawSizes.back(); + m_LocalFolderScanStats.AcceptedFileCount++; + } + if (m_ManifestPath.is_relative()) + { + Content.Paths.push_back(m_ManifestPath); + const std::filesystem::path ManifestFilePath = (m_Path / m_ManifestPath).make_preferred(); + Content.RawSizes.push_back(FileSizeFromPath(ManifestFilePath)); #if ZEN_PLATFORM_WINDOWS - Content.Attributes.push_back(GetFileAttributesFromPath(ManifestFilePath)); + Content.Attributes.push_back(GetFileAttributesFromPath(ManifestFilePath)); #endif // ZEN_PLATFORM_WINDOWS #if ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX - Content.Attributes.push_back(GetFileMode(ManifestFilePath)); + Content.Attributes.push_back(GetFileMode(ManifestFilePath)); #endif // ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX - m_LocalFolderScanStats.AcceptedFileByteCount += Content.RawSizes.back(); - m_LocalFolderScanStats.AcceptedFileCount++; + m_LocalFolderScanStats.AcceptedFileByteCount += Content.RawSizes.back(); + m_LocalFolderScanStats.AcceptedFileCount++; + } + m_LocalFolderScanStats.FoundFileByteCount.store(m_LocalFolderScanStats.AcceptedFileByteCount); + m_LocalFolderScanStats.FoundFileCount.store(m_LocalFolderScanStats.AcceptedFileCount); + m_LocalFolderScanStats.ElapsedWallTimeUS = ManifestParseTimer.GetElapsedTimeUs(); } - m_LocalFolderScanStats.FoundFileByteCount.store(m_LocalFolderScanStats.AcceptedFileByteCount); - m_LocalFolderScanStats.FoundFileCount.store(m_LocalFolderScanStats.AcceptedFileCount); - m_LocalFolderScanStats.ElapsedWallTimeUS = ManifestParseTimer.GetElapsedTimeUs(); - } - std::unique_ptr<ChunkingController> ChunkController = CreateStandardChunkingController(StandardChunkingControllerSettings{}); - { - CbObjectWriter ChunkParametersWriter; - ChunkParametersWriter.AddString("name"sv, ChunkController->GetName()); - ChunkParametersWriter.AddObject("parameters"sv, ChunkController->GetParameters()); - ChunkerParameters = ChunkParametersWriter.Save(); - } + std::unique_ptr<ChunkingController> ChunkController = CreateStandardChunkingController(StandardChunkingControllerSettings{}); + { + CbObjectWriter ChunkParametersWriter; + ChunkParametersWriter.AddString("name"sv, ChunkController->GetName()); + ChunkParametersWriter.AddObject("parameters"sv, ChunkController->GetParameters()); + ChunkerParameters = ChunkParametersWriter.Save(); + } - TotalRawSize = std::accumulate(Content.RawSizes.begin(), Content.RawSizes.end(), std::uint64_t(0)); + TotalRawSize = std::accumulate(Content.RawSizes.begin(), Content.RawSizes.end(), std::uint64_t(0)); - { - std::unique_ptr<BuildOpLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Scan Folder")); - BuildOpLogOutput::ProgressBar& Progress(*ProgressBarPtr); + { + std::unique_ptr<BuildOpLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Scan Folder")); + BuildOpLogOutput::ProgressBar& Progress(*ProgressBarPtr); - FilteredRate FilteredBytesHashed; - FilteredBytesHashed.Start(); - LocalContent = ChunkFolderContent( - m_ChunkingStats, - m_IOWorkerPool, - m_Path, - Content, - *ChunkController, - m_LogOutput.GetProgressUpdateDelayMS(), - [&](bool IsAborted, bool IsPaused, std::ptrdiff_t) { - FilteredBytesHashed.Update(m_ChunkingStats.BytesHashed.load()); - std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found", - m_ChunkingStats.FilesProcessed.load(), - Content.Paths.size(), - NiceBytes(m_ChunkingStats.BytesHashed.load()), - NiceBytes(TotalRawSize), - NiceNum(FilteredBytesHashed.GetCurrent()), - m_ChunkingStats.UniqueChunksFound.load(), - NiceBytes(m_ChunkingStats.UniqueBytesFound.load())); - Progress.UpdateState({.Task = "Scanning files ", - .Details = Details, - .TotalCount = TotalRawSize, - .RemainingCount = TotalRawSize - m_ChunkingStats.BytesHashed.load(), - .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, - false); - }, - m_AbortFlag, - m_PauseFlag); - FilteredBytesHashed.Stop(); - Progress.Finish(); - if (m_AbortFlag) + FilteredRate FilteredBytesHashed; + FilteredBytesHashed.Start(); + LocalContent = ChunkFolderContent( + m_ChunkingStats, + m_IOWorkerPool, + m_Path, + Content, + *ChunkController, + m_LogOutput.GetProgressUpdateDelayMS(), + [&](bool IsAborted, bool IsPaused, std::ptrdiff_t) { + FilteredBytesHashed.Update(m_ChunkingStats.BytesHashed.load()); + std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found", + m_ChunkingStats.FilesProcessed.load(), + Content.Paths.size(), + NiceBytes(m_ChunkingStats.BytesHashed.load()), + NiceBytes(TotalRawSize), + NiceNum(FilteredBytesHashed.GetCurrent()), + m_ChunkingStats.UniqueChunksFound.load(), + NiceBytes(m_ChunkingStats.UniqueBytesFound.load())); + Progress.UpdateState({.Task = "Scanning files ", + .Details = Details, + .TotalCount = TotalRawSize, + .RemainingCount = TotalRawSize - m_ChunkingStats.BytesHashed.load(), + .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, + false); + }, + m_AbortFlag, + m_PauseFlag); + FilteredBytesHashed.Stop(); + Progress.Finish(); + if (m_AbortFlag) + { + return; + } + } + + if (!m_Options.IsQuiet) { - return; + LOG_OUTPUT(m_LogOutput, + "Found {} ({}) files divided into {} ({}) unique chunks in '{}' in {}. Average hash rate {}B/sec", + LocalContent.Paths.size(), + NiceBytes(TotalRawSize), + m_ChunkingStats.UniqueChunksFound.load(), + NiceBytes(m_ChunkingStats.UniqueBytesFound.load()), + m_Path, + NiceTimeSpanMs(ScanTimer.GetElapsedTimeMs()), + NiceNum(GetBytesPerSecond(m_ChunkingStats.ElapsedWallTimeUS, m_ChunkingStats.BytesHashed))); } } + const ChunkedContentLookup LocalLookup = BuildChunkedContentLookup(LocalContent); + + std::vector<size_t> ReuseBlockIndexes; + std::vector<uint32_t> NewBlockChunkIndexes; + + PrepareBuildResult PrepBuildResult = PrepBuildResultFuture.get(); + if (!m_Options.IsQuiet) { LOG_OUTPUT(m_LogOutput, - "Found {} ({}) files divided into {} ({}) unique chunks in '{}' in {}. Average hash rate {}B/sec", - LocalContent.Paths.size(), - NiceBytes(TotalRawSize), - m_ChunkingStats.UniqueChunksFound.load(), - NiceBytes(m_ChunkingStats.UniqueBytesFound.load()), - m_Path, - NiceTimeSpanMs(ScanTimer.GetElapsedTimeMs()), - NiceNum(GetBytesPerSecond(m_ChunkingStats.ElapsedWallTimeUS, m_ChunkingStats.BytesHashed))); + "Build prepare took {}. {} took {}, payload size {}{}", + NiceTimeSpanMs(PrepBuildResult.ElapsedTimeMs), + m_CreateBuild ? "PutBuild" : "GetBuild", + NiceTimeSpanMs(PrepBuildResult.PrepareBuildTimeMs), + NiceBytes(PrepBuildResult.PayloadSize), + m_Options.IgnoreExistingBlocks ? "" + : fmt::format(". Found {} blocks in {}", + PrepBuildResult.KnownBlocks.size(), + NiceTimeSpanMs(PrepBuildResult.FindBlocksTimeMs))); } - } - const ChunkedContentLookup LocalLookup = BuildChunkedContentLookup(LocalContent); + m_LogOutput.SetLogOperationProgress(TaskSteps::CalculateDelta, TaskSteps::StepCount); - std::vector<size_t> ReuseBlockIndexes; - std::vector<uint32_t> NewBlockChunkIndexes; + const std::uint64_t LargeAttachmentSize = + m_Options.AllowMultiparts ? PrepBuildResult.PreferredMultipartChunkSize * 4u : (std::uint64_t)-1; - PrepareBuildResult PrepBuildResult = PrepBuildResultFuture.get(); + Stopwatch BlockArrangeTimer; - if (!m_Options.IsQuiet) - { - LOG_OUTPUT(m_LogOutput, - "Build prepare took {}. {} took {}, payload size {}{}", - NiceTimeSpanMs(PrepBuildResult.ElapsedTimeMs), - m_CreateBuild ? "PutBuild" : "GetBuild", - NiceTimeSpanMs(PrepBuildResult.PrepareBuildTimeMs), - NiceBytes(PrepBuildResult.PayloadSize), - m_Options.IgnoreExistingBlocks ? "" - : fmt::format(". Found {} blocks in {}", - PrepBuildResult.KnownBlocks.size(), - NiceTimeSpanMs(PrepBuildResult.FindBlocksTimeMs))); - } - - m_LogOutput.SetLogOperationProgress(TaskSteps::CalculateDelta, TaskSteps::StepCount); - - const std::uint64_t LargeAttachmentSize = - m_Options.AllowMultiparts ? PrepBuildResult.PreferredMultipartChunkSize * 4u : (std::uint64_t)-1; - - Stopwatch BlockArrangeTimer; - - std::vector<std::uint32_t> LooseChunkIndexes; - { - bool EnableBlocks = true; - std::vector<std::uint32_t> BlockChunkIndexes; - for (uint32_t ChunkIndex = 0; ChunkIndex < LocalContent.ChunkedContent.ChunkHashes.size(); ChunkIndex++) + std::vector<std::uint32_t> LooseChunkIndexes; { - const uint64_t ChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; - if (!EnableBlocks || ChunkRawSize == 0 || ChunkRawSize > m_Options.BlockParameters.MaxChunkEmbedSize) + bool EnableBlocks = true; + std::vector<std::uint32_t> BlockChunkIndexes; + for (uint32_t ChunkIndex = 0; ChunkIndex < LocalContent.ChunkedContent.ChunkHashes.size(); ChunkIndex++) { - LooseChunkIndexes.push_back(ChunkIndex); - m_LooseChunksStats.ChunkByteCount += ChunkRawSize; - } - else - { - BlockChunkIndexes.push_back(ChunkIndex); - m_FindBlocksStats.PotentialChunkByteCount += ChunkRawSize; + const uint64_t ChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; + if (!EnableBlocks || ChunkRawSize == 0 || ChunkRawSize > m_Options.BlockParameters.MaxChunkEmbedSize) + { + LooseChunkIndexes.push_back(ChunkIndex); + m_LooseChunksStats.ChunkByteCount += ChunkRawSize; + } + else + { + BlockChunkIndexes.push_back(ChunkIndex); + m_FindBlocksStats.PotentialChunkByteCount += ChunkRawSize; + } } - } - m_FindBlocksStats.PotentialChunkCount = BlockChunkIndexes.size(); - m_LooseChunksStats.ChunkCount = LooseChunkIndexes.size(); + m_FindBlocksStats.PotentialChunkCount = BlockChunkIndexes.size(); + m_LooseChunksStats.ChunkCount = LooseChunkIndexes.size(); - if (m_Options.IgnoreExistingBlocks) - { - if (!m_Options.IsQuiet) + if (m_Options.IgnoreExistingBlocks) { - LOG_OUTPUT(m_LogOutput, "Ignoring any existing blocks in store"); + if (!m_Options.IsQuiet) + { + LOG_OUTPUT(m_LogOutput, "Ignoring any existing blocks in store"); + } + NewBlockChunkIndexes = std::move(BlockChunkIndexes); } - NewBlockChunkIndexes = std::move(BlockChunkIndexes); - } - else - { - ReuseBlockIndexes = FindReuseBlocks(PrepBuildResult.KnownBlocks, - LocalContent.ChunkedContent.ChunkHashes, - BlockChunkIndexes, - NewBlockChunkIndexes); - m_FindBlocksStats.AcceptedBlockCount = ReuseBlockIndexes.size(); - - for (const ChunkBlockDescription& Description : PrepBuildResult.KnownBlocks) + else { - for (uint32_t ChunkRawLength : Description.ChunkRawLengths) + ReuseBlockIndexes = FindReuseBlocks(PrepBuildResult.KnownBlocks, + LocalContent.ChunkedContent.ChunkHashes, + BlockChunkIndexes, + NewBlockChunkIndexes); + m_FindBlocksStats.AcceptedBlockCount = ReuseBlockIndexes.size(); + + for (const ChunkBlockDescription& Description : PrepBuildResult.KnownBlocks) { - m_FindBlocksStats.FoundBlockByteCount += ChunkRawLength; + for (uint32_t ChunkRawLength : Description.ChunkRawLengths) + { + m_FindBlocksStats.FoundBlockByteCount += ChunkRawLength; + } + m_FindBlocksStats.FoundBlockChunkCount += Description.ChunkRawHashes.size(); } - m_FindBlocksStats.FoundBlockChunkCount += Description.ChunkRawHashes.size(); } } - } - std::vector<std::vector<uint32_t>> NewBlockChunks; - ArrangeChunksIntoBlocks(LocalContent, LocalLookup, NewBlockChunkIndexes, NewBlockChunks); + std::vector<std::vector<uint32_t>> NewBlockChunks; + ArrangeChunksIntoBlocks(LocalContent, LocalLookup, NewBlockChunkIndexes, NewBlockChunks); - m_FindBlocksStats.NewBlocksCount = NewBlockChunks.size(); - for (uint32_t ChunkIndex : NewBlockChunkIndexes) - { - m_FindBlocksStats.NewBlocksChunkByteCount += LocalContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; - } - m_FindBlocksStats.NewBlocksChunkCount = NewBlockChunkIndexes.size(); - - const double AcceptedByteCountPercent = - m_FindBlocksStats.PotentialChunkByteCount > 0 - ? (100.0 * m_FindBlocksStats.AcceptedRawByteCount / m_FindBlocksStats.PotentialChunkByteCount) - : 0.0; - - const double AcceptedReduntantByteCountPercent = - m_FindBlocksStats.AcceptedByteCount > 0 ? (100.0 * m_FindBlocksStats.AcceptedReduntantByteCount) / - (m_FindBlocksStats.AcceptedByteCount + m_FindBlocksStats.AcceptedReduntantByteCount) - : 0.0; - if (!m_Options.IsQuiet) - { - LOG_OUTPUT(m_LogOutput, - "Found {} chunks in {} ({}) blocks eligible for reuse in {}\n" - " Reusing {} ({}) matching chunks in {} blocks ({:.1f}%)\n" - " Accepting {} ({}) redundant chunks ({:.1f}%)\n" - " Rejected {} ({}) chunks in {} blocks\n" - " Arranged {} ({}) chunks in {} new blocks\n" - " Keeping {} ({}) chunks as loose chunks\n" - " Discovery completed in {}", - m_FindBlocksStats.FoundBlockChunkCount, - m_FindBlocksStats.FoundBlockCount, - NiceBytes(m_FindBlocksStats.FoundBlockByteCount), - NiceTimeSpanMs(m_FindBlocksStats.FindBlockTimeMS), - - m_FindBlocksStats.AcceptedChunkCount, - NiceBytes(m_FindBlocksStats.AcceptedRawByteCount), - m_FindBlocksStats.AcceptedBlockCount, - AcceptedByteCountPercent, - - m_FindBlocksStats.AcceptedReduntantChunkCount, - NiceBytes(m_FindBlocksStats.AcceptedReduntantByteCount), - AcceptedReduntantByteCountPercent, - - m_FindBlocksStats.RejectedChunkCount, - NiceBytes(m_FindBlocksStats.RejectedByteCount), - m_FindBlocksStats.RejectedBlockCount, - - m_FindBlocksStats.NewBlocksChunkCount, - NiceBytes(m_FindBlocksStats.NewBlocksChunkByteCount), - m_FindBlocksStats.NewBlocksCount, - - m_LooseChunksStats.ChunkCount, - NiceBytes(m_LooseChunksStats.ChunkByteCount), - - NiceTimeSpanMs(BlockArrangeTimer.GetElapsedTimeMs())); - } + m_FindBlocksStats.NewBlocksCount = NewBlockChunks.size(); + for (uint32_t ChunkIndex : NewBlockChunkIndexes) + { + m_FindBlocksStats.NewBlocksChunkByteCount += LocalContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; + } + m_FindBlocksStats.NewBlocksChunkCount = NewBlockChunkIndexes.size(); - m_LogOutput.SetLogOperationProgress(TaskSteps::GenerateBlocks, TaskSteps::StepCount); - GeneratedBlocks NewBlocks; + const double AcceptedByteCountPercent = + m_FindBlocksStats.PotentialChunkByteCount > 0 + ? (100.0 * m_FindBlocksStats.AcceptedRawByteCount / m_FindBlocksStats.PotentialChunkByteCount) + : 0.0; - if (!NewBlockChunks.empty()) - { - Stopwatch GenerateBuildBlocksTimer; - auto __ = MakeGuard([&]() { - uint64_t BlockGenerateTimeUs = GenerateBuildBlocksTimer.GetElapsedTimeUs(); - if (!m_Options.IsQuiet) - { - LOG_OUTPUT(m_LogOutput, - "Generated {} ({}) and uploaded {} ({}) blocks in {}. Generate speed: {}B/sec. Transfer speed {}bits/sec.", - m_GenerateBlocksStats.GeneratedBlockCount.load(), - NiceBytes(m_GenerateBlocksStats.GeneratedBlockByteCount), - m_UploadStats.BlockCount.load(), - NiceBytes(m_UploadStats.BlocksBytes.load()), - NiceTimeSpanMs(BlockGenerateTimeUs / 1000), - NiceNum(GetBytesPerSecond(m_GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS, - m_GenerateBlocksStats.GeneratedBlockByteCount)), - NiceNum(GetBytesPerSecond(m_UploadStats.ElapsedWallTimeUS, m_UploadStats.BlocksBytes * 8))); - } - }); - GenerateBuildBlocks(LocalContent, LocalLookup, NewBlockChunks, NewBlocks); - } + const double AcceptedReduntantByteCountPercent = + m_FindBlocksStats.AcceptedByteCount > 0 + ? (100.0 * m_FindBlocksStats.AcceptedReduntantByteCount) / + (m_FindBlocksStats.AcceptedByteCount + m_FindBlocksStats.AcceptedReduntantByteCount) + : 0.0; + if (!m_Options.IsQuiet) + { + LOG_OUTPUT(m_LogOutput, + "Found {} chunks in {} ({}) blocks eligible for reuse in {}\n" + " Reusing {} ({}) matching chunks in {} blocks ({:.1f}%)\n" + " Accepting {} ({}) redundant chunks ({:.1f}%)\n" + " Rejected {} ({}) chunks in {} blocks\n" + " Arranged {} ({}) chunks in {} new blocks\n" + " Keeping {} ({}) chunks as loose chunks\n" + " Discovery completed in {}", + m_FindBlocksStats.FoundBlockChunkCount, + m_FindBlocksStats.FoundBlockCount, + NiceBytes(m_FindBlocksStats.FoundBlockByteCount), + NiceTimeSpanMs(m_FindBlocksStats.FindBlockTimeMS), - m_LogOutput.SetLogOperationProgress(TaskSteps::BuildPartManifest, TaskSteps::StepCount); + m_FindBlocksStats.AcceptedChunkCount, + NiceBytes(m_FindBlocksStats.AcceptedRawByteCount), + m_FindBlocksStats.AcceptedBlockCount, + AcceptedByteCountPercent, - CbObject PartManifest; - { - CbObjectWriter PartManifestWriter; - Stopwatch ManifestGenerationTimer; - auto __ = MakeGuard([&]() { - if (!m_Options.IsQuiet) - { - LOG_OUTPUT(m_LogOutput, - "Generated build part manifest in {} ({})", - NiceTimeSpanMs(ManifestGenerationTimer.GetElapsedTimeMs()), - NiceBytes(PartManifestWriter.GetSaveSize())); - } - }); - PartManifestWriter.AddObject("chunker"sv, ChunkerParameters); + m_FindBlocksStats.AcceptedReduntantChunkCount, + NiceBytes(m_FindBlocksStats.AcceptedReduntantByteCount), + AcceptedReduntantByteCountPercent, - std::vector<IoHash> AllChunkBlockHashes; - std::vector<ChunkBlockDescription> AllChunkBlockDescriptions; - AllChunkBlockHashes.reserve(ReuseBlockIndexes.size() + NewBlocks.BlockDescriptions.size()); - AllChunkBlockDescriptions.reserve(ReuseBlockIndexes.size() + NewBlocks.BlockDescriptions.size()); - for (size_t ReuseBlockIndex : ReuseBlockIndexes) - { - AllChunkBlockDescriptions.push_back(PrepBuildResult.KnownBlocks[ReuseBlockIndex]); - AllChunkBlockHashes.push_back(PrepBuildResult.KnownBlocks[ReuseBlockIndex].BlockHash); + m_FindBlocksStats.RejectedChunkCount, + NiceBytes(m_FindBlocksStats.RejectedByteCount), + m_FindBlocksStats.RejectedBlockCount, + + m_FindBlocksStats.NewBlocksChunkCount, + NiceBytes(m_FindBlocksStats.NewBlocksChunkByteCount), + m_FindBlocksStats.NewBlocksCount, + + m_LooseChunksStats.ChunkCount, + NiceBytes(m_LooseChunksStats.ChunkByteCount), + + NiceTimeSpanMs(BlockArrangeTimer.GetElapsedTimeMs())); } - AllChunkBlockDescriptions.insert(AllChunkBlockDescriptions.end(), - NewBlocks.BlockDescriptions.begin(), - NewBlocks.BlockDescriptions.end()); - for (const ChunkBlockDescription& BlockDescription : NewBlocks.BlockDescriptions) + + m_LogOutput.SetLogOperationProgress(TaskSteps::GenerateBlocks, TaskSteps::StepCount); + GeneratedBlocks NewBlocks; + + if (!NewBlockChunks.empty()) { - AllChunkBlockHashes.push_back(BlockDescription.BlockHash); - } + Stopwatch GenerateBuildBlocksTimer; + auto __ = MakeGuard([&]() { + uint64_t BlockGenerateTimeUs = GenerateBuildBlocksTimer.GetElapsedTimeUs(); + if (!m_Options.IsQuiet) + { + LOG_OUTPUT(m_LogOutput, + "Generated {} ({}) and uploaded {} ({}) blocks in {}. Generate speed: {}B/sec. Transfer speed {}bits/sec.", + m_GenerateBlocksStats.GeneratedBlockCount.load(), + NiceBytes(m_GenerateBlocksStats.GeneratedBlockByteCount), + m_UploadStats.BlockCount.load(), + NiceBytes(m_UploadStats.BlocksBytes.load()), + NiceTimeSpanMs(BlockGenerateTimeUs / 1000), + NiceNum(GetBytesPerSecond(m_GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS, + m_GenerateBlocksStats.GeneratedBlockByteCount)), + NiceNum(GetBytesPerSecond(m_UploadStats.ElapsedWallTimeUS, m_UploadStats.BlocksBytes * 8))); + } + }); + GenerateBuildBlocks(LocalContent, LocalLookup, NewBlockChunks, NewBlocks); + } + + m_LogOutput.SetLogOperationProgress(TaskSteps::BuildPartManifest, TaskSteps::StepCount); + + CbObject PartManifest; + { + CbObjectWriter PartManifestWriter; + Stopwatch ManifestGenerationTimer; + auto __ = MakeGuard([&]() { + if (!m_Options.IsQuiet) + { + LOG_OUTPUT(m_LogOutput, + "Generated build part manifest in {} ({})", + NiceTimeSpanMs(ManifestGenerationTimer.GetElapsedTimeMs()), + NiceBytes(PartManifestWriter.GetSaveSize())); + } + }); + PartManifestWriter.AddObject("chunker"sv, ChunkerParameters); + + std::vector<IoHash> AllChunkBlockHashes; + std::vector<ChunkBlockDescription> AllChunkBlockDescriptions; + AllChunkBlockHashes.reserve(ReuseBlockIndexes.size() + NewBlocks.BlockDescriptions.size()); + AllChunkBlockDescriptions.reserve(ReuseBlockIndexes.size() + NewBlocks.BlockDescriptions.size()); + for (size_t ReuseBlockIndex : ReuseBlockIndexes) + { + AllChunkBlockDescriptions.push_back(PrepBuildResult.KnownBlocks[ReuseBlockIndex]); + AllChunkBlockHashes.push_back(PrepBuildResult.KnownBlocks[ReuseBlockIndex].BlockHash); + } + AllChunkBlockDescriptions.insert(AllChunkBlockDescriptions.end(), + NewBlocks.BlockDescriptions.begin(), + NewBlocks.BlockDescriptions.end()); + for (const ChunkBlockDescription& BlockDescription : NewBlocks.BlockDescriptions) + { + AllChunkBlockHashes.push_back(BlockDescription.BlockHash); + } #if EXTRA_VERIFY - tsl::robin_map<IoHash, size_t, IoHash::Hasher> ChunkHashToAbsoluteChunkIndex; - std::vector<IoHash> AbsoluteChunkHashes; - AbsoluteChunkHashes.reserve(LocalContent.ChunkedContent.ChunkHashes.size()); - for (uint32_t ChunkIndex : LooseChunkIndexes) - { - ChunkHashToAbsoluteChunkIndex.insert({LocalContent.ChunkedContent.ChunkHashes[ChunkIndex], AbsoluteChunkHashes.size()}); - AbsoluteChunkHashes.push_back(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]); - } - for (const ChunkBlockDescription& Block : AllChunkBlockDescriptions) - { - for (const IoHash& ChunkHash : Block.ChunkRawHashes) + tsl::robin_map<IoHash, size_t, IoHash::Hasher> ChunkHashToAbsoluteChunkIndex; + std::vector<IoHash> AbsoluteChunkHashes; + AbsoluteChunkHashes.reserve(LocalContent.ChunkedContent.ChunkHashes.size()); + for (uint32_t ChunkIndex : LooseChunkIndexes) { - ChunkHashToAbsoluteChunkIndex.insert({ChunkHash, AbsoluteChunkHashes.size()}); - AbsoluteChunkHashes.push_back(ChunkHash); + ChunkHashToAbsoluteChunkIndex.insert({LocalContent.ChunkedContent.ChunkHashes[ChunkIndex], AbsoluteChunkHashes.size()}); + AbsoluteChunkHashes.push_back(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]); + } + for (const ChunkBlockDescription& Block : AllChunkBlockDescriptions) + { + for (const IoHash& ChunkHash : Block.ChunkRawHashes) + { + ChunkHashToAbsoluteChunkIndex.insert({ChunkHash, AbsoluteChunkHashes.size()}); + AbsoluteChunkHashes.push_back(ChunkHash); + } + } + for (const IoHash& ChunkHash : LocalContent.ChunkedContent.ChunkHashes) + { + ZEN_ASSERT(AbsoluteChunkHashes[ChunkHashToAbsoluteChunkIndex.at(ChunkHash)] == ChunkHash); + ZEN_ASSERT(LocalContent.ChunkedContent.ChunkHashes[LocalLookup.ChunkHashToChunkIndex.at(ChunkHash)] == ChunkHash); + } + for (const uint32_t ChunkIndex : LocalContent.ChunkedContent.ChunkOrders) + { + ZEN_ASSERT(AbsoluteChunkHashes[ChunkHashToAbsoluteChunkIndex.at(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex])] == + LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]); + ZEN_ASSERT(LocalLookup.ChunkHashToChunkIndex.at(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]) == ChunkIndex); } - } - for (const IoHash& ChunkHash : LocalContent.ChunkedContent.ChunkHashes) - { - ZEN_ASSERT(AbsoluteChunkHashes[ChunkHashToAbsoluteChunkIndex.at(ChunkHash)] == ChunkHash); - ZEN_ASSERT(LocalContent.ChunkedContent.ChunkHashes[LocalLookup.ChunkHashToChunkIndex.at(ChunkHash)] == ChunkHash); - } - for (const uint32_t ChunkIndex : LocalContent.ChunkedContent.ChunkOrders) - { - ZEN_ASSERT(AbsoluteChunkHashes[ChunkHashToAbsoluteChunkIndex.at(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex])] == - LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]); - ZEN_ASSERT(LocalLookup.ChunkHashToChunkIndex.at(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]) == ChunkIndex); - } #endif // EXTRA_VERIFY - std::vector<uint32_t> AbsoluteChunkOrders = CalculateAbsoluteChunkOrders(LocalContent.ChunkedContent.ChunkHashes, - LocalContent.ChunkedContent.ChunkOrders, - LocalLookup.ChunkHashToChunkIndex, - LooseChunkIndexes, - AllChunkBlockDescriptions); + std::vector<uint32_t> AbsoluteChunkOrders = CalculateAbsoluteChunkOrders(LocalContent.ChunkedContent.ChunkHashes, + LocalContent.ChunkedContent.ChunkOrders, + LocalLookup.ChunkHashToChunkIndex, + LooseChunkIndexes, + AllChunkBlockDescriptions); #if EXTRA_VERIFY - for (uint32_t ChunkOrderIndex = 0; ChunkOrderIndex < LocalContent.ChunkedContent.ChunkOrders.size(); ChunkOrderIndex++) - { - uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[ChunkOrderIndex]; - uint32_t AbsoluteChunkIndex = AbsoluteChunkOrders[ChunkOrderIndex]; - const IoHash& LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; - const IoHash& AbsoluteChunkHash = AbsoluteChunkHashes[AbsoluteChunkIndex]; - ZEN_ASSERT(LocalChunkHash == AbsoluteChunkHash); - } + for (uint32_t ChunkOrderIndex = 0; ChunkOrderIndex < LocalContent.ChunkedContent.ChunkOrders.size(); ChunkOrderIndex++) + { + uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[ChunkOrderIndex]; + uint32_t AbsoluteChunkIndex = AbsoluteChunkOrders[ChunkOrderIndex]; + const IoHash& LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; + const IoHash& AbsoluteChunkHash = AbsoluteChunkHashes[AbsoluteChunkIndex]; + ZEN_ASSERT(LocalChunkHash == AbsoluteChunkHash); + } #endif // EXTRA_VERIFY - WriteBuildContentToCompactBinary(PartManifestWriter, - LocalContent.Platform, - LocalContent.Paths, - LocalContent.RawHashes, - LocalContent.RawSizes, - LocalContent.Attributes, - LocalContent.ChunkedContent.SequenceRawHashes, - LocalContent.ChunkedContent.ChunkCounts, - LocalContent.ChunkedContent.ChunkHashes, - LocalContent.ChunkedContent.ChunkRawSizes, - AbsoluteChunkOrders, - LooseChunkIndexes, - AllChunkBlockHashes); + WriteBuildContentToCompactBinary(PartManifestWriter, + LocalContent.Platform, + LocalContent.Paths, + LocalContent.RawHashes, + LocalContent.RawSizes, + LocalContent.Attributes, + LocalContent.ChunkedContent.SequenceRawHashes, + LocalContent.ChunkedContent.ChunkCounts, + LocalContent.ChunkedContent.ChunkHashes, + LocalContent.ChunkedContent.ChunkRawSizes, + AbsoluteChunkOrders, + LooseChunkIndexes, + AllChunkBlockHashes); #if EXTRA_VERIFY - { - ChunkedFolderContent VerifyFolderContent; - - std::vector<uint32_t> OutAbsoluteChunkOrders; - std::vector<IoHash> OutLooseChunkHashes; - std::vector<uint64_t> OutLooseChunkRawSizes; - std::vector<IoHash> OutBlockRawHashes; - ReadBuildContentFromCompactBinary(PartManifestWriter.Save(), - VerifyFolderContent.Platform, - VerifyFolderContent.Paths, - VerifyFolderContent.RawHashes, - VerifyFolderContent.RawSizes, - VerifyFolderContent.Attributes, - VerifyFolderContent.ChunkedContent.SequenceRawHashes, - VerifyFolderContent.ChunkedContent.ChunkCounts, - OutAbsoluteChunkOrders, - OutLooseChunkHashes, - OutLooseChunkRawSizes, - OutBlockRawHashes); - ZEN_ASSERT(OutBlockRawHashes == AllChunkBlockHashes); - - for (uint32_t OrderIndex = 0; OrderIndex < OutAbsoluteChunkOrders.size(); OrderIndex++) { - uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[OrderIndex]; - const IoHash LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; + ChunkedFolderContent VerifyFolderContent; + + std::vector<uint32_t> OutAbsoluteChunkOrders; + std::vector<IoHash> OutLooseChunkHashes; + std::vector<uint64_t> OutLooseChunkRawSizes; + std::vector<IoHash> OutBlockRawHashes; + ReadBuildContentFromCompactBinary(PartManifestWriter.Save(), + VerifyFolderContent.Platform, + VerifyFolderContent.Paths, + VerifyFolderContent.RawHashes, + VerifyFolderContent.RawSizes, + VerifyFolderContent.Attributes, + VerifyFolderContent.ChunkedContent.SequenceRawHashes, + VerifyFolderContent.ChunkedContent.ChunkCounts, + OutAbsoluteChunkOrders, + OutLooseChunkHashes, + OutLooseChunkRawSizes, + OutBlockRawHashes); + ZEN_ASSERT(OutBlockRawHashes == AllChunkBlockHashes); + + for (uint32_t OrderIndex = 0; OrderIndex < OutAbsoluteChunkOrders.size(); OrderIndex++) + { + uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[OrderIndex]; + const IoHash LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; - uint32_t VerifyChunkIndex = OutAbsoluteChunkOrders[OrderIndex]; - const IoHash VerifyChunkHash = AbsoluteChunkHashes[VerifyChunkIndex]; + uint32_t VerifyChunkIndex = OutAbsoluteChunkOrders[OrderIndex]; + const IoHash VerifyChunkHash = AbsoluteChunkHashes[VerifyChunkIndex]; - ZEN_ASSERT(LocalChunkHash == VerifyChunkHash); - } + ZEN_ASSERT(LocalChunkHash == VerifyChunkHash); + } - CalculateLocalChunkOrders(OutAbsoluteChunkOrders, - OutLooseChunkHashes, - OutLooseChunkRawSizes, - AllChunkBlockDescriptions, - VerifyFolderContent.ChunkedContent.ChunkHashes, - VerifyFolderContent.ChunkedContent.ChunkRawSizes, - VerifyFolderContent.ChunkedContent.ChunkOrders); - - ZEN_ASSERT(LocalContent.Paths == VerifyFolderContent.Paths); - ZEN_ASSERT(LocalContent.RawHashes == VerifyFolderContent.RawHashes); - ZEN_ASSERT(LocalContent.RawSizes == VerifyFolderContent.RawSizes); - ZEN_ASSERT(LocalContent.Attributes == VerifyFolderContent.Attributes); - ZEN_ASSERT(LocalContent.ChunkedContent.SequenceRawHashes == VerifyFolderContent.ChunkedContent.SequenceRawHashes); - ZEN_ASSERT(LocalContent.ChunkedContent.ChunkCounts == VerifyFolderContent.ChunkedContent.ChunkCounts); - - for (uint32_t OrderIndex = 0; OrderIndex < LocalContent.ChunkedContent.ChunkOrders.size(); OrderIndex++) - { - uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[OrderIndex]; - const IoHash LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; - uint64_t LocalChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[LocalChunkIndex]; + CalculateLocalChunkOrders(OutAbsoluteChunkOrders, + OutLooseChunkHashes, + OutLooseChunkRawSizes, + AllChunkBlockDescriptions, + VerifyFolderContent.ChunkedContent.ChunkHashes, + VerifyFolderContent.ChunkedContent.ChunkRawSizes, + VerifyFolderContent.ChunkedContent.ChunkOrders); + + ZEN_ASSERT(LocalContent.Paths == VerifyFolderContent.Paths); + ZEN_ASSERT(LocalContent.RawHashes == VerifyFolderContent.RawHashes); + ZEN_ASSERT(LocalContent.RawSizes == VerifyFolderContent.RawSizes); + ZEN_ASSERT(LocalContent.Attributes == VerifyFolderContent.Attributes); + ZEN_ASSERT(LocalContent.ChunkedContent.SequenceRawHashes == VerifyFolderContent.ChunkedContent.SequenceRawHashes); + ZEN_ASSERT(LocalContent.ChunkedContent.ChunkCounts == VerifyFolderContent.ChunkedContent.ChunkCounts); + + for (uint32_t OrderIndex = 0; OrderIndex < LocalContent.ChunkedContent.ChunkOrders.size(); OrderIndex++) + { + uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[OrderIndex]; + const IoHash LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; + uint64_t LocalChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[LocalChunkIndex]; - uint32_t VerifyChunkIndex = VerifyFolderContent.ChunkedContent.ChunkOrders[OrderIndex]; - const IoHash VerifyChunkHash = VerifyFolderContent.ChunkedContent.ChunkHashes[VerifyChunkIndex]; - uint64_t VerifyChunkRawSize = VerifyFolderContent.ChunkedContent.ChunkRawSizes[VerifyChunkIndex]; + uint32_t VerifyChunkIndex = VerifyFolderContent.ChunkedContent.ChunkOrders[OrderIndex]; + const IoHash VerifyChunkHash = VerifyFolderContent.ChunkedContent.ChunkHashes[VerifyChunkIndex]; + uint64_t VerifyChunkRawSize = VerifyFolderContent.ChunkedContent.ChunkRawSizes[VerifyChunkIndex]; - ZEN_ASSERT(LocalChunkHash == VerifyChunkHash); - ZEN_ASSERT(LocalChunkRawSize == VerifyChunkRawSize); + ZEN_ASSERT(LocalChunkHash == VerifyChunkHash); + ZEN_ASSERT(LocalChunkRawSize == VerifyChunkRawSize); + } } - } #endif // EXTRA_VERIFY - PartManifest = PartManifestWriter.Save(); - } - - m_LogOutput.SetLogOperationProgress(TaskSteps::Upload, TaskSteps::StepCount); - - Stopwatch PutBuildPartResultTimer; - std::pair<IoHash, std::vector<IoHash>> PutBuildPartResult = - m_Storage.BuildStorage->PutBuildPart(m_BuildId, m_BuildPartId, m_BuildPartName, PartManifest); - if (!m_Options.IsQuiet) - { - LOG_OUTPUT(m_LogOutput, - "PutBuildPart took {}, payload size {}. {} attachments are needed.", - NiceTimeSpanMs(PutBuildPartResultTimer.GetElapsedTimeMs()), - NiceBytes(PartManifest.GetSize()), - PutBuildPartResult.second.size()); - } - IoHash PartHash = PutBuildPartResult.first; + PartManifest = PartManifestWriter.Save(); + } - auto UploadAttachments = [this, &LocalContent, &LocalLookup, &NewBlockChunks, &NewBlocks, &LooseChunkIndexes, &LargeAttachmentSize]( - std::span<IoHash> RawHashes) { - if (!m_AbortFlag) - { - UploadStatistics TempUploadStats; - LooseChunksStatistics TempLooseChunksStats; + m_LogOutput.SetLogOperationProgress(TaskSteps::Upload, TaskSteps::StepCount); - Stopwatch TempUploadTimer; - auto __ = MakeGuard([&]() { - if (!m_Options.IsQuiet) - { - uint64_t TempChunkUploadTimeUs = TempUploadTimer.GetElapsedTimeUs(); - LOG_OUTPUT(m_LogOutput, - "Uploaded {} ({}) blocks. " - "Compressed {} ({} {}B/s) and uploaded {} ({}) chunks. " - "Transferred {} ({}bits/s) in {}", - TempUploadStats.BlockCount.load(), - NiceBytes(TempUploadStats.BlocksBytes), - - TempLooseChunksStats.CompressedChunkCount.load(), - NiceBytes(TempLooseChunksStats.CompressedChunkBytes.load()), - NiceNum(GetBytesPerSecond(TempLooseChunksStats.CompressChunksElapsedWallTimeUS, - TempLooseChunksStats.ChunkByteCount)), - TempUploadStats.ChunkCount.load(), - NiceBytes(TempUploadStats.ChunksBytes), - - NiceBytes(TempUploadStats.BlocksBytes + TempUploadStats.ChunksBytes), - NiceNum(GetBytesPerSecond(TempUploadStats.ElapsedWallTimeUS, TempUploadStats.ChunksBytes * 8)), - NiceTimeSpanMs(TempChunkUploadTimeUs / 1000)); - } - }); - UploadPartBlobs(LocalContent, - LocalLookup, - RawHashes, - NewBlockChunks, - NewBlocks, - LooseChunkIndexes, - LargeAttachmentSize, - TempUploadStats, - TempLooseChunksStats); - m_UploadStats += TempUploadStats; - m_LooseChunksStats += TempLooseChunksStats; - } - }; - if (m_Options.IgnoreExistingBlocks) - { - if (m_Options.IsVerbose) + Stopwatch PutBuildPartResultTimer; + std::pair<IoHash, std::vector<IoHash>> PutBuildPartResult = + m_Storage.BuildStorage->PutBuildPart(m_BuildId, m_BuildPartId, m_BuildPartName, PartManifest); + if (!m_Options.IsQuiet) { LOG_OUTPUT(m_LogOutput, - "PutBuildPart uploading all attachments, needs are: {}", - FormatArray<IoHash>(PutBuildPartResult.second, "\n "sv)); + "PutBuildPart took {}, payload size {}. {} attachments are needed.", + NiceTimeSpanMs(PutBuildPartResultTimer.GetElapsedTimeMs()), + NiceBytes(PartManifest.GetSize()), + PutBuildPartResult.second.size()); } + IoHash PartHash = PutBuildPartResult.first; - std::vector<IoHash> ForceUploadChunkHashes; - ForceUploadChunkHashes.reserve(LooseChunkIndexes.size()); - - for (uint32_t ChunkIndex : LooseChunkIndexes) + auto UploadAttachments = [this, &LocalContent, &LocalLookup, &NewBlockChunks, &NewBlocks, &LooseChunkIndexes, &LargeAttachmentSize]( + std::span<IoHash> RawHashes) { + if (!m_AbortFlag) + { + UploadStatistics TempUploadStats; + LooseChunksStatistics TempLooseChunksStats; + + Stopwatch TempUploadTimer; + auto __ = MakeGuard([&]() { + if (!m_Options.IsQuiet) + { + uint64_t TempChunkUploadTimeUs = TempUploadTimer.GetElapsedTimeUs(); + LOG_OUTPUT(m_LogOutput, + "Uploaded {} ({}) blocks. " + "Compressed {} ({} {}B/s) and uploaded {} ({}) chunks. " + "Transferred {} ({}bits/s) in {}", + TempUploadStats.BlockCount.load(), + NiceBytes(TempUploadStats.BlocksBytes), + + TempLooseChunksStats.CompressedChunkCount.load(), + NiceBytes(TempLooseChunksStats.CompressedChunkBytes.load()), + NiceNum(GetBytesPerSecond(TempLooseChunksStats.CompressChunksElapsedWallTimeUS, + TempLooseChunksStats.ChunkByteCount)), + TempUploadStats.ChunkCount.load(), + NiceBytes(TempUploadStats.ChunksBytes), + + NiceBytes(TempUploadStats.BlocksBytes + TempUploadStats.ChunksBytes), + NiceNum(GetBytesPerSecond(TempUploadStats.ElapsedWallTimeUS, TempUploadStats.ChunksBytes * 8)), + NiceTimeSpanMs(TempChunkUploadTimeUs / 1000)); + } + }); + UploadPartBlobs(LocalContent, + LocalLookup, + RawHashes, + NewBlockChunks, + NewBlocks, + LooseChunkIndexes, + LargeAttachmentSize, + TempUploadStats, + TempLooseChunksStats); + m_UploadStats += TempUploadStats; + m_LooseChunksStats += TempLooseChunksStats; + } + }; + if (m_Options.IgnoreExistingBlocks) { - ForceUploadChunkHashes.push_back(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]); - } + if (m_Options.IsVerbose) + { + LOG_OUTPUT(m_LogOutput, + "PutBuildPart uploading all attachments, needs are: {}", + FormatArray<IoHash>(PutBuildPartResult.second, "\n "sv)); + } - for (size_t BlockIndex = 0; BlockIndex < NewBlocks.BlockHeaders.size(); BlockIndex++) - { - if (NewBlocks.BlockHeaders[BlockIndex]) + std::vector<IoHash> ForceUploadChunkHashes; + ForceUploadChunkHashes.reserve(LooseChunkIndexes.size()); + + for (uint32_t ChunkIndex : LooseChunkIndexes) { - // Block was not uploaded during generation - ForceUploadChunkHashes.push_back(NewBlocks.BlockDescriptions[BlockIndex].BlockHash); + ForceUploadChunkHashes.push_back(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]); } - } - UploadAttachments(ForceUploadChunkHashes); - } - else if (!PutBuildPartResult.second.empty()) - { - if (m_Options.IsVerbose) - { - LOG_OUTPUT(m_LogOutput, "PutBuildPart needs attachments: {}", FormatArray<IoHash>(PutBuildPartResult.second, "\n "sv)); - } - UploadAttachments(PutBuildPartResult.second); - } - uint32_t FinalizeBuildPartRetryCount = 5; - while (!m_AbortFlag && (FinalizeBuildPartRetryCount--) > 0) - { - Stopwatch FinalizeBuildPartTimer; - std::vector<IoHash> Needs = m_Storage.BuildStorage->FinalizeBuildPart(m_BuildId, m_BuildPartId, PartHash); - if (!m_Options.IsQuiet) - { - LOG_OUTPUT(m_LogOutput, - "FinalizeBuildPart took {}. {} attachments are missing.", - NiceTimeSpanMs(FinalizeBuildPartTimer.GetElapsedTimeMs()), - Needs.size()); + for (size_t BlockIndex = 0; BlockIndex < NewBlocks.BlockHeaders.size(); BlockIndex++) + { + if (NewBlocks.BlockHeaders[BlockIndex]) + { + // Block was not uploaded during generation + ForceUploadChunkHashes.push_back(NewBlocks.BlockDescriptions[BlockIndex].BlockHash); + } + } + UploadAttachments(ForceUploadChunkHashes); } - if (Needs.empty()) + else if (!PutBuildPartResult.second.empty()) { - break; + if (m_Options.IsVerbose) + { + LOG_OUTPUT(m_LogOutput, "PutBuildPart needs attachments: {}", FormatArray<IoHash>(PutBuildPartResult.second, "\n "sv)); + } + UploadAttachments(PutBuildPartResult.second); } - if (m_Options.IsVerbose) + + uint32_t FinalizeBuildPartRetryCount = 5; + while (!m_AbortFlag && (FinalizeBuildPartRetryCount--) > 0) { - LOG_OUTPUT(m_LogOutput, "FinalizeBuildPart needs attachments: {}", FormatArray<IoHash>(Needs, "\n "sv)); + Stopwatch FinalizeBuildPartTimer; + std::vector<IoHash> Needs = m_Storage.BuildStorage->FinalizeBuildPart(m_BuildId, m_BuildPartId, PartHash); + if (!m_Options.IsQuiet) + { + LOG_OUTPUT(m_LogOutput, + "FinalizeBuildPart took {}. {} attachments are missing.", + NiceTimeSpanMs(FinalizeBuildPartTimer.GetElapsedTimeMs()), + Needs.size()); + } + if (Needs.empty()) + { + break; + } + if (m_Options.IsVerbose) + { + LOG_OUTPUT(m_LogOutput, "FinalizeBuildPart needs attachments: {}", FormatArray<IoHash>(Needs, "\n "sv)); + } + UploadAttachments(Needs); } - UploadAttachments(Needs); - } - m_LogOutput.SetLogOperationProgress(TaskSteps::FinalizeBuild, TaskSteps::StepCount); + m_LogOutput.SetLogOperationProgress(TaskSteps::FinalizeBuild, TaskSteps::StepCount); - if (m_CreateBuild && !m_AbortFlag) - { - Stopwatch FinalizeBuildTimer; - m_Storage.BuildStorage->FinalizeBuild(m_BuildId); - if (!m_Options.IsQuiet) + if (m_CreateBuild && !m_AbortFlag) { - LOG_OUTPUT(m_LogOutput, "FinalizeBuild took {}", NiceTimeSpanMs(FinalizeBuildTimer.GetElapsedTimeMs())); + Stopwatch FinalizeBuildTimer; + m_Storage.BuildStorage->FinalizeBuild(m_BuildId); + if (!m_Options.IsQuiet) + { + LOG_OUTPUT(m_LogOutput, "FinalizeBuild took {}", NiceTimeSpanMs(FinalizeBuildTimer.GetElapsedTimeMs())); + } } - } - - if (!NewBlocks.BlockDescriptions.empty() && !m_AbortFlag) - { - uint64_t UploadBlockMetadataCount = 0; - Stopwatch UploadBlockMetadataTimer; - uint32_t FailedMetadataUploadCount = 1; - int32_t MetadataUploadRetryCount = 3; - while ((MetadataUploadRetryCount-- > 0) && (FailedMetadataUploadCount > 0)) + if (!NewBlocks.BlockDescriptions.empty() && !m_AbortFlag) { - FailedMetadataUploadCount = 0; - for (size_t BlockIndex = 0; BlockIndex < NewBlocks.BlockDescriptions.size(); BlockIndex++) + uint64_t UploadBlockMetadataCount = 0; + Stopwatch UploadBlockMetadataTimer; + + uint32_t FailedMetadataUploadCount = 1; + int32_t MetadataUploadRetryCount = 3; + while ((MetadataUploadRetryCount-- > 0) && (FailedMetadataUploadCount > 0)) { - if (m_AbortFlag) + FailedMetadataUploadCount = 0; + for (size_t BlockIndex = 0; BlockIndex < NewBlocks.BlockDescriptions.size(); BlockIndex++) { - break; - } - const IoHash& BlockHash = NewBlocks.BlockDescriptions[BlockIndex].BlockHash; - if (!NewBlocks.MetaDataHasBeenUploaded[BlockIndex]) - { - const CbObject BlockMetaData = - BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]); - if (m_Storage.BuildCacheStorage) + if (m_AbortFlag) { - m_Storage.BuildCacheStorage->PutBlobMetadatas(m_BuildId, - std::vector<IoHash>({BlockHash}), - std::vector<CbObject>({BlockMetaData})); + break; } - bool MetadataSucceeded = m_Storage.BuildStorage->PutBlockMetadata(m_BuildId, BlockHash, BlockMetaData); - if (MetadataSucceeded) + const IoHash& BlockHash = NewBlocks.BlockDescriptions[BlockIndex].BlockHash; + if (!NewBlocks.MetaDataHasBeenUploaded[BlockIndex]) { - m_UploadStats.BlocksBytes += BlockMetaData.GetSize(); - NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; - UploadBlockMetadataCount++; - } - else - { - FailedMetadataUploadCount++; + const CbObject BlockMetaData = + BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]); + if (m_Storage.BuildCacheStorage) + { + m_Storage.BuildCacheStorage->PutBlobMetadatas(m_BuildId, + std::vector<IoHash>({BlockHash}), + std::vector<CbObject>({BlockMetaData})); + } + bool MetadataSucceeded = m_Storage.BuildStorage->PutBlockMetadata(m_BuildId, BlockHash, BlockMetaData); + if (MetadataSucceeded) + { + m_UploadStats.BlocksBytes += BlockMetaData.GetSize(); + NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; + UploadBlockMetadataCount++; + } + else + { + FailedMetadataUploadCount++; + } } } } - } - if (UploadBlockMetadataCount > 0) - { - uint64_t ElapsedUS = UploadBlockMetadataTimer.GetElapsedTimeUs(); - m_UploadStats.ElapsedWallTimeUS += ElapsedUS; - if (!m_Options.IsQuiet) + if (UploadBlockMetadataCount > 0) { - LOG_OUTPUT(m_LogOutput, - "Uploaded metadata for {} blocks in {}", - UploadBlockMetadataCount, - NiceTimeSpanMs(ElapsedUS / 1000)); + uint64_t ElapsedUS = UploadBlockMetadataTimer.GetElapsedTimeUs(); + m_UploadStats.ElapsedWallTimeUS += ElapsedUS; + if (!m_Options.IsQuiet) + { + LOG_OUTPUT(m_LogOutput, + "Uploaded metadata for {} blocks in {}", + UploadBlockMetadataCount, + NiceTimeSpanMs(ElapsedUS / 1000)); + } } } - } - m_Storage.BuildStorage->PutBuildPartStats( - m_BuildId, - m_BuildPartId, - {{"totalSize", double(m_LocalFolderScanStats.FoundFileByteCount.load())}, - {"reusedRatio", AcceptedByteCountPercent / 100.0}, - {"reusedBlockCount", double(m_FindBlocksStats.AcceptedBlockCount)}, - {"reusedBlockByteCount", double(m_FindBlocksStats.AcceptedRawByteCount)}, - {"newBlockCount", double(m_FindBlocksStats.NewBlocksCount)}, - {"newBlockByteCount", double(m_FindBlocksStats.NewBlocksChunkByteCount)}, - {"uploadedCount", double(m_UploadStats.BlockCount.load() + m_UploadStats.ChunkCount.load())}, - {"uploadedByteCount", double(m_UploadStats.BlocksBytes.load() + m_UploadStats.ChunksBytes.load())}, - {"uploadedBytesPerSec", - double(GetBytesPerSecond(m_UploadStats.ElapsedWallTimeUS, m_UploadStats.ChunksBytes + m_UploadStats.BlocksBytes))}, - {"elapsedTimeSec", double(ProcessTimer.GetElapsedTimeMs() / 1000.0)}}); - - m_LogOutput.SetLogOperationProgress(TaskSteps::Cleanup, TaskSteps::StepCount); + m_Storage.BuildStorage->PutBuildPartStats( + m_BuildId, + m_BuildPartId, + {{"totalSize", double(m_LocalFolderScanStats.FoundFileByteCount.load())}, + {"reusedRatio", AcceptedByteCountPercent / 100.0}, + {"reusedBlockCount", double(m_FindBlocksStats.AcceptedBlockCount)}, + {"reusedBlockByteCount", double(m_FindBlocksStats.AcceptedRawByteCount)}, + {"newBlockCount", double(m_FindBlocksStats.NewBlocksCount)}, + {"newBlockByteCount", double(m_FindBlocksStats.NewBlocksChunkByteCount)}, + {"uploadedCount", double(m_UploadStats.BlockCount.load() + m_UploadStats.ChunkCount.load())}, + {"uploadedByteCount", double(m_UploadStats.BlocksBytes.load() + m_UploadStats.ChunksBytes.load())}, + {"uploadedBytesPerSec", + double(GetBytesPerSecond(m_UploadStats.ElapsedWallTimeUS, m_UploadStats.ChunksBytes + m_UploadStats.BlocksBytes))}, + {"elapsedTimeSec", double(ProcessTimer.GetElapsedTimeMs() / 1000.0)}}); + + m_LogOutput.SetLogOperationProgress(TaskSteps::Cleanup, TaskSteps::StepCount); + } + catch (const std::exception&) + { + m_AbortFlag = true; + throw; + } } std::vector<std::filesystem::path> @@ -6797,260 +6831,272 @@ void BuildsOperationValidateBuildPart::Execute() { ZEN_TRACE_CPU("ValidateBuildPart"); - - enum TaskSteps : uint32_t + try { - FetchBuild, - FetchBuildPart, - ValidateBlobs, - Cleanup, - StepCount - }; + enum TaskSteps : uint32_t + { + FetchBuild, + FetchBuildPart, + ValidateBlobs, + Cleanup, + StepCount + }; - auto EndProgress = MakeGuard([&]() { m_LogOutput.SetLogOperationProgress(TaskSteps::StepCount, TaskSteps::StepCount); }); + auto EndProgress = MakeGuard([&]() { m_LogOutput.SetLogOperationProgress(TaskSteps::StepCount, TaskSteps::StepCount); }); - Stopwatch Timer; - auto _ = MakeGuard([&]() { - if (!m_Options.IsQuiet) - { - ZEN_CONSOLE("Validated build part {}/{} ('{}') in {}", - m_BuildId, - m_BuildPartId, - m_BuildPartName, - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - } - }); + Stopwatch Timer; + auto _ = MakeGuard([&]() { + if (!m_Options.IsQuiet) + { + ZEN_CONSOLE("Validated build part {}/{} ('{}') in {}", + m_BuildId, + m_BuildPartId, + m_BuildPartName, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + } + }); - m_LogOutput.SetLogOperationProgress(TaskSteps::FetchBuild, TaskSteps::StepCount); + m_LogOutput.SetLogOperationProgress(TaskSteps::FetchBuild, TaskSteps::StepCount); - CbObject Build = m_Storage.GetBuild(m_BuildId); - if (!m_BuildPartName.empty()) - { - m_BuildPartId = Build["parts"sv].AsObjectView()[m_BuildPartName].AsObjectId(); - if (m_BuildPartId == Oid::Zero) + CbObject Build = m_Storage.GetBuild(m_BuildId); + if (!m_BuildPartName.empty()) { - throw std::runtime_error(fmt::format("Build {} does not have a part named '{}'", m_BuildId, m_BuildPartName)); + m_BuildPartId = Build["parts"sv].AsObjectView()[m_BuildPartName].AsObjectId(); + if (m_BuildPartId == Oid::Zero) + { + throw std::runtime_error(fmt::format("Build {} does not have a part named '{}'", m_BuildId, m_BuildPartName)); + } + } + m_ValidateStats.BuildBlobSize = Build.GetSize(); + uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u; + if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0) + { + PreferredMultipartChunkSize = ChunkSize; } - } - m_ValidateStats.BuildBlobSize = Build.GetSize(); - uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u; - if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0) - { - PreferredMultipartChunkSize = ChunkSize; - } - - m_LogOutput.SetLogOperationProgress(TaskSteps::FetchBuildPart, TaskSteps::StepCount); - - CbObject BuildPart = m_Storage.GetBuildPart(m_BuildId, m_BuildPartId); - m_ValidateStats.BuildPartSize = BuildPart.GetSize(); - if (!m_Options.IsQuiet) - { - ZEN_CONSOLE("Validating build part {}/{} ({})", m_BuildId, m_BuildPartId, NiceBytes(BuildPart.GetSize())); - } - std::vector<IoHash> ChunkAttachments; - for (CbFieldView LooseFileView : BuildPart["chunkAttachments"sv].AsObjectView()["rawHashes"sv]) - { - ChunkAttachments.push_back(LooseFileView.AsBinaryAttachment()); - } - m_ValidateStats.ChunkAttachmentCount = ChunkAttachments.size(); - std::vector<IoHash> BlockAttachments; - for (CbFieldView BlocksView : BuildPart["blockAttachments"sv].AsObjectView()["rawHashes"sv]) - { - BlockAttachments.push_back(BlocksView.AsBinaryAttachment()); - } - m_ValidateStats.BlockAttachmentCount = BlockAttachments.size(); - - std::vector<ChunkBlockDescription> VerifyBlockDescriptions = - ParseChunkBlockDescriptionList(m_Storage.GetBlockMetadatas(m_BuildId, BlockAttachments)); - if (VerifyBlockDescriptions.size() != BlockAttachments.size()) - { - throw std::runtime_error(fmt::format("Uploaded blocks metadata could not all be found, {} blocks metadata is missing", - BlockAttachments.size() - VerifyBlockDescriptions.size())); - } - ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + m_LogOutput.SetLogOperationProgress(TaskSteps::FetchBuildPart, TaskSteps::StepCount); - const std::filesystem::path TempFolder = ".zen-tmp"; + CbObject BuildPart = m_Storage.GetBuildPart(m_BuildId, m_BuildPartId); + m_ValidateStats.BuildPartSize = BuildPart.GetSize(); + if (!m_Options.IsQuiet) + { + ZEN_CONSOLE("Validating build part {}/{} ({})", m_BuildId, m_BuildPartId, NiceBytes(BuildPart.GetSize())); + } + std::vector<IoHash> ChunkAttachments; + for (CbFieldView LooseFileView : BuildPart["chunkAttachments"sv].AsObjectView()["rawHashes"sv]) + { + ChunkAttachments.push_back(LooseFileView.AsBinaryAttachment()); + } + m_ValidateStats.ChunkAttachmentCount = ChunkAttachments.size(); + std::vector<IoHash> BlockAttachments; + for (CbFieldView BlocksView : BuildPart["blockAttachments"sv].AsObjectView()["rawHashes"sv]) + { + BlockAttachments.push_back(BlocksView.AsBinaryAttachment()); + } + m_ValidateStats.BlockAttachmentCount = BlockAttachments.size(); - CreateDirectories(TempFolder); - auto __ = MakeGuard([&TempFolder]() { - if (CleanDirectory(TempFolder, {})) + std::vector<ChunkBlockDescription> VerifyBlockDescriptions = + ParseChunkBlockDescriptionList(m_Storage.GetBlockMetadatas(m_BuildId, BlockAttachments)); + if (VerifyBlockDescriptions.size() != BlockAttachments.size()) { - std::error_code DummyEc; - RemoveDir(TempFolder, DummyEc); + throw std::runtime_error(fmt::format("Uploaded blocks metadata could not all be found, {} blocks metadata is missing", + BlockAttachments.size() - VerifyBlockDescriptions.size())); } - }); - m_LogOutput.SetLogOperationProgress(TaskSteps::ValidateBlobs, TaskSteps::StepCount); + ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - std::unique_ptr<BuildOpLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Validate Blobs")); - BuildOpLogOutput::ProgressBar& Progress(*ProgressBarPtr); + const std::filesystem::path TempFolder = ".zen-tmp"; - uint64_t AttachmentsToVerifyCount = ChunkAttachments.size() + BlockAttachments.size(); - FilteredRate FilteredDownloadedBytesPerSecond; - FilteredRate FilteredVerifiedBytesPerSecond; + CreateDirectories(TempFolder); + auto __ = MakeGuard([&TempFolder]() { + if (CleanDirectory(TempFolder, {})) + { + std::error_code DummyEc; + RemoveDir(TempFolder, DummyEc); + } + }); - std::atomic<uint64_t> MultipartAttachmentCount = 0; + m_LogOutput.SetLogOperationProgress(TaskSteps::ValidateBlobs, TaskSteps::StepCount); - for (const IoHash& ChunkAttachment : ChunkAttachments) - { - Work.ScheduleWork( - m_NetworkPool, - [this, - &Work, - AttachmentsToVerifyCount, - &TempFolder, - PreferredMultipartChunkSize, - &FilteredDownloadedBytesPerSecond, - &FilteredVerifiedBytesPerSecond, - ChunkAttachment](std::atomic<bool>&) { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("ValidateBuildPart_GetChunk"); - - FilteredDownloadedBytesPerSecond.Start(); - DownloadLargeBlob( - m_Storage, - TempFolder, - m_BuildId, - ChunkAttachment, - PreferredMultipartChunkSize, - Work, - m_NetworkPool, - m_DownloadStats.DownloadedChunkByteCount, - m_DownloadStats.MultipartAttachmentCount, - [this, - &Work, - AttachmentsToVerifyCount, - &FilteredDownloadedBytesPerSecond, - &FilteredVerifiedBytesPerSecond, - ChunkHash = ChunkAttachment](IoBuffer&& Payload) { - m_DownloadStats.DownloadedChunkCount++; - Payload.SetContentType(ZenContentType::kCompressedBinary); - if (!m_AbortFlag) - { - Work.ScheduleWork( - m_IOWorkerPool, - [this, - AttachmentsToVerifyCount, - &FilteredDownloadedBytesPerSecond, - &FilteredVerifiedBytesPerSecond, - Payload = std::move(Payload), - ChunkHash](std::atomic<bool>&) mutable { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("ValidateBuildPart_Validate"); + std::unique_ptr<BuildOpLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Validate Blobs")); + BuildOpLogOutput::ProgressBar& Progress(*ProgressBarPtr); - if (m_DownloadStats.DownloadedChunkCount + m_DownloadStats.DownloadedBlockCount == - AttachmentsToVerifyCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } + uint64_t AttachmentsToVerifyCount = ChunkAttachments.size() + BlockAttachments.size(); + FilteredRate FilteredDownloadedBytesPerSecond; + FilteredRate FilteredVerifiedBytesPerSecond; + + std::atomic<uint64_t> MultipartAttachmentCount = 0; - FilteredVerifiedBytesPerSecond.Start(); + for (const IoHash& ChunkAttachment : ChunkAttachments) + { + Work.ScheduleWork( + m_NetworkPool, + [this, + &Work, + AttachmentsToVerifyCount, + &TempFolder, + PreferredMultipartChunkSize, + &FilteredDownloadedBytesPerSecond, + &FilteredVerifiedBytesPerSecond, + ChunkAttachment](std::atomic<bool>&) { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("ValidateBuildPart_GetChunk"); - uint64_t CompressedSize; - uint64_t DecompressedSize; - ValidateBlob(m_AbortFlag, std::move(Payload), ChunkHash, CompressedSize, DecompressedSize); - m_ValidateStats.VerifiedAttachmentCount++; - m_ValidateStats.VerifiedByteCount += DecompressedSize; - if (m_ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount) + FilteredDownloadedBytesPerSecond.Start(); + DownloadLargeBlob( + m_Storage, + TempFolder, + m_BuildId, + ChunkAttachment, + PreferredMultipartChunkSize, + Work, + m_NetworkPool, + m_DownloadStats.DownloadedChunkByteCount, + m_DownloadStats.MultipartAttachmentCount, + [this, + &Work, + AttachmentsToVerifyCount, + &FilteredDownloadedBytesPerSecond, + &FilteredVerifiedBytesPerSecond, + ChunkHash = ChunkAttachment](IoBuffer&& Payload) { + m_DownloadStats.DownloadedChunkCount++; + Payload.SetContentType(ZenContentType::kCompressedBinary); + if (!m_AbortFlag) + { + Work.ScheduleWork( + m_IOWorkerPool, + [this, + AttachmentsToVerifyCount, + &FilteredDownloadedBytesPerSecond, + &FilteredVerifiedBytesPerSecond, + Payload = std::move(Payload), + ChunkHash](std::atomic<bool>&) mutable { + if (!m_AbortFlag) { - FilteredVerifiedBytesPerSecond.Stop(); - } - } - }); - } - }); - } - }); - } + ZEN_TRACE_CPU("ValidateBuildPart_Validate"); - for (const IoHash& BlockAttachment : BlockAttachments) - { - Work.ScheduleWork( - m_NetworkPool, - [this, &Work, AttachmentsToVerifyCount, &FilteredDownloadedBytesPerSecond, &FilteredVerifiedBytesPerSecond, BlockAttachment]( - std::atomic<bool>&) { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("ValidateBuildPart_GetBlock"); + if (m_DownloadStats.DownloadedChunkCount + m_DownloadStats.DownloadedBlockCount == + AttachmentsToVerifyCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } - FilteredDownloadedBytesPerSecond.Start(); - IoBuffer Payload = m_Storage.GetBuildBlob(m_BuildId, BlockAttachment); - m_DownloadStats.DownloadedBlockCount++; - m_DownloadStats.DownloadedBlockByteCount += Payload.GetSize(); - if (m_DownloadStats.DownloadedChunkCount + m_DownloadStats.DownloadedBlockCount == AttachmentsToVerifyCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - if (!Payload) - { - throw std::runtime_error(fmt::format("Block attachment {} could not be found", BlockAttachment)); + FilteredVerifiedBytesPerSecond.Start(); + + uint64_t CompressedSize; + uint64_t DecompressedSize; + ValidateBlob(m_AbortFlag, std::move(Payload), ChunkHash, CompressedSize, DecompressedSize); + m_ValidateStats.VerifiedAttachmentCount++; + m_ValidateStats.VerifiedByteCount += DecompressedSize; + if (m_ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount) + { + FilteredVerifiedBytesPerSecond.Stop(); + } + } + }); + } + }); } + }); + } + + for (const IoHash& BlockAttachment : BlockAttachments) + { + Work.ScheduleWork( + m_NetworkPool, + [this, + &Work, + AttachmentsToVerifyCount, + &FilteredDownloadedBytesPerSecond, + &FilteredVerifiedBytesPerSecond, + BlockAttachment](std::atomic<bool>&) { if (!m_AbortFlag) { - Work.ScheduleWork(m_IOWorkerPool, - [this, - &FilteredVerifiedBytesPerSecond, - AttachmentsToVerifyCount, - Payload = std::move(Payload), - BlockAttachment](std::atomic<bool>&) mutable { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("ValidateBuildPart_ValidateBlock"); + ZEN_TRACE_CPU("ValidateBuildPart_GetBlock"); + + FilteredDownloadedBytesPerSecond.Start(); + IoBuffer Payload = m_Storage.GetBuildBlob(m_BuildId, BlockAttachment); + m_DownloadStats.DownloadedBlockCount++; + m_DownloadStats.DownloadedBlockByteCount += Payload.GetSize(); + if (m_DownloadStats.DownloadedChunkCount + m_DownloadStats.DownloadedBlockCount == AttachmentsToVerifyCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + if (!Payload) + { + throw std::runtime_error(fmt::format("Block attachment {} could not be found", BlockAttachment)); + } + if (!m_AbortFlag) + { + Work.ScheduleWork( + m_IOWorkerPool, + [this, + &FilteredVerifiedBytesPerSecond, + AttachmentsToVerifyCount, + Payload = std::move(Payload), + BlockAttachment](std::atomic<bool>&) mutable { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("ValidateBuildPart_ValidateBlock"); - FilteredVerifiedBytesPerSecond.Start(); + FilteredVerifiedBytesPerSecond.Start(); - uint64_t CompressedSize; - uint64_t DecompressedSize; - ValidateChunkBlock(std::move(Payload), BlockAttachment, CompressedSize, DecompressedSize); - m_ValidateStats.VerifiedAttachmentCount++; - m_ValidateStats.VerifiedByteCount += DecompressedSize; - if (m_ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount) - { - FilteredVerifiedBytesPerSecond.Stop(); - } - } - }); + uint64_t CompressedSize; + uint64_t DecompressedSize; + ValidateChunkBlock(std::move(Payload), BlockAttachment, CompressedSize, DecompressedSize); + m_ValidateStats.VerifiedAttachmentCount++; + m_ValidateStats.VerifiedByteCount += DecompressedSize; + if (m_ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount) + { + FilteredVerifiedBytesPerSecond.Stop(); + } + } + }); + } } - } - }); - } + }); + } - Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(PendingWork); - - const uint64_t DownloadedAttachmentCount = m_DownloadStats.DownloadedChunkCount + m_DownloadStats.DownloadedBlockCount; - const uint64_t DownloadedByteCount = m_DownloadStats.DownloadedChunkByteCount + m_DownloadStats.DownloadedBlockByteCount; - - FilteredDownloadedBytesPerSecond.Update(DownloadedByteCount); - FilteredVerifiedBytesPerSecond.Update(m_ValidateStats.VerifiedByteCount); - - std::string Details = fmt::format("Downloaded {}/{} ({}, {}bits/s). Verified {}/{} ({}, {}B/s)", - DownloadedAttachmentCount, - AttachmentsToVerifyCount, - NiceBytes(DownloadedByteCount), - NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8), - m_ValidateStats.VerifiedAttachmentCount.load(), - AttachmentsToVerifyCount, - NiceBytes(m_ValidateStats.VerifiedByteCount.load()), - NiceNum(FilteredVerifiedBytesPerSecond.GetCurrent())); - - Progress.UpdateState( - {.Task = "Validating blobs ", - .Details = Details, - .TotalCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2), - .RemainingCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2 - - (DownloadedAttachmentCount + m_ValidateStats.VerifiedAttachmentCount.load())), - .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, - false); - }); + Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); - Progress.Finish(); - m_ValidateStats.ElapsedWallTimeUS = Timer.GetElapsedTimeUs(); + const uint64_t DownloadedAttachmentCount = m_DownloadStats.DownloadedChunkCount + m_DownloadStats.DownloadedBlockCount; + const uint64_t DownloadedByteCount = m_DownloadStats.DownloadedChunkByteCount + m_DownloadStats.DownloadedBlockByteCount; + + FilteredDownloadedBytesPerSecond.Update(DownloadedByteCount); + FilteredVerifiedBytesPerSecond.Update(m_ValidateStats.VerifiedByteCount); + + std::string Details = fmt::format("Downloaded {}/{} ({}, {}bits/s). Verified {}/{} ({}, {}B/s)", + DownloadedAttachmentCount, + AttachmentsToVerifyCount, + NiceBytes(DownloadedByteCount), + NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8), + m_ValidateStats.VerifiedAttachmentCount.load(), + AttachmentsToVerifyCount, + NiceBytes(m_ValidateStats.VerifiedByteCount.load()), + NiceNum(FilteredVerifiedBytesPerSecond.GetCurrent())); + + Progress.UpdateState( + {.Task = "Validating blobs ", + .Details = Details, + .TotalCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2), + .RemainingCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2 - + (DownloadedAttachmentCount + m_ValidateStats.VerifiedAttachmentCount.load())), + .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, + false); + }); + + Progress.Finish(); + m_ValidateStats.ElapsedWallTimeUS = Timer.GetElapsedTimeUs(); - m_LogOutput.SetLogOperationProgress(TaskSteps::Cleanup, TaskSteps::StepCount); + m_LogOutput.SetLogOperationProgress(TaskSteps::Cleanup, TaskSteps::StepCount); + } + catch (const std::exception&) + { + m_AbortFlag = true; + throw; + } } CompositeBuffer diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h index 7e3903892..8b5b63ef9 100644 --- a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h +++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h @@ -197,6 +197,7 @@ public: bool PrimeCacheOnly = false; bool EnableOtherDownloadsScavenging = true; bool EnableTargetFolderScavenging = true; + bool ValidateCompletedSequences = true; std::vector<std::string> ExcludeFolders; std::vector<std::string> ExcludeExtensions; }; @@ -378,6 +379,8 @@ private: BufferedWriteFileCache& WriteCache, IoBuffer&& CompressedPart); + void StreamDecompress(const IoHash& SequenceRawHash, CompositeBuffer&& CompressedPart); + void WriteSequenceChunkToCache(BufferedWriteFileCache::Local& LocalWriter, const CompositeBuffer& Chunk, const uint32_t SequenceIndex, |