diff options
Diffstat (limited to 'src/zenremotestore/builds/buildupdatefolder.cpp')
| -rw-r--r-- | src/zenremotestore/builds/buildupdatefolder.cpp | 4947 |
1 files changed, 4947 insertions, 0 deletions
diff --git a/src/zenremotestore/builds/buildupdatefolder.cpp b/src/zenremotestore/builds/buildupdatefolder.cpp new file mode 100644 index 000000000..98972740a --- /dev/null +++ b/src/zenremotestore/builds/buildupdatefolder.cpp @@ -0,0 +1,4947 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenremotestore/builds/buildupdatefolder.h> + +#include <zencore/basicfile.h> +#include <zencore/fmtutils.h> +#include <zencore/parallelwork.h> +#include <zencore/scopeguard.h> +#include <zencore/trace.h> +#include <zenremotestore/builds/buildcontent.h> +#include <zenremotestore/builds/buildmanifest.h> +#include <zenremotestore/chunking/chunkingcache.h> +#include <zenremotestore/chunking/chunkingcontroller.h> +#include <zenremotestore/transferthreadworkers.h> +#include <zenutil/filesystemutils.h> +#include <zenutil/filteredrate.h> +#include <zenutil/progress.h> + +#include <numeric> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <tsl/robin_set.h> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +using namespace std::literals; + +namespace { + std::filesystem::path ZenTempCacheFolderPath(const std::filesystem::path& ZenFolderPath) + { + return ZenTempFolderPath(ZenFolderPath) / "cache"; // Decompressed and verified data - chunks & sequences + } + std::filesystem::path ZenTempBlockFolderPath(const std::filesystem::path& ZenFolderPath) + { + return ZenTempFolderPath(ZenFolderPath) / "blocks"; // Temp storage for whole and partial blocks + } + std::filesystem::path ZenTempDownloadFolderPath(const std::filesystem::path& ZenFolderPath) + { + return ZenTempFolderPath(ZenFolderPath) / "download"; // Temp storage for decompressed and validated chunks + } + std::filesystem::path GetTempChunkedSequenceFileName(const std::filesystem::path& CacheFolderPath, const IoHash& RawHash) + { + return CacheFolderPath / (RawHash.ToHexString() + ".tmp"); + } + + std::filesystem::path GetFinalChunkedSequenceFileName(const std::filesystem::path& CacheFolderPath, const IoHash& RawHash) + 
{ + return CacheFolderPath / RawHash.ToHexString(); + } + bool CleanDirectory(LoggerRef InLog, + ProgressBase& Progress, + WorkerThreadPool& IOWorkerPool, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag, + bool IsQuiet, + const std::filesystem::path& Path, + std::span<const std::string> ExcludeDirectories) + { + ZEN_TRACE_CPU("CleanDirectory"); + ZEN_SCOPED_LOG(InLog); + Stopwatch Timer; + + std::unique_ptr<ProgressBase::ProgressBar> ProgressBar = Progress.CreateProgressBar("Clean Folder"); + + CleanDirectoryResult Result = CleanDirectory( + IOWorkerPool, + AbortFlag, + PauseFlag, + Path, + ExcludeDirectories, + [&](const std::string_view Details, uint64_t TotalCount, uint64_t RemainingCount, bool IsPaused, bool IsAborted) { + ProgressBar->UpdateState({.Task = "Cleaning folder ", + .Details = std::string(Details), + .TotalCount = TotalCount, + .RemainingCount = RemainingCount, + .Status = ProgressBase::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, + false); + }, + Progress.GetProgressUpdateDelayMS()); + + ProgressBar->Finish(); + + if (AbortFlag) + { + return false; + } + + uint64_t ElapsedTimeMs = Timer.GetElapsedTimeMs(); + + if (!Result.FailedRemovePaths.empty()) + { + ExtendableStringBuilder<512> SB; + for (size_t FailedPathIndex = 0; FailedPathIndex < Result.FailedRemovePaths.size(); FailedPathIndex++) + { + SB << fmt::format("\n '{}': ({}) {}", + Result.FailedRemovePaths[FailedPathIndex].first, + Result.FailedRemovePaths[FailedPathIndex].second.value(), + Result.FailedRemovePaths[FailedPathIndex].second.message()); + } + ZEN_WARN("Clean failed to remove files from '{}': {}", Path, SB.ToView()); + } + + if (ElapsedTimeMs >= 200 && !IsQuiet) + { + ZEN_INFO("Wiped folder '{}' {} ({}) in {}", + Path, + Result.FoundCount, + NiceBytes(Result.DeletedByteCount), + NiceTimeSpanMs(ElapsedTimeMs)); + } + + return Result.FailedRemovePaths.empty(); + } + uint32_t SetNativeFileAttributes(const std::filesystem::path FilePath, SourcePlatform 
SourcePlatform, uint32_t Attributes) + { +#if ZEN_PLATFORM_WINDOWS + if (SourcePlatform == SourcePlatform::Windows) + { + SetFileAttributesToPath(FilePath, Attributes); + return Attributes; + } + else + { + uint32_t CurrentAttributes = GetFileAttributesFromPath(FilePath); + uint32_t NewAttributes = zen::MakeFileAttributeReadOnly(CurrentAttributes, zen::IsFileModeReadOnly(Attributes)); + if (CurrentAttributes != NewAttributes) + { + SetFileAttributesToPath(FilePath, NewAttributes); + } + return NewAttributes; + } +#endif // ZEN_PLATFORM_WINDOWS +#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + if (SourcePlatform != SourcePlatform::Windows) + { + zen::SetFileMode(FilePath, Attributes); + return Attributes; + } + else + { + uint32_t CurrentMode = zen::GetFileMode(FilePath); + uint32_t NewMode = zen::MakeFileModeReadOnly(CurrentMode, zen::IsFileAttributeReadOnly(Attributes)); + if (CurrentMode != NewMode) + { + zen::SetFileMode(FilePath, NewMode); + } + return NewMode; + } +#endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + }; + + uint32_t GetNativeFileAttributes(const std::filesystem::path FilePath) + { +#if ZEN_PLATFORM_WINDOWS + return GetFileAttributesFromPath(FilePath); +#endif // ZEN_PLATFORM_WINDOWS +#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + return GetFileMode(FilePath); +#endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + } + std::filesystem::path TryMoveDownloadedChunk(IoBuffer& BlockBuffer, const std::filesystem::path& Path, bool ForceDiskBased) + { + uint64_t BlockSize = BlockBuffer.GetSize(); + IoBufferFileReference FileRef; + if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == BlockSize)) + { + ZEN_TRACE_CPU("MoveTempFullBlock"); + std::error_code Ec; + std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); + if (!Ec) + { + BlockBuffer.SetDeleteOnClose(false); + BlockBuffer = {}; + RenameFile(TempBlobPath, Path, Ec); + if (Ec) + { + // Re-open the temp file again + BasicFile 
OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); + BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); + BlockBuffer.SetDeleteOnClose(true); + } + else + { + return Path; + } + } + } + + if (ForceDiskBased) + { + // Could not be moved and rather large, lets store it on disk + ZEN_TRACE_CPU("WriteTempFullBlock"); + TemporaryFile::SafeWriteFile(Path, BlockBuffer); + BlockBuffer = {}; + return Path; + } + + return {}; + } + bool IsSingleFileChunk(const ChunkedFolderContent& RemoteContent, + const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> Locations) + { + if (Locations.size() == 1) + { + const uint32_t FirstSequenceIndex = Locations[0]->SequenceIndex; + if (RemoteContent.ChunkedContent.ChunkCounts[FirstSequenceIndex] == 1) + { + ZEN_ASSERT_SLOW(Locations[0]->Offset == 0); + return true; + } + } + return false; + } + IoBuffer MakeBufferMemoryBased(const CompositeBuffer& PartialBlockBuffer) + { + ZEN_TRACE_CPU("MakeBufferMemoryBased"); + IoBuffer BlockMemoryBuffer; + std::span<const SharedBuffer> Segments = PartialBlockBuffer.GetSegments(); + if (Segments.size() == 1) + { + IoBufferFileReference FileRef = {}; + if (PartialBlockBuffer.GetSegments().front().AsIoBuffer().GetFileReference(FileRef)) + { + BlockMemoryBuffer = UniqueBuffer::Alloc(FileRef.FileChunkSize).MoveToShared().AsIoBuffer(); + BasicFile Reader; + Reader.Attach(FileRef.FileHandle); + auto _ = MakeGuard([&Reader]() { Reader.Detach(); }); + MutableMemoryView ReadMem = BlockMemoryBuffer.GetMutableView(); + Reader.Read(ReadMem.GetData(), FileRef.FileChunkSize, FileRef.FileChunkOffset); + return BlockMemoryBuffer; + } + else + { + return PartialBlockBuffer.GetSegments().front().AsIoBuffer(); + } + } + else + { + // Not a homogenous memory buffer, read all to memory + + BlockMemoryBuffer = UniqueBuffer::Alloc(PartialBlockBuffer.GetSize()).MoveToShared().AsIoBuffer(); + MutableMemoryView ReadMem = BlockMemoryBuffer.GetMutableView(); + for (const SharedBuffer& 
Segment : Segments) + { + IoBufferFileReference FileRef = {}; + if (Segment.AsIoBuffer().GetFileReference(FileRef)) + { + BasicFile Reader; + Reader.Attach(FileRef.FileHandle); + auto _ = MakeGuard([&Reader]() { Reader.Detach(); }); + Reader.Read(ReadMem.GetData(), FileRef.FileChunkSize, FileRef.FileChunkOffset); + ReadMem = ReadMem.Mid(FileRef.FileChunkSize); + } + else + { + ReadMem = ReadMem.CopyFrom(Segment.AsIoBuffer().GetView()); + } + } + return BlockMemoryBuffer; + } + } + + FolderContent CheckFolderFiles(ProgressBase& Progress, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag, + std::string_view ProgressLabel, + TransferThreadWorkers& Workers, + GetFolderContentStatistics& LocalFolderScanStats, + const std::filesystem::path& Path, + std::span<const std::filesystem::path> PathsToCheck) + { + std::unique_ptr<ProgressBase::ProgressBar> ProgressBar = Progress.CreateProgressBar(ProgressLabel); + FolderContent Result = GetValidFolderContent( + Workers.GetIOWorkerPool(), + LocalFolderScanStats, + Path, + PathsToCheck, + [&ProgressBar, &LocalFolderScanStats, &AbortFlag, &PauseFlag](uint64_t PathCount, uint64_t CompletedPathCount) { + std::string Details = + fmt::format("{}/{} checked, {} found", CompletedPathCount, PathCount, LocalFolderScanStats.FoundFileCount.load()); + ProgressBar->UpdateState({.Task = "Checking files ", + .Details = Details, + .TotalCount = PathCount, + .RemainingCount = PathCount - CompletedPathCount, + .Status = ProgressBase::ProgressBar::State::CalculateStatus(AbortFlag, PauseFlag)}, + false); + }, + Progress.GetProgressUpdateDelayMS(), + AbortFlag, + PauseFlag); + ProgressBar->Finish(); + return Result; + } + + ChunkedFolderContent ScanFolderFiles(ProgressBase& Progress, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag, + std::string_view ProgressLabel, + TransferThreadWorkers& Workers, + const std::filesystem::path& Path, + const FolderContent& FolderSource, + ChunkingController& ChunkController, + 
ChunkingCache& ChunkCache, + ChunkingStatistics& OutChunkingStats) + { + uint64_t ByteCountToScan = 0; + for (const uint64_t RawSize : FolderSource.RawSizes) + { + ByteCountToScan += RawSize; + } + std::unique_ptr<ProgressBase::ProgressBar> ProgressBar = Progress.CreateProgressBar(ProgressLabel); + FilteredRate FilteredBytesHashed; + FilteredBytesHashed.Start(); + ChunkingStatistics LocalChunkingStats; + ChunkedFolderContent Result = ChunkFolderContent( + LocalChunkingStats, + Workers.GetIOWorkerPool(), + Path, + FolderSource, + ChunkController, + ChunkCache, + Progress.GetProgressUpdateDelayMS(), + [&](bool IsAborted, bool IsPaused, std::ptrdiff_t) { + FilteredBytesHashed.Update(LocalChunkingStats.BytesHashed.load()); + std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found", + LocalChunkingStats.FilesProcessed.load(), + FolderSource.Paths.size(), + NiceBytes(LocalChunkingStats.BytesHashed.load()), + NiceBytes(ByteCountToScan), + NiceNum(FilteredBytesHashed.GetCurrent()), + LocalChunkingStats.UniqueChunksFound.load(), + NiceBytes(LocalChunkingStats.UniqueBytesFound.load())); + ProgressBar->UpdateState({.Task = "Scanning files ", + .Details = Details, + .TotalCount = ByteCountToScan, + .RemainingCount = ByteCountToScan - LocalChunkingStats.BytesHashed.load(), + .Status = ProgressBase::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, + false); + }, + AbortFlag, + PauseFlag); + OutChunkingStats += LocalChunkingStats; + FilteredBytesHashed.Stop(); + ProgressBar->Finish(); + return Result; + } +} // namespace + +BuildsOperationUpdateFolder::BuildsOperationUpdateFolder(LoggerRef Log, + ProgressBase& Progress, + StorageInstance& Storage, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag, + WorkerThreadPool& IOWorkerPool, + WorkerThreadPool& NetworkPool, + const Oid& BuildId, + const std::filesystem::path& Path, + const ChunkedFolderContent& LocalContent, + const ChunkedContentLookup& LocalLookup, + const 
ChunkedFolderContent& RemoteContent, + const ChunkedContentLookup& RemoteLookup, + const std::vector<ChunkBlockDescription>& BlockDescriptions, + const std::vector<IoHash>& LooseChunkHashes, + const Options& Options) +: m_Log(Log) +, m_Progress(Progress) +, m_Storage(Storage) +, m_AbortFlag(AbortFlag) +, m_PauseFlag(PauseFlag) +, m_IOWorkerPool(IOWorkerPool) +, m_NetworkPool(NetworkPool) +, m_BuildId(BuildId) +, m_Path(Path) +, m_LocalContent(LocalContent) +, m_LocalLookup(LocalLookup) +, m_RemoteContent(RemoteContent) +, m_RemoteLookup(RemoteLookup) +, m_BlockDescriptions(BlockDescriptions) +, m_LooseChunkHashes(LooseChunkHashes) +, m_Options(Options) +, m_CacheFolderPath(ZenTempCacheFolderPath(m_Options.ZenFolderPath)) +, m_TempDownloadFolderPath(ZenTempDownloadFolderPath(m_Options.ZenFolderPath)) +, m_TempBlockFolderPath(ZenTempBlockFolderPath(m_Options.ZenFolderPath)) +{ +} + +void +BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) +{ + ZEN_TRACE_CPU("BuildsOperationUpdateFolder::Execute"); + try + { + enum class TaskSteps : uint32_t + { + ScanExistingData, + WriteChunks, + PrepareTarget, + FinalizeTarget, + Cleanup, + StepCount + }; + + auto EndProgress = + MakeGuard([&]() { m_Progress.SetLogOperationProgress((uint32_t)TaskSteps::StepCount, (uint32_t)TaskSteps::StepCount); }); + + m_Progress.SetLogOperationProgress((uint32_t)TaskSteps::ScanExistingData, (uint32_t)TaskSteps::StepCount); + + CreateDirectories(m_CacheFolderPath); + CreateDirectories(m_TempDownloadFolderPath); + CreateDirectories(m_TempBlockFolderPath); + + std::vector<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters(m_RemoteContent.ChunkedContent.SequenceRawHashes.size()); + std::vector<bool> RemoteChunkIndexNeedsCopyFromLocalFileFlags(m_RemoteContent.ChunkedContent.ChunkHashes.size()); + std::vector<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags(m_RemoteContent.ChunkedContent.ChunkHashes.size()); + + tsl::robin_map<IoHash, uint32_t, 
IoHash::Hasher> CachedChunkHashesFound; + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedSequenceHashesFound; + ScanCacheFolder(CachedChunkHashesFound, CachedSequenceHashesFound); + + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedBlocksFound; + ScanTempBlocksFolder(CachedBlocksFound); + + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> SequenceIndexesLeftToFindToRemoteIndex; + InitializeSequenceCounters(SequenceIndexChunksLeftToWriteCounters, + SequenceIndexesLeftToFindToRemoteIndex, + CachedChunkHashesFound, + CachedSequenceHashesFound); + + std::vector<ChunkedFolderContent> ScavengedContents; + std::vector<ChunkedContentLookup> ScavengedLookups; + std::vector<std::filesystem::path> ScavengedPaths; + + std::vector<ScavengedSequenceCopyOperation> ScavengedSequenceCopyOperations; + uint64_t ScavengedPathsCount = 0; + + if (m_Options.EnableOtherDownloadsScavenging) + { + ZEN_TRACE_CPU("GetScavengedSequences"); + + Stopwatch ScavengeTimer; + + if (!SequenceIndexesLeftToFindToRemoteIndex.empty()) + { + std::vector<ScavengeSource> ScavengeSources = FindScavengeSources(); + ScanScavengeSources(ScavengeSources, ScavengedContents, ScavengedLookups, ScavengedPaths); + if (m_AbortFlag) + { + return; + } + + MatchScavengedSequencesToRemote(ScavengedContents, + ScavengedLookups, + ScavengedPaths, + SequenceIndexesLeftToFindToRemoteIndex, + SequenceIndexChunksLeftToWriteCounters, + ScavengedSequenceCopyOperations, + ScavengedPathsCount); + } + m_CacheMappingStats.ScavengeElapsedWallTimeUs += ScavengeTimer.GetElapsedTimeUs(); + } + + uint32_t RemainingChunkCount = 0; + for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < m_RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++) + { + uint64_t ChunkWriteCount = GetChunkWriteCount(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); + if (ChunkWriteCount > 0) + { + RemainingChunkCount++; + } + } + + // Pick up all chunks in current local state + tsl::robin_map<IoHash, size_t, IoHash::Hasher> 
RawHashToCopyChunkDataIndex; + std::vector<CopyChunkData> CopyChunkDatas; + + if (m_Options.EnableTargetFolderScavenging) + { + ZEN_TRACE_CPU("GetLocalChunks"); + + Stopwatch LocalTimer; + + ScavengeSourceForChunks(RemainingChunkCount, + RemoteChunkIndexNeedsCopyFromLocalFileFlags, + RawHashToCopyChunkDataIndex, + SequenceIndexChunksLeftToWriteCounters, + m_LocalContent, + m_LocalLookup, + CopyChunkDatas, + uint32_t(-1), + m_CacheMappingStats.LocalChunkMatchingRemoteCount, + m_CacheMappingStats.LocalChunkMatchingRemoteByteCount); + + m_CacheMappingStats.LocalScanElapsedWallTimeUs += LocalTimer.GetElapsedTimeUs(); + } + + if (m_Options.EnableOtherDownloadsScavenging) + { + ZEN_TRACE_CPU("GetScavengeChunks"); + + Stopwatch ScavengeTimer; + + for (uint32_t ScavengedContentIndex = 0; ScavengedContentIndex < ScavengedContents.size() && (RemainingChunkCount > 0); + ScavengedContentIndex++) + { + const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengedContentIndex]; + const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengedContentIndex]; + + ScavengeSourceForChunks(RemainingChunkCount, + RemoteChunkIndexNeedsCopyFromLocalFileFlags, + RawHashToCopyChunkDataIndex, + SequenceIndexChunksLeftToWriteCounters, + ScavengedContent, + ScavengedLookup, + CopyChunkDatas, + ScavengedContentIndex, + m_CacheMappingStats.ScavengedChunkMatchingRemoteCount, + m_CacheMappingStats.ScavengedChunkMatchingRemoteByteCount); + } + m_CacheMappingStats.ScavengeElapsedWallTimeUs += ScavengeTimer.GetElapsedTimeUs(); + } + + if (!m_Options.IsQuiet) + { + if (m_CacheMappingStats.CacheSequenceHashesCount > 0 || m_CacheMappingStats.CacheChunkCount > 0 || + m_CacheMappingStats.CacheBlockCount > 0) + { + ZEN_INFO("Download cache: Found {} ({}) chunk sequences, {} ({}) chunks, {} ({}) blocks in {}", + m_CacheMappingStats.CacheSequenceHashesCount, + NiceBytes(m_CacheMappingStats.CacheSequenceHashesByteCount), + m_CacheMappingStats.CacheChunkCount, + 
NiceBytes(m_CacheMappingStats.CacheChunkByteCount), + m_CacheMappingStats.CacheBlockCount, + NiceBytes(m_CacheMappingStats.CacheBlocksByteCount), + NiceTimeSpanMs(m_CacheMappingStats.CacheScanElapsedWallTimeUs / 1000)); + } + + if (m_CacheMappingStats.LocalPathsMatchingSequencesCount > 0 || m_CacheMappingStats.LocalChunkMatchingRemoteCount > 0) + { + ZEN_INFO("Local state : Found {} ({}) chunk sequences, {} ({}) chunks in {}", + m_CacheMappingStats.LocalPathsMatchingSequencesCount, + NiceBytes(m_CacheMappingStats.LocalPathsMatchingSequencesByteCount), + m_CacheMappingStats.LocalChunkMatchingRemoteCount, + NiceBytes(m_CacheMappingStats.LocalChunkMatchingRemoteByteCount), + NiceTimeSpanMs(m_CacheMappingStats.LocalScanElapsedWallTimeUs / 1000)); + } + if (m_CacheMappingStats.ScavengedPathsMatchingSequencesCount > 0 || m_CacheMappingStats.ScavengedChunkMatchingRemoteCount > 0) + { + ZEN_INFO("Scavenge of {} paths, found {} ({}) chunk sequences, {} ({}) chunks in {}", + ScavengedPathsCount, + m_CacheMappingStats.ScavengedPathsMatchingSequencesCount, + NiceBytes(m_CacheMappingStats.ScavengedPathsMatchingSequencesByteCount), + m_CacheMappingStats.ScavengedChunkMatchingRemoteCount, + NiceBytes(m_CacheMappingStats.ScavengedChunkMatchingRemoteByteCount), + NiceTimeSpanMs(m_CacheMappingStats.ScavengeElapsedWallTimeUs / 1000)); + } + } + + uint64_t BytesToWrite = CalculateBytesToWriteAndFlagNeededChunks(SequenceIndexChunksLeftToWriteCounters, + RemoteChunkIndexNeedsCopyFromLocalFileFlags, + RemoteChunkIndexNeedsCopyFromSourceFlags); + + for (const ScavengedSequenceCopyOperation& ScavengeCopyOp : ScavengedSequenceCopyOperations) + { + BytesToWrite += ScavengeCopyOp.RawSize; + } + + uint64_t BytesToValidate = m_Options.ValidateCompletedSequences ? 
BytesToWrite : 0; + + uint64_t TotalRequestCount = 0; + uint64_t TotalPartWriteCount = 0; + std::atomic<uint64_t> WritePartsComplete = 0; + + tsl::robin_map<std::string, uint32_t> RemotePathToRemoteIndex; + RemotePathToRemoteIndex.reserve(m_RemoteContent.Paths.size()); + for (uint32_t RemotePathIndex = 0; RemotePathIndex < m_RemoteContent.Paths.size(); RemotePathIndex++) + { + RemotePathToRemoteIndex.insert({m_RemoteContent.Paths[RemotePathIndex].generic_string(), RemotePathIndex}); + } + + CheckRequiredDiskSpace(RemotePathToRemoteIndex); + + BlobsExistsResult ExistsResult; + { + ChunkBlockAnalyser BlockAnalyser( + Log(), + m_BlockDescriptions, + ChunkBlockAnalyser::Options{.IsQuiet = m_Options.IsQuiet, + .IsVerbose = m_Options.IsVerbose, + .HostLatencySec = m_Storage.BuildStorageHost.LatencySec, + .HostHighSpeedLatencySec = m_Storage.CacheHost.LatencySec, + .HostMaxRangeCountPerRequest = m_Storage.BuildStorageHost.Caps.MaxRangeCountPerRequest, + .HostHighSpeedMaxRangeCountPerRequest = m_Storage.CacheHost.Caps.MaxRangeCountPerRequest}); + + std::vector<ChunkBlockAnalyser::NeededBlock> NeededBlocks = BlockAnalyser.GetNeeded( + m_RemoteLookup.ChunkHashToChunkIndex, + [&](uint32_t RemoteChunkIndex) -> bool { return RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex]; }); + + std::vector<uint32_t> FetchBlockIndexes; + std::vector<uint32_t> CachedChunkBlockIndexes; + ClassifyCachedAndFetchBlocks(NeededBlocks, CachedBlocksFound, TotalPartWriteCount, CachedChunkBlockIndexes, FetchBlockIndexes); + + std::vector<uint32_t> NeededLooseChunkIndexes = DetermineNeededLooseChunkIndexes(SequenceIndexChunksLeftToWriteCounters, + RemoteChunkIndexNeedsCopyFromLocalFileFlags, + RemoteChunkIndexNeedsCopyFromSourceFlags); + + ExistsResult = QueryBlobCacheExists(NeededLooseChunkIndexes, FetchBlockIndexes); + + std::vector<ChunkBlockAnalyser::EPartialBlockDownloadMode> BlockPartialDownloadModes = + DeterminePartialDownloadModes(ExistsResult); + 
ZEN_ASSERT(BlockPartialDownloadModes.size() == m_BlockDescriptions.size()); + + ChunkBlockAnalyser::BlockResult PartialBlocks = + BlockAnalyser.CalculatePartialBlockDownloads(NeededBlocks, BlockPartialDownloadModes); + + TotalRequestCount += NeededLooseChunkIndexes.size(); + TotalPartWriteCount += NeededLooseChunkIndexes.size(); + TotalRequestCount += PartialBlocks.BlockRanges.size(); + TotalPartWriteCount += PartialBlocks.BlockRanges.size(); + TotalRequestCount += PartialBlocks.FullBlockIndexes.size(); + TotalPartWriteCount += PartialBlocks.FullBlockIndexes.size(); + + std::vector<LooseChunkHashWorkData> LooseChunkHashWorks = + BuildLooseChunkHashWorks(NeededLooseChunkIndexes, SequenceIndexChunksLeftToWriteCounters); + + ZEN_TRACE_CPU("WriteChunks"); + + m_Progress.SetLogOperationProgress((uint32_t)TaskSteps::WriteChunks, (uint32_t)TaskSteps::StepCount); + + Stopwatch WriteTimer; + + FilteredRate FilteredDownloadedBytesPerSecond; + FilteredRate FilteredWrittenBytesPerSecond; + + std::unique_ptr<ProgressBase::ProgressBar> ProgressBar = m_Progress.CreateProgressBar("Writing"); + ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + TotalPartWriteCount += CopyChunkDatas.size(); + TotalPartWriteCount += ScavengedSequenceCopyOperations.size(); + + BufferedWriteFileCache WriteCache; + + WriteChunksContext Context{.Work = Work, + .WriteCache = WriteCache, + .SequenceIndexChunksLeftToWriteCounters = SequenceIndexChunksLeftToWriteCounters, + .RemoteChunkIndexNeedsCopyFromSourceFlags = RemoteChunkIndexNeedsCopyFromSourceFlags, + .WritePartsComplete = WritePartsComplete, + .TotalPartWriteCount = TotalPartWriteCount, + .TotalRequestCount = TotalRequestCount, + .ExistsResult = ExistsResult, + .FilteredDownloadedBytesPerSecond = FilteredDownloadedBytesPerSecond, + .FilteredWrittenBytesPerSecond = FilteredWrittenBytesPerSecond}; + + ScheduleScavengedSequenceWrites(Context, ScavengedSequenceCopyOperations, ScavengedContents, ScavengedPaths); + 
ScheduleLooseChunkWrites(Context, LooseChunkHashWorks); + + std::unique_ptr<CloneQueryInterface> CloneQuery = + m_Options.AllowFileClone ? GetCloneQueryInterface(m_CacheFolderPath) : nullptr; + + ScheduleLocalChunkCopies(Context, CopyChunkDatas, CloneQuery.get(), ScavengedContents, ScavengedLookups, ScavengedPaths); + ScheduleCachedBlockWrites(Context, CachedChunkBlockIndexes); + SchedulePartialBlockDownloads(Context, PartialBlocks); + ScheduleFullBlockDownloads(Context, PartialBlocks.FullBlockIndexes); + + { + ZEN_TRACE_CPU("WriteChunks_Wait"); + + Work.Wait(m_Progress.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); + uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() + + m_DownloadStats.DownloadedBlockByteCount.load() + + +m_DownloadStats.DownloadedPartialBlockByteCount.load(); + FilteredWrittenBytesPerSecond.Update(m_DiskStats.WriteByteCount.load()); + FilteredDownloadedBytesPerSecond.Update(DownloadedBytes); + std::string DownloadRateString = + (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) + ? 
"" + : fmt::format(" {}bits/s", NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8)); + std::string CloneDetails; + if (m_DiskStats.CloneCount.load() > 0) + { + CloneDetails = fmt::format(" ({} cloned)", NiceBytes(m_DiskStats.CloneByteCount.load())); + } + std::string WriteDetails = fmt::format(" {}/{} ({}B/s) written{}", + NiceBytes(m_WrittenChunkByteCount.load()), + NiceBytes(BytesToWrite), + NiceNum(FilteredWrittenBytesPerSecond.GetCurrent()), + CloneDetails); + + std::string Details = fmt::format("{}/{} ({}{}) downloaded.{}", + m_DownloadStats.RequestsCompleteCount.load(), + TotalRequestCount, + NiceBytes(DownloadedBytes), + DownloadRateString, + WriteDetails); + + std::string Task; + if ((m_WrittenChunkByteCount < BytesToWrite) || (BytesToValidate == 0)) + { + Task = "Writing chunks "; + } + else + { + Task = "Verifying chunks "; + } + + ProgressBar->UpdateState({.Task = Task, + .Details = Details, + .TotalCount = (BytesToWrite + BytesToValidate), + .RemainingCount = ((BytesToWrite + BytesToValidate) - + (m_WrittenChunkByteCount.load() + m_ValidatedChunkByteCount.load())), + .Status = ProgressBase::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, + false); + }); + } + + CloneQuery.reset(); + + FilteredWrittenBytesPerSecond.Stop(); + FilteredDownloadedBytesPerSecond.Stop(); + + ProgressBar->Finish(); + if (m_AbortFlag) + { + return; + } + + VerifyWriteChunksComplete(SequenceIndexChunksLeftToWriteCounters, BytesToWrite, BytesToValidate); + + const uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() + + m_DownloadStats.DownloadedBlockByteCount.load() + + m_DownloadStats.DownloadedPartialBlockByteCount.load(); + if (!m_Options.IsQuiet) + { + std::string CloneDetails; + if (m_DiskStats.CloneCount.load() > 0) + { + CloneDetails = fmt::format(" ({} cloned)", NiceBytes(m_DiskStats.CloneByteCount.load())); + } + ZEN_INFO("Downloaded {} ({}bits/s) in {}. Wrote {} ({}B/s){} in {}. 
Completed in {}", + NiceBytes(DownloadedBytes), + NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), DownloadedBytes * 8)), + NiceTimeSpanMs(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS() / 1000), + NiceBytes(m_WrittenChunkByteCount.load()), + NiceNum(GetBytesPerSecond(FilteredWrittenBytesPerSecond.GetElapsedTimeUS(), m_DiskStats.WriteByteCount.load())), + CloneDetails, + NiceTimeSpanMs(FilteredWrittenBytesPerSecond.GetElapsedTimeUS() / 1000), + NiceTimeSpanMs(WriteTimer.GetElapsedTimeMs())); + } + + m_WriteChunkStats.WriteChunksElapsedWallTimeUs = WriteTimer.GetElapsedTimeUs(); + m_WriteChunkStats.DownloadTimeUs = FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(); + m_WriteChunkStats.WriteTimeUs = FilteredWrittenBytesPerSecond.GetElapsedTimeUS(); + } + + m_Progress.SetLogOperationProgress((uint32_t)TaskSteps::PrepareTarget, (uint32_t)TaskSteps::StepCount); + + if (m_AbortFlag) + { + return; + } + + LocalPathCategorization Categorization = CategorizeLocalPaths(RemotePathToRemoteIndex); + + if (m_AbortFlag) + { + return; + } + + std::atomic<uint64_t> CachedCount = 0; + std::atomic<uint64_t> CachedByteCount = 0; + ScheduleLocalFileCaching(Categorization.FilesToCache, CachedCount, CachedByteCount); + if (m_AbortFlag) + { + return; + } + + ZEN_DEBUG( + "Local state prep: Match: {}, PathMismatch: {}, HashMismatch: {}, Cached: {} ({}), Skipped: {}, " + "Delete: {}", + Categorization.MatchCount, + Categorization.PathMismatchCount, + Categorization.HashMismatchCount, + CachedCount.load(), + NiceBytes(CachedByteCount.load()), + Categorization.SkippedCount, + Categorization.DeleteCount); + + m_Progress.SetLogOperationProgress((uint32_t)TaskSteps::FinalizeTarget, (uint32_t)TaskSteps::StepCount); + + if (m_Options.WipeTargetFolder) + { + ZEN_TRACE_CPU("WipeTarget"); + Stopwatch Timer; + + // Clean target folder + if (!CleanDirectory(Log(), + m_Progress, + m_IOWorkerPool, + m_AbortFlag, + m_PauseFlag, + m_Options.IsQuiet, + m_Path, + 
m_Options.ExcludeFolders)) + { + ZEN_WARN("Some files in {} could not be removed", m_Path); + } + m_RebuildFolderStateStats.CleanFolderElapsedWallTimeUs = Timer.GetElapsedTimeUs(); + } + + if (m_AbortFlag) + { + return; + } + + { + ZEN_TRACE_CPU("FinalizeTree"); + + Stopwatch Timer; + + std::unique_ptr<ProgressBase::ProgressBar> ProgressBar = m_Progress.CreateProgressBar("Rebuild State"); + ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + OutLocalFolderState.Paths.resize(m_RemoteContent.Paths.size()); + OutLocalFolderState.RawSizes.resize(m_RemoteContent.Paths.size()); + OutLocalFolderState.Attributes.resize(m_RemoteContent.Paths.size()); + OutLocalFolderState.ModificationTicks.resize(m_RemoteContent.Paths.size()); + + std::atomic<uint64_t> DeletedCount = 0; + std::atomic<uint64_t> TargetsComplete = 0; + + ScheduleLocalFileRemovals(Work, Categorization.RemoveLocalPathIndexes, DeletedCount); + + std::vector<FinalizeTarget> Targets = BuildSortedFinalizeTargets(); + + ScheduleTargetFinalization(Work, + Targets, + Categorization.SequenceHashToLocalPathIndex, + Categorization.RemotePathIndexToLocalPathIndex, + OutLocalFolderState, + TargetsComplete); + + { + ZEN_TRACE_CPU("FinalizeTree_Wait"); + + Work.Wait(m_Progress.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); + const uint64_t WorkTotal = Targets.size() + Categorization.RemoveLocalPathIndexes.size(); + const uint64_t WorkComplete = TargetsComplete.load() + DeletedCount.load(); + std::string Details = fmt::format("{}/{} files", WorkComplete, WorkTotal); + ProgressBar->UpdateState({.Task = "Rebuilding state ", + .Details = Details, + .TotalCount = gsl::narrow<uint64_t>(WorkTotal), + .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete), + .Status = ProgressBase::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, + false); + }); + } + + 
m_RebuildFolderStateStats.FinalizeTreeElapsedWallTimeUs = Timer.GetElapsedTimeUs(); + ProgressBar->Finish(); + } + m_Progress.SetLogOperationProgress((uint32_t)TaskSteps::Cleanup, (uint32_t)TaskSteps::StepCount); + } + catch (const std::exception&) + { + m_AbortFlag = true; + throw; + } +} + +void +BuildsOperationUpdateFolder::ScanCacheFolder(tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& OutCachedChunkHashesFound, + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& OutCachedSequenceHashesFound) +{ + ZEN_TRACE_CPU("ScanCacheFolder"); + + Stopwatch CacheTimer; + + DirectoryContent CacheDirContent; + GetDirectoryContent(m_CacheFolderPath, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeFileSizes, CacheDirContent); + for (size_t Index = 0; Index < CacheDirContent.Files.size(); Index++) + { + if (m_Options.EnableTargetFolderScavenging) + { + IoHash FileHash; + if (IoHash::TryParse(CacheDirContent.Files[Index].filename().string(), FileHash)) + { + if (auto ChunkIt = m_RemoteLookup.ChunkHashToChunkIndex.find(FileHash); + ChunkIt != m_RemoteLookup.ChunkHashToChunkIndex.end()) + { + const uint32_t ChunkIndex = ChunkIt->second; + const uint64_t ChunkSize = m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; + if (ChunkSize == CacheDirContent.FileSizes[Index]) + { + OutCachedChunkHashesFound.insert({FileHash, ChunkIndex}); + m_CacheMappingStats.CacheChunkCount++; + m_CacheMappingStats.CacheChunkByteCount += ChunkSize; + continue; + } + } + else if (auto SequenceIt = m_RemoteLookup.RawHashToSequenceIndex.find(FileHash); + SequenceIt != m_RemoteLookup.RawHashToSequenceIndex.end()) + { + const uint32_t SequenceIndex = SequenceIt->second; + const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; + const uint64_t SequenceSize = m_RemoteContent.RawSizes[PathIndex]; + if (SequenceSize == CacheDirContent.FileSizes[Index]) + { + OutCachedSequenceHashesFound.insert({FileHash, SequenceIndex}); + 
m_CacheMappingStats.CacheSequenceHashesCount++; + m_CacheMappingStats.CacheSequenceHashesByteCount += SequenceSize; + + const std::filesystem::path CacheFilePath = + GetFinalChunkedSequenceFileName(m_CacheFolderPath, + m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); + ZEN_ASSERT_SLOW(IsFile(CacheFilePath)); + + continue; + } + } + } + } + std::error_code Ec = TryRemoveFile(CacheDirContent.Files[Index]); + if (Ec) + { + ZEN_DEBUG("Failed removing file '{}', reason: ({}) {}", CacheDirContent.Files[Index], Ec.value(), Ec.message()); + } + } + m_CacheMappingStats.CacheScanElapsedWallTimeUs += CacheTimer.GetElapsedTimeUs(); +} + +void +BuildsOperationUpdateFolder::ScanTempBlocksFolder(tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& OutCachedBlocksFound) +{ + ZEN_TRACE_CPU("ScanTempBlocksFolder"); + + Stopwatch CacheTimer; + + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> AllBlockSizes; + AllBlockSizes.reserve(m_BlockDescriptions.size()); + for (uint32_t BlockIndex = 0; BlockIndex < m_BlockDescriptions.size(); BlockIndex++) + { + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + AllBlockSizes.insert({BlockDescription.BlockHash, BlockIndex}); + } + + DirectoryContent BlockDirContent; + GetDirectoryContent(m_TempBlockFolderPath, + DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeFileSizes, + BlockDirContent); + OutCachedBlocksFound.reserve(BlockDirContent.Files.size()); + for (size_t Index = 0; Index < BlockDirContent.Files.size(); Index++) + { + if (m_Options.EnableTargetFolderScavenging) + { + IoHash FileHash; + if (IoHash::TryParse(BlockDirContent.Files[Index].filename().string(), FileHash)) + { + if (auto BlockIt = AllBlockSizes.find(FileHash); BlockIt != AllBlockSizes.end()) + { + const uint32_t BlockIndex = BlockIt->second; + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + uint64_t BlockSize = CompressedBuffer::GetHeaderSizeForNoneEncoder() + 
BlockDescription.HeaderSize; + for (uint64_t ChunkSize : BlockDescription.ChunkCompressedLengths) + { + BlockSize += ChunkSize; + } + + if (BlockSize == BlockDirContent.FileSizes[Index]) + { + OutCachedBlocksFound.insert({FileHash, BlockIndex}); + m_CacheMappingStats.CacheBlockCount++; + m_CacheMappingStats.CacheBlocksByteCount += BlockSize; + continue; + } + } + } + } + std::error_code Ec = TryRemoveFile(BlockDirContent.Files[Index]); + if (Ec) + { + ZEN_DEBUG("Failed removing file '{}', reason: ({}) {}", BlockDirContent.Files[Index], Ec.value(), Ec.message()); + } + } + + m_CacheMappingStats.CacheScanElapsedWallTimeUs += CacheTimer.GetElapsedTimeUs(); +} + +void +BuildsOperationUpdateFolder::InitializeSequenceCounters(std::vector<std::atomic<uint32_t>>& OutSequenceCounters, + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& OutSequencesLeftToFind, + const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& CachedChunkHashesFound, + const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& CachedSequenceHashesFound) +{ + if (m_Options.EnableTargetFolderScavenging) + { + // Pick up all whole files we can use from current local state + ZEN_TRACE_CPU("GetLocalSequences"); + + std::vector<uint32_t> MissingSequenceIndexes = ScanTargetFolder(CachedChunkHashesFound, CachedSequenceHashesFound); + + for (uint32_t RemoteSequenceIndex : MissingSequenceIndexes) + { + // We must write the sequence + const uint32_t ChunkCount = m_RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]; + const IoHash& RemoteSequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + OutSequenceCounters[RemoteSequenceIndex] = ChunkCount; + OutSequencesLeftToFind.insert({RemoteSequenceRawHash, RemoteSequenceIndex}); + } + } + else + { + for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < m_RemoteContent.ChunkedContent.SequenceRawHashes.size(); + RemoteSequenceIndex++) + { + OutSequenceCounters[RemoteSequenceIndex] = 
m_RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]; + } + } +} + +void +BuildsOperationUpdateFolder::MatchScavengedSequencesToRemote(std::span<const ChunkedFolderContent> Contents, + std::span<const ChunkedContentLookup> Lookups, + std::span<const std::filesystem::path> Paths, + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& InOutSequencesLeftToFind, + std::vector<std::atomic<uint32_t>>& InOutSequenceCounters, + std::vector<ScavengedSequenceCopyOperation>& OutCopyOperations, + uint64_t& OutScavengedPathsCount) +{ + for (uint32_t ScavengedContentIndex = 0; ScavengedContentIndex < Contents.size() && !InOutSequencesLeftToFind.empty(); + ScavengedContentIndex++) + { + const std::filesystem::path& ScavengePath = Paths[ScavengedContentIndex]; + if (ScavengePath.empty()) + { + continue; + } + const ChunkedFolderContent& ScavengedLocalContent = Contents[ScavengedContentIndex]; + const ChunkedContentLookup& ScavengedLookup = Lookups[ScavengedContentIndex]; + + for (uint32_t ScavengedSequenceIndex = 0; ScavengedSequenceIndex < ScavengedLocalContent.ChunkedContent.SequenceRawHashes.size(); + ScavengedSequenceIndex++) + { + const IoHash& SequenceRawHash = ScavengedLocalContent.ChunkedContent.SequenceRawHashes[ScavengedSequenceIndex]; + auto It = InOutSequencesLeftToFind.find(SequenceRawHash); + if (It == InOutSequencesLeftToFind.end()) + { + continue; + } + const uint32_t RemoteSequenceIndex = It->second; + const uint64_t RawSize = m_RemoteContent.RawSizes[m_RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]]; + ZEN_ASSERT(RawSize > 0); + + const uint32_t ScavengedPathIndex = ScavengedLookup.SequenceIndexFirstPathIndex[ScavengedSequenceIndex]; + ZEN_ASSERT_SLOW(IsFile((ScavengePath / ScavengedLocalContent.Paths[ScavengedPathIndex]).make_preferred())); + + OutCopyOperations.push_back({.ScavengedContentIndex = ScavengedContentIndex, + .ScavengedPathIndex = ScavengedPathIndex, + .RemoteSequenceIndex = RemoteSequenceIndex, + .RawSize = RawSize}); + + 
InOutSequencesLeftToFind.erase(SequenceRawHash); + InOutSequenceCounters[RemoteSequenceIndex] = 0; + + m_CacheMappingStats.ScavengedPathsMatchingSequencesCount++; + m_CacheMappingStats.ScavengedPathsMatchingSequencesByteCount += RawSize; + } + OutScavengedPathsCount++; + } +} + +uint64_t +BuildsOperationUpdateFolder::CalculateBytesToWriteAndFlagNeededChunks(std::span<const std::atomic<uint32_t>> SequenceCounters, + const std::vector<bool>& NeedsCopyFromLocalFileFlags, + std::span<std::atomic<bool>> OutNeedsCopyFromSourceFlags) +{ + uint64_t BytesToWrite = 0; + for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < m_RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++) + { + const uint64_t ChunkWriteCount = GetChunkWriteCount(SequenceCounters, RemoteChunkIndex); + if (ChunkWriteCount > 0) + { + BytesToWrite += m_RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] * ChunkWriteCount; + if (!NeedsCopyFromLocalFileFlags[RemoteChunkIndex]) + { + OutNeedsCopyFromSourceFlags[RemoteChunkIndex] = true; + } + } + } + return BytesToWrite; +} + +void +BuildsOperationUpdateFolder::ClassifyCachedAndFetchBlocks(std::span<const ChunkBlockAnalyser::NeededBlock> NeededBlocks, + const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& CachedBlocksFound, + uint64_t& TotalPartWriteCount, + std::vector<uint32_t>& OutCachedChunkBlockIndexes, + std::vector<uint32_t>& OutFetchBlockIndexes) +{ + ZEN_TRACE_CPU("BlockCacheFileExists"); + for (const ChunkBlockAnalyser::NeededBlock& NeededBlock : NeededBlocks) + { + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[NeededBlock.BlockIndex]; + bool UsingCachedBlock = false; + if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end()) + { + TotalPartWriteCount++; + + std::filesystem::path BlockPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); + if (IsFile(BlockPath)) + { + OutCachedChunkBlockIndexes.push_back(NeededBlock.BlockIndex); + 
UsingCachedBlock = true; + } + } + if (!UsingCachedBlock) + { + OutFetchBlockIndexes.push_back(NeededBlock.BlockIndex); + } + } +} + +std::vector<uint32_t> +BuildsOperationUpdateFolder::DetermineNeededLooseChunkIndexes(std::span<const std::atomic<uint32_t>> SequenceCounters, + const std::vector<bool>& NeedsCopyFromLocalFileFlags, + std::span<std::atomic<bool>> NeedsCopyFromSourceFlags) +{ + std::vector<uint32_t> NeededLooseChunkIndexes; + NeededLooseChunkIndexes.reserve(m_LooseChunkHashes.size()); + for (uint32_t LooseChunkIndex = 0; LooseChunkIndex < m_LooseChunkHashes.size(); LooseChunkIndex++) + { + const IoHash& ChunkHash = m_LooseChunkHashes[LooseChunkIndex]; + auto RemoteChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); + ZEN_ASSERT(RemoteChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end()); + const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second; + + if (NeedsCopyFromLocalFileFlags[RemoteChunkIndex]) + { + if (m_Options.IsVerbose) + { + ZEN_INFO("Skipping chunk {} due to cache reuse", m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]); + } + continue; + } + + bool NeedsCopy = true; + if (NeedsCopyFromSourceFlags[RemoteChunkIndex].compare_exchange_strong(NeedsCopy, false)) + { + const uint64_t WriteCount = GetChunkWriteCount(SequenceCounters, RemoteChunkIndex); + if (WriteCount == 0) + { + if (m_Options.IsVerbose) + { + ZEN_INFO("Skipping chunk {} due to cache reuse", m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]); + } + } + else + { + NeededLooseChunkIndexes.push_back(LooseChunkIndex); + } + } + } + return NeededLooseChunkIndexes; +} + +BuildsOperationUpdateFolder::BlobsExistsResult +BuildsOperationUpdateFolder::QueryBlobCacheExists(std::span<const uint32_t> NeededLooseChunkIndexes, + std::span<const uint32_t> FetchBlockIndexes) +{ + BlobsExistsResult Result; + if (!m_Storage.CacheStorage) + { + return Result; + } + + ZEN_TRACE_CPU("BlobCacheExistCheck"); + Stopwatch Timer; + + std::vector<IoHash> 
BlobHashes; + BlobHashes.reserve(NeededLooseChunkIndexes.size() + FetchBlockIndexes.size()); + + for (const uint32_t LooseChunkIndex : NeededLooseChunkIndexes) + { + BlobHashes.push_back(m_LooseChunkHashes[LooseChunkIndex]); + } + + for (uint32_t BlockIndex : FetchBlockIndexes) + { + BlobHashes.push_back(m_BlockDescriptions[BlockIndex].BlockHash); + } + + const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult = m_Storage.CacheStorage->BlobsExists(m_BuildId, BlobHashes); + + if (CacheExistsResult.size() == BlobHashes.size()) + { + Result.ExistingBlobs.reserve(CacheExistsResult.size()); + for (size_t BlobIndex = 0; BlobIndex < BlobHashes.size(); BlobIndex++) + { + if (CacheExistsResult[BlobIndex].HasBody) + { + Result.ExistingBlobs.insert(BlobHashes[BlobIndex]); + } + } + } + Result.ElapsedTimeMs = Timer.GetElapsedTimeMs(); + if (!Result.ExistingBlobs.empty() && !m_Options.IsQuiet) + { + ZEN_INFO("Remote cache : Found {} out of {} needed blobs in {}", + Result.ExistingBlobs.size(), + BlobHashes.size(), + NiceTimeSpanMs(Result.ElapsedTimeMs)); + } + return Result; +} + +std::vector<ChunkBlockAnalyser::EPartialBlockDownloadMode> +BuildsOperationUpdateFolder::DeterminePartialDownloadModes(const BlobsExistsResult& ExistsResult) +{ + std::vector<ChunkBlockAnalyser::EPartialBlockDownloadMode> Modes; + + if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Off) + { + Modes.resize(m_BlockDescriptions.size(), ChunkBlockAnalyser::EPartialBlockDownloadMode::Off); + return Modes; + } + + const bool MultiRangeCache = m_Storage.CacheHost.Caps.MaxRangeCountPerRequest > 1; + const bool MultiRangeBuild = m_Storage.BuildStorageHost.Caps.MaxRangeCountPerRequest > 1; + ChunkBlockAnalyser::EPartialBlockDownloadMode CachePartialDownloadMode = + MultiRangeCache ? 
ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed + : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange; + ChunkBlockAnalyser::EPartialBlockDownloadMode CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::Off; + + switch (m_Options.PartialBlockRequestMode) + { + case EPartialBlockRequestMode::Off: + break; + case EPartialBlockRequestMode::ZenCacheOnly: + CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::Off; + break; + case EPartialBlockRequestMode::Mixed: + CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange; + break; + case EPartialBlockRequestMode::All: + CloudPartialDownloadMode = MultiRangeBuild ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange + : ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange; + break; + default: + ZEN_ASSERT(false); + break; + } + + Modes.reserve(m_BlockDescriptions.size()); + for (uint32_t BlockIndex = 0; BlockIndex < m_BlockDescriptions.size(); BlockIndex++) + { + const bool BlockExistInCache = ExistsResult.ExistingBlobs.contains(m_BlockDescriptions[BlockIndex].BlockHash); + Modes.push_back(BlockExistInCache ? 
CachePartialDownloadMode : CloudPartialDownloadMode); + } + return Modes; +} + +std::vector<BuildsOperationUpdateFolder::LooseChunkHashWorkData> +BuildsOperationUpdateFolder::BuildLooseChunkHashWorks(std::span<const uint32_t> NeededLooseChunkIndexes, + std::span<const std::atomic<uint32_t>> SequenceCounters) +{ + std::vector<LooseChunkHashWorkData> LooseChunkHashWorks; + LooseChunkHashWorks.reserve(NeededLooseChunkIndexes.size()); + for (uint32_t LooseChunkIndex : NeededLooseChunkIndexes) + { + const IoHash& ChunkHash = m_LooseChunkHashes[LooseChunkIndex]; + auto RemoteChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); + ZEN_ASSERT(RemoteChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end()); + const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second; + + std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = + GetRemainingChunkTargets(SequenceCounters, RemoteChunkIndex); + + ZEN_ASSERT(!ChunkTargetPtrs.empty()); + LooseChunkHashWorks.push_back(LooseChunkHashWorkData{.ChunkTargetPtrs = ChunkTargetPtrs, .RemoteChunkIndex = RemoteChunkIndex}); + } + return LooseChunkHashWorks; +} + +void +BuildsOperationUpdateFolder::VerifyWriteChunksComplete(std::span<const std::atomic<uint32_t>> SequenceCounters, + uint64_t BytesToWrite, + uint64_t BytesToValidate) +{ + uint32_t RawSequencesMissingWriteCount = 0; + for (uint32_t SequenceIndex = 0; SequenceIndex < SequenceCounters.size(); SequenceIndex++) + { + const auto& Counter = SequenceCounters[SequenceIndex]; + if (Counter.load() != 0) + { + RawSequencesMissingWriteCount++; + const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; + const std::filesystem::path& IncompletePath = m_RemoteContent.Paths[PathIndex]; + ZEN_ASSERT(!IncompletePath.empty()); + const uint32_t ExpectedSequenceCount = m_RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]; + if (!m_Options.IsQuiet) + { + ZEN_INFO("{}: Max count {}, Current count {}", IncompletePath, 
ExpectedSequenceCount, Counter.load()); + } + ZEN_ASSERT(Counter.load() <= ExpectedSequenceCount); + } + } + ZEN_ASSERT(RawSequencesMissingWriteCount == 0); + ZEN_ASSERT(m_WrittenChunkByteCount == BytesToWrite); + ZEN_ASSERT(m_ValidatedChunkByteCount == BytesToValidate); +} + +std::vector<BuildsOperationUpdateFolder::FinalizeTarget> +BuildsOperationUpdateFolder::BuildSortedFinalizeTargets() +{ + std::vector<FinalizeTarget> Targets; + Targets.reserve(m_RemoteContent.Paths.size()); + for (uint32_t RemotePathIndex = 0; RemotePathIndex < m_RemoteContent.Paths.size(); RemotePathIndex++) + { + Targets.push_back(FinalizeTarget{.RawHash = m_RemoteContent.RawHashes[RemotePathIndex], .RemotePathIndex = RemotePathIndex}); + } + std::sort(Targets.begin(), Targets.end(), [](const FinalizeTarget& Lhs, const FinalizeTarget& Rhs) { + return std::tie(Lhs.RawHash, Lhs.RemotePathIndex) < std::tie(Rhs.RawHash, Rhs.RemotePathIndex); + }); + return Targets; +} + +void +BuildsOperationUpdateFolder::ScanScavengeSources(std::span<const ScavengeSource> Sources, + std::vector<ChunkedFolderContent>& OutContents, + std::vector<ChunkedContentLookup>& OutLookups, + std::vector<std::filesystem::path>& OutPaths) +{ + ZEN_TRACE_CPU("ScanScavengeSources"); + + const size_t ScavengePathCount = Sources.size(); + OutContents.resize(ScavengePathCount); + OutLookups.resize(ScavengePathCount); + OutPaths.resize(ScavengePathCount); + + std::unique_ptr<ProgressBase::ProgressBar> ProgressBar = m_Progress.CreateProgressBar("Scavenging"); + + ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + std::atomic<uint64_t> PathsFound(0); + std::atomic<uint64_t> ChunksFound(0); + std::atomic<uint64_t> PathsScavenged(0); + + for (size_t ScavengeIndex = 0; ScavengeIndex < ScavengePathCount; ScavengeIndex++) + { + Work.ScheduleWork(m_IOWorkerPool, + [this, &Sources, &OutContents, &OutPaths, &OutLookups, &PathsFound, &ChunksFound, &PathsScavenged, ScavengeIndex]( + std::atomic<bool>&) { 
+ if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_FindScavengeContent"); + + const ScavengeSource& Source = Sources[ScavengeIndex]; + ChunkedFolderContent& ScavengedLocalContent = OutContents[ScavengeIndex]; + ChunkedContentLookup& ScavengedLookup = OutLookups[ScavengeIndex]; + + if (FindScavengeContent(Source, ScavengedLocalContent, ScavengedLookup)) + { + OutPaths[ScavengeIndex] = Source.Path; + PathsFound += ScavengedLocalContent.Paths.size(); + ChunksFound += ScavengedLocalContent.ChunkedContent.ChunkHashes.size(); + } + else + { + OutPaths[ScavengeIndex].clear(); + } + PathsScavenged++; + } + }); + } + { + ZEN_TRACE_CPU("ScavengeScan_Wait"); + + Work.Wait(m_Progress.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); + std::string Details = fmt::format("{}/{} scanned. {} paths and {} chunks found for scavenging", + PathsScavenged.load(), + ScavengePathCount, + PathsFound.load(), + ChunksFound.load()); + ProgressBar->UpdateState({.Task = "Scavenging ", + .Details = Details, + .TotalCount = ScavengePathCount, + .RemainingCount = ScavengePathCount - PathsScavenged.load(), + .Status = ProgressBase::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, + false); + }); + } + + ProgressBar->Finish(); +} + +BuildsOperationUpdateFolder::LocalPathCategorization +BuildsOperationUpdateFolder::CategorizeLocalPaths(const tsl::robin_map<std::string, uint32_t>& RemotePathToRemoteIndex) +{ + ZEN_TRACE_CPU("PrepareTarget"); + + LocalPathCategorization Result; + tsl::robin_set<IoHash, IoHash::Hasher> CachedRemoteSequences; + + Result.RemotePathIndexToLocalPathIndex.reserve(m_RemoteContent.Paths.size()); + + for (uint32_t LocalPathIndex = 0; LocalPathIndex < m_LocalContent.Paths.size(); LocalPathIndex++) + { + if (m_AbortFlag) + { + break; + } + const IoHash& RawHash = m_LocalContent.RawHashes[LocalPathIndex]; + const std::filesystem::path& LocalPath = m_LocalContent.Paths[LocalPathIndex]; + + 
ZEN_ASSERT_SLOW(IsFile((m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred())); + + if (m_Options.EnableTargetFolderScavenging) + { + if (!m_Options.WipeTargetFolder) + { + // Check if it is already in the correct place + if (auto RemotePathIt = RemotePathToRemoteIndex.find(LocalPath.generic_string()); + RemotePathIt != RemotePathToRemoteIndex.end()) + { + const uint32_t RemotePathIndex = RemotePathIt->second; + if (m_RemoteContent.RawHashes[RemotePathIndex] == RawHash) + { + // It is already in it's correct place + Result.RemotePathIndexToLocalPathIndex[RemotePathIndex] = LocalPathIndex; + Result.SequenceHashToLocalPathIndex.insert({RawHash, LocalPathIndex}); + Result.MatchCount++; + continue; + } + else + { + Result.HashMismatchCount++; + } + } + else + { + Result.PathMismatchCount++; + } + } + + // Do we need it? + if (m_RemoteLookup.RawHashToSequenceIndex.contains(RawHash)) + { + if (!CachedRemoteSequences.contains(RawHash)) + { + // We need it, make sure we move it to the cache + Result.FilesToCache.push_back(LocalPathIndex); + CachedRemoteSequences.insert(RawHash); + continue; + } + else + { + Result.SkippedCount++; + } + } + } + + if (!m_Options.WipeTargetFolder) + { + // Explicitly delete the unneeded local file + Result.RemoveLocalPathIndexes.push_back(LocalPathIndex); + Result.DeleteCount++; + } + } + + return Result; +} + +void +BuildsOperationUpdateFolder::ScheduleLocalFileCaching(std::span<const uint32_t> FilesToCache, + std::atomic<uint64_t>& OutCachedCount, + std::atomic<uint64_t>& OutCachedByteCount) +{ + ZEN_TRACE_CPU("CopyToCache"); + + std::unique_ptr<ProgressBase::ProgressBar> ProgressBar = m_Progress.CreateProgressBar("Cache Local Data"); + ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + for (uint32_t LocalPathIndex : FilesToCache) + { + if (m_AbortFlag) + { + break; + } + Work.ScheduleWork(m_IOWorkerPool, [this, &OutCachedCount, &OutCachedByteCount, LocalPathIndex](std::atomic<bool>&) { + if 
(!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_CopyToCache"); + + const IoHash& RawHash = m_LocalContent.RawHashes[LocalPathIndex]; + const std::filesystem::path& LocalPath = m_LocalContent.Paths[LocalPathIndex]; + const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RawHash); + ZEN_ASSERT_SLOW(!IsFileWithRetry(CacheFilePath)); + const std::filesystem::path LocalFilePath = (m_Path / LocalPath).make_preferred(); + + std::error_code Ec = RenameFileWithRetry(LocalFilePath, CacheFilePath); + if (Ec) + { + ZEN_WARN("Failed to move file from '{}' to '{}', reason: ({}) {}, retrying...", + LocalFilePath, + CacheFilePath, + Ec.value(), + Ec.message()); + Ec = RenameFileWithRetry(LocalFilePath, CacheFilePath); + if (Ec) + { + throw std::system_error(std::error_code(Ec.value(), std::system_category()), + fmt::format("Failed to file from '{}' to '{}', reason: ({}) {}", + LocalFilePath, + CacheFilePath, + Ec.value(), + Ec.message())); + } + } + + OutCachedCount++; + OutCachedByteCount += m_LocalContent.RawSizes[LocalPathIndex]; + } + }); + } + + { + ZEN_TRACE_CPU("CopyToCache_Wait"); + + Work.Wait(m_Progress.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); + const uint64_t WorkTotal = FilesToCache.size(); + const uint64_t WorkComplete = OutCachedCount.load(); + std::string Details = fmt::format("{}/{} ({}) files", WorkComplete, WorkTotal, NiceBytes(OutCachedByteCount)); + ProgressBar->UpdateState({.Task = "Caching local ", + .Details = Details, + .TotalCount = gsl::narrow<uint64_t>(WorkTotal), + .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete), + .Status = ProgressBase::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, + false); + }); + } + + ProgressBar->Finish(); +} + +void +BuildsOperationUpdateFolder::ScheduleScavengedSequenceWrites(WriteChunksContext& Context, + std::span<const ScavengedSequenceCopyOperation> CopyOperations, + const 
std::vector<ChunkedFolderContent>& ScavengedContents, + const std::vector<std::filesystem::path>& ScavengedPaths) +{ + for (uint32_t ScavengeOpIndex = 0; ScavengeOpIndex < CopyOperations.size(); ScavengeOpIndex++) + { + if (m_AbortFlag) + { + break; + } + Context.Work.ScheduleWork( + m_IOWorkerPool, + [this, &Context, &CopyOperations, &ScavengedContents, &ScavengedPaths, ScavengeOpIndex](std::atomic<bool>&) { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_WriteScavenged"); + + Context.FilteredWrittenBytesPerSecond.Start(); + + const ScavengedSequenceCopyOperation& ScavengeOp = CopyOperations[ScavengeOpIndex]; + const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengeOp.ScavengedContentIndex]; + const std::filesystem::path& ScavengeRootPath = ScavengedPaths[ScavengeOp.ScavengedContentIndex]; + + WriteScavengedSequenceToCache(ScavengeRootPath, ScavengedContent, ScavengeOp); + + if (Context.WritePartsComplete.fetch_add(1) + 1 == Context.TotalPartWriteCount) + { + Context.FilteredWrittenBytesPerSecond.Stop(); + } + } + }); + } +} + +void +BuildsOperationUpdateFolder::ScheduleLooseChunkWrites(WriteChunksContext& Context, std::vector<LooseChunkHashWorkData>& LooseChunkHashWorks) +{ + for (uint32_t LooseChunkHashWorkIndex = 0; LooseChunkHashWorkIndex < LooseChunkHashWorks.size(); LooseChunkHashWorkIndex++) + { + if (m_AbortFlag) + { + break; + } + + Context.Work.ScheduleWork( + m_IOWorkerPool, + [this, &Context, &LooseChunkHashWorks, LooseChunkHashWorkIndex](std::atomic<bool>&) { + ZEN_TRACE_CPU("Async_ReadPreDownloadedChunk"); + if (!m_AbortFlag) + { + LooseChunkHashWorkData& LooseChunkHashWork = LooseChunkHashWorks[LooseChunkHashWorkIndex]; + const uint32_t RemoteChunkIndex = LooseChunkHashWork.RemoteChunkIndex; + WriteLooseChunk(RemoteChunkIndex, + Context.ExistsResult, + Context.SequenceIndexChunksLeftToWriteCounters, + Context.WritePartsComplete, + std::move(LooseChunkHashWork.ChunkTargetPtrs), + Context.WriteCache, + Context.Work, + 
Context.TotalRequestCount, + Context.TotalPartWriteCount, + Context.FilteredDownloadedBytesPerSecond, + Context.FilteredWrittenBytesPerSecond); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + } +} + +void +BuildsOperationUpdateFolder::ScheduleLocalChunkCopies(WriteChunksContext& Context, + std::span<const CopyChunkData> CopyChunkDatas, + CloneQueryInterface* CloneQuery, + const std::vector<ChunkedFolderContent>& ScavengedContents, + const std::vector<ChunkedContentLookup>& ScavengedLookups, + const std::vector<std::filesystem::path>& ScavengedPaths) +{ + for (size_t CopyDataIndex = 0; CopyDataIndex < CopyChunkDatas.size(); CopyDataIndex++) + { + if (m_AbortFlag) + { + break; + } + + Context.Work.ScheduleWork( + m_IOWorkerPool, + [this, &Context, CloneQuery, &CopyChunkDatas, &ScavengedContents, &ScavengedLookups, &ScavengedPaths, CopyDataIndex]( + std::atomic<bool>&) { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_CopyLocal"); + + Context.FilteredWrittenBytesPerSecond.Start(); + const CopyChunkData& CopyData = CopyChunkDatas[CopyDataIndex]; + + std::vector<uint32_t> WrittenSequenceIndexes = WriteLocalChunkToCache(CloneQuery, + CopyData, + ScavengedContents, + ScavengedLookups, + ScavengedPaths, + Context.WriteCache); + bool WritePartsDone = Context.WritePartsComplete.fetch_add(1) + 1 == Context.TotalPartWriteCount; + if (!m_AbortFlag) + { + if (WritePartsDone) + { + Context.FilteredWrittenBytesPerSecond.Stop(); + } + + // Write tracking, updating this must be done without any files open + std::vector<uint32_t> CompletedChunkSequences; + for (uint32_t RemoteSequenceIndex : WrittenSequenceIndexes) + { + if (CompleteSequenceChunk(RemoteSequenceIndex, Context.SequenceIndexChunksLeftToWriteCounters)) + { + CompletedChunkSequences.push_back(RemoteSequenceIndex); + } + } + Context.WriteCache.Close(CompletedChunkSequences); + VerifyAndCompleteChunkSequencesAsync(CompletedChunkSequences, Context.Work); + } + } + }); + } +} + +void 
+BuildsOperationUpdateFolder::ScheduleCachedBlockWrites(WriteChunksContext& Context, std::span<const uint32_t> CachedBlockIndexes) +{ + for (uint32_t BlockIndex : CachedBlockIndexes) + { + if (m_AbortFlag) + { + break; + } + + Context.Work.ScheduleWork(m_IOWorkerPool, [this, &Context, BlockIndex](std::atomic<bool>&) { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_WriteCachedBlock"); + + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + Context.FilteredWrittenBytesPerSecond.Start(); + + std::filesystem::path BlockChunkPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); + IoBuffer BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); + if (!BlockBuffer) + { + throw std::runtime_error(fmt::format("Can not read block {} at {}", BlockDescription.BlockHash, BlockChunkPath)); + } + + if (!m_AbortFlag) + { + if (!WriteChunksBlockToCache(BlockDescription, + Context.SequenceIndexChunksLeftToWriteCounters, + Context.Work, + CompositeBuffer(std::move(BlockBuffer)), + Context.RemoteChunkIndexNeedsCopyFromSourceFlags, + Context.WriteCache)) + { + std::error_code DummyEc; + RemoveFile(BlockChunkPath, DummyEc); + throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash)); + } + + std::error_code Ec = TryRemoveFile(BlockChunkPath); + if (Ec) + { + ZEN_DEBUG("Failed removing file '{}', reason: ({}) {}", BlockChunkPath, Ec.value(), Ec.message()); + } + + if (Context.WritePartsComplete.fetch_add(1) + 1 == Context.TotalPartWriteCount) + { + Context.FilteredWrittenBytesPerSecond.Stop(); + } + } + } + }); + } +} + +void +BuildsOperationUpdateFolder::SchedulePartialBlockDownloads(WriteChunksContext& Context, + const ChunkBlockAnalyser::BlockResult& PartialBlocks) +{ + for (size_t BlockRangeIndex = 0; BlockRangeIndex < PartialBlocks.BlockRanges.size();) + { + if (m_AbortFlag) + { + break; + } + + size_t RangeCount = 1; + size_t RangesLeft = PartialBlocks.BlockRanges.size() - BlockRangeIndex; + 
const ChunkBlockAnalyser::BlockRangeDescriptor& CurrentBlockRange = PartialBlocks.BlockRanges[BlockRangeIndex]; + while (RangeCount < RangesLeft && + CurrentBlockRange.BlockIndex == PartialBlocks.BlockRanges[BlockRangeIndex + RangeCount].BlockIndex) + { + RangeCount++; + } + + Context.Work.ScheduleWork( + m_NetworkPool, + [this, &Context, &PartialBlocks, BlockRangeStartIndex = BlockRangeIndex, RangeCount = RangeCount](std::atomic<bool>&) { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_GetPartialBlockRanges"); + + Context.FilteredDownloadedBytesPerSecond.Start(); + + DownloadPartialBlock( + PartialBlocks.BlockRanges, + BlockRangeStartIndex, + RangeCount, + Context.ExistsResult, + Context.TotalRequestCount, + Context.FilteredDownloadedBytesPerSecond, + [this, &Context, &PartialBlocks](IoBuffer&& InMemoryBuffer, + const std::filesystem::path& OnDiskPath, + size_t BlockRangeStartIndex, + std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengths) { + if (!m_AbortFlag) + { + Context.Work.ScheduleWork( + m_IOWorkerPool, + [this, + &Context, + &PartialBlocks, + BlockRangeStartIndex, + BlockChunkPath = std::filesystem::path(OnDiskPath), + BlockPartialBuffer = std::move(InMemoryBuffer), + OffsetAndLengths = + std::vector<std::pair<uint64_t, uint64_t>>(OffsetAndLengths.begin(), OffsetAndLengths.end())]( + std::atomic<bool>&) mutable { + if (!m_AbortFlag) + { + WritePartialBlockToCache(Context, + BlockRangeStartIndex, + std::move(BlockPartialBuffer), + BlockChunkPath, + OffsetAndLengths, + PartialBlocks); + } + }, + OnDiskPath.empty() ? 
WorkerThreadPool::EMode::DisableBacklog : WorkerThreadPool::EMode::EnableBacklog); + } + }); + } + }); + BlockRangeIndex += RangeCount; + } +} + +void +BuildsOperationUpdateFolder::WritePartialBlockToCache(WriteChunksContext& Context, + size_t BlockRangeStartIndex, + IoBuffer BlockPartialBuffer, + const std::filesystem::path& BlockChunkPath, + std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengths, + const ChunkBlockAnalyser::BlockResult& PartialBlocks) +{ + ZEN_TRACE_CPU("Async_WritePartialBlock"); + + const uint32_t BlockIndex = PartialBlocks.BlockRanges[BlockRangeStartIndex].BlockIndex; + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + + if (BlockChunkPath.empty()) + { + ZEN_ASSERT(BlockPartialBuffer); + } + else + { + ZEN_ASSERT(!BlockPartialBuffer); + BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); + if (!BlockPartialBuffer) + { + throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}", BlockDescription.BlockHash, BlockChunkPath)); + } + } + + Context.FilteredWrittenBytesPerSecond.Start(); + + const size_t RangeCount = OffsetAndLengths.size(); + + for (size_t PartialRangeIndex = 0; PartialRangeIndex < RangeCount; PartialRangeIndex++) + { + const std::pair<uint64_t, uint64_t>& OffsetAndLength = OffsetAndLengths[PartialRangeIndex]; + IoBuffer BlockRangeBuffer(BlockPartialBuffer, OffsetAndLength.first, OffsetAndLength.second); + + const ChunkBlockAnalyser::BlockRangeDescriptor& RangeDescriptor = + PartialBlocks.BlockRanges[BlockRangeStartIndex + PartialRangeIndex]; + + if (!WritePartialBlockChunksToCache(BlockDescription, + Context.SequenceIndexChunksLeftToWriteCounters, + Context.Work, + CompositeBuffer(std::move(BlockRangeBuffer)), + RangeDescriptor.ChunkBlockIndexStart, + RangeDescriptor.ChunkBlockIndexStart + RangeDescriptor.ChunkBlockIndexCount - 1, + Context.RemoteChunkIndexNeedsCopyFromSourceFlags, + Context.WriteCache)) + { + std::error_code DummyEc; + 
RemoveFile(BlockChunkPath, DummyEc); + throw std::runtime_error(fmt::format("Partial block {} is malformed", BlockDescription.BlockHash)); + } + + if (Context.WritePartsComplete.fetch_add(1) + 1 == Context.TotalPartWriteCount) + { + Context.FilteredWrittenBytesPerSecond.Stop(); + } + } + std::error_code Ec = TryRemoveFile(BlockChunkPath); + if (Ec) + { + ZEN_DEBUG("Failed removing file '{}', reason: ({}) {}", BlockChunkPath, Ec.value(), Ec.message()); + } +} + +void +BuildsOperationUpdateFolder::ScheduleFullBlockDownloads(WriteChunksContext& Context, std::span<const uint32_t> FullBlockIndexes) +{ + for (uint32_t BlockIndex : FullBlockIndexes) + { + if (m_AbortFlag) + { + break; + } + + Context.Work.ScheduleWork(m_NetworkPool, [this, &Context, BlockIndex](std::atomic<bool>&) { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_GetFullBlock"); + + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + + Context.FilteredDownloadedBytesPerSecond.Start(); + + IoBuffer BlockBuffer; + const bool ExistsInCache = + m_Storage.CacheStorage && Context.ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash); + if (ExistsInCache) + { + BlockBuffer = m_Storage.CacheStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash); + } + if (!BlockBuffer) + { + try + { + BlockBuffer = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash); + } + catch (const std::exception&) + { + // Silence http errors due to abort + if (!m_AbortFlag) + { + throw; + } + } + } + if (!m_AbortFlag) + { + if (!BlockBuffer) + { + throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); + } + + uint64_t BlockSize = BlockBuffer.GetSize(); + m_DownloadStats.DownloadedBlockCount++; + m_DownloadStats.DownloadedBlockByteCount += BlockSize; + if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == Context.TotalRequestCount) + { + Context.FilteredDownloadedBytesPerSecond.Stop(); + } + + const bool PutInCache = 
!ExistsInCache && m_Storage.CacheStorage && m_Options.PopulateCache; + + std::filesystem::path BlockChunkPath = + TryMoveDownloadedChunk(BlockBuffer, + m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(), + /* ForceDiskBased */ PutInCache || (BlockSize > m_Options.MaximumInMemoryPayloadSize)); + + if (PutInCache) + { + ZEN_ASSERT(!BlockChunkPath.empty()); + IoBuffer CacheBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); + if (CacheBuffer) + { + m_Storage.CacheStorage->PutBuildBlob(m_BuildId, + BlockDescription.BlockHash, + ZenContentType::kCompressedBinary, + CompositeBuffer(SharedBuffer(CacheBuffer))); + } + } + + if (!m_AbortFlag) + { + Context.Work.ScheduleWork( + m_IOWorkerPool, + [this, &Context, BlockIndex, BlockChunkPath, BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable { + if (!m_AbortFlag) + { + WriteFullBlockToCache(Context, BlockIndex, std::move(BlockBuffer), BlockChunkPath); + } + }, + BlockChunkPath.empty() ? WorkerThreadPool::EMode::DisableBacklog : WorkerThreadPool::EMode::EnableBacklog); + } + } + } + }); + } +} + +void +BuildsOperationUpdateFolder::WriteFullBlockToCache(WriteChunksContext& Context, + uint32_t BlockIndex, + IoBuffer BlockBuffer, + const std::filesystem::path& BlockChunkPath) +{ + ZEN_TRACE_CPU("Async_WriteFullBlock"); + + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + + if (BlockChunkPath.empty()) + { + ZEN_ASSERT(BlockBuffer); + } + else + { + ZEN_ASSERT(!BlockBuffer); + BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); + if (!BlockBuffer) + { + throw std::runtime_error(fmt::format("Could not open dowloaded block {} from {}", BlockDescription.BlockHash, BlockChunkPath)); + } + } + + Context.FilteredWrittenBytesPerSecond.Start(); + if (!WriteChunksBlockToCache(BlockDescription, + Context.SequenceIndexChunksLeftToWriteCounters, + Context.Work, + CompositeBuffer(std::move(BlockBuffer)), + Context.RemoteChunkIndexNeedsCopyFromSourceFlags, + 
Context.WriteCache)) + { + std::error_code DummyEc; + RemoveFile(BlockChunkPath, DummyEc); + throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash)); + } + + if (!BlockChunkPath.empty()) + { + std::error_code Ec = TryRemoveFile(BlockChunkPath); + if (Ec) + { + ZEN_DEBUG("Failed removing file '{}', reason: ({}) {}", BlockChunkPath, Ec.value(), Ec.message()); + } + } + + if (Context.WritePartsComplete.fetch_add(1) + 1 == Context.TotalPartWriteCount) + { + Context.FilteredWrittenBytesPerSecond.Stop(); + } +} + +void +BuildsOperationUpdateFolder::ScheduleLocalFileRemovals(ParallelWork& Work, + std::span<const uint32_t> RemoveLocalPathIndexes, + std::atomic<uint64_t>& DeletedCount) +{ + for (uint32_t LocalPathIndex : RemoveLocalPathIndexes) + { + if (m_AbortFlag) + { + break; + } + Work.ScheduleWork(m_IOWorkerPool, [this, &DeletedCount, LocalPathIndex](std::atomic<bool>&) { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_RemoveFile"); + + const std::filesystem::path LocalFilePath = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred(); + SetFileReadOnlyWithRetry(LocalFilePath, false); + RemoveFileWithRetry(LocalFilePath); + DeletedCount++; + } + }); + } +} + +void +BuildsOperationUpdateFolder::ScheduleTargetFinalization( + ParallelWork& Work, + std::span<const FinalizeTarget> Targets, + const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& SequenceHashToLocalPathIndex, + const tsl::robin_map<uint32_t, uint32_t>& RemotePathIndexToLocalPathIndex, + FolderContent& OutLocalFolderState, + std::atomic<uint64_t>& TargetsComplete) +{ + size_t TargetOffset = 0; + while (TargetOffset < Targets.size()) + { + if (m_AbortFlag) + { + break; + } + + size_t TargetCount = 1; + while ((TargetOffset + TargetCount) < Targets.size() && + (Targets[TargetOffset + TargetCount].RawHash == Targets[TargetOffset].RawHash)) + { + TargetCount++; + } + + Work.ScheduleWork(m_IOWorkerPool, + [this, + &SequenceHashToLocalPathIndex, + Targets, + 
&RemotePathIndexToLocalPathIndex, + &OutLocalFolderState, + BaseTargetOffset = TargetOffset, + TargetCount, + &TargetsComplete](std::atomic<bool>&) { + if (!m_AbortFlag) + { + FinalizeTargetGroup(BaseTargetOffset, + TargetCount, + Targets, + SequenceHashToLocalPathIndex, + RemotePathIndexToLocalPathIndex, + OutLocalFolderState, + TargetsComplete); + } + }); + + TargetOffset += TargetCount; + } +} + +void +BuildsOperationUpdateFolder::FinalizeTargetGroup(size_t BaseOffset, + size_t Count, + std::span<const FinalizeTarget> Targets, + const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& SequenceHashToLocalPathIndex, + const tsl::robin_map<uint32_t, uint32_t>& RemotePathIndexToLocalPathIndex, + FolderContent& OutLocalFolderState, + std::atomic<uint64_t>& TargetsComplete) +{ + ZEN_TRACE_CPU("Async_FinalizeChunkSequence"); + + size_t TargetOffset = BaseOffset; + const IoHash& RawHash = Targets[TargetOffset].RawHash; + + if (RawHash == IoHash::Zero) + { + ZEN_TRACE_CPU("CreateEmptyFiles"); + while (TargetOffset < (BaseOffset + Count)) + { + const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex; + ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash); + const std::filesystem::path& TargetPath = m_RemoteContent.Paths[RemotePathIndex]; + std::filesystem::path TargetFilePath = (m_Path / TargetPath).make_preferred(); + auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(RemotePathIndex); + if (InPlaceIt == RemotePathIndexToLocalPathIndex.end() || InPlaceIt->second == 0) + { + if (IsFileWithRetry(TargetFilePath)) + { + SetFileReadOnlyWithRetry(TargetFilePath, false); + } + else + { + CreateDirectories(TargetFilePath.parent_path()); + } + BasicFile OutputFile; + OutputFile.Open(TargetFilePath, BasicFile::Mode::kTruncate); + } + OutLocalFolderState.Paths[RemotePathIndex] = TargetPath; + OutLocalFolderState.RawSizes[RemotePathIndex] = m_RemoteContent.RawSizes[RemotePathIndex]; + + OutLocalFolderState.Attributes[RemotePathIndex] = + 
m_RemoteContent.Attributes.empty() + ? GetNativeFileAttributes(TargetFilePath) + : SetNativeFileAttributes(TargetFilePath, m_RemoteContent.Platform, m_RemoteContent.Attributes[RemotePathIndex]); + OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath); + + TargetOffset++; + TargetsComplete++; + } + } + else + { + ZEN_TRACE_CPU("FinalizeFile"); + ZEN_ASSERT(m_RemoteLookup.RawHashToSequenceIndex.contains(RawHash)); + const uint32_t FirstRemotePathIndex = Targets[TargetOffset].RemotePathIndex; + const std::filesystem::path& FirstTargetPath = m_RemoteContent.Paths[FirstRemotePathIndex]; + std::filesystem::path FirstTargetFilePath = (m_Path / FirstTargetPath).make_preferred(); + + if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(FirstRemotePathIndex); InPlaceIt != RemotePathIndexToLocalPathIndex.end()) + { + ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath)); + } + else + { + if (IsFileWithRetry(FirstTargetFilePath)) + { + SetFileReadOnlyWithRetry(FirstTargetFilePath, false); + } + else + { + CreateDirectories(FirstTargetFilePath.parent_path()); + } + + if (auto InplaceIt = SequenceHashToLocalPathIndex.find(RawHash); InplaceIt != SequenceHashToLocalPathIndex.end()) + { + ZEN_TRACE_CPU("Copy"); + const uint32_t LocalPathIndex = InplaceIt->second; + const std::filesystem::path& SourcePath = m_LocalContent.Paths[LocalPathIndex]; + std::filesystem::path SourceFilePath = (m_Path / SourcePath).make_preferred(); + ZEN_ASSERT_SLOW(IsFileWithRetry(SourceFilePath)); + + ZEN_DEBUG("Copying from '{}' -> '{}'", SourceFilePath, FirstTargetFilePath); + const uint64_t RawSize = m_LocalContent.RawSizes[LocalPathIndex]; + FastCopyFile(m_Options.AllowFileClone, + m_Options.UseSparseFiles, + SourceFilePath, + FirstTargetFilePath, + RawSize, + m_DiskStats.WriteCount, + m_DiskStats.WriteByteCount, + m_DiskStats.CloneCount, + m_DiskStats.CloneByteCount); + + m_RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++; + } + else + { + 
ZEN_TRACE_CPU("Rename"); + const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RawHash); + ZEN_ASSERT_SLOW(IsFileWithRetry(CacheFilePath)); + + std::error_code Ec = RenameFileWithRetry(CacheFilePath, FirstTargetFilePath); + if (Ec) + { + ZEN_WARN("Failed to move file from '{}' to '{}', reason: ({}) {}, retrying...", + CacheFilePath, + FirstTargetFilePath, + Ec.value(), + Ec.message()); + Ec = RenameFileWithRetry(CacheFilePath, FirstTargetFilePath); + if (Ec) + { + throw std::system_error(std::error_code(Ec.value(), std::system_category()), + fmt::format("Failed to move file from '{}' to '{}', reason: ({}) {}", + CacheFilePath, + FirstTargetFilePath, + Ec.value(), + Ec.message())); + } + } + + m_RebuildFolderStateStats.FinalizeTreeFilesMovedCount++; + } + } + + OutLocalFolderState.Paths[FirstRemotePathIndex] = FirstTargetPath; + OutLocalFolderState.RawSizes[FirstRemotePathIndex] = m_RemoteContent.RawSizes[FirstRemotePathIndex]; + + OutLocalFolderState.Attributes[FirstRemotePathIndex] = + m_RemoteContent.Attributes.empty() + ? 
GetNativeFileAttributes(FirstTargetFilePath) + : SetNativeFileAttributes(FirstTargetFilePath, m_RemoteContent.Platform, m_RemoteContent.Attributes[FirstRemotePathIndex]); + OutLocalFolderState.ModificationTicks[FirstRemotePathIndex] = GetModificationTickFromPath(FirstTargetFilePath); + + TargetOffset++; + TargetsComplete++; + + while (TargetOffset < (BaseOffset + Count)) + { + const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex; + ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash); + const std::filesystem::path& TargetPath = m_RemoteContent.Paths[RemotePathIndex]; + std::filesystem::path TargetFilePath = (m_Path / TargetPath).make_preferred(); + + if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(RemotePathIndex); InPlaceIt != RemotePathIndexToLocalPathIndex.end()) + { + ZEN_ASSERT_SLOW(IsFileWithRetry(TargetFilePath)); + } + else + { + ZEN_TRACE_CPU("Copy"); + if (IsFileWithRetry(TargetFilePath)) + { + SetFileReadOnlyWithRetry(TargetFilePath, false); + } + else + { + CreateDirectories(TargetFilePath.parent_path()); + } + + ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath)); + ZEN_DEBUG("Copying from '{}' -> '{}'", FirstTargetFilePath, TargetFilePath); + const uint64_t RawSize = m_RemoteContent.RawSizes[RemotePathIndex]; + FastCopyFile(m_Options.AllowFileClone, + m_Options.UseSparseFiles, + FirstTargetFilePath, + TargetFilePath, + RawSize, + m_DiskStats.WriteCount, + m_DiskStats.WriteByteCount, + m_DiskStats.CloneCount, + m_DiskStats.CloneByteCount); + + m_RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++; + } + + OutLocalFolderState.Paths[RemotePathIndex] = TargetPath; + OutLocalFolderState.RawSizes[RemotePathIndex] = m_RemoteContent.RawSizes[RemotePathIndex]; + + OutLocalFolderState.Attributes[RemotePathIndex] = + m_RemoteContent.Attributes.empty() + ? 
GetNativeFileAttributes(TargetFilePath) + : SetNativeFileAttributes(TargetFilePath, m_RemoteContent.Platform, m_RemoteContent.Attributes[RemotePathIndex]); + OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath); + + TargetOffset++; + TargetsComplete++; + } + } +} + +std::vector<BuildsOperationUpdateFolder::ScavengeSource> +BuildsOperationUpdateFolder::FindScavengeSources() +{ + ZEN_TRACE_CPU("FindScavengeSources"); + + const bool TargetPathExists = IsDir(m_Path); + + std::vector<std::filesystem::path> StatePaths = GetDownloadedStatePaths(m_Options.SystemRootDir); + + std::vector<ScavengeSource> Result; + for (const std::filesystem::path& EntryPath : StatePaths) + { + if (IsFile(EntryPath)) + { + bool DeleteEntry = false; + + try + { + BuildsDownloadInfo Info = ReadDownloadedInfoFile(EntryPath); + const bool LocalPathExists = !Info.LocalPath.empty() && IsDir(Info.LocalPath); + const bool LocalStateFileExists = IsFile(Info.StateFilePath); + if (LocalPathExists && LocalStateFileExists) + { + if (TargetPathExists && std::filesystem::equivalent(Info.LocalPath, m_Path)) + { + DeleteEntry = true; + } + else + { + Result.push_back({.StateFilePath = std::move(Info.StateFilePath), .Path = std::move(Info.LocalPath)}); + } + } + else + { + DeleteEntry = true; + } + } + catch (const std::exception& Ex) + { + ZEN_WARN("{}", Ex.what()); + DeleteEntry = true; + } + + if (DeleteEntry) + { + std::error_code DummyEc; + std::filesystem::remove(EntryPath, DummyEc); + } + } + } + return Result; +} + +std::vector<uint32_t> +BuildsOperationUpdateFolder::ScanTargetFolder(const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& CachedChunkHashesFound, + const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& CachedSequenceHashesFound) +{ + ZEN_TRACE_CPU("ScanTargetFolder"); + + Stopwatch LocalTimer; + + std::vector<uint32_t> MissingSequenceIndexes; + + for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < 
m_RemoteContent.ChunkedContent.SequenceRawHashes.size(); + RemoteSequenceIndex++) + { + const IoHash& RemoteSequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + const uint32_t RemotePathIndex = GetFirstPathIndexForSeqeuenceIndex(m_RemoteLookup, RemoteSequenceIndex); + const uint64_t RemoteRawSize = m_RemoteContent.RawSizes[RemotePathIndex]; + if (auto CacheSequenceIt = CachedSequenceHashesFound.find(RemoteSequenceRawHash); + CacheSequenceIt != CachedSequenceHashesFound.end()) + { + const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RemoteSequenceRawHash); + ZEN_ASSERT_SLOW(IsFile(CacheFilePath)); + if (m_Options.IsVerbose) + { + ZEN_INFO("Found sequence {} at {} ({})", RemoteSequenceRawHash, CacheFilePath, NiceBytes(RemoteRawSize)); + } + } + else if (auto CacheChunkIt = CachedChunkHashesFound.find(RemoteSequenceRawHash); CacheChunkIt != CachedChunkHashesFound.end()) + { + const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RemoteSequenceRawHash); + ZEN_ASSERT_SLOW(IsFile(CacheFilePath)); + if (m_Options.IsVerbose) + { + ZEN_INFO("Found chunk {} at {} ({})", RemoteSequenceRawHash, CacheFilePath, NiceBytes(RemoteRawSize)); + } + } + else if (auto It = m_LocalLookup.RawHashToSequenceIndex.find(RemoteSequenceRawHash); + It != m_LocalLookup.RawHashToSequenceIndex.end()) + { + const uint32_t LocalSequenceIndex = It->second; + const uint32_t LocalPathIndex = GetFirstPathIndexForSeqeuenceIndex(m_LocalLookup, LocalSequenceIndex); + const std::filesystem::path LocalFilePath = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred(); + ZEN_ASSERT_SLOW(IsFile(LocalFilePath)); + m_CacheMappingStats.LocalPathsMatchingSequencesCount++; + m_CacheMappingStats.LocalPathsMatchingSequencesByteCount += RemoteRawSize; + if (m_Options.IsVerbose) + { + ZEN_INFO("Found sequence {} at {} ({})", RemoteSequenceRawHash, LocalFilePath, NiceBytes(RemoteRawSize)); 
+ } + } + else + { + MissingSequenceIndexes.push_back(RemoteSequenceIndex); + } + } + + m_CacheMappingStats.LocalScanElapsedWallTimeUs += LocalTimer.GetElapsedTimeUs(); + return MissingSequenceIndexes; +} + +bool +BuildsOperationUpdateFolder::FindScavengeContent(const ScavengeSource& Source, + ChunkedFolderContent& OutScavengedLocalContent, + ChunkedContentLookup& OutScavengedLookup) +{ + ZEN_TRACE_CPU("FindScavengeContent"); + + FolderContent LocalFolderState; + try + { + BuildSaveState SavedState = ReadBuildSaveStateFile(Source.StateFilePath); + if (SavedState.Version == BuildSaveState::NoVersion) + { + ZEN_DEBUG("Skipping old build state at '{}', state files before version {} can not be trusted during scavenge", + Source.StateFilePath, + BuildSaveState::kVersion1); + return false; + } + OutScavengedLocalContent = std::move(SavedState.State.ChunkedContent); + LocalFolderState = std::move(SavedState.FolderState); + } + catch (const std::exception& Ex) + { + ZEN_DEBUG("Skipping invalid build state at '{}', reason: {}", Source.StateFilePath, Ex.what()); + return false; + } + + tsl::robin_set<uint32_t> PathIndexesToScavenge; + PathIndexesToScavenge.reserve(OutScavengedLocalContent.Paths.size()); + std::vector<uint32_t> ChunkOrderOffsets = BuildChunkOrderOffset(OutScavengedLocalContent.ChunkedContent.ChunkCounts); + + { + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> RawHashToPathIndex; + + RawHashToPathIndex.reserve(OutScavengedLocalContent.Paths.size()); + for (uint32_t ScavengedPathIndex = 0; ScavengedPathIndex < OutScavengedLocalContent.RawHashes.size(); ScavengedPathIndex++) + { + if (!RawHashToPathIndex.contains(OutScavengedLocalContent.RawHashes[ScavengedPathIndex])) + { + RawHashToPathIndex.insert_or_assign(OutScavengedLocalContent.RawHashes[ScavengedPathIndex], ScavengedPathIndex); + } + } + + for (uint32_t ScavengeSequenceIndex = 0; ScavengeSequenceIndex < OutScavengedLocalContent.ChunkedContent.SequenceRawHashes.size(); + ScavengeSequenceIndex++) + { + 
const IoHash& SequenceHash = OutScavengedLocalContent.ChunkedContent.SequenceRawHashes[ScavengeSequenceIndex]; + if (auto It = RawHashToPathIndex.find(SequenceHash); It != RawHashToPathIndex.end()) + { + uint32_t PathIndex = It->second; + if (!PathIndexesToScavenge.contains(PathIndex)) + { + if (m_RemoteLookup.RawHashToSequenceIndex.contains(SequenceHash)) + { + PathIndexesToScavenge.insert(PathIndex); + } + else + { + uint32_t ChunkOrderIndexStart = ChunkOrderOffsets[ScavengeSequenceIndex]; + const uint32_t ChunkCount = OutScavengedLocalContent.ChunkedContent.ChunkCounts[ScavengeSequenceIndex]; + for (uint32_t ChunkOrderIndex = 0; ChunkOrderIndex < ChunkCount; ChunkOrderIndex++) + { + const uint32_t ChunkIndex = + OutScavengedLocalContent.ChunkedContent.ChunkOrders[ChunkOrderIndexStart + ChunkOrderIndex]; + const IoHash& ChunkHash = OutScavengedLocalContent.ChunkedContent.ChunkHashes[ChunkIndex]; + if (m_RemoteLookup.ChunkHashToChunkIndex.contains(ChunkHash)) + { + PathIndexesToScavenge.insert(PathIndex); + break; + } + } + } + } + } + else + { + ZEN_WARN("Scavenged state file at '{}' for '{}' is invalid, skipping scavenging for sequence {}", + Source.StateFilePath, + Source.Path, + SequenceHash); + } + } + } + + if (PathIndexesToScavenge.empty()) + { + OutScavengedLocalContent = {}; + return false; + } + + std::vector<std::filesystem::path> PathsToScavenge; + PathsToScavenge.reserve(PathIndexesToScavenge.size()); + for (uint32_t ScavengedStatePathIndex : PathIndexesToScavenge) + { + PathsToScavenge.push_back(OutScavengedLocalContent.Paths[ScavengedStatePathIndex]); + } + + FolderContent ValidFolderContent = + GetValidFolderContent(m_IOWorkerPool, m_ScavengedFolderScanStats, Source.Path, PathsToScavenge, {}, 0, m_AbortFlag, m_PauseFlag); + + if (!LocalFolderState.AreKnownFilesEqual(ValidFolderContent)) + { + std::vector<std::filesystem::path> DeletedPaths; + FolderContent UpdatedContent = GetUpdatedContent(LocalFolderState, ValidFolderContent, DeletedPaths); + + 
// If the files are modified since the state was saved we ignore the files since we don't + // want to incur the cost of scanning/hashing scavenged files + DeletedPaths.insert(DeletedPaths.end(), UpdatedContent.Paths.begin(), UpdatedContent.Paths.end()); + if (!DeletedPaths.empty()) + { + OutScavengedLocalContent = + DeletePathsFromChunkedContent(OutScavengedLocalContent, + BuildHashLookup(OutScavengedLocalContent.ChunkedContent.SequenceRawHashes), + ChunkOrderOffsets, + DeletedPaths); + } + } + + if (OutScavengedLocalContent.Paths.empty()) + { + OutScavengedLocalContent = {}; + return false; + } + + OutScavengedLookup = BuildChunkedContentLookup(OutScavengedLocalContent); + + return true; +} + +void +BuildsOperationUpdateFolder::ScavengeSourceForChunks(uint32_t& InOutRemainingChunkCount, + std::vector<bool>& InOutRemoteChunkIndexNeedsCopyFromLocalFileFlags, + tsl::robin_map<IoHash, size_t, IoHash::Hasher>& InOutRawHashToCopyChunkDataIndex, + const std::vector<std::atomic<uint32_t>>& SequenceIndexChunksLeftToWriteCounters, + const ChunkedFolderContent& ScavengedContent, + const ChunkedContentLookup& ScavengedLookup, + std::vector<CopyChunkData>& InOutCopyChunkDatas, + uint32_t ScavengedContentIndex, + uint64_t& InOutChunkMatchingRemoteCount, + uint64_t& InOutChunkMatchingRemoteByteCount) +{ + for (uint32_t RemoteChunkIndex = 0; + RemoteChunkIndex < m_RemoteContent.ChunkedContent.ChunkHashes.size() && (InOutRemainingChunkCount > 0); + RemoteChunkIndex++) + { + if (!InOutRemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) + { + const IoHash& RemoteChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + if (auto It = ScavengedLookup.ChunkHashToChunkIndex.find(RemoteChunkHash); It != ScavengedLookup.ChunkHashToChunkIndex.end()) + { + std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = + GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); + + if (!ChunkTargetPtrs.empty()) + { + 
const uint32_t ScavengedChunkIndex = It->second; + const uint64_t ScavengedChunkRawSize = ScavengedContent.ChunkedContent.ChunkRawSizes[ScavengedChunkIndex]; + const size_t ChunkSequenceLocationOffset = ScavengedLookup.ChunkSequenceLocationOffset[ScavengedChunkIndex]; + const ChunkedContentLookup::ChunkSequenceLocation& ScavengeLocation = + ScavengedLookup.ChunkSequenceLocations[ChunkSequenceLocationOffset]; + const IoHash& ScavengedSequenceRawHash = + ScavengedContent.ChunkedContent.SequenceRawHashes[ScavengeLocation.SequenceIndex]; + + CopyChunkData::ChunkTarget Target = {.TargetChunkLocationCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()), + .RemoteChunkIndex = RemoteChunkIndex, + .CacheFileOffset = ScavengeLocation.Offset}; + if (auto CopySourceIt = InOutRawHashToCopyChunkDataIndex.find(ScavengedSequenceRawHash); + CopySourceIt != InOutRawHashToCopyChunkDataIndex.end()) + { + CopyChunkData& Data = InOutCopyChunkDatas[CopySourceIt->second]; + if (Data.TargetChunkLocationPtrs.size() > 1024) + { + InOutRawHashToCopyChunkDataIndex.insert_or_assign(ScavengedSequenceRawHash, InOutCopyChunkDatas.size()); + InOutCopyChunkDatas.push_back(CopyChunkData{.ScavengeSourceIndex = ScavengedContentIndex, + .SourceSequenceIndex = ScavengeLocation.SequenceIndex, + .TargetChunkLocationPtrs = ChunkTargetPtrs, + .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}}); + } + else + { + Data.TargetChunkLocationPtrs.insert(Data.TargetChunkLocationPtrs.end(), + ChunkTargetPtrs.begin(), + ChunkTargetPtrs.end()); + Data.ChunkTargets.push_back(Target); + } + } + else + { + InOutRawHashToCopyChunkDataIndex.insert_or_assign(ScavengedSequenceRawHash, InOutCopyChunkDatas.size()); + InOutCopyChunkDatas.push_back(CopyChunkData{.ScavengeSourceIndex = ScavengedContentIndex, + .SourceSequenceIndex = ScavengeLocation.SequenceIndex, + .TargetChunkLocationPtrs = ChunkTargetPtrs, + .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}}); + } + InOutChunkMatchingRemoteCount++; + 
InOutChunkMatchingRemoteByteCount += ScavengedChunkRawSize; + InOutRemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex] = true; + InOutRemainingChunkCount--; + } + } + } + } +} + +std::filesystem::path +BuildsOperationUpdateFolder::FindDownloadedChunk(const IoHash& ChunkHash) +{ + ZEN_TRACE_CPU("FindDownloadedChunk"); + + std::filesystem::path CompressedChunkPath = m_TempDownloadFolderPath / ChunkHash.ToHexString(); + if (IsFile(CompressedChunkPath)) + { + IoBuffer ExistingCompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); + if (ExistingCompressedPart) + { + IoHash RawHash; + uint64_t RawSize; + if (CompressedBuffer::ValidateCompressedHeader(ExistingCompressedPart, + RawHash, + RawSize, + /*OutOptionalTotalCompressedSize*/ nullptr)) + { + return CompressedChunkPath; + } + else + { + std::error_code DummyEc; + RemoveFile(CompressedChunkPath, DummyEc); + } + } + } + return {}; +} + +std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> +BuildsOperationUpdateFolder::GetRemainingChunkTargets(std::span<const std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, + uint32_t ChunkIndex) +{ + ZEN_TRACE_CPU("GetRemainingChunkTargets"); + + std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkSources = GetChunkSequenceLocations(m_RemoteLookup, ChunkIndex); + std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs; + if (!ChunkSources.empty()) + { + ChunkTargetPtrs.reserve(ChunkSources.size()); + for (const ChunkedContentLookup::ChunkSequenceLocation& Source : ChunkSources) + { + if (SequenceIndexChunksLeftToWriteCounters[Source.SequenceIndex].load() > 0) + { + ChunkTargetPtrs.push_back(&Source); + } + } + } + return ChunkTargetPtrs; +}; + +uint64_t +BuildsOperationUpdateFolder::GetChunkWriteCount(std::span<const std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, + uint32_t ChunkIndex) +{ + ZEN_TRACE_CPU("GetChunkWriteCount"); + + uint64_t WriteCount = 0; + std::span<const 
ChunkedContentLookup::ChunkSequenceLocation> ChunkSources = GetChunkSequenceLocations(m_RemoteLookup, ChunkIndex); + for (const ChunkedContentLookup::ChunkSequenceLocation& Source : ChunkSources) + { + if (SequenceIndexChunksLeftToWriteCounters[Source.SequenceIndex].load() > 0) + { + WriteCount++; + } + } + return WriteCount; +}; + +void +BuildsOperationUpdateFolder::CheckRequiredDiskSpace(const tsl::robin_map<std::string, uint32_t>& RemotePathToRemoteIndex) +{ + tsl::robin_set<uint32_t> ExistingRemotePaths; + + if (m_Options.EnableTargetFolderScavenging) + { + for (uint32_t LocalPathIndex = 0; LocalPathIndex < m_LocalContent.Paths.size(); LocalPathIndex++) + { + const IoHash& RawHash = m_LocalContent.RawHashes[LocalPathIndex]; + const std::filesystem::path& LocalPath = m_LocalContent.Paths[LocalPathIndex]; + + if (auto RemotePathIt = RemotePathToRemoteIndex.find(LocalPath.generic_string()); RemotePathIt != RemotePathToRemoteIndex.end()) + { + const uint32_t RemotePathIndex = RemotePathIt->second; + if (m_RemoteContent.RawHashes[RemotePathIndex] == RawHash) + { + ExistingRemotePaths.insert(RemotePathIndex); + } + } + } + } + + uint64_t RequiredSpace = 0; + for (uint32_t RemotePathIndex = 0; RemotePathIndex < m_RemoteContent.Paths.size(); RemotePathIndex++) + { + if (!ExistingRemotePaths.contains(RemotePathIndex)) + { + RequiredSpace += m_RemoteContent.RawSizes[RemotePathIndex]; + } + } + + std::error_code Ec; + DiskSpace Space = DiskSpaceInfo(m_Path, Ec); + if (Ec) + { + throw std::runtime_error(fmt::format("Get free disk space for target path '{}' FAILED, reason: {}", m_Path, Ec.message())); + } + if (Space.Free < (RequiredSpace + 16u * 1024u * 1024u)) + { + throw std::runtime_error( + fmt::format("Not enough free space for target path '{}', {} of free space is needed but only {} is available", + m_Path, + NiceBytes(RequiredSpace), + NiceBytes(Space.Free))); + } +} + +void +BuildsOperationUpdateFolder::WriteScavengedSequenceToCache(const std::filesystem::path& 
ScavengeRootPath, + const ChunkedFolderContent& ScavengedContent, + const ScavengedSequenceCopyOperation& ScavengeOp) +{ + ZEN_TRACE_CPU("WriteScavengedSequenceToCache"); + + const std::filesystem::path ScavengedPath = ScavengedContent.Paths[ScavengeOp.ScavengedPathIndex]; + const std::filesystem::path ScavengedFilePath = (ScavengeRootPath / ScavengedPath).make_preferred(); + ZEN_ASSERT_SLOW(FileSizeFromPath(ScavengedFilePath) == ScavengeOp.RawSize); + + const IoHash& RemoteSequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[ScavengeOp.RemoteSequenceIndex]; + const std::filesystem::path TempFilePath = GetTempChunkedSequenceFileName(m_CacheFolderPath, RemoteSequenceRawHash); + + const uint64_t RawSize = ScavengedContent.RawSizes[ScavengeOp.ScavengedPathIndex]; + FastCopyFile(m_Options.AllowFileClone, + m_Options.UseSparseFiles, + ScavengedFilePath, + TempFilePath, + RawSize, + m_DiskStats.WriteCount, + m_DiskStats.WriteByteCount, + m_DiskStats.CloneCount, + m_DiskStats.CloneByteCount); + + const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RemoteSequenceRawHash); + RenameFile(TempFilePath, CacheFilePath); + + m_WrittenChunkByteCount += RawSize; + if (m_Options.ValidateCompletedSequences) + { + m_ValidatedChunkByteCount += RawSize; + } +} + +void +BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkIndex, + const BlobsExistsResult& ExistsResult, + std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, + std::atomic<uint64_t>& WritePartsComplete, + std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>&& ChunkTargetPtrs, + BufferedWriteFileCache& WriteCache, + ParallelWork& Work, + uint64_t TotalRequestCount, + uint64_t TotalPartWriteCount, + FilteredRate& FilteredDownloadedBytesPerSecond, + FilteredRate& FilteredWrittenBytesPerSecond) +{ + const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + std::filesystem::path 
ExistingCompressedChunkPath = FindDownloadedChunk(ChunkHash); + if (!ExistingCompressedChunkPath.empty()) + { + if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + } + if (!m_AbortFlag) + { + if (!ExistingCompressedChunkPath.empty()) + { + Work.ScheduleWork( + m_IOWorkerPool, + [this, + SequenceIndexChunksLeftToWriteCounters, + &WriteCache, + &Work, + &WritePartsComplete, + TotalPartWriteCount, + &FilteredWrittenBytesPerSecond, + RemoteChunkIndex, + ChunkTargetPtrs = std::move(ChunkTargetPtrs), + CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>& AbortFlag) { + if (!AbortFlag) + { + ZEN_TRACE_CPU("Async_WritePreDownloadedChunk"); + + FilteredWrittenBytesPerSecond.Start(); + + const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + + IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); + if (!CompressedPart) + { + throw std::runtime_error( + fmt::format("Could not open dowloaded compressed chunk {} from {}", ChunkHash, CompressedChunkPath)); + } + + bool NeedHashVerify = + WriteCompressedChunkToCache(ChunkHash, ChunkTargetPtrs, WriteCache, std::move(CompressedPart)); + bool WritePartsDone = WritePartsComplete.fetch_add(1) + 1 == TotalPartWriteCount; + + if (!AbortFlag) + { + if (WritePartsDone) + { + FilteredWrittenBytesPerSecond.Stop(); + } + + std::error_code Ec = TryRemoveFile(CompressedChunkPath); + if (Ec) + { + ZEN_DEBUG("Failed removing file '{}', reason: ({}) {}", CompressedChunkPath, Ec.value(), Ec.message()); + } + + std::vector<uint32_t> CompletedSequences = + CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); + WriteCache.Close(CompletedSequences); + if (NeedHashVerify) + { + VerifyAndCompleteChunkSequencesAsync(CompletedSequences, Work); + } + else + { + FinalizeChunkSequences(CompletedSequences); + } + } + } + }); + } + else + { + 
Work.ScheduleWork(m_NetworkPool, + [this, + &ExistsResult, + SequenceIndexChunksLeftToWriteCounters, + &WriteCache, + &Work, + &WritePartsComplete, + TotalPartWriteCount, + TotalRequestCount, + &FilteredDownloadedBytesPerSecond, + &FilteredWrittenBytesPerSecond, + RemoteChunkIndex, + ChunkTargetPtrs = std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>( + std::move(ChunkTargetPtrs))](std::atomic<bool>&) mutable { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_DownloadChunk"); + + FilteredDownloadedBytesPerSecond.Start(); + DownloadBuildBlob(RemoteChunkIndex, + ExistsResult, + Work, + TotalRequestCount, + FilteredDownloadedBytesPerSecond, + [this, + &ExistsResult, + SequenceIndexChunksLeftToWriteCounters, + &WriteCache, + &Work, + &WritePartsComplete, + TotalPartWriteCount, + RemoteChunkIndex, + &FilteredWrittenBytesPerSecond, + ChunkTargetPtrs = std::move(ChunkTargetPtrs)](IoBuffer&& Payload) mutable { + AsyncWriteDownloadedChunk(RemoteChunkIndex, + ExistsResult, + std::move(ChunkTargetPtrs), + WriteCache, + Work, + std::move(Payload), + SequenceIndexChunksLeftToWriteCounters, + WritePartsComplete, + TotalPartWriteCount, + FilteredWrittenBytesPerSecond); + }); + } + }); + } + } +} + +void +BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkIndex, + const BlobsExistsResult& ExistsResult, + ParallelWork& Work, + uint64_t TotalRequestCount, + FilteredRate& FilteredDownloadedBytesPerSecond, + std::function<void(IoBuffer&& Payload)>&& OnDownloaded) +{ + const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + // FilteredDownloadedBytesPerSecond.Start(); + IoBuffer BuildBlob; + const bool ExistsInCache = m_Storage.CacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash); + if (ExistsInCache) + { + BuildBlob = m_Storage.CacheStorage->GetBuildBlob(m_BuildId, ChunkHash); + } + if (BuildBlob) + { + uint64_t BlobSize = BuildBlob.GetSize(); + m_DownloadStats.DownloadedChunkCount++; + 
m_DownloadStats.DownloadedChunkByteCount += BlobSize; + if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + OnDownloaded(std::move(BuildBlob)); + } + else + { + if (m_RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= m_Options.LargeAttachmentSize) + { + DownloadLargeBlob( + *m_Storage.BuildStorage, + m_TempDownloadFolderPath, + m_BuildId, + ChunkHash, + m_Options.PreferredMultipartChunkSize, + Work, + m_NetworkPool, + m_DownloadStats.DownloadedChunkByteCount, + m_DownloadStats.MultipartAttachmentCount, + [this, &FilteredDownloadedBytesPerSecond, TotalRequestCount, OnDownloaded = std::move(OnDownloaded)](IoBuffer&& Payload) { + m_DownloadStats.DownloadedChunkCount++; + if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + + OnDownloaded(std::move(Payload)); + }); + } + else + { + try + { + BuildBlob = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, ChunkHash); + } + catch (const std::exception&) + { + // Silence http errors due to abort + if (!m_AbortFlag) + { + throw; + } + } + if (!m_AbortFlag) + { + if (!BuildBlob) + { + throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash)); + } + + if (!m_AbortFlag) + { + uint64_t BlobSize = BuildBlob.GetSize(); + m_DownloadStats.DownloadedChunkCount++; + m_DownloadStats.DownloadedChunkByteCount += BlobSize; + if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + + OnDownloaded(std::move(BuildBlob)); + } + } + } + } +} + +void +BuildsOperationUpdateFolder::DownloadPartialBlock( + std::span<const ChunkBlockAnalyser::BlockRangeDescriptor> BlockRanges, + size_t BlockRangeStartIndex, + size_t BlockRangeCount, + const BlobsExistsResult& ExistsResult, + uint64_t TotalRequestCount, + FilteredRate& FilteredDownloadedBytesPerSecond, + std::function<void(IoBuffer&& 
InMemoryBuffer, + const std::filesystem::path& OnDiskPath, + size_t BlockRangeStartIndex, + std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengths)>&& OnDownloaded) +{ + const uint32_t BlockIndex = BlockRanges[BlockRangeStartIndex].BlockIndex; + + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + + auto ProcessDownload = [this]( + const ChunkBlockDescription& BlockDescription, + IoBuffer&& BlockRangeBuffer, + size_t BlockRangeStartIndex, + std::span<const std::pair<uint64_t, uint64_t>> BlockOffsetAndLengths, + uint64_t TotalRequestCount, + FilteredRate& FilteredDownloadedBytesPerSecond, + const std::function<void(IoBuffer && InMemoryBuffer, + const std::filesystem::path& OnDiskPath, + size_t BlockRangeStartIndex, + std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengths)>& OnDownloaded) { + uint64_t BlockRangeBufferSize = BlockRangeBuffer.GetSize(); + m_DownloadStats.DownloadedBlockCount++; + m_DownloadStats.DownloadedBlockByteCount += BlockRangeBufferSize; + if (m_DownloadStats.RequestsCompleteCount.fetch_add(BlockOffsetAndLengths.size()) + BlockOffsetAndLengths.size() == + TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + + IoHashStream RangeId; + for (const std::pair<uint64_t, uint64_t>& Range : BlockOffsetAndLengths) + { + RangeId.Append(&Range.first, sizeof(uint64_t)); + RangeId.Append(&Range.second, sizeof(uint64_t)); + } + std::filesystem::path BlockChunkPath = + TryMoveDownloadedChunk(BlockRangeBuffer, + m_TempBlockFolderPath / fmt::format("{}_{}", BlockDescription.BlockHash, RangeId.GetHash()), + /* ForceDiskBased */ BlockRangeBufferSize > m_Options.MaximumInMemoryPayloadSize); + + if (!m_AbortFlag) + { + OnDownloaded(std::move(BlockRangeBuffer), std::move(BlockChunkPath), BlockRangeStartIndex, BlockOffsetAndLengths); + } + }; + + std::vector<std::pair<uint64_t, uint64_t>> Ranges; + Ranges.reserve(BlockRangeCount); + for (size_t BlockRangeIndex = BlockRangeStartIndex; BlockRangeIndex < 
BlockRangeStartIndex + BlockRangeCount; BlockRangeIndex++) + { + const ChunkBlockAnalyser::BlockRangeDescriptor& BlockRange = BlockRanges[BlockRangeIndex]; + Ranges.push_back(std::make_pair(BlockRange.RangeStart, BlockRange.RangeLength)); + } + + const bool ExistsInCache = m_Storage.CacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash); + + size_t SubBlockRangeCount = BlockRangeCount; + size_t SubRangeCountComplete = 0; + std::span<const std::pair<uint64_t, uint64_t>> RangesSpan(Ranges); + while (SubRangeCountComplete < SubBlockRangeCount) + { + if (m_AbortFlag) + { + break; + } + + // First try to get subrange from cache. + // If not successful, try to get the ranges from the build store and adapt SubRangeCount... + + size_t SubRangeStartIndex = BlockRangeStartIndex + SubRangeCountComplete; + if (ExistsInCache) + { + size_t SubRangeCount = Min(BlockRangeCount - SubRangeCountComplete, m_Storage.CacheHost.Caps.MaxRangeCountPerRequest); + + if (SubRangeCount == 1) + { + // Legacy single-range path, prefer that for max compatibility + + const std::pair<uint64_t, uint64_t> SubRange = RangesSpan[SubRangeCountComplete]; + IoBuffer PayloadBuffer = + m_Storage.CacheStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash, SubRange.first, SubRange.second); + if (m_AbortFlag) + { + break; + } + if (PayloadBuffer) + { + ProcessDownload(BlockDescription, + std::move(PayloadBuffer), + SubRangeStartIndex, + std::vector<std::pair<uint64_t, uint64_t>>{std::make_pair(0u, SubRange.second)}, + TotalRequestCount, + FilteredDownloadedBytesPerSecond, + OnDownloaded); + SubRangeCountComplete += SubRangeCount; + continue; + } + } + else + { + auto SubRanges = RangesSpan.subspan(SubRangeCountComplete, SubRangeCount); + + BuildStorageCache::BuildBlobRanges RangeBuffers = + m_Storage.CacheStorage->GetBuildBlobRanges(m_BuildId, BlockDescription.BlockHash, SubRanges); + if (m_AbortFlag) + { + break; + } + if (RangeBuffers.PayloadBuffer) + { + if 
(RangeBuffers.Ranges.empty()) + { + SubRangeCount = Ranges.size() - SubRangeCountComplete; + ProcessDownload(BlockDescription, + std::move(RangeBuffers.PayloadBuffer), + SubRangeStartIndex, + RangesSpan.subspan(SubRangeCountComplete, SubRangeCount), + TotalRequestCount, + FilteredDownloadedBytesPerSecond, + OnDownloaded); + SubRangeCountComplete += SubRangeCount; + continue; + } + else if (RangeBuffers.Ranges.size() == SubRangeCount) + { + ProcessDownload(BlockDescription, + std::move(RangeBuffers.PayloadBuffer), + SubRangeStartIndex, + RangeBuffers.Ranges, + TotalRequestCount, + FilteredDownloadedBytesPerSecond, + OnDownloaded); + SubRangeCountComplete += SubRangeCount; + continue; + } + } + } + } + + size_t SubRangeCount = Min(BlockRangeCount - SubRangeCountComplete, m_Storage.BuildStorageHost.Caps.MaxRangeCountPerRequest); + + auto SubRanges = RangesSpan.subspan(SubRangeCountComplete, SubRangeCount); + + BuildStorageBase::BuildBlobRanges RangeBuffers; + + try + { + RangeBuffers = m_Storage.BuildStorage->GetBuildBlobRanges(m_BuildId, BlockDescription.BlockHash, SubRanges); + } + catch (const std::exception&) + { + // Silence http errors due to abort + if (!m_AbortFlag) + { + throw; + } + } + + if (!m_AbortFlag) + { + if (RangeBuffers.PayloadBuffer) + { + if (RangeBuffers.Ranges.empty()) + { + // Jupiter will ignore the ranges and send the whole payload if it fetches the payload from S3 + // Upload to cache (if enabled) and use the whole payload for the remaining ranges + + const uint64_t Size = RangeBuffers.PayloadBuffer.GetSize(); + + const bool PopulateCache = !ExistsInCache && m_Storage.CacheStorage && m_Options.PopulateCache; + + std::filesystem::path BlockPath = + TryMoveDownloadedChunk(RangeBuffers.PayloadBuffer, + m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(), + /* ForceDiskBased */ PopulateCache || Size > m_Options.MaximumInMemoryPayloadSize); + if (!BlockPath.empty()) + { + RangeBuffers.PayloadBuffer = 
IoBufferBuilder::MakeFromFile(BlockPath); + if (!RangeBuffers.PayloadBuffer) + { + throw std::runtime_error( + fmt::format("Failed to read block {} from temporary path '{}'", BlockDescription.BlockHash, BlockPath)); + } + RangeBuffers.PayloadBuffer.SetDeleteOnClose(true); + } + + if (PopulateCache) + { + m_Storage.CacheStorage->PutBuildBlob(m_BuildId, + BlockDescription.BlockHash, + ZenContentType::kCompressedBinary, + CompositeBuffer(SharedBuffer(RangeBuffers.PayloadBuffer))); + } + + if (m_AbortFlag) + { + break; + } + + SubRangeCount = Ranges.size() - SubRangeCountComplete; + ProcessDownload(BlockDescription, + std::move(RangeBuffers.PayloadBuffer), + SubRangeStartIndex, + RangesSpan.subspan(SubRangeCountComplete, SubRangeCount), + TotalRequestCount, + FilteredDownloadedBytesPerSecond, + OnDownloaded); + } + else + { + if (RangeBuffers.Ranges.size() != SubRanges.size()) + { + throw std::runtime_error(fmt::format("Fetching {} ranges from {} resulted in {} ranges", + SubRanges.size(), + BlockDescription.BlockHash, + RangeBuffers.Ranges.size())); + } + ProcessDownload(BlockDescription, + std::move(RangeBuffers.PayloadBuffer), + SubRangeStartIndex, + RangeBuffers.Ranges, + TotalRequestCount, + FilteredDownloadedBytesPerSecond, + OnDownloaded); + } + } + else + { + throw std::runtime_error( + fmt::format("Block {} is missing when fetching {} ranges", BlockDescription.BlockHash, SubRangeCount)); + } + + SubRangeCountComplete += SubRangeCount; + } + } +} + +std::vector<uint32_t> +BuildsOperationUpdateFolder::WriteLocalChunkToCache(CloneQueryInterface* CloneQuery, + const CopyChunkData& CopyData, + const std::vector<ChunkedFolderContent>& ScavengedContents, + const std::vector<ChunkedContentLookup>& ScavengedLookups, + const std::vector<std::filesystem::path>& ScavengedPaths, + BufferedWriteFileCache& WriteCache) +{ + ZEN_TRACE_CPU("WriteLocalChunkToCache"); + + std::filesystem::path SourceFilePath; + + if (CopyData.ScavengeSourceIndex == (uint32_t)-1) + { + const 
uint32_t LocalPathIndex = m_LocalLookup.SequenceIndexFirstPathIndex[CopyData.SourceSequenceIndex]; + SourceFilePath = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred(); + } + else + { + const ChunkedFolderContent& ScavengedContent = ScavengedContents[CopyData.ScavengeSourceIndex]; + const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[CopyData.ScavengeSourceIndex]; + const std::filesystem::path ScavengedPath = ScavengedPaths[CopyData.ScavengeSourceIndex]; + const uint32_t ScavengedPathIndex = ScavengedLookup.SequenceIndexFirstPathIndex[CopyData.SourceSequenceIndex]; + SourceFilePath = (ScavengedPath / ScavengedContent.Paths[ScavengedPathIndex]).make_preferred(); + } + ZEN_ASSERT_SLOW(IsFile(SourceFilePath)); + ZEN_ASSERT(!CopyData.TargetChunkLocationPtrs.empty()); + + uint64_t CacheLocalFileBytesRead = 0; + + size_t TargetStart = 0; + const std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> AllTargets(CopyData.TargetChunkLocationPtrs); + + struct WriteOp + { + const ChunkedContentLookup::ChunkSequenceLocation* Target = nullptr; + uint64_t CacheFileOffset = (uint64_t)-1; + uint32_t ChunkIndex = (uint32_t)-1; + }; + + std::vector<WriteOp> WriteOps; + + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Sort"); + WriteOps.reserve(AllTargets.size()); + for (const CopyChunkData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets) + { + std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> TargetRange = + AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount); + for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange) + { + WriteOps.push_back( + WriteOp{.Target = Target, .CacheFileOffset = ChunkTarget.CacheFileOffset, .ChunkIndex = ChunkTarget.RemoteChunkIndex}); + } + TargetStart += ChunkTarget.TargetChunkLocationCount; + } + + std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) { + if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex) + { + return true; 
+ } + else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex) + { + return false; + } + if (Lhs.Target->Offset < Rhs.Target->Offset) + { + return true; + } + return false; + }); + } + + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Write"); + + tsl::robin_set<uint32_t> ChunkIndexesWritten; + + BufferedOpenFile SourceFile(SourceFilePath, + m_DiskStats.OpenReadCount, + m_DiskStats.CurrentOpenFileCount, + m_DiskStats.ReadCount, + m_DiskStats.ReadByteCount); + + bool CanCloneSource = CloneQuery && CloneQuery->CanClone(SourceFile.Handle()); + + BufferedWriteFileCache::Local LocalWriter(WriteCache); + + for (size_t WriteOpIndex = 0; WriteOpIndex < WriteOps.size();) + { + if (m_AbortFlag) + { + break; + } + const WriteOp& Op = WriteOps[WriteOpIndex]; + + const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex; + const uint32_t RemotePathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]; + const uint64_t TargetSize = m_RemoteContent.RawSizes[RemotePathIndex]; + const uint64_t ChunkSize = m_RemoteContent.ChunkedContent.ChunkRawSizes[Op.ChunkIndex]; + + uint64_t ReadLength = ChunkSize; + size_t WriteCount = 1; + uint64_t OpSourceEnd = Op.CacheFileOffset + ChunkSize; + uint64_t OpTargetEnd = Op.Target->Offset + ChunkSize; + while ((WriteOpIndex + WriteCount) < WriteOps.size()) + { + const WriteOp& NextOp = WriteOps[WriteOpIndex + WriteCount]; + if (NextOp.Target->SequenceIndex != Op.Target->SequenceIndex) + { + break; + } + if (NextOp.Target->Offset != OpTargetEnd) + { + break; + } + if (NextOp.CacheFileOffset != OpSourceEnd) + { + break; + } + const uint64_t NextChunkLength = m_RemoteContent.ChunkedContent.ChunkRawSizes[NextOp.ChunkIndex]; + if (ReadLength + NextChunkLength > BufferedOpenFile::BlockSize) + { + break; + } + ReadLength += NextChunkLength; + OpSourceEnd += NextChunkLength; + OpTargetEnd += NextChunkLength; + WriteCount++; + } + + { + bool DidClone = false; + + if (CanCloneSource) + { + uint64_t PreBytes = 0; + uint64_t PostBytes = 
0; + uint64_t ClonableBytes = + CloneQuery->GetClonableRange(Op.CacheFileOffset, Op.Target->Offset, ReadLength, PreBytes, PostBytes); + if (ClonableBytes > 0) + { + // We need to open the file... + BufferedWriteFileCache::Local::Writer* Writer = LocalWriter.GetWriter(RemoteSequenceIndex); + if (!Writer) + { + Writer = LocalWriter.PutWriter(RemoteSequenceIndex, std::make_unique<BufferedWriteFileCache::Local::Writer>()); + + Writer->File = std::make_unique<BasicFile>(); + + const std::filesystem::path FileName = + GetTempChunkedSequenceFileName(m_CacheFolderPath, + m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]); + Writer->File->Open(FileName, BasicFile::Mode::kWrite); + if (m_Options.UseSparseFiles) + { + PrepareFileForScatteredWrite(Writer->File->Handle(), TargetSize); + } + } + DidClone = CloneQuery->TryClone(SourceFile.Handle(), + Writer->File->Handle(), + Op.CacheFileOffset + PreBytes, + Op.Target->Offset + PreBytes, + ClonableBytes, + TargetSize); + if (DidClone) + { + m_DiskStats.WriteCount++; + m_DiskStats.WriteByteCount += ClonableBytes; + + m_DiskStats.CloneCount++; + m_DiskStats.CloneByteCount += ClonableBytes; + + m_WrittenChunkByteCount += ClonableBytes; + + if (PreBytes > 0) + { + CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, PreBytes); + const uint64_t FileOffset = Op.Target->Offset; + + WriteSequenceChunkToCache(LocalWriter, ChunkSource, RemoteSequenceIndex, FileOffset, RemotePathIndex); + } + if (PostBytes > 0) + { + CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset + ReadLength - PostBytes, PostBytes); + const uint64_t FileOffset = Op.Target->Offset + ReadLength - PostBytes; + + WriteSequenceChunkToCache(LocalWriter, ChunkSource, RemoteSequenceIndex, FileOffset, RemotePathIndex); + } + } + } + } + + if (!DidClone) + { + CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ReadLength); + + const uint64_t FileOffset = Op.Target->Offset; + + 
WriteSequenceChunkToCache(LocalWriter, ChunkSource, RemoteSequenceIndex, FileOffset, RemotePathIndex); + } + } + + CacheLocalFileBytesRead += ReadLength; // TODO: This should be the sum of unique chunk sizes? + + WriteOpIndex += WriteCount; + } + } + + if (m_Options.IsVerbose) + { + ZEN_INFO("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), SourceFilePath); + } + + std::vector<uint32_t> Result; + Result.reserve(WriteOps.size()); + + for (const WriteOp& Op : WriteOps) + { + Result.push_back(Op.Target->SequenceIndex); + } + return Result; +} + +bool +BuildsOperationUpdateFolder::WriteCompressedChunkToCache( + const IoHash& ChunkHash, + const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs, + BufferedWriteFileCache& WriteCache, + IoBuffer&& CompressedPart) +{ + ZEN_TRACE_CPU("WriteCompressedChunkToCache"); + + auto ChunkHashToChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); + ZEN_ASSERT(ChunkHashToChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end()); + if (IsSingleFileChunk(m_RemoteContent, ChunkTargetPtrs)) + { + const std::uint32_t SequenceIndex = ChunkTargetPtrs.front()->SequenceIndex; + const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]; + StreamDecompress(SequenceRawHash, CompositeBuffer(std::move(CompressedPart))); + return false; + } + else + { + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompositeBuffer(std::move(CompressedPart)), RawHash, RawSize); + if (!Compressed) + { + throw std::runtime_error(fmt::format("Failed to parse header of compressed large blob {}", ChunkHash)); + } + if (RawHash != ChunkHash) + { + throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, ChunkHash)); + } + + BufferedWriteFileCache::Local LocalWriter(WriteCache); + + IoHashStream Hash; + bool CouldDecompress = Compressed.DecompressToStream( + 0, + (uint64_t)-1, + 
[&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { + ZEN_UNUSED(SourceOffset); + ZEN_TRACE_CPU("Async_StreamDecompress_Write"); + m_DiskStats.ReadByteCount += SourceSize; + if (!m_AbortFlag) + { + for (const ChunkedContentLookup::ChunkSequenceLocation* TargetPtr : ChunkTargetPtrs) + { + const auto& Target = *TargetPtr; + const uint64_t FileOffset = Target.Offset + Offset; + const uint32_t SequenceIndex = Target.SequenceIndex; + const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; + + WriteSequenceChunkToCache(LocalWriter, RangeBuffer, SequenceIndex, FileOffset, PathIndex); + } + + return true; + } + return false; + }); + + if (m_AbortFlag) + { + return false; + } + + if (!CouldDecompress) + { + throw std::runtime_error(fmt::format("Failed to decompress large chunk {}", ChunkHash)); + } + + return true; + } +}

