// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include ZEN_THIRD_PARTY_INCLUDES_END namespace zen { using namespace std::literals; namespace { std::filesystem::path ZenTempCacheFolderPath(const std::filesystem::path& ZenFolderPath) { return ZenTempFolderPath(ZenFolderPath) / "cache"; // Decompressed and verified data - chunks & sequences } std::filesystem::path ZenTempBlockFolderPath(const std::filesystem::path& ZenFolderPath) { return ZenTempFolderPath(ZenFolderPath) / "blocks"; // Temp storage for whole and partial blocks } std::filesystem::path ZenTempDownloadFolderPath(const std::filesystem::path& ZenFolderPath) { return ZenTempFolderPath(ZenFolderPath) / "download"; // Temp storage for decompressed and validated chunks } std::filesystem::path GetTempChunkedSequenceFileName(const std::filesystem::path& CacheFolderPath, const IoHash& RawHash) { return CacheFolderPath / (RawHash.ToHexString() + ".tmp"); } std::filesystem::path GetFinalChunkedSequenceFileName(const std::filesystem::path& CacheFolderPath, const IoHash& RawHash) { return CacheFolderPath / RawHash.ToHexString(); } bool CleanDirectory(LoggerRef InLog, ProgressBase& Progress, WorkerThreadPool& IOWorkerPool, std::atomic& AbortFlag, std::atomic& PauseFlag, bool IsQuiet, const std::filesystem::path& Path, std::span ExcludeDirectories) { ZEN_TRACE_CPU("CleanDirectory"); ZEN_SCOPED_LOG(InLog); Stopwatch Timer; std::unique_ptr ProgressBar = Progress.CreateProgressBar("Clean Folder"); CleanDirectoryResult Result = CleanDirectory( IOWorkerPool, AbortFlag, PauseFlag, Path, ExcludeDirectories, [&](const std::string_view Details, uint64_t TotalCount, uint64_t RemainingCount, bool IsPaused, bool IsAborted) { ProgressBar->UpdateState({.Task = "Cleaning folder ", .Details = std::string(Details), .TotalCount = TotalCount, .RemainingCount = 
RemainingCount, .Status = ProgressBase::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }, Progress.GetProgressUpdateDelayMS()); ProgressBar->Finish(); if (AbortFlag) { return false; } uint64_t ElapsedTimeMs = Timer.GetElapsedTimeMs(); if (!Result.FailedRemovePaths.empty()) { ExtendableStringBuilder<512> SB; for (size_t FailedPathIndex = 0; FailedPathIndex < Result.FailedRemovePaths.size(); FailedPathIndex++) { SB << fmt::format("\n '{}': ({}) {}", Result.FailedRemovePaths[FailedPathIndex].first, Result.FailedRemovePaths[FailedPathIndex].second.value(), Result.FailedRemovePaths[FailedPathIndex].second.message()); } ZEN_WARN("Clean failed to remove files from '{}': {}", Path, SB.ToView()); } if (ElapsedTimeMs >= 200 && !IsQuiet) { ZEN_INFO("Wiped folder '{}' {} ({}) in {}", Path, Result.FoundCount, NiceBytes(Result.DeletedByteCount), NiceTimeSpanMs(ElapsedTimeMs)); } return Result.FailedRemovePaths.empty(); } uint32_t SetNativeFileAttributes(const std::filesystem::path FilePath, SourcePlatform SourcePlatform, uint32_t Attributes) { #if ZEN_PLATFORM_WINDOWS if (SourcePlatform == SourcePlatform::Windows) { SetFileAttributesToPath(FilePath, Attributes); return Attributes; } else { uint32_t CurrentAttributes = GetFileAttributesFromPath(FilePath); uint32_t NewAttributes = zen::MakeFileAttributeReadOnly(CurrentAttributes, zen::IsFileModeReadOnly(Attributes)); if (CurrentAttributes != NewAttributes) { SetFileAttributesToPath(FilePath, NewAttributes); } return NewAttributes; } #endif // ZEN_PLATFORM_WINDOWS #if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC if (SourcePlatform != SourcePlatform::Windows) { zen::SetFileMode(FilePath, Attributes); return Attributes; } else { uint32_t CurrentMode = zen::GetFileMode(FilePath); uint32_t NewMode = zen::MakeFileModeReadOnly(CurrentMode, zen::IsFileAttributeReadOnly(Attributes)); if (CurrentMode != NewMode) { zen::SetFileMode(FilePath, NewMode); } return NewMode; } #endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC }; 
uint32_t GetNativeFileAttributes(const std::filesystem::path FilePath) { #if ZEN_PLATFORM_WINDOWS return GetFileAttributesFromPath(FilePath); #endif // ZEN_PLATFORM_WINDOWS #if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC return GetFileMode(FilePath); #endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC } std::filesystem::path TryMoveDownloadedChunk(IoBuffer& BlockBuffer, const std::filesystem::path& Path, bool ForceDiskBased) { uint64_t BlockSize = BlockBuffer.GetSize(); IoBufferFileReference FileRef; if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == BlockSize)) { ZEN_TRACE_CPU("MoveTempFullBlock"); std::error_code Ec; std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); if (!Ec) { BlockBuffer.SetDeleteOnClose(false); BlockBuffer = {}; RenameFile(TempBlobPath, Path, Ec); if (Ec) { // Re-open the temp file again BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); BlockBuffer.SetDeleteOnClose(true); } else { return Path; } } } if (ForceDiskBased) { // Could not be moved and rather large, lets store it on disk ZEN_TRACE_CPU("WriteTempFullBlock"); TemporaryFile::SafeWriteFile(Path, BlockBuffer); BlockBuffer = {}; return Path; } return {}; } bool IsSingleFileChunk(const ChunkedFolderContent& RemoteContent, const std::vector Locations) { if (Locations.size() == 1) { const uint32_t FirstSequenceIndex = Locations[0]->SequenceIndex; if (RemoteContent.ChunkedContent.ChunkCounts[FirstSequenceIndex] == 1) { ZEN_ASSERT_SLOW(Locations[0]->Offset == 0); return true; } } return false; } IoBuffer MakeBufferMemoryBased(const CompositeBuffer& PartialBlockBuffer) { ZEN_TRACE_CPU("MakeBufferMemoryBased"); IoBuffer BlockMemoryBuffer; std::span Segments = PartialBlockBuffer.GetSegments(); if (Segments.size() == 1) { IoBufferFileReference FileRef = {}; if 
(PartialBlockBuffer.GetSegments().front().AsIoBuffer().GetFileReference(FileRef)) { BlockMemoryBuffer = UniqueBuffer::Alloc(FileRef.FileChunkSize).MoveToShared().AsIoBuffer(); BasicFile Reader; Reader.Attach(FileRef.FileHandle); auto _ = MakeGuard([&Reader]() { Reader.Detach(); }); MutableMemoryView ReadMem = BlockMemoryBuffer.GetMutableView(); Reader.Read(ReadMem.GetData(), FileRef.FileChunkSize, FileRef.FileChunkOffset); return BlockMemoryBuffer; } else { return PartialBlockBuffer.GetSegments().front().AsIoBuffer(); } } else { // Not a homogenous memory buffer, read all to memory BlockMemoryBuffer = UniqueBuffer::Alloc(PartialBlockBuffer.GetSize()).MoveToShared().AsIoBuffer(); MutableMemoryView ReadMem = BlockMemoryBuffer.GetMutableView(); for (const SharedBuffer& Segment : Segments) { IoBufferFileReference FileRef = {}; if (Segment.AsIoBuffer().GetFileReference(FileRef)) { BasicFile Reader; Reader.Attach(FileRef.FileHandle); auto _ = MakeGuard([&Reader]() { Reader.Detach(); }); Reader.Read(ReadMem.GetData(), FileRef.FileChunkSize, FileRef.FileChunkOffset); ReadMem = ReadMem.Mid(FileRef.FileChunkSize); } else { ReadMem = ReadMem.CopyFrom(Segment.AsIoBuffer().GetView()); } } return BlockMemoryBuffer; } } FolderContent CheckFolderFiles(ProgressBase& Progress, std::atomic& AbortFlag, std::atomic& PauseFlag, std::string_view ProgressLabel, TransferThreadWorkers& Workers, GetFolderContentStatistics& LocalFolderScanStats, const std::filesystem::path& Path, std::span PathsToCheck) { std::unique_ptr ProgressBar = Progress.CreateProgressBar(ProgressLabel); FolderContent Result = GetValidFolderContent( Workers.GetIOWorkerPool(), LocalFolderScanStats, Path, PathsToCheck, [&ProgressBar, &LocalFolderScanStats, &AbortFlag, &PauseFlag](uint64_t PathCount, uint64_t CompletedPathCount) { std::string Details = fmt::format("{}/{} checked, {} found", CompletedPathCount, PathCount, LocalFolderScanStats.FoundFileCount.load()); ProgressBar->UpdateState({.Task = "Checking files ", 
.Details = Details, .TotalCount = PathCount, .RemainingCount = PathCount - CompletedPathCount, .Status = ProgressBase::ProgressBar::State::CalculateStatus(AbortFlag, PauseFlag)}, false); }, Progress.GetProgressUpdateDelayMS(), AbortFlag, PauseFlag); ProgressBar->Finish(); return Result; } ChunkedFolderContent ScanFolderFiles(ProgressBase& Progress, std::atomic& AbortFlag, std::atomic& PauseFlag, std::string_view ProgressLabel, TransferThreadWorkers& Workers, const std::filesystem::path& Path, const FolderContent& FolderSource, ChunkingController& ChunkController, ChunkingCache& ChunkCache, ChunkingStatistics& OutChunkingStats) { uint64_t ByteCountToScan = 0; for (const uint64_t RawSize : FolderSource.RawSizes) { ByteCountToScan += RawSize; } std::unique_ptr ProgressBar = Progress.CreateProgressBar(ProgressLabel); FilteredRate FilteredBytesHashed; FilteredBytesHashed.Start(); ChunkingStatistics LocalChunkingStats; ChunkedFolderContent Result = ChunkFolderContent( LocalChunkingStats, Workers.GetIOWorkerPool(), Path, FolderSource, ChunkController, ChunkCache, Progress.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t) { FilteredBytesHashed.Update(LocalChunkingStats.BytesHashed.load()); std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found", LocalChunkingStats.FilesProcessed.load(), FolderSource.Paths.size(), NiceBytes(LocalChunkingStats.BytesHashed.load()), NiceBytes(ByteCountToScan), NiceNum(FilteredBytesHashed.GetCurrent()), LocalChunkingStats.UniqueChunksFound.load(), NiceBytes(LocalChunkingStats.UniqueBytesFound.load())); ProgressBar->UpdateState({.Task = "Scanning files ", .Details = Details, .TotalCount = ByteCountToScan, .RemainingCount = ByteCountToScan - LocalChunkingStats.BytesHashed.load(), .Status = ProgressBase::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }, AbortFlag, PauseFlag); OutChunkingStats += LocalChunkingStats; FilteredBytesHashed.Stop(); ProgressBar->Finish(); 
return Result; } } // namespace BuildsOperationUpdateFolder::BuildsOperationUpdateFolder(LoggerRef Log, ProgressBase& Progress, StorageInstance& Storage, std::atomic& AbortFlag, std::atomic& PauseFlag, WorkerThreadPool& IOWorkerPool, WorkerThreadPool& NetworkPool, const Oid& BuildId, const std::filesystem::path& Path, const ChunkedFolderContent& LocalContent, const ChunkedContentLookup& LocalLookup, const ChunkedFolderContent& RemoteContent, const ChunkedContentLookup& RemoteLookup, const std::vector& BlockDescriptions, const std::vector& LooseChunkHashes, const Options& Options) : m_Log(Log) , m_Progress(Progress) , m_Storage(Storage) , m_AbortFlag(AbortFlag) , m_PauseFlag(PauseFlag) , m_IOWorkerPool(IOWorkerPool) , m_NetworkPool(NetworkPool) , m_BuildId(BuildId) , m_Path(Path) , m_LocalContent(LocalContent) , m_LocalLookup(LocalLookup) , m_RemoteContent(RemoteContent) , m_RemoteLookup(RemoteLookup) , m_BlockDescriptions(BlockDescriptions) , m_LooseChunkHashes(LooseChunkHashes) , m_Options(Options) , m_CacheFolderPath(ZenTempCacheFolderPath(m_Options.ZenFolderPath)) , m_TempDownloadFolderPath(ZenTempDownloadFolderPath(m_Options.ZenFolderPath)) , m_TempBlockFolderPath(ZenTempBlockFolderPath(m_Options.ZenFolderPath)) { } void BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) { ZEN_TRACE_CPU("BuildsOperationUpdateFolder::Execute"); try { enum class TaskSteps : uint32_t { ScanExistingData, WriteChunks, PrepareTarget, FinalizeTarget, Cleanup, StepCount }; auto EndProgress = MakeGuard([&]() { m_Progress.SetLogOperationProgress((uint32_t)TaskSteps::StepCount, (uint32_t)TaskSteps::StepCount); }); m_Progress.SetLogOperationProgress((uint32_t)TaskSteps::ScanExistingData, (uint32_t)TaskSteps::StepCount); CreateDirectories(m_CacheFolderPath); CreateDirectories(m_TempDownloadFolderPath); CreateDirectories(m_TempBlockFolderPath); std::vector> SequenceIndexChunksLeftToWriteCounters(m_RemoteContent.ChunkedContent.SequenceRawHashes.size()); std::vector 
RemoteChunkIndexNeedsCopyFromLocalFileFlags(m_RemoteContent.ChunkedContent.ChunkHashes.size()); std::vector> RemoteChunkIndexNeedsCopyFromSourceFlags(m_RemoteContent.ChunkedContent.ChunkHashes.size()); tsl::robin_map CachedChunkHashesFound; tsl::robin_map CachedSequenceHashesFound; ScanCacheFolder(CachedChunkHashesFound, CachedSequenceHashesFound); tsl::robin_map CachedBlocksFound; ScanTempBlocksFolder(CachedBlocksFound); tsl::robin_map SequenceIndexesLeftToFindToRemoteIndex; InitializeSequenceCounters(SequenceIndexChunksLeftToWriteCounters, SequenceIndexesLeftToFindToRemoteIndex, CachedChunkHashesFound, CachedSequenceHashesFound); std::vector ScavengedContents; std::vector ScavengedLookups; std::vector ScavengedPaths; std::vector ScavengedSequenceCopyOperations; uint64_t ScavengedPathsCount = 0; if (m_Options.EnableOtherDownloadsScavenging) { ZEN_TRACE_CPU("GetScavengedSequences"); Stopwatch ScavengeTimer; if (!SequenceIndexesLeftToFindToRemoteIndex.empty()) { std::vector ScavengeSources = FindScavengeSources(); ScanScavengeSources(ScavengeSources, ScavengedContents, ScavengedLookups, ScavengedPaths); if (m_AbortFlag) { return; } MatchScavengedSequencesToRemote(ScavengedContents, ScavengedLookups, ScavengedPaths, SequenceIndexesLeftToFindToRemoteIndex, SequenceIndexChunksLeftToWriteCounters, ScavengedSequenceCopyOperations, ScavengedPathsCount); } m_CacheMappingStats.ScavengeElapsedWallTimeUs += ScavengeTimer.GetElapsedTimeUs(); } uint32_t RemainingChunkCount = 0; for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < m_RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++) { uint64_t ChunkWriteCount = GetChunkWriteCount(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); if (ChunkWriteCount > 0) { RemainingChunkCount++; } } // Pick up all chunks in current local state tsl::robin_map RawHashToCopyChunkDataIndex; std::vector CopyChunkDatas; if (m_Options.EnableTargetFolderScavenging) { ZEN_TRACE_CPU("GetLocalChunks"); Stopwatch LocalTimer; 
ScavengeSourceForChunks(RemainingChunkCount, RemoteChunkIndexNeedsCopyFromLocalFileFlags, RawHashToCopyChunkDataIndex, SequenceIndexChunksLeftToWriteCounters, m_LocalContent, m_LocalLookup, CopyChunkDatas, uint32_t(-1), m_CacheMappingStats.LocalChunkMatchingRemoteCount, m_CacheMappingStats.LocalChunkMatchingRemoteByteCount); m_CacheMappingStats.LocalScanElapsedWallTimeUs += LocalTimer.GetElapsedTimeUs(); } if (m_Options.EnableOtherDownloadsScavenging) { ZEN_TRACE_CPU("GetScavengeChunks"); Stopwatch ScavengeTimer; for (uint32_t ScavengedContentIndex = 0; ScavengedContentIndex < ScavengedContents.size() && (RemainingChunkCount > 0); ScavengedContentIndex++) { const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengedContentIndex]; const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengedContentIndex]; ScavengeSourceForChunks(RemainingChunkCount, RemoteChunkIndexNeedsCopyFromLocalFileFlags, RawHashToCopyChunkDataIndex, SequenceIndexChunksLeftToWriteCounters, ScavengedContent, ScavengedLookup, CopyChunkDatas, ScavengedContentIndex, m_CacheMappingStats.ScavengedChunkMatchingRemoteCount, m_CacheMappingStats.ScavengedChunkMatchingRemoteByteCount); } m_CacheMappingStats.ScavengeElapsedWallTimeUs += ScavengeTimer.GetElapsedTimeUs(); } if (!m_Options.IsQuiet) { if (m_CacheMappingStats.CacheSequenceHashesCount > 0 || m_CacheMappingStats.CacheChunkCount > 0 || m_CacheMappingStats.CacheBlockCount > 0) { ZEN_INFO("Download cache: Found {} ({}) chunk sequences, {} ({}) chunks, {} ({}) blocks in {}", m_CacheMappingStats.CacheSequenceHashesCount, NiceBytes(m_CacheMappingStats.CacheSequenceHashesByteCount), m_CacheMappingStats.CacheChunkCount, NiceBytes(m_CacheMappingStats.CacheChunkByteCount), m_CacheMappingStats.CacheBlockCount, NiceBytes(m_CacheMappingStats.CacheBlocksByteCount), NiceTimeSpanMs(m_CacheMappingStats.CacheScanElapsedWallTimeUs / 1000)); } if (m_CacheMappingStats.LocalPathsMatchingSequencesCount > 0 || 
m_CacheMappingStats.LocalChunkMatchingRemoteCount > 0) { ZEN_INFO("Local state : Found {} ({}) chunk sequences, {} ({}) chunks in {}", m_CacheMappingStats.LocalPathsMatchingSequencesCount, NiceBytes(m_CacheMappingStats.LocalPathsMatchingSequencesByteCount), m_CacheMappingStats.LocalChunkMatchingRemoteCount, NiceBytes(m_CacheMappingStats.LocalChunkMatchingRemoteByteCount), NiceTimeSpanMs(m_CacheMappingStats.LocalScanElapsedWallTimeUs / 1000)); } if (m_CacheMappingStats.ScavengedPathsMatchingSequencesCount > 0 || m_CacheMappingStats.ScavengedChunkMatchingRemoteCount > 0) { ZEN_INFO("Scavenge of {} paths, found {} ({}) chunk sequences, {} ({}) chunks in {}", ScavengedPathsCount, m_CacheMappingStats.ScavengedPathsMatchingSequencesCount, NiceBytes(m_CacheMappingStats.ScavengedPathsMatchingSequencesByteCount), m_CacheMappingStats.ScavengedChunkMatchingRemoteCount, NiceBytes(m_CacheMappingStats.ScavengedChunkMatchingRemoteByteCount), NiceTimeSpanMs(m_CacheMappingStats.ScavengeElapsedWallTimeUs / 1000)); } } uint64_t BytesToWrite = CalculateBytesToWriteAndFlagNeededChunks(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndexNeedsCopyFromLocalFileFlags, RemoteChunkIndexNeedsCopyFromSourceFlags); for (const ScavengedSequenceCopyOperation& ScavengeCopyOp : ScavengedSequenceCopyOperations) { BytesToWrite += ScavengeCopyOp.RawSize; } uint64_t BytesToValidate = m_Options.ValidateCompletedSequences ? 
BytesToWrite : 0; uint64_t TotalRequestCount = 0; uint64_t TotalPartWriteCount = 0; std::atomic WritePartsComplete = 0; tsl::robin_map RemotePathToRemoteIndex; RemotePathToRemoteIndex.reserve(m_RemoteContent.Paths.size()); for (uint32_t RemotePathIndex = 0; RemotePathIndex < m_RemoteContent.Paths.size(); RemotePathIndex++) { RemotePathToRemoteIndex.insert({m_RemoteContent.Paths[RemotePathIndex].generic_string(), RemotePathIndex}); } CheckRequiredDiskSpace(RemotePathToRemoteIndex); BlobsExistsResult ExistsResult; { ChunkBlockAnalyser BlockAnalyser( Log(), m_BlockDescriptions, ChunkBlockAnalyser::Options{.IsQuiet = m_Options.IsQuiet, .IsVerbose = m_Options.IsVerbose, .HostLatencySec = m_Storage.BuildStorageHost.LatencySec, .HostHighSpeedLatencySec = m_Storage.CacheHost.LatencySec, .HostMaxRangeCountPerRequest = m_Storage.BuildStorageHost.Caps.MaxRangeCountPerRequest, .HostHighSpeedMaxRangeCountPerRequest = m_Storage.CacheHost.Caps.MaxRangeCountPerRequest}); std::vector NeededBlocks = BlockAnalyser.GetNeeded( m_RemoteLookup.ChunkHashToChunkIndex, [&](uint32_t RemoteChunkIndex) -> bool { return RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex]; }); std::vector FetchBlockIndexes; std::vector CachedChunkBlockIndexes; ClassifyCachedAndFetchBlocks(NeededBlocks, CachedBlocksFound, TotalPartWriteCount, CachedChunkBlockIndexes, FetchBlockIndexes); std::vector NeededLooseChunkIndexes = DetermineNeededLooseChunkIndexes(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndexNeedsCopyFromLocalFileFlags, RemoteChunkIndexNeedsCopyFromSourceFlags); ExistsResult = QueryBlobCacheExists(NeededLooseChunkIndexes, FetchBlockIndexes); std::vector BlockPartialDownloadModes = DeterminePartialDownloadModes(ExistsResult); ZEN_ASSERT(BlockPartialDownloadModes.size() == m_BlockDescriptions.size()); ChunkBlockAnalyser::BlockResult PartialBlocks = BlockAnalyser.CalculatePartialBlockDownloads(NeededBlocks, BlockPartialDownloadModes); TotalRequestCount += NeededLooseChunkIndexes.size(); 
TotalPartWriteCount += NeededLooseChunkIndexes.size(); TotalRequestCount += PartialBlocks.BlockRanges.size(); TotalPartWriteCount += PartialBlocks.BlockRanges.size(); TotalRequestCount += PartialBlocks.FullBlockIndexes.size(); TotalPartWriteCount += PartialBlocks.FullBlockIndexes.size(); std::vector LooseChunkHashWorks = BuildLooseChunkHashWorks(NeededLooseChunkIndexes, SequenceIndexChunksLeftToWriteCounters); ZEN_TRACE_CPU("WriteChunks"); m_Progress.SetLogOperationProgress((uint32_t)TaskSteps::WriteChunks, (uint32_t)TaskSteps::StepCount); Stopwatch WriteTimer; FilteredRate FilteredDownloadedBytesPerSecond; FilteredRate FilteredWrittenBytesPerSecond; std::unique_ptr ProgressBar = m_Progress.CreateProgressBar("Writing"); ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); TotalPartWriteCount += CopyChunkDatas.size(); TotalPartWriteCount += ScavengedSequenceCopyOperations.size(); BufferedWriteFileCache WriteCache; WriteChunksContext Context{.Work = Work, .WriteCache = WriteCache, .SequenceIndexChunksLeftToWriteCounters = SequenceIndexChunksLeftToWriteCounters, .RemoteChunkIndexNeedsCopyFromSourceFlags = RemoteChunkIndexNeedsCopyFromSourceFlags, .WritePartsComplete = WritePartsComplete, .TotalPartWriteCount = TotalPartWriteCount, .TotalRequestCount = TotalRequestCount, .ExistsResult = ExistsResult, .FilteredDownloadedBytesPerSecond = FilteredDownloadedBytesPerSecond, .FilteredWrittenBytesPerSecond = FilteredWrittenBytesPerSecond}; ScheduleScavengedSequenceWrites(Context, ScavengedSequenceCopyOperations, ScavengedContents, ScavengedPaths); ScheduleLooseChunkWrites(Context, LooseChunkHashWorks); std::unique_ptr CloneQuery = m_Options.AllowFileClone ? 
GetCloneQueryInterface(m_CacheFolderPath) : nullptr; ScheduleLocalChunkCopies(Context, CopyChunkDatas, CloneQuery.get(), ScavengedContents, ScavengedLookups, ScavengedPaths); ScheduleCachedBlockWrites(Context, CachedChunkBlockIndexes); SchedulePartialBlockDownloads(Context, PartialBlocks); ScheduleFullBlockDownloads(Context, PartialBlocks.FullBlockIndexes); { ZEN_TRACE_CPU("WriteChunks_Wait"); Work.Wait(m_Progress.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { ZEN_UNUSED(PendingWork); uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() + m_DownloadStats.DownloadedBlockByteCount.load() + +m_DownloadStats.DownloadedPartialBlockByteCount.load(); FilteredWrittenBytesPerSecond.Update(m_DiskStats.WriteByteCount.load()); FilteredDownloadedBytesPerSecond.Update(DownloadedBytes); std::string DownloadRateString = (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) ? "" : fmt::format(" {}bits/s", NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8)); std::string CloneDetails; if (m_DiskStats.CloneCount.load() > 0) { CloneDetails = fmt::format(" ({} cloned)", NiceBytes(m_DiskStats.CloneByteCount.load())); } std::string WriteDetails = fmt::format(" {}/{} ({}B/s) written{}", NiceBytes(m_WrittenChunkByteCount.load()), NiceBytes(BytesToWrite), NiceNum(FilteredWrittenBytesPerSecond.GetCurrent()), CloneDetails); std::string Details = fmt::format("{}/{} ({}{}) downloaded.{}", m_DownloadStats.RequestsCompleteCount.load(), TotalRequestCount, NiceBytes(DownloadedBytes), DownloadRateString, WriteDetails); std::string Task; if ((m_WrittenChunkByteCount < BytesToWrite) || (BytesToValidate == 0)) { Task = "Writing chunks "; } else { Task = "Verifying chunks "; } ProgressBar->UpdateState({.Task = Task, .Details = Details, .TotalCount = (BytesToWrite + BytesToValidate), .RemainingCount = ((BytesToWrite + BytesToValidate) - (m_WrittenChunkByteCount.load() + m_ValidatedChunkByteCount.load())), .Status = 
ProgressBase::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); } CloneQuery.reset(); FilteredWrittenBytesPerSecond.Stop(); FilteredDownloadedBytesPerSecond.Stop(); ProgressBar->Finish(); if (m_AbortFlag) { return; } VerifyWriteChunksComplete(SequenceIndexChunksLeftToWriteCounters, BytesToWrite, BytesToValidate); const uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() + m_DownloadStats.DownloadedBlockByteCount.load() + m_DownloadStats.DownloadedPartialBlockByteCount.load(); if (!m_Options.IsQuiet) { std::string CloneDetails; if (m_DiskStats.CloneCount.load() > 0) { CloneDetails = fmt::format(" ({} cloned)", NiceBytes(m_DiskStats.CloneByteCount.load())); } ZEN_INFO("Downloaded {} ({}bits/s) in {}. Wrote {} ({}B/s){} in {}. Completed in {}", NiceBytes(DownloadedBytes), NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), DownloadedBytes * 8)), NiceTimeSpanMs(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS() / 1000), NiceBytes(m_WrittenChunkByteCount.load()), NiceNum(GetBytesPerSecond(FilteredWrittenBytesPerSecond.GetElapsedTimeUS(), m_DiskStats.WriteByteCount.load())), CloneDetails, NiceTimeSpanMs(FilteredWrittenBytesPerSecond.GetElapsedTimeUS() / 1000), NiceTimeSpanMs(WriteTimer.GetElapsedTimeMs())); } m_WriteChunkStats.WriteChunksElapsedWallTimeUs = WriteTimer.GetElapsedTimeUs(); m_WriteChunkStats.DownloadTimeUs = FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(); m_WriteChunkStats.WriteTimeUs = FilteredWrittenBytesPerSecond.GetElapsedTimeUS(); } m_Progress.SetLogOperationProgress((uint32_t)TaskSteps::PrepareTarget, (uint32_t)TaskSteps::StepCount); if (m_AbortFlag) { return; } LocalPathCategorization Categorization = CategorizeLocalPaths(RemotePathToRemoteIndex); if (m_AbortFlag) { return; } std::atomic CachedCount = 0; std::atomic CachedByteCount = 0; ScheduleLocalFileCaching(Categorization.FilesToCache, CachedCount, CachedByteCount); if (m_AbortFlag) { return; } ZEN_DEBUG( "Local state 
prep: Match: {}, PathMismatch: {}, HashMismatch: {}, Cached: {} ({}), Skipped: {}, " "Delete: {}", Categorization.MatchCount, Categorization.PathMismatchCount, Categorization.HashMismatchCount, CachedCount.load(), NiceBytes(CachedByteCount.load()), Categorization.SkippedCount, Categorization.DeleteCount); m_Progress.SetLogOperationProgress((uint32_t)TaskSteps::FinalizeTarget, (uint32_t)TaskSteps::StepCount); if (m_Options.WipeTargetFolder) { ZEN_TRACE_CPU("WipeTarget"); Stopwatch Timer; // Clean target folder if (!CleanDirectory(Log(), m_Progress, m_IOWorkerPool, m_AbortFlag, m_PauseFlag, m_Options.IsQuiet, m_Path, m_Options.ExcludeFolders)) { ZEN_WARN("Some files in {} could not be removed", m_Path); } m_RebuildFolderStateStats.CleanFolderElapsedWallTimeUs = Timer.GetElapsedTimeUs(); } if (m_AbortFlag) { return; } { ZEN_TRACE_CPU("FinalizeTree"); Stopwatch Timer; std::unique_ptr ProgressBar = m_Progress.CreateProgressBar("Rebuild State"); ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); OutLocalFolderState.Paths.resize(m_RemoteContent.Paths.size()); OutLocalFolderState.RawSizes.resize(m_RemoteContent.Paths.size()); OutLocalFolderState.Attributes.resize(m_RemoteContent.Paths.size()); OutLocalFolderState.ModificationTicks.resize(m_RemoteContent.Paths.size()); std::atomic DeletedCount = 0; std::atomic TargetsComplete = 0; ScheduleLocalFileRemovals(Work, Categorization.RemoveLocalPathIndexes, DeletedCount); std::vector Targets = BuildSortedFinalizeTargets(); ScheduleTargetFinalization(Work, Targets, Categorization.SequenceHashToLocalPathIndex, Categorization.RemotePathIndexToLocalPathIndex, OutLocalFolderState, TargetsComplete); { ZEN_TRACE_CPU("FinalizeTree_Wait"); Work.Wait(m_Progress.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { ZEN_UNUSED(PendingWork); const uint64_t WorkTotal = Targets.size() + Categorization.RemoveLocalPathIndexes.size(); const uint64_t WorkComplete = 
TargetsComplete.load() + DeletedCount.load(); std::string Details = fmt::format("{}/{} files", WorkComplete, WorkTotal); ProgressBar->UpdateState({.Task = "Rebuilding state ", .Details = Details, .TotalCount = gsl::narrow(WorkTotal), .RemainingCount = gsl::narrow(WorkTotal - WorkComplete), .Status = ProgressBase::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); } m_RebuildFolderStateStats.FinalizeTreeElapsedWallTimeUs = Timer.GetElapsedTimeUs(); ProgressBar->Finish(); } m_Progress.SetLogOperationProgress((uint32_t)TaskSteps::Cleanup, (uint32_t)TaskSteps::StepCount); } catch (const std::exception&) { m_AbortFlag = true; throw; } } void BuildsOperationUpdateFolder::ScanCacheFolder(tsl::robin_map& OutCachedChunkHashesFound, tsl::robin_map& OutCachedSequenceHashesFound) { ZEN_TRACE_CPU("ScanCacheFolder"); Stopwatch CacheTimer; DirectoryContent CacheDirContent; GetDirectoryContent(m_CacheFolderPath, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeFileSizes, CacheDirContent); for (size_t Index = 0; Index < CacheDirContent.Files.size(); Index++) { if (m_Options.EnableTargetFolderScavenging) { IoHash FileHash; if (IoHash::TryParse(CacheDirContent.Files[Index].filename().string(), FileHash)) { if (auto ChunkIt = m_RemoteLookup.ChunkHashToChunkIndex.find(FileHash); ChunkIt != m_RemoteLookup.ChunkHashToChunkIndex.end()) { const uint32_t ChunkIndex = ChunkIt->second; const uint64_t ChunkSize = m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; if (ChunkSize == CacheDirContent.FileSizes[Index]) { OutCachedChunkHashesFound.insert({FileHash, ChunkIndex}); m_CacheMappingStats.CacheChunkCount++; m_CacheMappingStats.CacheChunkByteCount += ChunkSize; continue; } } else if (auto SequenceIt = m_RemoteLookup.RawHashToSequenceIndex.find(FileHash); SequenceIt != m_RemoteLookup.RawHashToSequenceIndex.end()) { const uint32_t SequenceIndex = SequenceIt->second; const uint32_t PathIndex = 
m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; const uint64_t SequenceSize = m_RemoteContent.RawSizes[PathIndex]; if (SequenceSize == CacheDirContent.FileSizes[Index]) { OutCachedSequenceHashesFound.insert({FileHash, SequenceIndex}); m_CacheMappingStats.CacheSequenceHashesCount++; m_CacheMappingStats.CacheSequenceHashesByteCount += SequenceSize; const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); ZEN_ASSERT_SLOW(IsFile(CacheFilePath)); continue; } } } } std::error_code Ec = TryRemoveFile(CacheDirContent.Files[Index]); if (Ec) { ZEN_DEBUG("Failed removing file '{}', reason: ({}) {}", CacheDirContent.Files[Index], Ec.value(), Ec.message()); } } m_CacheMappingStats.CacheScanElapsedWallTimeUs += CacheTimer.GetElapsedTimeUs(); } void BuildsOperationUpdateFolder::ScanTempBlocksFolder(tsl::robin_map& OutCachedBlocksFound) { ZEN_TRACE_CPU("ScanTempBlocksFolder"); Stopwatch CacheTimer; tsl::robin_map AllBlockSizes; AllBlockSizes.reserve(m_BlockDescriptions.size()); for (uint32_t BlockIndex = 0; BlockIndex < m_BlockDescriptions.size(); BlockIndex++) { const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; AllBlockSizes.insert({BlockDescription.BlockHash, BlockIndex}); } DirectoryContent BlockDirContent; GetDirectoryContent(m_TempBlockFolderPath, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeFileSizes, BlockDirContent); OutCachedBlocksFound.reserve(BlockDirContent.Files.size()); for (size_t Index = 0; Index < BlockDirContent.Files.size(); Index++) { if (m_Options.EnableTargetFolderScavenging) { IoHash FileHash; if (IoHash::TryParse(BlockDirContent.Files[Index].filename().string(), FileHash)) { if (auto BlockIt = AllBlockSizes.find(FileHash); BlockIt != AllBlockSizes.end()) { const uint32_t BlockIndex = BlockIt->second; const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; uint64_t 
BlockSize = CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize; for (uint64_t ChunkSize : BlockDescription.ChunkCompressedLengths) { BlockSize += ChunkSize; } if (BlockSize == BlockDirContent.FileSizes[Index]) { OutCachedBlocksFound.insert({FileHash, BlockIndex}); m_CacheMappingStats.CacheBlockCount++; m_CacheMappingStats.CacheBlocksByteCount += BlockSize; continue; } } } } std::error_code Ec = TryRemoveFile(BlockDirContent.Files[Index]); if (Ec) { ZEN_DEBUG("Failed removing file '{}', reason: ({}) {}", BlockDirContent.Files[Index], Ec.value(), Ec.message()); } } m_CacheMappingStats.CacheScanElapsedWallTimeUs += CacheTimer.GetElapsedTimeUs(); } void BuildsOperationUpdateFolder::InitializeSequenceCounters(std::vector>& OutSequenceCounters, tsl::robin_map& OutSequencesLeftToFind, const tsl::robin_map& CachedChunkHashesFound, const tsl::robin_map& CachedSequenceHashesFound) { if (m_Options.EnableTargetFolderScavenging) { // Pick up all whole files we can use from current local state ZEN_TRACE_CPU("GetLocalSequences"); std::vector MissingSequenceIndexes = ScanTargetFolder(CachedChunkHashesFound, CachedSequenceHashesFound); for (uint32_t RemoteSequenceIndex : MissingSequenceIndexes) { // We must write the sequence const uint32_t ChunkCount = m_RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]; const IoHash& RemoteSequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; OutSequenceCounters[RemoteSequenceIndex] = ChunkCount; OutSequencesLeftToFind.insert({RemoteSequenceRawHash, RemoteSequenceIndex}); } } else { for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < m_RemoteContent.ChunkedContent.SequenceRawHashes.size(); RemoteSequenceIndex++) { OutSequenceCounters[RemoteSequenceIndex] = m_RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]; } } } void BuildsOperationUpdateFolder::MatchScavengedSequencesToRemote(std::span Contents, std::span Lookups, std::span Paths, tsl::robin_map& 
InOutSequencesLeftToFind, std::vector>& InOutSequenceCounters, std::vector& OutCopyOperations, uint64_t& OutScavengedPathsCount) { for (uint32_t ScavengedContentIndex = 0; ScavengedContentIndex < Contents.size() && !InOutSequencesLeftToFind.empty(); ScavengedContentIndex++) { const std::filesystem::path& ScavengePath = Paths[ScavengedContentIndex]; if (ScavengePath.empty()) { continue; } const ChunkedFolderContent& ScavengedLocalContent = Contents[ScavengedContentIndex]; const ChunkedContentLookup& ScavengedLookup = Lookups[ScavengedContentIndex]; for (uint32_t ScavengedSequenceIndex = 0; ScavengedSequenceIndex < ScavengedLocalContent.ChunkedContent.SequenceRawHashes.size(); ScavengedSequenceIndex++) { const IoHash& SequenceRawHash = ScavengedLocalContent.ChunkedContent.SequenceRawHashes[ScavengedSequenceIndex]; auto It = InOutSequencesLeftToFind.find(SequenceRawHash); if (It == InOutSequencesLeftToFind.end()) { continue; } const uint32_t RemoteSequenceIndex = It->second; const uint64_t RawSize = m_RemoteContent.RawSizes[m_RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]]; ZEN_ASSERT(RawSize > 0); const uint32_t ScavengedPathIndex = ScavengedLookup.SequenceIndexFirstPathIndex[ScavengedSequenceIndex]; ZEN_ASSERT_SLOW(IsFile((ScavengePath / ScavengedLocalContent.Paths[ScavengedPathIndex]).make_preferred())); OutCopyOperations.push_back({.ScavengedContentIndex = ScavengedContentIndex, .ScavengedPathIndex = ScavengedPathIndex, .RemoteSequenceIndex = RemoteSequenceIndex, .RawSize = RawSize}); InOutSequencesLeftToFind.erase(SequenceRawHash); InOutSequenceCounters[RemoteSequenceIndex] = 0; m_CacheMappingStats.ScavengedPathsMatchingSequencesCount++; m_CacheMappingStats.ScavengedPathsMatchingSequencesByteCount += RawSize; } OutScavengedPathsCount++; } } uint64_t BuildsOperationUpdateFolder::CalculateBytesToWriteAndFlagNeededChunks(std::span> SequenceCounters, const std::vector& NeedsCopyFromLocalFileFlags, std::span> OutNeedsCopyFromSourceFlags) { uint64_t 
BytesToWrite = 0; for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < m_RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++) { const uint64_t ChunkWriteCount = GetChunkWriteCount(SequenceCounters, RemoteChunkIndex); if (ChunkWriteCount > 0) { BytesToWrite += m_RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] * ChunkWriteCount; if (!NeedsCopyFromLocalFileFlags[RemoteChunkIndex]) { OutNeedsCopyFromSourceFlags[RemoteChunkIndex] = true; } } } return BytesToWrite; } void BuildsOperationUpdateFolder::ClassifyCachedAndFetchBlocks(std::span NeededBlocks, const tsl::robin_map& CachedBlocksFound, uint64_t& TotalPartWriteCount, std::vector& OutCachedChunkBlockIndexes, std::vector& OutFetchBlockIndexes) { ZEN_TRACE_CPU("BlockCacheFileExists"); for (const ChunkBlockAnalyser::NeededBlock& NeededBlock : NeededBlocks) { const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[NeededBlock.BlockIndex]; bool UsingCachedBlock = false; if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end()) { TotalPartWriteCount++; std::filesystem::path BlockPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); if (IsFile(BlockPath)) { OutCachedChunkBlockIndexes.push_back(NeededBlock.BlockIndex); UsingCachedBlock = true; } } if (!UsingCachedBlock) { OutFetchBlockIndexes.push_back(NeededBlock.BlockIndex); } } } std::vector BuildsOperationUpdateFolder::DetermineNeededLooseChunkIndexes(std::span> SequenceCounters, const std::vector& NeedsCopyFromLocalFileFlags, std::span> NeedsCopyFromSourceFlags) { std::vector NeededLooseChunkIndexes; NeededLooseChunkIndexes.reserve(m_LooseChunkHashes.size()); for (uint32_t LooseChunkIndex = 0; LooseChunkIndex < m_LooseChunkHashes.size(); LooseChunkIndex++) { const IoHash& ChunkHash = m_LooseChunkHashes[LooseChunkIndex]; auto RemoteChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); ZEN_ASSERT(RemoteChunkIndexIt != 
m_RemoteLookup.ChunkHashToChunkIndex.end()); const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second; if (NeedsCopyFromLocalFileFlags[RemoteChunkIndex]) { if (m_Options.IsVerbose) { ZEN_INFO("Skipping chunk {} due to cache reuse", m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]); } continue; } bool NeedsCopy = true; if (NeedsCopyFromSourceFlags[RemoteChunkIndex].compare_exchange_strong(NeedsCopy, false)) { const uint64_t WriteCount = GetChunkWriteCount(SequenceCounters, RemoteChunkIndex); if (WriteCount == 0) { if (m_Options.IsVerbose) { ZEN_INFO("Skipping chunk {} due to cache reuse", m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]); } } else { NeededLooseChunkIndexes.push_back(LooseChunkIndex); } } } return NeededLooseChunkIndexes; } BuildsOperationUpdateFolder::BlobsExistsResult BuildsOperationUpdateFolder::QueryBlobCacheExists(std::span NeededLooseChunkIndexes, std::span FetchBlockIndexes) { BlobsExistsResult Result; if (!m_Storage.CacheStorage) { return Result; } ZEN_TRACE_CPU("BlobCacheExistCheck"); Stopwatch Timer; std::vector BlobHashes; BlobHashes.reserve(NeededLooseChunkIndexes.size() + FetchBlockIndexes.size()); for (const uint32_t LooseChunkIndex : NeededLooseChunkIndexes) { BlobHashes.push_back(m_LooseChunkHashes[LooseChunkIndex]); } for (uint32_t BlockIndex : FetchBlockIndexes) { BlobHashes.push_back(m_BlockDescriptions[BlockIndex].BlockHash); } const std::vector CacheExistsResult = m_Storage.CacheStorage->BlobsExists(m_BuildId, BlobHashes); if (CacheExistsResult.size() == BlobHashes.size()) { Result.ExistingBlobs.reserve(CacheExistsResult.size()); for (size_t BlobIndex = 0; BlobIndex < BlobHashes.size(); BlobIndex++) { if (CacheExistsResult[BlobIndex].HasBody) { Result.ExistingBlobs.insert(BlobHashes[BlobIndex]); } } } Result.ElapsedTimeMs = Timer.GetElapsedTimeMs(); if (!Result.ExistingBlobs.empty() && !m_Options.IsQuiet) { ZEN_INFO("Remote cache : Found {} out of {} needed blobs in {}", Result.ExistingBlobs.size(), 
BlobHashes.size(), NiceTimeSpanMs(Result.ElapsedTimeMs)); } return Result; } std::vector BuildsOperationUpdateFolder::DeterminePartialDownloadModes(const BlobsExistsResult& ExistsResult) { std::vector Modes; if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Off) { Modes.resize(m_BlockDescriptions.size(), ChunkBlockAnalyser::EPartialBlockDownloadMode::Off); return Modes; } const bool MultiRangeCache = m_Storage.CacheHost.Caps.MaxRangeCountPerRequest > 1; const bool MultiRangeBuild = m_Storage.BuildStorageHost.Caps.MaxRangeCountPerRequest > 1; ChunkBlockAnalyser::EPartialBlockDownloadMode CachePartialDownloadMode = MultiRangeCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange; ChunkBlockAnalyser::EPartialBlockDownloadMode CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::Off; switch (m_Options.PartialBlockRequestMode) { case EPartialBlockRequestMode::Off: break; case EPartialBlockRequestMode::ZenCacheOnly: CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::Off; break; case EPartialBlockRequestMode::Mixed: CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange; break; case EPartialBlockRequestMode::All: CloudPartialDownloadMode = MultiRangeBuild ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange : ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange; break; default: ZEN_ASSERT(false); break; } Modes.reserve(m_BlockDescriptions.size()); for (uint32_t BlockIndex = 0; BlockIndex < m_BlockDescriptions.size(); BlockIndex++) { const bool BlockExistInCache = ExistsResult.ExistingBlobs.contains(m_BlockDescriptions[BlockIndex].BlockHash); Modes.push_back(BlockExistInCache ? 
CachePartialDownloadMode : CloudPartialDownloadMode); } return Modes; } std::vector BuildsOperationUpdateFolder::BuildLooseChunkHashWorks(std::span NeededLooseChunkIndexes, std::span> SequenceCounters) { std::vector LooseChunkHashWorks; LooseChunkHashWorks.reserve(NeededLooseChunkIndexes.size()); for (uint32_t LooseChunkIndex : NeededLooseChunkIndexes) { const IoHash& ChunkHash = m_LooseChunkHashes[LooseChunkIndex]; auto RemoteChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); ZEN_ASSERT(RemoteChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end()); const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second; std::vector ChunkTargetPtrs = GetRemainingChunkTargets(SequenceCounters, RemoteChunkIndex); ZEN_ASSERT(!ChunkTargetPtrs.empty()); LooseChunkHashWorks.push_back(LooseChunkHashWorkData{.ChunkTargetPtrs = ChunkTargetPtrs, .RemoteChunkIndex = RemoteChunkIndex}); } return LooseChunkHashWorks; } void BuildsOperationUpdateFolder::VerifyWriteChunksComplete(std::span> SequenceCounters, uint64_t BytesToWrite, uint64_t BytesToValidate) { uint32_t RawSequencesMissingWriteCount = 0; for (uint32_t SequenceIndex = 0; SequenceIndex < SequenceCounters.size(); SequenceIndex++) { const auto& Counter = SequenceCounters[SequenceIndex]; if (Counter.load() != 0) { RawSequencesMissingWriteCount++; const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; const std::filesystem::path& IncompletePath = m_RemoteContent.Paths[PathIndex]; ZEN_ASSERT(!IncompletePath.empty()); const uint32_t ExpectedSequenceCount = m_RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]; if (!m_Options.IsQuiet) { ZEN_INFO("{}: Max count {}, Current count {}", IncompletePath, ExpectedSequenceCount, Counter.load()); } ZEN_ASSERT(Counter.load() <= ExpectedSequenceCount); } } ZEN_ASSERT(RawSequencesMissingWriteCount == 0); ZEN_ASSERT(m_WrittenChunkByteCount == BytesToWrite); ZEN_ASSERT(m_ValidatedChunkByteCount == BytesToValidate); } std::vector 
BuildsOperationUpdateFolder::BuildSortedFinalizeTargets()
{
    std::vector Targets;
    Targets.reserve(m_RemoteContent.Paths.size());
    for (uint32_t RemotePathIndex = 0; RemotePathIndex < m_RemoteContent.Paths.size(); RemotePathIndex++)
    {
        Targets.push_back(FinalizeTarget{.RawHash = m_RemoteContent.RawHashes[RemotePathIndex], .RemotePathIndex = RemotePathIndex});
    }
    // Sort by content hash first so every path with the same content ends up adjacent; the path index breaks ties
    // so the order is deterministic.
    std::sort(Targets.begin(), Targets.end(), [](const FinalizeTarget& Lhs, const FinalizeTarget& Rhs) {
        return std::tie(Lhs.RawHash, Lhs.RemotePathIndex) < std::tie(Rhs.RawHash, Rhs.RemotePathIndex);
    });
    return Targets;
}

