diff options
Diffstat (limited to 'src/zenremotestore/builds/buildstorageoperations.cpp')
| -rw-r--r-- | src/zenremotestore/builds/buildstorageoperations.cpp | 4742 |
1 files changed, 4742 insertions, 0 deletions
diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp new file mode 100644 index 000000000..f3a585737 --- /dev/null +++ b/src/zenremotestore/builds/buildstorageoperations.cpp @@ -0,0 +1,4742 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenremotestore/builds/buildstorageoperations.h> + +#include <zenremotestore/builds/buildstorage.h> +#include <zenremotestore/builds/buildstoragecache.h> +#include <zenremotestore/chunking/chunkblock.h> + +#include <zencore/basicfile.h> +#include <zencore/compactbinary.h> +#include <zencore/compactbinaryfile.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/parallelwork.h> +#include <zencore/scopeguard.h> +#include <zencore/string.h> +#include <zencore/timer.h> +#include <zencore/trace.h> +#include <zenutil/bufferedopenfile.h> + +#include <numeric> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <tsl/robin_map.h> +#include <tsl/robin_set.h> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +using namespace std::literals; + +#define DO_LOG_OUTPUT(OutputTarget, InLevel, fmtstr, ...) \ + do \ + { \ + using namespace std::literals; \ + ZEN_CHECK_FORMAT_STRING(fmtstr##sv, ##__VA_ARGS__); \ + OutputTarget.EmitLogMessage(InLevel, fmtstr, zen::logging::LogCaptureArguments(__VA_ARGS__)); \ + } while (false) + +#define LOG_OUTPUT(OutputTarget, fmtstr, ...) DO_LOG_OUTPUT(OutputTarget, zen::logging::level::Info, fmtstr, ##__VA_ARGS__) +#define LOG_OUTPUT_DEBUG(OutputTarget, fmtstr, ...) DO_LOG_OUTPUT(OutputTarget, zen::logging::level::Debug, fmtstr, ##__VA_ARGS__) +#define LOG_OUTPUT_WARN(OutputTarget, fmtstr, ...) DO_LOG_OUTPUT(OutputTarget, zen::logging::level::Warn, fmtstr, ##__VA_ARGS__) + +namespace { + std::filesystem::path ZenTempCacheFolderPath(const std::filesystem::path& ZenFolderPath) + { + return ZenTempFolderPath(ZenFolderPath) / "cache"; // Decompressed and verified data - chunks & sequences + } + std::filesystem::path ZenTempBlockFolderPath(const std::filesystem::path& ZenFolderPath) + { + return ZenTempFolderPath(ZenFolderPath) / "blocks"; // Temp storage for whole and partial blocks + } + std::filesystem::path ZenTempDownloadFolderPath(const std::filesystem::path& ZenFolderPath) + { + return ZenTempFolderPath(ZenFolderPath) / "download"; // Temp storage for decompressed and validated chunks + } + + uint64_t GetBytesPerSecond(uint64_t ElapsedWallTimeUS, uint64_t Count) + { + if (ElapsedWallTimeUS == 0) + { + return 0; + } + return Count * 1000000 / ElapsedWallTimeUS; + } + + std::filesystem::path GetTempChunkedSequenceFileName(const std::filesystem::path& CacheFolderPath, const IoHash& RawHash) + { + return CacheFolderPath / (RawHash.ToHexString() + ".tmp"); + } + + std::filesystem::path GetFinalChunkedSequenceFileName(const std::filesystem::path& CacheFolderPath, const IoHash& RawHash) + { + return CacheFolderPath / RawHash.ToHexString(); + } + + uint32_t SetNativeFileAttributes(const std::filesystem::path FilePath, SourcePlatform SourcePlatform, uint32_t Attributes) + { +#if ZEN_PLATFORM_WINDOWS + if (SourcePlatform == SourcePlatform::Windows) + { + SetFileAttributesToPath(FilePath, Attributes); + return Attributes; + } + else + { + uint32_t CurrentAttributes = GetFileAttributesFromPath(FilePath); + uint32_t NewAttributes = zen::MakeFileAttributeReadOnly(CurrentAttributes, zen::IsFileModeReadOnly(Attributes)); + if (CurrentAttributes != NewAttributes) + { + SetFileAttributesToPath(FilePath, NewAttributes); + } + return NewAttributes; + } +#endif // ZEN_PLATFORM_WINDOWS +#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + if (SourcePlatform != SourcePlatform::Windows) + { + zen::SetFileMode(FilePath, Attributes); + return Attributes; + } + else + { + uint32_t CurrentMode = zen::GetFileMode(FilePath); + uint32_t NewMode = zen::MakeFileModeReadOnly(CurrentMode, zen::IsFileAttributeReadOnly(Attributes)); + if (CurrentMode != NewMode) + { + zen::SetFileMode(FilePath, NewMode); + } + return NewMode; + } +#endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + }; + + uint32_t GetNativeFileAttributes(const std::filesystem::path FilePath) + { +#if ZEN_PLATFORM_WINDOWS + return GetFileAttributesFromPath(FilePath); +#endif // ZEN_PLATFORM_WINDOWS +#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + return GetFileMode(FilePath); +#endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + } + + bool IsFileWithRetry(const std::filesystem::path& Path) + { + std::error_code Ec; + bool Result = IsFile(Path, Ec); + for (size_t Retries = 0; Ec && Retries < 3; Retries++) + { + Sleep(100 + int(Retries * 50)); + Ec.clear(); + Result = IsFile(Path, Ec); + } + if (Ec) + { + throw std::system_error(std::error_code(Ec.value(), std::system_category()), + fmt::format("Check path '{}' is file failed with: {} ({})", Path, Ec.message(), Ec.value())); + } + return Result; + } + + bool SetFileReadOnlyWithRetry(const std::filesystem::path& Path, bool ReadOnly) + { + std::error_code Ec; + bool Result = SetFileReadOnly(Path, ReadOnly, Ec); + for (size_t Retries = 0; Ec && Retries < 3; Retries++) + { + Sleep(100 + int(Retries * 50)); + if (!IsFileWithRetry(Path)) + { + return false; + } + Ec.clear(); + Result = SetFileReadOnly(Path, ReadOnly, Ec); + } + if (Ec) + { + throw std::system_error( + std::error_code(Ec.value(), std::system_category()), + fmt::format("Failed {} read only flag for '{}' failed with: {}", ReadOnly ? "setting" : "clearing", Path, Ec.message())); + } + return Result; + } + + void RenameFileWithRetry(BuildOpLogOutput& LogOutput, const std::filesystem::path& SourcePath, const std::filesystem::path& TargetPath) + { + std::error_code Ec; + RenameFile(SourcePath, TargetPath, Ec); + for (size_t Retries = 0; Ec && Retries < 10; Retries++) + { + ZEN_ASSERT_SLOW(IsFile(SourcePath)); + if (Retries > 5) + { + LOG_OUTPUT_WARN(LogOutput, "Unable to overwrite file {} ({}: {}), retrying...", TargetPath, Ec.value(), Ec.message()); + } + Sleep(50 + int(Retries * 150)); + Ec.clear(); + RenameFile(SourcePath, TargetPath, Ec); + } + if (Ec) + { + throw std::system_error(std::error_code(Ec.value(), std::system_category()), + fmt::format("Rename from '{}' to '{}' failed with: {}", SourcePath, TargetPath, Ec.message())); + } + } + + void TryRemoveFile(BuildOpLogOutput& LogOutput, const std::filesystem::path& Path) + { + std::error_code Ec; + RemoveFile(Path, Ec); + if (Ec) + { + if (IsFile(Path, Ec)) + { + Ec.clear(); + RemoveFile(Path, Ec); + if (Ec) + { + LOG_OUTPUT_DEBUG(LogOutput, "Failed removing file '{}', reason: {}", Path, Ec.message()); + } + } + } + } + + void RemoveFileWithRetry(const std::filesystem::path& Path) + { + std::error_code Ec; + RemoveFile(Path, Ec); + for (size_t Retries = 0; Ec && Retries < 6; Retries++) + { + Sleep(100 + int(Retries * 50)); + if (!IsFileWithRetry(Path)) + { + return; + } + Ec.clear(); + RemoveFile(Path, Ec); + } + if (Ec) + { + throw std::system_error(std::error_code(Ec.value(), std::system_category()), + fmt::format("Removing file '{}' failed with: {}", Path, Ec.message())); + } + } + + void RemoveDirWithRetry(const std::filesystem::path& Path) + { + std::error_code Ec; + RemoveDir(Path, Ec); + for (size_t Retries = 0; Ec && Retries < 3; Retries++) + { + Sleep(100 + int(Retries * 50)); + if (!IsDir(Path)) + { + return; + } + Ec.clear(); + RemoveDir(Path, Ec); + } + if (Ec) + { + throw std::system_error(std::error_code(Ec.value(), std::system_category()), + fmt::format("Removing directory '{}' failed with: {}", Path, Ec.message())); + } + } + + bool CleanDirectory(WorkerThreadPool& IOWorkerPool, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag, + BuildOpLogOutput& LogOutput, + bool IsQuiet, + const std::filesystem::path& Path, + std::span<const std::string> ExcludeDirectories) + { + ZEN_TRACE_CPU("CleanDirectory"); + Stopwatch Timer; + + std::unique_ptr<BuildOpLogOutput::ProgressBar> ProgressBarPtr(LogOutput.CreateProgressBar("Clean Folder")); + BuildOpLogOutput::ProgressBar& Progress(*ProgressBarPtr); + + std::atomic<bool> CleanWipe = true; + std::atomic<uint64_t> DiscoveredItemCount = 0; + std::atomic<uint64_t> DeletedItemCount = 0; + std::atomic<uint64_t> DeletedByteCount = 0; + ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + struct AsyncVisitor : public GetDirectoryContentVisitor + { + AsyncVisitor(const std::filesystem::path& InPath, + std::atomic<bool>& InAbortFlag, + BuildOpLogOutput& InLogOutput, + std::atomic<bool>& InCleanWipe, + std::atomic<uint64_t>& InDiscoveredItemCount, + std::atomic<uint64_t>& InDeletedItemCount, + std::atomic<uint64_t>& InDeletedByteCount, + std::span<const std::string> InExcludeDirectories) + : Path(InPath) + , AbortFlag(InAbortFlag) + , LogOutput(InLogOutput) + , CleanWipe(InCleanWipe) + , DiscoveredItemCount(InDiscoveredItemCount) + , DeletedItemCount(InDeletedItemCount) + , DeletedByteCount(InDeletedByteCount) + , ExcludeDirectories(InExcludeDirectories) + { + } + virtual void AsyncVisitDirectory(const std::filesystem::path& RelativeRoot, DirectoryContent&& Content) override + { + ZEN_TRACE_CPU("CleanDirectory_AsyncVisitDirectory"); + if (!AbortFlag) + { + if (!Content.FileNames.empty()) + { + DiscoveredItemCount += Content.FileNames.size(); + + const std::string RelativeRootString = RelativeRoot.generic_string(); + bool RemoveContent = true; + for (const std::string_view ExcludeDirectory : ExcludeDirectories) + { + if (RelativeRootString.starts_with(ExcludeDirectory)) + { + if (RelativeRootString.length() > ExcludeDirectory.length()) + { + const char MaybePathDelimiter = RelativeRootString[ExcludeDirectory.length()]; + if (MaybePathDelimiter == '/' || MaybePathDelimiter == '\\' || + MaybePathDelimiter == std::filesystem::path::preferred_separator) + { + RemoveContent = false; + break; + } + } + else + { + RemoveContent = false; + break; + } + } + } + if (RemoveContent) + { + ZEN_TRACE_CPU("DeleteFiles"); + for (size_t FileIndex = 0; FileIndex < Content.FileNames.size(); FileIndex++) + { + const std::filesystem::path& FileName = Content.FileNames[FileIndex]; + const std::filesystem::path FilePath = (Path / RelativeRoot / FileName).make_preferred(); + try + { + SetFileReadOnlyWithRetry(FilePath, false); + RemoveFileWithRetry(FilePath); + DeletedItemCount++; + DeletedByteCount += Content.FileSizes[FileIndex]; + } + catch (const std::exception& Ex) + { + LOG_OUTPUT_WARN(LogOutput, "Failed removing file {}. Reason: {}", FilePath, Ex.what()); + CleanWipe = false; + } + } + } + } + } + } + const std::filesystem::path& Path; + std::atomic<bool>& AbortFlag; + BuildOpLogOutput& LogOutput; + std::atomic<bool>& CleanWipe; + std::atomic<uint64_t>& DiscoveredItemCount; + std::atomic<uint64_t>& DeletedItemCount; + std::atomic<uint64_t>& DeletedByteCount; + std::span<const std::string> ExcludeDirectories; + } Visitor(Path, AbortFlag, LogOutput, CleanWipe, DiscoveredItemCount, DeletedItemCount, DeletedByteCount, ExcludeDirectories); + + GetDirectoryContent( + Path, + DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive | DirectoryContentFlags::IncludeFileSizes, + Visitor, + IOWorkerPool, + Work.PendingWork()); + + DirectoryContent LocalDirectoryContent; + GetDirectoryContent(Path, DirectoryContentFlags::IncludeDirs | DirectoryContentFlags::IncludeFiles, LocalDirectoryContent); + DiscoveredItemCount += LocalDirectoryContent.Directories.size(); + std::vector<std::filesystem::path> DirectoriesToDelete; + DirectoriesToDelete.reserve(LocalDirectoryContent.Directories.size()); + for (std::filesystem::path& LocalDirPath : LocalDirectoryContent.Directories) + { + bool Leave = false; + for (const std::string_view ExcludeDirectory : ExcludeDirectories) + { + if (LocalDirPath == (Path / ExcludeDirectory)) + { + Leave = true; + break; + } + } + if (!Leave) + { + DirectoriesToDelete.emplace_back(std::move(LocalDirPath)); + DiscoveredItemCount++; + } + } + + uint64_t LastUpdateTimeMs = Timer.GetElapsedTimeMs(); + + Work.Wait(LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); + LastUpdateTimeMs = Timer.GetElapsedTimeMs(); + + uint64_t Deleted = DeletedItemCount.load(); + uint64_t DeletedBytes = DeletedByteCount.load(); + uint64_t Discovered = DiscoveredItemCount.load(); + Progress.UpdateState({.Task = "Cleaning folder ", + .Details = fmt::format("Found {}, Deleted {} ({})", Discovered, Deleted, NiceBytes(DeletedBytes)), + .TotalCount = Discovered, + .RemainingCount = Discovered - Deleted, + .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, + false); + }); + + for (const std::filesystem::path& DirectoryToDelete : DirectoriesToDelete) + { + ZEN_TRACE_CPU("DeleteDirs"); + try + { + std::error_code Ec; + zen::CleanDirectory(DirectoryToDelete, true, Ec); + if (Ec) + { + Sleep(200); + zen::CleanDirectory(DirectoryToDelete, true); + Ec.clear(); + } + + RemoveDirWithRetry(DirectoryToDelete); + + DeletedItemCount++; + } + catch (const std::exception& Ex) + { + LOG_OUTPUT_WARN(LogOutput, "Failed removing directory {}. Reason: {}", DirectoryToDelete, Ex.what()); + CleanWipe = false; + } + + uint64_t NowMs = Timer.GetElapsedTimeMs(); + if ((NowMs - LastUpdateTimeMs) >= LogOutput.GetProgressUpdateDelayMS()) + { + LastUpdateTimeMs = NowMs; + + uint64_t Deleted = DeletedItemCount.load(); + uint64_t DeletedBytes = DeletedByteCount.load(); + uint64_t Discovered = DiscoveredItemCount.load(); + Progress.UpdateState({.Task = "Cleaning folder ", + .Details = fmt::format("Found {}, Deleted {} ({})", Discovered, Deleted, NiceBytes(DeletedBytes)), + .TotalCount = Discovered, + .RemainingCount = Discovered - Deleted, + .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(AbortFlag, PauseFlag)}, + false); + } + } + + Progress.Finish(); + if (AbortFlag) + { + return false; + } + + uint64_t ElapsedTimeMs = Timer.GetElapsedTimeMs(); + if (ElapsedTimeMs >= 200 && !IsQuiet) + { + LOG_OUTPUT(LogOutput, + "Wiped folder '{}' {} ({}) in {}", + Path, + DiscoveredItemCount.load(), + NiceBytes(DeletedByteCount.load()), + NiceTimeSpanMs(ElapsedTimeMs)); + } + return CleanWipe; + } + + void CopyFile(bool AllowFileClone, + bool UseSparseFiles, + const std::filesystem::path& SourceFilePath, + const std::filesystem::path& TargetFilePath, + uint64_t RawSize, + std::atomic<uint64_t>& WriteCount, + std::atomic<uint64_t>& WriteByteCount, + std::atomic<uint64_t>& CloneCount, + std::atomic<uint64_t>& CloneByteCount) + { + ZEN_TRACE_CPU("CopyFile"); + if (AllowFileClone && TryCloneFile(SourceFilePath, TargetFilePath)) + { + WriteCount += 1; + WriteByteCount += RawSize; + CloneCount += 1; + CloneByteCount += RawSize; + } + else + { + BasicFile TargetFile(TargetFilePath, BasicFile::Mode::kTruncate); + if (UseSparseFiles) + { + PrepareFileForScatteredWrite(TargetFile.Handle(), RawSize); + } + uint64_t Offset = 0; + if (!ScanFile(SourceFilePath, 512u * 1024u, [&](const void* Data, size_t Size) { + TargetFile.Write(Data, Size, Offset); + Offset += Size; + WriteCount++; + WriteByteCount += Size; + })) + { + throw std::runtime_error(fmt::format("Failed to copy scavenged file '{}' to '{}'", SourceFilePath, TargetFilePath)); + } + } + } + +} // namespace + +bool +ReadStateObject(BuildOpLogOutput& LogOutput, + CbObjectView StateView, + Oid& OutBuildId, + std::vector<Oid>& BuildPartsIds, + std::vector<std::string>& BuildPartsNames, + std::vector<ChunkedFolderContent>& OutPartContents, + FolderContent& OutLocalFolderState) +{ + try + { + CbObjectView BuildView = StateView["builds"sv].AsArrayView().CreateViewIterator().AsObjectView(); + OutBuildId = BuildView["buildId"sv].AsObjectId(); + for (CbFieldView PartView : BuildView["parts"sv].AsArrayView()) + { + CbObjectView PartObjectView = PartView.AsObjectView(); + BuildPartsIds.push_back(PartObjectView["partId"sv].AsObjectId()); + BuildPartsNames.push_back(std::string(PartObjectView["partName"sv].AsString())); + OutPartContents.push_back(LoadChunkedFolderContentToCompactBinary(PartObjectView["content"sv].AsObjectView())); + } + OutLocalFolderState = LoadFolderContentToCompactBinary(StateView["localFolderState"sv].AsObjectView()); + return true; + } + catch (const std::exception& Ex) + { + LOG_OUTPUT_WARN(LogOutput, "Unable to read local state: ", Ex.what()); + return false; + } +} + +bool +ReadStateFile(BuildOpLogOutput& LogOutput, + const std::filesystem::path& StateFilePath, + FolderContent& OutLocalFolderState, + ChunkedFolderContent& OutLocalContent) +{ + ZEN_TRACE_CPU("ReadStateFile"); + bool HasLocalState = false; + try + { + CbObject CurrentStateObject = LoadCompactBinaryObject(StateFilePath).Object; + if (CurrentStateObject) + { + Oid CurrentBuildId; + std::vector<Oid> SavedBuildPartIds; + std::vector<std::string> SavedBuildPartsNames; + std::vector<ChunkedFolderContent> SavedPartContents; + if (ReadStateObject(LogOutput, + CurrentStateObject, + CurrentBuildId, + SavedBuildPartIds, + SavedBuildPartsNames, + SavedPartContents, + OutLocalFolderState)) + { + if (!SavedPartContents.empty()) + { + if (SavedPartContents.size() == 1) + { + OutLocalContent = std::move(SavedPartContents[0]); + } + else + { + OutLocalContent = MergeChunkedFolderContents(SavedPartContents[0], + std::span<const ChunkedFolderContent>(SavedPartContents).subspan(1)); + } + HasLocalState = true; + } + } + } + } + catch (const std::exception& Ex) + { + LOG_OUTPUT_WARN(LogOutput, "Failed reading state file {}, falling back to scannning. Reason: {}", StateFilePath, Ex.what()); + } + return HasLocalState; +} + +struct BlockRangeDescriptor +{ + uint32_t BlockIndex = (uint32_t)-1; + uint64_t RangeStart = 0; + uint64_t RangeLength = 0; + uint32_t ChunkBlockIndexStart = 0; + uint32_t ChunkBlockIndexCount = 0; +}; + +BlockRangeDescriptor +MergeBlockRanges(std::span<const BlockRangeDescriptor> Ranges) +{ + ZEN_ASSERT(Ranges.size() > 1); + const BlockRangeDescriptor& First = Ranges.front(); + const BlockRangeDescriptor& Last = Ranges.back(); + + return BlockRangeDescriptor{.BlockIndex = First.BlockIndex, + .RangeStart = First.RangeStart, + .RangeLength = Last.RangeStart + Last.RangeLength - First.RangeStart, + .ChunkBlockIndexStart = First.ChunkBlockIndexStart, + .ChunkBlockIndexCount = Last.ChunkBlockIndexStart + Last.ChunkBlockIndexCount - First.ChunkBlockIndexStart}; +} + +std::optional<std::vector<BlockRangeDescriptor>> +MakeOptionalBlockRangeVector(uint64_t TotalBlockSize, const BlockRangeDescriptor& Range) +{ + if (Range.RangeLength == TotalBlockSize) + { + return {}; + } + else + { + return std::vector<BlockRangeDescriptor>{Range}; + } +}; + +struct BlockRangeLimit +{ + uint16_t SizePercent; + uint16_t MaxRangeCount; +}; + +static const uint16_t FullBlockRangePercentLimit = 95; + +const std::vector<BlockRangeLimit> ForceMergeLimits = {{.SizePercent = FullBlockRangePercentLimit, .MaxRangeCount = 1}, + {.SizePercent = 90, .MaxRangeCount = 2}, + {.SizePercent = 85, .MaxRangeCount = 8}, + {.SizePercent = 80, .MaxRangeCount = 16}, + {.SizePercent = 70, .MaxRangeCount = 32}, + {.SizePercent = 60, .MaxRangeCount = 48}, + {.SizePercent = 2, .MaxRangeCount = 56}, + {.SizePercent = 0, .MaxRangeCount = 64}}; + +const BlockRangeLimit* +GetBlockRangeLimitForRange(std::span<const BlockRangeLimit> Limits, uint64_t TotalBlockSize, std::span<const BlockRangeDescriptor> Ranges) +{ + if (Ranges.size() > 1) + { + const std::uint64_t WantedSize = + std::accumulate(Ranges.begin(), Ranges.end(), uint64_t(0), [](uint64_t Current, const BlockRangeDescriptor& Range) { + return Current + Range.RangeLength; + }); + + const double RangeRequestedPercent = (WantedSize * 100.0) / TotalBlockSize; + + for (const BlockRangeLimit& Limit : Limits) + { + if (RangeRequestedPercent >= Limit.SizePercent && Ranges.size() > Limit.MaxRangeCount) + { + return &Limit; + } + } + } + return nullptr; +}; + +std::vector<BlockRangeDescriptor> +CollapseBlockRanges(const uint64_t AlwaysAcceptableGap, std::span<const BlockRangeDescriptor> BlockRanges) +{ + ZEN_ASSERT(BlockRanges.size() > 1); + std::vector<BlockRangeDescriptor> CollapsedBlockRanges; + + auto BlockRangesIt = BlockRanges.begin(); + CollapsedBlockRanges.push_back(*BlockRangesIt++); + for (; BlockRangesIt != BlockRanges.end(); BlockRangesIt++) + { + BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back(); + + const uint64_t BothRangeSize = BlockRangesIt->RangeLength + LastRange.RangeLength; + + const uint64_t Gap = BlockRangesIt->RangeStart - (LastRange.RangeStart + LastRange.RangeLength); + if (Gap <= Max(BothRangeSize / 16, AlwaysAcceptableGap)) + { + LastRange.ChunkBlockIndexCount = + (BlockRangesIt->ChunkBlockIndexStart + BlockRangesIt->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart; + LastRange.RangeLength = (BlockRangesIt->RangeStart + BlockRangesIt->RangeLength) - LastRange.RangeStart; + } + else + { + CollapsedBlockRanges.push_back(*BlockRangesIt); + } + } + + return CollapsedBlockRanges; +}; + +uint64_t +CalculateNextGap(std::span<const BlockRangeDescriptor> BlockRanges) +{ + ZEN_ASSERT(BlockRanges.size() > 1); + uint64_t AcceptableGap = (uint64_t)-1; + for (size_t RangeIndex = 0; RangeIndex < BlockRanges.size() - 1; RangeIndex++) + { + const BlockRangeDescriptor& Range = BlockRanges[RangeIndex]; + const BlockRangeDescriptor& NextRange = BlockRanges[RangeIndex + 1]; + + const uint64_t Gap = NextRange.RangeStart - (Range.RangeStart + Range.RangeLength); + AcceptableGap = Min(Gap, AcceptableGap); + } + AcceptableGap = RoundUp(AcceptableGap, 16u * 1024u); + return AcceptableGap; +}; + +std::optional<std::vector<BlockRangeDescriptor>> +CalculateBlockRanges(BuildOpLogOutput& LogOutput, + bool IsVerbose, + uint32_t BlockIndex, + const ChunkBlockDescription& BlockDescription, + std::span<const uint32_t> BlockChunkIndexNeeded, + bool LimitToSingleRange, + const uint64_t ChunkStartOffsetInBlock, + const uint64_t TotalBlockSize, + uint64_t& OutTotalWantedChunksSize) +{ + ZEN_TRACE_CPU("CalculateBlockRanges"); + + std::vector<BlockRangeDescriptor> BlockRanges; + { + uint64_t CurrentOffset = ChunkStartOffsetInBlock; + uint32_t ChunkBlockIndex = 0; + uint32_t NeedBlockChunkIndexOffset = 0; + BlockRangeDescriptor NextRange{.BlockIndex = BlockIndex}; + while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() && ChunkBlockIndex < BlockDescription.ChunkRawHashes.size()) + { + const uint32_t ChunkCompressedLength = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex]; + if (ChunkBlockIndex < BlockChunkIndexNeeded[NeedBlockChunkIndexOffset]) + { + if (NextRange.RangeLength > 0) + { + BlockRanges.push_back(NextRange); + NextRange = {.BlockIndex = BlockIndex}; + } + ChunkBlockIndex++; + CurrentOffset += ChunkCompressedLength; + } + else if (ChunkBlockIndex == BlockChunkIndexNeeded[NeedBlockChunkIndexOffset]) + { + if (NextRange.RangeLength == 0) + { + NextRange.RangeStart = CurrentOffset; + NextRange.ChunkBlockIndexStart = ChunkBlockIndex; + } + NextRange.RangeLength += ChunkCompressedLength; + NextRange.ChunkBlockIndexCount++; + ChunkBlockIndex++; + CurrentOffset += ChunkCompressedLength; + NeedBlockChunkIndexOffset++; + } + else + { + ZEN_ASSERT(false); + } + } + if (NextRange.RangeLength > 0) + { + BlockRanges.push_back(NextRange); + } + } + ZEN_ASSERT(!BlockRanges.empty()); + + OutTotalWantedChunksSize = + std::accumulate(BlockRanges.begin(), BlockRanges.end(), uint64_t(0), [](uint64_t Current, const BlockRangeDescriptor& Range) { + return Current + Range.RangeLength; + }); + + double RangeWantedPercent = (OutTotalWantedChunksSize * 100.0) / TotalBlockSize; + + if (BlockRanges.size() == 1) + { + if (IsVerbose) + { + LOG_OUTPUT(LogOutput, + "Range request of {} ({:.2f}%) using single range from block {} ({}) as is", + NiceBytes(OutTotalWantedChunksSize), + RangeWantedPercent, + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize)); + } + return BlockRanges; + } + + if (LimitToSingleRange) + { + const BlockRangeDescriptor MergedRange = MergeBlockRanges(BlockRanges); + if (IsVerbose) + { + const double RangeRequestedPercent = (MergedRange.RangeLength * 100.0) / TotalBlockSize; + const double WastedPercent = ((MergedRange.RangeLength - OutTotalWantedChunksSize) * 100.0) / MergedRange.RangeLength; + + LOG_OUTPUT( + LogOutput, + "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) limited to single block range {} ({:.2f}%) wasting " + "{:.2f}% ({})", + NiceBytes(OutTotalWantedChunksSize), + RangeWantedPercent, + BlockRanges.size(), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + NiceBytes(MergedRange.RangeLength), + RangeRequestedPercent, + WastedPercent, + NiceBytes(MergedRange.RangeLength - OutTotalWantedChunksSize)); + } + return MakeOptionalBlockRangeVector(TotalBlockSize, MergedRange); + } + + if (RangeWantedPercent > FullBlockRangePercentLimit) + { + const BlockRangeDescriptor MergedRange = MergeBlockRanges(BlockRanges); + if (IsVerbose) + { + const double RangeRequestedPercent = (MergedRange.RangeLength * 100.0) / TotalBlockSize; + const double WastedPercent = ((MergedRange.RangeLength - OutTotalWantedChunksSize) * 100.0) / MergedRange.RangeLength; + + LOG_OUTPUT(LogOutput, + "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) exceeds {}%. Merged to single block range {} " + "({:.2f}%) wasting {:.2f}% ({})", + NiceBytes(OutTotalWantedChunksSize), + RangeWantedPercent, + BlockRanges.size(), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + FullBlockRangePercentLimit, + NiceBytes(MergedRange.RangeLength), + RangeRequestedPercent, + WastedPercent, + NiceBytes(MergedRange.RangeLength - OutTotalWantedChunksSize)); + } + return MakeOptionalBlockRangeVector(TotalBlockSize, MergedRange); + } + + std::vector<BlockRangeDescriptor> CollapsedBlockRanges = CollapseBlockRanges(16u * 1024u, BlockRanges); + while (GetBlockRangeLimitForRange(ForceMergeLimits, TotalBlockSize, CollapsedBlockRanges)) + { + CollapsedBlockRanges = CollapseBlockRanges(CalculateNextGap(CollapsedBlockRanges), CollapsedBlockRanges); + } + + const std::uint64_t WantedCollapsedSize = + std::accumulate(CollapsedBlockRanges.begin(), + CollapsedBlockRanges.end(), + uint64_t(0), + [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; }); + + const double CollapsedRangeRequestedPercent = (WantedCollapsedSize * 100.0) / TotalBlockSize; + + if (IsVerbose) + { + const double WastedPercent = ((WantedCollapsedSize - OutTotalWantedChunksSize) * 100.0) / WantedCollapsedSize; + + LOG_OUTPUT( + LogOutput, + "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) collapsed to {} {:.2f}% using {} ranges wasting {:.2f}% " + "({})", + NiceBytes(OutTotalWantedChunksSize), + RangeWantedPercent, + BlockRanges.size(), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + NiceBytes(WantedCollapsedSize), + CollapsedRangeRequestedPercent, + CollapsedBlockRanges.size(), + WastedPercent, + NiceBytes(WantedCollapsedSize - OutTotalWantedChunksSize)); + } + return CollapsedBlockRanges; +} + +bool +IsSingleFileChunk(const ChunkedFolderContent& RemoteContent, + const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> Locations) +{ + if (Locations.size() == 1) + { + const uint32_t FirstSequenceIndex = Locations[0]->SequenceIndex; + if (RemoteContent.ChunkedContent.ChunkCounts[FirstSequenceIndex] == 1) + { + ZEN_ASSERT_SLOW(Locations[0]->Offset == 0); + return true; + } + } + return false; +} + +IoBuffer +MakeBufferMemoryBased(const CompositeBuffer& PartialBlockBuffer) +{ + ZEN_TRACE_CPU("MakeBufferMemoryBased"); + IoBuffer BlockMemoryBuffer; + std::span<const SharedBuffer> Segments = PartialBlockBuffer.GetSegments(); + if (Segments.size() == 1) + { + IoBufferFileReference FileRef = {}; + if (PartialBlockBuffer.GetSegments().front().AsIoBuffer().GetFileReference(FileRef)) + { + BlockMemoryBuffer = UniqueBuffer::Alloc(FileRef.FileChunkSize).MoveToShared().AsIoBuffer(); + BasicFile Reader; + Reader.Attach(FileRef.FileHandle); + auto _ = MakeGuard([&Reader]() { Reader.Detach(); }); + MutableMemoryView ReadMem = BlockMemoryBuffer.GetMutableView(); + Reader.Read(ReadMem.GetData(), FileRef.FileChunkSize, FileRef.FileChunkOffset); + return BlockMemoryBuffer; + } + else + { + return PartialBlockBuffer.GetSegments().front().AsIoBuffer(); + } + } + else + { + // Not a homogenous memory buffer, read all to memory + + BlockMemoryBuffer = UniqueBuffer::Alloc(PartialBlockBuffer.GetSize()).MoveToShared().AsIoBuffer(); + MutableMemoryView ReadMem = BlockMemoryBuffer.GetMutableView(); + for (const SharedBuffer& Segment : Segments) + { + IoBufferFileReference FileRef = {}; + if (Segment.AsIoBuffer().GetFileReference(FileRef)) + { + BasicFile Reader; + Reader.Attach(FileRef.FileHandle); + Reader.Read(ReadMem.GetData(), FileRef.FileChunkSize, FileRef.FileChunkOffset); + Reader.Detach(); + ReadMem = ReadMem.Mid(FileRef.FileChunkSize); + } + else + { + ReadMem = ReadMem.CopyFrom(Segment.AsIoBuffer().GetView()); + } + } + return BlockMemoryBuffer; + } +} + +void +StreamDecompress(std::atomic<bool>& AbortFlag, + const std::filesystem::path& CacheFolderPath, + const IoHash& SequenceRawHash, + CompositeBuffer&& CompressedPart, + DiskStatistics& DiskStats) +{ + ZEN_TRACE_CPU("StreamDecompress"); + const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash); + TemporaryFile DecompressedTemp; + std::error_code Ec; + DecompressedTemp.CreateTemporary(TempChunkSequenceFileName.parent_path(), Ec); + if (Ec) + { + throw std::runtime_error( + fmt::format("Failed creating temporary file for decompressing large blob {}. Reason: {}", SequenceRawHash, Ec.message())); + } + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompressedPart, RawHash, RawSize); + if (!Compressed) + { + throw std::runtime_error(fmt::format("Failed to parse header of compressed large blob {}", SequenceRawHash)); + } + if (RawHash != SequenceRawHash) + { + throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, SequenceRawHash)); + } + PrepareFileForScatteredWrite(DecompressedTemp.Handle(), RawSize); + + IoHashStream Hash; + bool CouldDecompress = + Compressed.DecompressToStream(0, + (uint64_t)-1, + [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { + ZEN_UNUSED(SourceOffset); + ZEN_TRACE_CPU("StreamDecompress_Write"); + DiskStats.ReadByteCount += SourceSize; + if (!AbortFlag) + { + for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) + { + Hash.Append(Segment.GetView()); + DecompressedTemp.Write(Segment, Offset); + Offset += Segment.GetSize(); + DiskStats.WriteByteCount += Segment.GetSize(); + DiskStats.WriteCount++; + } + return true; + } + return false; + }); + + if (AbortFlag) + { + return; + } + + if (!CouldDecompress) + { + throw std::runtime_error(fmt::format("Failed to decompress large blob {}", SequenceRawHash)); + } + const IoHash VerifyHash = Hash.GetHash(); + if (VerifyHash != SequenceRawHash) + { + throw std::runtime_error( + fmt::format("Decompressed blob payload hash {} does not match expected hash {}", VerifyHash, SequenceRawHash)); + } + DecompressedTemp.MoveTemporaryIntoPlace(TempChunkSequenceFileName, Ec); + if (Ec) + { + throw std::runtime_error( + fmt::format("Failed moving temporary file for decompressing large blob {}. Reason: {}", SequenceRawHash, Ec.message())); + } + // WriteChunkStats.ChunkCountWritten++; +} + +class FilteredRate +{ +public: + FilteredRate() {} + + void Start() + { + if (StartTimeUS == (uint64_t)-1) + { + uint64_t Expected = (uint64_t)-1; + if (StartTimeUS.compare_exchange_weak(Expected, Timer.GetElapsedTimeUs())) + { + LastTimeUS = StartTimeUS.load(); + } + } + } + void Stop() + { + if (EndTimeUS == (uint64_t)-1) + { + uint64_t Expected = (uint64_t)-1; + EndTimeUS.compare_exchange_weak(Expected, Timer.GetElapsedTimeUs()); + } + } + + void Update(uint64_t Count) + { + if (LastTimeUS == (uint64_t)-1) + { + return; + } + uint64_t TimeUS = Timer.GetElapsedTimeUs(); + uint64_t TimeDeltaUS = TimeUS - LastTimeUS; + if (TimeDeltaUS >= 2000000) + { + uint64_t Delta = Count - LastCount; + uint64_t PerSecond = (Delta * 1000000) / TimeDeltaUS; + + LastPerSecond = PerSecond; + + LastCount = Count; + + FilteredPerSecond = (PerSecond + (LastPerSecond * 7)) / 8; + + LastTimeUS = TimeUS; + } + } + + uint64_t GetCurrent() const // If Stopped - return total count / total time + { + if (LastTimeUS == (uint64_t)-1) + { + return 0; + } + return FilteredPerSecond; + } + + uint64_t GetElapsedTimeUS() const + { + if (StartTimeUS == (uint64_t)-1) + { + return 0; + } + if (EndTimeUS == (uint64_t)-1) + { + return 0; + } + uint64_t TimeDeltaUS = EndTimeUS - StartTimeUS; + return TimeDeltaUS; + } + + bool IsActive() const { return (StartTimeUS != (uint64_t)-1) && (EndTimeUS == (uint64_t)-1); } + +private: + Stopwatch Timer; + std::atomic<uint64_t> StartTimeUS = (uint64_t)-1; + std::atomic<uint64_t> EndTimeUS = (uint64_t)-1; + std::atomic<uint64_t> LastTimeUS = (uint64_t)-1; + uint64_t LastCount = 0; + uint64_t LastPerSecond = 0; + uint64_t FilteredPerSecond = 0; +}; + +EPartialBlockRequestMode +PartialBlockRequestModeFromString(const std::string_view ModeString) +{ + switch (HashStringAsLowerDjb2(ModeString)) + { + case HashStringDjb2("false"): + return EPartialBlockRequestMode::Off; + case HashStringDjb2("zencacheonly"): + return EPartialBlockRequestMode::ZenCacheOnly; + case HashStringDjb2("mixed"): + return EPartialBlockRequestMode::Mixed; + case HashStringDjb2("true"): + return EPartialBlockRequestMode::All; + default: + return EPartialBlockRequestMode::Invalid; + } +} + +std::filesystem::path +ZenStateFilePath(const std::filesystem::path& ZenFolderPath) +{ + return ZenFolderPath / "current_state.cbo"; +} +std::filesystem::path +ZenTempFolderPath(const std::filesystem::path& ZenFolderPath) +{ + return ZenFolderPath / "tmp"; +} + +BuildsOperationUpdateFolder::BuildsOperationUpdateFolder(BuildOpLogOutput& LogOutput, + StorageInstance& Storage, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag, + WorkerThreadPool& IOWorkerPool, + WorkerThreadPool& NetworkPool, + const Oid& BuildId, + const std::filesystem::path& Path, + const ChunkedFolderContent& LocalContent, + const ChunkedContentLookup& LocalLookup, + const ChunkedFolderContent& RemoteContent, + const ChunkedContentLookup& RemoteLookup, + const std::vector<ChunkBlockDescription>& BlockDescriptions, + const std::vector<IoHash>& LooseChunkHashes, + const Options& Options, + DiskStatistics& DiskStats) +: m_LogOutput(LogOutput) +, m_Storage(Storage) +, m_AbortFlag(AbortFlag) +, m_PauseFlag(PauseFlag) +, m_IOWorkerPool(IOWorkerPool) +, m_NetworkPool(NetworkPool) +, m_BuildId(BuildId) +, m_Path(Path) +, m_LocalContent(LocalContent) +, m_LocalLookup(LocalLookup) +, m_RemoteContent(RemoteContent) +, m_RemoteLookup(RemoteLookup) +, m_BlockDescriptions(BlockDescriptions) +, m_LooseChunkHashes(LooseChunkHashes) +, m_Options(Options) +, m_DiskStats(DiskStats) +, m_CacheFolderPath(ZenTempCacheFolderPath(m_Options.ZenFolderPath)) +{ +} + +void +BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) +{ + ZEN_TRACE_CPU("BuildsOperationUpdateFolder::Execute"); + + ZEN_ASSERT((!m_Options.PrimeCacheOnly) || + (m_Options.PrimeCacheOnly && (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Off))); + + Stopwatch IndexTimer; + + if (!m_Options.IsQuiet) + { + LOG_OUTPUT(m_LogOutput, "Indexed local and remote content in {}", NiceTimeSpanMs(IndexTimer.GetElapsedTimeMs())); + } + + Stopwatch CacheMappingTimer; + + std::vector<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters(m_RemoteContent.ChunkedContent.SequenceRawHashes.size()); + std::vector<bool> RemoteChunkIndexNeedsCopyFromLocalFileFlags(m_RemoteContent.ChunkedContent.ChunkHashes.size()); + std::vector<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags(m_RemoteContent.ChunkedContent.ChunkHashes.size()); + + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedChunkHashesFound; + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedSequenceHashesFound; + if (!m_Options.PrimeCacheOnly) + { + ScanCacheFolder(CachedChunkHashesFound, CachedSequenceHashesFound); + } + + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedBlocksFound; + if (!m_Options.PrimeCacheOnly) + { + ScanTempBlocksFolder(CachedBlocksFound); + } + + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> SequenceIndexesLeftToFindToRemoteIndex; + + if (!m_Options.PrimeCacheOnly && m_Options.EnableTargetFolderScavenging) + { + // Pick up all whole files we can use from current local state + ZEN_TRACE_CPU("GetLocalSequences"); + + Stopwatch LocalTimer; + + std::vector<uint32_t> MissingSequenceIndexes = ScanTargetFolder(CachedChunkHashesFound, CachedSequenceHashesFound); + + for (uint32_t RemoteSequenceIndex : MissingSequenceIndexes) + { + // We must write the sequence + const uint32_t ChunkCount = m_RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]; + const IoHash& RemoteSequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = ChunkCount; + SequenceIndexesLeftToFindToRemoteIndex.insert({RemoteSequenceRawHash, RemoteSequenceIndex}); + } + } + else + { + for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < m_RemoteContent.ChunkedContent.SequenceRawHashes.size(); + RemoteSequenceIndex++) + { + const uint32_t ChunkCount = m_RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]; + SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = ChunkCount; + } + } + + std::vector<ChunkedFolderContent> ScavengedContents; + std::vector<ChunkedContentLookup> ScavengedLookups; + std::vector<std::filesystem::path> ScavengedPaths; + + std::vector<ScavengedSequenceCopyOperation> ScavengedSequenceCopyOperations; + uint64_t ScavengedPathsCount = 0; + + if (!m_Options.PrimeCacheOnly && m_Options.EnableOtherDownloadsScavenging) + { + ZEN_TRACE_CPU("GetScavengedSequences"); + + Stopwatch ScavengeTimer; + + if (!SequenceIndexesLeftToFindToRemoteIndex.empty()) + { + std::vector<ScavengeSource> ScavengeSources = FindScavengeSources(); + + const size_t ScavengePathCount = ScavengeSources.size(); + + ScavengedContents.resize(ScavengePathCount); + ScavengedLookups.resize(ScavengePathCount); + ScavengedPaths.resize(ScavengePathCount); + + std::unique_ptr<BuildOpLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Scavenging")); + BuildOpLogOutput::ProgressBar& ScavengeProgressBar(*ProgressBarPtr); + + ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + std::atomic<uint64_t> PathsFound(0); + std::atomic<uint64_t> ChunksFound(0); + std::atomic<uint64_t> PathsScavenged(0); + + for (size_t ScavengeIndex = 0; ScavengeIndex < ScavengePathCount; ScavengeIndex++) + { + Work.ScheduleWork(m_IOWorkerPool, + [this, + &ScavengeSources, + &ScavengedContents, + &ScavengedPaths, + &ScavengedLookups, + &PathsFound, + &ChunksFound, + &PathsScavenged, + ScavengeIndex](std::atomic<bool>&) { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_FindScavengeContent"); + + const ScavengeSource& Source = ScavengeSources[ScavengeIndex]; + ChunkedFolderContent& ScavengedLocalContent = ScavengedContents[ScavengeIndex]; + ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengeIndex]; + + if (FindScavengeContent(Source, ScavengedLocalContent, ScavengedLookup)) + { + ScavengedPaths[ScavengeIndex] = Source.Path; + PathsFound += ScavengedLocalContent.Paths.size(); + ChunksFound += ScavengedLocalContent.ChunkedContent.ChunkHashes.size(); + } + else + { + ScavengedPaths[ScavengeIndex].clear(); + } + PathsScavenged++; + } + }); + } + { + ZEN_TRACE_CPU("ScavengeScan_Wait"); + + Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); + std::string Details = fmt::format("{}/{} scanned. {} paths and {} chunks found for scavenging", + PathsScavenged.load(), + ScavengePathCount, + PathsFound.load(), + ChunksFound.load()); + ScavengeProgressBar.UpdateState({.Task = "Scavenging ", + .Details = Details, + .TotalCount = ScavengePathCount, + .RemainingCount = ScavengePathCount - PathsScavenged.load(), + .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, + false); + }); + } + + ScavengeProgressBar.Finish(); + if (m_AbortFlag) + { + return; + } + + for (uint32_t ScavengedContentIndex = 0; + ScavengedContentIndex < ScavengedContents.size() && (!SequenceIndexesLeftToFindToRemoteIndex.empty()); + ScavengedContentIndex++) + { + const std::filesystem::path& ScavengePath = ScavengedPaths[ScavengedContentIndex]; + if (!ScavengePath.empty()) + { + const ChunkedFolderContent& ScavengedLocalContent = ScavengedContents[ScavengedContentIndex]; + const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengedContentIndex]; + + for (uint32_t ScavengedSequenceIndex = 0; + ScavengedSequenceIndex < ScavengedLocalContent.ChunkedContent.SequenceRawHashes.size(); + ScavengedSequenceIndex++) + { + const IoHash& SequenceRawHash = ScavengedLocalContent.ChunkedContent.SequenceRawHashes[ScavengedSequenceIndex]; + if (auto It = SequenceIndexesLeftToFindToRemoteIndex.find(SequenceRawHash); + It != SequenceIndexesLeftToFindToRemoteIndex.end()) + { + const uint32_t RemoteSequenceIndex = It->second; + const uint64_t RawSize = + m_RemoteContent.RawSizes[m_RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]]; + ZEN_ASSERT(RawSize > 0); + + const uint32_t ScavengedPathIndex = ScavengedLookup.SequenceIndexFirstPathIndex[ScavengedSequenceIndex]; + ZEN_ASSERT_SLOW(IsFile((ScavengePath / ScavengedLocalContent.Paths[ScavengedPathIndex]).make_preferred())); + + ScavengedSequenceCopyOperations.push_back({.ScavengedContentIndex = ScavengedContentIndex, + .ScavengedPathIndex = ScavengedPathIndex, + .RemoteSequenceIndex = RemoteSequenceIndex, + .RawSize = RawSize}); + + SequenceIndexesLeftToFindToRemoteIndex.erase(SequenceRawHash); + SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = 0; + + m_CacheMappingStats.ScavengedPathsMatchingSequencesCount++; + m_CacheMappingStats.ScavengedPathsMatchingSequencesByteCount += RawSize; + } + } + ScavengedPathsCount++; + } + } + } + m_CacheMappingStats.ScavengeElapsedWallTimeUs += ScavengeTimer.GetElapsedTimeUs(); + } + + uint32_t RemainingChunkCount = 0; + for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < m_RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++) + { + uint64_t ChunkWriteCount = GetChunkWriteCount(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); + if (ChunkWriteCount > 0) + { + RemainingChunkCount++; + } + } + + // Pick up all chunks in current local state + tsl::robin_map<IoHash, size_t, IoHash::Hasher> RawHashToCopyChunkDataIndex; + std::vector<CopyChunkData> CopyChunkDatas; + + if (!m_Options.PrimeCacheOnly && m_Options.EnableTargetFolderScavenging) + { + ZEN_TRACE_CPU("GetLocalChunks"); + + Stopwatch LocalTimer; + + for (uint32_t LocalSequenceIndex = 0; + LocalSequenceIndex < m_LocalContent.ChunkedContent.SequenceRawHashes.size() && (RemainingChunkCount > 0); + LocalSequenceIndex++) + { + const IoHash& LocalSequenceRawHash = m_LocalContent.ChunkedContent.SequenceRawHashes[LocalSequenceIndex]; + const uint32_t LocalOrderOffset = m_LocalLookup.SequenceIndexChunkOrderOffset[LocalSequenceIndex]; + + { + uint64_t SourceOffset = 0; + const uint32_t LocalChunkCount = m_LocalContent.ChunkedContent.ChunkCounts[LocalSequenceIndex]; + for (uint32_t LocalOrderIndex = 0; LocalOrderIndex < LocalChunkCount; LocalOrderIndex++) + { + const uint32_t LocalChunkIndex = m_LocalContent.ChunkedContent.ChunkOrders[LocalOrderOffset + LocalOrderIndex]; + const IoHash& LocalChunkHash = m_LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; + const uint64_t LocalChunkRawSize = m_LocalContent.ChunkedContent.ChunkRawSizes[LocalChunkIndex]; + + if (auto RemoteChunkIt = m_RemoteLookup.ChunkHashToChunkIndex.find(LocalChunkHash); + RemoteChunkIt != m_RemoteLookup.ChunkHashToChunkIndex.end()) + { + const uint32_t RemoteChunkIndex = RemoteChunkIt->second; + if (!RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) + { + std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = + GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); + + if (!ChunkTargetPtrs.empty()) + { + CopyChunkData::ChunkTarget Target = { + .TargetChunkLocationCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()), + .RemoteChunkIndex = RemoteChunkIndex, + .CacheFileOffset = SourceOffset}; + if (auto CopySourceIt = RawHashToCopyChunkDataIndex.find(LocalSequenceRawHash); + CopySourceIt != RawHashToCopyChunkDataIndex.end()) + { + CopyChunkData& Data = CopyChunkDatas[CopySourceIt->second]; + if (Data.TargetChunkLocationPtrs.size() > 1024) + { + RawHashToCopyChunkDataIndex.insert_or_assign(LocalSequenceRawHash, CopyChunkDatas.size()); + CopyChunkDatas.push_back( + CopyChunkData{.ScavengeSourceIndex = (uint32_t)-1, + .SourceSequenceIndex = LocalSequenceIndex, + .TargetChunkLocationPtrs = ChunkTargetPtrs, + .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}}); + } + else + { + Data.TargetChunkLocationPtrs.insert(Data.TargetChunkLocationPtrs.end(), + ChunkTargetPtrs.begin(), + ChunkTargetPtrs.end()); + Data.ChunkTargets.push_back(Target); + } + } + else + { + RawHashToCopyChunkDataIndex.insert_or_assign(LocalSequenceRawHash, CopyChunkDatas.size()); + CopyChunkDatas.push_back( + CopyChunkData{.ScavengeSourceIndex = (uint32_t)-1, + .SourceSequenceIndex = LocalSequenceIndex, + .TargetChunkLocationPtrs = ChunkTargetPtrs, + .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}}); + } + m_CacheMappingStats.LocalChunkMatchingRemoteCount++; + m_CacheMappingStats.LocalChunkMatchingRemoteByteCount += LocalChunkRawSize; + RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex] = true; + RemainingChunkCount--; + } + } + } + SourceOffset += LocalChunkRawSize; + } + } + } + m_CacheMappingStats.LocalScanElapsedWallTimeUs += LocalTimer.GetElapsedTimeUs(); + } + + if (!m_Options.PrimeCacheOnly && m_Options.EnableOtherDownloadsScavenging) + { + ZEN_TRACE_CPU("GetScavengeChunks"); + + Stopwatch ScavengeTimer; + + for (uint32_t ScavengedContentIndex = 0; ScavengedContentIndex < ScavengedContents.size() && (RemainingChunkCount > 0); + ScavengedContentIndex++) + { + const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengedContentIndex]; + // const std::filesystem::path& ScavengedPath = ScavengedPaths[ScavengedContentIndex]; + const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengedContentIndex]; + + for (uint32_t ScavengedSequenceIndex = 0; + ScavengedSequenceIndex < ScavengedContent.ChunkedContent.SequenceRawHashes.size() && (RemainingChunkCount > 0); + ScavengedSequenceIndex++) + { + const IoHash& ScavengedSequenceRawHash = ScavengedContent.ChunkedContent.SequenceRawHashes[ScavengedSequenceIndex]; + const uint32_t ScavengedOrderOffset = ScavengedLookup.SequenceIndexChunkOrderOffset[ScavengedSequenceIndex]; + + { + uint64_t SourceOffset = 0; + const uint32_t ScavengedChunkCount = ScavengedContent.ChunkedContent.ChunkCounts[ScavengedSequenceIndex]; + for (uint32_t ScavengedOrderIndex = 0; ScavengedOrderIndex < ScavengedChunkCount; ScavengedOrderIndex++) + { + const uint32_t ScavengedChunkIndex = + ScavengedContent.ChunkedContent.ChunkOrders[ScavengedOrderOffset + ScavengedOrderIndex]; + const IoHash& ScavengedChunkHash = ScavengedContent.ChunkedContent.ChunkHashes[ScavengedChunkIndex]; + const uint64_t ScavengedChunkRawSize = ScavengedContent.ChunkedContent.ChunkRawSizes[ScavengedChunkIndex]; + + if (auto RemoteChunkIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ScavengedChunkHash); + RemoteChunkIt != m_RemoteLookup.ChunkHashToChunkIndex.end()) + { + const uint32_t RemoteChunkIndex = RemoteChunkIt->second; + if (!RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) + { + std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = + GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); + + if (!ChunkTargetPtrs.empty()) + { + CopyChunkData::ChunkTarget Target = { + .TargetChunkLocationCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()), + .RemoteChunkIndex = RemoteChunkIndex, + .CacheFileOffset = SourceOffset}; + if (auto CopySourceIt = RawHashToCopyChunkDataIndex.find(ScavengedSequenceRawHash); + CopySourceIt != RawHashToCopyChunkDataIndex.end()) + { + CopyChunkData& Data = CopyChunkDatas[CopySourceIt->second]; + if (Data.TargetChunkLocationPtrs.size() > 1024) + { + RawHashToCopyChunkDataIndex.insert_or_assign(ScavengedSequenceRawHash, CopyChunkDatas.size()); + CopyChunkDatas.push_back( + CopyChunkData{.ScavengeSourceIndex = ScavengedContentIndex, + .SourceSequenceIndex = ScavengedSequenceIndex, + .TargetChunkLocationPtrs = ChunkTargetPtrs, + .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}}); + } + else + { + Data.TargetChunkLocationPtrs.insert(Data.TargetChunkLocationPtrs.end(), + ChunkTargetPtrs.begin(), + ChunkTargetPtrs.end()); + Data.ChunkTargets.push_back(Target); + } + } + else + { + RawHashToCopyChunkDataIndex.insert_or_assign(ScavengedSequenceRawHash, CopyChunkDatas.size()); + CopyChunkDatas.push_back( + CopyChunkData{.ScavengeSourceIndex = ScavengedContentIndex, + .SourceSequenceIndex = ScavengedSequenceIndex, + .TargetChunkLocationPtrs = ChunkTargetPtrs, + .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}}); + } + m_CacheMappingStats.ScavengedChunkMatchingRemoteCount++; + m_CacheMappingStats.ScavengedChunkMatchingRemoteByteCount += ScavengedChunkRawSize; + RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex] = true; + RemainingChunkCount--; + } + } + } + SourceOffset += ScavengedChunkRawSize; + } + } + } + } + m_CacheMappingStats.ScavengeElapsedWallTimeUs += ScavengeTimer.GetElapsedTimeUs(); + } + if (!m_Options.IsQuiet) + { + if (m_CacheMappingStats.CacheSequenceHashesCount > 0 || m_CacheMappingStats.CacheChunkCount > 0 || + m_CacheMappingStats.CacheBlockCount > 0) + { + LOG_OUTPUT(m_LogOutput, + "Download cache: Found {} ({}) chunk sequences, {} ({}) chunks, {} ({}) blocks in {}", + m_CacheMappingStats.CacheSequenceHashesCount, + NiceBytes(m_CacheMappingStats.CacheSequenceHashesByteCount), + m_CacheMappingStats.CacheChunkCount, + NiceBytes(m_CacheMappingStats.CacheChunkByteCount), + m_CacheMappingStats.CacheBlockCount, + NiceBytes(m_CacheMappingStats.CacheBlocksByteCount), + NiceTimeSpanMs(m_CacheMappingStats.CacheScanElapsedWallTimeUs / 1000)); + } + + if (m_CacheMappingStats.LocalPathsMatchingSequencesCount > 0 || m_CacheMappingStats.LocalChunkMatchingRemoteCount > 0) + { + LOG_OUTPUT(m_LogOutput, + "Local state : Found {} ({}) chunk sequences, {} ({}) chunks in {}", + m_CacheMappingStats.LocalPathsMatchingSequencesCount, + NiceBytes(m_CacheMappingStats.LocalPathsMatchingSequencesByteCount), + m_CacheMappingStats.LocalChunkMatchingRemoteCount, + NiceBytes(m_CacheMappingStats.LocalChunkMatchingRemoteByteCount), + NiceTimeSpanMs(m_CacheMappingStats.LocalScanElapsedWallTimeUs / 1000)); + } + if (m_CacheMappingStats.ScavengedPathsMatchingSequencesCount > 0 || m_CacheMappingStats.ScavengedChunkMatchingRemoteCount > 0) + { + LOG_OUTPUT(m_LogOutput, + "Scavenge of {} paths, found {} ({}) chunk sequences, {} ({}) chunks in {}", + ScavengedPathsCount, + m_CacheMappingStats.ScavengedPathsMatchingSequencesCount, + NiceBytes(m_CacheMappingStats.ScavengedPathsMatchingSequencesByteCount), + m_CacheMappingStats.ScavengedChunkMatchingRemoteCount, + NiceBytes(m_CacheMappingStats.ScavengedChunkMatchingRemoteByteCount), + NiceTimeSpanMs(m_CacheMappingStats.ScavengeElapsedWallTimeUs / 1000)); + } + } + + uint64_t BytesToWrite = 0; + + for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < m_RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++) + { + uint64_t ChunkWriteCount = GetChunkWriteCount(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); + if (ChunkWriteCount > 0) + { + BytesToWrite += m_RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] * ChunkWriteCount; + if (!RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) + { + RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex] = true; + } + } + } + + for (const ScavengedSequenceCopyOperation& ScavengeCopyOp : ScavengedSequenceCopyOperations) + { + BytesToWrite += ScavengeCopyOp.RawSize; + } + + uint64_t TotalRequestCount = 0; + uint64_t TotalPartWriteCount = 0; + std::atomic<uint64_t> WritePartsComplete = 0; + + { + ZEN_TRACE_CPU("WriteChunks"); + + Stopwatch WriteTimer; + + FilteredRate FilteredDownloadedBytesPerSecond; + FilteredRate FilteredWrittenBytesPerSecond; + + std::unique_ptr<BuildOpLogOutput::ProgressBar> WriteProgressBarPtr( + m_LogOutput.CreateProgressBar(m_Options.PrimeCacheOnly ? "Downloading" : "Writing")); + BuildOpLogOutput::ProgressBar& WriteProgressBar(*WriteProgressBarPtr); + ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + struct LooseChunkHashWorkData + { + std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs; + uint32_t RemoteChunkIndex = (uint32_t)-1; + }; + + std::vector<LooseChunkHashWorkData> LooseChunkHashWorks; + TotalPartWriteCount += CopyChunkDatas.size(); + TotalPartWriteCount += ScavengedSequenceCopyOperations.size(); + + for (const IoHash ChunkHash : m_LooseChunkHashes) + { + auto RemoteChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); + ZEN_ASSERT(RemoteChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end()); + const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second; + if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) + { + if (m_Options.IsVerbose) + { + LOG_OUTPUT(m_LogOutput, "Skipping chunk {} due to cache reuse", ChunkHash); + } + continue; + } + bool NeedsCopy = true; + if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex].compare_exchange_strong(NeedsCopy, false)) + { + std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = + GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); + + if (ChunkTargetPtrs.empty()) + { + if (m_Options.IsVerbose) + { + LOG_OUTPUT(m_LogOutput, "Skipping chunk {} due to cache reuse", ChunkHash); + } + } + else + { + TotalRequestCount++; + TotalPartWriteCount++; + LooseChunkHashWorks.push_back( + LooseChunkHashWorkData{.ChunkTargetPtrs = ChunkTargetPtrs, .RemoteChunkIndex = RemoteChunkIndex}); + } + } + } + + uint32_t BlockCount = gsl::narrow<uint32_t>(m_BlockDescriptions.size()); + + std::vector<bool> ChunkIsPickedUpByBlock(m_RemoteContent.ChunkedContent.ChunkHashes.size(), false); + auto GetNeededChunkBlockIndexes = + [this, &RemoteChunkIndexNeedsCopyFromSourceFlags, &ChunkIsPickedUpByBlock](const ChunkBlockDescription& BlockDescription) { + ZEN_TRACE_CPU("GetNeededChunkBlockIndexes"); + std::vector<uint32_t> NeededBlockChunkIndexes; + for (uint32_t ChunkBlockIndex = 0; ChunkBlockIndex < BlockDescription.ChunkRawHashes.size(); ChunkBlockIndex++) + { + const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex]; + if (auto It = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); It != m_RemoteLookup.ChunkHashToChunkIndex.end()) + { + const uint32_t RemoteChunkIndex = It->second; + if (!ChunkIsPickedUpByBlock[RemoteChunkIndex]) + { + if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex]) + { + ChunkIsPickedUpByBlock[RemoteChunkIndex] = true; + NeededBlockChunkIndexes.push_back(ChunkBlockIndex); + } + } + } + } + return NeededBlockChunkIndexes; + }; + + std::vector<uint32_t> CachedChunkBlockIndexes; + std::vector<uint32_t> FetchBlockIndexes; + std::vector<std::vector<uint32_t>> AllBlockChunkIndexNeeded; + + for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; BlockIndex++) + { + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + + std::vector<uint32_t> BlockChunkIndexNeeded = GetNeededChunkBlockIndexes(BlockDescription); + if (!BlockChunkIndexNeeded.empty()) + { + if (m_Options.PrimeCacheOnly) + { + FetchBlockIndexes.push_back(BlockIndex); + } + else + { + bool UsingCachedBlock = false; + if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end()) + { + TotalPartWriteCount++; + + std::filesystem::path BlockPath = + ZenTempBlockFolderPath(m_Options.ZenFolderPath) / BlockDescription.BlockHash.ToHexString(); + if (IsFile(BlockPath)) + { + CachedChunkBlockIndexes.push_back(BlockIndex); + UsingCachedBlock = true; + } + } + if (!UsingCachedBlock) + { + FetchBlockIndexes.push_back(BlockIndex); + } + } + } + AllBlockChunkIndexNeeded.emplace_back(std::move(BlockChunkIndexNeeded)); + } + + struct BlobsExistsResult + { + tsl::robin_set<IoHash> ExistingBlobs; + uint64_t ElapsedTimeMs = 0; + }; + + BlobsExistsResult ExistsResult; + + if (m_Storage.BuildCacheStorage) + { + ZEN_TRACE_CPU("BlobCacheExistCheck"); + Stopwatch Timer; + + tsl::robin_set<IoHash> BlobHashesSet; + + BlobHashesSet.reserve(LooseChunkHashWorks.size() + FetchBlockIndexes.size()); + for (LooseChunkHashWorkData& LooseChunkHashWork : LooseChunkHashWorks) + { + BlobHashesSet.insert(m_RemoteContent.ChunkedContent.ChunkHashes[LooseChunkHashWork.RemoteChunkIndex]); + } + for (uint32_t BlockIndex : FetchBlockIndexes) + { + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + BlobHashesSet.insert(BlockDescription.BlockHash); + } + + if (!BlobHashesSet.empty()) + { + const std::vector<IoHash> BlobHashes(BlobHashesSet.begin(), BlobHashesSet.end()); + const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult = + m_Storage.BuildCacheStorage->BlobsExists(m_BuildId, BlobHashes); + + if (CacheExistsResult.size() == BlobHashes.size()) + { + ExistsResult.ExistingBlobs.reserve(CacheExistsResult.size()); + for (size_t BlobIndex = 0; BlobIndex < BlobHashes.size(); BlobIndex++) + { + if (CacheExistsResult[BlobIndex].HasBody) + { + ExistsResult.ExistingBlobs.insert(BlobHashes[BlobIndex]); + } + } + } + ExistsResult.ElapsedTimeMs = Timer.GetElapsedTimeMs(); + if (!ExistsResult.ExistingBlobs.empty() && !m_Options.IsQuiet) + { + LOG_OUTPUT(m_LogOutput, + "Remote cache : Found {} out of {} needed blobs in {}", + ExistsResult.ExistingBlobs.size(), + BlobHashes.size(), + NiceTimeSpanMs(ExistsResult.ElapsedTimeMs)); + } + } + } + + std::vector<BlockRangeDescriptor> BlockRangeWorks; + std::vector<uint32_t> FullBlockWorks; + { + Stopwatch Timer; + + std::vector<uint32_t> PartialBlockIndexes; + + for (uint32_t BlockIndex : FetchBlockIndexes) + { + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + + const std::vector<uint32_t> BlockChunkIndexNeeded = std::move(AllBlockChunkIndexNeeded[BlockIndex]); + if (!BlockChunkIndexNeeded.empty()) + { + bool WantsToDoPartialBlockDownload = BlockChunkIndexNeeded.size() < BlockDescription.ChunkRawHashes.size(); + bool CanDoPartialBlockDownload = (BlockDescription.HeaderSize > 0) && (BlockDescription.ChunkCompressedLengths.size() == + BlockDescription.ChunkRawHashes.size()); + + bool AllowedToDoPartialRequest = false; + bool BlockExistInCache = ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash); + switch (m_Options.PartialBlockRequestMode) + { + case EPartialBlockRequestMode::Off: + break; + case EPartialBlockRequestMode::ZenCacheOnly: + AllowedToDoPartialRequest = BlockExistInCache; + break; + case EPartialBlockRequestMode::Mixed: + case EPartialBlockRequestMode::All: + AllowedToDoPartialRequest = true; + break; + default: + ZEN_ASSERT(false); + break; + } + + const uint32_t ChunkStartOffsetInBlock = + gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); + + const uint64_t TotalBlockSize = std::accumulate(BlockDescription.ChunkCompressedLengths.begin(), + BlockDescription.ChunkCompressedLengths.end(), + std::uint64_t(ChunkStartOffsetInBlock)); + + if (AllowedToDoPartialRequest && WantsToDoPartialBlockDownload && CanDoPartialBlockDownload) + { + ZEN_TRACE_CPU("PartialBlockAnalysis"); + + bool LimitToSingleRange = + BlockExistInCache ? false : m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Mixed; + uint64_t TotalWantedChunksSize = 0; + std::optional<std::vector<BlockRangeDescriptor>> MaybeBlockRanges = CalculateBlockRanges(m_LogOutput, + m_Options.IsQuiet, + BlockIndex, + BlockDescription, + BlockChunkIndexNeeded, + LimitToSingleRange, + ChunkStartOffsetInBlock, + TotalBlockSize, + TotalWantedChunksSize); + ZEN_ASSERT(TotalWantedChunksSize <= TotalBlockSize); + + if (MaybeBlockRanges.has_value()) + { + const std::vector<BlockRangeDescriptor>& BlockRanges = MaybeBlockRanges.value(); + ZEN_ASSERT(!BlockRanges.empty()); + BlockRangeWorks.insert(BlockRangeWorks.end(), BlockRanges.begin(), BlockRanges.end()); + TotalRequestCount += BlockRanges.size(); + TotalPartWriteCount += BlockRanges.size(); + + uint64_t RequestedSize = std::accumulate( + BlockRanges.begin(), + BlockRanges.end(), + uint64_t(0), + [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; }); + PartialBlockIndexes.push_back(BlockIndex); + + if (RequestedSize > TotalWantedChunksSize) + { + if (m_Options.IsVerbose) + { + LOG_OUTPUT(m_LogOutput, + "Requesting {} chunks ({}) from block {} ({}) using {} requests (extra bytes {})", + BlockChunkIndexNeeded.size(), + NiceBytes(RequestedSize), + BlockDescription.BlockHash, + NiceBytes(TotalBlockSize), + BlockRanges.size(), + NiceBytes(RequestedSize - TotalWantedChunksSize)); + } + } + } + else + { + FullBlockWorks.push_back(BlockIndex); + TotalRequestCount++; + TotalPartWriteCount++; + } + } + else + { + FullBlockWorks.push_back(BlockIndex); + TotalRequestCount++; + TotalPartWriteCount++; + } + } + } + + if (!PartialBlockIndexes.empty()) + { + uint64_t TotalFullBlockRequestBytes = 0; + for (uint32_t BlockIndex : FullBlockWorks) + { + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + uint32_t CurrentOffset = + gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); + + TotalFullBlockRequestBytes += std::accumulate(BlockDescription.ChunkCompressedLengths.begin(), + BlockDescription.ChunkCompressedLengths.end(), + std::uint64_t(CurrentOffset)); + } + + uint64_t TotalPartialBlockBytes = 0; + for (uint32_t BlockIndex : PartialBlockIndexes) + { + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + uint32_t CurrentOffset = + gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); + + TotalPartialBlockBytes += std::accumulate(BlockDescription.ChunkCompressedLengths.begin(), + BlockDescription.ChunkCompressedLengths.end(), + std::uint64_t(CurrentOffset)); + } + + uint64_t NonPartialTotalBlockBytes = TotalFullBlockRequestBytes + TotalPartialBlockBytes; + + const uint64_t TotalPartialBlockRequestBytes = + std::accumulate(BlockRangeWorks.begin(), + BlockRangeWorks.end(), + uint64_t(0), + [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; }); + uint64_t TotalExtraPartialBlocksRequests = BlockRangeWorks.size() - PartialBlockIndexes.size(); + + uint64_t TotalSavedBlocksSize = TotalPartialBlockBytes - TotalPartialBlockRequestBytes; + double SavedSizePercent = (TotalSavedBlocksSize * 100.0) / NonPartialTotalBlockBytes; + + if (!m_Options.IsQuiet) + { + LOG_OUTPUT(m_LogOutput, + "Analisys of partial block requests saves download of {} out of {} ({:.1f}%) using {} extra " + "requests. Completed in {}", + NiceBytes(TotalSavedBlocksSize), + NiceBytes(NonPartialTotalBlockBytes), + SavedSizePercent, + TotalExtraPartialBlocksRequests, + NiceTimeSpanMs(ExistsResult.ElapsedTimeMs)); + } + } + } + + BufferedWriteFileCache WriteCache; + + for (uint32_t ScavengeOpIndex = 0; ScavengeOpIndex < ScavengedSequenceCopyOperations.size(); ScavengeOpIndex++) + { + if (m_AbortFlag) + { + break; + } + if (!m_Options.PrimeCacheOnly) + { + Work.ScheduleWork( + m_IOWorkerPool, + [this, + &ScavengedPaths, + &ScavengedSequenceCopyOperations, + &ScavengedContents, + &FilteredWrittenBytesPerSecond, + ScavengeOpIndex, + &WritePartsComplete, + TotalPartWriteCount](std::atomic<bool>&) mutable { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_WriteScavenged"); + + FilteredWrittenBytesPerSecond.Start(); + + const ScavengedSequenceCopyOperation& ScavengeOp = ScavengedSequenceCopyOperations[ScavengeOpIndex]; + const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengeOp.ScavengedContentIndex]; + const std::filesystem::path& ScavengeRootPath = ScavengedPaths[ScavengeOp.ScavengedContentIndex]; + + WriteScavengedSequenceToCache(ScavengeRootPath, ScavengedContent, ScavengeOp); + + WritePartsComplete++; + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } + } + }); + } + } + + for (uint32_t LooseChunkHashWorkIndex = 0; LooseChunkHashWorkIndex < LooseChunkHashWorks.size(); LooseChunkHashWorkIndex++) + { + if (m_AbortFlag) + { + break; + } + + LooseChunkHashWorkData& LooseChunkHashWork = LooseChunkHashWorks[LooseChunkHashWorkIndex]; + + std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = std::move(LooseChunkHashWork.ChunkTargetPtrs); + const uint32_t RemoteChunkIndex = LooseChunkHashWork.RemoteChunkIndex; + + if (m_Options.PrimeCacheOnly && + ExistsResult.ExistingBlobs.contains(m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex])) + { + m_DownloadStats.RequestsCompleteCount++; + continue; + } + + Work.ScheduleWork( + m_IOWorkerPool, + [this, + &SequenceIndexChunksLeftToWriteCounters, + &Work, + &ExistsResult, + &WritePartsComplete, + RemoteChunkIndex, + ChunkTargetPtrs, + TotalRequestCount, + TotalPartWriteCount, + &WriteCache, + &FilteredDownloadedBytesPerSecond, + &FilteredWrittenBytesPerSecond](std::atomic<bool>&) mutable { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_ReadPreDownloadedChunk"); + + std::filesystem::path ExistingCompressedChunkPath; + if (!m_Options.PrimeCacheOnly) + { + const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + ExistingCompressedChunkPath = FindDownloadedChunk(ChunkHash); + if (!ExistingCompressedChunkPath.empty()) + { + m_DownloadStats.RequestsCompleteCount++; + if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + } + } + if (!m_AbortFlag) + + { + if (!ExistingCompressedChunkPath.empty()) + { + Work.ScheduleWork( + m_IOWorkerPool, + [this, + &SequenceIndexChunksLeftToWriteCounters, + &WriteCache, + &Work, + &WritePartsComplete, + TotalPartWriteCount, + &FilteredWrittenBytesPerSecond, + RemoteChunkIndex, + ChunkTargetPtrs, + CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>& AbortFlag) mutable { + if (!AbortFlag) + { + ZEN_TRACE_CPU("Async_WritePreDownloadedChunk"); + + FilteredWrittenBytesPerSecond.Start(); + + const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + + IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); + if (!CompressedPart) + { + throw std::runtime_error(fmt::format("Could not open dowloaded compressed chunk {} from {}", + ChunkHash, + CompressedChunkPath)); + } + + bool NeedHashVerify = WriteCompressedChunkToCache(ChunkHash, + ChunkTargetPtrs, + WriteCache, + std::move(CompressedPart)); + WritePartsComplete++; + + if (!AbortFlag) + { + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } + + TryRemoveFile(m_LogOutput, CompressedChunkPath); + + std::vector<uint32_t> CompletedSequences = + CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); + WriteCache.Close(CompletedSequences); + if (NeedHashVerify) + { + VerifyAndCompleteChunkSequencesAsync(CompletedSequences, Work); + } + else + { + FinalizeChunkSequences(CompletedSequences); + } + } + } + }); + } + else + { + Work.ScheduleWork( + m_NetworkPool, + [this, + &ExistsResult, + &SequenceIndexChunksLeftToWriteCounters, + &WriteCache, + &Work, + &WritePartsComplete, + TotalPartWriteCount, + TotalRequestCount, + &FilteredDownloadedBytesPerSecond, + &FilteredWrittenBytesPerSecond, + RemoteChunkIndex, + ChunkTargetPtrs](std::atomic<bool>&) mutable { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_DownloadChunk"); + + const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + FilteredDownloadedBytesPerSecond.Start(); + IoBuffer BuildBlob; + const bool ExistsInCache = + m_Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash); + if (ExistsInCache) + { + BuildBlob = m_Storage.BuildCacheStorage->GetBuildBlob(m_BuildId, ChunkHash); + } + if (BuildBlob) + { + uint64_t BlobSize = BuildBlob.GetSize(); + m_DownloadStats.DownloadedChunkCount++; + m_DownloadStats.DownloadedChunkByteCount += BlobSize; + m_DownloadStats.RequestsCompleteCount++; + if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + AsyncWriteDownloadedChunk(m_Options.ZenFolderPath, + RemoteChunkIndex, + std::move(ChunkTargetPtrs), + WriteCache, + Work, + std::move(BuildBlob), + SequenceIndexChunksLeftToWriteCounters, + WritePartsComplete, + TotalPartWriteCount, + FilteredWrittenBytesPerSecond); + } + else + { + if (m_RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= + m_Options.LargeAttachmentSize) + { + DownloadLargeBlob(*m_Storage.BuildStorage, + ZenTempDownloadFolderPath(m_Options.ZenFolderPath), + m_BuildId, + ChunkHash, + m_Options.PreferredMultipartChunkSize, + Work, + m_NetworkPool, + m_DownloadStats, + [this, + &SequenceIndexChunksLeftToWriteCounters, + &WriteCache, + &Work, + ChunkHash, + TotalPartWriteCount, + TotalRequestCount, + &WritePartsComplete, + &FilteredWrittenBytesPerSecond, + &FilteredDownloadedBytesPerSecond, + RemoteChunkIndex, + ChunkTargetPtrs](IoBuffer&& Payload) mutable { + m_DownloadStats.RequestsCompleteCount++; + if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + if (Payload && m_Storage.BuildCacheStorage) + { + m_Storage.BuildCacheStorage->PutBuildBlob( + m_BuildId, + ChunkHash, + ZenContentType::kCompressedBinary, + CompositeBuffer(SharedBuffer(Payload))); + } + if (!m_Options.PrimeCacheOnly) + { + if (!m_AbortFlag) + { + AsyncWriteDownloadedChunk( + m_Options.ZenFolderPath, + RemoteChunkIndex, + std::move(ChunkTargetPtrs), + WriteCache, + Work, + std::move(Payload), + SequenceIndexChunksLeftToWriteCounters, + WritePartsComplete, + TotalPartWriteCount, + FilteredWrittenBytesPerSecond); + } + } + }); + } + else + { + BuildBlob = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, ChunkHash); + if (BuildBlob && m_Storage.BuildCacheStorage) + { + m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, + ChunkHash, + ZenContentType::kCompressedBinary, + CompositeBuffer(SharedBuffer(BuildBlob))); + } + if (!BuildBlob) + { + throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash)); + } + if (!m_Options.PrimeCacheOnly) + { + if (!m_AbortFlag) + { + uint64_t BlobSize = BuildBlob.GetSize(); + m_DownloadStats.DownloadedChunkCount++; + m_DownloadStats.DownloadedChunkByteCount += BlobSize; + m_DownloadStats.RequestsCompleteCount++; + if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + AsyncWriteDownloadedChunk(m_Options.ZenFolderPath, + RemoteChunkIndex, + std::move(ChunkTargetPtrs), + WriteCache, + Work, + std::move(BuildBlob), + SequenceIndexChunksLeftToWriteCounters, + WritePartsComplete, + TotalPartWriteCount, + FilteredWrittenBytesPerSecond); + } + } + } + } + } + }); + } + } + } + }); + } + + std::unique_ptr<CloneQueryInterface> CloneQuery; + if (m_Options.AllowFileClone) + { + CloneQuery = GetCloneQueryInterface(m_CacheFolderPath); + } + + for (size_t CopyDataIndex = 0; CopyDataIndex < CopyChunkDatas.size(); CopyDataIndex++) + { + ZEN_ASSERT(!m_Options.PrimeCacheOnly); + if (m_AbortFlag) + { + break; + } + + Work.ScheduleWork(m_IOWorkerPool, + [this, + &CloneQuery, + &SequenceIndexChunksLeftToWriteCounters, + &WriteCache, + &Work, + &FilteredWrittenBytesPerSecond, + &CopyChunkDatas, + &ScavengedContents, + &ScavengedLookups, + &ScavengedPaths, + &WritePartsComplete, + TotalPartWriteCount, + CopyDataIndex](std::atomic<bool>&) { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_CopyLocal"); + + FilteredWrittenBytesPerSecond.Start(); + const CopyChunkData& CopyData = CopyChunkDatas[CopyDataIndex]; + + std::vector<uint32_t> WrittenSequenceIndexes = WriteLocalChunkToCache(CloneQuery.get(), + CopyData, + ScavengedContents, + ScavengedLookups, + ScavengedPaths, + WriteCache); + WritePartsComplete++; + if (!m_AbortFlag) + { + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } + + // Write tracking, updating this must be done without any files open + std::vector<uint32_t> CompletedChunkSequences; + for (uint32_t RemoteSequenceIndex : WrittenSequenceIndexes) + { + if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters)) + { + CompletedChunkSequences.push_back(RemoteSequenceIndex); + } + } + WriteCache.Close(CompletedChunkSequences); + VerifyAndCompleteChunkSequencesAsync(CompletedChunkSequences, Work); + } + } + }); + } + + for (uint32_t BlockIndex : CachedChunkBlockIndexes) + { + ZEN_ASSERT(!m_Options.PrimeCacheOnly); + if (m_AbortFlag) + { + break; + } + + Work.ScheduleWork(m_IOWorkerPool, + [this, + &RemoteChunkIndexNeedsCopyFromSourceFlags, + &SequenceIndexChunksLeftToWriteCounters, + &WriteCache, + &Work, + &FilteredWrittenBytesPerSecond, + &WritePartsComplete, + TotalPartWriteCount, + BlockIndex](std::atomic<bool>&) mutable { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_WriteCachedBlock"); + + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + FilteredWrittenBytesPerSecond.Start(); + + std::filesystem::path BlockChunkPath = + ZenTempBlockFolderPath(m_Options.ZenFolderPath) / BlockDescription.BlockHash.ToHexString(); + IoBuffer BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); + if (!BlockBuffer) + { + throw std::runtime_error( + fmt::format("Can not read block {} at {}", BlockDescription.BlockHash, BlockChunkPath)); + } + + if (!m_AbortFlag) + { + if (!WriteChunksBlockToCache(BlockDescription, + SequenceIndexChunksLeftToWriteCounters, + Work, + CompositeBuffer(std::move(BlockBuffer)), + RemoteChunkIndexNeedsCopyFromSourceFlags, + WriteCache)) + { + std::error_code DummyEc; + RemoveFile(BlockChunkPath, DummyEc); + throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash)); + } + + TryRemoveFile(m_LogOutput, BlockChunkPath); + + WritePartsComplete++; + + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } + } + } + }); + } + + for (size_t BlockRangeIndex = 0; BlockRangeIndex < BlockRangeWorks.size(); BlockRangeIndex++) + { + ZEN_ASSERT(!m_Options.PrimeCacheOnly); + if (m_AbortFlag) + { + break; + } + const BlockRangeDescriptor BlockRange = BlockRangeWorks[BlockRangeIndex]; + ZEN_ASSERT(BlockRange.BlockIndex != (uint32_t)-1); + const uint32_t BlockIndex = BlockRange.BlockIndex; + Work.ScheduleWork( + m_NetworkPool, + [this, + &RemoteChunkIndexNeedsCopyFromSourceFlags, + &SequenceIndexChunksLeftToWriteCounters, + &ExistsResult, + &WriteCache, + &FilteredDownloadedBytesPerSecond, + TotalRequestCount, + &WritePartsComplete, + TotalPartWriteCount, + &FilteredWrittenBytesPerSecond, + &Work, + BlockIndex, + BlockRange](std::atomic<bool>&) { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_GetPartialBlock"); + + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + + FilteredDownloadedBytesPerSecond.Start(); + IoBuffer BlockBuffer; + if (m_Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash)) + { + BlockBuffer = m_Storage.BuildCacheStorage->GetBuildBlob(m_BuildId, + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength); + } + if (!BlockBuffer) + { + BlockBuffer = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength); + } + if (!BlockBuffer) + { + throw std::runtime_error(fmt::format("Block {} is missing when fetching range {} -> {}", + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeStart + BlockRange.RangeLength)); + } + if (!m_AbortFlag) + { + uint64_t BlockSize = BlockBuffer.GetSize(); + m_DownloadStats.DownloadedBlockCount++; + m_DownloadStats.DownloadedBlockByteCount += BlockSize; + m_DownloadStats.RequestsCompleteCount++; + if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + + std::filesystem::path BlockChunkPath; + + // Check if the dowloaded block is file based and we can move it directly without rewriting it + { + IoBufferFileReference FileRef; + if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && + (FileRef.FileChunkSize == BlockSize)) + { + ZEN_TRACE_CPU("MoveTempPartialBlock"); + + std::error_code Ec; + std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); + if (!Ec) + { + BlockBuffer.SetDeleteOnClose(false); + BlockBuffer = {}; + BlockChunkPath = + ZenTempBlockFolderPath(m_Options.ZenFolderPath) / fmt::format("{}_{:x}_{:x}", + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength); + RenameFile(TempBlobPath, BlockChunkPath, Ec); + if (Ec) + { + BlockChunkPath = std::filesystem::path{}; + + // Re-open the temp file again + BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); + BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); + BlockBuffer.SetDeleteOnClose(true); + } + } + } + } + + if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u)) + { + ZEN_TRACE_CPU("WriteTempPartialBlock"); + // Could not be moved and rather large, lets store it on disk + BlockChunkPath = + ZenTempBlockFolderPath(m_Options.ZenFolderPath) / + fmt::format("{}_{:x}_{:x}", BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength); + TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); + BlockBuffer = {}; + } + + if (!m_AbortFlag) + { + Work.ScheduleWork( + m_IOWorkerPool, + [this, + &RemoteChunkIndexNeedsCopyFromSourceFlags, + &SequenceIndexChunksLeftToWriteCounters, + &WritePartsComplete, + &WriteCache, + &Work, + TotalPartWriteCount, + &FilteredWrittenBytesPerSecond, + BlockIndex, + BlockRange, + BlockChunkPath, + BlockPartialBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_WritePartialBlock"); + + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + + if (BlockChunkPath.empty()) + { + ZEN_ASSERT(BlockPartialBuffer); + } + else + { + ZEN_ASSERT(!BlockPartialBuffer); + BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); + if (!BlockPartialBuffer) + { + throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}", + BlockDescription.BlockHash, + BlockChunkPath)); + } + } + + FilteredWrittenBytesPerSecond.Start(); + + if (!WritePartialBlockChunksToCache( + BlockDescription, + SequenceIndexChunksLeftToWriteCounters, + Work, + CompositeBuffer(std::move(BlockPartialBuffer)), + BlockRange.ChunkBlockIndexStart, + BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1, + RemoteChunkIndexNeedsCopyFromSourceFlags, + WriteCache)) + { + std::error_code DummyEc; + RemoveFile(BlockChunkPath, DummyEc); + throw std::runtime_error( + fmt::format("Partial block {} is malformed", BlockDescription.BlockHash)); + } + + if (!BlockChunkPath.empty()) + { + TryRemoveFile(m_LogOutput, BlockChunkPath); + } + + WritePartsComplete++; + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } + } + }); + } + } + } + }); + } + + for (uint32_t BlockIndex : FullBlockWorks) + { + if (m_AbortFlag) + { + break; + } + + if (m_Options.PrimeCacheOnly && ExistsResult.ExistingBlobs.contains(m_BlockDescriptions[BlockIndex].BlockHash)) + { + m_DownloadStats.RequestsCompleteCount++; + continue; + } + + Work.ScheduleWork( + m_NetworkPool, + [this, + &WritePartsComplete, + TotalPartWriteCount, + &FilteredWrittenBytesPerSecond, + &ExistsResult, + &Work, + &WriteCache, + &RemoteChunkIndexNeedsCopyFromSourceFlags, + &SequenceIndexChunksLeftToWriteCounters, + &FilteredDownloadedBytesPerSecond, + TotalRequestCount, + BlockIndex](std::atomic<bool>&) { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_GetFullBlock"); + + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + + FilteredDownloadedBytesPerSecond.Start(); + + IoBuffer BlockBuffer; + const bool ExistsInCache = + m_Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash); + if (ExistsInCache) + { + BlockBuffer = m_Storage.BuildCacheStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash); + } + if (!BlockBuffer) + { + BlockBuffer = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash); + if (BlockBuffer && m_Storage.BuildCacheStorage) + { + m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, + BlockDescription.BlockHash, + ZenContentType::kCompressedBinary, + CompositeBuffer(SharedBuffer(BlockBuffer))); + } + } + if (!BlockBuffer) + { + throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); + } + if (!m_AbortFlag) + { + uint64_t BlockSize = BlockBuffer.GetSize(); + m_DownloadStats.DownloadedBlockCount++; + m_DownloadStats.DownloadedBlockByteCount += BlockSize; + m_DownloadStats.RequestsCompleteCount++; + if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + + if (!m_Options.PrimeCacheOnly) + { + std::filesystem::path BlockChunkPath; + + // Check if the dowloaded block is file based and we can move it directly without rewriting it + { + IoBufferFileReference FileRef; + if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && + (FileRef.FileChunkSize == BlockSize)) + { + ZEN_TRACE_CPU("MoveTempFullBlock"); + std::error_code Ec; + std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); + if (!Ec) + { + BlockBuffer.SetDeleteOnClose(false); + BlockBuffer = {}; + BlockChunkPath = + ZenTempBlockFolderPath(m_Options.ZenFolderPath) / BlockDescription.BlockHash.ToHexString(); + RenameFile(TempBlobPath, BlockChunkPath, Ec); + if (Ec) + { + BlockChunkPath = std::filesystem::path{}; + + // Re-open the temp file again + BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); + BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); + BlockBuffer.SetDeleteOnClose(true); + } + } + } + } + + if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u)) + { + ZEN_TRACE_CPU("WriteTempFullBlock"); + // Could not be moved and rather large, lets store it on disk + BlockChunkPath = + ZenTempBlockFolderPath(m_Options.ZenFolderPath) / BlockDescription.BlockHash.ToHexString(); + TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); + BlockBuffer = {}; + } + + if (!m_AbortFlag) + { + Work.ScheduleWork( + m_IOWorkerPool, + [this, + &Work, + &RemoteChunkIndexNeedsCopyFromSourceFlags, + &SequenceIndexChunksLeftToWriteCounters, + BlockIndex, + &WriteCache, + &WritePartsComplete, + TotalPartWriteCount, + &FilteredWrittenBytesPerSecond, + BlockChunkPath, + BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_WriteFullBlock"); + + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + + if (BlockChunkPath.empty()) + { + ZEN_ASSERT(BlockBuffer); + } + else + { + ZEN_ASSERT(!BlockBuffer); + BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); + if (!BlockBuffer) + { + throw std::runtime_error(fmt::format("Could not open dowloaded block {} from {}", + BlockDescription.BlockHash, + BlockChunkPath)); + } + } + + FilteredWrittenBytesPerSecond.Start(); + if (!WriteChunksBlockToCache(BlockDescription, + SequenceIndexChunksLeftToWriteCounters, + Work, + CompositeBuffer(std::move(BlockBuffer)), + RemoteChunkIndexNeedsCopyFromSourceFlags, + WriteCache)) + { + std::error_code DummyEc; + RemoveFile(BlockChunkPath, DummyEc); + throw std::runtime_error( + fmt::format("Block {} is malformed", BlockDescription.BlockHash)); + } + + if (!BlockChunkPath.empty()) + { + TryRemoveFile(m_LogOutput, BlockChunkPath); + } + + WritePartsComplete++; + + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } + } + }); + } + } + } + } + }); + } + + { + ZEN_TRACE_CPU("WriteChunks_Wait"); + + Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); + uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() + + m_DownloadStats.DownloadedBlockByteCount.load() + + +m_DownloadStats.DownloadedPartialBlockByteCount.load(); + FilteredWrittenBytesPerSecond.Update(m_DiskStats.WriteByteCount.load()); + FilteredDownloadedBytesPerSecond.Update(DownloadedBytes); + std::string DownloadRateString = (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) + ? "" + : fmt::format(" {}bits/s", NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8)); + std::string CloneDetails; + if (m_DiskStats.CloneCount.load() > 0) + { + CloneDetails = fmt::format(" ({} cloned)", NiceBytes(m_DiskStats.CloneByteCount.load())); + } + std::string WriteDetails = m_Options.PrimeCacheOnly ? "" + : fmt::format(" {}/{} ({}B/s) written{}.", + NiceBytes(m_DiskStats.WriteByteCount.load()), + NiceBytes(BytesToWrite), + NiceNum(FilteredWrittenBytesPerSecond.GetCurrent()), + CloneDetails); + std::string Details = fmt::format("{}/{} ({}{}) downloaded.{}", + m_DownloadStats.RequestsCompleteCount.load(), + TotalRequestCount, + NiceBytes(DownloadedBytes), + DownloadRateString, + WriteDetails); + WriteProgressBar.UpdateState( + {.Task = m_Options.PrimeCacheOnly ? "Downloading " : "Writing chunks ", + .Details = Details, + .TotalCount = m_Options.PrimeCacheOnly ? TotalRequestCount : BytesToWrite, + .RemainingCount = m_Options.PrimeCacheOnly ? (TotalRequestCount - m_DownloadStats.RequestsCompleteCount.load()) + : (BytesToWrite - m_DiskStats.WriteByteCount.load()), + .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, + false); + }); + } + + CloneQuery.reset(); + + FilteredWrittenBytesPerSecond.Stop(); + FilteredDownloadedBytesPerSecond.Stop(); + + WriteProgressBar.Finish(); + if (m_AbortFlag) + { + return; + } + + if (!m_Options.PrimeCacheOnly) + { + uint32_t RawSequencesMissingWriteCount = 0; + for (uint32_t SequenceIndex = 0; SequenceIndex < SequenceIndexChunksLeftToWriteCounters.size(); SequenceIndex++) + { + const auto& SequenceIndexChunksLeftToWriteCounter = SequenceIndexChunksLeftToWriteCounters[SequenceIndex]; + if (SequenceIndexChunksLeftToWriteCounter.load() != 0) + { + RawSequencesMissingWriteCount++; + const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; + const std::filesystem::path& IncompletePath = m_RemoteContent.Paths[PathIndex]; + ZEN_ASSERT(!IncompletePath.empty()); + const uint32_t ExpectedSequenceCount = m_RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]; + if (!m_Options.IsQuiet) + { + LOG_OUTPUT(m_LogOutput, + "{}: Max count {}, Current count {}", + IncompletePath, + ExpectedSequenceCount, + SequenceIndexChunksLeftToWriteCounter.load()); + } + ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounter.load() <= ExpectedSequenceCount); + } + } + ZEN_ASSERT(RawSequencesMissingWriteCount == 0); + } + + const uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() + m_DownloadStats.DownloadedBlockByteCount.load() + + m_DownloadStats.DownloadedPartialBlockByteCount.load(); + if (!m_Options.IsQuiet) + { + std::string CloneDetails; + if (m_DiskStats.CloneCount.load() > 0) + { + CloneDetails = fmt::format(" ({} cloned)", NiceBytes(m_DiskStats.CloneByteCount.load())); + } + LOG_OUTPUT(m_LogOutput, + "Downloaded {} ({}bits/s) in {}. Wrote {} ({}B/s){} in {}. Completed in {}", + NiceBytes(DownloadedBytes), + NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), DownloadedBytes * 8)), + NiceTimeSpanMs(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS() / 1000), + NiceBytes(m_DiskStats.WriteByteCount.load()), + NiceNum(GetBytesPerSecond(FilteredWrittenBytesPerSecond.GetElapsedTimeUS(), m_DiskStats.WriteByteCount.load())), + CloneDetails, + NiceTimeSpanMs(FilteredWrittenBytesPerSecond.GetElapsedTimeUS() / 1000), + NiceTimeSpanMs(WriteTimer.GetElapsedTimeMs())); + } + + m_WriteChunkStats.WriteChunksElapsedWallTimeUs = WriteTimer.GetElapsedTimeUs(); + m_WriteChunkStats.DownloadTimeUs = FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(); + m_WriteChunkStats.WriteTimeUs = FilteredWrittenBytesPerSecond.GetElapsedTimeUS(); + } + + if (m_Options.PrimeCacheOnly) + { + return; + } + + tsl::robin_map<uint32_t, uint32_t> RemotePathIndexToLocalPathIndex; + RemotePathIndexToLocalPathIndex.reserve(m_RemoteContent.Paths.size()); + + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> SequenceHashToLocalPathIndex; + std::vector<uint32_t> RemoveLocalPathIndexes; + + if (m_AbortFlag) + { + return; + } + + { + ZEN_TRACE_CPU("PrepareTarget"); + + tsl::robin_set<IoHash, IoHash::Hasher> CachedRemoteSequences; + tsl::robin_map<std::string, uint32_t> RemotePathToRemoteIndex; + RemotePathToRemoteIndex.reserve(m_RemoteContent.Paths.size()); + for (uint32_t RemotePathIndex = 0; RemotePathIndex < m_RemoteContent.Paths.size(); RemotePathIndex++) + { + RemotePathToRemoteIndex.insert({m_RemoteContent.Paths[RemotePathIndex].generic_string(), RemotePathIndex}); + } + + std::vector<uint32_t> FilesToCache; + + uint64_t MatchCount = 0; + uint64_t PathMismatchCount = 0; + uint64_t HashMismatchCount = 0; + std::atomic<uint64_t> CachedCount = 0; + std::atomic<uint64_t> CachedByteCount = 0; + uint64_t SkippedCount = 0; + uint64_t DeleteCount = 0; + for (uint32_t LocalPathIndex = 0; LocalPathIndex < m_LocalContent.Paths.size(); LocalPathIndex++) + { + if (m_AbortFlag) + { + break; + } + const IoHash& RawHash = m_LocalContent.RawHashes[LocalPathIndex]; + const std::filesystem::path& LocalPath = m_LocalContent.Paths[LocalPathIndex]; + + ZEN_ASSERT_SLOW(IsFile((m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred())); + + if (m_Options.EnableTargetFolderScavenging) + { + if (!m_Options.WipeTargetFolder) + { + if (auto RemotePathIt = RemotePathToRemoteIndex.find(LocalPath.generic_string()); + RemotePathIt != RemotePathToRemoteIndex.end()) + { + const uint32_t RemotePathIndex = RemotePathIt->second; + if (m_RemoteContent.RawHashes[RemotePathIndex] == RawHash) + { + // It is already in it's desired place + RemotePathIndexToLocalPathIndex[RemotePathIndex] = LocalPathIndex; + SequenceHashToLocalPathIndex.insert({RawHash, LocalPathIndex}); + MatchCount++; + continue; + } + else + { + HashMismatchCount++; + } + } + else + { + PathMismatchCount++; + } + } + if (m_RemoteLookup.RawHashToSequenceIndex.contains(RawHash)) + { + if (!CachedRemoteSequences.contains(RawHash)) + { + ZEN_TRACE_CPU("MoveToCache"); + // We need it + FilesToCache.push_back(LocalPathIndex); + CachedRemoteSequences.insert(RawHash); + } + else + { + // We already have it + SkippedCount++; + } + } + else if (!m_Options.WipeTargetFolder) + { + // We don't need it + RemoveLocalPathIndexes.push_back(LocalPathIndex); + DeleteCount++; + } + } + else + { + // Delete local file as we did not scavenge the folder + RemoveLocalPathIndexes.push_back(LocalPathIndex); + DeleteCount++; + } + } + + if (m_AbortFlag) + { + return; + } + + { + ZEN_TRACE_CPU("CopyToCache"); + + Stopwatch Timer; + + std::unique_ptr<BuildOpLogOutput::ProgressBar> CacheLocalProgressBarPtr(m_LogOutput.CreateProgressBar("Cache Local Data")); + BuildOpLogOutput::ProgressBar& CacheLocalProgressBar(*CacheLocalProgressBarPtr); + ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + for (uint32_t LocalPathIndex : FilesToCache) + { + if (m_AbortFlag) + { + break; + } + Work.ScheduleWork(m_IOWorkerPool, [this, &CachedCount, &CachedByteCount, LocalPathIndex](std::atomic<bool>&) { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_CopyToCache"); + + const IoHash& RawHash = m_LocalContent.RawHashes[LocalPathIndex]; + const std::filesystem::path& LocalPath = m_LocalContent.Paths[LocalPathIndex]; + const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RawHash); + ZEN_ASSERT_SLOW(!IsFileWithRetry(CacheFilePath)); + const std::filesystem::path LocalFilePath = (m_Path / LocalPath).make_preferred(); + RenameFileWithRetry(m_LogOutput, LocalFilePath, CacheFilePath); + CachedCount++; + CachedByteCount += m_LocalContent.RawSizes[LocalPathIndex]; + } + }); + } + + { + ZEN_TRACE_CPU("CopyToCache_Wait"); + + Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); + const uint64_t WorkTotal = FilesToCache.size(); + const uint64_t WorkComplete = CachedCount.load(); + std::string Details = fmt::format("{}/{} ({}) files", WorkComplete, WorkTotal, NiceBytes(CachedByteCount)); + CacheLocalProgressBar.UpdateState( + {.Task = "Caching local ", + .Details = Details, + .TotalCount = gsl::narrow<uint64_t>(WorkTotal), + .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete), + .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, + false); + }); + } + + CacheLocalProgressBar.Finish(); + if (m_AbortFlag) + { + return; + } + + LOG_OUTPUT_DEBUG(m_LogOutput, + "Local state prep: Match: {}, PathMismatch: {}, HashMismatch: {}, Cached: {} ({}), Skipped: {}, " + "Delete: {}", + MatchCount, + PathMismatchCount, + HashMismatchCount, + CachedCount.load(), + NiceBytes(CachedByteCount.load()), + SkippedCount, + DeleteCount); + } + } + + if (m_Options.WipeTargetFolder) + { + ZEN_TRACE_CPU("WipeTarget"); + Stopwatch Timer; + + // Clean target folder + if (!CleanDirectory(m_IOWorkerPool, + m_AbortFlag, + m_PauseFlag, + m_LogOutput, + m_Options.IsQuiet, + m_Path, + m_Options.DefaultExcludeFolders)) + { + LOG_OUTPUT_WARN(m_LogOutput, "Some files in {} could not be removed", m_Path); + } + m_RebuildFolderStateStats.CleanFolderElapsedWallTimeUs = Timer.GetElapsedTimeUs(); + } + + if (m_AbortFlag) + { + return; + } + + { + ZEN_TRACE_CPU("FinalizeTree"); + Stopwatch Timer; + + std::unique_ptr<BuildOpLogOutput::ProgressBar> RebuildProgressBarPtr(m_LogOutput.CreateProgressBar("Rebuild State")); + BuildOpLogOutput::ProgressBar& RebuildProgressBar(*RebuildProgressBarPtr); + ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + OutLocalFolderState.Paths.resize(m_RemoteContent.Paths.size()); + OutLocalFolderState.RawSizes.resize(m_RemoteContent.Paths.size()); + OutLocalFolderState.Attributes.resize(m_RemoteContent.Paths.size()); + OutLocalFolderState.ModificationTicks.resize(m_RemoteContent.Paths.size()); + + std::atomic<uint64_t> DeletedCount = 0; + + for (uint32_t LocalPathIndex : RemoveLocalPathIndexes) + { + if (m_AbortFlag) + { + break; + } + Work.ScheduleWork(m_IOWorkerPool, [this, &DeletedCount, LocalPathIndex](std::atomic<bool>&) { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_RemoveFile"); + + const std::filesystem::path LocalFilePath = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred(); + SetFileReadOnlyWithRetry(LocalFilePath, false); + RemoveFileWithRetry(LocalFilePath); + DeletedCount++; + } + }); + } + + std::atomic<uint64_t> TargetsComplete = 0; + + struct FinalizeTarget + { + IoHash RawHash; + uint32_t RemotePathIndex; + }; + + std::vector<FinalizeTarget> Targets; + Targets.reserve(m_RemoteContent.Paths.size()); + for (uint32_t RemotePathIndex = 0; RemotePathIndex < m_RemoteContent.Paths.size(); RemotePathIndex++) + { + Targets.push_back(FinalizeTarget{.RawHash = m_RemoteContent.RawHashes[RemotePathIndex], .RemotePathIndex = RemotePathIndex}); + } + std::sort(Targets.begin(), Targets.end(), [](const FinalizeTarget& Lhs, const FinalizeTarget& Rhs) { + if (Lhs.RawHash < Rhs.RawHash) + { + return true; + } + else if (Lhs.RawHash > Rhs.RawHash) + { + return false; + } + return Lhs.RemotePathIndex < Rhs.RemotePathIndex; + }); + + size_t TargetOffset = 0; + while (TargetOffset < Targets.size()) + { + if (m_AbortFlag) + { + break; + } + + size_t TargetCount = 1; + while ((TargetOffset + TargetCount) < Targets.size() && + (Targets[TargetOffset + TargetCount].RawHash == Targets[TargetOffset].RawHash)) + { + TargetCount++; + } + + Work.ScheduleWork( + m_IOWorkerPool, + [this, + &SequenceHashToLocalPathIndex, + &Targets, + &RemotePathIndexToLocalPathIndex, + &OutLocalFolderState, + BaseTargetOffset = TargetOffset, + TargetCount, + &TargetsComplete](std::atomic<bool>&) { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_FinalizeChunkSequence"); + + size_t TargetOffset = BaseTargetOffset; + const IoHash& RawHash = Targets[TargetOffset].RawHash; + + if (RawHash == IoHash::Zero) + { + ZEN_TRACE_CPU("CreateEmptyFiles"); + while (TargetOffset < (BaseTargetOffset + TargetCount)) + { + const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex; + ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash); + const std::filesystem::path& TargetPath = m_RemoteContent.Paths[RemotePathIndex]; + std::filesystem::path TargetFilePath = (m_Path / TargetPath).make_preferred(); + if (!RemotePathIndexToLocalPathIndex[RemotePathIndex]) + { + if (IsFileWithRetry(TargetFilePath)) + { + SetFileReadOnlyWithRetry(TargetFilePath, false); + } + else + { + CreateDirectories(TargetFilePath.parent_path()); + } + BasicFile OutputFile; + OutputFile.Open(TargetFilePath, BasicFile::Mode::kTruncate); + } + OutLocalFolderState.Paths[RemotePathIndex] = TargetPath; + OutLocalFolderState.RawSizes[RemotePathIndex] = m_RemoteContent.RawSizes[RemotePathIndex]; + + OutLocalFolderState.Attributes[RemotePathIndex] = + m_RemoteContent.Attributes.empty() + ? GetNativeFileAttributes(TargetFilePath) + : SetNativeFileAttributes(TargetFilePath, + m_RemoteContent.Platform, + m_RemoteContent.Attributes[RemotePathIndex]); + OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath); + + TargetOffset++; + TargetsComplete++; + } + } + else + { + ZEN_TRACE_CPU("FinalizeFile"); + ZEN_ASSERT(m_RemoteLookup.RawHashToSequenceIndex.contains(RawHash)); + const uint32_t FirstRemotePathIndex = Targets[TargetOffset].RemotePathIndex; + const std::filesystem::path& FirstTargetPath = m_RemoteContent.Paths[FirstRemotePathIndex]; + std::filesystem::path FirstTargetFilePath = (m_Path / FirstTargetPath).make_preferred(); + + if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(FirstRemotePathIndex); + InPlaceIt != RemotePathIndexToLocalPathIndex.end()) + { + ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath)); + } + else + { + if (IsFileWithRetry(FirstTargetFilePath)) + { + SetFileReadOnlyWithRetry(FirstTargetFilePath, false); + } + else + { + CreateDirectories(FirstTargetFilePath.parent_path()); + } + + if (auto InplaceIt = SequenceHashToLocalPathIndex.find(RawHash); + InplaceIt != SequenceHashToLocalPathIndex.end()) + { + ZEN_TRACE_CPU("Copy"); + const uint32_t LocalPathIndex = InplaceIt->second; + const std::filesystem::path& SourcePath = m_LocalContent.Paths[LocalPathIndex]; + std::filesystem::path SourceFilePath = (m_Path / SourcePath).make_preferred(); + ZEN_ASSERT_SLOW(IsFileWithRetry(SourceFilePath)); + + LOG_OUTPUT_DEBUG(m_LogOutput, "Copying from '{}' -> '{}'", SourceFilePath, FirstTargetFilePath); + const uint64_t RawSize = m_LocalContent.RawSizes[LocalPathIndex]; + std::atomic<uint64_t> WriteCount; + std::atomic<uint64_t> WriteByteCount; + std::atomic<uint64_t> CloneCount; + std::atomic<uint64_t> CloneByteCount; + CopyFile(m_Options.AllowFileClone, + m_Options.UseSparseFiles, + SourceFilePath, + FirstTargetFilePath, + RawSize, + WriteCount, + WriteByteCount, + CloneCount, + CloneByteCount); + m_RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++; + } + else + { + ZEN_TRACE_CPU("Rename"); + const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RawHash); + ZEN_ASSERT_SLOW(IsFileWithRetry(CacheFilePath)); + + RenameFileWithRetry(m_LogOutput, CacheFilePath, FirstTargetFilePath); + + m_RebuildFolderStateStats.FinalizeTreeFilesMovedCount++; + } + } + + OutLocalFolderState.Paths[FirstRemotePathIndex] = FirstTargetPath; + OutLocalFolderState.RawSizes[FirstRemotePathIndex] = m_RemoteContent.RawSizes[FirstRemotePathIndex]; + + OutLocalFolderState.Attributes[FirstRemotePathIndex] = + m_RemoteContent.Attributes.empty() + ? GetNativeFileAttributes(FirstTargetFilePath) + : SetNativeFileAttributes(FirstTargetFilePath, + m_RemoteContent.Platform, + m_RemoteContent.Attributes[FirstRemotePathIndex]); + OutLocalFolderState.ModificationTicks[FirstRemotePathIndex] = GetModificationTickFromPath(FirstTargetFilePath); + + TargetOffset++; + TargetsComplete++; + + while (TargetOffset < (BaseTargetOffset + TargetCount)) + { + const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex; + ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash); + const std::filesystem::path& TargetPath = m_RemoteContent.Paths[RemotePathIndex]; + std::filesystem::path TargetFilePath = (m_Path / TargetPath).make_preferred(); + + if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(RemotePathIndex); + InPlaceIt != RemotePathIndexToLocalPathIndex.end()) + { + ZEN_ASSERT_SLOW(IsFileWithRetry(TargetFilePath)); + } + else + { + ZEN_TRACE_CPU("Copy"); + if (IsFileWithRetry(TargetFilePath)) + { + SetFileReadOnlyWithRetry(TargetFilePath, false); + } + else + { + CreateDirectories(TargetFilePath.parent_path()); + } + + ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath)); + LOG_OUTPUT_DEBUG(m_LogOutput, "Copying from '{}' -> '{}'", FirstTargetFilePath, TargetFilePath); + const uint64_t RawSize = m_RemoteContent.RawSizes[RemotePathIndex]; + std::atomic<uint64_t> WriteCount; + std::atomic<uint64_t> WriteByteCount; + std::atomic<uint64_t> CloneCount; + std::atomic<uint64_t> CloneByteCount; + CopyFile(m_Options.AllowFileClone, + m_Options.UseSparseFiles, + FirstTargetFilePath, + TargetFilePath, + RawSize, + WriteCount, + WriteByteCount, + CloneCount, + CloneByteCount); + m_RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++; + } + + OutLocalFolderState.Paths[RemotePathIndex] = TargetPath; + OutLocalFolderState.RawSizes[RemotePathIndex] = m_RemoteContent.RawSizes[RemotePathIndex]; + + OutLocalFolderState.Attributes[RemotePathIndex] = + m_RemoteContent.Attributes.empty() + ? GetNativeFileAttributes(TargetFilePath) + : SetNativeFileAttributes(TargetFilePath, + m_RemoteContent.Platform, + m_RemoteContent.Attributes[RemotePathIndex]); + OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath); + + TargetOffset++; + TargetsComplete++; + } + } + } + }); + + TargetOffset += TargetCount; + } + + { + ZEN_TRACE_CPU("FinalizeTree_Wait"); + + Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); + const uint64_t WorkTotal = Targets.size() + RemoveLocalPathIndexes.size(); + const uint64_t WorkComplete = TargetsComplete.load() + DeletedCount.load(); + std::string Details = fmt::format("{}/{} files", WorkComplete, WorkTotal); + RebuildProgressBar.UpdateState({.Task = "Rebuilding state ", + .Details = Details, + .TotalCount = gsl::narrow<uint64_t>(WorkTotal), + .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete), + .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, + false); + }); + } + + m_RebuildFolderStateStats.FinalizeTreeElapsedWallTimeUs = Timer.GetElapsedTimeUs(); + RebuildProgressBar.Finish(); + } +} + +void +BuildsOperationUpdateFolder::ScanCacheFolder(tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& OutCachedChunkHashesFound, + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& OutCachedSequenceHashesFound) +{ + ZEN_TRACE_CPU("ScanCacheFolder"); + + Stopwatch CacheTimer; + + DirectoryContent CacheDirContent; + GetDirectoryContent(m_CacheFolderPath, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeFileSizes, CacheDirContent); + for (size_t Index = 0; Index < CacheDirContent.Files.size(); Index++) + { + if (m_Options.EnableTargetFolderScavenging) + { + IoHash FileHash; + if (IoHash::TryParse(CacheDirContent.Files[Index].filename().string(), FileHash)) + { + if (auto ChunkIt = m_RemoteLookup.ChunkHashToChunkIndex.find(FileHash); + ChunkIt != m_RemoteLookup.ChunkHashToChunkIndex.end()) + { + const uint32_t ChunkIndex = ChunkIt->second; + const uint64_t ChunkSize = m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; + if (ChunkSize == CacheDirContent.FileSizes[Index]) + { + OutCachedChunkHashesFound.insert({FileHash, ChunkIndex}); + m_CacheMappingStats.CacheChunkCount++; + m_CacheMappingStats.CacheChunkByteCount += ChunkSize; + continue; + } + } + else if (auto SequenceIt = m_RemoteLookup.RawHashToSequenceIndex.find(FileHash); + SequenceIt != m_RemoteLookup.RawHashToSequenceIndex.end()) + { + const uint32_t SequenceIndex = SequenceIt->second; + const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; + const uint64_t SequenceSize = m_RemoteContent.RawSizes[PathIndex]; + if (SequenceSize == CacheDirContent.FileSizes[Index]) + { + OutCachedSequenceHashesFound.insert({FileHash, SequenceIndex}); + m_CacheMappingStats.CacheSequenceHashesCount++; + m_CacheMappingStats.CacheSequenceHashesByteCount += SequenceSize; + + const std::filesystem::path CacheFilePath = + GetFinalChunkedSequenceFileName(m_CacheFolderPath, + m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); + ZEN_ASSERT_SLOW(IsFile(CacheFilePath)); + + continue; + } + } + } + } + TryRemoveFile(m_LogOutput, CacheDirContent.Files[Index]); + } + m_CacheMappingStats.CacheScanElapsedWallTimeUs += CacheTimer.GetElapsedTimeUs(); +} + +void +BuildsOperationUpdateFolder::ScanTempBlocksFolder(tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& OutCachedBlocksFound) +{ + ZEN_TRACE_CPU("ScanTempBlocksFolder"); + + Stopwatch CacheTimer; + + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> AllBlockSizes; + AllBlockSizes.reserve(m_BlockDescriptions.size()); + for (uint32_t BlockIndex = 0; BlockIndex < m_BlockDescriptions.size(); BlockIndex++) + { + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + AllBlockSizes.insert({BlockDescription.BlockHash, BlockIndex}); + } + + DirectoryContent BlockDirContent; + GetDirectoryContent(ZenTempBlockFolderPath(m_Options.ZenFolderPath), + DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeFileSizes, + BlockDirContent); + OutCachedBlocksFound.reserve(BlockDirContent.Files.size()); + for (size_t Index = 0; Index < BlockDirContent.Files.size(); Index++) + { + if (m_Options.EnableTargetFolderScavenging) + { + IoHash FileHash; + if (IoHash::TryParse(BlockDirContent.Files[Index].filename().string(), FileHash)) + { + if (auto BlockIt = AllBlockSizes.find(FileHash); BlockIt != AllBlockSizes.end()) + { + const uint32_t BlockIndex = BlockIt->second; + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; + uint64_t BlockSize = CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize; + for (uint64_t ChunkSize : BlockDescription.ChunkCompressedLengths) + { + BlockSize += ChunkSize; + } + + if (BlockSize == BlockDirContent.FileSizes[Index]) + { + OutCachedBlocksFound.insert({FileHash, BlockIndex}); + m_CacheMappingStats.CacheBlockCount++; + m_CacheMappingStats.CacheBlocksByteCount += BlockSize; + continue; + } + } + } + } + TryRemoveFile(m_LogOutput, BlockDirContent.Files[Index]); + } + + m_CacheMappingStats.CacheScanElapsedWallTimeUs += CacheTimer.GetElapsedTimeUs(); +} + +std::vector<BuildsOperationUpdateFolder::ScavengeSource> +BuildsOperationUpdateFolder::FindScavengeSources() +{ + ZEN_TRACE_CPU("FindScavengeSources"); + + std::vector<ScavengeSource> Result; + DirectoryContent Content; + GetDirectoryContent(m_Options.SystemRootDir / "builds" / "downloads", DirectoryContentFlags::IncludeFiles, Content); + for (const std::filesystem::path& EntryPath : Content.Files) + { + bool DeleteEntry = false; + IoHash EntryPathHash; + if (IoHash::TryParse(EntryPath.stem().string(), EntryPathHash)) + { + // Read state and verify that it is valid + IoBuffer MetaDataJson = ReadFile(EntryPath).Flatten(); + std::string_view Json(reinterpret_cast<const char*>(MetaDataJson.GetData()), MetaDataJson.GetSize()); + std::string JsonError; + CbObject DownloadInfo = LoadCompactBinaryFromJson(Json, JsonError).AsObject(); + if (JsonError.empty()) + { + std::filesystem::path StateFilePath = DownloadInfo["statePath"].AsU8String(); + if (IsFile(StateFilePath)) + { + std::filesystem::path Path = DownloadInfo["path"].AsU8String(); + if (!std::filesystem::equivalent(Path, m_Path)) + { + if (IsDir(Path)) + { + Result.push_back({.StateFilePath = std::move(StateFilePath), .Path = std::move(Path)}); + } + else + { + DeleteEntry = true; + } + } + } + else + { + DeleteEntry = true; + } + } + else + { + LOG_OUTPUT_WARN(m_LogOutput, "Invalid download state file at {}. '{}'", EntryPath, JsonError); + DeleteEntry = true; + } + } + + if (DeleteEntry) + { + std::error_code DummyEc; + std::filesystem::remove(EntryPath, DummyEc); + } + } + return Result; +} + +std::vector<uint32_t> +BuildsOperationUpdateFolder::ScanTargetFolder(const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& CachedChunkHashesFound, + const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& CachedSequenceHashesFound) +{ + ZEN_TRACE_CPU("ScanTargetFolder"); + + Stopwatch LocalTimer; + + std::vector<uint32_t> MissingSequenceIndexes; + + for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < m_RemoteContent.ChunkedContent.SequenceRawHashes.size(); + RemoteSequenceIndex++) + { + const IoHash& RemoteSequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + const uint32_t RemotePathIndex = GetFirstPathIndexForSeqeuenceIndex(m_RemoteLookup, RemoteSequenceIndex); + const uint64_t RemoteRawSize = m_RemoteContent.RawSizes[RemotePathIndex]; + if (auto CacheSequenceIt = CachedSequenceHashesFound.find(RemoteSequenceRawHash); + CacheSequenceIt != CachedSequenceHashesFound.end()) + { + const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RemoteSequenceRawHash); + ZEN_ASSERT_SLOW(IsFile(CacheFilePath)); + if (m_Options.IsVerbose) + { + LOG_OUTPUT(m_LogOutput, "Found sequence {} at {} ({})", RemoteSequenceRawHash, CacheFilePath, NiceBytes(RemoteRawSize)); + } + } + else if (auto CacheChunkIt = CachedChunkHashesFound.find(RemoteSequenceRawHash); CacheChunkIt != CachedChunkHashesFound.end()) + { + const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RemoteSequenceRawHash); + ZEN_ASSERT_SLOW(IsFile(CacheFilePath)); + if (m_Options.IsVerbose) + { + LOG_OUTPUT(m_LogOutput, "Found chunk {} at {} ({})", RemoteSequenceRawHash, CacheFilePath, NiceBytes(RemoteRawSize)); + } + } + else if (auto It = m_LocalLookup.RawHashToSequenceIndex.find(RemoteSequenceRawHash); + It != m_LocalLookup.RawHashToSequenceIndex.end()) + { + const uint32_t LocalSequenceIndex = It->second; + const uint32_t LocalPathIndex = GetFirstPathIndexForSeqeuenceIndex(m_LocalLookup, LocalSequenceIndex); + const std::filesystem::path LocalFilePath = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred(); + ZEN_ASSERT_SLOW(IsFile(LocalFilePath)); + m_CacheMappingStats.LocalPathsMatchingSequencesCount++; + m_CacheMappingStats.LocalPathsMatchingSequencesByteCount += RemoteRawSize; + if (m_Options.IsVerbose) + { + LOG_OUTPUT(m_LogOutput, "Found sequence {} at {} ({})", RemoteSequenceRawHash, LocalFilePath, NiceBytes(RemoteRawSize)); + } + } + else + { + MissingSequenceIndexes.push_back(RemoteSequenceIndex); + } + } + + m_CacheMappingStats.LocalScanElapsedWallTimeUs += LocalTimer.GetElapsedTimeUs(); + return MissingSequenceIndexes; +} + +bool +BuildsOperationUpdateFolder::FindScavengeContent(const ScavengeSource& Source, + ChunkedFolderContent& OutScavengedLocalContent, + ChunkedContentLookup& OutScavengedLookup) +{ + ZEN_TRACE_CPU("FindScavengeContent"); + + FolderContent LocalFolderState; + if (!ReadStateFile(m_LogOutput, Source.StateFilePath, LocalFolderState, OutScavengedLocalContent)) + { + return false; + } + if (!IsDir(Source.Path)) + { + return false; + } + + OutScavengedLookup = BuildChunkedContentLookup(OutScavengedLocalContent); + + std::vector<uint32_t> PathIndexesToScavange; + uint32_t ScavengedStatePathCount = gsl::narrow<uint32_t>(OutScavengedLocalContent.Paths.size()); + PathIndexesToScavange.reserve(ScavengedStatePathCount); + for (uint32_t ScavengedStatePathIndex = 0; ScavengedStatePathIndex < ScavengedStatePathCount; ScavengedStatePathIndex++) + { + const IoHash& SequenceHash = OutScavengedLocalContent.RawHashes[ScavengedStatePathIndex]; + if (auto ScavengeSequenceIt = OutScavengedLookup.RawHashToSequenceIndex.find(SequenceHash); + ScavengeSequenceIt != OutScavengedLookup.RawHashToSequenceIndex.end()) + { + const uint32_t ScavengeSequenceIndex = ScavengeSequenceIt->second; + if (m_RemoteLookup.RawHashToSequenceIndex.contains(SequenceHash)) + { + PathIndexesToScavange.push_back(ScavengedStatePathIndex); + } + else + { + const uint32_t ScavengeChunkCount = OutScavengedLocalContent.ChunkedContent.ChunkCounts[ScavengeSequenceIndex]; + for (uint32_t ScavengeChunkIndexOffset = 0; ScavengeChunkIndexOffset < ScavengeChunkCount; ScavengeChunkIndexOffset++) + { + const size_t ScavengeChunkOrderIndex = + OutScavengedLookup.ChunkSequenceLocationOffset[ScavengeSequenceIndex] + ScavengeChunkIndexOffset; + const uint32_t ScavengeChunkIndex = OutScavengedLocalContent.ChunkedContent.ChunkOrders[ScavengeChunkOrderIndex]; + const IoHash& ScavengeChunkHash = OutScavengedLocalContent.ChunkedContent.ChunkHashes[ScavengeChunkIndex]; + if (m_RemoteLookup.ChunkHashToChunkIndex.contains(ScavengeChunkHash)) + { + PathIndexesToScavange.push_back(ScavengedStatePathIndex); + break; + } + } + } + } + } + + if (PathIndexesToScavange.empty()) + { + OutScavengedLookup = {}; + OutScavengedLocalContent = {}; + return false; + } + + std::vector<std::filesystem::path> PathsToScavenge; + PathsToScavenge.reserve(PathIndexesToScavange.size()); + for (uint32_t ScavengedStatePathIndex : PathIndexesToScavange) + { + PathsToScavenge.push_back(OutScavengedLocalContent.Paths[ScavengedStatePathIndex]); + } + + FolderContent ValidFolderContent = GetValidFolderContent(m_ScavengedFolderScanStats, Source.Path, PathsToScavenge, {}); + + if (!LocalFolderState.AreKnownFilesEqual(ValidFolderContent)) + { + std::vector<std::filesystem::path> DeletedPaths; + FolderContent UpdatedContent = GetUpdatedContent(LocalFolderState, ValidFolderContent, DeletedPaths); + + // If the files are modified since the state was saved we ignore the files since we don't + // want to incur the cost of scanning/hashing scavenged files + DeletedPaths.insert(DeletedPaths.end(), UpdatedContent.Paths.begin(), UpdatedContent.Paths.end()); + if (!DeletedPaths.empty()) + { + OutScavengedLocalContent = DeletePathsFromChunkedContent(OutScavengedLocalContent, OutScavengedLookup, DeletedPaths); + OutScavengedLookup = BuildChunkedContentLookup(OutScavengedLocalContent); + } + } + + if (OutScavengedLocalContent.Paths.empty()) + { + OutScavengedLookup = {}; + OutScavengedLocalContent = {}; + return false; + } + + return true; +} + +std::filesystem::path +BuildsOperationUpdateFolder::FindDownloadedChunk(const IoHash& ChunkHash) +{ + ZEN_TRACE_CPU("FindDownloadedChunk"); + + std::filesystem::path CompressedChunkPath = ZenTempDownloadFolderPath(m_Options.ZenFolderPath) / ChunkHash.ToHexString(); + if (IsFile(CompressedChunkPath)) + { + IoBuffer ExistingCompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); + if (ExistingCompressedPart) + { + IoHash RawHash; + uint64_t RawSize; + if (CompressedBuffer::ValidateCompressedHeader(ExistingCompressedPart, RawHash, RawSize)) + { + return CompressedChunkPath; + } + else + { + std::error_code DummyEc; + RemoveFile(CompressedChunkPath, DummyEc); + } + } + } + return {}; +} + +FolderContent +BuildsOperationUpdateFolder::GetValidFolderContent(GetFolderContentStatistics& LocalFolderScanStats, + const std::filesystem::path& Path, + std::span<const std::filesystem::path> PathsToCheck, + std::function<void(uint64_t PathCount, uint64_t CompletedPathCount)>&& ProgressCallback) +{ + ZEN_TRACE_CPU("GetValidFolderContent"); + + FolderContent Result; + const uint32_t PathCount = gsl::narrow<uint32_t>(PathsToCheck.size()); + + Result.Paths.resize(PathCount); + Result.RawSizes.resize(PathCount); + Result.Attributes.resize(PathCount); + Result.ModificationTicks.resize(PathCount); + + { + Stopwatch Timer; + auto _ = MakeGuard([&LocalFolderScanStats, &Timer]() { LocalFolderScanStats.ElapsedWallTimeUS = Timer.GetElapsedTimeUs(); }); + + ParallelWork Work(m_AbortFlag, + m_PauseFlag, + ProgressCallback ? WorkerThreadPool::EMode::EnableBacklog : WorkerThreadPool::EMode::DisableBacklog); + std::atomic<uint64_t> CompletedPathCount = 0; + uint32_t PathIndex = 0; + + while (PathIndex < PathCount) + { + uint32_t PathRangeCount = Min(128u, PathCount - PathIndex); + Work.ScheduleWork(m_IOWorkerPool, + [PathIndex, PathRangeCount, &PathsToCheck, &Path, &Result, &CompletedPathCount, &LocalFolderScanStats]( + std::atomic<bool>& AbortFlag) { + if (!AbortFlag) + { + ZEN_TRACE_CPU("Async_ValidateFiles"); + + for (uint32_t PathRangeIndex = PathIndex; PathRangeIndex < PathIndex + PathRangeCount; + PathRangeIndex++) + { + const std::filesystem::path& FilePath = PathsToCheck[PathRangeIndex]; + std::filesystem::path LocalFilePath = (Path / FilePath).make_preferred(); + if (TryGetFileProperties(LocalFilePath, + Result.RawSizes[PathRangeIndex], + Result.ModificationTicks[PathRangeIndex], + Result.Attributes[PathRangeIndex])) + { + Result.Paths[PathRangeIndex] = std::move(FilePath); + LocalFolderScanStats.FoundFileCount++; + LocalFolderScanStats.FoundFileByteCount += Result.RawSizes[PathRangeIndex]; + LocalFolderScanStats.AcceptedFileCount++; + LocalFolderScanStats.AcceptedFileByteCount += Result.RawSizes[PathRangeIndex]; + } + CompletedPathCount++; + } + } + }); + PathIndex += PathRangeCount; + } + Work.Wait(200, [&](bool, bool, ptrdiff_t) { + if (ProgressCallback) + { + ProgressCallback(PathCount, CompletedPathCount.load()); + } + }); + } + + uint32_t WritePathIndex = 0; + for (uint32_t ReadPathIndex = 0; ReadPathIndex < PathCount; ReadPathIndex++) + { + if (!Result.Paths[ReadPathIndex].empty()) + { + if (WritePathIndex < ReadPathIndex) + { + Result.Paths[WritePathIndex] = std::move(Result.Paths[ReadPathIndex]); + Result.RawSizes[WritePathIndex] = Result.RawSizes[ReadPathIndex]; + Result.Attributes[WritePathIndex] = Result.Attributes[ReadPathIndex]; + Result.ModificationTicks[WritePathIndex] = Result.ModificationTicks[ReadPathIndex]; + } + WritePathIndex++; + } + } + + Result.Paths.resize(WritePathIndex); + Result.RawSizes.resize(WritePathIndex); + Result.Attributes.resize(WritePathIndex); + Result.ModificationTicks.resize(WritePathIndex); + return Result; +} + +std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> +BuildsOperationUpdateFolder::GetRemainingChunkTargets(std::span<const std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, + uint32_t ChunkIndex) +{ + ZEN_TRACE_CPU("GetRemainingChunkTargets"); + + std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkSources = GetChunkSequenceLocations(m_RemoteLookup, ChunkIndex); + std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs; + if (!ChunkSources.empty()) + { + ChunkTargetPtrs.reserve(ChunkSources.size()); + for (const ChunkedContentLookup::ChunkSequenceLocation& Source : ChunkSources) + { + if (SequenceIndexChunksLeftToWriteCounters[Source.SequenceIndex].load() > 0) + { + ChunkTargetPtrs.push_back(&Source); + } + } + } + return ChunkTargetPtrs; +}; + +uint64_t +BuildsOperationUpdateFolder::GetChunkWriteCount(std::span<const std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, + uint32_t ChunkIndex) +{ + ZEN_TRACE_CPU("GetChunkWriteCount"); + + uint64_t WriteCount = 0; + std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkSources = GetChunkSequenceLocations(m_RemoteLookup, ChunkIndex); + for (const ChunkedContentLookup::ChunkSequenceLocation& Source : ChunkSources) + { + if (SequenceIndexChunksLeftToWriteCounters[Source.SequenceIndex].load() > 0) + { + WriteCount++; + } + } + return WriteCount; +}; + +void +BuildsOperationUpdateFolder::WriteScavengedSequenceToCache(const std::filesystem::path& ScavengeRootPath, + const ChunkedFolderContent& ScavengedContent, + const ScavengedSequenceCopyOperation& ScavengeOp) +{ + ZEN_TRACE_CPU("WriteScavengedSequenceToCache"); + + const std::filesystem::path ScavengedPath = ScavengedContent.Paths[ScavengeOp.ScavengedPathIndex]; + const std::filesystem::path ScavengedFilePath = (ScavengeRootPath / ScavengedPath).make_preferred(); + ZEN_ASSERT_SLOW(FileSizeFromPath(ScavengedFilePath) == ScavengeOp.RawSize); + + const IoHash& RemoteSequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[ScavengeOp.RemoteSequenceIndex]; + const std::filesystem::path TempFilePath = GetTempChunkedSequenceFileName(m_CacheFolderPath, RemoteSequenceRawHash); + + const uint64_t RawSize = ScavengedContent.RawSizes[ScavengeOp.ScavengedPathIndex]; + CopyFile(m_Options.AllowFileClone, + m_Options.UseSparseFiles, + ScavengedFilePath, + TempFilePath, + RawSize, + m_DiskStats.WriteCount, + m_DiskStats.WriteByteCount, + m_DiskStats.CloneCount, + m_DiskStats.CloneByteCount); + + const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RemoteSequenceRawHash); + RenameFile(TempFilePath, CacheFilePath); +} + +std::vector<uint32_t> +BuildsOperationUpdateFolder::WriteLocalChunkToCache(CloneQueryInterface* CloneQuery, + const CopyChunkData& CopyData, + const std::vector<ChunkedFolderContent>& ScavengedContents, + const std::vector<ChunkedContentLookup>& ScavengedLookups, + const std::vector<std::filesystem::path>& ScavengedPaths, + BufferedWriteFileCache& WriteCache) +{ + ZEN_TRACE_CPU("WriteLocalChunkToCache"); + + std::filesystem::path SourceFilePath; + + if (CopyData.ScavengeSourceIndex == (uint32_t)-1) + { + const uint32_t LocalPathIndex = m_LocalLookup.SequenceIndexFirstPathIndex[CopyData.SourceSequenceIndex]; + SourceFilePath = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred(); + } + else + { + const ChunkedFolderContent& ScavengedContent = ScavengedContents[CopyData.ScavengeSourceIndex]; + const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[CopyData.ScavengeSourceIndex]; + const std::filesystem::path ScavengedPath = ScavengedPaths[CopyData.ScavengeSourceIndex]; + const uint32_t ScavengedPathIndex = ScavengedLookup.SequenceIndexFirstPathIndex[CopyData.SourceSequenceIndex]; + SourceFilePath = (ScavengedPath / ScavengedContent.Paths[ScavengedPathIndex]).make_preferred(); + } + ZEN_ASSERT_SLOW(IsFile(SourceFilePath)); + ZEN_ASSERT(!CopyData.TargetChunkLocationPtrs.empty()); + + uint64_t CacheLocalFileBytesRead = 0; + + size_t TargetStart = 0; + const std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> AllTargets(CopyData.TargetChunkLocationPtrs); + + struct WriteOp + { + const ChunkedContentLookup::ChunkSequenceLocation* Target = nullptr; + uint64_t CacheFileOffset = (uint64_t)-1; + uint32_t ChunkIndex = (uint32_t)-1; + }; + + std::vector<WriteOp> WriteOps; + + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Sort"); + WriteOps.reserve(AllTargets.size()); + for (const CopyChunkData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets) + { + std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> TargetRange = + AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount); + for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange) + { + WriteOps.push_back( + WriteOp{.Target = Target, .CacheFileOffset = ChunkTarget.CacheFileOffset, .ChunkIndex = ChunkTarget.RemoteChunkIndex}); + } + TargetStart += ChunkTarget.TargetChunkLocationCount; + } + + std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) { + if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex) + { + return true; + } + else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex) + { + return false; + } + if (Lhs.Target->Offset < Rhs.Target->Offset) + { + return true; + } + return false; + }); + } + + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Write"); + + tsl::robin_set<uint32_t> ChunkIndexesWritten; + + BufferedOpenFile SourceFile(SourceFilePath, + m_DiskStats.OpenReadCount, + m_DiskStats.CurrentOpenFileCount, + m_DiskStats.ReadCount, + m_DiskStats.ReadByteCount); + + bool CanCloneSource = CloneQuery && CloneQuery->CanClone(SourceFile.Handle()); + + BufferedWriteFileCache::Local LocalWriter(WriteCache); + + for (size_t WriteOpIndex = 0; WriteOpIndex < WriteOps.size();) + { + if (m_AbortFlag) + { + break; + } + const WriteOp& Op = WriteOps[WriteOpIndex]; + + const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex; + const uint32_t RemotePathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]; + const uint64_t TargetSize = m_RemoteContent.RawSizes[RemotePathIndex]; + const uint64_t ChunkSize = m_RemoteContent.ChunkedContent.ChunkRawSizes[Op.ChunkIndex]; + + uint64_t ReadLength = ChunkSize; + size_t WriteCount = 1; + uint64_t OpSourceEnd = Op.CacheFileOffset + ChunkSize; + uint64_t OpTargetEnd = Op.Target->Offset + ChunkSize; + while ((WriteOpIndex + WriteCount) < WriteOps.size()) + { + const WriteOp& NextOp = WriteOps[WriteOpIndex + WriteCount]; + if (NextOp.Target->SequenceIndex != Op.Target->SequenceIndex) + { + break; + } + if (NextOp.Target->Offset != OpTargetEnd) + { + break; + } + if (NextOp.CacheFileOffset != OpSourceEnd) + { + break; + } + const uint64_t NextChunkLength = m_RemoteContent.ChunkedContent.ChunkRawSizes[NextOp.ChunkIndex]; + if (ReadLength + NextChunkLength > 512u * 1024u) + { + break; + } + ReadLength += NextChunkLength; + OpSourceEnd += NextChunkLength; + OpTargetEnd += NextChunkLength; + WriteCount++; + } + + { + bool DidClone = false; + + if (CanCloneSource) + { + uint64_t PreBytes = 0; + uint64_t PostBytes = 0; + uint64_t ClonableBytes = + CloneQuery->GetClonableRange(Op.CacheFileOffset, Op.Target->Offset, ReadLength, PreBytes, PostBytes); + if (ClonableBytes > 0) + { + // We need to open the file... + BufferedWriteFileCache::Local::Writer* Writer = LocalWriter.GetWriter(RemoteSequenceIndex); + if (!Writer) + { + Writer = LocalWriter.PutWriter(RemoteSequenceIndex, std::make_unique<BufferedWriteFileCache::Local::Writer>()); + + Writer->File = std::make_unique<BasicFile>(); + + const std::filesystem::path FileName = + GetTempChunkedSequenceFileName(m_CacheFolderPath, + m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]); + Writer->File->Open(FileName, BasicFile::Mode::kWrite); + if (m_Options.UseSparseFiles) + { + PrepareFileForScatteredWrite(Writer->File->Handle(), TargetSize); + } + } + DidClone = CloneQuery->TryClone(SourceFile.Handle(), + Writer->File->Handle(), + Op.CacheFileOffset + PreBytes, + Op.Target->Offset + PreBytes, + ClonableBytes, + TargetSize); + if (DidClone) + { + m_DiskStats.WriteCount++; + m_DiskStats.WriteByteCount += ClonableBytes; + + m_DiskStats.CloneCount++; + m_DiskStats.CloneByteCount += ClonableBytes; + + if (PreBytes > 0) + { + CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, PreBytes); + const uint64_t FileOffset = Op.Target->Offset; + + WriteSequenceChunkToCache(LocalWriter, ChunkSource, RemoteSequenceIndex, FileOffset, RemotePathIndex); + } + if (PostBytes > 0) + { + CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset + ReadLength - PostBytes, PostBytes); + const uint64_t FileOffset = Op.Target->Offset + ReadLength - PostBytes; + + WriteSequenceChunkToCache(LocalWriter, ChunkSource, RemoteSequenceIndex, FileOffset, RemotePathIndex); + } + } + } + } + + if (!DidClone) + { + CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ReadLength); + + const uint64_t FileOffset = Op.Target->Offset; + + WriteSequenceChunkToCache(LocalWriter, ChunkSource, RemoteSequenceIndex, FileOffset, RemotePathIndex); + } + } + + CacheLocalFileBytesRead += ReadLength; // TODO: This should be the sum of unique chunk sizes? + + WriteOpIndex += WriteCount; + } + } + + if (m_Options.IsVerbose) + { + LOG_OUTPUT(m_LogOutput, "Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), SourceFilePath); + } + + std::vector<uint32_t> Result; + Result.reserve(WriteOps.size()); + + for (const WriteOp& Op : WriteOps) + { + Result.push_back(Op.Target->SequenceIndex); + } + return Result; +} + +bool +BuildsOperationUpdateFolder::WriteCompressedChunkToCache( + const IoHash& ChunkHash, + const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs, + BufferedWriteFileCache& WriteCache, + IoBuffer&& CompressedPart) +{ + ZEN_TRACE_CPU("WriteCompressedChunkToCache"); + + auto ChunkHashToChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); + ZEN_ASSERT(ChunkHashToChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end()); + if (IsSingleFileChunk(m_RemoteContent, ChunkTargetPtrs)) + { + const std::uint32_t SequenceIndex = ChunkTargetPtrs.front()->SequenceIndex; + const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]; + StreamDecompress(m_AbortFlag, m_CacheFolderPath, SequenceRawHash, CompositeBuffer(std::move(CompressedPart)), m_DiskStats); + return false; + } + else + { + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompositeBuffer(std::move(CompressedPart)), RawHash, RawSize); + if (!Compressed) + { + throw std::runtime_error(fmt::format("Failed to parse header of compressed large blob {}", ChunkHash)); + } + if (RawHash != ChunkHash) + { + throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, ChunkHash)); + } + + BufferedWriteFileCache::Local LocalWriter(WriteCache); + + IoHashStream Hash; + bool CouldDecompress = Compressed.DecompressToStream( + 0, + (uint64_t)-1, + [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { + ZEN_UNUSED(SourceOffset); + ZEN_TRACE_CPU("Async_StreamDecompress_Write"); + m_DiskStats.ReadByteCount += SourceSize; + if (!m_AbortFlag) + { + for (const ChunkedContentLookup::ChunkSequenceLocation* TargetPtr : ChunkTargetPtrs) + { + const auto& Target = *TargetPtr; + const uint64_t FileOffset = Target.Offset + Offset; + const uint32_t SequenceIndex = Target.SequenceIndex; + const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; + + WriteSequenceChunkToCache(LocalWriter, RangeBuffer, SequenceIndex, FileOffset, PathIndex); + } + + return true; + } + return false; + }); + + if (m_AbortFlag) + { + return false; + } + + if (!CouldDecompress) + { + throw std::runtime_error(fmt::format("Failed to decompress large chunk {}", ChunkHash)); + } + + return true; + } +} + +void +BuildsOperationUpdateFolder::WriteSequenceChunkToCache(BufferedWriteFileCache::Local& LocalWriter, + const CompositeBuffer& Chunk, + const uint32_t SequenceIndex, + const uint64_t FileOffset, + const uint32_t PathIndex) +{ + ZEN_TRACE_CPU("WriteSequenceChunkToCache"); + + const uint64_t SequenceSize = m_RemoteContent.RawSizes[PathIndex]; + + auto OpenFile = [&](BasicFile& File) { + const std::filesystem::path FileName = + GetTempChunkedSequenceFileName(m_CacheFolderPath, m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); + File.Open(FileName, BasicFile::Mode::kWrite); + if (m_Options.UseSparseFiles) + { + PrepareFileForScatteredWrite(File.Handle(), SequenceSize); + } + }; + + const uint64_t ChunkSize = Chunk.GetSize(); + ZEN_ASSERT(FileOffset + ChunkSize <= SequenceSize); + if (ChunkSize == SequenceSize) + { + BasicFile SingleChunkFile; + OpenFile(SingleChunkFile); + + m_DiskStats.CurrentOpenFileCount++; + auto _ = MakeGuard([this]() { m_DiskStats.CurrentOpenFileCount--; }); + SingleChunkFile.Write(Chunk, FileOffset); + + m_DiskStats.WriteCount++; + m_DiskStats.WriteByteCount += ChunkSize; + } + else + { + const uint64_t MaxWriterBufferSize = 256u * 1025u; + + BufferedWriteFileCache::Local::Writer* Writer = LocalWriter.GetWriter(SequenceIndex); + if (Writer) + { + if ((!Writer->Writer) && (ChunkSize < MaxWriterBufferSize)) + { + Writer->Writer = std::make_unique<BasicFileWriter>(*Writer->File, Min(SequenceSize, MaxWriterBufferSize)); + } + Writer->Write(Chunk, FileOffset); + + m_DiskStats.WriteCount++; + m_DiskStats.WriteByteCount += ChunkSize; + } + else + { + Writer = LocalWriter.PutWriter(SequenceIndex, std::make_unique<BufferedWriteFileCache::Local::Writer>()); + + Writer->File = std::make_unique<BasicFile>(); + OpenFile(*Writer->File); + if (ChunkSize < MaxWriterBufferSize) + { + Writer->Writer = std::make_unique<BasicFileWriter>(*Writer->File, Min(SequenceSize, MaxWriterBufferSize)); + } + Writer->Write(Chunk, FileOffset); + + m_DiskStats.WriteCount++; + m_DiskStats.WriteByteCount += ChunkSize; + } + } +} + +bool +BuildsOperationUpdateFolder::GetBlockWriteOps(std::span<const IoHash> ChunkRawHashes, + std::span<const uint32_t> ChunkCompressedLengths, + std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, + std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, + const MemoryView BlockView, + uint32_t FirstIncludedBlockChunkIndex, + uint32_t LastIncludedBlockChunkIndex, + BlockWriteOps& OutOps) +{ + ZEN_TRACE_CPU("GetBlockWriteOps"); + + uint32_t OffsetInBlock = 0; + for (uint32_t ChunkBlockIndex = FirstIncludedBlockChunkIndex; ChunkBlockIndex <= LastIncludedBlockChunkIndex; ChunkBlockIndex++) + { + const uint32_t ChunkCompressedSize = ChunkCompressedLengths[ChunkBlockIndex]; + const IoHash& ChunkHash = ChunkRawHashes[ChunkBlockIndex]; + if (auto It = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); It != m_RemoteLookup.ChunkHashToChunkIndex.end()) + { + const uint32_t ChunkIndex = It->second; + std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = + GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, ChunkIndex); + + if (!ChunkTargetPtrs.empty()) + { + bool NeedsWrite = true; + if (RemoteChunkIndexNeedsCopyFromSourceFlags[ChunkIndex].compare_exchange_strong(NeedsWrite, false)) + { + MemoryView ChunkMemoryView = BlockView.Mid(OffsetInBlock, ChunkCompressedSize); + IoHash VerifyChunkHash; + uint64_t VerifyChunkSize; + CompressedBuffer CompressedChunk = + CompressedBuffer::FromCompressed(SharedBuffer::MakeView(ChunkMemoryView), VerifyChunkHash, VerifyChunkSize); + ZEN_ASSERT(CompressedChunk); + ZEN_ASSERT(VerifyChunkHash == ChunkHash); + ZEN_ASSERT(VerifyChunkSize == m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]); + + OodleCompressor ChunkCompressor; + OodleCompressionLevel ChunkCompressionLevel; + uint64_t ChunkBlockSize; + + bool GetCompressParametersSuccess = + CompressedChunk.TryGetCompressParameters(ChunkCompressor, ChunkCompressionLevel, ChunkBlockSize); + ZEN_ASSERT(GetCompressParametersSuccess); + + IoBuffer Decompressed; + if (ChunkCompressionLevel == OodleCompressionLevel::None) + { + MemoryView ChunkDecompressedMemoryView = ChunkMemoryView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder()); + Decompressed = + IoBuffer(IoBuffer::Wrap, ChunkDecompressedMemoryView.GetData(), ChunkDecompressedMemoryView.GetSize()); + } + else + { + Decompressed = CompressedChunk.Decompress().AsIoBuffer(); + } + ZEN_ASSERT(Decompressed.GetSize() == m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]); + ZEN_ASSERT_SLOW(ChunkHash == IoHash::HashBuffer(Decompressed)); + for (const ChunkedContentLookup::ChunkSequenceLocation* Target : ChunkTargetPtrs) + { + OutOps.WriteOps.push_back( + BlockWriteOps::WriteOpData{.Target = Target, .ChunkBufferIndex = OutOps.ChunkBuffers.size()}); + } + OutOps.ChunkBuffers.emplace_back(std::move(Decompressed)); + } + } + } + + OffsetInBlock += ChunkCompressedSize; + } + { + ZEN_TRACE_CPU("Sort"); + std::sort(OutOps.WriteOps.begin(), + OutOps.WriteOps.end(), + [](const BlockWriteOps::WriteOpData& Lhs, const BlockWriteOps::WriteOpData& Rhs) { + if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex) + { + return true; + } + if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex) + { + return false; + } + return Lhs.Target->Offset < Rhs.Target->Offset; + }); + } + return true; +} + +void +BuildsOperationUpdateFolder::WriteBlockChunkOpsToCache(std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, + const BlockWriteOps& Ops, + BufferedWriteFileCache& WriteCache, + ParallelWork& Work) +{ + ZEN_TRACE_CPU("WriteBlockChunkOpsToCache"); + + { + BufferedWriteFileCache::Local LocalWriter(WriteCache); + for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps) + { + if (Work.IsAborted()) + { + break; + } + const CompositeBuffer& Chunk = Ops.ChunkBuffers[WriteOp.ChunkBufferIndex]; + const uint32_t SequenceIndex = WriteOp.Target->SequenceIndex; + ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() <= + m_RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]); + ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() > 0); + const uint64_t FileOffset = WriteOp.Target->Offset; + const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; + + WriteSequenceChunkToCache(LocalWriter, Chunk, SequenceIndex, FileOffset, PathIndex); + } + } + if (!Work.IsAborted()) + { + // Write tracking, updating this must be done without any files open (BufferedWriteFileCache::Local) + std::vector<uint32_t> CompletedChunkSequences; + for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps) + { + const uint32_t RemoteSequenceIndex = WriteOp.Target->SequenceIndex; + if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters)) + { + CompletedChunkSequences.push_back(RemoteSequenceIndex); + } + } + WriteCache.Close(CompletedChunkSequences); + VerifyAndCompleteChunkSequencesAsync(CompletedChunkSequences, Work); + } +} + +bool +BuildsOperationUpdateFolder::WriteChunksBlockToCache(const ChunkBlockDescription& BlockDescription, + std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, + ParallelWork& Work, + CompositeBuffer&& BlockBuffer, + std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, + BufferedWriteFileCache& WriteCache) +{ + ZEN_TRACE_CPU("WriteChunksBlockToCache"); + + IoBuffer BlockMemoryBuffer = MakeBufferMemoryBased(BlockBuffer); + const MemoryView BlockView = BlockMemoryBuffer.GetView(); + + BlockWriteOps Ops; + if ((BlockDescription.HeaderSize == 0) || BlockDescription.ChunkCompressedLengths.empty()) + { + ZEN_TRACE_CPU("WriteChunksBlockToCache_Legacy"); + + uint64_t HeaderSize; + const std::vector<uint32_t> ChunkCompressedLengths = + ReadChunkBlockHeader(BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder()), HeaderSize); + + if (GetBlockWriteOps(BlockDescription.ChunkRawHashes, + ChunkCompressedLengths, + SequenceIndexChunksLeftToWriteCounters, + RemoteChunkIndexNeedsCopyFromSourceFlags, + BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder() + HeaderSize), + 0, + gsl::narrow<uint32_t>(BlockDescription.ChunkRawHashes.size() - 1), + Ops)) + { + WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work); + return true; + } + return false; + } + + if (GetBlockWriteOps(BlockDescription.ChunkRawHashes, + BlockDescription.ChunkCompressedLengths, + SequenceIndexChunksLeftToWriteCounters, + RemoteChunkIndexNeedsCopyFromSourceFlags, + BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize), + 0, + gsl::narrow<uint32_t>(BlockDescription.ChunkRawHashes.size() - 1), + Ops)) + { + WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work); + return true; + } + return false; +} + +bool +BuildsOperationUpdateFolder::WritePartialBlockChunksToCache(const ChunkBlockDescription& BlockDescription, + std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, + ParallelWork& Work, + CompositeBuffer&& PartialBlockBuffer, + uint32_t FirstIncludedBlockChunkIndex, + uint32_t LastIncludedBlockChunkIndex, + std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, + BufferedWriteFileCache& WriteCache) +{ + ZEN_TRACE_CPU("WritePartialBlockChunksToCache"); + + IoBuffer BlockMemoryBuffer = MakeBufferMemoryBased(PartialBlockBuffer); + const MemoryView BlockView = BlockMemoryBuffer.GetView(); + + BlockWriteOps Ops; + if (GetBlockWriteOps(BlockDescription.ChunkRawHashes, + BlockDescription.ChunkCompressedLengths, + SequenceIndexChunksLeftToWriteCounters, + RemoteChunkIndexNeedsCopyFromSourceFlags, + BlockView, + FirstIncludedBlockChunkIndex, + LastIncludedBlockChunkIndex, + Ops)) + { + WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work); + return true; + } + else + { + return false; + } +} + +void +BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::path& ZenFolderPath, + uint32_t RemoteChunkIndex, + std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>&& ChunkTargetPtrs, + BufferedWriteFileCache& WriteCache, + ParallelWork& Work, + IoBuffer&& Payload, + std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, + std::atomic<uint64_t>& WritePartsComplete, + const uint64_t TotalPartWriteCount, + FilteredRate& FilteredWrittenBytesPerSecond) +{ + ZEN_TRACE_CPU("AsyncWriteDownloadedChunk"); + + const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + + const uint64_t Size = Payload.GetSize(); + + std::filesystem::path CompressedChunkPath; + + // Check if the dowloaded chunk is file based and we can move it directly without rewriting it + { + IoBufferFileReference FileRef; + if (Payload.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == Size)) + { + ZEN_TRACE_CPU("MoveTempChunk"); + std::error_code Ec; + std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); + if (!Ec) + { + Payload.SetDeleteOnClose(false); + Payload = {}; + CompressedChunkPath = ZenTempDownloadFolderPath(ZenFolderPath) / ChunkHash.ToHexString(); + RenameFile(TempBlobPath, CompressedChunkPath, Ec); + if (Ec) + { + CompressedChunkPath = std::filesystem::path{}; + + // Re-open the temp file again + BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); + Payload = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, Size, true); + Payload.SetDeleteOnClose(true); + } + } + } + } + + if (CompressedChunkPath.empty() && (Size > 512u * 1024u)) + { + ZEN_TRACE_CPU("WriteTempChunk"); + // Could not be moved and rather large, lets store it on disk + CompressedChunkPath = ZenTempDownloadFolderPath(ZenFolderPath) / ChunkHash.ToHexString(); + TemporaryFile::SafeWriteFile(CompressedChunkPath, Payload); + Payload = {}; + } + + Work.ScheduleWork( + m_IOWorkerPool, + [&ZenFolderPath, + this, + SequenceIndexChunksLeftToWriteCounters, + &Work, + CompressedChunkPath, + RemoteChunkIndex, + TotalPartWriteCount, + &WriteCache, + &WritePartsComplete, + &FilteredWrittenBytesPerSecond, + ChunkTargetPtrs = std::move(ChunkTargetPtrs), + CompressedPart = std::move(Payload)](std::atomic<bool>&) mutable { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_WriteChunk"); + + FilteredWrittenBytesPerSecond.Start(); + + const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + if (CompressedChunkPath.empty()) + { + ZEN_ASSERT(CompressedPart); + } + else + { + ZEN_ASSERT(!CompressedPart); + CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); + if (!CompressedPart) + { + throw std::runtime_error( + fmt::format("Could not open dowloaded compressed chunk {} from {}", ChunkHash, CompressedChunkPath)); + } + } + + bool NeedHashVerify = WriteCompressedChunkToCache(ChunkHash, ChunkTargetPtrs, WriteCache, std::move(CompressedPart)); + if (!m_AbortFlag) + { + WritePartsComplete++; + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } + + if (!CompressedChunkPath.empty()) + { + TryRemoveFile(m_LogOutput, CompressedChunkPath); + } + + std::vector<uint32_t> CompletedSequences = + CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); + WriteCache.Close(CompletedSequences); + if (NeedHashVerify) + { + VerifyAndCompleteChunkSequencesAsync(CompletedSequences, Work); + } + else + { + FinalizeChunkSequences(CompletedSequences); + } + } + } + }); +} + +void +BuildsOperationUpdateFolder::VerifyAndCompleteChunkSequencesAsync(std::span<const uint32_t> RemoteSequenceIndexes, ParallelWork& Work) +{ + if (RemoteSequenceIndexes.empty()) + { + return; + } + ZEN_TRACE_CPU("VerifyAndCompleteChunkSequence"); + for (uint32_t RemoteSequenceIndexOffset = 1; RemoteSequenceIndexOffset < RemoteSequenceIndexes.size(); RemoteSequenceIndexOffset++) + { + const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset]; + Work.ScheduleWork(m_IOWorkerPool, [this, RemoteSequenceIndex](std::atomic<bool>&) { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_VerifyAndFinalizeSequence"); + + VerifySequence(RemoteSequenceIndex); + if (!m_AbortFlag) + { + const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + FinalizeChunkSequence(SequenceRawHash); + } + } + }); + } + const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[0]; + + VerifySequence(RemoteSequenceIndex); + const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + FinalizeChunkSequence(SequenceRawHash); +} + +bool +BuildsOperationUpdateFolder::CompleteSequenceChunk(uint32_t RemoteSequenceIndex, + std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters) +{ + uint32_t PreviousValue = SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1); + ZEN_ASSERT(PreviousValue >= 1); + ZEN_ASSERT(PreviousValue != (uint32_t)-1); + return PreviousValue == 1; +} + +std::vector<uint32_t> +BuildsOperationUpdateFolder::CompleteChunkTargets(const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs, + std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters) +{ + ZEN_TRACE_CPU("CompleteChunkTargets"); + + std::vector<uint32_t> CompletedSequenceIndexes; + for (const ChunkedContentLookup::ChunkSequenceLocation* Location : ChunkTargetPtrs) + { + const uint32_t RemoteSequenceIndex = Location->SequenceIndex; + if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters)) + { + CompletedSequenceIndexes.push_back(RemoteSequenceIndex); + } + } + return CompletedSequenceIndexes; +} + +void +BuildsOperationUpdateFolder::FinalizeChunkSequence(const IoHash& SequenceRawHash) +{ + ZEN_TRACE_CPU("FinalizeChunkSequence"); + + ZEN_ASSERT_SLOW(!IsFile(GetFinalChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash))); + std::error_code Ec; + RenameFile(GetTempChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash), + GetFinalChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash), + Ec); + if (Ec) + { + throw std::system_error(Ec); + } +} + +void +BuildsOperationUpdateFolder::FinalizeChunkSequences(std::span<const uint32_t> RemoteSequenceIndexes) +{ + ZEN_TRACE_CPU("FinalizeChunkSequences"); + + for (uint32_t SequenceIndex : RemoteSequenceIndexes) + { + FinalizeChunkSequence(m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); + } +} + +void +BuildsOperationUpdateFolder::VerifySequence(uint32_t RemoteSequenceIndex) +{ + ZEN_TRACE_CPU("VerifySequence"); + + const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + { + ZEN_TRACE_CPU("HashSequence"); + const std::uint32_t RemotePathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]; + const uint64_t ExpectedSize = m_RemoteContent.RawSizes[RemotePathIndex]; + IoBuffer VerifyBuffer = IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash)); + const uint64_t VerifySize = VerifyBuffer.GetSize(); + if (VerifySize != ExpectedSize) + { + throw std::runtime_error(fmt::format("Written chunk sequence {} size {} does not match expected size {}", + SequenceRawHash, + VerifySize, + ExpectedSize)); + } + + const IoHash VerifyChunkHash = IoHash::HashBuffer(std::move(VerifyBuffer)); + if (VerifyChunkHash != SequenceRawHash) + { + throw std::runtime_error( + fmt::format("Written chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, SequenceRawHash)); + } + } +} + +void +DownloadLargeBlob(BuildStorage& Storage, + const std::filesystem::path& DownloadFolder, + const Oid& BuildId, + const IoHash& ChunkHash, + const std::uint64_t PreferredMultipartChunkSize, + ParallelWork& Work, + WorkerThreadPool& NetworkPool, + DownloadStatistics& DownloadStats, + std::function<void(IoBuffer&& Payload)>&& OnDownloadComplete) +{ + ZEN_TRACE_CPU("DownloadLargeBlob"); + + struct WorkloadData + { + TemporaryFile TempFile; + }; + std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>()); + + std::error_code Ec; + Workload->TempFile.CreateTemporary(DownloadFolder, Ec); + if (Ec) + { + throw std::runtime_error( + fmt::format("Failed opening temporary file '{}': {} ({})", Workload->TempFile.GetPath(), Ec.message(), Ec.value())); + } + std::vector<std::function<void()>> WorkItems = Storage.GetLargeBuildBlob( + BuildId, + ChunkHash, + PreferredMultipartChunkSize, + [&Work, Workload, &DownloadStats](uint64_t Offset, const IoBuffer& Chunk) { + DownloadStats.DownloadedChunkByteCount += Chunk.GetSize(); + + if (!Work.IsAborted()) + { + ZEN_TRACE_CPU("Async_DownloadLargeBlob_OnReceive"); + Workload->TempFile.Write(Chunk.GetView(), Offset); + } + }, + [&Work, Workload, &DownloadStats, OnDownloadComplete = std::move(OnDownloadComplete)]() { + DownloadStats.DownloadedChunkCount++; + if (!Work.IsAborted()) + { + ZEN_TRACE_CPU("Async_DownloadLargeBlob_OnComplete"); + + uint64_t PayloadSize = Workload->TempFile.FileSize(); + void* FileHandle = Workload->TempFile.Detach(); + ZEN_ASSERT(FileHandle != nullptr); + IoBuffer Payload(IoBuffer::File, FileHandle, 0, PayloadSize, true); + Payload.SetDeleteOnClose(true); + OnDownloadComplete(std::move(Payload)); + } + }); + if (!WorkItems.empty()) + { + DownloadStats.MultipartAttachmentCount++; + } + for (auto& WorkItem : WorkItems) + { + Work.ScheduleWork(NetworkPool, [WorkItem = std::move(WorkItem)](std::atomic<bool>& AbortFlag) { + if (!AbortFlag) + { + ZEN_TRACE_CPU("Async_DownloadLargeBlob_Work"); + + WorkItem(); + } + }); + } +} + +} // namespace zen |