// Decompresses a compressed blob holding one complete chunk sequence into the cache folder.
//
// The compressed header must parse and must carry a raw hash equal to SequenceRawHash.
// Decompressed data is streamed into a temporary file created in the cache folder and, once
// complete, moved to the temp chunk-sequence file name for SequenceRawHash. When
// m_Options.ValidateCompletedSequences is set, the decompressed bytes are hashed while they
// are written and the result is compared against SequenceRawHash.
//
// Returns early, without moving the temporary file into place, if m_AbortFlag is set.
// Throws std::runtime_error if the temporary file cannot be created or moved, the header is
// invalid or names a different hash, decompression fails, or the verification hash differs.
void
BuildsOperationUpdateFolder::StreamDecompress(const IoHash& SequenceRawHash, CompositeBuffer&& CompressedPart)
{
    ZEN_TRACE_CPU("StreamDecompress");
    const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash);
    TemporaryFile               DecompressedTemp;
    std::error_code             Ec;
    DecompressedTemp.CreateTemporary(TempChunkSequenceFileName.parent_path(), Ec);
    if (Ec)
    {
        throw std::runtime_error(fmt::format("Failed creating temporary file for decompressing large blob {}, reason: ({}) {}",
                                             SequenceRawHash,
                                             Ec.value(),
                                             Ec.message()));
    }

    IoHash           RawHash;
    uint64_t         RawSize    = 0;
    CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompressedPart, RawHash, RawSize);
    if (!Compressed)
    {
        throw std::runtime_error(fmt::format("Failed to parse header of compressed large blob {}", SequenceRawHash));
    }
    if (RawHash != SequenceRawHash)
    {
        // This branch reports a mismatch, so the message has to say "does not match".
        throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does not match.", RawHash, SequenceRawHash));
    }

    // NOTE(review): unlike WriteSequenceChunkToCache this call is not gated on
    // m_Options.UseSparseFiles - confirm that is intended.
    PrepareFileForScatteredWrite(DecompressedTemp.Handle(), RawSize);

    IoHashStream Hash;
    bool         CouldDecompress =
        Compressed.DecompressToStream(0,
                                      (uint64_t)-1,
                                      [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
                                          ZEN_UNUSED(SourceOffset);
                                          ZEN_TRACE_CPU("StreamDecompress_Write");
                                          m_DiskStats.ReadCount++;
                                          m_DiskStats.ReadByteCount += SourceSize;
                                          if (!m_AbortFlag)
                                          {
                                              // Offset is a by-value parameter; it is advanced per segment so that the
                                              // segments of this range land back to back in the file.
                                              for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
                                              {
                                                  if (m_Options.ValidateCompletedSequences)
                                                  {
                                                      Hash.Append(Segment.GetView());
                                                      m_ValidatedChunkByteCount += Segment.GetSize();
                                                  }
                                                  DecompressedTemp.Write(Segment, Offset);
                                                  Offset += Segment.GetSize();
                                                  m_DiskStats.WriteByteCount += Segment.GetSize();
                                                  m_DiskStats.WriteCount++;
                                                  m_WrittenChunkByteCount += Segment.GetSize();
                                              }
                                              return true;
                                          }
                                          // Returning false when the abort flag is set.
                                          return false;
                                      });

    if (m_AbortFlag)
    {
        return;
    }

    if (!CouldDecompress)
    {
        throw std::runtime_error(fmt::format("Failed to decompress large blob {}", SequenceRawHash));
    }
    if (m_Options.ValidateCompletedSequences)
    {
        const IoHash VerifyHash = Hash.GetHash();
        if (VerifyHash != SequenceRawHash)
        {
            throw std::runtime_error(
                fmt::format("Decompressed blob payload hash {} does not match expected hash {}", VerifyHash, SequenceRawHash));
        }
    }
    DecompressedTemp.MoveTemporaryIntoPlace(TempChunkSequenceFileName, Ec);
    if (Ec)
    {
        throw std::runtime_error(fmt::format("Failed moving temporary file for decompressing large blob {}, reason: ({}) {}",
                                             SequenceRawHash,
                                             Ec.value(),
                                             Ec.message()));
    }
    // WriteChunkStats.ChunkCountWritten++;
}