// Scans every scavenge source in parallel on the IO worker pool using FindScavengeContent().
// For source i:
// - OutContents[i] and OutLookups[i] receive the scanned content;
// - OutPaths[i] is set to the source path when FindScavengeContent() succeeds and cleared otherwise, so an empty
//   path marks a source with nothing usable.
// A progress bar reports scanned/found counts until all scheduled work has finished.
void BuildsOperationUpdateFolder::ScanScavengeSources(std::span Sources,
                                                      std::vector& OutContents,
                                                      std::vector& OutLookups,
                                                      std::vector& OutPaths)
{
    ZEN_TRACE_CPU("ScanScavengeSources");

    const size_t ScavengePathCount = Sources.size();
    // Pre-size the outputs so each worker writes only its own slot, indexed by ScavengeIndex.
    OutContents.resize(ScavengePathCount);
    OutLookups.resize(ScavengePathCount);
    OutPaths.resize(ScavengePathCount);

    std::unique_ptr ProgressBar = m_Progress.CreateProgressBar("Scavenging");

    ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);

    // Progress counters shared with the workers.
    std::atomic PathsFound(0);
    std::atomic ChunksFound(0);
    std::atomic PathsScavenged(0);

    for (size_t ScavengeIndex = 0; ScavengeIndex < ScavengePathCount; ScavengeIndex++)
    {
        Work.ScheduleWork(m_IOWorkerPool,
                          [this, &Sources, &OutContents, &OutPaths, &OutLookups, &PathsFound, &ChunksFound, &PathsScavenged, ScavengeIndex](
                              std::atomic&) {
                              if (!m_AbortFlag)
                              {
                                  ZEN_TRACE_CPU("Async_FindScavengeContent");

                                  const ScavengeSource& Source = Sources[ScavengeIndex];
                                  ChunkedFolderContent& ScavengedLocalContent = OutContents[ScavengeIndex];
                                  ChunkedContentLookup& ScavengedLookup = OutLookups[ScavengeIndex];

                                  if (FindScavengeContent(Source, ScavengedLocalContent, ScavengedLookup))
                                  {
                                      OutPaths[ScavengeIndex] = Source.Path;
                                      PathsFound += ScavengedLocalContent.Paths.size();
                                      ChunksFound += ScavengedLocalContent.ChunkedContent.ChunkHashes.size();
                                  }
                                  else
                                  {
                                      OutPaths[ScavengeIndex].clear();
                                  }
                                  PathsScavenged++;
                              }
                          });
    }
    {
        ZEN_TRACE_CPU("ScavengeScan_Wait");

        // Blocks until all scheduled scans are done, refreshing the progress bar periodically.
        Work.Wait(m_Progress.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
            ZEN_UNUSED(PendingWork);
            std::string Details = fmt::format("{}/{} scanned. {} paths and {} chunks found for scavenging",
                                              PathsScavenged.load(),
                                              ScavengePathCount,
                                              PathsFound.load(),
                                              ChunksFound.load());
            ProgressBar->UpdateState({.Task = "Scavenging ",
                                      .Details = Details,
                                      .TotalCount = ScavengePathCount,
                                      .RemainingCount = ScavengePathCount - PathsScavenged.load(),
                                      .Status = ProgressBase::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
                                     false);
        });
    }
    ProgressBar->Finish();
}

// Sorts every local path into one of three outcomes:
// - already in its correct remote location with matching content (kept in place; only when scavenging is enabled
//   and the target folder is not being wiped);
// - needed by the remote content elsewhere (queued in FilesToCache, at most once per content hash);
// - not needed (queued in RemoveLocalPathIndexes unless the target folder is being wiped).
// Match, mismatch, skipped and delete counters are accumulated along the way. Stops early if m_AbortFlag is set.
BuildsOperationUpdateFolder::LocalPathCategorization BuildsOperationUpdateFolder::CategorizeLocalPaths(
    const tsl::robin_map& RemotePathToRemoteIndex)
{
    ZEN_TRACE_CPU("PrepareTarget");

    LocalPathCategorization Result;
    // Content hashes already queued for caching, so duplicate local copies are not moved twice.
    tsl::robin_set CachedRemoteSequences;
    Result.RemotePathIndexToLocalPathIndex.reserve(m_RemoteContent.Paths.size());

    for (uint32_t LocalPathIndex = 0; LocalPathIndex < m_LocalContent.Paths.size(); LocalPathIndex++)
    {
        if (m_AbortFlag)
        {
            break;
        }
        const IoHash& RawHash = m_LocalContent.RawHashes[LocalPathIndex];
        const std::filesystem::path& LocalPath = m_LocalContent.Paths[LocalPathIndex];

        ZEN_ASSERT_SLOW(IsFile((m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred()));

        if (m_Options.EnableTargetFolderScavenging)
        {
            if (!m_Options.WipeTargetFolder)
            {
                // Check if it is already in the correct place
                if (auto RemotePathIt = RemotePathToRemoteIndex.find(LocalPath.generic_string());
                    RemotePathIt != RemotePathToRemoteIndex.end())
                {
                    const uint32_t RemotePathIndex = RemotePathIt->second;
                    if (m_RemoteContent.RawHashes[RemotePathIndex] == RawHash)
                    {
                        // It is already in its correct place
                        Result.RemotePathIndexToLocalPathIndex[RemotePathIndex] = LocalPathIndex;
                        Result.SequenceHashToLocalPathIndex.insert({RawHash, LocalPathIndex});
                        Result.MatchCount++;
                        continue;
                    }
                    else
                    {
                        Result.HashMismatchCount++;
                    }
                }
                else
                {
                    Result.PathMismatchCount++;
} } // Do we need it? if (m_RemoteLookup.RawHashToSequenceIndex.contains(RawHash)) { if (!CachedRemoteSequences.contains(RawHash)) { // We need it, make sure we move it to the cache Result.FilesToCache.push_back(LocalPathIndex); CachedRemoteSequences.insert(RawHash); continue; } else { Result.SkippedCount++; } } } if (!m_Options.WipeTargetFolder) { // Explicitly delete the unneeded local file Result.RemoveLocalPathIndexes.push_back(LocalPathIndex); Result.DeleteCount++; } } return Result; } void BuildsOperationUpdateFolder::ScheduleLocalFileCaching(std::span FilesToCache, std::atomic& OutCachedCount, std::atomic& OutCachedByteCount) { ZEN_TRACE_CPU("CopyToCache"); std::unique_ptr ProgressBar = m_Progress.CreateProgressBar("Cache Local Data"); ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (uint32_t LocalPathIndex : FilesToCache) { if (m_AbortFlag) { break; } Work.ScheduleWork(m_IOWorkerPool, [this, &OutCachedCount, &OutCachedByteCount, LocalPathIndex](std::atomic&) { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_CopyToCache"); const IoHash& RawHash = m_LocalContent.RawHashes[LocalPathIndex]; const std::filesystem::path& LocalPath = m_LocalContent.Paths[LocalPathIndex]; const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RawHash); ZEN_ASSERT_SLOW(!IsFileWithRetry(CacheFilePath)); const std::filesystem::path LocalFilePath = (m_Path / LocalPath).make_preferred(); std::error_code Ec = RenameFileWithRetry(LocalFilePath, CacheFilePath); if (Ec) { ZEN_WARN("Failed to move file from '{}' to '{}', reason: ({}) {}, retrying...", LocalFilePath, CacheFilePath, Ec.value(), Ec.message()); Ec = RenameFileWithRetry(LocalFilePath, CacheFilePath); if (Ec) { throw std::system_error(std::error_code(Ec.value(), std::system_category()), fmt::format("Failed to file from '{}' to '{}', reason: ({}) {}", LocalFilePath, CacheFilePath, Ec.value(), Ec.message())); } } OutCachedCount++; OutCachedByteCount += 
m_LocalContent.RawSizes[LocalPathIndex]; } }); } { ZEN_TRACE_CPU("CopyToCache_Wait"); Work.Wait(m_Progress.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { ZEN_UNUSED(PendingWork); const uint64_t WorkTotal = FilesToCache.size(); const uint64_t WorkComplete = OutCachedCount.load(); std::string Details = fmt::format("{}/{} ({}) files", WorkComplete, WorkTotal, NiceBytes(OutCachedByteCount)); ProgressBar->UpdateState({.Task = "Caching local ", .Details = Details, .TotalCount = gsl::narrow(WorkTotal), .RemainingCount = gsl::narrow(WorkTotal - WorkComplete), .Status = ProgressBase::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); } ProgressBar->Finish(); } void BuildsOperationUpdateFolder::ScheduleScavengedSequenceWrites(WriteChunksContext& Context, std::span CopyOperations, const std::vector& ScavengedContents, const std::vector& ScavengedPaths) { for (uint32_t ScavengeOpIndex = 0; ScavengeOpIndex < CopyOperations.size(); ScavengeOpIndex++) { if (m_AbortFlag) { break; } Context.Work.ScheduleWork( m_IOWorkerPool, [this, &Context, CopyOperations, &ScavengedContents, &ScavengedPaths, ScavengeOpIndex](std::atomic&) { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_WriteScavenged"); Context.FilteredWrittenBytesPerSecond.Start(); const ScavengedSequenceCopyOperation& ScavengeOp = CopyOperations[ScavengeOpIndex]; const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengeOp.ScavengedContentIndex]; const std::filesystem::path& ScavengeRootPath = ScavengedPaths[ScavengeOp.ScavengedContentIndex]; WriteScavengedSequenceToCache(ScavengeRootPath, ScavengedContent, ScavengeOp); if (Context.WritePartsComplete.fetch_add(1) + 1 == Context.TotalPartWriteCount) { Context.FilteredWrittenBytesPerSecond.Stop(); } } }); } } void BuildsOperationUpdateFolder::ScheduleLooseChunkWrites(WriteChunksContext& Context, std::vector& LooseChunkHashWorks) { for (uint32_t LooseChunkHashWorkIndex = 0; LooseChunkHashWorkIndex < 
LooseChunkHashWorks.size(); LooseChunkHashWorkIndex++)
    {
        if (m_AbortFlag)
        {
            break;
        }

        // LooseChunkHashWorks is captured by reference, so it must stay alive until Context.Work has completed.
        Context.Work.ScheduleWork(
            m_IOWorkerPool,
            [this, &Context, &LooseChunkHashWorks, LooseChunkHashWorkIndex](std::atomic&) {
                ZEN_TRACE_CPU("Async_ReadPreDownloadedChunk");
                if (!m_AbortFlag)
                {
                    LooseChunkHashWorkData& LooseChunkHashWork = LooseChunkHashWorks[LooseChunkHashWorkIndex];
                    const uint32_t RemoteChunkIndex = LooseChunkHashWork.RemoteChunkIndex;
                    // The target list is moved out of the work item; each item is scheduled exactly once.
                    WriteLooseChunk(RemoteChunkIndex,
                                    Context.ExistsResult,
                                    Context.SequenceIndexChunksLeftToWriteCounters,
                                    Context.WritePartsComplete,
                                    std::move(LooseChunkHashWork.ChunkTargetPtrs),
                                    Context.WriteCache,
                                    Context.Work,
                                    Context.TotalRequestCount,
                                    Context.TotalPartWriteCount,
                                    Context.FilteredDownloadedBytesPerSecond,
                                    Context.FilteredWrittenBytesPerSecond);
                }
            },
            WorkerThreadPool::EMode::EnableBacklog);
    }
}

// Schedules one IO task per CopyChunkData. Each task copies chunk data from local or scavenged files into the
// write cache via WriteLocalChunkToCache(). The remote sequences it wrote to are then passed through
// CompleteSequenceChunk(). The sequences reported complete are closed in the write cache and handed to
// VerifyAndCompleteChunkSequencesAsync().
void BuildsOperationUpdateFolder::ScheduleLocalChunkCopies(WriteChunksContext& Context,
                                                           std::span CopyChunkDatas,
                                                           CloneQueryInterface* CloneQuery,
                                                           const std::vector& ScavengedContents,
                                                           const std::vector& ScavengedLookups,
                                                           const std::vector& ScavengedPaths)
{
    for (size_t CopyDataIndex = 0; CopyDataIndex < CopyChunkDatas.size(); CopyDataIndex++)
    {
        if (m_AbortFlag)
        {
            break;
        }

        Context.Work.ScheduleWork(
            m_IOWorkerPool,
            [this, &Context, CloneQuery, CopyChunkDatas, &ScavengedContents, &ScavengedLookups, &ScavengedPaths, CopyDataIndex](
                std::atomic&) {
                if (!m_AbortFlag)
                {
                    ZEN_TRACE_CPU("Async_CopyLocal");

                    Context.FilteredWrittenBytesPerSecond.Start();
                    const CopyChunkData& CopyData = CopyChunkDatas[CopyDataIndex];

                    std::vector WrittenSequenceIndexes = WriteLocalChunkToCache(CloneQuery,
                                                                                CopyData,
                                                                                ScavengedContents,
                                                                                ScavengedLookups,
                                                                                ScavengedPaths,
                                                                                Context.WriteCache);
                    // The part is counted even if an abort is raised afterwards; only the follow-up work is skipped.
                    bool WritePartsDone = Context.WritePartsComplete.fetch_add(1) + 1 == Context.TotalPartWriteCount;
                    if (!m_AbortFlag)
                    {
                        if (WritePartsDone)
                        {
                            Context.FilteredWrittenBytesPerSecond.Stop();
                        }

                        // Write tracking, updating this must be done without any files open
                        std::vector CompletedChunkSequences;
                        for (uint32_t RemoteSequenceIndex : WrittenSequenceIndexes)
                        {
                            if (CompleteSequenceChunk(RemoteSequenceIndex, Context.SequenceIndexChunksLeftToWriteCounters))
                            {
                                CompletedChunkSequences.push_back(RemoteSequenceIndex);
                            }
                        }
                        Context.WriteCache.Close(CompletedChunkSequences);
                        VerifyAndCompleteChunkSequencesAsync(CompletedChunkSequences, Context.Work);
                    }
                }
            });
    }
}

// Schedules one IO task per cached block. Each task:
// 1. reads the block file from the temp block folder;
// 2. writes its chunks through WriteChunksBlockToCache();
// 3. deletes the block file.
// A block file that cannot be read raises std::runtime_error. A block that WriteChunksBlockToCache() rejects is
// deleted and then raises std::runtime_error.
void BuildsOperationUpdateFolder::ScheduleCachedBlockWrites(WriteChunksContext& Context, std::span CachedBlockIndexes)
{
    for (uint32_t BlockIndex : CachedBlockIndexes)
    {
        if (m_AbortFlag)
        {
            break;
        }

        Context.Work.ScheduleWork(m_IOWorkerPool, [this, &Context, BlockIndex](std::atomic&) {
            if (!m_AbortFlag)
            {
                ZEN_TRACE_CPU("Async_WriteCachedBlock");

                const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
                Context.FilteredWrittenBytesPerSecond.Start();

                std::filesystem::path BlockChunkPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString();
                IoBuffer BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
                if (!BlockBuffer)
                {
                    throw std::runtime_error(fmt::format("Can not read block {} at {}", BlockDescription.BlockHash, BlockChunkPath));
                }

                if (!m_AbortFlag)
                {
                    if (!WriteChunksBlockToCache(BlockDescription,
                                                 Context.SequenceIndexChunksLeftToWriteCounters,
                                                 Context.Work,
                                                 CompositeBuffer(std::move(BlockBuffer)),
                                                 Context.RemoteChunkIndexNeedsCopyFromSourceFlags,
                                                 Context.WriteCache))
                    {
                        // Drop the bad block file so it is not picked up from the temp folder again.
                        std::error_code DummyEc;
                        RemoveFile(BlockChunkPath, DummyEc);
                        throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash));
                    }

                    std::error_code Ec = TryRemoveFile(BlockChunkPath);
                    if (Ec)
                    {
                        ZEN_DEBUG("Failed removing file '{}', reason: ({}) {}", BlockChunkPath, Ec.value(), Ec.message());
                    }

                    if (Context.WritePartsComplete.fetch_add(1) + 1 == Context.TotalPartWriteCount)
                    {
                        Context.FilteredWrittenBytesPerSecond.Stop();
                    }
                }
            }
        });
    }
}

// Groups consecutive entries of PartialBlocks.BlockRanges that belong to the same block. For each group it
// schedules one network task that downloads the ranges via DownloadPartialBlock(); the downloaded ranges are then
// written to the cache by WritePartialBlockToCache() on the IO pool.
void BuildsOperationUpdateFolder::SchedulePartialBlockDownloads(WriteChunksContext& Context, const
ChunkBlockAnalyser::BlockResult& PartialBlocks) { for (size_t BlockRangeIndex = 0; BlockRangeIndex < PartialBlocks.BlockRanges.size();) { if (m_AbortFlag) { break; } size_t RangeCount = 1; size_t RangesLeft = PartialBlocks.BlockRanges.size() - BlockRangeIndex; const ChunkBlockAnalyser::BlockRangeDescriptor& CurrentBlockRange = PartialBlocks.BlockRanges[BlockRangeIndex]; while (RangeCount < RangesLeft && CurrentBlockRange.BlockIndex == PartialBlocks.BlockRanges[BlockRangeIndex + RangeCount].BlockIndex) { RangeCount++; } Context.Work.ScheduleWork( m_NetworkPool, [this, &Context, &PartialBlocks, BlockRangeStartIndex = BlockRangeIndex, RangeCount = RangeCount](std::atomic&) { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_GetPartialBlockRanges"); Context.FilteredDownloadedBytesPerSecond.Start(); DownloadPartialBlock( PartialBlocks.BlockRanges, BlockRangeStartIndex, RangeCount, Context.ExistsResult, Context.TotalRequestCount, Context.FilteredDownloadedBytesPerSecond, [this, &Context, &PartialBlocks](IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath, size_t BlockRangeStartIndex, std::span> OffsetAndLengths) { if (!m_AbortFlag) { Context.Work.ScheduleWork( m_IOWorkerPool, [this, &Context, &PartialBlocks, BlockRangeStartIndex, BlockChunkPath = std::filesystem::path(OnDiskPath), BlockPartialBuffer = std::move(InMemoryBuffer), OffsetAndLengths = std::vector>(OffsetAndLengths.begin(), OffsetAndLengths.end())]( std::atomic&) mutable { if (!m_AbortFlag) { WritePartialBlockToCache(Context, BlockRangeStartIndex, std::move(BlockPartialBuffer), BlockChunkPath, OffsetAndLengths, PartialBlocks); } }, OnDiskPath.empty() ? 
WorkerThreadPool::EMode::DisableBacklog : WorkerThreadPool::EMode::EnableBacklog); } }); } }); BlockRangeIndex += RangeCount; } } void BuildsOperationUpdateFolder::WritePartialBlockToCache(WriteChunksContext& Context, size_t BlockRangeStartIndex, IoBuffer BlockPartialBuffer, const std::filesystem::path& BlockChunkPath, std::span> OffsetAndLengths, const ChunkBlockAnalyser::BlockResult& PartialBlocks) { ZEN_TRACE_CPU("Async_WritePartialBlock"); const uint32_t BlockIndex = PartialBlocks.BlockRanges[BlockRangeStartIndex].BlockIndex; const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; if (BlockChunkPath.empty()) { ZEN_ASSERT(BlockPartialBuffer); } else { ZEN_ASSERT(!BlockPartialBuffer); BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); if (!BlockPartialBuffer) { throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}", BlockDescription.BlockHash, BlockChunkPath)); } } Context.FilteredWrittenBytesPerSecond.Start(); const size_t RangeCount = OffsetAndLengths.size(); for (size_t PartialRangeIndex = 0; PartialRangeIndex < RangeCount; PartialRangeIndex++) { const std::pair& OffsetAndLength = OffsetAndLengths[PartialRangeIndex]; IoBuffer BlockRangeBuffer(BlockPartialBuffer, OffsetAndLength.first, OffsetAndLength.second); const ChunkBlockAnalyser::BlockRangeDescriptor& RangeDescriptor = PartialBlocks.BlockRanges[BlockRangeStartIndex + PartialRangeIndex]; if (!WritePartialBlockChunksToCache(BlockDescription, Context.SequenceIndexChunksLeftToWriteCounters, Context.Work, CompositeBuffer(std::move(BlockRangeBuffer)), RangeDescriptor.ChunkBlockIndexStart, RangeDescriptor.ChunkBlockIndexStart + RangeDescriptor.ChunkBlockIndexCount - 1, Context.RemoteChunkIndexNeedsCopyFromSourceFlags, Context.WriteCache)) { std::error_code DummyEc; RemoveFile(BlockChunkPath, DummyEc); throw std::runtime_error(fmt::format("Partial block {} is malformed", BlockDescription.BlockHash)); } if (Context.WritePartsComplete.fetch_add(1) 
+ 1 == Context.TotalPartWriteCount) { Context.FilteredWrittenBytesPerSecond.Stop(); } } std::error_code Ec = TryRemoveFile(BlockChunkPath); if (Ec) { ZEN_DEBUG("Failed removing file '{}', reason: ({}) {}", BlockChunkPath, Ec.value(), Ec.message()); } } void BuildsOperationUpdateFolder::ScheduleFullBlockDownloads(WriteChunksContext& Context, std::span FullBlockIndexes) { for (uint32_t BlockIndex : FullBlockIndexes) { if (m_AbortFlag) { break; } Context.Work.ScheduleWork(m_NetworkPool, [this, &Context, BlockIndex](std::atomic&) { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_GetFullBlock"); const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; Context.FilteredDownloadedBytesPerSecond.Start(); IoBuffer BlockBuffer; const bool ExistsInCache = m_Storage.CacheStorage && Context.ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash); if (ExistsInCache) { BlockBuffer = m_Storage.CacheStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash); } if (!BlockBuffer) { try { BlockBuffer = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash); } catch (const std::exception&) { // Silence http errors due to abort if (!m_AbortFlag) { throw; } } } if (!m_AbortFlag) { if (!BlockBuffer) { throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); } uint64_t BlockSize = BlockBuffer.GetSize(); m_DownloadStats.DownloadedBlockCount++; m_DownloadStats.DownloadedBlockByteCount += BlockSize; if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == Context.TotalRequestCount) { Context.FilteredDownloadedBytesPerSecond.Stop(); } const bool PutInCache = !ExistsInCache && m_Storage.CacheStorage && m_Options.PopulateCache; std::filesystem::path BlockChunkPath = TryMoveDownloadedChunk(BlockBuffer, m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(), /* ForceDiskBased */ PutInCache || (BlockSize > m_Options.MaximumInMemoryPayloadSize)); if (PutInCache) { ZEN_ASSERT(!BlockChunkPath.empty()); 
IoBuffer CacheBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); if (CacheBuffer) { m_Storage.CacheStorage->PutBuildBlob(m_BuildId, BlockDescription.BlockHash, ZenContentType::kCompressedBinary, CompositeBuffer(SharedBuffer(CacheBuffer))); } } if (!m_AbortFlag) { Context.Work.ScheduleWork( m_IOWorkerPool, [this, &Context, BlockIndex, BlockChunkPath, BlockBuffer = std::move(BlockBuffer)](std::atomic&) mutable { if (!m_AbortFlag) { WriteFullBlockToCache(Context, BlockIndex, std::move(BlockBuffer), BlockChunkPath); } }, BlockChunkPath.empty() ? WorkerThreadPool::EMode::DisableBacklog : WorkerThreadPool::EMode::EnableBacklog); } } } }); } } void BuildsOperationUpdateFolder::WriteFullBlockToCache(WriteChunksContext& Context, uint32_t BlockIndex, IoBuffer BlockBuffer, const std::filesystem::path& BlockChunkPath) { ZEN_TRACE_CPU("Async_WriteFullBlock"); const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; if (BlockChunkPath.empty()) { ZEN_ASSERT(BlockBuffer); } else { ZEN_ASSERT(!BlockBuffer); BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); if (!BlockBuffer) { throw std::runtime_error(fmt::format("Could not open dowloaded block {} from {}", BlockDescription.BlockHash, BlockChunkPath)); } } Context.FilteredWrittenBytesPerSecond.Start(); if (!WriteChunksBlockToCache(BlockDescription, Context.SequenceIndexChunksLeftToWriteCounters, Context.Work, CompositeBuffer(std::move(BlockBuffer)), Context.RemoteChunkIndexNeedsCopyFromSourceFlags, Context.WriteCache)) { std::error_code DummyEc; RemoveFile(BlockChunkPath, DummyEc); throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash)); } if (!BlockChunkPath.empty()) { std::error_code Ec = TryRemoveFile(BlockChunkPath); if (Ec) { ZEN_DEBUG("Failed removing file '{}', reason: ({}) {}", BlockChunkPath, Ec.value(), Ec.message()); } } if (Context.WritePartsComplete.fetch_add(1) + 1 == Context.TotalPartWriteCount) { Context.FilteredWrittenBytesPerSecond.Stop(); } 
}