// Writes Chunk at FileOffset into the temp chunk-sequence file belonging to SequenceIndex.
//
// The sequence size is read from m_RemoteContent.RawSizes[PathIndex] and the chunk must fit
// inside it (asserted). A chunk that covers the whole sequence is written through a
// short-lived BasicFile. Any other chunk goes through the per-sequence writer kept in
// LocalWriter, which is created and opened on first use; a buffering BasicFileWriter is
// attached to it only for chunks smaller than MaxWriterBufferSize.
//
// Updates m_DiskStats.WriteCount / WriteByteCount and m_WrittenChunkByteCount.
void
BuildsOperationUpdateFolder::WriteSequenceChunkToCache(BufferedWriteFileCache::Local& LocalWriter,
                                                       const CompositeBuffer&         Chunk,
                                                       const uint32_t                 SequenceIndex,
                                                       const uint64_t                 FileOffset,
                                                       const uint32_t                 PathIndex)
{
    ZEN_TRACE_CPU("WriteSequenceChunkToCache");

    const uint64_t SequenceSize = m_RemoteContent.RawSizes[PathIndex];

    // Opens the temp file of this sequence for writing, preparing it for scattered writes
    // when sparse files are enabled.
    auto OpenFile = [&](BasicFile& File) {
        const std::filesystem::path FileName =
            GetTempChunkedSequenceFileName(m_CacheFolderPath, m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
        File.Open(FileName, BasicFile::Mode::kWrite);
        if (m_Options.UseSparseFiles)
        {
            PrepareFileForScatteredWrite(File.Handle(), SequenceSize);
        }
    };

    const uint64_t ChunkSize = Chunk.GetSize();
    ZEN_ASSERT(FileOffset + ChunkSize <= SequenceSize);
    if (ChunkSize == SequenceSize)
    {
        BasicFile SingleChunkFile;
        OpenFile(SingleChunkFile);

        m_DiskStats.CurrentOpenFileCount++;
        auto _ = MakeGuard([this]() { m_DiskStats.CurrentOpenFileCount--; });
        SingleChunkFile.Write(Chunk, FileOffset);
    }
    else
    {
        // 256 KiB
        const uint64_t MaxWriterBufferSize = 256u * 1024u;

        BufferedWriteFileCache::Local::Writer* Writer = LocalWriter.GetWriter(SequenceIndex);
        if (!Writer)
        {
            // First write to this sequence through LocalWriter: register a writer and open its file.
            Writer = LocalWriter.PutWriter(SequenceIndex, std::make_unique<BufferedWriteFileCache::Local::Writer>());

            Writer->File = std::make_unique<BasicFile>();
            OpenFile(*Writer->File);
        }
        if ((!Writer->Writer) && (ChunkSize < MaxWriterBufferSize))
        {
            Writer->Writer = std::make_unique<BasicFileWriter>(*Writer->File, Min(SequenceSize, MaxWriterBufferSize));
        }
        Writer->Write(Chunk, FileOffset);
    }
    m_DiskStats.WriteCount++;
    m_DiskStats.WriteByteCount += ChunkSize;
    m_WrittenChunkByteCount += ChunkSize;
}

+bool +BuildsOperationUpdateFolder::GetBlockWriteOps(const IoHash& BlockRawHash, + std::span<const IoHash> ChunkRawHashes, + std::span<const uint32_t> ChunkCompressedLengths, + std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, + std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, + const MemoryView BlockView, + uint32_t 
FirstIncludedBlockChunkIndex, + uint32_t LastIncludedBlockChunkIndex, + BlockWriteOps& OutOps) +{ + ZEN_TRACE_CPU("GetBlockWriteOps"); + + uint32_t OffsetInBlock = 0; + for (uint32_t ChunkBlockIndex = FirstIncludedBlockChunkIndex; ChunkBlockIndex <= LastIncludedBlockChunkIndex; ChunkBlockIndex++) + { + const uint32_t ChunkCompressedSize = ChunkCompressedLengths[ChunkBlockIndex]; + const IoHash& ChunkHash = ChunkRawHashes[ChunkBlockIndex]; + if (auto It = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); It != m_RemoteLookup.ChunkHashToChunkIndex.end()) + { + const uint32_t ChunkIndex = It->second; + std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = + GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, ChunkIndex); + + if (!ChunkTargetPtrs.empty()) + { + bool NeedsWrite = true; + if (RemoteChunkIndexNeedsCopyFromSourceFlags[ChunkIndex].compare_exchange_strong(NeedsWrite, false)) + { + MemoryView ChunkMemoryView = BlockView.Mid(OffsetInBlock, ChunkCompressedSize); + IoHash VerifyChunkHash; + uint64_t VerifyChunkSize; + CompressedBuffer CompressedChunk = + CompressedBuffer::FromCompressed(SharedBuffer::MakeView(ChunkMemoryView), VerifyChunkHash, VerifyChunkSize); + if (!CompressedChunk) + { + throw std::runtime_error(fmt::format("Chunk {} at {}, size {} in block {} is not a valid compressed buffer", + ChunkHash, + OffsetInBlock, + ChunkCompressedSize, + BlockRawHash)); + } + if (VerifyChunkHash != ChunkHash) + { + throw std::runtime_error(fmt::format("Chunk {} at {}, size {} in block {} has a mismatching content hash {}", + ChunkHash, + OffsetInBlock, + ChunkCompressedSize, + BlockRawHash, + VerifyChunkHash)); + } + if (VerifyChunkSize != m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]) + { + throw std::runtime_error( + fmt::format("Chunk {} at {}, size {} in block {} has a mismatching raw size {}, expected {}", + ChunkHash, + OffsetInBlock, + ChunkCompressedSize, + BlockRawHash, + VerifyChunkSize, + 
m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex])); + } + + OodleCompressor ChunkCompressor; + OodleCompressionLevel ChunkCompressionLevel; + uint64_t ChunkBlockSize; + + bool GetCompressParametersSuccess = + CompressedChunk.TryGetCompressParameters(ChunkCompressor, ChunkCompressionLevel, ChunkBlockSize); + ZEN_ASSERT(GetCompressParametersSuccess); + + IoBuffer Decompressed; + if (ChunkCompressionLevel == OodleCompressionLevel::None) + { + MemoryView ChunkDecompressedMemoryView = ChunkMemoryView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder()); + Decompressed = + IoBuffer(IoBuffer::Wrap, ChunkDecompressedMemoryView.GetData(), ChunkDecompressedMemoryView.GetSize()); + } + else + { + Decompressed = CompressedChunk.Decompress().AsIoBuffer(); + } + + if (Decompressed.GetSize() != m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]) + { + throw std::runtime_error(fmt::format("Chunk {} at {}, size {} in block {} decompressed to size {}, expected {}", + ChunkHash, + OffsetInBlock, + ChunkCompressedSize, + BlockRawHash, + Decompressed.GetSize(), + m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex])); + } + + ZEN_ASSERT_SLOW(ChunkHash == IoHash::HashBuffer(Decompressed)); + for (const ChunkedContentLookup::ChunkSequenceLocation* Target : ChunkTargetPtrs) + { + OutOps.WriteOps.push_back( + BlockWriteOps::WriteOpData{.Target = Target, .ChunkBufferIndex = OutOps.ChunkBuffers.size()}); + } + OutOps.ChunkBuffers.emplace_back(std::move(Decompressed)); + } + } + } + + OffsetInBlock += ChunkCompressedSize; + } + { + ZEN_TRACE_CPU("Sort"); + std::sort(OutOps.WriteOps.begin(), + OutOps.WriteOps.end(), + [](const BlockWriteOps::WriteOpData& Lhs, const BlockWriteOps::WriteOpData& Rhs) { + if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex) + { + return true; + } + if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex) + { + return false; + } + return Lhs.Target->Offset < Rhs.Target->Offset; + }); + } + return true; +}