// Schedules one IO task per listed local path that deletes the file: the read-only attribute is cleared first,
// then the file is removed and DeletedCount is incremented. Scheduling stops once m_AbortFlag is set, and tasks
// that start after an abort do nothing.
void BuildsOperationUpdateFolder::ScheduleLocalFileRemovals(ParallelWork& Work, std::span RemoveLocalPathIndexes, std::atomic& DeletedCount)
{
    for (uint32_t LocalPathIndex : RemoveLocalPathIndexes)
    {
        if (m_AbortFlag)
        {
            break;
        }
        Work.ScheduleWork(m_IOWorkerPool, [this, &DeletedCount, LocalPathIndex](std::atomic&) {
            if (!m_AbortFlag)
            {
                ZEN_TRACE_CPU("Async_RemoveFile");

                const std::filesystem::path LocalFilePath = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred();
                SetFileReadOnlyWithRetry(LocalFilePath, false);
                RemoveFileWithRetry(LocalFilePath);
                DeletedCount++;
            }
        });
    }
}

// Walks Targets in runs of consecutive entries that share the same RawHash and schedules one FinalizeTargetGroup()
// call per run on the IO worker pool.
// NOTE(review): grouping only looks at adjacent entries, so Targets is presumably expected to be sorted by RawHash
// (e.g. the output of BuildSortedFinalizeTargets) - confirm with callers.
// The two maps, OutLocalFolderState and TargetsComplete are captured by reference, so they must outlive the
// scheduled work.
void BuildsOperationUpdateFolder::ScheduleTargetFinalization(
    ParallelWork& Work,
    std::span Targets,
    const tsl::robin_map& SequenceHashToLocalPathIndex,
    const tsl::robin_map& RemotePathIndexToLocalPathIndex,
    FolderContent& OutLocalFolderState,
    std::atomic& TargetsComplete)
{
    size_t TargetOffset = 0;
    while (TargetOffset < Targets.size())
    {
        if (m_AbortFlag)
        {
            break;
        }

        // Extend the group while the next target has the same content hash.
        size_t TargetCount = 1;
        while ((TargetOffset + TargetCount) < Targets.size() &&
               (Targets[TargetOffset + TargetCount].RawHash == Targets[TargetOffset].RawHash))
        {
            TargetCount++;
        }

        Work.ScheduleWork(m_IOWorkerPool,
                          [this,
                           &SequenceHashToLocalPathIndex,
                           Targets,
                           &RemotePathIndexToLocalPathIndex,
                           &OutLocalFolderState,
                           BaseTargetOffset = TargetOffset,
                           TargetCount,
                           &TargetsComplete](std::atomic&) {
                              if (!m_AbortFlag)
                              {
                                  FinalizeTargetGroup(BaseTargetOffset,
                                                      TargetCount,
                                                      Targets,
                                                      SequenceHashToLocalPathIndex,
                                                      RemotePathIndexToLocalPathIndex,
                                                      OutLocalFolderState,
                                                      TargetsComplete);
                              }
                          });

        TargetOffset += TargetCount;
    }
}

void BuildsOperationUpdateFolder::FinalizeTargetGroup(size_t BaseOffset,
                                                      size_t Count,
                                                      std::span Targets,
                                                      const tsl::robin_map& SequenceHashToLocalPathIndex,
                                                      const tsl::robin_map& RemotePathIndexToLocalPathIndex,
                                                      FolderContent& OutLocalFolderState,
                                                      std::atomic& TargetsComplete)
{
    ZEN_TRACE_CPU("Async_FinalizeChunkSequence");

    size_t TargetOffset = BaseOffset;
    const IoHash& RawHash = Targets[TargetOffset].RawHash;
    if
(RawHash == IoHash::Zero) { ZEN_TRACE_CPU("CreateEmptyFiles"); while (TargetOffset < (BaseOffset + Count)) { const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex; ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash); const std::filesystem::path& TargetPath = m_RemoteContent.Paths[RemotePathIndex]; std::filesystem::path TargetFilePath = (m_Path / TargetPath).make_preferred(); auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(RemotePathIndex); if (InPlaceIt == RemotePathIndexToLocalPathIndex.end() || InPlaceIt->second == 0) { if (IsFileWithRetry(TargetFilePath)) { SetFileReadOnlyWithRetry(TargetFilePath, false); } else { CreateDirectories(TargetFilePath.parent_path()); } BasicFile OutputFile; OutputFile.Open(TargetFilePath, BasicFile::Mode::kTruncate); } OutLocalFolderState.Paths[RemotePathIndex] = TargetPath; OutLocalFolderState.RawSizes[RemotePathIndex] = m_RemoteContent.RawSizes[RemotePathIndex]; OutLocalFolderState.Attributes[RemotePathIndex] = m_RemoteContent.Attributes.empty() ? 
GetNativeFileAttributes(TargetFilePath) : SetNativeFileAttributes(TargetFilePath, m_RemoteContent.Platform, m_RemoteContent.Attributes[RemotePathIndex]); OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath); TargetOffset++; TargetsComplete++; } } else { ZEN_TRACE_CPU("FinalizeFile"); ZEN_ASSERT(m_RemoteLookup.RawHashToSequenceIndex.contains(RawHash)); const uint32_t FirstRemotePathIndex = Targets[TargetOffset].RemotePathIndex; const std::filesystem::path& FirstTargetPath = m_RemoteContent.Paths[FirstRemotePathIndex]; std::filesystem::path FirstTargetFilePath = (m_Path / FirstTargetPath).make_preferred(); if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(FirstRemotePathIndex); InPlaceIt != RemotePathIndexToLocalPathIndex.end()) { ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath)); } else { if (IsFileWithRetry(FirstTargetFilePath)) { SetFileReadOnlyWithRetry(FirstTargetFilePath, false); } else { CreateDirectories(FirstTargetFilePath.parent_path()); } if (auto InplaceIt = SequenceHashToLocalPathIndex.find(RawHash); InplaceIt != SequenceHashToLocalPathIndex.end()) { ZEN_TRACE_CPU("Copy"); const uint32_t LocalPathIndex = InplaceIt->second; const std::filesystem::path& SourcePath = m_LocalContent.Paths[LocalPathIndex]; std::filesystem::path SourceFilePath = (m_Path / SourcePath).make_preferred(); ZEN_ASSERT_SLOW(IsFileWithRetry(SourceFilePath)); ZEN_DEBUG("Copying from '{}' -> '{}'", SourceFilePath, FirstTargetFilePath); const uint64_t RawSize = m_LocalContent.RawSizes[LocalPathIndex]; FastCopyFile(m_Options.AllowFileClone, m_Options.UseSparseFiles, SourceFilePath, FirstTargetFilePath, RawSize, m_DiskStats.WriteCount, m_DiskStats.WriteByteCount, m_DiskStats.CloneCount, m_DiskStats.CloneByteCount); m_RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++; } else { ZEN_TRACE_CPU("Rename"); const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RawHash); 
ZEN_ASSERT_SLOW(IsFileWithRetry(CacheFilePath)); std::error_code Ec = RenameFileWithRetry(CacheFilePath, FirstTargetFilePath); if (Ec) { ZEN_WARN("Failed to move file from '{}' to '{}', reason: ({}) {}, retrying...", CacheFilePath, FirstTargetFilePath, Ec.value(), Ec.message()); Ec = RenameFileWithRetry(CacheFilePath, FirstTargetFilePath); if (Ec) { throw std::system_error(std::error_code(Ec.value(), std::system_category()), fmt::format("Failed to move file from '{}' to '{}', reason: ({}) {}", CacheFilePath, FirstTargetFilePath, Ec.value(), Ec.message())); } } m_RebuildFolderStateStats.FinalizeTreeFilesMovedCount++; } } OutLocalFolderState.Paths[FirstRemotePathIndex] = FirstTargetPath; OutLocalFolderState.RawSizes[FirstRemotePathIndex] = m_RemoteContent.RawSizes[FirstRemotePathIndex]; OutLocalFolderState.Attributes[FirstRemotePathIndex] = m_RemoteContent.Attributes.empty() ? GetNativeFileAttributes(FirstTargetFilePath) : SetNativeFileAttributes(FirstTargetFilePath, m_RemoteContent.Platform, m_RemoteContent.Attributes[FirstRemotePathIndex]); OutLocalFolderState.ModificationTicks[FirstRemotePathIndex] = GetModificationTickFromPath(FirstTargetFilePath); TargetOffset++; TargetsComplete++; while (TargetOffset < (BaseOffset + Count)) { const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex; ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash); const std::filesystem::path& TargetPath = m_RemoteContent.Paths[RemotePathIndex]; std::filesystem::path TargetFilePath = (m_Path / TargetPath).make_preferred(); if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(RemotePathIndex); InPlaceIt != RemotePathIndexToLocalPathIndex.end()) { ZEN_ASSERT_SLOW(IsFileWithRetry(TargetFilePath)); } else { ZEN_TRACE_CPU("Copy"); if (IsFileWithRetry(TargetFilePath)) { SetFileReadOnlyWithRetry(TargetFilePath, false); } else { CreateDirectories(TargetFilePath.parent_path()); } ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath)); ZEN_DEBUG("Copying from '{}' -> '{}'", 
FirstTargetFilePath, TargetFilePath); const uint64_t RawSize = m_RemoteContent.RawSizes[RemotePathIndex]; FastCopyFile(m_Options.AllowFileClone, m_Options.UseSparseFiles, FirstTargetFilePath, TargetFilePath, RawSize, m_DiskStats.WriteCount, m_DiskStats.WriteByteCount, m_DiskStats.CloneCount, m_DiskStats.CloneByteCount); m_RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++; } OutLocalFolderState.Paths[RemotePathIndex] = TargetPath; OutLocalFolderState.RawSizes[RemotePathIndex] = m_RemoteContent.RawSizes[RemotePathIndex]; OutLocalFolderState.Attributes[RemotePathIndex] = m_RemoteContent.Attributes.empty() ? GetNativeFileAttributes(TargetFilePath) : SetNativeFileAttributes(TargetFilePath, m_RemoteContent.Platform, m_RemoteContent.Attributes[RemotePathIndex]); OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath); TargetOffset++; TargetsComplete++; } } } std::vector BuildsOperationUpdateFolder::FindScavengeSources() { ZEN_TRACE_CPU("FindScavengeSources"); const bool TargetPathExists = IsDir(m_Path); std::vector StatePaths = GetDownloadedStatePaths(m_Options.SystemRootDir); std::vector Result; for (const std::filesystem::path& EntryPath : StatePaths) { if (IsFile(EntryPath)) { bool DeleteEntry = false; try { BuildsDownloadInfo Info = ReadDownloadedInfoFile(EntryPath); const bool LocalPathExists = !Info.LocalPath.empty() && IsDir(Info.LocalPath); const bool LocalStateFileExists = IsFile(Info.StateFilePath); if (LocalPathExists && LocalStateFileExists) { if (TargetPathExists && std::filesystem::equivalent(Info.LocalPath, m_Path)) { DeleteEntry = true; } else { Result.push_back({.StateFilePath = std::move(Info.StateFilePath), .Path = std::move(Info.LocalPath)}); } } else { DeleteEntry = true; } } catch (const std::exception& Ex) { ZEN_WARN("{}", Ex.what()); DeleteEntry = true; } if (DeleteEntry) { std::error_code DummyEc; std::filesystem::remove(EntryPath, DummyEc); } } } return Result; } std::vector 
BuildsOperationUpdateFolder::ScanTargetFolder(const tsl::robin_map& CachedChunkHashesFound, const tsl::robin_map& CachedSequenceHashesFound) { ZEN_TRACE_CPU("ScanTargetFolder"); Stopwatch LocalTimer; std::vector MissingSequenceIndexes; for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < m_RemoteContent.ChunkedContent.SequenceRawHashes.size(); RemoteSequenceIndex++) { const IoHash& RemoteSequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; const uint32_t RemotePathIndex = GetFirstPathIndexForSeqeuenceIndex(m_RemoteLookup, RemoteSequenceIndex); const uint64_t RemoteRawSize = m_RemoteContent.RawSizes[RemotePathIndex]; if (auto CacheSequenceIt = CachedSequenceHashesFound.find(RemoteSequenceRawHash); CacheSequenceIt != CachedSequenceHashesFound.end()) { const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RemoteSequenceRawHash); ZEN_ASSERT_SLOW(IsFile(CacheFilePath)); if (m_Options.IsVerbose) { ZEN_INFO("Found sequence {} at {} ({})", RemoteSequenceRawHash, CacheFilePath, NiceBytes(RemoteRawSize)); } } else if (auto CacheChunkIt = CachedChunkHashesFound.find(RemoteSequenceRawHash); CacheChunkIt != CachedChunkHashesFound.end()) { const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RemoteSequenceRawHash); ZEN_ASSERT_SLOW(IsFile(CacheFilePath)); if (m_Options.IsVerbose) { ZEN_INFO("Found chunk {} at {} ({})", RemoteSequenceRawHash, CacheFilePath, NiceBytes(RemoteRawSize)); } } else if (auto It = m_LocalLookup.RawHashToSequenceIndex.find(RemoteSequenceRawHash); It != m_LocalLookup.RawHashToSequenceIndex.end()) { const uint32_t LocalSequenceIndex = It->second; const uint32_t LocalPathIndex = GetFirstPathIndexForSeqeuenceIndex(m_LocalLookup, LocalSequenceIndex); const std::filesystem::path LocalFilePath = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred(); ZEN_ASSERT_SLOW(IsFile(LocalFilePath)); 
m_CacheMappingStats.LocalPathsMatchingSequencesCount++; m_CacheMappingStats.LocalPathsMatchingSequencesByteCount += RemoteRawSize; if (m_Options.IsVerbose) { ZEN_INFO("Found sequence {} at {} ({})", RemoteSequenceRawHash, LocalFilePath, NiceBytes(RemoteRawSize)); } } else { MissingSequenceIndexes.push_back(RemoteSequenceIndex); } } m_CacheMappingStats.LocalScanElapsedWallTimeUs += LocalTimer.GetElapsedTimeUs(); return MissingSequenceIndexes; } bool BuildsOperationUpdateFolder::FindScavengeContent(const ScavengeSource& Source, ChunkedFolderContent& OutScavengedLocalContent, ChunkedContentLookup& OutScavengedLookup) { ZEN_TRACE_CPU("FindScavengeContent"); FolderContent LocalFolderState; try { BuildSaveState SavedState = ReadBuildSaveStateFile(Source.StateFilePath); if (SavedState.Version == BuildSaveState::NoVersion) { ZEN_DEBUG("Skipping old build state at '{}', state files before version {} can not be trusted during scavenge", Source.StateFilePath, BuildSaveState::kVersion1); return false; } OutScavengedLocalContent = std::move(SavedState.State.ChunkedContent); LocalFolderState = std::move(SavedState.FolderState); } catch (const std::exception& Ex) { ZEN_DEBUG("Skipping invalid build state at '{}', reason: {}", Source.StateFilePath, Ex.what()); return false; } tsl::robin_set PathIndexesToScavenge; PathIndexesToScavenge.reserve(OutScavengedLocalContent.Paths.size()); std::vector ChunkOrderOffsets = BuildChunkOrderOffset(OutScavengedLocalContent.ChunkedContent.ChunkCounts); { tsl::robin_map RawHashToPathIndex; RawHashToPathIndex.reserve(OutScavengedLocalContent.Paths.size()); for (uint32_t ScavengedPathIndex = 0; ScavengedPathIndex < OutScavengedLocalContent.RawHashes.size(); ScavengedPathIndex++) { if (!RawHashToPathIndex.contains(OutScavengedLocalContent.RawHashes[ScavengedPathIndex])) { RawHashToPathIndex.insert_or_assign(OutScavengedLocalContent.RawHashes[ScavengedPathIndex], ScavengedPathIndex); } } for (uint32_t ScavengeSequenceIndex = 0; ScavengeSequenceIndex 
< OutScavengedLocalContent.ChunkedContent.SequenceRawHashes.size(); ScavengeSequenceIndex++) { const IoHash& SequenceHash = OutScavengedLocalContent.ChunkedContent.SequenceRawHashes[ScavengeSequenceIndex]; if (auto It = RawHashToPathIndex.find(SequenceHash); It != RawHashToPathIndex.end()) { uint32_t PathIndex = It->second; if (!PathIndexesToScavenge.contains(PathIndex)) { if (m_RemoteLookup.RawHashToSequenceIndex.contains(SequenceHash)) { PathIndexesToScavenge.insert(PathIndex); } else { uint32_t ChunkOrderIndexStart = ChunkOrderOffsets[ScavengeSequenceIndex]; const uint32_t ChunkCount = OutScavengedLocalContent.ChunkedContent.ChunkCounts[ScavengeSequenceIndex]; for (uint32_t ChunkOrderIndex = 0; ChunkOrderIndex < ChunkCount; ChunkOrderIndex++) { const uint32_t ChunkIndex = OutScavengedLocalContent.ChunkedContent.ChunkOrders[ChunkOrderIndexStart + ChunkOrderIndex]; const IoHash& ChunkHash = OutScavengedLocalContent.ChunkedContent.ChunkHashes[ChunkIndex]; if (m_RemoteLookup.ChunkHashToChunkIndex.contains(ChunkHash)) { PathIndexesToScavenge.insert(PathIndex); break; } } } } } else { ZEN_WARN("Scavenged state file at '{}' for '{}' is invalid, skipping scavenging for sequence {}", Source.StateFilePath, Source.Path, SequenceHash); } } } if (PathIndexesToScavenge.empty()) { OutScavengedLocalContent = {}; return false; } std::vector PathsToScavenge; PathsToScavenge.reserve(PathIndexesToScavenge.size()); for (uint32_t ScavengedStatePathIndex : PathIndexesToScavenge) { PathsToScavenge.push_back(OutScavengedLocalContent.Paths[ScavengedStatePathIndex]); } FolderContent ValidFolderContent = GetValidFolderContent(m_IOWorkerPool, m_ScavengedFolderScanStats, Source.Path, PathsToScavenge, {}, 0, m_AbortFlag, m_PauseFlag); if (!LocalFolderState.AreKnownFilesEqual(ValidFolderContent)) { std::vector DeletedPaths; FolderContent UpdatedContent = GetUpdatedContent(LocalFolderState, ValidFolderContent, DeletedPaths); // If the files are modified since the state was saved we ignore the 
files since we don't // want to incur the cost of scanning/hashing scavenged files DeletedPaths.insert(DeletedPaths.end(), UpdatedContent.Paths.begin(), UpdatedContent.Paths.end()); if (!DeletedPaths.empty()) { OutScavengedLocalContent = DeletePathsFromChunkedContent(OutScavengedLocalContent, BuildHashLookup(OutScavengedLocalContent.ChunkedContent.SequenceRawHashes), ChunkOrderOffsets, DeletedPaths); } } if (OutScavengedLocalContent.Paths.empty()) { OutScavengedLocalContent = {}; return false; } OutScavengedLookup = BuildChunkedContentLookup(OutScavengedLocalContent); return true; } void BuildsOperationUpdateFolder::ScavengeSourceForChunks(uint32_t& InOutRemainingChunkCount, std::vector& InOutRemoteChunkIndexNeedsCopyFromLocalFileFlags, tsl::robin_map& InOutRawHashToCopyChunkDataIndex, const std::vector>& SequenceIndexChunksLeftToWriteCounters, const ChunkedFolderContent& ScavengedContent, const ChunkedContentLookup& ScavengedLookup, std::vector& InOutCopyChunkDatas, uint32_t ScavengedContentIndex, uint64_t& InOutChunkMatchingRemoteCount, uint64_t& InOutChunkMatchingRemoteByteCount) { for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < m_RemoteContent.ChunkedContent.ChunkHashes.size() && (InOutRemainingChunkCount > 0); RemoteChunkIndex++) { if (!InOutRemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) { const IoHash& RemoteChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; if (auto It = ScavengedLookup.ChunkHashToChunkIndex.find(RemoteChunkHash); It != ScavengedLookup.ChunkHashToChunkIndex.end()) { std::vector ChunkTargetPtrs = GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); if (!ChunkTargetPtrs.empty()) { const uint32_t ScavengedChunkIndex = It->second; const uint64_t ScavengedChunkRawSize = ScavengedContent.ChunkedContent.ChunkRawSizes[ScavengedChunkIndex]; const size_t ChunkSequenceLocationOffset = ScavengedLookup.ChunkSequenceLocationOffset[ScavengedChunkIndex]; const 
ChunkedContentLookup::ChunkSequenceLocation& ScavengeLocation = ScavengedLookup.ChunkSequenceLocations[ChunkSequenceLocationOffset]; const IoHash& ScavengedSequenceRawHash = ScavengedContent.ChunkedContent.SequenceRawHashes[ScavengeLocation.SequenceIndex]; CopyChunkData::ChunkTarget Target = {.TargetChunkLocationCount = gsl::narrow(ChunkTargetPtrs.size()), .RemoteChunkIndex = RemoteChunkIndex, .CacheFileOffset = ScavengeLocation.Offset}; if (auto CopySourceIt = InOutRawHashToCopyChunkDataIndex.find(ScavengedSequenceRawHash); CopySourceIt != InOutRawHashToCopyChunkDataIndex.end()) { CopyChunkData& Data = InOutCopyChunkDatas[CopySourceIt->second]; if (Data.TargetChunkLocationPtrs.size() > 1024) { InOutRawHashToCopyChunkDataIndex.insert_or_assign(ScavengedSequenceRawHash, InOutCopyChunkDatas.size()); InOutCopyChunkDatas.push_back(CopyChunkData{.ScavengeSourceIndex = ScavengedContentIndex, .SourceSequenceIndex = ScavengeLocation.SequenceIndex, .TargetChunkLocationPtrs = ChunkTargetPtrs, .ChunkTargets = std::vector{Target}}); } else { Data.TargetChunkLocationPtrs.insert(Data.TargetChunkLocationPtrs.end(), ChunkTargetPtrs.begin(), ChunkTargetPtrs.end()); Data.ChunkTargets.push_back(Target); } } else { InOutRawHashToCopyChunkDataIndex.insert_or_assign(ScavengedSequenceRawHash, InOutCopyChunkDatas.size()); InOutCopyChunkDatas.push_back(CopyChunkData{.ScavengeSourceIndex = ScavengedContentIndex, .SourceSequenceIndex = ScavengeLocation.SequenceIndex, .TargetChunkLocationPtrs = ChunkTargetPtrs, .ChunkTargets = std::vector{Target}}); } InOutChunkMatchingRemoteCount++; InOutChunkMatchingRemoteByteCount += ScavengedChunkRawSize; InOutRemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex] = true; InOutRemainingChunkCount--; } } } } } std::filesystem::path BuildsOperationUpdateFolder::FindDownloadedChunk(const IoHash& ChunkHash) { ZEN_TRACE_CPU("FindDownloadedChunk"); std::filesystem::path CompressedChunkPath = m_TempDownloadFolderPath / ChunkHash.ToHexString(); if 
(IsFile(CompressedChunkPath)) { IoBuffer ExistingCompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); if (ExistingCompressedPart) { IoHash RawHash; uint64_t RawSize; if (CompressedBuffer::ValidateCompressedHeader(ExistingCompressedPart, RawHash, RawSize, /*OutOptionalTotalCompressedSize*/ nullptr)) { return CompressedChunkPath; } else { std::error_code DummyEc; RemoveFile(CompressedChunkPath, DummyEc); } } } return {}; } std::vector BuildsOperationUpdateFolder::GetRemainingChunkTargets(std::span> SequenceIndexChunksLeftToWriteCounters, uint32_t ChunkIndex) { ZEN_TRACE_CPU("GetRemainingChunkTargets"); std::span ChunkSources = GetChunkSequenceLocations(m_RemoteLookup, ChunkIndex); std::vector ChunkTargetPtrs; if (!ChunkSources.empty()) { ChunkTargetPtrs.reserve(ChunkSources.size()); for (const ChunkedContentLookup::ChunkSequenceLocation& Source : ChunkSources) { if (SequenceIndexChunksLeftToWriteCounters[Source.SequenceIndex].load() > 0) { ChunkTargetPtrs.push_back(&Source); } } } return ChunkTargetPtrs; }; uint64_t BuildsOperationUpdateFolder::GetChunkWriteCount(std::span> SequenceIndexChunksLeftToWriteCounters, uint32_t ChunkIndex) { ZEN_TRACE_CPU("GetChunkWriteCount"); uint64_t WriteCount = 0; std::span ChunkSources = GetChunkSequenceLocations(m_RemoteLookup, ChunkIndex); for (const ChunkedContentLookup::ChunkSequenceLocation& Source : ChunkSources) { if (SequenceIndexChunksLeftToWriteCounters[Source.SequenceIndex].load() > 0) { WriteCount++; } } return WriteCount; }; void BuildsOperationUpdateFolder::CheckRequiredDiskSpace(const tsl::robin_map& RemotePathToRemoteIndex) { tsl::robin_set ExistingRemotePaths; if (m_Options.EnableTargetFolderScavenging) { for (uint32_t LocalPathIndex = 0; LocalPathIndex < m_LocalContent.Paths.size(); LocalPathIndex++) { const IoHash& RawHash = m_LocalContent.RawHashes[LocalPathIndex]; const std::filesystem::path& LocalPath = m_LocalContent.Paths[LocalPathIndex]; if (auto RemotePathIt = 
RemotePathToRemoteIndex.find(LocalPath.generic_string()); RemotePathIt != RemotePathToRemoteIndex.end()) { const uint32_t RemotePathIndex = RemotePathIt->second; if (m_RemoteContent.RawHashes[RemotePathIndex] == RawHash) { ExistingRemotePaths.insert(RemotePathIndex); } } } } uint64_t RequiredSpace = 0; for (uint32_t RemotePathIndex = 0; RemotePathIndex < m_RemoteContent.Paths.size(); RemotePathIndex++) { if (!ExistingRemotePaths.contains(RemotePathIndex)) { RequiredSpace += m_RemoteContent.RawSizes[RemotePathIndex]; } } std::error_code Ec; DiskSpace Space = DiskSpaceInfo(m_Path, Ec); if (Ec) { throw std::runtime_error(fmt::format("Get free disk space for target path '{}' FAILED, reason: {}", m_Path, Ec.message())); } if (Space.Free < (RequiredSpace + 16u * 1024u * 1024u)) { throw std::runtime_error( fmt::format("Not enough free space for target path '{}', {} of free space is needed but only {} is available", m_Path, NiceBytes(RequiredSpace), NiceBytes(Space.Free))); } } void BuildsOperationUpdateFolder::WriteScavengedSequenceToCache(const std::filesystem::path& ScavengeRootPath, const ChunkedFolderContent& ScavengedContent, const ScavengedSequenceCopyOperation& ScavengeOp) { ZEN_TRACE_CPU("WriteScavengedSequenceToCache"); const std::filesystem::path ScavengedPath = ScavengedContent.Paths[ScavengeOp.ScavengedPathIndex]; const std::filesystem::path ScavengedFilePath = (ScavengeRootPath / ScavengedPath).make_preferred(); ZEN_ASSERT_SLOW(FileSizeFromPath(ScavengedFilePath) == ScavengeOp.RawSize); const IoHash& RemoteSequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[ScavengeOp.RemoteSequenceIndex]; const std::filesystem::path TempFilePath = GetTempChunkedSequenceFileName(m_CacheFolderPath, RemoteSequenceRawHash); const uint64_t RawSize = ScavengedContent.RawSizes[ScavengeOp.ScavengedPathIndex]; FastCopyFile(m_Options.AllowFileClone, m_Options.UseSparseFiles, ScavengedFilePath, TempFilePath, RawSize, m_DiskStats.WriteCount, m_DiskStats.WriteByteCount, 
m_DiskStats.CloneCount, m_DiskStats.CloneByteCount); const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RemoteSequenceRawHash); RenameFile(TempFilePath, CacheFilePath); m_WrittenChunkByteCount += RawSize; if (m_Options.ValidateCompletedSequences) { m_ValidatedChunkByteCount += RawSize; } } void BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkIndex, const BlobsExistsResult& ExistsResult, std::span> SequenceIndexChunksLeftToWriteCounters, std::atomic& WritePartsComplete, std::vector&& ChunkTargetPtrs, BufferedWriteFileCache& WriteCache, ParallelWork& Work, uint64_t TotalRequestCount, uint64_t TotalPartWriteCount, FilteredRate& FilteredDownloadedBytesPerSecond, FilteredRate& FilteredWrittenBytesPerSecond) { const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; std::filesystem::path ExistingCompressedChunkPath = FindDownloadedChunk(ChunkHash); if (!ExistingCompressedChunkPath.empty()) { if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount) { FilteredDownloadedBytesPerSecond.Stop(); } } if (!m_AbortFlag) { if (!ExistingCompressedChunkPath.empty()) { Work.ScheduleWork( m_IOWorkerPool, [this, SequenceIndexChunksLeftToWriteCounters, &WriteCache, &Work, &WritePartsComplete, TotalPartWriteCount, &FilteredWrittenBytesPerSecond, RemoteChunkIndex, ChunkTargetPtrs = std::move(ChunkTargetPtrs), CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic& AbortFlag) { if (!AbortFlag) { ZEN_TRACE_CPU("Async_WritePreDownloadedChunk"); FilteredWrittenBytesPerSecond.Start(); const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); if (!CompressedPart) { throw std::runtime_error( fmt::format("Could not open dowloaded compressed chunk {} from {}", ChunkHash, CompressedChunkPath)); } bool NeedHashVerify = 
WriteCompressedChunkToCache(ChunkHash, ChunkTargetPtrs, WriteCache, std::move(CompressedPart)); bool WritePartsDone = WritePartsComplete.fetch_add(1) + 1 == TotalPartWriteCount; if (!AbortFlag) { if (WritePartsDone) { FilteredWrittenBytesPerSecond.Stop(); } std::error_code Ec = TryRemoveFile(CompressedChunkPath); if (Ec) { ZEN_DEBUG("Failed removing file '{}', reason: ({}) {}", CompressedChunkPath, Ec.value(), Ec.message()); } std::vector CompletedSequences = CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); WriteCache.Close(CompletedSequences); if (NeedHashVerify) { VerifyAndCompleteChunkSequencesAsync(CompletedSequences, Work); } else { FinalizeChunkSequences(CompletedSequences); } } } }); } else { Work.ScheduleWork(m_NetworkPool, [this, &ExistsResult, SequenceIndexChunksLeftToWriteCounters, &WriteCache, &Work, &WritePartsComplete, TotalPartWriteCount, TotalRequestCount, &FilteredDownloadedBytesPerSecond, &FilteredWrittenBytesPerSecond, RemoteChunkIndex, ChunkTargetPtrs = std::vector( std::move(ChunkTargetPtrs))](std::atomic&) mutable { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_DownloadChunk"); FilteredDownloadedBytesPerSecond.Start(); DownloadBuildBlob(RemoteChunkIndex, ExistsResult, Work, TotalRequestCount, FilteredDownloadedBytesPerSecond, [this, &ExistsResult, SequenceIndexChunksLeftToWriteCounters, &WriteCache, &Work, &WritePartsComplete, TotalPartWriteCount, RemoteChunkIndex, &FilteredWrittenBytesPerSecond, ChunkTargetPtrs = std::move(ChunkTargetPtrs)](IoBuffer&& Payload) mutable { AsyncWriteDownloadedChunk(RemoteChunkIndex, ExistsResult, std::move(ChunkTargetPtrs), WriteCache, Work, std::move(Payload), SequenceIndexChunksLeftToWriteCounters, WritePartsComplete, TotalPartWriteCount, FilteredWrittenBytesPerSecond); }); } }); } } } void BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkIndex, const BlobsExistsResult& ExistsResult, ParallelWork& Work, uint64_t TotalRequestCount, FilteredRate& 
FilteredDownloadedBytesPerSecond, std::function&& OnDownloaded) { const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; // FilteredDownloadedBytesPerSecond.Start(); IoBuffer BuildBlob; const bool ExistsInCache = m_Storage.CacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash); if (ExistsInCache) { BuildBlob = m_Storage.CacheStorage->GetBuildBlob(m_BuildId, ChunkHash); } if (BuildBlob) { uint64_t BlobSize = BuildBlob.GetSize(); m_DownloadStats.DownloadedChunkCount++; m_DownloadStats.DownloadedChunkByteCount += BlobSize; if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount) { FilteredDownloadedBytesPerSecond.Stop(); } OnDownloaded(std::move(BuildBlob)); } else { if (m_RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= m_Options.LargeAttachmentSize) { DownloadLargeBlob( *m_Storage.BuildStorage, m_TempDownloadFolderPath, m_BuildId, ChunkHash, m_Options.PreferredMultipartChunkSize, Work, m_NetworkPool, m_DownloadStats.DownloadedChunkByteCount, m_DownloadStats.MultipartAttachmentCount, [this, &FilteredDownloadedBytesPerSecond, TotalRequestCount, OnDownloaded = std::move(OnDownloaded)](IoBuffer&& Payload) { m_DownloadStats.DownloadedChunkCount++; if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount) { FilteredDownloadedBytesPerSecond.Stop(); } OnDownloaded(std::move(Payload)); }); } else { try { BuildBlob = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, ChunkHash); } catch (const std::exception&) { // Silence http errors due to abort if (!m_AbortFlag) { throw; } } if (!m_AbortFlag) { if (!BuildBlob) { throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash)); } if (!m_AbortFlag) { uint64_t BlobSize = BuildBlob.GetSize(); m_DownloadStats.DownloadedChunkCount++; m_DownloadStats.DownloadedChunkByteCount += BlobSize; if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount) { FilteredDownloadedBytesPerSecond.Stop(); } 
OnDownloaded(std::move(BuildBlob)); } } } } } void BuildsOperationUpdateFolder::DownloadPartialBlock( std::span BlockRanges, size_t BlockRangeStartIndex, size_t BlockRangeCount, const BlobsExistsResult& ExistsResult, uint64_t TotalRequestCount, FilteredRate& FilteredDownloadedBytesPerSecond, std::function> OffsetAndLengths)>&& OnDownloaded) { const uint32_t BlockIndex = BlockRanges[BlockRangeStartIndex].BlockIndex; const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; auto ProcessDownload = [this]( const ChunkBlockDescription& BlockDescription, IoBuffer&& BlockRangeBuffer, size_t BlockRangeStartIndex, std::span> BlockOffsetAndLengths, uint64_t TotalRequestCount, FilteredRate& FilteredDownloadedBytesPerSecond, const std::function> OffsetAndLengths)>& OnDownloaded) { uint64_t BlockRangeBufferSize = BlockRangeBuffer.GetSize(); m_DownloadStats.DownloadedBlockCount++; m_DownloadStats.DownloadedBlockByteCount += BlockRangeBufferSize; if (m_DownloadStats.RequestsCompleteCount.fetch_add(BlockOffsetAndLengths.size()) + BlockOffsetAndLengths.size() == TotalRequestCount) { FilteredDownloadedBytesPerSecond.Stop(); } IoHashStream RangeId; for (const std::pair& Range : BlockOffsetAndLengths) { RangeId.Append(&Range.first, sizeof(uint64_t)); RangeId.Append(&Range.second, sizeof(uint64_t)); } std::filesystem::path BlockChunkPath = TryMoveDownloadedChunk(BlockRangeBuffer, m_TempBlockFolderPath / fmt::format("{}_{}", BlockDescription.BlockHash, RangeId.GetHash()), /* ForceDiskBased */ BlockRangeBufferSize > m_Options.MaximumInMemoryPayloadSize); if (!m_AbortFlag) { OnDownloaded(std::move(BlockRangeBuffer), std::move(BlockChunkPath), BlockRangeStartIndex, BlockOffsetAndLengths); } }; std::vector> Ranges; Ranges.reserve(BlockRangeCount); for (size_t BlockRangeIndex = BlockRangeStartIndex; BlockRangeIndex < BlockRangeStartIndex + BlockRangeCount; BlockRangeIndex++) { const ChunkBlockAnalyser::BlockRangeDescriptor& BlockRange = BlockRanges[BlockRangeIndex]; 
Ranges.push_back(std::make_pair(BlockRange.RangeStart, BlockRange.RangeLength)); } const bool ExistsInCache = m_Storage.CacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash); size_t SubBlockRangeCount = BlockRangeCount; size_t SubRangeCountComplete = 0; std::span> RangesSpan(Ranges); while (SubRangeCountComplete < SubBlockRangeCount) { if (m_AbortFlag) { break; } // First try to get subrange from cache. // If not successful, try to get the ranges from the build store and adapt SubRangeCount... size_t SubRangeStartIndex = BlockRangeStartIndex + SubRangeCountComplete; if (ExistsInCache) { size_t SubRangeCount = Min(BlockRangeCount - SubRangeCountComplete, m_Storage.CacheHost.Caps.MaxRangeCountPerRequest); if (SubRangeCount == 1) { // Legacy single-range path, prefer that for max compatibility const std::pair SubRange = RangesSpan[SubRangeCountComplete]; IoBuffer PayloadBuffer = m_Storage.CacheStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash, SubRange.first, SubRange.second); if (m_AbortFlag) { break; } if (PayloadBuffer) { ProcessDownload(BlockDescription, std::move(PayloadBuffer), SubRangeStartIndex, std::vector>{std::make_pair(0u, SubRange.second)}, TotalRequestCount, FilteredDownloadedBytesPerSecond, OnDownloaded); SubRangeCountComplete += SubRangeCount; continue; } } else { auto SubRanges = RangesSpan.subspan(SubRangeCountComplete, SubRangeCount); BuildStorageCache::BuildBlobRanges RangeBuffers = m_Storage.CacheStorage->GetBuildBlobRanges(m_BuildId, BlockDescription.BlockHash, SubRanges); if (m_AbortFlag) { break; } if (RangeBuffers.PayloadBuffer) { if (RangeBuffers.Ranges.empty()) { SubRangeCount = Ranges.size() - SubRangeCountComplete; ProcessDownload(BlockDescription, std::move(RangeBuffers.PayloadBuffer), SubRangeStartIndex, RangesSpan.subspan(SubRangeCountComplete, SubRangeCount), TotalRequestCount, FilteredDownloadedBytesPerSecond, OnDownloaded); SubRangeCountComplete += SubRangeCount; continue; } else if 
(RangeBuffers.Ranges.size() == SubRangeCount) { ProcessDownload(BlockDescription, std::move(RangeBuffers.PayloadBuffer), SubRangeStartIndex, RangeBuffers.Ranges, TotalRequestCount, FilteredDownloadedBytesPerSecond, OnDownloaded); SubRangeCountComplete += SubRangeCount; continue; } } } } size_t SubRangeCount = Min(BlockRangeCount - SubRangeCountComplete, m_Storage.BuildStorageHost.Caps.MaxRangeCountPerRequest); auto SubRanges = RangesSpan.subspan(SubRangeCountComplete, SubRangeCount); BuildStorageBase::BuildBlobRanges RangeBuffers; try { RangeBuffers = m_Storage.BuildStorage->GetBuildBlobRanges(m_BuildId, BlockDescription.BlockHash, SubRanges); } catch (const std::exception&) { // Silence http errors due to abort if (!m_AbortFlag) { throw; } } if (!m_AbortFlag) { if (RangeBuffers.PayloadBuffer) { if (RangeBuffers.Ranges.empty()) { // Jupiter will ignore the ranges and send the whole payload if it fetches the payload from S3 // Upload to cache (if enabled) and use the whole payload for the remaining ranges const uint64_t Size = RangeBuffers.PayloadBuffer.GetSize(); const bool PopulateCache = !ExistsInCache && m_Storage.CacheStorage && m_Options.PopulateCache; std::filesystem::path BlockPath = TryMoveDownloadedChunk(RangeBuffers.PayloadBuffer, m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(), /* ForceDiskBased */ PopulateCache || Size > m_Options.MaximumInMemoryPayloadSize); if (!BlockPath.empty()) { RangeBuffers.PayloadBuffer = IoBufferBuilder::MakeFromFile(BlockPath); if (!RangeBuffers.PayloadBuffer) { throw std::runtime_error( fmt::format("Failed to read block {} from temporary path '{}'", BlockDescription.BlockHash, BlockPath)); } RangeBuffers.PayloadBuffer.SetDeleteOnClose(true); } if (PopulateCache) { m_Storage.CacheStorage->PutBuildBlob(m_BuildId, BlockDescription.BlockHash, ZenContentType::kCompressedBinary, CompositeBuffer(SharedBuffer(RangeBuffers.PayloadBuffer))); } if (m_AbortFlag) { break; } SubRangeCount = Ranges.size() - 
SubRangeCountComplete; ProcessDownload(BlockDescription, std::move(RangeBuffers.PayloadBuffer), SubRangeStartIndex, RangesSpan.subspan(SubRangeCountComplete, SubRangeCount), TotalRequestCount, FilteredDownloadedBytesPerSecond, OnDownloaded); } else { if (RangeBuffers.Ranges.size() != SubRanges.size()) { throw std::runtime_error(fmt::format("Fetching {} ranges from {} resulted in {} ranges", SubRanges.size(), BlockDescription.BlockHash, RangeBuffers.Ranges.size())); } ProcessDownload(BlockDescription, std::move(RangeBuffers.PayloadBuffer), SubRangeStartIndex, RangeBuffers.Ranges, TotalRequestCount, FilteredDownloadedBytesPerSecond, OnDownloaded); } } else { throw std::runtime_error( fmt::format("Block {} is missing when fetching {} ranges", BlockDescription.BlockHash, SubRangeCount)); } SubRangeCountComplete += SubRangeCount; } } } std::vector BuildsOperationUpdateFolder::WriteLocalChunkToCache(CloneQueryInterface* CloneQuery, const CopyChunkData& CopyData, const std::vector& ScavengedContents, const std::vector& ScavengedLookups, const std::vector& ScavengedPaths, BufferedWriteFileCache& WriteCache) { ZEN_TRACE_CPU("WriteLocalChunkToCache"); std::filesystem::path SourceFilePath; if (CopyData.ScavengeSourceIndex == (uint32_t)-1) { const uint32_t LocalPathIndex = m_LocalLookup.SequenceIndexFirstPathIndex[CopyData.SourceSequenceIndex]; SourceFilePath = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred(); } else { const ChunkedFolderContent& ScavengedContent = ScavengedContents[CopyData.ScavengeSourceIndex]; const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[CopyData.ScavengeSourceIndex]; const std::filesystem::path ScavengedPath = ScavengedPaths[CopyData.ScavengeSourceIndex]; const uint32_t ScavengedPathIndex = ScavengedLookup.SequenceIndexFirstPathIndex[CopyData.SourceSequenceIndex]; SourceFilePath = (ScavengedPath / ScavengedContent.Paths[ScavengedPathIndex]).make_preferred(); } ZEN_ASSERT_SLOW(IsFile(SourceFilePath)); 
ZEN_ASSERT(!CopyData.TargetChunkLocationPtrs.empty()); uint64_t CacheLocalFileBytesRead = 0; size_t TargetStart = 0; const std::span AllTargets(CopyData.TargetChunkLocationPtrs); struct WriteOp { const ChunkedContentLookup::ChunkSequenceLocation* Target = nullptr; uint64_t CacheFileOffset = (uint64_t)-1; uint32_t ChunkIndex = (uint32_t)-1; }; std::vector WriteOps; if (!m_AbortFlag) { ZEN_TRACE_CPU("Sort"); WriteOps.reserve(AllTargets.size()); for (const CopyChunkData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets) { std::span TargetRange = AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount); for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange) { WriteOps.push_back( WriteOp{.Target = Target, .CacheFileOffset = ChunkTarget.CacheFileOffset, .ChunkIndex = ChunkTarget.RemoteChunkIndex}); } TargetStart += ChunkTarget.TargetChunkLocationCount; } std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) { if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex) { return true; } else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex) { return false; } if (Lhs.Target->Offset < Rhs.Target->Offset) { return true; } return false; }); } if (!m_AbortFlag) { ZEN_TRACE_CPU("Write"); tsl::robin_set ChunkIndexesWritten; BufferedOpenFile SourceFile(SourceFilePath, m_DiskStats.OpenReadCount, m_DiskStats.CurrentOpenFileCount, m_DiskStats.ReadCount, m_DiskStats.ReadByteCount); bool CanCloneSource = CloneQuery && CloneQuery->CanClone(SourceFile.Handle()); BufferedWriteFileCache::Local LocalWriter(WriteCache); for (size_t WriteOpIndex = 0; WriteOpIndex < WriteOps.size();) { if (m_AbortFlag) { break; } const WriteOp& Op = WriteOps[WriteOpIndex]; const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex; const uint32_t RemotePathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]; const uint64_t TargetSize = m_RemoteContent.RawSizes[RemotePathIndex]; const uint64_t ChunkSize = 
m_RemoteContent.ChunkedContent.ChunkRawSizes[Op.ChunkIndex]; uint64_t ReadLength = ChunkSize; size_t WriteCount = 1; uint64_t OpSourceEnd = Op.CacheFileOffset + ChunkSize; uint64_t OpTargetEnd = Op.Target->Offset + ChunkSize; while ((WriteOpIndex + WriteCount) < WriteOps.size()) { const WriteOp& NextOp = WriteOps[WriteOpIndex + WriteCount]; if (NextOp.Target->SequenceIndex != Op.Target->SequenceIndex) { break; } if (NextOp.Target->Offset != OpTargetEnd) { break; } if (NextOp.CacheFileOffset != OpSourceEnd) { break; } const uint64_t NextChunkLength = m_RemoteContent.ChunkedContent.ChunkRawSizes[NextOp.ChunkIndex]; if (ReadLength + NextChunkLength > BufferedOpenFile::BlockSize) { break; } ReadLength += NextChunkLength; OpSourceEnd += NextChunkLength; OpTargetEnd += NextChunkLength; WriteCount++; } { bool DidClone = false; if (CanCloneSource) { uint64_t PreBytes = 0; uint64_t PostBytes = 0; uint64_t ClonableBytes = CloneQuery->GetClonableRange(Op.CacheFileOffset, Op.Target->Offset, ReadLength, PreBytes, PostBytes); if (ClonableBytes > 0) { // We need to open the file... 
BufferedWriteFileCache::Local::Writer* Writer = LocalWriter.GetWriter(RemoteSequenceIndex); if (!Writer) { Writer = LocalWriter.PutWriter(RemoteSequenceIndex, std::make_unique()); Writer->File = std::make_unique(); const std::filesystem::path FileName = GetTempChunkedSequenceFileName(m_CacheFolderPath, m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]); Writer->File->Open(FileName, BasicFile::Mode::kWrite); if (m_Options.UseSparseFiles) { PrepareFileForScatteredWrite(Writer->File->Handle(), TargetSize); } } DidClone = CloneQuery->TryClone(SourceFile.Handle(), Writer->File->Handle(), Op.CacheFileOffset + PreBytes, Op.Target->Offset + PreBytes, ClonableBytes, TargetSize); if (DidClone) { m_DiskStats.WriteCount++; m_DiskStats.WriteByteCount += ClonableBytes; m_DiskStats.CloneCount++; m_DiskStats.CloneByteCount += ClonableBytes; m_WrittenChunkByteCount += ClonableBytes; if (PreBytes > 0) { CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, PreBytes); const uint64_t FileOffset = Op.Target->Offset; WriteSequenceChunkToCache(LocalWriter, ChunkSource, RemoteSequenceIndex, FileOffset, RemotePathIndex); } if (PostBytes > 0) { CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset + ReadLength - PostBytes, PostBytes); const uint64_t FileOffset = Op.Target->Offset + ReadLength - PostBytes; WriteSequenceChunkToCache(LocalWriter, ChunkSource, RemoteSequenceIndex, FileOffset, RemotePathIndex); } } } } if (!DidClone) { CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ReadLength); const uint64_t FileOffset = Op.Target->Offset; WriteSequenceChunkToCache(LocalWriter, ChunkSource, RemoteSequenceIndex, FileOffset, RemotePathIndex); } } CacheLocalFileBytesRead += ReadLength; // TODO: This should be the sum of unique chunk sizes? 
WriteOpIndex += WriteCount; } } if (m_Options.IsVerbose) { ZEN_INFO("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), SourceFilePath); } std::vector Result; Result.reserve(WriteOps.size()); for (const WriteOp& Op : WriteOps) { Result.push_back(Op.Target->SequenceIndex); } return Result; } bool BuildsOperationUpdateFolder::WriteCompressedChunkToCache( const IoHash& ChunkHash, const std::vector& ChunkTargetPtrs, BufferedWriteFileCache& WriteCache, IoBuffer&& CompressedPart) { ZEN_TRACE_CPU("WriteCompressedChunkToCache"); auto ChunkHashToChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); ZEN_ASSERT(ChunkHashToChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end()); if (IsSingleFileChunk(m_RemoteContent, ChunkTargetPtrs)) { const std::uint32_t SequenceIndex = ChunkTargetPtrs.front()->SequenceIndex; const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]; StreamDecompress(SequenceRawHash, CompositeBuffer(std::move(CompressedPart))); return false; } else { IoHash RawHash; uint64_t RawSize; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompositeBuffer(std::move(CompressedPart)), RawHash, RawSize); if (!Compressed) { throw std::runtime_error(fmt::format("Failed to parse header of compressed large blob {}", ChunkHash)); } if (RawHash != ChunkHash) { throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, ChunkHash)); } BufferedWriteFileCache::Local LocalWriter(WriteCache); IoHashStream Hash; bool CouldDecompress = Compressed.DecompressToStream( 0, (uint64_t)-1, [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { ZEN_UNUSED(SourceOffset); ZEN_TRACE_CPU("Async_StreamDecompress_Write"); m_DiskStats.ReadByteCount += SourceSize; if (!m_AbortFlag) { for (const ChunkedContentLookup::ChunkSequenceLocation* TargetPtr : ChunkTargetPtrs) { const auto& Target = *TargetPtr; const uint64_t FileOffset = 
Target.Offset + Offset; const uint32_t SequenceIndex = Target.SequenceIndex; const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; WriteSequenceChunkToCache(LocalWriter, RangeBuffer, SequenceIndex, FileOffset, PathIndex); } return true; } return false; }); if (m_AbortFlag) { return false; } if (!CouldDecompress) { throw std::runtime_error(fmt::format("Failed to decompress large chunk {}", ChunkHash)); } return true; } } void BuildsOperationUpdateFolder::StreamDecompress(const IoHash& SequenceRawHash, CompositeBuffer&& CompressedPart) { ZEN_TRACE_CPU("StreamDecompress"); const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash); TemporaryFile DecompressedTemp; std::error_code Ec; DecompressedTemp.CreateTemporary(TempChunkSequenceFileName.parent_path(), Ec); if (Ec) { throw std::runtime_error(fmt::format("Failed creating temporary file for decompressing large blob {}, reason: ({}) {}", SequenceRawHash, Ec.value(), Ec.message())); } IoHash RawHash; uint64_t RawSize; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompressedPart, RawHash, RawSize); if (!Compressed) { throw std::runtime_error(fmt::format("Failed to parse header of compressed large blob {}", SequenceRawHash)); } if (RawHash != SequenceRawHash) { throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, SequenceRawHash)); } PrepareFileForScatteredWrite(DecompressedTemp.Handle(), RawSize); IoHashStream Hash; bool CouldDecompress = Compressed.DecompressToStream(0, (uint64_t)-1, [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { ZEN_UNUSED(SourceOffset); ZEN_TRACE_CPU("StreamDecompress_Write"); m_DiskStats.ReadCount++; m_DiskStats.ReadByteCount += SourceSize; if (!m_AbortFlag) { for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) { if (m_Options.ValidateCompletedSequences) { 
Hash.Append(Segment.GetView()); m_ValidatedChunkByteCount += Segment.GetSize(); } DecompressedTemp.Write(Segment, Offset); Offset += Segment.GetSize(); m_DiskStats.WriteByteCount += Segment.GetSize(); m_DiskStats.WriteCount++; m_WrittenChunkByteCount += Segment.GetSize(); } return true; } return false; }); if (m_AbortFlag) { return; } if (!CouldDecompress) { throw std::runtime_error(fmt::format("Failed to decompress large blob {}", SequenceRawHash)); } if (m_Options.ValidateCompletedSequences) { const IoHash VerifyHash = Hash.GetHash(); if (VerifyHash != SequenceRawHash) { throw std::runtime_error( fmt::format("Decompressed blob payload hash {} does not match expected hash {}", VerifyHash, SequenceRawHash)); } } DecompressedTemp.MoveTemporaryIntoPlace(TempChunkSequenceFileName, Ec); if (Ec) { throw std::runtime_error(fmt::format("Failed moving temporary file for decompressing large blob {}, reason: ({}) {}", SequenceRawHash, Ec.value(), Ec.message())); } // WriteChunkStats.ChunkCountWritten++; } void BuildsOperationUpdateFolder::WriteSequenceChunkToCache(BufferedWriteFileCache::Local& LocalWriter, const CompositeBuffer& Chunk, const uint32_t SequenceIndex, const uint64_t FileOffset, const uint32_t PathIndex) { ZEN_TRACE_CPU("WriteSequenceChunkToCache"); const uint64_t SequenceSize = m_RemoteContent.RawSizes[PathIndex]; auto OpenFile = [&](BasicFile& File) { const std::filesystem::path FileName = GetTempChunkedSequenceFileName(m_CacheFolderPath, m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); File.Open(FileName, BasicFile::Mode::kWrite); if (m_Options.UseSparseFiles) { PrepareFileForScatteredWrite(File.Handle(), SequenceSize); } }; const uint64_t ChunkSize = Chunk.GetSize(); ZEN_ASSERT(FileOffset + ChunkSize <= SequenceSize); if (ChunkSize == SequenceSize) { BasicFile SingleChunkFile; OpenFile(SingleChunkFile); m_DiskStats.CurrentOpenFileCount++; auto _ = MakeGuard([this]() { m_DiskStats.CurrentOpenFileCount--; }); SingleChunkFile.Write(Chunk, 
FileOffset); } else { const uint64_t MaxWriterBufferSize = 256u * 1025u; BufferedWriteFileCache::Local::Writer* Writer = LocalWriter.GetWriter(SequenceIndex); if (Writer) { if ((!Writer->Writer) && (ChunkSize < MaxWriterBufferSize)) { Writer->Writer = std::make_unique(*Writer->File, Min(SequenceSize, MaxWriterBufferSize)); } Writer->Write(Chunk, FileOffset); } else { Writer = LocalWriter.PutWriter(SequenceIndex, std::make_unique()); Writer->File = std::make_unique(); OpenFile(*Writer->File); if (ChunkSize < MaxWriterBufferSize) { Writer->Writer = std::make_unique(*Writer->File, Min(SequenceSize, MaxWriterBufferSize)); } Writer->Write(Chunk, FileOffset); } } m_DiskStats.WriteCount++; m_DiskStats.WriteByteCount += ChunkSize; m_WrittenChunkByteCount += ChunkSize; } bool BuildsOperationUpdateFolder::GetBlockWriteOps(const IoHash& BlockRawHash, std::span ChunkRawHashes, std::span ChunkCompressedLengths, std::span> SequenceIndexChunksLeftToWriteCounters, std::span> RemoteChunkIndexNeedsCopyFromSourceFlags, const MemoryView BlockView, uint32_t FirstIncludedBlockChunkIndex, uint32_t LastIncludedBlockChunkIndex, BlockWriteOps& OutOps) { ZEN_TRACE_CPU("GetBlockWriteOps"); uint32_t OffsetInBlock = 0; for (uint32_t ChunkBlockIndex = FirstIncludedBlockChunkIndex; ChunkBlockIndex <= LastIncludedBlockChunkIndex; ChunkBlockIndex++) { const uint32_t ChunkCompressedSize = ChunkCompressedLengths[ChunkBlockIndex]; const IoHash& ChunkHash = ChunkRawHashes[ChunkBlockIndex]; if (auto It = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); It != m_RemoteLookup.ChunkHashToChunkIndex.end()) { const uint32_t ChunkIndex = It->second; std::vector ChunkTargetPtrs = GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, ChunkIndex); if (!ChunkTargetPtrs.empty()) { bool NeedsWrite = true; if (RemoteChunkIndexNeedsCopyFromSourceFlags[ChunkIndex].compare_exchange_strong(NeedsWrite, false)) { MemoryView ChunkMemoryView = BlockView.Mid(OffsetInBlock, ChunkCompressedSize); IoHash 
VerifyChunkHash;
					uint64_t VerifyChunkSize;
					// Parse the chunk's compressed header and check it against the hash and raw size the remote content expects
					CompressedBuffer CompressedChunk =
						CompressedBuffer::FromCompressed(SharedBuffer::MakeView(ChunkMemoryView), VerifyChunkHash, VerifyChunkSize);
					if (!CompressedChunk)
					{
						throw std::runtime_error(fmt::format("Chunk {} at {}, size {} in block {} is not a valid compressed buffer",
															 ChunkHash,
															 OffsetInBlock,
															 ChunkCompressedSize,
															 BlockRawHash));
					}
					if (VerifyChunkHash != ChunkHash)
					{
						throw std::runtime_error(fmt::format("Chunk {} at {}, size {} in block {} has a mismatching content hash {}",
															 ChunkHash,
															 OffsetInBlock,
															 ChunkCompressedSize,
															 BlockRawHash,
															 VerifyChunkHash));
					}
					if (VerifyChunkSize != m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex])
					{
						throw std::runtime_error(
							fmt::format("Chunk {} at {}, size {} in block {} has a mismatching raw size {}, expected {}",
										ChunkHash,
										OffsetInBlock,
										ChunkCompressedSize,
										BlockRawHash,
										VerifyChunkSize,
										m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]));
					}

					OodleCompressor ChunkCompressor;
					OodleCompressionLevel ChunkCompressionLevel;
					uint64_t ChunkBlockSize;

					bool GetCompressParametersSuccess =
						CompressedChunk.TryGetCompressParameters(ChunkCompressor, ChunkCompressionLevel, ChunkBlockSize);
					ZEN_ASSERT(GetCompressParametersSuccess);

					IoBuffer Decompressed;
					if (ChunkCompressionLevel == OodleCompressionLevel::None)
					{
						// Stored uncompressed: reference the bytes after the header in place instead of decompressing.
						// NOTE(review): IoBuffer::Wrap presumably does not take ownership, so the caller's block buffer
						// must outlive OutOps - confirm.
						MemoryView ChunkDecompressedMemoryView = ChunkMemoryView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder());
						Decompressed =
							IoBuffer(IoBuffer::Wrap, ChunkDecompressedMemoryView.GetData(), ChunkDecompressedMemoryView.GetSize());
					}
					else
					{
						Decompressed = CompressedChunk.Decompress().AsIoBuffer();
					}

					if (Decompressed.GetSize() != m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex])
					{
						throw std::runtime_error(fmt::format("Chunk {} at {}, size {} in block {} decompressed to size {}, expected {}",
															 ChunkHash,
															 OffsetInBlock,
															 ChunkCompressedSize,
															 BlockRawHash,
															 Decompressed.GetSize(),
															 m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]));
					}

					ZEN_ASSERT_SLOW(ChunkHash == IoHash::HashBuffer(Decompressed));

					// Every target of this chunk refers to the same buffer, stored once in ChunkBuffers
					for (const ChunkedContentLookup::ChunkSequenceLocation* Target : ChunkTargetPtrs)
					{
						OutOps.WriteOps.push_back(
							BlockWriteOps::WriteOpData{.Target = Target, .ChunkBufferIndex = OutOps.ChunkBuffers.size()});
					}
					OutOps.ChunkBuffers.emplace_back(std::move(Decompressed));
				}
			}
		}

		// Advance past this chunk's compressed payload whether or not it was used
		OffsetInBlock += ChunkCompressedSize;
	}

	{
		ZEN_TRACE_CPU("Sort");
		// Order by (sequence, offset) so writes to the same target file are adjacent and ascending
		std::sort(OutOps.WriteOps.begin(),
				  OutOps.WriteOps.end(),
				  [](const BlockWriteOps::WriteOpData& Lhs, const BlockWriteOps::WriteOpData& Rhs) {
					  if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
					  {
						  return true;
					  }
					  if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
					  {
						  return false;
					  }
					  return Lhs.Target->Offset < Rhs.Target->Offset;
				  });
	}

	return true;
}