// Executes the write operations collected for one block and then updates sequence tracking.
//
// Each op in Ops.WriteOps is written to the cache through WriteSequenceChunkToCache using a
// BufferedWriteFileCache::Local that only lives for the duration of the write loop. Unless the
// work has been aborted, the per-sequence "chunks left" counter is then decremented once per
// op; sequences that reach zero are closed in WriteCache and handed to
// VerifyAndCompleteChunkSequencesAsync.
void
BuildsOperationUpdateFolder::WriteBlockChunkOpsToCache(std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
                                                       const BlockWriteOps&             Ops,
                                                       BufferedWriteFileCache&          WriteCache,
                                                       ParallelWork&                    Work)
{
    ZEN_TRACE_CPU("WriteBlockChunkOpsToCache");

    {
        BufferedWriteFileCache::Local ScopedWriter(WriteCache);
        for (const BlockWriteOps::WriteOpData& Op : Ops.WriteOps)
        {
            if (Work.IsAborted())
            {
                break;
            }
            const uint32_t TargetSequenceIndex = Op.Target->SequenceIndex;
            ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[TargetSequenceIndex].load() <=
                       m_RemoteContent.ChunkedContent.ChunkCounts[TargetSequenceIndex]);
            ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[TargetSequenceIndex].load() > 0);

            const CompositeBuffer& ChunkData       = Ops.ChunkBuffers[Op.ChunkBufferIndex];
            const uint64_t         TargetOffset    = Op.Target->Offset;
            const uint32_t         TargetPathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[TargetSequenceIndex];

            WriteSequenceChunkToCache(ScopedWriter, ChunkData, TargetSequenceIndex, TargetOffset, TargetPathIndex);
        }
    }

    if (Work.IsAborted())
    {
        return;
    }

    // Write tracking, updating this must be done without any files open (BufferedWriteFileCache::Local)
    std::vector<uint32_t> FinishedSequences;
    for (const BlockWriteOps::WriteOpData& Op : Ops.WriteOps)
    {
        const uint32_t TargetSequenceIndex = Op.Target->SequenceIndex;
        if (CompleteSequenceChunk(TargetSequenceIndex, SequenceIndexChunksLeftToWriteCounters))
        {
            FinishedSequences.push_back(TargetSequenceIndex);
        }
    }
    WriteCache.Close(FinishedSequences);
    VerifyAndCompleteChunkSequencesAsync(FinishedSequences, Work);
}