// Writes the ops collected by GetBlockWriteOps into the sequence temp files through a LocalWriter scope,
// stopping early if Work is aborted. Unless aborted, it then decrements the remaining-chunk counter once per op,
// closes the WriteCache writers of the sequences that became complete, and passes them to
// VerifyAndCompleteChunkSequencesAsync.
void
BuildsOperationUpdateFolder::WriteBlockChunkOpsToCache(std::span> SequenceIndexChunksLeftToWriteCounters,
														const BlockWriteOps& Ops,
														BufferedWriteFileCache& WriteCache,
														ParallelWork& Work)
{
	ZEN_TRACE_CPU("WriteBlockChunkOpsToCache");

	{
		// LocalWriter is scoped so its files are released before the tracking below runs
		BufferedWriteFileCache::Local LocalWriter(WriteCache);
		for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps)
		{
			if (Work.IsAborted())
			{
				break;
			}
			const CompositeBuffer& Chunk = Ops.ChunkBuffers[WriteOp.ChunkBufferIndex];
			const uint32_t SequenceIndex = WriteOp.Target->SequenceIndex;
			ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() <=
					   m_RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]);
			ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() > 0);
			const uint64_t FileOffset = WriteOp.Target->Offset;
			const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex];

			WriteSequenceChunkToCache(LocalWriter, Chunk, SequenceIndex, FileOffset, PathIndex);
		}
	}
	if (!Work.IsAborted())
	{
		// Write tracking, updating this must be done without any files open (BufferedWriteFileCache::Local)
		std::vector CompletedChunkSequences;
		for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps)
		{
			const uint32_t RemoteSequenceIndex = WriteOp.Target->SequenceIndex;
			if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters))
			{
				CompletedChunkSequences.push_back(RemoteSequenceIndex);
			}
		}
		WriteCache.Close(CompletedChunkSequences);
		VerifyAndCompleteChunkSequencesAsync(CompletedChunkSequences, Work);
	}
}

// Writes every chunk of a complete downloaded block to the sequence temp files.
// The block buffer is first made memory based because GetBlockWriteOps may reference views into it.
// If the description has no usable layout (HeaderSize == 0 or no ChunkCompressedLengths), the block is
// treated as a legacy block: chunk lengths and header size are read with ReadChunkBlockHeader from the data
// following the None-encoder compressed-buffer header. Otherwise the description's lengths and HeaderSize
// are used. Returns whatever GetBlockWriteOps returns; ops are written only when it returns true.
// NOTE(review): ChunkRawHashes.size() - 1 wraps for a block without chunks, and gsl::narrow would then
// throw - confirm blocks are never empty.
bool
BuildsOperationUpdateFolder::WriteChunksBlockToCache(const ChunkBlockDescription& BlockDescription,
													  std::span> SequenceIndexChunksLeftToWriteCounters,
													  ParallelWork& Work,
													  CompositeBuffer&& BlockBuffer,
													  std::span> RemoteChunkIndexNeedsCopyFromSourceFlags,
													  BufferedWriteFileCache& WriteCache)
{
	ZEN_TRACE_CPU("WriteChunksBlockToCache");

	IoBuffer BlockMemoryBuffer = MakeBufferMemoryBased(BlockBuffer);
	const MemoryView BlockView = BlockMemoryBuffer.GetView();

	BlockWriteOps Ops;
	if ((BlockDescription.HeaderSize == 0) || BlockDescription.ChunkCompressedLengths.empty())
	{
		ZEN_TRACE_CPU("WriteChunksBlockToCache_Legacy");

		uint64_t HeaderSize;
		const std::vector ChunkCompressedLengths =
			ReadChunkBlockHeader(BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder()), HeaderSize);

		if (GetBlockWriteOps(BlockDescription.BlockHash,
							 BlockDescription.ChunkRawHashes,
							 ChunkCompressedLengths,
							 SequenceIndexChunksLeftToWriteCounters,
							 RemoteChunkIndexNeedsCopyFromSourceFlags,
							 BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder() + HeaderSize),
							 0,
							 gsl::narrow(BlockDescription.ChunkRawHashes.size() - 1),
							 Ops))
		{
			WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work);
			return true;
		}
		return false;
	}

	if (GetBlockWriteOps(BlockDescription.BlockHash,
						 BlockDescription.ChunkRawHashes,
						 BlockDescription.ChunkCompressedLengths,
						 SequenceIndexChunksLeftToWriteCounters,
						 RemoteChunkIndexNeedsCopyFromSourceFlags,
						 BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize),
						 0,
						 gsl::narrow(BlockDescription.ChunkRawHashes.size() - 1),
						 Ops))
	{
		WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work);