// Writes the needed chunks of a complete block to the cache.
//
// The block is made memory based, write ops are collected for all chunks listed in
// BlockDescription.ChunkRawHashes via GetBlockWriteOps, and the ops are executed through
// WriteBlockChunkOpsToCache. When the description has no header size or no compressed chunk
// lengths, both are read from the block header itself with ReadChunkBlockHeader.
//
// Returns the result of GetBlockWriteOps; nothing is written when that is false.
bool
BuildsOperationUpdateFolder::WriteChunksBlockToCache(const ChunkBlockDescription&     BlockDescription,
                                                     std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
                                                     ParallelWork&                    Work,
                                                     CompositeBuffer&&                BlockBuffer,
                                                     std::span<std::atomic<bool>>     RemoteChunkIndexNeedsCopyFromSourceFlags,
                                                     BufferedWriteFileCache&          WriteCache)
{
    ZEN_TRACE_CPU("WriteChunksBlockToCache");

    IoBuffer         BlockMemoryBuffer = MakeBufferMemoryBased(BlockBuffer);
    const MemoryView BlockView         = BlockMemoryBuffer.GetView();

    BlockWriteOps Ops;

    // Shared tail of both paths: collect ops for every chunk in the block, then write them.
    const auto CollectAndWrite = [&](std::span<const uint32_t> CompressedLengths, uint64_t BlockHeaderSize) -> bool {
        const bool GotOps = GetBlockWriteOps(BlockDescription.BlockHash,
                                             BlockDescription.ChunkRawHashes,
                                             CompressedLengths,
                                             SequenceIndexChunksLeftToWriteCounters,
                                             RemoteChunkIndexNeedsCopyFromSourceFlags,
                                             BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockHeaderSize),
                                             0,
                                             gsl::narrow<uint32_t>(BlockDescription.ChunkRawHashes.size() - 1),
                                             Ops);
        if (!GotOps)
        {
            return false;
        }
        WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work);
        return true;
    };

    const bool DescriptionLacksLayout = (BlockDescription.HeaderSize == 0) || BlockDescription.ChunkCompressedLengths.empty();
    if (DescriptionLacksLayout)
    {
        ZEN_TRACE_CPU("WriteChunksBlockToCache_Legacy");

        uint64_t                    HeaderSize;
        const std::vector<uint32_t> ChunkCompressedLengths =
            ReadChunkBlockHeader(BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder()), HeaderSize);

        return CollectAndWrite(ChunkCompressedLengths, HeaderSize);
    }

    return CollectAndWrite(BlockDescription.ChunkCompressedLengths, BlockDescription.HeaderSize);
}

// Writes the needed chunks of a partially downloaded block to the cache.
//
// PartialBlockBuffer is made memory based and passed unmodified as the block view to
// GetBlockWriteOps together with the inclusive chunk index range
// [FirstIncludedBlockChunkIndex, LastIncludedBlockChunkIndex]; the resulting ops are executed
// through WriteBlockChunkOpsToCache.
//
// Returns the result of GetBlockWriteOps; nothing is written when that is false.
bool
BuildsOperationUpdateFolder::WritePartialBlockChunksToCache(const ChunkBlockDescription&     BlockDescription,
                                                            std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
                                                            ParallelWork&                    Work,
                                                            CompositeBuffer&&                PartialBlockBuffer,
                                                            uint32_t                         FirstIncludedBlockChunkIndex,
                                                            uint32_t                         LastIncludedBlockChunkIndex,
                                                            std::span<std::atomic<bool>>     RemoteChunkIndexNeedsCopyFromSourceFlags,
                                                            BufferedWriteFileCache&          WriteCache)
{
    ZEN_TRACE_CPU("WritePartialBlockChunksToCache");

    IoBuffer         BlockMemoryBuffer = MakeBufferMemoryBased(PartialBlockBuffer);
    const MemoryView BlockView         = BlockMemoryBuffer.GetView();

    BlockWriteOps Ops;
    const bool    GotOps = GetBlockWriteOps(BlockDescription.BlockHash,
                                         BlockDescription.ChunkRawHashes,
                                         BlockDescription.ChunkCompressedLengths,
                                         SequenceIndexChunksLeftToWriteCounters,
                                         RemoteChunkIndexNeedsCopyFromSourceFlags,
                                         BlockView,
                                         FirstIncludedBlockChunkIndex,
                                         LastIncludedBlockChunkIndex,
                                         Ops);
    if (!GotOps)
    {
        return false;
    }

    WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work);
    return true;
}