return true; } return false; } bool BuildsOperationUpdateFolder::WritePartialBlockChunksToCache(const ChunkBlockDescription& BlockDescription, std::span> SequenceIndexChunksLeftToWriteCounters, ParallelWork& Work, CompositeBuffer&& PartialBlockBuffer, uint32_t FirstIncludedBlockChunkIndex, uint32_t LastIncludedBlockChunkIndex, std::span> RemoteChunkIndexNeedsCopyFromSourceFlags, BufferedWriteFileCache& WriteCache) { ZEN_TRACE_CPU("WritePartialBlockChunksToCache"); IoBuffer BlockMemoryBuffer = MakeBufferMemoryBased(PartialBlockBuffer); const MemoryView BlockView = BlockMemoryBuffer.GetView(); BlockWriteOps Ops; if (GetBlockWriteOps(BlockDescription.BlockHash, BlockDescription.ChunkRawHashes, BlockDescription.ChunkCompressedLengths, SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndexNeedsCopyFromSourceFlags, BlockView, FirstIncludedBlockChunkIndex, LastIncludedBlockChunkIndex, Ops)) { WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work); return true; } else { return false; } } void BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(uint32_t RemoteChunkIndex, const BlobsExistsResult& ExistsResult, std::vector&& ChunkTargetPtrs, BufferedWriteFileCache& WriteCache, ParallelWork& Work, IoBuffer&& Payload, std::span> SequenceIndexChunksLeftToWriteCounters, std::atomic& WritePartsComplete, const uint64_t TotalPartWriteCount, FilteredRate& FilteredWrittenBytesPerSecond) { ZEN_TRACE_CPU("AsyncWriteDownloadedChunk"); const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; const uint64_t Size = Payload.GetSize(); const bool ExistsInCache = m_Storage.CacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash); const bool PopulateCache = !ExistsInCache && m_Storage.CacheStorage && m_Options.PopulateCache; std::filesystem::path CompressedChunkPath = TryMoveDownloadedChunk(Payload, m_TempDownloadFolderPath / ChunkHash.ToHexString(), /* ForceDiskBased */ PopulateCache || Size > 
m_Options.MaximumInMemoryPayloadSize); if (PopulateCache) { IoBuffer CacheBlob = IoBufferBuilder::MakeFromFile(CompressedChunkPath); if (CacheBlob) { m_Storage.CacheStorage->PutBuildBlob(m_BuildId, ChunkHash, ZenContentType::kCompressedBinary, CompositeBuffer(SharedBuffer(CacheBlob))); } } IoBufferFileReference FileRef; bool EnableBacklog = !CompressedChunkPath.empty() || Payload.GetFileReference(FileRef); Work.ScheduleWork( m_IOWorkerPool, [this, SequenceIndexChunksLeftToWriteCounters, &Work, CompressedChunkPath, RemoteChunkIndex, TotalPartWriteCount, &WriteCache, &WritePartsComplete, &FilteredWrittenBytesPerSecond, ChunkTargetPtrs = std::move(ChunkTargetPtrs), CompressedPart = IoBuffer(std::move(Payload))](std::atomic&) mutable { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_WriteChunk"); FilteredWrittenBytesPerSecond.Start(); const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; if (CompressedChunkPath.empty()) { ZEN_ASSERT(CompressedPart); } else { ZEN_ASSERT(!CompressedPart); CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); if (!CompressedPart) { throw std::runtime_error( fmt::format("Could not open dowloaded compressed chunk {} from {}", ChunkHash, CompressedChunkPath)); } } bool NeedHashVerify = WriteCompressedChunkToCache(ChunkHash, ChunkTargetPtrs, WriteCache, std::move(CompressedPart)); if (!m_AbortFlag) { if (WritePartsComplete.fetch_add(1) + 1 == TotalPartWriteCount) { FilteredWrittenBytesPerSecond.Stop(); } if (!CompressedChunkPath.empty()) { std::error_code Ec = TryRemoveFile(CompressedChunkPath); if (Ec) { ZEN_DEBUG("Failed removing file '{}', reason: ({}) {}", CompressedChunkPath, Ec.value(), Ec.message()); } } std::vector CompletedSequences = CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); WriteCache.Close(CompletedSequences); if (NeedHashVerify) { VerifyAndCompleteChunkSequencesAsync(CompletedSequences, Work); } else { FinalizeChunkSequences(CompletedSequences); } } } 
}, EnableBacklog ? WorkerThreadPool::EMode::EnableBacklog : WorkerThreadPool::EMode::DisableBacklog); } void BuildsOperationUpdateFolder::VerifyAndCompleteChunkSequencesAsync(std::span RemoteSequenceIndexes, ParallelWork& Work) { if (RemoteSequenceIndexes.empty()) { return; } ZEN_TRACE_CPU("VerifyAndCompleteChunkSequence"); if (m_Options.ValidateCompletedSequences) { for (uint32_t RemoteSequenceIndexOffset = 1; RemoteSequenceIndexOffset < RemoteSequenceIndexes.size(); RemoteSequenceIndexOffset++) { const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset]; Work.ScheduleWork(m_IOWorkerPool, [this, RemoteSequenceIndex](std::atomic&) { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_VerifyAndFinalizeSequence"); VerifySequence(RemoteSequenceIndex); if (!m_AbortFlag) { const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; FinalizeChunkSequence(SequenceRawHash); } } }); } const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[0]; VerifySequence(RemoteSequenceIndex); const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; FinalizeChunkSequence(SequenceRawHash); } else { for (uint32_t RemoteSequenceIndexOffset = 0; RemoteSequenceIndexOffset < RemoteSequenceIndexes.size(); RemoteSequenceIndexOffset++) { const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset]; const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; FinalizeChunkSequence(SequenceRawHash); } } } bool BuildsOperationUpdateFolder::CompleteSequenceChunk(uint32_t RemoteSequenceIndex, std::span> SequenceIndexChunksLeftToWriteCounters) { uint32_t PreviousValue = SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1); ZEN_ASSERT(PreviousValue >= 1); ZEN_ASSERT(PreviousValue != (uint32_t)-1); return PreviousValue == 1; } std::vector BuildsOperationUpdateFolder::CompleteChunkTargets(const std::vector& 
ChunkTargetPtrs, std::span> SequenceIndexChunksLeftToWriteCounters) { ZEN_TRACE_CPU("CompleteChunkTargets"); std::vector CompletedSequenceIndexes; for (const ChunkedContentLookup::ChunkSequenceLocation* Location : ChunkTargetPtrs) { const uint32_t RemoteSequenceIndex = Location->SequenceIndex; if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters)) { CompletedSequenceIndexes.push_back(RemoteSequenceIndex); } } return CompletedSequenceIndexes; } void BuildsOperationUpdateFolder::FinalizeChunkSequence(const IoHash& SequenceRawHash) { ZEN_TRACE_CPU("FinalizeChunkSequence"); ZEN_ASSERT_SLOW(!IsFile(GetFinalChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash))); std::error_code Ec; RenameFile(GetTempChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash), GetFinalChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash), Ec); if (Ec) { throw std::system_error(Ec); } } void BuildsOperationUpdateFolder::FinalizeChunkSequences(std::span RemoteSequenceIndexes) { ZEN_TRACE_CPU("FinalizeChunkSequences"); for (uint32_t SequenceIndex : RemoteSequenceIndexes) { FinalizeChunkSequence(m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); } } void BuildsOperationUpdateFolder::VerifySequence(uint32_t RemoteSequenceIndex) { ZEN_TRACE_CPU("VerifySequence"); ZEN_ASSERT(m_Options.ValidateCompletedSequences); const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; { ZEN_TRACE_CPU("HashSequence"); const std::uint32_t RemotePathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]; const uint64_t ExpectedSize = m_RemoteContent.RawSizes[RemotePathIndex]; IoBuffer VerifyBuffer = IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash)); const uint64_t VerifySize = VerifyBuffer.GetSize(); if (VerifySize != ExpectedSize) { throw std::runtime_error(fmt::format("Written chunk sequence {} size {} does not match expected size {}", 
SequenceRawHash, VerifySize, ExpectedSize)); } const IoHash VerifyChunkHash = IoHash::HashBuffer(std::move(VerifyBuffer), &m_ValidatedChunkByteCount); if (VerifyChunkHash != SequenceRawHash) { throw std::runtime_error( fmt::format("Written chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, SequenceRawHash)); } } } void VerifyFolder(ProgressBase& Progress, std::atomic& AbortFlag, std::atomic& PauseFlag, TransferThreadWorkers& Workers, const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, const std::filesystem::path& Path, const std::vector& ExcludeFolders, bool VerifyFileHash, VerifyFolderStatistics& VerifyFolderStats) { ZEN_TRACE_CPU("VerifyFolder"); Stopwatch Timer; std::unique_ptr ProgressBar = Progress.CreateProgressBar("Verify Files"); WorkerThreadPool& VerifyPool = Workers.GetIOWorkerPool(); ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); const uint32_t PathCount = gsl::narrow(Content.Paths.size()); RwLock ErrorLock; std::vector Errors; auto IsAcceptedFolder = [ExcludeFolders = ExcludeFolders](const std::string_view& RelativePath) -> bool { for (const std::string& ExcludeFolder : ExcludeFolders) { if (StrCaseStartsWith(RelativePath, ExcludeFolder)) { if (RelativePath.length() == ExcludeFolder.length()) { return false; } else if (RelativePath[ExcludeFolder.length()] == '/') { return false; } } } return true; }; for (uint32_t PathIndex = 0; PathIndex < PathCount; PathIndex++) { if (Work.IsAborted()) { break; } Work.ScheduleWork( VerifyPool, [&Path, &Content, &Lookup, &ErrorLock, &Errors, &VerifyFolderStats, VerifyFileHash, &IsAcceptedFolder, PathIndex, &AbortFlag]( std::atomic&) { if (!AbortFlag) { ZEN_TRACE_CPU("VerifyFile_work"); // TODO: Convert ScheduleWork body to function const std::filesystem::path TargetPath = (Path / Content.Paths[PathIndex]).make_preferred(); if (IsAcceptedFolder(TargetPath.parent_path().generic_string())) { const uint64_t ExpectedSize = 
Content.RawSizes[PathIndex]; if (!IsFile(TargetPath)) { ErrorLock.WithExclusiveLock([&]() { Errors.push_back(fmt::format("File {} with expected size {} does not exist", TargetPath, ExpectedSize)); }); VerifyFolderStats.FilesFailed++; } else { std::error_code Ec; uint64_t SizeOnDisk = gsl::narrow(FileSizeFromPath(TargetPath, Ec)); if (Ec) { ErrorLock.WithExclusiveLock([&]() { Errors.push_back( fmt::format("Failed to get size of file {}: {} ({})", TargetPath, Ec.message(), Ec.value())); }); VerifyFolderStats.FilesFailed++; } else if (SizeOnDisk < ExpectedSize) { ErrorLock.WithExclusiveLock([&]() { Errors.push_back(fmt::format("Size of file {} is smaller than expected. Expected: {}, Found: {}", TargetPath, ExpectedSize, SizeOnDisk)); }); VerifyFolderStats.FilesFailed++; } else if (SizeOnDisk > ExpectedSize) { ErrorLock.WithExclusiveLock([&]() { Errors.push_back(fmt::format("Size of file {} is bigger than expected. Expected: {}, Found: {}", TargetPath, ExpectedSize, SizeOnDisk)); }); VerifyFolderStats.FilesFailed++; } else if (SizeOnDisk > 0 && VerifyFileHash) { const IoHash& ExpectedRawHash = Content.RawHashes[PathIndex]; IoBuffer Buffer = IoBufferBuilder::MakeFromFile(TargetPath); IoHash RawHash = IoHash::HashBuffer(Buffer); if (RawHash != ExpectedRawHash) { uint64_t FileOffset = 0; const uint32_t SequenceIndex = Lookup.RawHashToSequenceIndex.at(ExpectedRawHash); const uint32_t OrderOffset = Lookup.SequenceIndexChunkOrderOffset[SequenceIndex]; for (uint32_t OrderIndex = OrderOffset; OrderIndex < OrderOffset + Content.ChunkedContent.ChunkCounts[SequenceIndex]; OrderIndex++) { uint32_t ChunkIndex = Content.ChunkedContent.ChunkOrders[OrderIndex]; uint64_t ChunkSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; IoHash ChunkHash = Content.ChunkedContent.ChunkHashes[ChunkIndex]; IoBuffer FileChunk = IoBuffer(Buffer, FileOffset, ChunkSize); if (IoHash::HashBuffer(FileChunk) != ChunkHash) { ErrorLock.WithExclusiveLock([&]() { Errors.push_back(fmt::format( "WARNING: 
Hash of file {} does not match expected hash. Expected: {}, Found: {}. " "Mismatch at chunk {}", TargetPath, ExpectedRawHash, RawHash, OrderIndex - OrderOffset)); }); break; } FileOffset += ChunkSize; } VerifyFolderStats.FilesFailed++; } VerifyFolderStats.ReadBytes += SizeOnDisk; } } } VerifyFolderStats.FilesVerified++; } }, [&, PathIndex](std::exception_ptr Ex, std::atomic&) { std::string Description; try { std::rethrow_exception(Ex); } catch (const std::exception& Ex) { Description = Ex.what(); } ErrorLock.WithExclusiveLock([&]() { Errors.push_back(fmt::format("Failed verifying file '{}'. Reason: {}", (Path / Content.Paths[PathIndex]).make_preferred(), Description)); }); VerifyFolderStats.FilesFailed++; }); } Work.Wait(Progress.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { ZEN_UNUSED(PendingWork); std::string Details = fmt::format("Verified {}/{} ({}). Failed files: {}", VerifyFolderStats.FilesVerified.load(), PathCount, NiceBytes(VerifyFolderStats.ReadBytes.load()), VerifyFolderStats.FilesFailed.load()); ProgressBar->UpdateState({.Task = "Verifying files ", .Details = Details, .TotalCount = gsl::narrow(PathCount), .RemainingCount = gsl::narrow(PathCount - VerifyFolderStats.FilesVerified.load()), .Status = ProgressBase::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); VerifyFolderStats.VerifyElapsedWallTimeUs = Timer.GetElapsedTimeUs(); ProgressBar->Finish(); if (AbortFlag) { return; } for (const std::string& Error : Errors) { ZEN_CONSOLE_ERROR("{}", Error); } if (!Errors.empty()) { throw std::runtime_error(fmt::format("Verify failed with {} errors", Errors.size())); } } std::vector GetNewPaths(const std::span KnownPaths, const std::span Paths) { tsl::robin_set KnownPathsSet; KnownPathsSet.reserve(KnownPaths.size()); for (const std::filesystem::path& LocalPath : KnownPaths) { KnownPathsSet.insert(LocalPath.generic_string()); } std::vector NewPaths; for (const std::filesystem::path& UntrackedPath 
: Paths) { if (!KnownPathsSet.contains(UntrackedPath.generic_string())) { NewPaths.push_back(UntrackedPath); } } return NewPaths; } BuildSaveState GetLocalStateFromPaths(ProgressBase& Progress, std::atomic& AbortFlag, std::atomic& PauseFlag, TransferThreadWorkers& Workers, GetFolderContentStatistics& LocalFolderScanStats, ChunkingStatistics& ChunkingStats, const std::filesystem::path& Path, ChunkingController& ChunkController, ChunkingCache& ChunkCache, std::span PathsToCheck) { FolderContent FolderState = CheckFolderFiles(Progress, AbortFlag, PauseFlag, "Check Files", Workers, LocalFolderScanStats, Path, PathsToCheck); ChunkedFolderContent ChunkedContent; if (FolderState.Paths.size() > 0) { ChunkedContent = ScanFolderFiles(Progress, AbortFlag, PauseFlag, "Scan Files", Workers, Path, FolderState, ChunkController, ChunkCache, ChunkingStats); } return BuildSaveState{.State = BuildState{.ChunkedContent = std::move(ChunkedContent)}, .FolderState = FolderState, .LocalPath = Path}; } BuildSaveState GetLocalContent(ProgressBase& Progress, std::atomic& AbortFlag, std::atomic& PauseFlag, bool IsQuiet, TransferThreadWorkers& Workers, GetFolderContentStatistics& LocalFolderScanStats, ChunkingStatistics& ChunkingStats, const std::filesystem::path& Path, const std::filesystem::path& StateFilePath, ChunkingController& ChunkController, ChunkingCache& ChunkCache) { Stopwatch ReadStateTimer; bool FileExists = IsFile(StateFilePath); if (!FileExists) { ZEN_CONSOLE("No known local state file in {}, falling back to scanning", Path); return {}; } BuildSaveState SavedLocalState; try { SavedLocalState = ReadBuildSaveStateFile(StateFilePath); if (!IsQuiet) { ZEN_CONSOLE("Read local state file {} in {}", StateFilePath, NiceTimeSpanMs(ReadStateTimer.GetElapsedTimeMs())); } } catch (const std::exception& Ex) { ZEN_CONSOLE_WARN("Failed reading state file {}, falling back to scannning. 
Reason: {}", StateFilePath, Ex.what()); return {}; } FolderContent CurrentLocalFolderState = CheckFolderFiles(Progress, AbortFlag, PauseFlag, "Check Known Files", Workers, LocalFolderScanStats, Path, SavedLocalState.FolderState.Paths); if (AbortFlag) { return {}; } if (!SavedLocalState.FolderState.AreKnownFilesEqual(CurrentLocalFolderState)) { const size_t LocalStatePathCount = SavedLocalState.FolderState.Paths.size(); std::vector DeletedPaths; FolderContent UpdatedContent = GetUpdatedContent(SavedLocalState.FolderState, CurrentLocalFolderState, DeletedPaths); if (!DeletedPaths.empty()) { SavedLocalState.State.ChunkedContent = DeletePathsFromChunkedContent(SavedLocalState.State.ChunkedContent, DeletedPaths); } if (!IsQuiet) { ZEN_CONSOLE("Updating state, {} local files deleted and {} local files updated out of {}", DeletedPaths.size(), UpdatedContent.Paths.size(), LocalStatePathCount); } if (UpdatedContent.Paths.size() > 0) { ChunkedFolderContent UpdatedLocalContent = ScanFolderFiles(Progress, AbortFlag, PauseFlag, "Scan Known Files", Workers, Path, UpdatedContent, ChunkController, ChunkCache, ChunkingStats); if (AbortFlag) { return {}; } SavedLocalState.State.ChunkedContent = MergeChunkedFolderContents(SavedLocalState.State.ChunkedContent, {{UpdatedLocalContent}}); } } else { // Remove files from LocalContent no longer in LocalFolderState tsl::robin_set LocalFolderPaths; LocalFolderPaths.reserve(SavedLocalState.FolderState.Paths.size()); for (const std::filesystem::path& LocalFolderPath : SavedLocalState.FolderState.Paths) { LocalFolderPaths.insert(LocalFolderPath.generic_string()); } std::vector DeletedPaths; for (const std::filesystem::path& LocalContentPath : SavedLocalState.State.ChunkedContent.Paths) { if (!LocalFolderPaths.contains(LocalContentPath.generic_string())) { DeletedPaths.push_back(LocalContentPath); } } if (!DeletedPaths.empty()) { SavedLocalState.State.ChunkedContent = DeletePathsFromChunkedContent(SavedLocalState.State.ChunkedContent, 
DeletedPaths); } } SavedLocalState.FolderState = CurrentLocalFolderState; return SavedLocalState; } void DownloadFolder(LoggerRef InLog, ProgressBase& Progress, TransferThreadWorkers& Workers, StorageInstance& Storage, std::atomic& AbortFlag, std::atomic& PauseFlag, const BuildStorageCache::Statistics& StorageCacheStats, const Oid& BuildId, const std::vector& BuildPartIds, std::span BuildPartNames, const std::filesystem::path& DownloadSpecPath, const std::filesystem::path& Path, const DownloadOptions& Options) { ZEN_TRACE_CPU("DownloadFolder"); ZEN_SCOPED_LOG(InLog); Progress.SetLogOperationName("Download Folder"); enum TaskSteps : uint32_t { CheckState, CompareState, Download, Verify, Cleanup, StepCount }; auto EndProgress = MakeGuard([&]() { Progress.SetLogOperationProgress(TaskSteps::StepCount, TaskSteps::StepCount); }); Stopwatch DownloadTimer; Progress.SetLogOperationProgress(TaskSteps::CheckState, TaskSteps::StepCount); const std::filesystem::path ZenTempFolder = ZenTempFolderPath(Options.ZenFolderPath); CreateDirectories(ZenTempFolder); std::uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u; CbObject BuildObject = GetBuild(*Storage.BuildStorage, BuildId, Options.IsQuiet); std::vector> AllBuildParts = ResolveBuildPartNames(BuildObject, BuildId, BuildPartIds, BuildPartNames, PreferredMultipartChunkSize); BuildManifest Manifest; if (!DownloadSpecPath.empty()) { const std::filesystem::path AbsoluteDownloadSpecPath = DownloadSpecPath.is_relative() ? 
MakeSafeAbsolutePath(Path / DownloadSpecPath) : MakeSafeAbsolutePath(DownloadSpecPath); Manifest = ParseBuildManifest(AbsoluteDownloadSpecPath); } std::vector PartContents; std::unique_ptr ChunkController; std::vector BlockDescriptions; std::vector LooseChunkHashes; Progress.SetLogOperationProgress(TaskSteps::CompareState, TaskSteps::StepCount); ChunkedFolderContent RemoteContent = GetRemoteContent(InLog, Storage, BuildId, AllBuildParts, Manifest, Options.IncludeWildcards, Options.ExcludeWildcards, ChunkController, PartContents, BlockDescriptions, LooseChunkHashes, Options.IsQuiet, Options.IsVerbose, Options.DoExtraContentVerify); const std::uint64_t LargeAttachmentSize = Options.AllowMultiparts ? PreferredMultipartChunkSize * 4u : (std::uint64_t)-1; GetFolderContentStatistics LocalFolderScanStats; ChunkingStatistics ChunkingStats; BuildSaveState LocalState; if (IsDir(Path)) { if (!ChunkController && !Options.IsQuiet) { ZEN_CONSOLE_INFO("Unspecified chunking algorithm, using default"); ChunkController = CreateStandardChunkingController(StandardChunkingControllerSettings{}); } std::unique_ptr ChunkCache(CreateNullChunkingCache()); LocalState = GetLocalContent(Progress, AbortFlag, PauseFlag, Options.IsQuiet, Workers, LocalFolderScanStats, ChunkingStats, Path, ZenStateFilePath(Path / ZenFolderName), *ChunkController, *ChunkCache); std::vector UntrackedPaths = GetNewPaths(LocalState.State.ChunkedContent.Paths, RemoteContent.Paths); BuildSaveState UntrackedLocalContent = GetLocalStateFromPaths(Progress, AbortFlag, PauseFlag, Workers, LocalFolderScanStats, ChunkingStats, Path, *ChunkController, *ChunkCache, UntrackedPaths); if (!UntrackedLocalContent.State.ChunkedContent.Paths.empty()) { LocalState.State.ChunkedContent = MergeChunkedFolderContents(LocalState.State.ChunkedContent, std::vector{UntrackedLocalContent.State.ChunkedContent}); // TODO: Helper LocalState.FolderState.Paths.insert(LocalState.FolderState.Paths.begin(), 
UntrackedLocalContent.FolderState.Paths.begin(), UntrackedLocalContent.FolderState.Paths.end()); LocalState.FolderState.RawSizes.insert(LocalState.FolderState.RawSizes.begin(), UntrackedLocalContent.FolderState.RawSizes.begin(), UntrackedLocalContent.FolderState.RawSizes.end()); LocalState.FolderState.Attributes.insert(LocalState.FolderState.Attributes.begin(), UntrackedLocalContent.FolderState.Attributes.begin(), UntrackedLocalContent.FolderState.Attributes.end()); LocalState.FolderState.ModificationTicks.insert(LocalState.FolderState.ModificationTicks.begin(), UntrackedLocalContent.FolderState.ModificationTicks.begin(), UntrackedLocalContent.FolderState.ModificationTicks.end()); } if (Options.AppendNewContent) { RemoteContent = ApplyChunkedContentOverlay(LocalState.State.ChunkedContent, RemoteContent, Options.IncludeWildcards, Options.ExcludeWildcards); } #if ZEN_BUILD_DEBUG ValidateChunkedFolderContent(RemoteContent, BlockDescriptions, LooseChunkHashes, Options.IncludeWildcards, Options.ExcludeWildcards); #endif // ZEN_BUILD_DEBUG } else { CreateDirectories(Path); } if (AbortFlag) { return; } LocalState.LocalPath = Path; { BuildsSelection::Build RemoteBuildState = {.Id = BuildId, .IncludeWildcards = Options.IncludeWildcards, .ExcludeWildcards = Options.ExcludeWildcards}; RemoteBuildState.Parts.reserve(BuildPartIds.size()); for (size_t PartIndex = 0; PartIndex < BuildPartIds.size(); PartIndex++) { RemoteBuildState.Parts.push_back( {BuildsSelection::BuildPart{.Id = BuildPartIds[PartIndex], .Name = PartIndex < BuildPartNames.size() ? 
BuildPartNames[PartIndex] : ""}}); } if (Options.AppendNewContent) { LocalState.State.Selection.Builds.emplace_back(std::move(RemoteBuildState)); } else { LocalState.State.Selection.Builds = std::vector{std::move(RemoteBuildState)}; } } if ((Options.EnableTargetFolderScavenging || Options.AppendNewContent) && !Options.CleanTargetFolder && CompareChunkedContent(RemoteContent, LocalState.State.ChunkedContent)) { if (!Options.IsQuiet) { ZEN_CONSOLE("Local state is identical to build to download. All done. Completed in {}.", NiceTimeSpanMs(DownloadTimer.GetElapsedTimeMs())); } Stopwatch WriteStateTimer; CbObject StateObject = CreateBuildSaveStateObject(LocalState); CreateDirectories(ZenStateFilePath(Options.ZenFolderPath).parent_path()); TemporaryFile::SafeWriteFile(ZenStateFilePath(Options.ZenFolderPath), StateObject.GetView()); if (!Options.IsQuiet) { ZEN_CONSOLE("Wrote local state in {}", NiceTimeSpanMs(WriteStateTimer.GetElapsedTimeMs())); } AddDownloadedPath(Options.SystemRootDir, BuildsDownloadInfo{.Selection = LocalState.State.Selection, .LocalPath = Path, .StateFilePath = ZenStateFilePath(Options.ZenFolderPath), .Iso8601Date = DateTime::Now().ToIso8601()}); } else { ExtendableStringBuilder<128> BuildPartString; for (const std::pair& BuildPart : AllBuildParts) { BuildPartString.Append(fmt::format(" {} ({})", BuildPart.second, BuildPart.first)); } uint64_t RawSize = std::accumulate(RemoteContent.RawSizes.begin(), RemoteContent.RawSizes.end(), std::uint64_t(0)); if (!Options.IsQuiet) { ZEN_CONSOLE("Downloading build {}, parts:{} to '{}' ({})", BuildId, BuildPartString.ToView(), Path, NiceBytes(RawSize)); } Stopwatch IndexTimer; const ChunkedContentLookup LocalLookup = BuildChunkedContentLookup(LocalState.State.ChunkedContent); const ChunkedContentLookup RemoteLookup = BuildChunkedContentLookup(RemoteContent); if (!Options.IsQuiet) { ZEN_INFO("Indexed local and remote content in {}", NiceTimeSpanMs(IndexTimer.GetElapsedTimeMs())); } 
Progress.SetLogOperationProgress(TaskSteps::Download, TaskSteps::StepCount); BuildsOperationUpdateFolder Updater( InLog, Progress, Storage, AbortFlag, PauseFlag, Workers.GetIOWorkerPool(), Workers.GetNetworkPool(), BuildId, Path, LocalState.State.ChunkedContent, LocalLookup, RemoteContent, RemoteLookup, BlockDescriptions, LooseChunkHashes, BuildsOperationUpdateFolder::Options{ .IsQuiet = Options.IsQuiet, .IsVerbose = Options.IsVerbose, .AllowFileClone = Options.AllowFileClone, .UseSparseFiles = Options.UseSparseFiles, .SystemRootDir = Options.SystemRootDir, .ZenFolderPath = Options.ZenFolderPath, .LargeAttachmentSize = LargeAttachmentSize, .PreferredMultipartChunkSize = PreferredMultipartChunkSize, .PartialBlockRequestMode = Options.PartialBlockRequestMode, .WipeTargetFolder = Options.CleanTargetFolder, .EnableOtherDownloadsScavenging = Options.EnableOtherDownloadsScavenging, .EnableTargetFolderScavenging = Options.EnableTargetFolderScavenging || Options.AppendNewContent, .ValidateCompletedSequences = Options.PostDownloadVerify, .ExcludeFolders = Options.ExcludeFolders, .MaximumInMemoryPayloadSize = Options.MaximumInMemoryPayloadSize, .PopulateCache = Options.PopulateCache}); { Progress.PushLogOperation("Download"); auto _ = MakeGuard([&Progress]() { Progress.PopLogOperation(); }); FolderContent UpdatedLocalFolderState; Updater.Execute(UpdatedLocalFolderState); LocalState.State.ChunkedContent = RemoteContent; LocalState.FolderState = std::move(UpdatedLocalFolderState); } VerifyFolderStatistics VerifyFolderStats; if (!AbortFlag) { AddDownloadedPath(Options.SystemRootDir, BuildsDownloadInfo{.Selection = LocalState.State.Selection, .LocalPath = Path, .StateFilePath = ZenStateFilePath(Options.ZenFolderPath), .Iso8601Date = DateTime::Now().ToIso8601()}); Progress.SetLogOperationProgress(TaskSteps::Verify, TaskSteps::StepCount); VerifyFolder(Progress, AbortFlag, PauseFlag, Workers, RemoteContent, RemoteLookup, Path, Options.ExcludeFolders, Options.PostDownloadVerify, 
VerifyFolderStats); Stopwatch WriteStateTimer; CbObject StateObject = CreateBuildSaveStateObject(LocalState); CreateDirectories(ZenStateFilePath(Options.ZenFolderPath).parent_path()); TemporaryFile::SafeWriteFile(ZenStateFilePath(Options.ZenFolderPath), StateObject.GetView()); if (!Options.IsQuiet) { ZEN_CONSOLE("Wrote local state in {}", NiceTimeSpanMs(WriteStateTimer.GetElapsedTimeMs())); } #if 0 ExtendableStringBuilder<1024> SB; CompactBinaryToJson(StateObject, SB); WriteFile(ZenStateFileJsonPath(Options.ZenFolderPath), IoBuffer(IoBuffer::Wrap, SB.Data(), SB.Size())); #endif // 0 const uint64_t DownloadCount = Updater.m_DownloadStats.DownloadedChunkCount.load() + Updater.m_DownloadStats.DownloadedBlockCount.load() + Updater.m_DownloadStats.DownloadedPartialBlockCount.load(); const uint64_t DownloadByteCount = Updater.m_DownloadStats.DownloadedChunkByteCount.load() + Updater.m_DownloadStats.DownloadedBlockByteCount.load() + Updater.m_DownloadStats.DownloadedPartialBlockByteCount.load(); const uint64_t DownloadTimeMs = DownloadTimer.GetElapsedTimeMs(); if (!Options.IsQuiet) { std::string CloneInfo; if (Updater.m_DiskStats.CloneByteCount > 0) { CloneInfo = fmt::format(" ({} cloned)", NiceBytes(Updater.m_DiskStats.CloneByteCount.load())); } std::string DownloadDetails; { ExtendableStringBuilder<128> SB; BuildStorageBase::ExtendedStatistics ExtendedDownloadStats; if (Storage.BuildStorage->GetExtendedStatistics(ExtendedDownloadStats)) { if (!ExtendedDownloadStats.ReceivedBytesPerSource.empty()) { for (auto& It : ExtendedDownloadStats.ReceivedBytesPerSource) { if (SB.Size() > 0) { SB.Append(", "sv); } SB.Append(It.first); SB.Append(": "sv); SB.Append(NiceBytes(It.second)); } } } if (Storage.CacheStorage) { if (SB.Size() > 0) { SB.Append(", "sv); } SB.Append("Cache: "); SB.Append(NiceBytes(StorageCacheStats.TotalBytesRead.load())); } if (SB.Size() > 0) { DownloadDetails = fmt::format(" ({})", SB.ToView()); } } ZEN_CONSOLE( "Downloaded build {}, parts:{} in {}\n" " 
Scavenge: {} (Target: {}, Cache: {}, Others: {})\n" " Download: {} ({}) {}bits/s{}\n" " Write: {} ({}) {}B/s{}\n" " Clean: {}\n" " Finalize: {}\n" " Verify: {}", BuildId, BuildPartString.ToView(), NiceTimeSpanMs(DownloadTimeMs), NiceTimeSpanMs((Updater.m_CacheMappingStats.CacheScanElapsedWallTimeUs + Updater.m_CacheMappingStats.LocalScanElapsedWallTimeUs + Updater.m_CacheMappingStats.ScavengeElapsedWallTimeUs) / 1000), NiceTimeSpanMs(Updater.m_CacheMappingStats.LocalScanElapsedWallTimeUs / 1000), NiceTimeSpanMs(Updater.m_CacheMappingStats.CacheScanElapsedWallTimeUs / 1000), NiceTimeSpanMs(Updater.m_CacheMappingStats.ScavengeElapsedWallTimeUs / 1000), DownloadCount, NiceBytes(DownloadByteCount), NiceNum(GetBytesPerSecond(Updater.m_WriteChunkStats.DownloadTimeUs, DownloadByteCount * 8)), DownloadDetails, Updater.m_DiskStats.WriteCount.load(), NiceBytes(Updater.m_WrittenChunkByteCount.load()), NiceNum(GetBytesPerSecond(Updater.m_WriteChunkStats.WriteTimeUs, Updater.m_DiskStats.WriteByteCount.load())), CloneInfo, NiceTimeSpanMs(Updater.m_RebuildFolderStateStats.CleanFolderElapsedWallTimeUs / 1000), NiceTimeSpanMs(Updater.m_RebuildFolderStateStats.FinalizeTreeElapsedWallTimeUs / 1000), NiceTimeSpanMs(VerifyFolderStats.VerifyElapsedWallTimeUs / 1000)); } } } Progress.SetLogOperationProgress(TaskSteps::Cleanup, TaskSteps::StepCount); CleanAndRemoveDirectory(Workers.GetIOWorkerPool(), AbortFlag, PauseFlag, ZenTempFolder); } } // namespace zen