+void +BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(uint32_t RemoteChunkIndex, + const BlobsExistsResult& ExistsResult, + std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>&& ChunkTargetPtrs, + BufferedWriteFileCache& WriteCache, + ParallelWork& Work, + IoBuffer&& Payload, + std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, + std::atomic<uint64_t>& WritePartsComplete, + const uint64_t TotalPartWriteCount, + FilteredRate& FilteredWrittenBytesPerSecond) +{ + ZEN_TRACE_CPU("AsyncWriteDownloadedChunk"); + + const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + + const uint64_t Size = Payload.GetSize(); + + const bool ExistsInCache = m_Storage.CacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash); + + const bool PopulateCache = !ExistsInCache && m_Storage.CacheStorage && m_Options.PopulateCache; + + std::filesystem::path CompressedChunkPath = + TryMoveDownloadedChunk(Payload, + m_TempDownloadFolderPath / ChunkHash.ToHexString(), + /* ForceDiskBased */ PopulateCache || Size > m_Options.MaximumInMemoryPayloadSize); + if (PopulateCache) + { + IoBuffer CacheBlob = IoBufferBuilder::MakeFromFile(CompressedChunkPath); + if (CacheBlob) + { + m_Storage.CacheStorage->PutBuildBlob(m_BuildId, + ChunkHash, + ZenContentType::kCompressedBinary, + 
CompositeBuffer(SharedBuffer(CacheBlob))); + } + } + + IoBufferFileReference FileRef; + bool EnableBacklog = !CompressedChunkPath.empty() || Payload.GetFileReference(FileRef); + + Work.ScheduleWork( + m_IOWorkerPool, + [this, + SequenceIndexChunksLeftToWriteCounters, + &Work, + CompressedChunkPath, + RemoteChunkIndex, + TotalPartWriteCount, + &WriteCache, + &WritePartsComplete, + &FilteredWrittenBytesPerSecond, + ChunkTargetPtrs = std::move(ChunkTargetPtrs), + CompressedPart = IoBuffer(std::move(Payload))](std::atomic<bool>&) mutable { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_WriteChunk"); + + FilteredWrittenBytesPerSecond.Start(); + + const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + if (CompressedChunkPath.empty()) + { + ZEN_ASSERT(CompressedPart); + } + else + { + ZEN_ASSERT(!CompressedPart); + CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); + if (!CompressedPart) + { + throw std::runtime_error( + fmt::format("Could not open dowloaded compressed chunk {} from {}", ChunkHash, CompressedChunkPath)); + } + } + + bool NeedHashVerify = WriteCompressedChunkToCache(ChunkHash, ChunkTargetPtrs, WriteCache, std::move(CompressedPart)); + if (!m_AbortFlag) + { + if (WritePartsComplete.fetch_add(1) + 1 == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } + + if (!CompressedChunkPath.empty()) + { + std::error_code Ec = TryRemoveFile(CompressedChunkPath); + if (Ec) + { + ZEN_DEBUG("Failed removing file '{}', reason: ({}) {}", CompressedChunkPath, Ec.value(), Ec.message()); + } + } + + std::vector<uint32_t> CompletedSequences = + CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); + WriteCache.Close(CompletedSequences); + if (NeedHashVerify) + { + VerifyAndCompleteChunkSequencesAsync(CompletedSequences, Work); + } + else + { + FinalizeChunkSequences(CompletedSequences); + } + } + } + }, + EnableBacklog ? 
WorkerThreadPool::EMode::EnableBacklog : WorkerThreadPool::EMode::DisableBacklog); +} + +void +BuildsOperationUpdateFolder::VerifyAndCompleteChunkSequencesAsync(std::span<const uint32_t> RemoteSequenceIndexes, ParallelWork& Work) +{ + if (RemoteSequenceIndexes.empty()) + { + return; + } + ZEN_TRACE_CPU("VerifyAndCompleteChunkSequence"); + if (m_Options.ValidateCompletedSequences) + { + for (uint32_t RemoteSequenceIndexOffset = 1; RemoteSequenceIndexOffset < RemoteSequenceIndexes.size(); RemoteSequenceIndexOffset++) + { + const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset]; + Work.ScheduleWork(m_IOWorkerPool, [this, RemoteSequenceIndex](std::atomic<bool>&) { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_VerifyAndFinalizeSequence"); + + VerifySequence(RemoteSequenceIndex); + if (!m_AbortFlag) + { + const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + FinalizeChunkSequence(SequenceRawHash); + } + } + }); + } + const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[0]; + + VerifySequence(RemoteSequenceIndex); + const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + FinalizeChunkSequence(SequenceRawHash); + } + else + { + for (uint32_t RemoteSequenceIndexOffset = 0; RemoteSequenceIndexOffset < RemoteSequenceIndexes.size(); RemoteSequenceIndexOffset++) + { + const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset]; + const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + FinalizeChunkSequence(SequenceRawHash); + } + } +} + +bool +BuildsOperationUpdateFolder::CompleteSequenceChunk(uint32_t RemoteSequenceIndex, + std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters) +{ + uint32_t PreviousValue = SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1); + ZEN_ASSERT(PreviousValue >= 1); + ZEN_ASSERT(PreviousValue != 
(uint32_t)-1); + return PreviousValue == 1; +} + +std::vector<uint32_t> +BuildsOperationUpdateFolder::CompleteChunkTargets(const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs, + std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters) +{ + ZEN_TRACE_CPU("CompleteChunkTargets"); + + std::vector<uint32_t> CompletedSequenceIndexes; + for (const ChunkedContentLookup::ChunkSequenceLocation* Location : ChunkTargetPtrs) + { + const uint32_t RemoteSequenceIndex = Location->SequenceIndex; + if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters)) + { + CompletedSequenceIndexes.push_back(RemoteSequenceIndex); + } + } + return CompletedSequenceIndexes; +} + +void +BuildsOperationUpdateFolder::FinalizeChunkSequence(const IoHash& SequenceRawHash) +{ + ZEN_TRACE_CPU("FinalizeChunkSequence"); + + ZEN_ASSERT_SLOW(!IsFile(GetFinalChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash))); + std::error_code Ec; + RenameFile(GetTempChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash), + GetFinalChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash), + Ec); + if (Ec) + { + throw std::system_error(Ec); + } +} + +void +BuildsOperationUpdateFolder::FinalizeChunkSequences(std::span<const uint32_t> RemoteSequenceIndexes) +{ + ZEN_TRACE_CPU("FinalizeChunkSequences"); + + for (uint32_t SequenceIndex : RemoteSequenceIndexes) + { + FinalizeChunkSequence(m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); + } +} + +void +BuildsOperationUpdateFolder::VerifySequence(uint32_t RemoteSequenceIndex) +{ + ZEN_TRACE_CPU("VerifySequence"); + + ZEN_ASSERT(m_Options.ValidateCompletedSequences); + + const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + { + ZEN_TRACE_CPU("HashSequence"); + const std::uint32_t RemotePathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]; + const uint64_t ExpectedSize = 
m_RemoteContent.RawSizes[RemotePathIndex]; + IoBuffer VerifyBuffer = IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash)); + const uint64_t VerifySize = VerifyBuffer.GetSize(); + if (VerifySize != ExpectedSize) + { + throw std::runtime_error(fmt::format("Written chunk sequence {} size {} does not match expected size {}", + SequenceRawHash, + VerifySize, + ExpectedSize)); + } + + const IoHash VerifyChunkHash = IoHash::HashBuffer(std::move(VerifyBuffer), &m_ValidatedChunkByteCount); + if (VerifyChunkHash != SequenceRawHash) + { + throw std::runtime_error( + fmt::format("Written chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, SequenceRawHash)); + } + } +} + +void +VerifyFolder(ProgressBase& Progress, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag, + TransferThreadWorkers& Workers, + const ChunkedFolderContent& Content, + const ChunkedContentLookup& Lookup, + const std::filesystem::path& Path, + const std::vector<std::string>& ExcludeFolders, + bool VerifyFileHash, + VerifyFolderStatistics& VerifyFolderStats) +{ + ZEN_TRACE_CPU("VerifyFolder"); + + Stopwatch Timer; + + std::unique_ptr<ProgressBase::ProgressBar> ProgressBar = Progress.CreateProgressBar("Verify Files"); + + WorkerThreadPool& VerifyPool = Workers.GetIOWorkerPool(); + + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + const uint32_t PathCount = gsl::narrow<uint32_t>(Content.Paths.size()); + + RwLock ErrorLock; + std::vector<std::string> Errors; + + auto IsAcceptedFolder = [ExcludeFolders = ExcludeFolders](const std::string_view& RelativePath) -> bool { + for (const std::string& ExcludeFolder : ExcludeFolders) + { + if (RelativePath.starts_with(ExcludeFolder)) + { + if (RelativePath.length() == ExcludeFolder.length()) + { + return false; + } + else if (RelativePath[ExcludeFolder.length()] == '/') + { + return false; + } + } + } + return true; + }; + + for (uint32_t PathIndex = 0; 
PathIndex < PathCount; PathIndex++) + { + if (Work.IsAborted()) + { + break; + } + + Work.ScheduleWork( + VerifyPool, + [&Path, &Content, &Lookup, &ErrorLock, &Errors, &VerifyFolderStats, VerifyFileHash, &IsAcceptedFolder, PathIndex, &AbortFlag]( + std::atomic<bool>&) { + if (!AbortFlag) + { + ZEN_TRACE_CPU("VerifyFile_work"); + + // TODO: Convert ScheduleWork body to function + + const std::filesystem::path TargetPath = (Path / Content.Paths[PathIndex]).make_preferred(); + if (IsAcceptedFolder(TargetPath.parent_path().generic_string())) + { + const uint64_t ExpectedSize = Content.RawSizes[PathIndex]; + if (!IsFile(TargetPath)) + { + ErrorLock.WithExclusiveLock([&]() { + Errors.push_back(fmt::format("File {} with expected size {} does not exist", TargetPath, ExpectedSize)); + }); + VerifyFolderStats.FilesFailed++; + } + else + { + std::error_code Ec; + uint64_t SizeOnDisk = gsl::narrow<uint64_t>(FileSizeFromPath(TargetPath, Ec)); + if (Ec) + { + ErrorLock.WithExclusiveLock([&]() { + Errors.push_back( + fmt::format("Failed to get size of file {}: {} ({})", TargetPath, Ec.message(), Ec.value())); + }); + VerifyFolderStats.FilesFailed++; + } + else if (SizeOnDisk < ExpectedSize) + { + ErrorLock.WithExclusiveLock([&]() { + Errors.push_back(fmt::format("Size of file {} is smaller than expected. Expected: {}, Found: {}", + TargetPath, + ExpectedSize, + SizeOnDisk)); + }); + VerifyFolderStats.FilesFailed++; + } + else if (SizeOnDisk > ExpectedSize) + { + ErrorLock.WithExclusiveLock([&]() { + Errors.push_back(fmt::format("Size of file {} is bigger than expected. 
Expected: {}, Found: {}", + TargetPath, + ExpectedSize, + SizeOnDisk)); + }); + VerifyFolderStats.FilesFailed++; + } + else if (SizeOnDisk > 0 && VerifyFileHash) + { + const IoHash& ExpectedRawHash = Content.RawHashes[PathIndex]; + IoBuffer Buffer = IoBufferBuilder::MakeFromFile(TargetPath); + IoHash RawHash = IoHash::HashBuffer(Buffer); + if (RawHash != ExpectedRawHash) + { + uint64_t FileOffset = 0; + const uint32_t SequenceIndex = Lookup.RawHashToSequenceIndex.at(ExpectedRawHash); + const uint32_t OrderOffset = Lookup.SequenceIndexChunkOrderOffset[SequenceIndex]; + for (uint32_t OrderIndex = OrderOffset; + OrderIndex < OrderOffset + Content.ChunkedContent.ChunkCounts[SequenceIndex]; + OrderIndex++) + { + uint32_t ChunkIndex = Content.ChunkedContent.ChunkOrders[OrderIndex]; + uint64_t ChunkSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; + IoHash ChunkHash = Content.ChunkedContent.ChunkHashes[ChunkIndex]; + IoBuffer FileChunk = IoBuffer(Buffer, FileOffset, ChunkSize); + if (IoHash::HashBuffer(FileChunk) != ChunkHash) + { + ErrorLock.WithExclusiveLock([&]() { + Errors.push_back(fmt::format( + "WARNING: Hash of file {} does not match expected hash. Expected: {}, Found: {}. " + "Mismatch at chunk {}", + TargetPath, + ExpectedRawHash, + RawHash, + OrderIndex - OrderOffset)); + }); + break; + } + FileOffset += ChunkSize; + } + VerifyFolderStats.FilesFailed++; + } + VerifyFolderStats.ReadBytes += SizeOnDisk; + } + } + } + VerifyFolderStats.FilesVerified++; + } + }, + [&, PathIndex](std::exception_ptr Ex, std::atomic<bool>&) { + std::string Description; + try + { + std::rethrow_exception(Ex); + } + catch (const std::exception& Ex) + { + Description = Ex.what(); + } + ErrorLock.WithExclusiveLock([&]() { + Errors.push_back(fmt::format("Failed verifying file '{}'. 
Reason: {}", + (Path / Content.Paths[PathIndex]).make_preferred(), + Description)); + }); + VerifyFolderStats.FilesFailed++; + }); + } + + Work.Wait(Progress.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); + std::string Details = fmt::format("Verified {}/{} ({}). Failed files: {}", + VerifyFolderStats.FilesVerified.load(), + PathCount, + NiceBytes(VerifyFolderStats.ReadBytes.load()), + VerifyFolderStats.FilesFailed.load()); + ProgressBar->UpdateState({.Task = "Verifying files ", + .Details = Details, + .TotalCount = gsl::narrow<uint64_t>(PathCount), + .RemainingCount = gsl::narrow<uint64_t>(PathCount - VerifyFolderStats.FilesVerified.load()), + .Status = ProgressBase::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, + false); + }); + VerifyFolderStats.VerifyElapsedWallTimeUs = Timer.GetElapsedTimeUs(); + + ProgressBar->Finish(); + if (AbortFlag) + { + return; + } + + for (const std::string& Error : Errors) + { + ZEN_CONSOLE_ERROR("{}", Error); + } + if (!Errors.empty()) + { + throw std::runtime_error(fmt::format("Verify failed with {} errors", Errors.size())); + } +} + +std::vector<std::filesystem::path> +GetNewPaths(const std::span<const std::filesystem::path> KnownPaths, const std::span<const std::filesystem::path> Paths) +{ + tsl::robin_set<std::string> KnownPathsSet; + KnownPathsSet.reserve(KnownPaths.size()); + for (const std::filesystem::path& LocalPath : KnownPaths) + { + KnownPathsSet.insert(LocalPath.generic_string()); + } + + std::vector<std::filesystem::path> NewPaths; + for (const std::filesystem::path& UntrackedPath : Paths) + { + if (!KnownPathsSet.contains(UntrackedPath.generic_string())) + { + NewPaths.push_back(UntrackedPath); + } + } + return NewPaths; +} + +BuildSaveState +GetLocalStateFromPaths(ProgressBase& Progress, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag, + TransferThreadWorkers& Workers, + GetFolderContentStatistics& LocalFolderScanStats, 
+ ChunkingStatistics& ChunkingStats, + const std::filesystem::path& Path, + ChunkingController& ChunkController, + ChunkingCache& ChunkCache, + std::span<const std::filesystem::path> PathsToCheck) +{ + FolderContent FolderState = + CheckFolderFiles(Progress, AbortFlag, PauseFlag, "Check Files", Workers, LocalFolderScanStats, Path, PathsToCheck); + + ChunkedFolderContent ChunkedContent; + if (FolderState.Paths.size() > 0) + { + ChunkedContent = ScanFolderFiles(Progress, + AbortFlag, + PauseFlag, + "Scan Files", + Workers, + Path, + FolderState, + ChunkController, + ChunkCache, + ChunkingStats); + } + + return BuildSaveState{.State = BuildState{.ChunkedContent = std::move(ChunkedContent)}, .FolderState = FolderState, .LocalPath = Path}; +} + +BuildSaveState +GetLocalContent(ProgressBase& Progress, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag, + bool IsQuiet, + TransferThreadWorkers& Workers, + GetFolderContentStatistics& LocalFolderScanStats, + ChunkingStatistics& ChunkingStats, + const std::filesystem::path& Path, + const std::filesystem::path& StateFilePath, + ChunkingController& ChunkController, + ChunkingCache& ChunkCache) +{ + Stopwatch ReadStateTimer; + bool FileExists = IsFile(StateFilePath); + if (!FileExists) + { + ZEN_CONSOLE("No known local state file in {}, falling back to scanning", Path); + return {}; + } + + BuildSaveState SavedLocalState; + try + { + SavedLocalState = ReadBuildSaveStateFile(StateFilePath); + if (!IsQuiet) + { + ZEN_CONSOLE("Read local state file {} in {}", StateFilePath, NiceTimeSpanMs(ReadStateTimer.GetElapsedTimeMs())); + } + } + catch (const std::exception& Ex) + { + ZEN_CONSOLE_WARN("Failed reading state file {}, falling back to scannning. 
Reason: {}", StateFilePath, Ex.what()); + return {}; + } + + FolderContent CurrentLocalFolderState = CheckFolderFiles(Progress, + AbortFlag, + PauseFlag, + "Check Known Files", + Workers, + LocalFolderScanStats, + Path, + SavedLocalState.FolderState.Paths); + if (AbortFlag) + { + return {}; + } + + if (!SavedLocalState.FolderState.AreKnownFilesEqual(CurrentLocalFolderState)) + { + const size_t LocalStatePathCount = SavedLocalState.FolderState.Paths.size(); + std::vector<std::filesystem::path> DeletedPaths; + FolderContent UpdatedContent = GetUpdatedContent(SavedLocalState.FolderState, CurrentLocalFolderState, DeletedPaths); + if (!DeletedPaths.empty()) + { + SavedLocalState.State.ChunkedContent = DeletePathsFromChunkedContent(SavedLocalState.State.ChunkedContent, DeletedPaths); + } + + if (!IsQuiet) + { + ZEN_CONSOLE("Updating state, {} local files deleted and {} local files updated out of {}", + DeletedPaths.size(), + UpdatedContent.Paths.size(), + LocalStatePathCount); + } + if (UpdatedContent.Paths.size() > 0) + { + ChunkedFolderContent UpdatedLocalContent = ScanFolderFiles(Progress, + AbortFlag, + PauseFlag, + "Scan Known Files", + Workers, + Path, + UpdatedContent, + ChunkController, + ChunkCache, + ChunkingStats); + if (AbortFlag) + { + return {}; + } + SavedLocalState.State.ChunkedContent = + MergeChunkedFolderContents(SavedLocalState.State.ChunkedContent, {{UpdatedLocalContent}}); + } + } + else + { + // Remove files from LocalContent no longer in LocalFolderState + tsl::robin_set<std::string> LocalFolderPaths; + LocalFolderPaths.reserve(SavedLocalState.FolderState.Paths.size()); + for (const std::filesystem::path& LocalFolderPath : SavedLocalState.FolderState.Paths) + { + LocalFolderPaths.insert(LocalFolderPath.generic_string()); + } + std::vector<std::filesystem::path> DeletedPaths; + for (const std::filesystem::path& LocalContentPath : SavedLocalState.State.ChunkedContent.Paths) + { + if (!LocalFolderPaths.contains(LocalContentPath.generic_string())) + { 
+ DeletedPaths.push_back(LocalContentPath); + } + } + if (!DeletedPaths.empty()) + { + SavedLocalState.State.ChunkedContent = DeletePathsFromChunkedContent(SavedLocalState.State.ChunkedContent, DeletedPaths); + } + } + + SavedLocalState.FolderState = CurrentLocalFolderState; + + return SavedLocalState; +} + +void +DownloadFolder(LoggerRef InLog, + ProgressBase& Progress, + TransferThreadWorkers& Workers, + StorageInstance& Storage, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag, + const BuildStorageCache::Statistics& StorageCacheStats, + const Oid& BuildId, + const std::vector<Oid>& BuildPartIds, + std::span<const std::string> BuildPartNames, + const std::filesystem::path& DownloadSpecPath, + const std::filesystem::path& Path, + const DownloadOptions& Options) +{ + ZEN_TRACE_CPU("DownloadFolder"); + ZEN_SCOPED_LOG(InLog); + + Progress.SetLogOperationName("Download Folder"); + + enum TaskSteps : uint32_t + { + CheckState, + CompareState, + Download, + Verify, + Cleanup, + StepCount + }; + + auto EndProgress = MakeGuard([&]() { Progress.SetLogOperationProgress(TaskSteps::StepCount, TaskSteps::StepCount); }); + + Stopwatch DownloadTimer; + + Progress.SetLogOperationProgress(TaskSteps::CheckState, TaskSteps::StepCount); + + const std::filesystem::path ZenTempFolder = ZenTempFolderPath(Options.ZenFolderPath); + CreateDirectories(ZenTempFolder); + + std::uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u; + + CbObject BuildObject = GetBuild(*Storage.BuildStorage, BuildId, Options.IsQuiet); + + std::vector<std::pair<Oid, std::string>> AllBuildParts = + ResolveBuildPartNames(BuildObject, BuildId, BuildPartIds, BuildPartNames, PreferredMultipartChunkSize); + + BuildManifest Manifest; + if (!DownloadSpecPath.empty()) + { + const std::filesystem::path AbsoluteDownloadSpecPath = + DownloadSpecPath.is_relative() ? 
MakeSafeAbsolutePath(Path / DownloadSpecPath) : MakeSafeAbsolutePath(DownloadSpecPath); + Manifest = ParseBuildManifest(AbsoluteDownloadSpecPath); + } + + std::vector<ChunkedFolderContent> PartContents; + + std::unique_ptr<ChunkingController> ChunkController; + + std::vector<ChunkBlockDescription> BlockDescriptions; + std::vector<IoHash> LooseChunkHashes; + + Progress.SetLogOperationProgress(TaskSteps::CompareState, TaskSteps::StepCount); + + ChunkedFolderContent RemoteContent = GetRemoteContent(InLog, + Storage, + BuildId, + AllBuildParts, + Manifest, + Options.IncludeWildcards, + Options.ExcludeWildcards, + ChunkController, + PartContents, + BlockDescriptions, + LooseChunkHashes, + Options.IsQuiet, + Options.IsVerbose, + Options.DoExtraContentVerify); + + const std::uint64_t LargeAttachmentSize = Options.AllowMultiparts ? PreferredMultipartChunkSize * 4u : (std::uint64_t)-1; + GetFolderContentStatistics LocalFolderScanStats; + ChunkingStatistics ChunkingStats; + + BuildSaveState LocalState; + + if (IsDir(Path)) + { + if (!ChunkController && !Options.IsQuiet) + { + ZEN_CONSOLE_INFO("Unspecified chunking algorithm, using default"); + ChunkController = CreateStandardChunkingController(StandardChunkingControllerSettings{}); + } + std::unique_ptr<ChunkingCache> ChunkCache(CreateNullChunkingCache()); + + LocalState = GetLocalContent(Progress, + AbortFlag, + PauseFlag, + Options.IsQuiet, + Workers, + LocalFolderScanStats, + ChunkingStats, + Path, + ZenStateFilePath(Path / ZenFolderName), + *ChunkController, + *ChunkCache); + + std::vector<std::filesystem::path> UntrackedPaths = GetNewPaths(LocalState.State.ChunkedContent.Paths, RemoteContent.Paths); + + BuildSaveState UntrackedLocalContent = GetLocalStateFromPaths(Progress, + AbortFlag, + PauseFlag, + Workers, + LocalFolderScanStats, + ChunkingStats, + Path, + *ChunkController, + *ChunkCache, + UntrackedPaths); + + if (!UntrackedLocalContent.State.ChunkedContent.Paths.empty()) + { + LocalState.State.ChunkedContent = + 
MergeChunkedFolderContents(LocalState.State.ChunkedContent, + std::vector<ChunkedFolderContent>{UntrackedLocalContent.State.ChunkedContent}); + + // TODO: Helper + LocalState.FolderState.Paths.insert(LocalState.FolderState.Paths.begin(), + UntrackedLocalContent.FolderState.Paths.begin(), + UntrackedLocalContent.FolderState.Paths.end()); + LocalState.FolderState.RawSizes.insert(LocalState.FolderState.RawSizes.begin(), + UntrackedLocalContent.FolderState.RawSizes.begin(), + UntrackedLocalContent.FolderState.RawSizes.end()); + LocalState.FolderState.Attributes.insert(LocalState.FolderState.Attributes.begin(), + UntrackedLocalContent.FolderState.Attributes.begin(), + UntrackedLocalContent.FolderState.Attributes.end()); + LocalState.FolderState.ModificationTicks.insert(LocalState.FolderState.ModificationTicks.begin(), + UntrackedLocalContent.FolderState.ModificationTicks.begin(), + UntrackedLocalContent.FolderState.ModificationTicks.end()); + } + + if (Options.AppendNewContent) + { + RemoteContent = ApplyChunkedContentOverlay(LocalState.State.ChunkedContent, + RemoteContent, + Options.IncludeWildcards, + Options.ExcludeWildcards); + } +#if ZEN_BUILD_DEBUG + ValidateChunkedFolderContent(RemoteContent, + BlockDescriptions, + LooseChunkHashes, + Options.IncludeWildcards, + Options.ExcludeWildcards); +#endif // ZEN_BUILD_DEBUG + } + else + { + CreateDirectories(Path); + } + if (AbortFlag) + { + return; + } + + LocalState.LocalPath = Path; + + { + BuildsSelection::Build RemoteBuildState = {.Id = BuildId, + .IncludeWildcards = Options.IncludeWildcards, + .ExcludeWildcards = Options.ExcludeWildcards}; + RemoteBuildState.Parts.reserve(BuildPartIds.size()); + for (size_t PartIndex = 0; PartIndex < BuildPartIds.size(); PartIndex++) + { + RemoteBuildState.Parts.push_back( + {BuildsSelection::BuildPart{.Id = BuildPartIds[PartIndex], + .Name = PartIndex < BuildPartNames.size() ? 
BuildPartNames[PartIndex] : ""}}); + } + + if (Options.AppendNewContent) + { + LocalState.State.Selection.Builds.emplace_back(std::move(RemoteBuildState)); + } + else + { + LocalState.State.Selection.Builds = std::vector<BuildsSelection::Build>{std::move(RemoteBuildState)}; + } + } + + if ((Options.EnableTargetFolderScavenging || Options.AppendNewContent) && !Options.CleanTargetFolder && + CompareChunkedContent(RemoteContent, LocalState.State.ChunkedContent)) + { + if (!Options.IsQuiet) + { + ZEN_CONSOLE("Local state is identical to build to download. All done. Completed in {}.", + NiceTimeSpanMs(DownloadTimer.GetElapsedTimeMs())); + } + + Stopwatch WriteStateTimer; + + CbObject StateObject = CreateBuildSaveStateObject(LocalState); + CreateDirectories(ZenStateFilePath(Options.ZenFolderPath).parent_path()); + TemporaryFile::SafeWriteFile(ZenStateFilePath(Options.ZenFolderPath), StateObject.GetView()); + if (!Options.IsQuiet) + { + ZEN_CONSOLE("Wrote local state in {}", NiceTimeSpanMs(WriteStateTimer.GetElapsedTimeMs())); + } + + AddDownloadedPath(Options.SystemRootDir, + BuildsDownloadInfo{.Selection = LocalState.State.Selection, + .LocalPath = Path, + .StateFilePath = ZenStateFilePath(Options.ZenFolderPath), + .Iso8601Date = DateTime::Now().ToIso8601()}); + } + else + { + ExtendableStringBuilder<128> BuildPartString; + for (const std::pair<Oid, std::string>& BuildPart : AllBuildParts) + { + BuildPartString.Append(fmt::format(" {} ({})", BuildPart.second, BuildPart.first)); + } + + uint64_t RawSize = std::accumulate(RemoteContent.RawSizes.begin(), RemoteContent.RawSizes.end(), std::uint64_t(0)); + + if (!Options.IsQuiet) + { + ZEN_CONSOLE("Downloading build {}, parts:{} to '{}' ({})", BuildId, BuildPartString.ToView(), Path, NiceBytes(RawSize)); + } + + Stopwatch IndexTimer; + + const ChunkedContentLookup LocalLookup = BuildChunkedContentLookup(LocalState.State.ChunkedContent); + const ChunkedContentLookup RemoteLookup = BuildChunkedContentLookup(RemoteContent); + + 
if (!Options.IsQuiet) + { + ZEN_INFO("Indexed local and remote content in {}", NiceTimeSpanMs(IndexTimer.GetElapsedTimeMs())); + } + + Progress.SetLogOperationProgress(TaskSteps::Download, TaskSteps::StepCount); + + BuildsOperationUpdateFolder Updater( + InLog, + Progress, + Storage, + AbortFlag, + PauseFlag, + Workers.GetIOWorkerPool(), + Workers.GetNetworkPool(), + BuildId, + Path, + LocalState.State.ChunkedContent, + LocalLookup, + RemoteContent, + RemoteLookup, + BlockDescriptions, + LooseChunkHashes, + BuildsOperationUpdateFolder::Options{ + .IsQuiet = Options.IsQuiet, + .IsVerbose = Options.IsVerbose, + .AllowFileClone = Options.AllowFileClone, + .UseSparseFiles = Options.UseSparseFiles, + .SystemRootDir = Options.SystemRootDir, + .ZenFolderPath = Options.ZenFolderPath, + .LargeAttachmentSize = LargeAttachmentSize, + .PreferredMultipartChunkSize = PreferredMultipartChunkSize, + .PartialBlockRequestMode = Options.PartialBlockRequestMode, + .WipeTargetFolder = Options.CleanTargetFolder, + .EnableOtherDownloadsScavenging = Options.EnableOtherDownloadsScavenging, + .EnableTargetFolderScavenging = Options.EnableTargetFolderScavenging || Options.AppendNewContent, + .ValidateCompletedSequences = Options.PostDownloadVerify, + .ExcludeFolders = Options.ExcludeFolders, + .MaximumInMemoryPayloadSize = Options.MaximumInMemoryPayloadSize, + .PopulateCache = Options.PopulateCache}); + { + Progress.PushLogOperation("Download"); + auto _ = MakeGuard([&Progress]() { Progress.PopLogOperation(); }); + FolderContent UpdatedLocalFolderState; + Updater.Execute(UpdatedLocalFolderState); + + LocalState.State.ChunkedContent = RemoteContent; + LocalState.FolderState = std::move(UpdatedLocalFolderState); + } + + VerifyFolderStatistics VerifyFolderStats; + if (!AbortFlag) + { + AddDownloadedPath(Options.SystemRootDir, + BuildsDownloadInfo{.Selection = LocalState.State.Selection, + .LocalPath = Path, + .StateFilePath = ZenStateFilePath(Options.ZenFolderPath), + .Iso8601Date = 
DateTime::Now().ToIso8601()}); + + Progress.SetLogOperationProgress(TaskSteps::Verify, TaskSteps::StepCount); + + VerifyFolder(Progress, + AbortFlag, + PauseFlag, + Workers, + RemoteContent, + RemoteLookup, + Path, + Options.ExcludeFolders, + Options.PostDownloadVerify, + VerifyFolderStats); + + Stopwatch WriteStateTimer; + CbObject StateObject = CreateBuildSaveStateObject(LocalState); + + CreateDirectories(ZenStateFilePath(Options.ZenFolderPath).parent_path()); + TemporaryFile::SafeWriteFile(ZenStateFilePath(Options.ZenFolderPath), StateObject.GetView()); + if (!Options.IsQuiet) + { + ZEN_CONSOLE("Wrote local state in {}", NiceTimeSpanMs(WriteStateTimer.GetElapsedTimeMs())); + } + +#if 0 + ExtendableStringBuilder<1024> SB; + CompactBinaryToJson(StateObject, SB); + WriteFile(ZenStateFileJsonPath(Options.ZenFolderPath), IoBuffer(IoBuffer::Wrap, SB.Data(), SB.Size())); +#endif // 0 + const uint64_t DownloadCount = Updater.m_DownloadStats.DownloadedChunkCount.load() + + Updater.m_DownloadStats.DownloadedBlockCount.load() + + Updater.m_DownloadStats.DownloadedPartialBlockCount.load(); + const uint64_t DownloadByteCount = Updater.m_DownloadStats.DownloadedChunkByteCount.load() + + Updater.m_DownloadStats.DownloadedBlockByteCount.load() + + Updater.m_DownloadStats.DownloadedPartialBlockByteCount.load(); + const uint64_t DownloadTimeMs = DownloadTimer.GetElapsedTimeMs(); + + if (!Options.IsQuiet) + { + std::string CloneInfo; + if (Updater.m_DiskStats.CloneByteCount > 0) + { + CloneInfo = fmt::format(" ({} cloned)", NiceBytes(Updater.m_DiskStats.CloneByteCount.load())); + } + + std::string DownloadDetails; + { + ExtendableStringBuilder<128> SB; + BuildStorageBase::ExtendedStatistics ExtendedDownloadStats; + if (Storage.BuildStorage->GetExtendedStatistics(ExtendedDownloadStats)) + { + if (!ExtendedDownloadStats.ReceivedBytesPerSource.empty()) + { + for (auto& It : ExtendedDownloadStats.ReceivedBytesPerSource) + { + if (SB.Size() > 0) + { + SB.Append(", "sv); + } + 
SB.Append(It.first); + SB.Append(": "sv); + SB.Append(NiceBytes(It.second)); + } + } + } + if (Storage.CacheStorage) + { + if (SB.Size() > 0) + { + SB.Append(", "sv); + } + SB.Append("Cache: "); + SB.Append(NiceBytes(StorageCacheStats.TotalBytesRead.load())); + } + if (SB.Size() > 0) + { + DownloadDetails = fmt::format(" ({})", SB.ToView()); + } + } + + ZEN_CONSOLE( + "Downloaded build {}, parts:{} in {}\n" + " Scavenge: {} (Target: {}, Cache: {}, Others: {})\n" + " Download: {} ({}) {}bits/s{}\n" + " Write: {} ({}) {}B/s{}\n" + " Clean: {}\n" + " Finalize: {}\n" + " Verify: {}", + BuildId, + BuildPartString.ToView(), + NiceTimeSpanMs(DownloadTimeMs), + + NiceTimeSpanMs((Updater.m_CacheMappingStats.CacheScanElapsedWallTimeUs + + Updater.m_CacheMappingStats.LocalScanElapsedWallTimeUs + + Updater.m_CacheMappingStats.ScavengeElapsedWallTimeUs) / + 1000), + NiceTimeSpanMs(Updater.m_CacheMappingStats.LocalScanElapsedWallTimeUs / 1000), + NiceTimeSpanMs(Updater.m_CacheMappingStats.CacheScanElapsedWallTimeUs / 1000), + NiceTimeSpanMs(Updater.m_CacheMappingStats.ScavengeElapsedWallTimeUs / 1000), + + DownloadCount, + NiceBytes(DownloadByteCount), + NiceNum(GetBytesPerSecond(Updater.m_WriteChunkStats.DownloadTimeUs, DownloadByteCount * 8)), + DownloadDetails, + + Updater.m_DiskStats.WriteCount.load(), + NiceBytes(Updater.m_WrittenChunkByteCount.load()), + NiceNum(GetBytesPerSecond(Updater.m_WriteChunkStats.WriteTimeUs, Updater.m_DiskStats.WriteByteCount.load())), + CloneInfo, + + NiceTimeSpanMs(Updater.m_RebuildFolderStateStats.CleanFolderElapsedWallTimeUs / 1000), + + NiceTimeSpanMs(Updater.m_RebuildFolderStateStats.FinalizeTreeElapsedWallTimeUs / 1000), + + NiceTimeSpanMs(VerifyFolderStats.VerifyElapsedWallTimeUs / 1000)); + } + } + } + + Progress.SetLogOperationProgress(TaskSteps::Cleanup, TaskSteps::StepCount); + + CleanAndRemoveDirectory(Workers.GetIOWorkerPool(), AbortFlag, PauseFlag, ZenTempFolder); +} +} // namespace zen |