// Copyright Epic Games, Inc. All Rights Reserved. #include "builds_cmd.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include #include #include ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_PLATFORM_WINDOWS # include #else # include # include # include # include #endif #define EXTRA_VERIFY 0 #define ZEN_CLOUD_STORAGE "Cloud Storage" namespace zen { namespace { static std::atomic AbortFlag = false; static void SignalCallbackHandler(int SigNum) { if (SigNum == SIGINT) { AbortFlag = true; } #if ZEN_PLATFORM_WINDOWS if (SigNum == SIGBREAK) { AbortFlag = true; } #endif // ZEN_PLATFORM_WINDOWS } using namespace std::literals; static const size_t DefaultMaxBlockSize = 64u * 1024u * 1024u; static const size_t DefaultMaxChunkEmbedSize = 3u * 512u * 1024u; struct ChunksBlockParameters { size_t MaxBlockSize = DefaultMaxBlockSize; size_t MaxChunkEmbedSize = DefaultMaxChunkEmbedSize; }; const ChunksBlockParameters DefaultChunksBlockParams{.MaxBlockSize = 32u * 1024u * 1024u, .MaxChunkEmbedSize = DefaultChunkedParams.MaxSize}; const uint64_t DefaultPreferredMultipartChunkSize = 32u * 1024u * 1024u; const double DefaultLatency = 0; // .0010; const double DefaultDelayPerKBSec = 0; // 0.00005; const std::string ZenFolderName = ".zen"; const std::string ZenStateFilePath = fmt::format("{}/current_state.cbo", ZenFolderName); const std::string ZenStateFileJsonPath = fmt::format("{}/current_state.json", ZenFolderName); const std::string ZenTempFolderName = fmt::format("{}/tmp", ZenFolderName); const std::string ZenTempCacheFolderName = fmt::format("{}/cache", ZenTempFolderName); // Decompressed and verified data - chunks & sequences const std::string ZenTempBlockFolderName = fmt::format("{}/blocks", ZenTempFolderName); // Temp storage for whole and 
partial blocks const std::string ZenTempChunkFolderName = fmt::format("{}/chunks", ZenTempFolderName); // Temp storage for decompressed and validated chunks const std::string ZenTempDownloadFolderName = fmt::format("{}/download", ZenTempFolderName); // Temp storage for unverfied downloaded blobs const std::string ZenTempStorageFolderName = fmt::format("{}/storage", ZenTempFolderName); // Temp storage folder for BuildStorage implementations const std::string ZenExcludeManifestName = ".zen_exclude_manifest.txt"; const std::string UnsyncFolderName = ".unsync"; const std::string UGSFolderName = ".ugs"; const std::string LegacyZenTempFolderName = ".zen-tmp"; const std::vector DefaultExcludeFolders({UnsyncFolderName, ZenFolderName, UGSFolderName, LegacyZenTempFolderName}); const std::vector DefaultExcludeExtensions({}); static bool IsVerbose = false; static bool UsePlainProgress = false; #define ZEN_CONSOLE_VERBOSE(fmtstr, ...) \ if (IsVerbose) \ { \ ZEN_CONSOLE_LOG(zen::logging::level::Info, fmtstr, ##__VA_ARGS__); \ } const std::string DefaultAccessTokenEnvVariableName( #if ZEN_PLATFORM_WINDOWS "UE-CloudDataCacheAccessToken"sv #endif #if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC "UE_CloudDataCacheAccessToken"sv #endif ); uint32_t SetNativeFileAttributes(const std::filesystem::path FilePath, SourcePlatform SourcePlatform, uint32_t Attributes) { #if ZEN_PLATFORM_WINDOWS if (SourcePlatform == SourcePlatform::Windows) { SetFileAttributes(FilePath, Attributes); return Attributes; } else { uint32_t CurrentAttributes = GetFileAttributes(FilePath); uint32_t NewAttributes = MakeFileAttributeReadOnly(CurrentAttributes, IsFileModeReadOnly(Attributes)); if (CurrentAttributes != NewAttributes) { SetFileAttributes(FilePath, NewAttributes); } return NewAttributes; } #endif // ZEN_PLATFORM_WINDOWS #if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC if (SourcePlatform != SourcePlatform::Windows) { SetFileMode(FilePath, Attributes); return Attributes; } else { uint32_t CurrentMode = 
GetFileMode(FilePath); uint32_t NewMode = MakeFileModeReadOnly(CurrentMode, IsFileAttributeReadOnly(Attributes)); if (CurrentMode != NewMode) { SetFileMode(FilePath, NewMode); } return NewMode; } #endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC }; uint32_t GetNativeFileAttributes(const std::filesystem::path FilePath) { #if ZEN_PLATFORM_WINDOWS return GetFileAttributes(FilePath); #endif // ZEN_PLATFORM_WINDOWS #if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC return GetFileMode(FilePath); #endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC } template std::string FormatArray(std::span Items, std::string_view Prefix) { ExtendableStringBuilder<512> SB; for (const T& Item : Items) { SB.Append(fmt::format("{}{}", Prefix, Item)); } return SB.ToString(); } bool CleanDirectory(const std::filesystem::path& Path, std::span ExcludeDirectories) { ZEN_TRACE_CPU("CleanDirectory"); bool CleanWipe = true; DirectoryContent LocalDirectoryContent; GetDirectoryContent(Path, DirectoryContentFlags::IncludeDirs | DirectoryContentFlags::IncludeFiles, LocalDirectoryContent); for (const std::filesystem::path& LocalFilePath : LocalDirectoryContent.Files) { try { std::filesystem::remove(LocalFilePath); } catch (const std::exception&) { // DeleteOnClose files may be a bit slow in getting cleaned up, so pause amd retry one time Sleep(200); try { std::filesystem::remove(LocalFilePath); } catch (const std::exception& Ex) { ZEN_WARN("Failed removing file {}. 
Reason: {}", LocalFilePath, Ex.what()); CleanWipe = false; } } } for (const std::filesystem::path& LocalDirPath : LocalDirectoryContent.Directories) { bool Leave = false; for (const std::string_view ExcludeDirectory : ExcludeDirectories) { if (LocalDirPath == (Path / ExcludeDirectory)) { Leave = true; break; } } if (!Leave) { try { zen::CleanDirectory(LocalDirPath); std::filesystem::remove(LocalDirPath); } catch (const std::exception&) { Sleep(200); try { zen::CleanDirectory(LocalDirPath); std::filesystem::remove(LocalDirPath); } catch (const std::exception& Ex) { ZEN_WARN("Failed removing directory {}. Reason: {}", LocalDirPath, Ex.what()); CleanWipe = false; } } } } return CleanWipe; } std::string ReadAccessTokenFromFile(const std::filesystem::path& Path) { if (!std::filesystem::is_regular_file(Path)) { throw std::runtime_error(fmt::format("the file '{}' does not exist", Path)); } IoBuffer Body = IoBufferBuilder::MakeFromFile(Path); std::string JsonText(reinterpret_cast(Body.GetData()), Body.GetSize()); std::string JsonError; json11::Json TokenInfo = json11::Json::parse(JsonText, JsonError); if (!JsonError.empty()) { throw std::runtime_error(fmt::format("failed parsing json file '{}'. 
Reason: '{}'", Path, JsonError)); } const std::string AuthToken = TokenInfo["Token"].string_value(); if (AuthToken.empty()) { throw std::runtime_error(fmt::format("the json file '{}' does not contain a value for \"Token\"", Path)); } return AuthToken; } bool IsBufferDiskBased(const IoBuffer& Buffer) { IoBufferFileReference FileRef; if (Buffer.GetFileReference(FileRef)) { return true; } return false; } bool IsBufferDiskBased(const CompositeBuffer& Buffer) { // If this is a file based buffer or a compressed buffer with a memory-based header, we don't need to rewrite to disk to save memory std::span Segments = Buffer.GetSegments(); ZEN_ASSERT(Buffer.GetSegments().size() > 0); return IsBufferDiskBased(Segments.back().AsIoBuffer()); } IoBuffer WriteToTempFile(CompositeBuffer&& Buffer, const std::filesystem::path& TempFolderPath, const IoHash& Hash, const std::string& Suffix = {}) { std::filesystem::path TempFilePath = (TempFolderPath / (Hash.ToHexString() + Suffix)).make_preferred(); return WriteToTempFile(std::move(Buffer), TempFilePath); } class FilteredRate { public: FilteredRate() {} void Start() { if (StartTimeUS == (uint64_t)-1) { uint64_t Expected = (uint64_t)-1; if (StartTimeUS.compare_exchange_weak(Expected, Timer.GetElapsedTimeUs())) { LastTimeUS = StartTimeUS.load(); } } } void Stop() { if (EndTimeUS == (uint64_t)-1) { uint64_t Expected = (uint64_t)-1; EndTimeUS.compare_exchange_weak(Expected, Timer.GetElapsedTimeUs()); } } void Update(uint64_t Count) { if (LastTimeUS == (uint64_t)-1) { return; } uint64_t TimeUS = Timer.GetElapsedTimeUs(); uint64_t TimeDeltaUS = TimeUS - LastTimeUS; if (TimeDeltaUS >= 2000000) { uint64_t Delta = Count - LastCount; uint64_t PerSecond = (Delta * 1000000) / TimeDeltaUS; LastPerSecond = PerSecond; LastCount = Count; FilteredPerSecond = (PerSecond + (LastPerSecond * 7)) / 8; LastTimeUS = TimeUS; } } uint64_t GetCurrent() const // If Stopped - return total count / total time { if (LastTimeUS == (uint64_t)-1) { return 0; } return 
FilteredPerSecond; } uint64_t GetElapsedTimeUS() const { if (StartTimeUS == (uint64_t)-1) { return 0; } if (EndTimeUS == (uint64_t)-1) { return 0; } uint64_t TimeDeltaUS = EndTimeUS - StartTimeUS; return TimeDeltaUS; } bool IsActive() const { return (StartTimeUS != (uint64_t)-1) && (EndTimeUS == (uint64_t)-1); } private: Stopwatch Timer; std::atomic StartTimeUS = (uint64_t)-1; std::atomic EndTimeUS = (uint64_t)-1; std::atomic LastTimeUS = (uint64_t)-1; uint64_t LastCount = 0; uint64_t LastPerSecond = 0; uint64_t FilteredPerSecond = 0; }; uint64_t GetBytesPerSecond(uint64_t ElapsedWallTimeUS, uint64_t Count) { if (ElapsedWallTimeUS == 0) { return 0; } return Count * 1000000 / ElapsedWallTimeUS; } std::filesystem::path GetTempChunkedSequenceFileName(const std::filesystem::path& CacheFolderPath, const IoHash& RawHash) { return (CacheFolderPath / (RawHash.ToHexString() + ".tmp")).make_preferred(); } std::filesystem::path GetFinalChunkedSequenceFileName(const std::filesystem::path& CacheFolderPath, const IoHash& RawHash) { return (CacheFolderPath / RawHash.ToHexString()).make_preferred(); } ChunkedFolderContent ScanAndChunkFolder( GetFolderContentStatistics& GetFolderContentStats, ChunkingStatistics& ChunkingStats, const std::filesystem::path& Path, std::function&& IsAcceptedFolder, std::function&& IsAcceptedFile, ChunkingController& ChunkController) { ZEN_TRACE_CPU("ScanAndChunkFolder"); FolderContent Content = GetFolderContent( GetFolderContentStats, Path, std::move(IsAcceptedFolder), std::move(IsAcceptedFile), GetMediumWorkerPool(EWorkloadType::Burst), UsePlainProgress ? 5000 : 200, [](bool, std::ptrdiff_t) {}, AbortFlag); if (AbortFlag) { return {}; } ProgressBar ProgressBar(UsePlainProgress); FilteredRate FilteredBytesHashed; FilteredBytesHashed.Start(); ChunkedFolderContent FolderContent = ChunkFolderContent( ChunkingStats, GetMediumWorkerPool(EWorkloadType::Burst), Path, Content, ChunkController, UsePlainProgress ? 
// Counters describing disk activity.
//
// Within this file BufferedOpenFile increments OpenReadCount and CurrentOpenFileCount when a file is opened,
// decrements CurrentOpenFileCount again when it is destroyed, and bumps ReadCount / ReadByteCount for every
// block it reads from disk.
//
// NOTE(review): the template argument of each std::atomic is missing in this copy of the file - restore the
// counter value type before building (presumably an unsigned integer type - confirm against the original).
struct DiskStatistics
{
	std::atomic OpenReadCount = 0;		   // Number of files opened for reading
	std::atomic OpenWriteCount = 0;		   // Number of files opened for writing
	std::atomic ReadCount = 0;			   // Number of read operations issued
	std::atomic ReadByteCount = 0;		   // Number of bytes read
	std::atomic WriteCount = 0;			   // Number of write operations issued
	std::atomic WriteByteCount = 0;		   // Number of bytes written
	std::atomic CurrentOpenFileCount = 0;  // Files open right now (incremented on open, decremented on close)
};
// Counters collected while uploading blocks and chunks.
//
// NOTE(review): the template argument of each std::atomic is missing in this copy of the file - restore the
// counter value type before building (presumably an unsigned integer type - confirm against the original).
struct UploadStatistics
{
	std::atomic BlockCount = 0;
	std::atomic BlocksBytes = 0;
	std::atomic ChunkCount = 0;
	std::atomic ChunksBytes = 0;
	std::atomic ReadFromDiskBytes = 0;
	std::atomic MultipartAttachmentCount = 0;
	uint64_t ElapsedWallTimeUS = 0;

	// Accumulates every counter of Rhs into this instance and returns *this.
	// Each field is added with its own statement, so the accumulation as a whole is not one atomic step.
	UploadStatistics& operator+=(const UploadStatistics& Rhs)
	{
		BlockCount += Rhs.BlockCount;
		BlocksBytes += Rhs.BlocksBytes;
		ChunkCount += Rhs.ChunkCount;
		ChunksBytes += Rhs.ChunksBytes;
		ReadFromDiskBytes += Rhs.ReadFromDiskBytes;
		MultipartAttachmentCount += Rhs.MultipartAttachmentCount;
		ElapsedWallTimeUS += Rhs.ElapsedWallTimeUS;
		return *this;
	}
};
CacheSequenceHashesByteCount = 0; uint32_t LocalPathsMatchingSequencesCount = 0; uint64_t LocalPathsMatchingSequencesByteCount = 0; uint64_t LocalChunkMatchingRemoteCount = 0; uint64_t LocalChunkMatchingRemoteByteCount = 0; }; struct DownloadStatistics { std::atomic RequestsCompleteCount = 0; std::atomic DownloadedChunkCount = 0; std::atomic DownloadedChunkByteCount = 0; std::atomic MultipartAttachmentCount = 0; std::atomic DownloadedBlockCount = 0; std::atomic DownloadedBlockByteCount = 0; std::atomic DownloadedPartialBlockCount = 0; std::atomic DownloadedPartialBlockByteCount = 0; }; struct WriteChunkStatistics { std::atomic ChunkCountWritten = 0; std::atomic ChunkBytesWritten = 0; uint64_t DownloadTimeUs = 0; uint64_t WriteTimeUs = 0; uint64_t WriteChunksElapsedWallTimeUs = 0; }; struct RebuildFolderStateStatistics { uint64_t CleanFolderElapsedWallTimeUs = 0; std::atomic FinalizeTreeFilesMovedCount = 0; std::atomic FinalizeTreeFilesCopiedCount = 0; uint64_t FinalizeTreeElapsedWallTimeUs = 0; }; struct VerifyFolderStatistics { std::atomic FilesVerified = 0; std::atomic FilesFailed = 0; std::atomic ReadBytes = 0; uint64_t VerifyElapsedWallTimeUs = 0; }; std::vector CalculateAbsoluteChunkOrders(const std::span LocalChunkHashes, const std::span LocalChunkOrder, const tsl::robin_map& ChunkHashToLocalChunkIndex, const std::span& LooseChunkIndexes, const std::span& BlockDescriptions) { ZEN_TRACE_CPU("CalculateAbsoluteChunkOrders"); #if EXTRA_VERIFY std::vector TmpAbsoluteChunkHashes; TmpAbsoluteChunkHashes.reserve(LocalChunkHashes.size()); #endif // EXTRA_VERIFY std::vector LocalChunkIndexToAbsoluteChunkIndex; LocalChunkIndexToAbsoluteChunkIndex.resize(LocalChunkHashes.size(), (uint32_t)-1); std::uint32_t AbsoluteChunkCount = 0; for (uint32_t ChunkIndex : LooseChunkIndexes) { LocalChunkIndexToAbsoluteChunkIndex[ChunkIndex] = AbsoluteChunkCount; #if EXTRA_VERIFY TmpAbsoluteChunkHashes.push_back(LocalChunkHashes[ChunkIndex]); #endif // EXTRA_VERIFY AbsoluteChunkCount++; 
} for (const ChunkBlockDescription& Block : BlockDescriptions) { for (const IoHash& ChunkHash : Block.ChunkRawHashes) { if (auto It = ChunkHashToLocalChunkIndex.find(ChunkHash); It != ChunkHashToLocalChunkIndex.end()) { const uint32_t LocalChunkIndex = It->second; ZEN_ASSERT_SLOW(LocalChunkHashes[LocalChunkIndex] == ChunkHash); LocalChunkIndexToAbsoluteChunkIndex[LocalChunkIndex] = AbsoluteChunkCount; } #if EXTRA_VERIFY TmpAbsoluteChunkHashes.push_back(ChunkHash); #endif // EXTRA_VERIFY AbsoluteChunkCount++; } } std::vector AbsoluteChunkOrder; AbsoluteChunkOrder.reserve(LocalChunkHashes.size()); for (const uint32_t LocalChunkIndex : LocalChunkOrder) { const uint32_t AbsoluteChunkIndex = LocalChunkIndexToAbsoluteChunkIndex[LocalChunkIndex]; #if EXTRA_VERIFY ZEN_ASSERT(LocalChunkHashes[LocalChunkIndex] == TmpAbsoluteChunkHashes[AbsoluteChunkIndex]); #endif // EXTRA_VERIFY AbsoluteChunkOrder.push_back(AbsoluteChunkIndex); } #if EXTRA_VERIFY { uint32_t OrderIndex = 0; while (OrderIndex < LocalChunkOrder.size()) { const uint32_t LocalChunkIndex = LocalChunkOrder[OrderIndex]; const IoHash& LocalChunkHash = LocalChunkHashes[LocalChunkIndex]; const uint32_t AbsoluteChunkIndex = AbsoluteChunkOrder[OrderIndex]; const IoHash& AbsoluteChunkHash = TmpAbsoluteChunkHashes[AbsoluteChunkIndex]; ZEN_ASSERT(LocalChunkHash == AbsoluteChunkHash); OrderIndex++; } } #endif // EXTRA_VERIFY return AbsoluteChunkOrder; } void CalculateLocalChunkOrders(const std::span& AbsoluteChunkOrders, const std::span LooseChunkHashes, const std::span LooseChunkRawSizes, const std::span& BlockDescriptions, std::vector& OutLocalChunkHashes, std::vector& OutLocalChunkRawSizes, std::vector& OutLocalChunkOrders) { ZEN_TRACE_CPU("CalculateLocalChunkOrders"); std::vector AbsoluteChunkHashes; std::vector AbsoluteChunkRawSizes; AbsoluteChunkHashes.insert(AbsoluteChunkHashes.end(), LooseChunkHashes.begin(), LooseChunkHashes.end()); AbsoluteChunkRawSizes.insert(AbsoluteChunkRawSizes.end(), 
LooseChunkRawSizes.begin(), LooseChunkRawSizes.end()); for (const ChunkBlockDescription& Block : BlockDescriptions) { AbsoluteChunkHashes.insert(AbsoluteChunkHashes.end(), Block.ChunkRawHashes.begin(), Block.ChunkRawHashes.end()); AbsoluteChunkRawSizes.insert(AbsoluteChunkRawSizes.end(), Block.ChunkRawLengths.begin(), Block.ChunkRawLengths.end()); } OutLocalChunkHashes.reserve(AbsoluteChunkHashes.size()); OutLocalChunkRawSizes.reserve(AbsoluteChunkRawSizes.size()); OutLocalChunkOrders.reserve(AbsoluteChunkOrders.size()); tsl::robin_map ChunkHashToChunkIndex; ChunkHashToChunkIndex.reserve(AbsoluteChunkHashes.size()); for (uint32_t AbsoluteChunkOrderIndex = 0; AbsoluteChunkOrderIndex < AbsoluteChunkOrders.size(); AbsoluteChunkOrderIndex++) { const uint32_t AbsoluteChunkIndex = AbsoluteChunkOrders[AbsoluteChunkOrderIndex]; const IoHash& AbsoluteChunkHash = AbsoluteChunkHashes[AbsoluteChunkIndex]; const uint64_t AbsoluteChunkRawSize = AbsoluteChunkRawSizes[AbsoluteChunkIndex]; if (auto It = ChunkHashToChunkIndex.find(AbsoluteChunkHash); It != ChunkHashToChunkIndex.end()) { const uint32_t LocalChunkIndex = It->second; OutLocalChunkOrders.push_back(LocalChunkIndex); } else { uint32_t LocalChunkIndex = gsl::narrow(OutLocalChunkHashes.size()); OutLocalChunkHashes.push_back(AbsoluteChunkHash); OutLocalChunkRawSizes.push_back(AbsoluteChunkRawSize); OutLocalChunkOrders.push_back(LocalChunkIndex); ChunkHashToChunkIndex.insert_or_assign(AbsoluteChunkHash, LocalChunkIndex); } #if EXTRA_VERIFY const uint32_t LocalChunkIndex = OutLocalChunkOrders[AbsoluteChunkOrderIndex]; const IoHash& LocalChunkHash = OutLocalChunkHashes[LocalChunkIndex]; const uint64_t& LocalChunkRawSize = OutLocalChunkRawSizes[LocalChunkIndex]; ZEN_ASSERT(LocalChunkHash == AbsoluteChunkHash); ZEN_ASSERT(LocalChunkRawSize == AbsoluteChunkRawSize); #endif // EXTRA_VERIFY } #if EXTRA_VERIFY for (uint32_t OrderIndex = 0; OrderIndex < OutLocalChunkOrders.size(); OrderIndex++) { uint32_t LocalChunkIndex = 
OutLocalChunkOrders[OrderIndex]; const IoHash LocalChunkHash = OutLocalChunkHashes[LocalChunkIndex]; uint64_t LocalChunkRawSize = OutLocalChunkRawSizes[LocalChunkIndex]; uint32_t VerifyChunkIndex = AbsoluteChunkOrders[OrderIndex]; const IoHash VerifyChunkHash = AbsoluteChunkHashes[VerifyChunkIndex]; uint64_t VerifyChunkRawSize = AbsoluteChunkRawSizes[VerifyChunkIndex]; ZEN_ASSERT(LocalChunkHash == VerifyChunkHash); ZEN_ASSERT(LocalChunkRawSize == VerifyChunkRawSize); } #endif // EXTRA_VERIFY } bool g_UseDeltaEncoding = true; void WriteBuildContentToCompactBinary(CbObjectWriter& PartManifestWriter, const SourcePlatform Platform, std::span Paths, std::span RawHashes, std::span RawSizes, std::span Attributes, std::span SequenceRawHashes, std::span ChunkCounts, std::span LocalChunkHashes, std::span LocalChunkRawSizes, const std::vector& AbsoluteChunkOrders, const std::span LooseLocalChunkIndexes, const std::span BlockHashes) { ZEN_ASSERT(Platform != SourcePlatform::_Count); PartManifestWriter.AddString("platform"sv, ToString(Platform)); uint64_t TotalSize = 0; for (const uint64_t Size : RawSizes) { TotalSize += Size; } PartManifestWriter.AddInteger("totalSize", TotalSize); PartManifestWriter.BeginObject("files"sv); { compactbinary_helpers::WriteArray(Paths, "paths"sv, PartManifestWriter); compactbinary_helpers::WriteArray(RawHashes, "rawhashes"sv, PartManifestWriter); compactbinary_helpers::WriteArray(RawSizes, "rawsizes"sv, PartManifestWriter); if (Platform == SourcePlatform::Windows) { compactbinary_helpers::WriteArray(Attributes, "attributes"sv, PartManifestWriter); } if (Platform == SourcePlatform::Linux || Platform == SourcePlatform::MacOS) { compactbinary_helpers::WriteArray(Attributes, "mode"sv, PartManifestWriter); } } PartManifestWriter.EndObject(); // files PartManifestWriter.BeginObject("chunkedContent"); { compactbinary_helpers::WriteArray(SequenceRawHashes, "sequenceRawHashes"sv, PartManifestWriter); compactbinary_helpers::WriteArray(ChunkCounts, 
"chunkcounts"sv, PartManifestWriter); if (g_UseDeltaEncoding) { compactbinary_helpers::WriteDeltaArray(AbsoluteChunkOrders, "chunkorders_delta"sv, PartManifestWriter); } else { compactbinary_helpers::WriteArray(AbsoluteChunkOrders, "chunkorders"sv, PartManifestWriter); } } PartManifestWriter.EndObject(); // chunkedContent size_t LooseChunkCount = LooseLocalChunkIndexes.size(); if (LooseChunkCount > 0) { PartManifestWriter.BeginObject("chunkAttachments"); { PartManifestWriter.BeginArray("rawHashes"sv); for (uint32_t ChunkIndex : LooseLocalChunkIndexes) { PartManifestWriter.AddBinaryAttachment(LocalChunkHashes[ChunkIndex]); } PartManifestWriter.EndArray(); // rawHashes PartManifestWriter.BeginArray("chunkRawSizes"sv); for (uint32_t ChunkIndex : LooseLocalChunkIndexes) { PartManifestWriter.AddInteger(LocalChunkRawSizes[ChunkIndex]); } PartManifestWriter.EndArray(); // chunkSizes } PartManifestWriter.EndObject(); // } if (BlockHashes.size() > 0) { PartManifestWriter.BeginObject("blockAttachments"); { compactbinary_helpers::WriteBinaryAttachmentArray(BlockHashes, "rawHashes"sv, PartManifestWriter); } PartManifestWriter.EndObject(); // blocks } } void ReadBuildContentFromCompactBinary(CbObjectView BuildPartManifest, SourcePlatform& OutPlatform, std::vector& OutPaths, std::vector& OutRawHashes, std::vector& OutRawSizes, std::vector& OutAttributes, std::vector& OutSequenceRawHashes, std::vector& OutChunkCounts, std::vector& OutAbsoluteChunkOrders, std::vector& OutLooseChunkHashes, std::vector& OutLooseChunkRawSizes, std::vector& OutBlockRawHashes) { OutPlatform = FromString(BuildPartManifest["platform"sv].AsString(), SourcePlatform::_Count); CbObjectView FilesObject = BuildPartManifest["files"sv].AsObjectView(); compactbinary_helpers::ReadArray("paths"sv, FilesObject, OutPaths); compactbinary_helpers::ReadArray("rawhashes"sv, FilesObject, OutRawHashes); compactbinary_helpers::ReadArray("rawsizes"sv, FilesObject, OutRawSizes); uint64_t PathCount = OutPaths.size(); if 
(OutRawHashes.size() != PathCount) { throw std::runtime_error(fmt::format("Number of raw hashes entries does not match number of paths")); } if (OutRawSizes.size() != PathCount) { throw std::runtime_error(fmt::format("Number of raw sizes entries does not match number of paths")); } std::vector ModeArray; compactbinary_helpers::ReadArray("mode"sv, FilesObject, ModeArray); if (ModeArray.size() != PathCount && ModeArray.size() != 0) { throw std::runtime_error(fmt::format("Number of attribute entries does not match number of paths")); } std::vector AttributeArray; compactbinary_helpers::ReadArray("attributes"sv, FilesObject, ModeArray); if (AttributeArray.size() != PathCount && AttributeArray.size() != 0) { throw std::runtime_error(fmt::format("Number of attribute entries does not match number of paths")); } if (ModeArray.size() > 0) { if (OutPlatform == SourcePlatform::_Count) { OutPlatform = SourcePlatform::Linux; // Best guess - under dev format } OutAttributes = std::move(ModeArray); } else if (AttributeArray.size() > 0) { if (OutPlatform == SourcePlatform::_Count) { OutPlatform = SourcePlatform::Windows; } OutAttributes = std::move(AttributeArray); } else { if (OutPlatform == SourcePlatform::_Count) { OutPlatform = GetSourceCurrentPlatform(); } } if (CbObjectView ChunkContentView = BuildPartManifest["chunkedContent"sv].AsObjectView(); ChunkContentView) { compactbinary_helpers::ReadArray("sequenceRawHashes"sv, ChunkContentView, OutSequenceRawHashes); compactbinary_helpers::ReadArray("chunkcounts"sv, ChunkContentView, OutChunkCounts); if (OutChunkCounts.size() != OutSequenceRawHashes.size()) { throw std::runtime_error(fmt::format("Number of chunk count entries does not match number of paths")); } if (ChunkContentView["chunkorders_delta"sv]) { compactbinary_helpers::ReadDeltaArray("chunkorders_delta"sv, ChunkContentView, OutAbsoluteChunkOrders); } else { compactbinary_helpers::ReadArray("chunkorders"sv, ChunkContentView, OutAbsoluteChunkOrders); } } else if 
(FilesObject["chunkcounts"sv]) { // Legacy zen style std::vector LegacyChunkCounts; compactbinary_helpers::ReadArray("chunkcounts"sv, FilesObject, LegacyChunkCounts); if (LegacyChunkCounts.size() != PathCount) { throw std::runtime_error(fmt::format("Number of chunk count entries does not match number of paths")); } std::vector LegacyAbsoluteChunkOrders; compactbinary_helpers::ReadArray("chunkorders"sv, FilesObject, LegacyAbsoluteChunkOrders); CbArrayView ChunkOrdersArray = BuildPartManifest["chunkorders"sv].AsArrayView(); const uint64_t ChunkOrdersCount = ChunkOrdersArray.Num(); tsl::robin_set FoundRawHashes; FoundRawHashes.reserve(PathCount); OutChunkCounts.reserve(PathCount); OutAbsoluteChunkOrders.reserve(ChunkOrdersCount); uint32_t OrderIndexOffset = 0; for (uint32_t PathIndex = 0; PathIndex < OutPaths.size(); PathIndex++) { const IoHash& PathRawHash = OutRawHashes[PathIndex]; uint32_t LegacyChunkCount = LegacyChunkCounts[PathIndex]; if (FoundRawHashes.insert(PathRawHash).second) { OutSequenceRawHashes.push_back(PathRawHash); OutChunkCounts.push_back(LegacyChunkCount); std::span AbsoluteChunkOrder = std::span(LegacyAbsoluteChunkOrders).subspan(OrderIndexOffset, LegacyChunkCount); OutAbsoluteChunkOrders.insert(OutAbsoluteChunkOrders.end(), AbsoluteChunkOrder.begin(), AbsoluteChunkOrder.end()); } OrderIndexOffset += LegacyChunkCounts[PathIndex]; } } else { // Legacy C# style tsl::robin_set FoundRawHashes; FoundRawHashes.reserve(PathCount); uint32_t OrderIndexOffset = 0; for (uint32_t PathIndex = 0; PathIndex < OutPaths.size(); PathIndex++) { if (OutRawSizes[PathIndex] > 0) { const IoHash& PathRawHash = OutRawHashes[PathIndex]; if (FoundRawHashes.insert(PathRawHash).second) { OutSequenceRawHashes.push_back(PathRawHash); OutChunkCounts.push_back(1); OutAbsoluteChunkOrders.push_back(OrderIndexOffset); OutLooseChunkHashes.push_back(PathRawHash); OutLooseChunkRawSizes.push_back(OutRawSizes[PathIndex]); OrderIndexOffset += 1; } } } } CbObjectView ChunkAttachmentsView = 
BuildPartManifest["chunkAttachments"sv].AsObjectView(); { compactbinary_helpers::ReadBinaryAttachmentArray("rawHashes"sv, ChunkAttachmentsView, OutLooseChunkHashes); compactbinary_helpers::ReadArray("chunkRawSizes"sv, ChunkAttachmentsView, OutLooseChunkRawSizes); if (OutLooseChunkHashes.size() != OutLooseChunkRawSizes.size()) { throw std::runtime_error( fmt::format("Number of attachment chunk hashes does not match number of attachemnt chunk raw sizes")); } } CbObjectView BlocksView = BuildPartManifest["blockAttachments"sv].AsObjectView(); { compactbinary_helpers::ReadBinaryAttachmentArray("rawHashes"sv, BlocksView, OutBlockRawHashes); } } bool ReadStateObject(CbObjectView StateView, Oid& OutBuildId, std::vector& BuildPartsIds, std::vector& BuildPartsNames, std::vector& OutPartContents, FolderContent& OutLocalFolderState) { try { CbObjectView BuildView = StateView["builds"sv].AsArrayView().CreateViewIterator().AsObjectView(); OutBuildId = BuildView["buildId"sv].AsObjectId(); for (CbFieldView PartView : BuildView["parts"sv].AsArrayView()) { CbObjectView PartObjectView = PartView.AsObjectView(); BuildPartsIds.push_back(PartObjectView["partId"sv].AsObjectId()); BuildPartsNames.push_back(std::string(PartObjectView["partName"sv].AsString())); OutPartContents.push_back(LoadChunkedFolderContentToCompactBinary(PartObjectView["content"sv].AsObjectView())); } OutLocalFolderState = LoadFolderContentToCompactBinary(StateView["localFolderState"sv].AsObjectView()); return true; } catch (const std::exception& Ex) { ZEN_CONSOLE("Unable to read local state: ", Ex.what()); return false; } } CbObject CreateStateObject(const Oid& BuildId, std::vector> AllBuildParts, std::span PartContents, const FolderContent& LocalFolderState) { CbObjectWriter CurrentStateWriter; CurrentStateWriter.BeginArray("builds"sv); { CurrentStateWriter.BeginObject(); { CurrentStateWriter.AddObjectId("buildId"sv, BuildId); CurrentStateWriter.BeginArray("parts"sv); for (size_t PartIndex = 0; PartIndex < 
AllBuildParts.size(); PartIndex++) { const Oid BuildPartId = AllBuildParts[PartIndex].first; CurrentStateWriter.BeginObject(); { CurrentStateWriter.AddObjectId("partId"sv, BuildPartId); CurrentStateWriter.AddString("partName"sv, AllBuildParts[PartIndex].second); CurrentStateWriter.BeginObject("content"); { SaveChunkedFolderContentToCompactBinary(PartContents[PartIndex], CurrentStateWriter); } CurrentStateWriter.EndObject(); } CurrentStateWriter.EndObject(); } CurrentStateWriter.EndArray(); // parts } CurrentStateWriter.EndObject(); } CurrentStateWriter.EndArray(); // builds CurrentStateWriter.BeginObject("localFolderState"sv); { SaveFolderContentToCompactBinary(LocalFolderState, CurrentStateWriter); } CurrentStateWriter.EndObject(); // localFolderState return CurrentStateWriter.Save(); } class BufferedOpenFile { public: BufferedOpenFile(const std::filesystem::path Path, DiskStatistics& DiskStats) : m_Source(Path, BasicFile::Mode::kRead) , m_SourceSize(m_Source.FileSize()) , m_DiskStats(DiskStats) { m_DiskStats.OpenReadCount++; m_DiskStats.CurrentOpenFileCount++; } ~BufferedOpenFile() { m_DiskStats.CurrentOpenFileCount--; } BufferedOpenFile() = delete; BufferedOpenFile(const BufferedOpenFile&) = delete; BufferedOpenFile(BufferedOpenFile&&) = delete; BufferedOpenFile& operator=(BufferedOpenFile&&) = delete; BufferedOpenFile& operator=(const BufferedOpenFile&) = delete; const uint64_t BlockSize = 256u * 1024u; CompositeBuffer GetRange(uint64_t Offset, uint64_t Size) { ZEN_TRACE_CPU("BufferedOpenFile::GetRange"); ZEN_ASSERT((m_CacheBlockIndex == (uint64_t)-1) || m_Cache); auto _ = MakeGuard([&]() { ZEN_ASSERT((m_CacheBlockIndex == (uint64_t)-1) || m_Cache); }); ZEN_ASSERT((Offset + Size) <= m_SourceSize); const uint64_t BlockIndexStart = Offset / BlockSize; const uint64_t BlockIndexEnd = (Offset + Size - 1) / BlockSize; std::vector BufferRanges; BufferRanges.reserve(BlockIndexEnd - BlockIndexStart + 1); uint64_t ReadOffset = Offset; for (uint64_t BlockIndex = 
BlockIndexStart; BlockIndex <= BlockIndexEnd; BlockIndex++) { const uint64_t BlockStartOffset = BlockIndex * BlockSize; if (m_CacheBlockIndex != BlockIndex) { uint64_t CacheSize = Min(BlockSize, m_SourceSize - BlockStartOffset); ZEN_ASSERT(CacheSize > 0); m_Cache = IoBuffer(CacheSize); m_Source.Read(m_Cache.GetMutableView().GetData(), CacheSize, BlockStartOffset); m_DiskStats.ReadCount++; m_DiskStats.ReadByteCount += CacheSize; m_CacheBlockIndex = BlockIndex; } const uint64_t BytesRead = ReadOffset - Offset; ZEN_ASSERT(BlockStartOffset <= ReadOffset); const uint64_t OffsetIntoBlock = ReadOffset - BlockStartOffset; ZEN_ASSERT(OffsetIntoBlock < m_Cache.GetSize()); const uint64_t BlockBytes = Min(m_Cache.GetSize() - OffsetIntoBlock, Size - BytesRead); BufferRanges.emplace_back(SharedBuffer(IoBuffer(m_Cache, OffsetIntoBlock, BlockBytes))); ReadOffset += BlockBytes; } CompositeBuffer Result(std::move(BufferRanges)); ZEN_ASSERT(Result.GetSize() == Size); return Result; } private: BasicFile m_Source; const uint64_t m_SourceSize; DiskStatistics& m_DiskStats; uint64_t m_CacheBlockIndex = (uint64_t)-1; IoBuffer m_Cache; }; class ReadFileCache { public: // A buffered file reader that provides CompositeBuffer where the buffers are owned and the memory never overwritten ReadFileCache(DiskStatistics& DiskStats, const std::filesystem::path& Path, const ChunkedFolderContent& LocalContent, const ChunkedContentLookup& LocalLookup, size_t MaxOpenFileCount) : m_Path(Path) , m_LocalContent(LocalContent) , m_LocalLookup(LocalLookup) , m_DiskStats(DiskStats) { m_OpenFiles.reserve(MaxOpenFileCount); } ~ReadFileCache() { m_OpenFiles.clear(); } CompositeBuffer GetRange(uint32_t SequenceIndex, uint64_t Offset, uint64_t Size) { ZEN_TRACE_CPU("ReadFileCache::GetRange"); auto CacheIt = std::find_if(m_OpenFiles.begin(), m_OpenFiles.end(), [SequenceIndex](const auto& Lhs) { return Lhs.first == SequenceIndex; }); if (CacheIt != m_OpenFiles.end()) { if (CacheIt != m_OpenFiles.begin()) { auto 
CachedFile(std::move(CacheIt->second)); m_OpenFiles.erase(CacheIt); m_OpenFiles.insert(m_OpenFiles.begin(), std::make_pair(SequenceIndex, std::move(CachedFile))); } CompositeBuffer Result = m_OpenFiles.front().second->GetRange(Offset, Size); return Result; } const uint32_t LocalPathIndex = m_LocalLookup.SequenceIndexFirstPathIndex[SequenceIndex]; const std::filesystem::path LocalFilePath = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred(); if (Size == m_LocalContent.RawSizes[LocalPathIndex]) { IoBuffer Result = IoBufferBuilder::MakeFromFile(LocalFilePath); return CompositeBuffer(SharedBuffer(Result)); } if (m_OpenFiles.size() == m_OpenFiles.capacity()) { m_OpenFiles.pop_back(); } m_OpenFiles.insert(m_OpenFiles.begin(), std::make_pair(SequenceIndex, std::make_unique(LocalFilePath, m_DiskStats))); CompositeBuffer Result = m_OpenFiles.front().second->GetRange(Offset, Size); return Result; } private: const std::filesystem::path m_Path; const ChunkedFolderContent& m_LocalContent; const ChunkedContentLookup& m_LocalLookup; std::vector>> m_OpenFiles; DiskStatistics& m_DiskStats; }; CompositeBuffer ValidateBlob(IoBuffer&& Payload, const IoHash& BlobHash, uint64_t& OutCompressedSize, uint64_t& OutDecompressedSize) { ZEN_TRACE_CPU("ValidateBlob"); if (Payload.GetContentType() != ZenContentType::kCompressedBinary) { throw std::runtime_error(fmt::format("Blob {} ({} bytes) has unexpected content type '{}'", BlobHash, Payload.GetSize(), ToString(Payload.GetContentType()))); } IoHash RawHash; uint64_t RawSize; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, RawSize); if (!Compressed) { throw std::runtime_error(fmt::format("Blob {} ({} bytes) compressed header is invalid", BlobHash, Payload.GetSize())); } if (RawHash != BlobHash) { throw std::runtime_error( fmt::format("Blob {} ({} bytes) compressed header has a mismatching raw hash {}", BlobHash, Payload.GetSize(), RawHash)); } IoHashStream Hash; bool CouldDecompress 
= Compressed.DecompressToStream( 0, RawSize, [&Hash](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { ZEN_UNUSED(SourceOffset, SourceSize, Offset); if (!AbortFlag) { for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) { Hash.Append(Segment.GetView()); } return true; } return false; }); if (AbortFlag) { return CompositeBuffer{}; } if (!CouldDecompress) { throw std::runtime_error( fmt::format("Blob {} ({} bytes) failed to decompress - header information mismatch", BlobHash, Payload.GetSize())); } IoHash ValidateRawHash = Hash.GetHash(); if (ValidateRawHash != BlobHash) { throw std::runtime_error(fmt::format("Blob {} ({} bytes) decompressed hash {} does not match header information", BlobHash, Payload.GetSize(), ValidateRawHash)); } OodleCompressor Compressor; OodleCompressionLevel CompressionLevel; uint64_t BlockSize; if (!Compressed.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) { throw std::runtime_error(fmt::format("Blob {} ({} bytes) failed to get compression details", BlobHash, Payload.GetSize())); } OutCompressedSize = Payload.GetSize(); OutDecompressedSize = RawSize; if (CompressionLevel == OodleCompressionLevel::None) { // Only decompress to composite if we need it for block verification CompositeBuffer DecompressedComposite = Compressed.DecompressToComposite(); if (!DecompressedComposite) { throw std::runtime_error(fmt::format("Blob {} ({} bytes) failed to decompress to composite", BlobHash, Payload.GetSize())); } return DecompressedComposite; } return CompositeBuffer{}; } CompositeBuffer ValidateBlob(BuildStorage& Storage, const Oid& BuildId, const IoHash& BlobHash, uint64_t& OutCompressedSize, uint64_t& OutDecompressedSize) { ZEN_TRACE_CPU("ValidateBlob"); IoBuffer Payload = Storage.GetBuildBlob(BuildId, BlobHash); if (!Payload) { throw std::runtime_error(fmt::format("Blob {} could not be found", BlobHash)); } return ValidateBlob(std::move(Payload), BlobHash, OutCompressedSize, 
OutDecompressedSize); } ChunkBlockDescription ValidateChunkBlock(IoBuffer&& Payload, const IoHash& BlobHash, uint64_t& OutCompressedSize, uint64_t& OutDecompressedSize) { CompositeBuffer BlockBuffer = ValidateBlob(std::move(Payload), BlobHash, OutCompressedSize, OutDecompressedSize); if (!BlockBuffer) { throw std::runtime_error(fmt::format("Chunk block blob {} is not compressed using 'None' compression level", BlobHash)); } return GetChunkBlockDescription(BlockBuffer.Flatten(), BlobHash); } CompositeBuffer FetchChunk(const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, const IoHash& ChunkHash, ReadFileCache& OpenFileCache) { ZEN_TRACE_CPU("FetchChunk"); auto It = Lookup.ChunkHashToChunkIndex.find(ChunkHash); ZEN_ASSERT(It != Lookup.ChunkHashToChunkIndex.end()); uint32_t ChunkIndex = It->second; std::span ChunkLocations = GetChunkSequenceLocations(Lookup, ChunkIndex); ZEN_ASSERT(!ChunkLocations.empty()); CompositeBuffer Chunk = OpenFileCache.GetRange(ChunkLocations[0].SequenceIndex, ChunkLocations[0].Offset, Content.ChunkedContent.ChunkRawSizes[ChunkIndex]); ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == ChunkHash); return Chunk; }; CompressedBuffer GenerateBlock(const std::filesystem::path& Path, const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, const std::vector& ChunksInBlock, ChunkBlockDescription& OutBlockDescription, DiskStatistics& DiskStats) { ZEN_TRACE_CPU("GenerateBlock"); ReadFileCache OpenFileCache(DiskStats, Path, Content, Lookup, 4); std::vector> BlockContent; BlockContent.reserve(ChunksInBlock.size()); for (uint32_t ChunkIndex : ChunksInBlock) { BlockContent.emplace_back(std::make_pair( Content.ChunkedContent.ChunkHashes[ChunkIndex], [&Content, &Lookup, &OpenFileCache, ChunkIndex](const IoHash& ChunkHash) -> std::pair { CompositeBuffer Chunk = FetchChunk(Content, Lookup, ChunkHash, OpenFileCache); if (!Chunk) { ZEN_ASSERT(false); } uint64_t RawSize = Chunk.GetSize(); return {RawSize, 
CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid, OodleCompressionLevel::None)};
            }));
    }

    return GenerateChunkBlock(std::move(BlockContent), OutBlockDescription);
};

// Reassembles a block from an existing header and the chunks listed in ChunksInBlock.
// Each chunk is read from its first sequence location, wrapped with compression level None and
// its compressed segments are appended after the header segments. The result is created without
// validation (FromCompressedNoValidate).
CompressedBuffer
RebuildBlock(const std::filesystem::path& Path, const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, CompositeBuffer&& HeaderBuffer, const std::vector& ChunksInBlock, DiskStatistics& DiskStats)
{
    ZEN_TRACE_CPU("RebuildBlock");
    ReadFileCache OpenFileCache(DiskStats, Path, Content, Lookup, 4);

    std::vector ResultBuffers;
    ResultBuffers.reserve(HeaderBuffer.GetSegments().size() + ChunksInBlock.size());
    ResultBuffers.insert(ResultBuffers.end(), HeaderBuffer.GetSegments().begin(), HeaderBuffer.GetSegments().end());
    for (uint32_t ChunkIndex : ChunksInBlock)
    {
        std::span ChunkLocations = GetChunkSequenceLocations(Lookup, ChunkIndex);
        ZEN_ASSERT(!ChunkLocations.empty());
        CompositeBuffer Chunk = OpenFileCache.GetRange(ChunkLocations[0].SequenceIndex, ChunkLocations[0].Offset, Content.ChunkedContent.ChunkRawSizes[ChunkIndex]);
        ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == Content.ChunkedContent.ChunkHashes[ChunkIndex]);
        CompositeBuffer CompressedChunk = CompressedBuffer::Compress(std::move(Chunk), OodleCompressor::Mermaid, OodleCompressionLevel::None).GetCompressed();
        ResultBuffers.insert(ResultBuffers.end(), CompressedChunk.GetSegments().begin(), CompressedChunk.GetSegments().end());
    }
    return CompressedBuffer::FromCompressedNoValidate(CompositeBuffer(std::move(ResultBuffers)));
};

// Downloads a blob in parts into a temporary file inside DownloadFolder.
// Storage.GetLargeBuildBlob supplies the work items; each one is scheduled on NetworkPool through
// Work. When the received part covers all remaining bytes the temporary file is handed to
// OnDownloadComplete as a delete-on-close IoBuffer.
// Throws std::runtime_error if the temporary file can not be created.
void
DownloadLargeBlob(BuildStorage& Storage, const std::filesystem::path& DownloadFolder, const Oid& BuildId, const IoHash& ChunkHash, const std::uint64_t PreferredMultipartChunkSize, ParallellWork& Work, WorkerThreadPool& NetworkPool, DownloadStatistics& DownloadStats, std::function&& OnDownloadComplete)
{
    ZEN_TRACE_CPU("DownloadLargeBlob");

    // Shared between all part callbacks so the temporary file stays alive until the last one is done
    struct WorkloadData
    {
        TemporaryFile TempFile;
    };
    std::shared_ptr Workload(std::make_shared());

    std::error_code Ec;
Workload->TempFile.CreateTemporary(DownloadFolder, Ec);
    if (Ec)
    {
        throw std::runtime_error(
            fmt::format("Failed opening temporary file '{}': {} ({})", Workload->TempFile.GetPath(), Ec.message(), Ec.value()));
    }
    std::vector> WorkItems = Storage.GetLargeBuildBlob(
        BuildId,
        ChunkHash,
        PreferredMultipartChunkSize,
        [Workload, &DownloadStats, OnDownloadComplete = std::move(OnDownloadComplete)](uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining) {
            DownloadStats.DownloadedChunkByteCount += Chunk.GetSize();

            if (!AbortFlag.load())
            {
                ZEN_TRACE_CPU("DownloadLargeBlob_Save");
                Workload->TempFile.Write(Chunk.GetView(), Offset);
                // This part covers everything that was left, so the download is complete
                if (Chunk.GetSize() == BytesRemaining)
                {
                    DownloadStats.DownloadedChunkCount++;
                    uint64_t PayloadSize = Workload->TempFile.FileSize();
                    void*    FileHandle  = Workload->TempFile.Detach();
                    ZEN_ASSERT(FileHandle != nullptr);
                    IoBuffer Payload(IoBuffer::File, FileHandle, 0, PayloadSize, true);
                    Payload.SetDeleteOnClose(true);
                    OnDownloadComplete(std::move(Payload));
                }
            }
        });
    if (!WorkItems.empty())
    {
        DownloadStats.MultipartAttachmentCount++;
    }
    for (auto& WorkItem : WorkItems)
    {
        Work.ScheduleWork(
            NetworkPool,  // GetSyncWorkerPool(),//
            [WorkItem = std::move(WorkItem)](std::atomic&) {
                ZEN_TRACE_CPU("DownloadLargeBlob_Work");
                if (!AbortFlag)
                {
                    WorkItem();
                }
            },
            Work.DefaultErrorFunction());
    }
}

// Counters filled in by ValidateBuildPart. The two atomic members are updated from worker
// callbacks; the remaining members are written by ValidateBuildPart itself.
struct ValidateStatistics
{
    uint64_t    BuildBlobSize           = 0;
    uint64_t    BuildPartSize           = 0;
    uint64_t    ChunkAttachmentCount    = 0;
    uint64_t    BlockAttachmentCount    = 0;
    std::atomic VerifiedAttachmentCount = 0;
    std::atomic VerifiedByteCount       = 0;
    uint64_t    ElapsedWallTimeUS       = 0;
};

// Downloads and validates every chunk and block attachment referenced by a build part.
// If BuildPartName is not empty it is used to look up the part id in the build's "parts" object,
// replacing the BuildPartId argument.
void
ValidateBuildPart(BuildStorage& Storage, const Oid& BuildId, Oid BuildPartId, const std::string_view BuildPartName, ValidateStatistics& ValidateStats, DownloadStatistics& DownloadStats)
{
    Stopwatch Timer;
    // Logs the total elapsed time when the function exits
    auto _ = MakeGuard([&]() {
        ZEN_CONSOLE("Validated build part {}/{} ('{}') in {}", BuildId, BuildPartId, BuildPartName, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
    });
    CbObject Build =
Storage.GetBuild(BuildId);
    if (!BuildPartName.empty())
    {
        BuildPartId = Build["parts"sv].AsObjectView()[BuildPartName].AsObjectId();
        if (BuildPartId == Oid::Zero)
        {
            throw std::runtime_error(fmt::format("Build {} does not have a part named '{}'", BuildId, BuildPartName));
        }
    }
    ValidateStats.BuildBlobSize          = Build.GetSize();
    // The build may override the default multipart chunk size through its "chunkSize" field
    uint64_t PreferredMultipartChunkSize = DefaultPreferredMultipartChunkSize;
    if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0)
    {
        PreferredMultipartChunkSize = ChunkSize;
    }
    CbObject BuildPart          = Storage.GetBuildPart(BuildId, BuildPartId);
    ValidateStats.BuildPartSize = BuildPart.GetSize();
    ZEN_CONSOLE("Validating build part {}/{} ({})", BuildId, BuildPartId, NiceBytes(BuildPart.GetSize()));
    std::vector ChunkAttachments;
    for (CbFieldView LooseFileView : BuildPart["chunkAttachments"sv].AsObjectView()["rawHashes"sv])
    {
        ChunkAttachments.push_back(LooseFileView.AsBinaryAttachment());
    }
    ValidateStats.ChunkAttachmentCount = ChunkAttachments.size();
    std::vector BlockAttachments;
    for (CbFieldView BlocksView : BuildPart["blockAttachments"sv].AsObjectView()["rawHashes"sv])
    {
        BlockAttachments.push_back(BlocksView.AsBinaryAttachment());
    }
    ValidateStats.BlockAttachmentCount = BlockAttachments.size();

    // Every block attachment must have metadata available in storage
    std::vector VerifyBlockDescriptions = Storage.GetBlockMetadata(BuildId, BlockAttachments);
    if (VerifyBlockDescriptions.size() != BlockAttachments.size())
    {
        throw std::runtime_error(fmt::format("Uploaded blocks metadata could not all be found, {} blocks metadata is missing", BlockAttachments.size() - VerifyBlockDescriptions.size()));
    }

    WorkerThreadPool& NetworkPool = GetSmallWorkerPool(EWorkloadType::Burst);   // GetSyncWorkerPool(); //
    WorkerThreadPool& VerifyPool  = GetMediumWorkerPool(EWorkloadType::Burst);  // GetSyncWorkerPool(); //
    ParallellWork     Work(AbortFlag);

    const std::filesystem::path TempFolder = ".zen-tmp";

    CreateDirectories(TempFolder);
    // Remove the temporary folder again when leaving the function
    auto __ = MakeGuard([&TempFolder]() {
        if (CleanDirectory(TempFolder, {}))
        {
            std::filesystem::remove(TempFolder);
        }
    });

    ProgressBar ProgressBar(UsePlainProgress);

    uint64_t AttachmentsToVerifyCount = ChunkAttachments.size() + BlockAttachments.size();
    FilteredRate FilteredDownloadedBytesPerSecond;
    FilteredRate FilteredVerifiedBytesPerSecond;

    std::atomic MultipartAttachmentCount = 0;

    // Chunk attachments: download on NetworkPool, then validate the payload on VerifyPool
    for (const IoHash& ChunkAttachment : ChunkAttachments)
    {
        Work.ScheduleWork(
            NetworkPool,
            [&, ChunkAttachment](std::atomic&) {
                if (!AbortFlag)
                {
                    ZEN_TRACE_CPU("ValidateBuildPart_GetChunk");

                    FilteredDownloadedBytesPerSecond.Start();
                    DownloadLargeBlob(Storage,
                                      TempFolder,
                                      BuildId,
                                      ChunkAttachment,
                                      PreferredMultipartChunkSize,
                                      Work,
                                      NetworkPool,
                                      DownloadStats,
                                      [&, ChunkHash = ChunkAttachment](IoBuffer&& Payload) {
                                          Payload.SetContentType(ZenContentType::kCompressedBinary);
                                          if (!AbortFlag)
                                          {
                                              Work.ScheduleWork(
                                                  VerifyPool,
                                                  [&, Payload = std::move(Payload), ChunkHash](std::atomic&) mutable {
                                                      if (!AbortFlag)
                                                      {
                                                          ZEN_TRACE_CPU("ValidateBuildPart_Validate");

                                                          if (DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount == AttachmentsToVerifyCount)
                                                          {
                                                              FilteredDownloadedBytesPerSecond.Stop();
                                                          }

                                                          FilteredVerifiedBytesPerSecond.Start();

                                                          // NOTE(review): both sizes are read unconditionally below - confirm that
                                                          // ValidateBlob assigns them on every non-throwing return path
                                                          uint64_t CompressedSize;
                                                          uint64_t DecompressedSize;
                                                          ValidateBlob(std::move(Payload), ChunkHash, CompressedSize, DecompressedSize);
                                                          ZEN_CONSOLE_VERBOSE("Chunk attachment {} ({} -> {}) is valid", ChunkHash, NiceBytes(CompressedSize), NiceBytes(DecompressedSize));
                                                          ValidateStats.VerifiedAttachmentCount++;
                                                          ValidateStats.VerifiedByteCount += DecompressedSize;
                                                          if (ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount)
                                                          {
                                                              FilteredVerifiedBytesPerSecond.Stop();
                                                          }
                                                      }
                                                  },
                                                  Work.DefaultErrorFunction());
                                          }
                                      });
                }
            },
            Work.DefaultErrorFunction());
    }

    // Block attachments: fetched in one piece on NetworkPool, then validated as chunk blocks on VerifyPool
    for (const IoHash& BlockAttachment : BlockAttachments)
    {
        Work.ScheduleWork(
            NetworkPool,
            [&, BlockAttachment](std::atomic&) {
                if (!AbortFlag)
                {
                    ZEN_TRACE_CPU("ValidateBuildPart_GetBlock");

                    FilteredDownloadedBytesPerSecond.Start();
                    IoBuffer Payload = Storage.GetBuildBlob(BuildId, BlockAttachment);
                    // NOTE(review): the download counters are updated before the payload is checked for validity
                    DownloadStats.DownloadedBlockCount++;
                    DownloadStats.DownloadedBlockByteCount += Payload.GetSize();
                    if (DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount == AttachmentsToVerifyCount)
                    {
                        FilteredDownloadedBytesPerSecond.Stop();
                    }
                    if (!Payload)
                    {
                        throw std::runtime_error(fmt::format("Block attachment {} could not be found", BlockAttachment));
                    }
                    if (!AbortFlag)
                    {
                        Work.ScheduleWork(
                            VerifyPool,
                            [&, Payload = std::move(Payload), BlockAttachment](std::atomic&) mutable {
                                if (!AbortFlag)
                                {
                                    ZEN_TRACE_CPU("ValidateBuildPart_ValidateBlock");

                                    FilteredVerifiedBytesPerSecond.Start();

                                    uint64_t CompressedSize;
                                    uint64_t DecompressedSize;
                                    ValidateChunkBlock(std::move(Payload), BlockAttachment, CompressedSize, DecompressedSize);
                                    ZEN_CONSOLE_VERBOSE("Chunk block {} ({} -> {}) is valid", BlockAttachment, NiceBytes(CompressedSize), NiceBytes(DecompressedSize));
                                    ValidateStats.VerifiedAttachmentCount++;
                                    ValidateStats.VerifiedByteCount += DecompressedSize;
                                    if (ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount)
                                    {
                                        FilteredVerifiedBytesPerSecond.Stop();
                                    }
                                }
                            },
                            Work.DefaultErrorFunction());
                    }
                }
            },
            Work.DefaultErrorFunction());
    }

    // Wait for all scheduled work, refreshing the progress bar from the callback
    Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
        ZEN_UNUSED(IsAborted, PendingWork);

        const uint64_t DownloadedAttachmentCount = DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount;
        const uint64_t DownloadedByteCount       = DownloadStats.DownloadedChunkByteCount + DownloadStats.DownloadedBlockByteCount;

        FilteredDownloadedBytesPerSecond.Update(DownloadedByteCount);
        FilteredVerifiedBytesPerSecond.Update(ValidateStats.VerifiedByteCount);

        std::string Details = fmt::format("Downloaded {}/{} ({}, {}bits/s). Verified {}/{} ({}, {}B/s)",
                                          DownloadedAttachmentCount,
                                          AttachmentsToVerifyCount,
                                          NiceBytes(DownloadedByteCount),
                                          NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8),
                                          ValidateStats.VerifiedAttachmentCount.load(),
                                          AttachmentsToVerifyCount,
                                          NiceBytes(ValidateStats.VerifiedByteCount.load()),
                                          NiceNum(FilteredVerifiedBytesPerSecond.GetCurrent()));

        // Every attachment counts twice towards the total: once downloaded and once verified
        ProgressBar.UpdateState(
            {.Task           = "Validating blobs ",
             .Details        = Details,
             .TotalCount     = gsl::narrow(AttachmentsToVerifyCount * 2),
             .RemainingCount = gsl::narrow(AttachmentsToVerifyCount * 2 - (DownloadedAttachmentCount + ValidateStats.VerifiedAttachmentCount.load()))},
            false);
    });

    ProgressBar.Finish();
    ValidateStats.ElapsedWallTimeUS = Timer.GetElapsedTimeUs();
}

// Groups chunks into blocks whose summed raw size does not exceed MaxBlockSize.
// ChunkIndexes is sorted in place by (sequence index, offset) of each chunk's first location and
// one vector of chunk indexes per block is appended to OutBlocks.
void
ArrangeChunksIntoBlocks(const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, uint64_t MaxBlockSize, std::vector& ChunkIndexes, std::vector>& OutBlocks)
{
    ZEN_TRACE_CPU("ArrangeChunksIntoBlocks");
    std::sort(ChunkIndexes.begin(), ChunkIndexes.end(), [&Content, &Lookup](uint32_t Lhs, uint32_t Rhs) {
        const ChunkedContentLookup::ChunkSequenceLocation& LhsLocation = GetChunkSequenceLocations(Lookup, Lhs)[0];
        const ChunkedContentLookup::ChunkSequenceLocation& RhsLocation = GetChunkSequenceLocations(Lookup, Rhs)[0];
        if (LhsLocation.SequenceIndex < RhsLocation.SequenceIndex)
        {
            return true;
        }
        else if (LhsLocation.SequenceIndex > RhsLocation.SequenceIndex)
        {
            return false;
        }
        return LhsLocation.Offset < RhsLocation.Offset;
    });

    // Lower bound (15/16 of MaxBlockSize) of the window in which a block may be cut early
    uint64_t MaxBlockSizeLowThreshold = MaxBlockSize - (MaxBlockSize / 16);

    uint64_t BlockSize = 0;

    uint32_t ChunkIndexStart = 0;
    for (uint32_t ChunkIndexOffset = 0; ChunkIndexOffset < ChunkIndexes.size();)
    {
        const uint32_t ChunkIndex = ChunkIndexes[ChunkIndexOffset];
        const uint64_t ChunkSize  = Content.ChunkedContent.ChunkRawSizes[ChunkIndex];

        if ((BlockSize + ChunkSize) > MaxBlockSize)
        {
            // Within the span of MaxBlockSizeLowThreshold and MaxBlockSize, see if there is a break
            // between source paths for chunks. Break the block at the last such break if any.
            // NOTE(review): the assert below fires if a single chunk is larger than MaxBlockSize
            // while the current block is empty - confirm callers never pass such chunks
            ZEN_ASSERT(ChunkIndexOffset > ChunkIndexStart);

            const uint32_t ChunkSequenceIndex = Lookup.ChunkSequenceLocations[Lookup.ChunkSequenceLocationOffset[ChunkIndex]].SequenceIndex;

            uint64_t ScanBlockSize = BlockSize;

            // Walk backwards from the last chunk that fit into the block
            uint32_t ScanChunkIndexOffset = ChunkIndexOffset - 1;
            while (ScanChunkIndexOffset > (ChunkIndexStart + 2))
            {
                const uint32_t TestChunkIndex = ChunkIndexes[ScanChunkIndexOffset];
                const uint64_t TestChunkSize  = Content.ChunkedContent.ChunkRawSizes[TestChunkIndex];
                if ((ScanBlockSize - TestChunkSize) < MaxBlockSizeLowThreshold)
                {
                    break;
                }

                const uint32_t TestSequenceIndex = Lookup.ChunkSequenceLocations[Lookup.ChunkSequenceLocationOffset[TestChunkIndex]].SequenceIndex;
                if (ChunkSequenceIndex != TestSequenceIndex)
                {
                    // Found a chunk from another sequence - end the block right after it
                    ChunkIndexOffset = ScanChunkIndexOffset + 1;
                    break;
                }

                ScanBlockSize -= TestChunkSize;
                ScanChunkIndexOffset--;
            }

            std::vector ChunksInBlock;
            ChunksInBlock.reserve(ChunkIndexOffset - ChunkIndexStart);
            for (uint32_t AddIndexOffset = ChunkIndexStart; AddIndexOffset < ChunkIndexOffset; AddIndexOffset++)
            {
                const uint32_t AddChunkIndex = ChunkIndexes[AddIndexOffset];
                ChunksInBlock.push_back(AddChunkIndex);
            }
            OutBlocks.emplace_back(std::move(ChunksInBlock));
            BlockSize       = 0;
            ChunkIndexStart = ChunkIndexOffset;
        }
        else
        {
            ChunkIndexOffset++;
            BlockSize += ChunkSize;
        }
    }
    // Whatever remains after the last cut forms the final block
    if (ChunkIndexStart < ChunkIndexes.size())
    {
        std::vector ChunksInBlock;
        ChunksInBlock.reserve(ChunkIndexes.size() - ChunkIndexStart);
        for (uint32_t AddIndexOffset = ChunkIndexStart; AddIndexOffset < ChunkIndexes.size(); AddIndexOffset++)
        {
            const uint32_t AddChunkIndex = ChunkIndexes[AddIndexOffset];
            ChunksInBlock.push_back(AddChunkIndex);
        }
        OutBlocks.emplace_back(std::move(ChunksInBlock));
    }
}

CompositeBuffer
CompressChunk(const std::filesystem::path& Path, const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, uint32_t ChunkIndex, const std::filesystem::path& TempFolderPath, std::atomic& ReadRawBytes, LooseChunksStatistics&
LooseChunksStats)
{
    // Compresses one chunk read from its first source location.
    // The streamed path writes the compressed data to a temporary file named after the chunk hash
    // inside TempFolderPath; if that does not succeed the chunk is compressed with
    // CompressedBuffer::Compress instead. Throws std::runtime_error if the chunk can not be read,
    // has an unexpected size, the temporary file can not be created or compression fails.
    ZEN_TRACE_CPU("CompressChunk");
    ZEN_ASSERT(!TempFolderPath.empty());
    const IoHash&  ChunkHash = Content.ChunkedContent.ChunkHashes[ChunkIndex];
    const uint64_t ChunkSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex];

    const ChunkedContentLookup::ChunkSequenceLocation& Source    = GetChunkSequenceLocations(Lookup, ChunkIndex)[0];
    const std::uint32_t                                PathIndex = Lookup.SequenceIndexFirstPathIndex[Source.SequenceIndex];
    IoBuffer RawSource = IoBufferBuilder::MakeFromFile((Path / Content.Paths[PathIndex]).make_preferred(), Source.Offset, ChunkSize);
    if (!RawSource)
    {
        throw std::runtime_error(fmt::format("Failed fetching chunk {}", ChunkHash));
    }
    if (RawSource.GetSize() != ChunkSize)
    {
        throw std::runtime_error(fmt::format("Fetched chunk {} has invalid size", ChunkHash));
    }
    ZEN_ASSERT_SLOW(IoHash::HashBuffer(RawSource) == ChunkHash);
    {
        std::filesystem::path TempFilePath = (TempFolderPath / ChunkHash.ToHexString()).make_preferred();

        BasicFile       CompressedFile;
        std::error_code Ec;
        CompressedFile.Open(TempFilePath, BasicFile::Mode::kTruncate, Ec);
        if (Ec)
        {
            throw std::runtime_error(
                fmt::format("Failed creating temporary file for compressing blob {}. Reason: {}", ChunkHash, Ec.message()));
        }

        // Stream compressed ranges straight into the temporary file while updating the statistics
        bool CouldCompress = CompressedBuffer::CompressToStream(
            CompositeBuffer(SharedBuffer(RawSource)),
            [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
                ZEN_UNUSED(SourceOffset);
                ReadRawBytes += SourceSize;
                CompressedFile.Write(RangeBuffer, Offset);
                LooseChunksStats.CompressedChunkBytes += RangeBuffer.GetSize();
            });
        if (CouldCompress)
        {
            // Hand the file over to an IoBuffer that deletes it on close
            uint64_t CompressedSize = CompressedFile.FileSize();
            void*    FileHandle     = CompressedFile.Detach();
            IoBuffer TempPayload    = IoBuffer(IoBuffer::File, FileHandle, 0, CompressedSize, /*IsWholeFile*/ true);
            ZEN_ASSERT(TempPayload);
            TempPayload.SetDeleteOnClose(true);

            IoHash           RawHash;
            uint64_t         RawSize;
            CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(TempPayload), RawHash, RawSize);
            ZEN_ASSERT(Compressed);
            ZEN_ASSERT(RawHash == ChunkHash);
            ZEN_ASSERT(RawSize == ChunkSize);

            LooseChunksStats.CompressedChunkCount++;

            return Compressed.GetCompressed();
        }
        // Streaming did not work - remove the temporary file (best effort, the error code is ignored)
        CompressedFile.Close();
        std::filesystem::remove(TempFilePath, Ec);
        ZEN_UNUSED(Ec);
    }

    // Try regular compress - decompress may fail if compressed data is larger than non-compressed
    CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(RawSource)));
    if (!CompressedBlob)
    {
        throw std::runtime_error(fmt::format("Failed to compress large blob {}", ChunkHash));
    }
    // Make sure the result is backed by a file rather than memory
    if (!IsBufferDiskBased(CompressedBlob.GetCompressed()))
    {
        IoBuffer TempPayload = WriteToTempFile(std::move(CompressedBlob).GetCompressed(), TempFolderPath, ChunkHash);
        CompressedBlob       = CompressedBuffer::FromCompressedNoValidate(std::move(TempPayload));
    }
    return std::move(CompressedBlob).GetCompressed();
}

// Per block results of block generation. The vectors are indexed by block index
// (GenerateBuildBlocks resizes them to the number of new blocks); BlockHashToBlockIndex maps a
// block hash back to that index.
struct GeneratedBlocks
{
    std::vector BlockDescriptions;
    std::vector BlockSizes;
    std::vector BlockHeaders;
    std::vector BlockMetaDatas;
    std::vector MetaDataHasBeenUploaded;
    tsl::robin_map BlockHashToBlockIndex;
};

void
GenerateBuildBlocks(const std::filesystem::path& Path, const ChunkedFolderContent&
Content, const ChunkedContentLookup& Lookup, BuildStorage& Storage, const Oid& BuildId, const std::vector>& NewBlockChunks, GeneratedBlocks& OutBlocks, DiskStatistics& DiskStats, UploadStatistics& UploadStats, GenerateBlocksStatistics& GenerateBlocksStats)
{
    // Generates one block per entry of NewBlockChunks on a worker pool and records description,
    // size, header and metadata in OutBlocks. While at most 16 uploads are queued a generated
    // block is also handed to the upload pool; otherwise only its header is kept.
    ZEN_TRACE_CPU("GenerateBuildBlocks");
    const std::size_t NewBlockCount = NewBlockChunks.size();
    if (NewBlockCount > 0)
    {
        ProgressBar ProgressBar(UsePlainProgress);

        OutBlocks.BlockDescriptions.resize(NewBlockCount);
        OutBlocks.BlockSizes.resize(NewBlockCount);
        OutBlocks.BlockMetaDatas.resize(NewBlockCount);
        OutBlocks.BlockHeaders.resize(NewBlockCount);
        OutBlocks.MetaDataHasBeenUploaded.resize(NewBlockCount, false);
        OutBlocks.BlockHashToBlockIndex.reserve(NewBlockCount);

        // Guards BlockHashToBlockIndex, which is updated from several workers
        RwLock Lock;

        WorkerThreadPool& GenerateBlobsPool = GetMediumWorkerPool(EWorkloadType::Burst);  // GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool();//
        WorkerThreadPool& UploadBlocksPool  = GetSmallWorkerPool(EWorkloadType::Burst);   // GetSyncWorkerPool();//

        FilteredRate FilteredGeneratedBytesPerSecond;
        FilteredRate FilteredUploadedBytesPerSecond;

        ParallellWork Work(AbortFlag);

        std::atomic QueuedPendingBlocksForUpload = 0;

        for (size_t BlockIndex = 0; BlockIndex < NewBlockCount; BlockIndex++)
        {
            if (Work.IsAborted())
            {
                break;
            }
            const std::vector& ChunksInBlock = NewBlockChunks[BlockIndex];
            Work.ScheduleWork(
                GenerateBlobsPool,
                [&, BlockIndex](std::atomic&) {
                    if (!AbortFlag)
                    {
                        ZEN_TRACE_CPU("GenerateBuildBlocks_Generate");

                        FilteredGeneratedBytesPerSecond.Start();
                        // TODO: Convert ScheduleWork body to function

                        CompressedBuffer CompressedBlock = GenerateBlock(Path, Content, Lookup, ChunksInBlock, OutBlocks.BlockDescriptions[BlockIndex], DiskStats);
                        ZEN_CONSOLE_VERBOSE("Generated block {} ({}) containing {} chunks",
                                            OutBlocks.BlockDescriptions[BlockIndex].BlockHash,
                                            NiceBytes(CompressedBlock.GetCompressedSize()),
                                            OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size());

                        OutBlocks.BlockSizes[BlockIndex] = CompressedBlock.GetCompressedSize();
                        {
                            CbObjectWriter Writer;
                            Writer.AddString("createdBy", "zen");
                            OutBlocks.BlockMetaDatas[BlockIndex] = Writer.Save();
                        }
                        GenerateBlocksStats.GeneratedBlockByteCount += OutBlocks.BlockSizes[BlockIndex];
                        GenerateBlocksStats.GeneratedBlockCount++;

                        Lock.WithExclusiveLock([&]() {
                            OutBlocks.BlockHashToBlockIndex.insert_or_assign(OutBlocks.BlockDescriptions[BlockIndex].BlockHash, BlockIndex);
                        });

                        // The first two segments of the compressed block are stored as the block header
                        {
                            std::span Segments = CompressedBlock.GetCompressed().GetSegments();
                            ZEN_ASSERT(Segments.size() >= 2);
                            OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]);
                        }

                        if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount)
                        {
                            FilteredGeneratedBytesPerSecond.Stop();
                        }

                        if (QueuedPendingBlocksForUpload.load() > 16)
                        {
                            // NOTE(review): repeats the header assignment already made a few lines above
                            std::span Segments = CompressedBlock.GetCompressed().GetSegments();
                            ZEN_ASSERT(Segments.size() >= 2);
                            OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]);
                        }
                        else
                        {
                            if (!AbortFlag)
                            {
                                QueuedPendingBlocksForUpload++;

                                Work.ScheduleWork(
                                    UploadBlocksPool,
                                    [&, BlockIndex, Payload = std::move(CompressedBlock)](std::atomic&) mutable {
                                        // Always release the queue slot, also when aborted or on exceptions
                                        auto _ = MakeGuard([&QueuedPendingBlocksForUpload] { QueuedPendingBlocksForUpload--; });
                                        if (!AbortFlag)
                                        {
                                            // NOTE(review): once all blocks are generated this branch only keeps the
                                            // header and does not upload the payload - presumably the upload then
                                            // happens elsewhere; confirm
                                            if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount)
                                            {
                                                ZEN_TRACE_CPU("GenerateBuildBlocks_Save");

                                                FilteredUploadedBytesPerSecond.Stop();
                                                std::span Segments = Payload.GetCompressed().GetSegments();
                                                ZEN_ASSERT(Segments.size() >= 2);
                                                OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]);
                                            }
                                            else
                                            {
                                                ZEN_TRACE_CPU("GenerateBuildBlocks_Upload");

                                                FilteredUploadedBytesPerSecond.Start();
                                                // TODO: Convert ScheduleWork body to function

                                                const CbObject BlockMetaData = BuildChunkBlockDescription(OutBlocks.BlockDescriptions[BlockIndex], OutBlocks.BlockMetaDatas[BlockIndex]);

                                                const IoHash&  BlockHash           = OutBlocks.BlockDescriptions[BlockIndex].BlockHash;
                                                const uint64_t CompressedBlockSize = Payload.GetCompressedSize();

                                                Storage.PutBuildBlob(BuildId, BlockHash, ZenContentType::kCompressedBinary, std::move(Payload).GetCompressed());
                                                UploadStats.BlocksBytes += CompressedBlockSize;
                                                ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks",
                                                                    OutBlocks.BlockDescriptions[BlockIndex].BlockHash,
                                                                    NiceBytes(CompressedBlockSize),
                                                                    OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size());

                                                Storage.PutBlockMetadata(BuildId, OutBlocks.BlockDescriptions[BlockIndex].BlockHash, BlockMetaData);
                                                ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})",
                                                                    OutBlocks.BlockDescriptions[BlockIndex].BlockHash,
                                                                    NiceBytes(BlockMetaData.GetSize()));

                                                // NOTE(review): written from worker threads without the lock - safe only if the
                                                // element type allows independent per-index writes; confirm
                                                OutBlocks.MetaDataHasBeenUploaded[BlockIndex] = true;

                                                UploadStats.BlocksBytes += BlockMetaData.GetSize();
                                                UploadStats.BlockCount++;
                                                if (UploadStats.BlockCount == NewBlockCount)
                                                {
                                                    FilteredUploadedBytesPerSecond.Stop();
                                                }
                                            }
                                        }
                                    },
                                    Work.DefaultErrorFunction());
                            }
                        }
                    }
                },
                Work.DefaultErrorFunction());
        }

        // Wait for generation and uploads, refreshing the progress bar from the callback
        Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
            ZEN_UNUSED(IsAborted, PendingWork);

            FilteredGeneratedBytesPerSecond.Update(GenerateBlocksStats.GeneratedBlockByteCount.load());
            FilteredUploadedBytesPerSecond.Update(UploadStats.BlocksBytes.load());

            std::string Details = fmt::format("Generated {}/{} ({}, {}B/s). Uploaded {}/{} ({}, {}bits/s)",
                                              GenerateBlocksStats.GeneratedBlockCount.load(),
                                              NewBlockCount,
                                              NiceBytes(GenerateBlocksStats.GeneratedBlockByteCount.load()),
                                              NiceNum(FilteredGeneratedBytesPerSecond.GetCurrent()),
                                              UploadStats.BlockCount.load(),
                                              NewBlockCount,
                                              NiceBytes(UploadStats.BlocksBytes.load()),
                                              NiceNum(FilteredUploadedBytesPerSecond.GetCurrent() * 8));

            ProgressBar.UpdateState(
                {.Task           = "Generating blocks",
                 .Details        = Details,
                 .TotalCount     = gsl::narrow(NewBlockCount),
                 .RemainingCount = gsl::narrow(NewBlockCount - GenerateBlocksStats.GeneratedBlockCount.load())},
                false);
        });

        ZEN_ASSERT(AbortFlag || QueuedPendingBlocksForUpload.load() == 0);

        ProgressBar.Finish();

        GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGeneratedBytesPerSecond.GetElapsedTimeUS();
        UploadStats.ElapsedWallTimeUS                       = FilteredUploadedBytesPerSecond.GetElapsedTimeUS();
    }
}

void
UploadPartBlobs(BuildStorage& Storage, const Oid& BuildId, const std::filesystem::path& Path, const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, std::span RawHashes, const std::vector>& NewBlockChunks, GeneratedBlocks& NewBlocks, std::span LooseChunkIndexes, const std::uint64_t LargeAttachmentSize, DiskStatistics& DiskStats, UploadStatistics& UploadStats, LooseChunksStatistics& LooseChunksStats)
{
    ZEN_TRACE_CPU("UploadPartBlobs");
    {
        ProgressBar ProgressBar(UsePlainProgress);

        WorkerThreadPool& ReadChunkPool   = GetMediumWorkerPool(EWorkloadType::Burst);  // GetSyncWorkerPool(); //
        WorkerThreadPool& UploadChunkPool = GetSmallWorkerPool(EWorkloadType::Burst);   // GetSyncWorkerPool(); //

        FilteredRate FilteredGenerateBlockBytesPerSecond;
        FilteredRate FilteredCompressedBytesPerSecond;
        FilteredRate FilteredUploadedBytesPerSecond;

        ParallellWork Work(AbortFlag);

        std::atomic UploadedBlockSize           = 0;
        std::atomic UploadedBlockCount          = 0;
        std::atomic UploadedRawChunkSize        = 0;
        std::atomic UploadedCompressedChunkSize = 0;
        std::atomic UploadedChunkCount          = 0;

        tsl::robin_map ChunkIndexToLooseChunkOrderIndex;
ChunkIndexToLooseChunkOrderIndex.reserve(LooseChunkIndexes.size()); for (uint32_t OrderIndex = 0; OrderIndex < LooseChunkIndexes.size(); OrderIndex++) { ChunkIndexToLooseChunkOrderIndex.insert_or_assign(LooseChunkIndexes[OrderIndex], OrderIndex); } std::vector BlockIndexes; std::vector LooseChunkOrderIndexes; uint64_t TotalLooseChunksSize = 0; uint64_t TotalBlocksSize = 0; for (const IoHash& RawHash : RawHashes) { if (auto It = NewBlocks.BlockHashToBlockIndex.find(RawHash); It != NewBlocks.BlockHashToBlockIndex.end()) { BlockIndexes.push_back(It->second); TotalBlocksSize += NewBlocks.BlockSizes[It->second]; } if (auto ChunkIndexIt = Lookup.ChunkHashToChunkIndex.find(RawHash); ChunkIndexIt != Lookup.ChunkHashToChunkIndex.end()) { const uint32_t ChunkIndex = ChunkIndexIt->second; if (auto LooseOrderIndexIt = ChunkIndexToLooseChunkOrderIndex.find(ChunkIndex); LooseOrderIndexIt != ChunkIndexToLooseChunkOrderIndex.end()) { LooseChunkOrderIndexes.push_back(LooseOrderIndexIt->second); TotalLooseChunksSize += Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; } } } uint64_t TotalRawSize = TotalLooseChunksSize + TotalBlocksSize; const size_t UploadBlockCount = BlockIndexes.size(); const uint32_t UploadChunkCount = gsl::narrow(LooseChunkOrderIndexes.size()); auto AsyncUploadBlock = [&](const size_t BlockIndex, const IoHash BlockHash, CompositeBuffer&& Payload, std::atomic& QueuedPendingInMemoryBlocksForUpload) { bool IsInMemoryBlock = true; if (QueuedPendingInMemoryBlocksForUpload.load() > 16) { ZEN_TRACE_CPU("AsyncUploadBlock_WriteTempBlock"); Payload = CompositeBuffer(WriteToTempFile(std::move(Payload), Path / ZenTempBlockFolderName, BlockHash)); IsInMemoryBlock = false; } else { QueuedPendingInMemoryBlocksForUpload++; } Work.ScheduleWork( UploadChunkPool, [&, IsInMemoryBlock, BlockIndex, BlockHash, Payload = std::move(Payload)](std::atomic&) mutable { auto _ = MakeGuard([IsInMemoryBlock, &QueuedPendingInMemoryBlocksForUpload] { if (IsInMemoryBlock) { 
QueuedPendingInMemoryBlocksForUpload--; } }); if (!AbortFlag) { ZEN_TRACE_CPU("AsyncUploadBlock"); const uint64_t PayloadSize = Payload.GetSize(); FilteredUploadedBytesPerSecond.Start(); const CbObject BlockMetaData = BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]); Storage.PutBuildBlob(BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload); ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks", NewBlocks.BlockDescriptions[BlockIndex].BlockHash, NiceBytes(PayloadSize), NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); UploadedBlockSize += PayloadSize; UploadStats.BlocksBytes += PayloadSize; Storage.PutBlockMetadata(BuildId, BlockHash, BlockMetaData); ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})", NewBlocks.BlockDescriptions[BlockIndex].BlockHash, NiceBytes(BlockMetaData.GetSize())); NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; UploadStats.BlockCount++; UploadStats.BlocksBytes += BlockMetaData.GetSize(); UploadedBlockCount++; if (UploadedBlockCount == UploadBlockCount && UploadedChunkCount == UploadChunkCount) { FilteredUploadedBytesPerSecond.Stop(); } } }, Work.DefaultErrorFunction()); }; auto AsyncUploadLooseChunk = [&](const IoHash& RawHash, uint64_t RawSize, CompositeBuffer&& Payload) { Work.ScheduleWork( UploadChunkPool, [&, RawHash, RawSize, Payload = CompositeBuffer(std::move(Payload))](std::atomic&) mutable { if (!AbortFlag) { ZEN_TRACE_CPU("AsyncUploadLooseChunk"); const uint64_t PayloadSize = Payload.GetSize(); ; if (PayloadSize >= LargeAttachmentSize) { ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart"); UploadStats.MultipartAttachmentCount++; std::vector> MultipartWork = Storage.PutLargeBuildBlob( BuildId, RawHash, ZenContentType::kCompressedBinary, PayloadSize, [Payload = std::move(Payload), &FilteredUploadedBytesPerSecond](uint64_t Offset, uint64_t Size) mutable -> IoBuffer { FilteredUploadedBytesPerSecond.Start(); IoBuffer PartPayload = 
Payload.Mid(Offset, Size).Flatten().AsIoBuffer(); PartPayload.SetContentType(ZenContentType::kBinary); return PartPayload; }, [&, RawSize](uint64_t SentBytes, bool IsComplete) { UploadStats.ChunksBytes += SentBytes; UploadedCompressedChunkSize += SentBytes; if (IsComplete) { UploadStats.ChunkCount++; UploadedChunkCount++; if (UploadedBlockCount == UploadBlockCount && UploadedChunkCount == UploadChunkCount) { FilteredUploadedBytesPerSecond.Stop(); } UploadedRawChunkSize += RawSize; } }); for (auto& WorkPart : MultipartWork) { Work.ScheduleWork( UploadChunkPool, [Work = std::move(WorkPart)](std::atomic&) { ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart_Work"); if (!AbortFlag) { Work(); } }, Work.DefaultErrorFunction()); } ZEN_CONSOLE_VERBOSE("Uploaded multipart chunk {} ({})", RawHash, NiceBytes(PayloadSize)); } else { ZEN_TRACE_CPU("AsyncUploadLooseChunk_Singlepart"); Storage.PutBuildBlob(BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); ZEN_CONSOLE_VERBOSE("Uploaded chunk {} ({})", RawHash, NiceBytes(PayloadSize)); UploadStats.ChunksBytes += Payload.GetSize(); UploadStats.ChunkCount++; UploadedCompressedChunkSize += Payload.GetSize(); UploadedRawChunkSize += RawSize; UploadedChunkCount++; if (UploadedChunkCount == UploadChunkCount) { FilteredUploadedBytesPerSecond.Stop(); } } } }, Work.DefaultErrorFunction()); }; std::vector GenerateBlockIndexes; std::atomic GeneratedBlockCount = 0; std::atomic GeneratedBlockByteCount = 0; std::vector CompressLooseChunkOrderIndexes; std::atomic QueuedPendingInMemoryBlocksForUpload = 0; // Start upload of any pre-compressed loose chunks for (const uint32_t LooseChunkOrderIndex : LooseChunkOrderIndexes) { CompressLooseChunkOrderIndexes.push_back(LooseChunkOrderIndex); } // Start generation of any non-prebuilt blocks and schedule upload for (const size_t BlockIndex : BlockIndexes) { const IoHash& BlockHash = NewBlocks.BlockDescriptions[BlockIndex].BlockHash; if (!AbortFlag) { Work.ScheduleWork( ReadChunkPool, // 
GetSyncWorkerPool() [&, BlockIndex](std::atomic&) { if (!AbortFlag) { ZEN_TRACE_CPU("UploadPartBlobs_GenerateBlock"); FilteredGenerateBlockBytesPerSecond.Start(); CompositeBuffer Payload; if (NewBlocks.BlockHeaders[BlockIndex]) { Payload = RebuildBlock(Path, Content, Lookup, std::move(NewBlocks.BlockHeaders[BlockIndex]), NewBlockChunks[BlockIndex], DiskStats) .GetCompressed(); } else { ChunkBlockDescription BlockDescription; CompressedBuffer CompressedBlock = GenerateBlock(Path, Content, Lookup, NewBlockChunks[BlockIndex], BlockDescription, DiskStats); if (!CompressedBlock) { throw std::runtime_error(fmt::format("Failed generating block {}", BlockHash)); } ZEN_ASSERT(BlockDescription.BlockHash == BlockHash); Payload = std::move(CompressedBlock).GetCompressed(); } GeneratedBlockByteCount += NewBlocks.BlockSizes[BlockIndex]; GeneratedBlockCount++; if (GeneratedBlockCount == GenerateBlockIndexes.size()) { FilteredGenerateBlockBytesPerSecond.Stop(); } if (!AbortFlag) { AsyncUploadBlock(BlockIndex, BlockHash, std::move(Payload), QueuedPendingInMemoryBlocksForUpload); } ZEN_CONSOLE_VERBOSE("Regenerated block {} ({}) containing {} chunks", NewBlocks.BlockDescriptions[BlockIndex].BlockHash, NiceBytes(NewBlocks.BlockSizes[BlockIndex]), NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); } }, Work.DefaultErrorFunction()); } } std::atomic RawLooseChunkByteCount = 0; // Start compression of any non-precompressed loose chunks and schedule upload for (const uint32_t CompressLooseChunkOrderIndex : CompressLooseChunkOrderIndexes) { const uint32_t ChunkIndex = LooseChunkIndexes[CompressLooseChunkOrderIndex]; Work.ScheduleWork( ReadChunkPool, // GetSyncWorkerPool(),// ReadChunkPool, [&, ChunkIndex](std::atomic&) { if (!AbortFlag) { ZEN_TRACE_CPU("UploadPartBlobs_CompressChunk"); FilteredCompressedBytesPerSecond.Start(); CompositeBuffer Payload = CompressChunk(Path, Content, Lookup, ChunkIndex, Path / ZenTempChunkFolderName, RawLooseChunkByteCount, LooseChunksStats); 
ZEN_CONSOLE_VERBOSE("Compressed chunk {} ({} -> {})", Content.ChunkedContent.ChunkHashes[ChunkIndex], NiceBytes(Content.ChunkedContent.ChunkRawSizes[ChunkIndex]), NiceBytes(Payload.GetSize())); const uint64_t ChunkRawSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; UploadStats.ReadFromDiskBytes += ChunkRawSize; if (LooseChunksStats.CompressedChunkCount == CompressLooseChunkOrderIndexes.size()) { FilteredCompressedBytesPerSecond.Stop(); } if (!AbortFlag) { AsyncUploadLooseChunk(Content.ChunkedContent.ChunkHashes[ChunkIndex], ChunkRawSize, std::move(Payload)); } } }, Work.DefaultErrorFunction()); } Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) { ZEN_UNUSED(IsAborted, PendingWork); FilteredCompressedBytesPerSecond.Update(LooseChunksStats.CompressedChunkBytes.load()); FilteredGenerateBlockBytesPerSecond.Update(GeneratedBlockByteCount.load()); FilteredUploadedBytesPerSecond.Update(UploadedCompressedChunkSize.load() + UploadedBlockSize.load()); uint64_t UploadedRawSize = UploadedRawChunkSize.load() + UploadedBlockSize.load(); uint64_t UploadedCompressedSize = UploadedCompressedChunkSize.load() + UploadedBlockSize.load(); std::string Details = fmt::format( "Compressed {}/{} ({}/{} {}B/s) chunks. 
" "Uploaded {}/{} ({}/{}) blobs " "({} {}bits/s)", LooseChunksStats.CompressedChunkCount.load(), CompressLooseChunkOrderIndexes.size(), NiceBytes(RawLooseChunkByteCount), NiceBytes(TotalLooseChunksSize), NiceNum(FilteredCompressedBytesPerSecond.GetCurrent()), UploadedBlockCount.load() + UploadedChunkCount.load(), UploadBlockCount + UploadChunkCount, NiceBytes(UploadedRawSize), NiceBytes(TotalRawSize), NiceBytes(UploadedCompressedSize), NiceNum(FilteredUploadedBytesPerSecond.GetCurrent())); ProgressBar.UpdateState({.Task = "Uploading blobs ", .Details = Details, .TotalCount = gsl::narrow(TotalRawSize), .RemainingCount = gsl::narrow(TotalRawSize - UploadedRawSize)}, false); }); ZEN_ASSERT(AbortFlag || QueuedPendingInMemoryBlocksForUpload.load() == 0); ProgressBar.Finish(); UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS(); LooseChunksStats.CompressChunksElapsedWallTimeUS = FilteredCompressedBytesPerSecond.GetElapsedTimeUS(); } } std::vector FindReuseBlocks(const std::vector& KnownBlocks, std::span ChunkHashes, std::span ChunkIndexes, uint8_t MinPercentLimit, std::vector& OutUnusedChunkIndexes, FindBlocksStatistics& FindBlocksStats) { ZEN_TRACE_CPU("FindReuseBlocks"); // Find all blocks with a usage level higher than MinPercentLimit // Pick out the blocks with usage higher or equal to MinPercentLimit // Sort them with highest size usage - most usage first // Make a list of all chunks and mark them as not found // For each block, recalculate the block has usage percent based on the chunks marked as not found // If the block still reaches MinPercentLimit, keep it and remove the matching chunks from the not found list // Repeat for following all remaining block that initially matched MinPercentLimit std::vector FilteredReuseBlockIndexes; uint32_t ChunkCount = gsl::narrow(ChunkHashes.size()); std::vector ChunkFound(ChunkCount, false); if (ChunkCount > 0) { if (!KnownBlocks.empty()) { Stopwatch ReuseTimer; tsl::robin_map 
ChunkHashToChunkIndex; ChunkHashToChunkIndex.reserve(ChunkIndexes.size()); for (uint32_t ChunkIndex : ChunkIndexes) { ChunkHashToChunkIndex.insert_or_assign(ChunkHashes[ChunkIndex], ChunkIndex); } std::vector BlockSizes(KnownBlocks.size(), 0); std::vector BlockUseSize(KnownBlocks.size(), 0); std::vector ReuseBlockIndexes; for (size_t KnownBlockIndex = 0; KnownBlockIndex < KnownBlocks.size(); KnownBlockIndex++) { const ChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex]; if (KnownBlock.BlockHash != IoHash::Zero && KnownBlock.ChunkRawHashes.size() == KnownBlock.ChunkCompressedLengths.size()) { size_t BlockAttachmentCount = KnownBlock.ChunkRawHashes.size(); if (BlockAttachmentCount == 0) { continue; } size_t ReuseSize = 0; size_t BlockSize = 0; size_t FoundAttachmentCount = 0; size_t BlockChunkCount = KnownBlock.ChunkRawHashes.size(); for (size_t BlockChunkIndex = 0; BlockChunkIndex < BlockChunkCount; BlockChunkIndex++) { const IoHash& BlockChunkHash = KnownBlock.ChunkRawHashes[BlockChunkIndex]; const uint32_t BlockChunkSize = KnownBlock.ChunkCompressedLengths[BlockChunkIndex]; BlockSize += BlockChunkSize; if (ChunkHashToChunkIndex.contains(BlockChunkHash)) { ReuseSize += BlockChunkSize; FoundAttachmentCount++; } } size_t ReusePercent = (ReuseSize * 100) / BlockSize; if (ReusePercent >= MinPercentLimit) { ZEN_CONSOLE_VERBOSE("Reusing block {}. {} attachments found, usage level: {}%", KnownBlock.BlockHash, FoundAttachmentCount, ReusePercent); ReuseBlockIndexes.push_back(KnownBlockIndex); BlockSizes[KnownBlockIndex] = BlockSize; BlockUseSize[KnownBlockIndex] = ReuseSize; } else if (FoundAttachmentCount > 0) { ZEN_CONSOLE_VERBOSE("Skipping block {}. 
{} attachments found, usage level: {}%", KnownBlock.BlockHash, FoundAttachmentCount, ReusePercent); FindBlocksStats.RejectedBlockCount++; FindBlocksStats.RejectedChunkCount += FoundAttachmentCount; FindBlocksStats.RejectedByteCount += ReuseSize; } } } if (!ReuseBlockIndexes.empty()) { std::sort(ReuseBlockIndexes.begin(), ReuseBlockIndexes.end(), [&](size_t Lhs, size_t Rhs) { return BlockUseSize[Lhs] > BlockUseSize[Rhs]; }); for (size_t KnownBlockIndex : ReuseBlockIndexes) { std::vector FoundChunkIndexes; size_t BlockSize = 0; size_t AdjustedReuseSize = 0; const ChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex]; for (size_t BlockChunkIndex = 0; BlockChunkIndex < KnownBlock.ChunkRawHashes.size(); BlockChunkIndex++) { const IoHash& BlockChunkHash = KnownBlock.ChunkRawHashes[BlockChunkIndex]; const uint32_t BlockChunkSize = KnownBlock.ChunkCompressedLengths[BlockChunkIndex]; BlockSize += BlockChunkSize; if (auto It = ChunkHashToChunkIndex.find(BlockChunkHash); It != ChunkHashToChunkIndex.end()) { const uint32_t ChunkIndex = It->second; if (!ChunkFound[ChunkIndex]) { FoundChunkIndexes.push_back(ChunkIndex); AdjustedReuseSize += KnownBlock.ChunkCompressedLengths[BlockChunkIndex]; } } } size_t ReusePercent = (AdjustedReuseSize * 100) / BlockSize; if (ReusePercent >= MinPercentLimit) { ZEN_CONSOLE_VERBOSE("Reusing block {}. {} attachments found, usage level: {}%", KnownBlock.BlockHash, FoundChunkIndexes.size(), ReusePercent); FilteredReuseBlockIndexes.push_back(KnownBlockIndex); for (uint32_t ChunkIndex : FoundChunkIndexes) { ChunkFound[ChunkIndex] = true; } FindBlocksStats.AcceptedChunkCount += FoundChunkIndexes.size(); FindBlocksStats.AcceptedByteCount += AdjustedReuseSize; FindBlocksStats.AcceptedReduntantChunkCount += KnownBlock.ChunkRawHashes.size() - FoundChunkIndexes.size(); FindBlocksStats.AcceptedReduntantByteCount += BlockSize - AdjustedReuseSize; } else { ZEN_CONSOLE_VERBOSE("Skipping block {}. 
filtered usage level: {}%", KnownBlock.BlockHash, ReusePercent); FindBlocksStats.RejectedBlockCount++; FindBlocksStats.RejectedChunkCount += FoundChunkIndexes.size(); FindBlocksStats.RejectedByteCount += AdjustedReuseSize; } } } } OutUnusedChunkIndexes.reserve(ChunkIndexes.size() - FindBlocksStats.AcceptedChunkCount); for (uint32_t ChunkIndex : ChunkIndexes) { if (!ChunkFound[ChunkIndex]) { OutUnusedChunkIndexes.push_back(ChunkIndex); } } } return FilteredReuseBlockIndexes; }; void UploadFolder(BuildStorage& Storage, const Oid& BuildId, const Oid& BuildPartId, const std::string_view BuildPartName, const std::filesystem::path& Path, const std::filesystem::path& ManifestPath, const uint8_t BlockReuseMinPercentLimit, bool AllowMultiparts, const CbObject& MetaData, bool CreateBuild, bool IgnoreExistingBlocks, bool PostUploadVerify) { Stopwatch ProcessTimer; const std::filesystem::path ZenTempFolder = Path / ZenTempFolderName; CreateDirectories(ZenTempFolder); CleanDirectory(ZenTempFolder, {}); auto _ = MakeGuard([&]() { if (CleanDirectory(ZenTempFolder, {})) { std::filesystem::remove(ZenTempFolder); } }); CreateDirectories(Path / ZenTempBlockFolderName); CreateDirectories(Path / ZenTempChunkFolderName); std::uint64_t TotalRawSize = 0; CbObject ChunkerParameters; struct PrepareBuildResult { std::vector KnownBlocks; uint64_t PreferredMultipartChunkSize = DefaultPreferredMultipartChunkSize; uint64_t PayloadSize = 0; uint64_t PrepareBuildTimeMs = 0; uint64_t FindBlocksTimeMs = 0; uint64_t ElapsedTimeMs = 0; }; FindBlocksStatistics FindBlocksStats; std::future PrepBuildResultFuture = GetSmallWorkerPool(EWorkloadType::Burst) .EnqueueTask(std::packaged_task{ [&Storage, BuildId, &MetaData, CreateBuild, AllowMultiparts, IgnoreExistingBlocks, &FindBlocksStats] { ZEN_TRACE_CPU("PrepareBuild"); PrepareBuildResult Result; Stopwatch Timer; if (CreateBuild) { ZEN_TRACE_CPU("CreateBuild"); Stopwatch PutBuildTimer; CbObject PutBuildResult = Storage.PutBuild(BuildId, MetaData); 
Result.PrepareBuildTimeMs = PutBuildTimer.GetElapsedTimeMs(); Result.PreferredMultipartChunkSize = PutBuildResult["chunkSize"sv].AsUInt64(Result.PreferredMultipartChunkSize); Result.PayloadSize = MetaData.GetSize(); } else { ZEN_TRACE_CPU("PutBuild"); Stopwatch GetBuildTimer; CbObject Build = Storage.GetBuild(BuildId); Result.PrepareBuildTimeMs = GetBuildTimer.GetElapsedTimeMs(); Result.PayloadSize = Build.GetSize(); if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0) { Result.PreferredMultipartChunkSize = ChunkSize; } else if (AllowMultiparts) { ZEN_WARN("PreferredMultipartChunkSize is unknown. Defaulting to '{}'", NiceBytes(Result.PreferredMultipartChunkSize)); } } if (!IgnoreExistingBlocks) { ZEN_TRACE_CPU("FindBlocks"); Stopwatch KnownBlocksTimer; Result.KnownBlocks = Storage.FindBlocks(BuildId); FindBlocksStats.FindBlockTimeMS = KnownBlocksTimer.GetElapsedTimeMs(); FindBlocksStats.FoundBlockCount = Result.KnownBlocks.size(); Result.FindBlocksTimeMs = KnownBlocksTimer.GetElapsedTimeMs(); } Result.ElapsedTimeMs = Timer.GetElapsedTimeMs(); return Result; }}); ChunkedFolderContent LocalContent; GetFolderContentStatistics LocalFolderScanStats; ChunkingStatistics ChunkingStats; { auto IsAcceptedFolder = [ExcludeFolders = DefaultExcludeFolders](const std::string_view& RelativePath) -> bool { for (const std::string_view& ExcludeFolder : ExcludeFolders) { if (RelativePath.starts_with(ExcludeFolder)) { if (RelativePath.length() == ExcludeFolder.length()) { return false; } else if (RelativePath[ExcludeFolder.length()] == '/') { return false; } } } return true; }; auto IsAcceptedFile = [ExcludeExtensions = DefaultExcludeExtensions](const std::string_view& RelativePath, uint64_t, uint32_t) -> bool { for (const std::string_view& ExcludeExtension : ExcludeExtensions) { if (RelativePath.ends_with(ExcludeExtension)) { return false; } } return true; }; auto ParseManifest = [](const std::filesystem::path& Path, const std::filesystem::path& ManifestPath) -> 
std::vector { std::vector AssetPaths; std::filesystem::path AbsoluteManifestPath = ManifestPath.is_absolute() ? ManifestPath : Path / ManifestPath; IoBuffer ManifestContent = ReadFile(AbsoluteManifestPath).Flatten(); std::string_view ManifestString((const char*)ManifestContent.GetView().GetData(), ManifestContent.GetSize()); std::string_view::size_type Offset = 0; while (Offset < ManifestContent.GetSize()) { size_t PathBreakOffset = ManifestString.find_first_of("\t\r\n", Offset); if (PathBreakOffset == std::string_view::npos) { PathBreakOffset = ManifestContent.GetSize(); } std::string_view AssetPath = ManifestString.substr(Offset, PathBreakOffset - Offset); if (!AssetPath.empty()) { AssetPaths.emplace_back(std::filesystem::path(AssetPath)); } Offset = PathBreakOffset; size_t EolOffset = ManifestString.find_first_of("\r\n", Offset); if (EolOffset == std::string_view::npos) { break; } Offset = EolOffset; size_t LineBreakOffset = ManifestString.find_first_not_of("\t\r\n", Offset); if (LineBreakOffset == std::string_view::npos) { break; } Offset = LineBreakOffset; } return AssetPaths; }; Stopwatch ScanTimer; FolderContent Content; if (ManifestPath.empty()) { std::filesystem::path ExcludeManifestPath = Path / ZenExcludeManifestName; tsl::robin_set ExcludeAssetPaths; if (std::filesystem::is_regular_file(ExcludeManifestPath)) { std::vector AssetPaths = ParseManifest(Path, ExcludeManifestPath); ExcludeAssetPaths.reserve(AssetPaths.size()); for (const std::filesystem::path& AssetPath : AssetPaths) { ExcludeAssetPaths.insert(AssetPath.generic_string()); } } Content = GetFolderContent( LocalFolderScanStats, Path, std::move(IsAcceptedFolder), [&IsAcceptedFile, &ExcludeAssetPaths](const std::string_view& RelativePath, uint64_t Size, uint32_t Attributes) -> bool { if (RelativePath == ZenExcludeManifestName) { return false; } if (!IsAcceptedFile(RelativePath, Size, Attributes)) { return false; } if 
(ExcludeAssetPaths.contains(std::filesystem::path(RelativePath).generic_string())) { return false; } return true; }, GetMediumWorkerPool(EWorkloadType::Burst), UsePlainProgress ? 5000 : 200, [&](bool, std::ptrdiff_t) { ZEN_DEBUG("Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), Path); }, AbortFlag); } else { Stopwatch ManifestParseTimer; std::vector AssetPaths = ParseManifest(Path, ManifestPath); for (const std::filesystem::path& AssetPath : AssetPaths) { Content.Paths.push_back(AssetPath); Content.RawSizes.push_back(std::filesystem::file_size(Path / AssetPath)); #if ZEN_PLATFORM_WINDOWS Content.Attributes.push_back(GetFileAttributes(Path / AssetPath)); #endif // ZEN_PLATFORM_WINDOWS #if ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX Content.Attributes.push_back(GetFileMode(Path / AssetPath)); #endif // ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX LocalFolderScanStats.AcceptedFileByteCount += Content.RawSizes.back(); LocalFolderScanStats.AcceptedFileCount++; } if (ManifestPath.is_relative()) { Content.Paths.push_back(ManifestPath); Content.RawSizes.push_back(std::filesystem::file_size(ManifestPath)); #if ZEN_PLATFORM_WINDOWS Content.Attributes.push_back(GetFileAttributes(ManifestPath)); #endif // ZEN_PLATFORM_WINDOWS #if ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX Content.Attributes.push_back(GetFileMode(ManifestPath)); #endif // ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX LocalFolderScanStats.AcceptedFileByteCount += Content.RawSizes.back(); LocalFolderScanStats.AcceptedFileCount++; } LocalFolderScanStats.FoundFileByteCount.store(LocalFolderScanStats.AcceptedFileByteCount); LocalFolderScanStats.FoundFileCount.store(LocalFolderScanStats.AcceptedFileCount); LocalFolderScanStats.ElapsedWallTimeUS = ManifestParseTimer.GetElapsedTimeUs(); } std::unique_ptr ChunkController = CreateBasicChunkingController(); { CbObjectWriter ChunkParametersWriter; ChunkParametersWriter.AddString("name"sv, ChunkController->GetName()); ChunkParametersWriter.AddObject("parameters"sv, 
ChunkController->GetParameters()); ChunkerParameters = ChunkParametersWriter.Save(); } TotalRawSize = std::accumulate(Content.RawSizes.begin(), Content.RawSizes.end(), std::uint64_t(0)); { ProgressBar ProgressBar(UsePlainProgress); FilteredRate FilteredBytesHashed; FilteredBytesHashed.Start(); LocalContent = ChunkFolderContent( ChunkingStats, GetMediumWorkerPool(EWorkloadType::Burst), Path, Content, *ChunkController, UsePlainProgress ? 5000 : 200, [&](bool, std::ptrdiff_t) { FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load()); std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found", ChunkingStats.FilesProcessed.load(), Content.Paths.size(), NiceBytes(ChunkingStats.BytesHashed.load()), NiceBytes(TotalRawSize), NiceNum(FilteredBytesHashed.GetCurrent()), ChunkingStats.UniqueChunksFound.load(), NiceBytes(ChunkingStats.UniqueBytesFound.load())); ProgressBar.UpdateState({.Task = "Scanning files ", .Details = Details, .TotalCount = TotalRawSize, .RemainingCount = TotalRawSize - ChunkingStats.BytesHashed.load()}, false); }, AbortFlag); if (AbortFlag) { return; } FilteredBytesHashed.Stop(); ProgressBar.Finish(); } ZEN_CONSOLE("Found {} ({}) files divided into {} ({}) unique chunks in '{}' in {}. Average hash rate {}B/sec", LocalContent.Paths.size(), NiceBytes(TotalRawSize), ChunkingStats.UniqueChunksFound.load(), NiceBytes(ChunkingStats.UniqueBytesFound.load()), Path, NiceTimeSpanMs(ScanTimer.GetElapsedTimeMs()), NiceNum(GetBytesPerSecond(ChunkingStats.ElapsedWallTimeUS, ChunkingStats.BytesHashed))); } const ChunkedContentLookup LocalLookup = BuildChunkedContentLookup(LocalContent); GenerateBlocksStatistics GenerateBlocksStats; LooseChunksStatistics LooseChunksStats; std::vector ReuseBlockIndexes; std::vector NewBlockChunkIndexes; PrepareBuildResult PrepBuildResult = PrepBuildResultFuture.get(); ZEN_CONSOLE("Build prepare took {}. {} took {}, payload size {}{}", NiceTimeSpanMs(PrepBuildResult.ElapsedTimeMs), CreateBuild ? 
"PutBuild" : "GetBuild", NiceTimeSpanMs(PrepBuildResult.PrepareBuildTimeMs), NiceBytes(PrepBuildResult.PayloadSize), IgnoreExistingBlocks ? "" : fmt::format(". Found {} blocks in {}", PrepBuildResult.KnownBlocks.size(), NiceTimeSpanMs(PrepBuildResult.FindBlocksTimeMs))); const std::uint64_t LargeAttachmentSize = AllowMultiparts ? PrepBuildResult.PreferredMultipartChunkSize * 4u : (std::uint64_t)-1; Stopwatch BlockArrangeTimer; std::vector LooseChunkIndexes; { bool EnableBlocks = true; std::vector BlockChunkIndexes; for (uint32_t ChunkIndex = 0; ChunkIndex < LocalContent.ChunkedContent.ChunkHashes.size(); ChunkIndex++) { const uint64_t ChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; if (!EnableBlocks || ChunkRawSize == 0 || ChunkRawSize > DefaultChunksBlockParams.MaxChunkEmbedSize) { LooseChunkIndexes.push_back(ChunkIndex); LooseChunksStats.ChunkByteCount += ChunkRawSize; } else { BlockChunkIndexes.push_back(ChunkIndex); FindBlocksStats.PotentialChunkByteCount += ChunkRawSize; } } FindBlocksStats.PotentialChunkCount = BlockChunkIndexes.size(); LooseChunksStats.ChunkCount = LooseChunkIndexes.size(); if (IgnoreExistingBlocks) { ZEN_CONSOLE("Ignoring any existing blocks in store"); NewBlockChunkIndexes = std::move(BlockChunkIndexes); } else { ReuseBlockIndexes = FindReuseBlocks(PrepBuildResult.KnownBlocks, LocalContent.ChunkedContent.ChunkHashes, BlockChunkIndexes, BlockReuseMinPercentLimit, NewBlockChunkIndexes, FindBlocksStats); FindBlocksStats.AcceptedBlockCount = ReuseBlockIndexes.size(); for (const ChunkBlockDescription& Description : PrepBuildResult.KnownBlocks) { for (uint32_t ChunkRawLength : Description.ChunkRawLengths) { FindBlocksStats.FoundBlockByteCount += ChunkRawLength; } FindBlocksStats.FoundBlockChunkCount += Description.ChunkRawHashes.size(); } } } std::vector> NewBlockChunks; ArrangeChunksIntoBlocks(LocalContent, LocalLookup, DefaultChunksBlockParams.MaxBlockSize, NewBlockChunkIndexes, NewBlockChunks); 
FindBlocksStats.NewBlocksCount = NewBlockChunks.size(); for (uint32_t ChunkIndex : NewBlockChunkIndexes) { FindBlocksStats.NewBlocksChunkByteCount += LocalContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; } FindBlocksStats.NewBlocksChunkCount = NewBlockChunkIndexes.size(); const double AcceptedByteCountPercent = FindBlocksStats.PotentialChunkByteCount > 0 ? (100.0 * FindBlocksStats.AcceptedByteCount / FindBlocksStats.PotentialChunkByteCount) : 0.0; const double AcceptedReduntantByteCountPercent = FindBlocksStats.AcceptedByteCount > 0 ? (100.0 * FindBlocksStats.AcceptedReduntantByteCount) / (FindBlocksStats.AcceptedByteCount + FindBlocksStats.AcceptedReduntantByteCount) : 0.0; ZEN_CONSOLE( "Found {} chunks in {} ({}) blocks eligible for reuse in {}\n" " Reusing {} ({}) matching chunks in {} blocks ({:.1f}%)\n" " Accepting {} ({}) redundant chunks ({:.1f}%)\n" " Rejected {} ({}) chunks in {} blocks\n" " Arranged {} ({}) chunks in {} new blocks\n" " Keeping {} ({}) chunks as loose chunks\n" " Discovery completed in {}", FindBlocksStats.FoundBlockChunkCount, FindBlocksStats.FoundBlockCount, NiceBytes(FindBlocksStats.FoundBlockByteCount), NiceTimeSpanMs(FindBlocksStats.FindBlockTimeMS), FindBlocksStats.AcceptedChunkCount, NiceBytes(FindBlocksStats.AcceptedByteCount), FindBlocksStats.AcceptedBlockCount, AcceptedByteCountPercent, FindBlocksStats.AcceptedReduntantChunkCount, NiceBytes(FindBlocksStats.AcceptedReduntantByteCount), AcceptedReduntantByteCountPercent, FindBlocksStats.RejectedChunkCount, NiceBytes(FindBlocksStats.RejectedByteCount), FindBlocksStats.RejectedBlockCount, FindBlocksStats.NewBlocksChunkCount, NiceBytes(FindBlocksStats.NewBlocksChunkByteCount), FindBlocksStats.NewBlocksCount, LooseChunksStats.ChunkCount, NiceBytes(LooseChunksStats.ChunkByteCount), NiceTimeSpanMs(BlockArrangeTimer.GetElapsedTimeMs())); DiskStatistics DiskStats; UploadStatistics UploadStats; GeneratedBlocks NewBlocks; if (!NewBlockChunks.empty()) { Stopwatch GenerateBuildBlocksTimer; 
auto __ = MakeGuard([&]() { uint64_t BlockGenerateTimeUs = GenerateBuildBlocksTimer.GetElapsedTimeUs(); ZEN_CONSOLE("Generated {} ({}) and uploaded {} ({}) blocks in {}. Generate speed: {}B/sec. Transfer speed {}bits/sec.", GenerateBlocksStats.GeneratedBlockCount.load(), NiceBytes(GenerateBlocksStats.GeneratedBlockByteCount), UploadStats.BlockCount.load(), NiceBytes(UploadStats.BlocksBytes.load()), NiceTimeSpanMs(BlockGenerateTimeUs / 1000), NiceNum(GetBytesPerSecond(GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS, GenerateBlocksStats.GeneratedBlockByteCount)), NiceNum(GetBytesPerSecond(UploadStats.ElapsedWallTimeUS, UploadStats.BlocksBytes * 8))); }); GenerateBuildBlocks(Path, LocalContent, LocalLookup, Storage, BuildId, NewBlockChunks, NewBlocks, DiskStats, UploadStats, GenerateBlocksStats); } CbObject PartManifest; { CbObjectWriter PartManifestWriter; Stopwatch ManifestGenerationTimer; auto __ = MakeGuard([&]() { ZEN_CONSOLE("Generated build part manifest in {} ({})", NiceTimeSpanMs(ManifestGenerationTimer.GetElapsedTimeMs()), NiceBytes(PartManifestWriter.GetSaveSize())); }); PartManifestWriter.AddObject("chunker"sv, ChunkerParameters); std::vector AllChunkBlockHashes; std::vector AllChunkBlockDescriptions; AllChunkBlockHashes.reserve(ReuseBlockIndexes.size() + NewBlocks.BlockDescriptions.size()); AllChunkBlockDescriptions.reserve(ReuseBlockIndexes.size() + NewBlocks.BlockDescriptions.size()); for (size_t ReuseBlockIndex : ReuseBlockIndexes) { AllChunkBlockDescriptions.push_back(PrepBuildResult.KnownBlocks[ReuseBlockIndex]); AllChunkBlockHashes.push_back(PrepBuildResult.KnownBlocks[ReuseBlockIndex].BlockHash); } AllChunkBlockDescriptions.insert(AllChunkBlockDescriptions.end(), NewBlocks.BlockDescriptions.begin(), NewBlocks.BlockDescriptions.end()); for (const ChunkBlockDescription& BlockDescription : NewBlocks.BlockDescriptions) { AllChunkBlockHashes.push_back(BlockDescription.BlockHash); } #if EXTRA_VERIFY tsl::robin_map ChunkHashToAbsoluteChunkIndex; 
std::vector AbsoluteChunkHashes; AbsoluteChunkHashes.reserve(LocalContent.ChunkedContent.ChunkHashes.size()); for (uint32_t ChunkIndex : LooseChunkIndexes) { ChunkHashToAbsoluteChunkIndex.insert({LocalContent.ChunkedContent.ChunkHashes[ChunkIndex], AbsoluteChunkHashes.size()}); AbsoluteChunkHashes.push_back(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]); } for (const ChunkBlockDescription& Block : AllChunkBlockDescriptions) { for (const IoHash& ChunkHash : Block.ChunkHashes) { ChunkHashToAbsoluteChunkIndex.insert({ChunkHash, AbsoluteChunkHashes.size()}); AbsoluteChunkHashes.push_back(ChunkHash); } } for (const IoHash& ChunkHash : LocalContent.ChunkedContent.ChunkHashes) { ZEN_ASSERT(AbsoluteChunkHashes[ChunkHashToAbsoluteChunkIndex.at(ChunkHash)] == ChunkHash); ZEN_ASSERT(LocalContent.ChunkedContent.ChunkHashes[LocalLookup.ChunkHashToChunkIndex.at(ChunkHash)] == ChunkHash); } for (const uint32_t ChunkIndex : LocalContent.ChunkedContent.ChunkOrders) { ZEN_ASSERT(AbsoluteChunkHashes[ChunkHashToAbsoluteChunkIndex.at(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex])] == LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]); ZEN_ASSERT(LocalLookup.ChunkHashToChunkIndex.at(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]) == ChunkIndex); } #endif // EXTRA_VERIFY std::vector AbsoluteChunkOrders = CalculateAbsoluteChunkOrders(LocalContent.ChunkedContent.ChunkHashes, LocalContent.ChunkedContent.ChunkOrders, LocalLookup.ChunkHashToChunkIndex, LooseChunkIndexes, AllChunkBlockDescriptions); #if EXTRA_VERIFY for (uint32_t ChunkOrderIndex = 0; ChunkOrderIndex < LocalContent.ChunkedContent.ChunkOrders.size(); ChunkOrderIndex++) { uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[ChunkOrderIndex]; uint32_t AbsoluteChunkIndex = AbsoluteChunkOrders[ChunkOrderIndex]; const IoHash& LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; const IoHash& AbsoluteChunkHash = AbsoluteChunkHashes[AbsoluteChunkIndex]; ZEN_ASSERT(LocalChunkHash == 
AbsoluteChunkHash); } #endif // EXTRA_VERIFY WriteBuildContentToCompactBinary(PartManifestWriter, LocalContent.Platform, LocalContent.Paths, LocalContent.RawHashes, LocalContent.RawSizes, LocalContent.Attributes, LocalContent.ChunkedContent.SequenceRawHashes, LocalContent.ChunkedContent.ChunkCounts, LocalContent.ChunkedContent.ChunkHashes, LocalContent.ChunkedContent.ChunkRawSizes, AbsoluteChunkOrders, LooseChunkIndexes, AllChunkBlockHashes); #if EXTRA_VERIFY { ChunkedFolderContent VerifyFolderContent; std::vector OutAbsoluteChunkOrders; std::vector OutLooseChunkHashes; std::vector OutLooseChunkRawSizes; std::vector OutBlockRawHashes; ReadBuildContentFromCompactBinary(PartManifestWriter.Save(), VerifyFolderContent.Platform, VerifyFolderContent.Paths, VerifyFolderContent.RawHashes, VerifyFolderContent.RawSizes, VerifyFolderContent.Attributes, VerifyFolderContent.ChunkedContent.SequenceRawHashes, VerifyFolderContent.ChunkedContent.ChunkCounts, OutAbsoluteChunkOrders, OutLooseChunkHashes, OutLooseChunkRawSizes, OutBlockRawHashes); ZEN_ASSERT(OutBlockRawHashes == AllChunkBlockHashes); for (uint32_t OrderIndex = 0; OrderIndex < OutAbsoluteChunkOrders.size(); OrderIndex++) { uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[OrderIndex]; const IoHash LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; uint32_t VerifyChunkIndex = OutAbsoluteChunkOrders[OrderIndex]; const IoHash VerifyChunkHash = AbsoluteChunkHashes[VerifyChunkIndex]; ZEN_ASSERT(LocalChunkHash == VerifyChunkHash); } CalculateLocalChunkOrders(OutAbsoluteChunkOrders, OutLooseChunkHashes, OutLooseChunkRawSizes, AllChunkBlockDescriptions, VerifyFolderContent.ChunkedContent.ChunkHashes, VerifyFolderContent.ChunkedContent.ChunkRawSizes, VerifyFolderContent.ChunkedContent.ChunkOrders); ZEN_ASSERT(LocalContent.Paths == VerifyFolderContent.Paths); ZEN_ASSERT(LocalContent.RawHashes == VerifyFolderContent.RawHashes); ZEN_ASSERT(LocalContent.RawSizes == 
VerifyFolderContent.RawSizes); ZEN_ASSERT(LocalContent.Attributes == VerifyFolderContent.Attributes); ZEN_ASSERT(LocalContent.ChunkedContent.SequenceRawHashes == VerifyFolderContent.ChunkedContent.SequenceRawHashes); ZEN_ASSERT(LocalContent.ChunkedContent.ChunkCounts == VerifyFolderContent.ChunkedContent.ChunkCounts); for (uint32_t OrderIndex = 0; OrderIndex < LocalContent.ChunkedContent.ChunkOrders.size(); OrderIndex++) { uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[OrderIndex]; const IoHash LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; uint64_t LocalChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[LocalChunkIndex]; uint32_t VerifyChunkIndex = VerifyFolderContent.ChunkedContent.ChunkOrders[OrderIndex]; const IoHash VerifyChunkHash = VerifyFolderContent.ChunkedContent.ChunkHashes[VerifyChunkIndex]; uint64_t VerifyChunkRawSize = VerifyFolderContent.ChunkedContent.ChunkRawSizes[VerifyChunkIndex]; ZEN_ASSERT(LocalChunkHash == VerifyChunkHash); ZEN_ASSERT(LocalChunkRawSize == VerifyChunkRawSize); } } #endif // EXTRA_VERIFY PartManifest = PartManifestWriter.Save(); } Stopwatch PutBuildPartResultTimer; std::pair> PutBuildPartResult = Storage.PutBuildPart(BuildId, BuildPartId, BuildPartName, PartManifest); ZEN_CONSOLE("PutBuildPart took {}, payload size {}. {} attachments are needed.", NiceTimeSpanMs(PutBuildPartResultTimer.GetElapsedTimeMs()), NiceBytes(PartManifest.GetSize()), PutBuildPartResult.second.size()); IoHash PartHash = PutBuildPartResult.first; auto UploadAttachments = [&](std::span RawHashes) { if (!AbortFlag) { ZEN_CONSOLE_VERBOSE("Uploading attachments: {}", FormatArray(RawHashes, "\n "sv)); UploadStatistics TempUploadStats; LooseChunksStatistics TempLooseChunksStats; Stopwatch TempUploadTimer; auto __ = MakeGuard([&]() { uint64_t TempChunkUploadTimeUs = TempUploadTimer.GetElapsedTimeUs(); ZEN_CONSOLE( "Uploaded {} ({}) blocks. " "Compressed {} ({} {}B/s) and uploaded {} ({}) chunks. 
" "Transferred {} ({}bits/s) in {}", TempUploadStats.BlockCount.load(), NiceBytes(TempUploadStats.BlocksBytes), TempLooseChunksStats.CompressedChunkCount.load(), NiceBytes(TempLooseChunksStats.CompressedChunkBytes.load()), NiceNum(GetBytesPerSecond(TempLooseChunksStats.CompressChunksElapsedWallTimeUS, TempLooseChunksStats.CompressedChunkBytes)), TempUploadStats.ChunkCount.load(), NiceBytes(TempUploadStats.ChunksBytes), NiceBytes(TempUploadStats.BlocksBytes + TempUploadStats.ChunksBytes), NiceNum(GetBytesPerSecond(TempUploadStats.ElapsedWallTimeUS, TempUploadStats.ChunksBytes * 8)), NiceTimeSpanMs(TempChunkUploadTimeUs / 1000)); }); UploadPartBlobs(Storage, BuildId, Path, LocalContent, LocalLookup, RawHashes, NewBlockChunks, NewBlocks, LooseChunkIndexes, LargeAttachmentSize, DiskStats, TempUploadStats, TempLooseChunksStats); UploadStats += TempUploadStats; LooseChunksStats += TempLooseChunksStats; } }; if (IgnoreExistingBlocks) { ZEN_CONSOLE_VERBOSE("PutBuildPart uploading all attachments, needs are: {}", FormatArray(PutBuildPartResult.second, "\n "sv)); std::vector ForceUploadChunkHashes; ForceUploadChunkHashes.reserve(LooseChunkIndexes.size()); for (uint32_t ChunkIndex : LooseChunkIndexes) { ForceUploadChunkHashes.push_back(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]); } for (size_t BlockIndex = 0; BlockIndex < NewBlocks.BlockHeaders.size(); BlockIndex++) { if (NewBlocks.BlockHeaders[BlockIndex]) { // Block was not uploaded during generation ForceUploadChunkHashes.push_back(NewBlocks.BlockDescriptions[BlockIndex].BlockHash); } } UploadAttachments(ForceUploadChunkHashes); } else if (!PutBuildPartResult.second.empty()) { ZEN_CONSOLE_VERBOSE("PutBuildPart needs attachments: {}", FormatArray(PutBuildPartResult.second, "\n "sv)); UploadAttachments(PutBuildPartResult.second); } while (!AbortFlag) { Stopwatch FinalizeBuildPartTimer; std::vector Needs = Storage.FinalizeBuildPart(BuildId, BuildPartId, PartHash); ZEN_CONSOLE("FinalizeBuildPart took {}. 
{} attachments are missing.", NiceTimeSpanMs(FinalizeBuildPartTimer.GetElapsedTimeMs()), Needs.size()); if (Needs.empty()) { break; } ZEN_CONSOLE_VERBOSE("FinalizeBuildPart needs attachments: {}", FormatArray(Needs, "\n "sv)); UploadAttachments(Needs); } if (CreateBuild && !AbortFlag) { Stopwatch FinalizeBuildTimer; Storage.FinalizeBuild(BuildId); ZEN_CONSOLE("FinalizeBuild took {}", NiceTimeSpanMs(FinalizeBuildTimer.GetElapsedTimeMs())); } if (!NewBlocks.BlockDescriptions.empty() && !AbortFlag) { uint64_t UploadBlockMetadataCount = 0; std::vector BlockHashes; BlockHashes.reserve(NewBlocks.BlockDescriptions.size()); Stopwatch UploadBlockMetadataTimer; for (size_t BlockIndex = 0; BlockIndex < NewBlocks.BlockDescriptions.size(); BlockIndex++) { const IoHash& BlockHash = NewBlocks.BlockDescriptions[BlockIndex].BlockHash; if (!NewBlocks.MetaDataHasBeenUploaded[BlockIndex]) { const CbObject BlockMetaData = BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]); Storage.PutBlockMetadata(BuildId, BlockHash, BlockMetaData); UploadStats.BlocksBytes += BlockMetaData.GetSize(); NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; UploadBlockMetadataCount++; } BlockHashes.push_back(BlockHash); } if (UploadBlockMetadataCount > 0) { uint64_t ElapsedUS = UploadBlockMetadataTimer.GetElapsedTimeUs(); UploadStats.ElapsedWallTimeUS += ElapsedUS; ZEN_CONSOLE("Uploaded metadata for {} blocks in {}", UploadBlockMetadataCount, NiceTimeSpanMs(ElapsedUS / 1000)); } } ValidateStatistics ValidateStats; DownloadStatistics ValidateDownloadStats; if (PostUploadVerify && !AbortFlag) { ValidateBuildPart(Storage, BuildId, BuildPartId, BuildPartName, ValidateStats, ValidateDownloadStats); } struct ValidateStatistics { uint64_t BuildBlobSize = 0; uint64_t BuildPartSize = 0; uint64_t ChunkAttachmentCount = 0; uint64_t BlockAttachmentCount = 0; std::atomic DownloadedAttachmentCount = 0; std::atomic VerifiedAttachmentCount = 0; std::atomic 
DownloadedByteCount = 0; std::atomic VerifiedByteCount = 0; uint64_t ElapsedWallTimeUS = 0; }; ZEN_CONSOLE_VERBOSE( "Folder scanning stats:" "\n FoundFileCount: {}" "\n FoundFileByteCount: {}" "\n AcceptedFileCount: {}" "\n AcceptedFileByteCount: {}" "\n ElapsedWallTimeUS: {}", LocalFolderScanStats.FoundFileCount.load(), NiceBytes(LocalFolderScanStats.FoundFileByteCount.load()), LocalFolderScanStats.AcceptedFileCount.load(), NiceBytes(LocalFolderScanStats.AcceptedFileByteCount.load()), NiceLatencyNs(LocalFolderScanStats.ElapsedWallTimeUS * 1000)); ZEN_CONSOLE_VERBOSE( "Chunking stats:" "\n FilesProcessed: {}" "\n FilesChunked: {}" "\n BytesHashed: {}" "\n UniqueChunksFound: {}" "\n UniqueSequencesFound: {}" "\n UniqueBytesFound: {}" "\n ElapsedWallTimeUS: {}", ChunkingStats.FilesProcessed.load(), ChunkingStats.FilesChunked.load(), NiceBytes(ChunkingStats.BytesHashed.load()), ChunkingStats.UniqueChunksFound.load(), ChunkingStats.UniqueSequencesFound.load(), NiceBytes(ChunkingStats.UniqueBytesFound.load()), NiceLatencyNs(ChunkingStats.ElapsedWallTimeUS * 1000)); ZEN_CONSOLE_VERBOSE( "Find block stats:" "\n FindBlockTimeMS: {}" "\n PotentialChunkCount: {}" "\n PotentialChunkByteCount: {}" "\n FoundBlockCount: {}" "\n FoundBlockChunkCount: {}" "\n FoundBlockByteCount: {}" "\n AcceptedBlockCount: {}" "\n AcceptedChunkCount: {}" "\n AcceptedByteCount: {}" "\n RejectedBlockCount: {}" "\n RejectedChunkCount: {}" "\n RejectedByteCount: {}" "\n AcceptedReduntantChunkCount: {}" "\n AcceptedReduntantByteCount: {}" "\n NewBlocksCount: {}" "\n NewBlocksChunkCount: {}" "\n NewBlocksChunkByteCount: {}", NiceTimeSpanMs(FindBlocksStats.FindBlockTimeMS), FindBlocksStats.PotentialChunkCount, NiceBytes(FindBlocksStats.PotentialChunkByteCount), FindBlocksStats.FoundBlockCount, FindBlocksStats.FoundBlockChunkCount, NiceBytes(FindBlocksStats.FoundBlockByteCount), FindBlocksStats.AcceptedBlockCount, FindBlocksStats.AcceptedChunkCount, NiceBytes(FindBlocksStats.AcceptedByteCount), 
FindBlocksStats.RejectedBlockCount, FindBlocksStats.RejectedChunkCount, NiceBytes(FindBlocksStats.RejectedByteCount), FindBlocksStats.AcceptedReduntantChunkCount, NiceBytes(FindBlocksStats.AcceptedReduntantByteCount), FindBlocksStats.NewBlocksCount, FindBlocksStats.NewBlocksChunkCount, NiceBytes(FindBlocksStats.NewBlocksChunkByteCount)); ZEN_CONSOLE_VERBOSE( "Generate blocks stats:" "\n GeneratedBlockByteCount: {}" "\n GeneratedBlockCount: {}" "\n GenerateBlocksElapsedWallTimeUS: {}", NiceBytes(GenerateBlocksStats.GeneratedBlockByteCount.load()), GenerateBlocksStats.GeneratedBlockCount.load(), NiceLatencyNs(GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS * 1000)); ZEN_CONSOLE_VERBOSE( "Generate blocks stats:" "\n ChunkCount: {}" "\n ChunkByteCount: {}" "\n CompressedChunkCount: {}" "\n CompressChunksElapsedWallTimeUS: {}", LooseChunksStats.ChunkCount, NiceBytes(LooseChunksStats.ChunkByteCount), LooseChunksStats.CompressedChunkCount.load(), NiceBytes(LooseChunksStats.CompressedChunkBytes.load()), NiceLatencyNs(LooseChunksStats.CompressChunksElapsedWallTimeUS * 1000)); ZEN_CONSOLE_VERBOSE( "Disk stats:" "\n OpenReadCount: {}" "\n OpenWriteCount: {}" "\n ReadCount: {}" "\n ReadByteCount: {}" "\n WriteCount: {}" "\n WriteByteCount: {}" "\n CurrentOpenFileCount: {}", DiskStats.OpenReadCount.load(), DiskStats.OpenWriteCount.load(), DiskStats.ReadCount.load(), NiceBytes(DiskStats.ReadByteCount.load()), DiskStats.WriteCount.load(), NiceBytes(DiskStats.WriteByteCount.load()), DiskStats.CurrentOpenFileCount.load()); ZEN_CONSOLE_VERBOSE( "Upload stats:" "\n BlockCount: {}" "\n BlocksBytes: {}" "\n ChunkCount: {}" "\n ChunksBytes: {}" "\n ReadFromDiskBytes: {}" "\n MultipartAttachmentCount: {}" "\n ElapsedWallTimeUS: {}", UploadStats.BlockCount.load(), NiceBytes(UploadStats.BlocksBytes.load()), UploadStats.ChunkCount.load(), NiceBytes(UploadStats.ChunksBytes.load()), NiceBytes(UploadStats.ReadFromDiskBytes.load()), UploadStats.MultipartAttachmentCount.load(), 
NiceLatencyNs(UploadStats.ElapsedWallTimeUS * 1000)); if (PostUploadVerify) { ZEN_CONSOLE_VERBOSE( "Validate stats:" "\n BuildBlobSize: {}" "\n BuildPartSize: {}" "\n ChunkAttachmentCount: {}" "\n BlockAttachmentCount: {}" "\n VerifiedAttachmentCount: {}" "\n VerifiedByteCount: {}" "\n ElapsedWallTimeUS: {}", NiceBytes(ValidateStats.BuildBlobSize), NiceBytes(ValidateStats.BuildPartSize), ValidateStats.ChunkAttachmentCount, ValidateStats.BlockAttachmentCount, ValidateStats.VerifiedAttachmentCount.load(), NiceBytes(ValidateStats.VerifiedByteCount.load()), NiceLatencyNs(ValidateStats.ElapsedWallTimeUS * 1000)); ZEN_CONSOLE_VERBOSE( "Validate download stats:" "\n RequestsCompleteCount: {}" "\n DownloadedChunkCount: {}" "\n DownloadedChunkByteCount: {}" "\n MultipartAttachmentCount: {}" "\n DownloadedBlockCount: {}" "\n DownloadedBlockByteCount: {}" "\n DownloadedPartialBlockCount: {}" "\n DownloadedPartialBlockByteCount: {}", ValidateDownloadStats.RequestsCompleteCount.load(), ValidateDownloadStats.DownloadedChunkCount.load(), NiceBytes(ValidateDownloadStats.DownloadedChunkByteCount.load()), ValidateDownloadStats.MultipartAttachmentCount.load(), ValidateDownloadStats.DownloadedBlockCount.load(), NiceBytes(ValidateDownloadStats.DownloadedBlockByteCount.load()), ValidateDownloadStats.DownloadedPartialBlockCount.load(), NiceBytes(ValidateDownloadStats.DownloadedPartialBlockByteCount.load())); } const double DeltaByteCountPercent = ChunkingStats.BytesHashed > 0 ? (100.0 * (FindBlocksStats.NewBlocksChunkByteCount + LooseChunksStats.CompressedChunkBytes)) / (ChunkingStats.BytesHashed) : 0.0; const std::string MultipartAttachmentStats = (LargeAttachmentSize != (uint64_t)-1) ? 
fmt::format(" ({} as multipart)", UploadStats.MultipartAttachmentCount.load()) : ""; std::string ValidateInfo; if (PostUploadVerify) { const uint64_t DownloadedCount = ValidateDownloadStats.DownloadedChunkCount + ValidateDownloadStats.DownloadedBlockCount; const uint64_t DownloadedByteCount = ValidateDownloadStats.DownloadedChunkByteCount + ValidateDownloadStats.DownloadedBlockByteCount; ValidateInfo = fmt::format("\n Verified: {} ({}) {}B/sec in {}", DownloadedCount, NiceBytes(DownloadedByteCount), NiceNum(GetBytesPerSecond(ValidateStats.ElapsedWallTimeUS, DownloadedByteCount)), NiceTimeSpanMs(ValidateStats.ElapsedWallTimeUS / 1000)); } ZEN_CONSOLE( "Uploaded part {} ('{}') to build {} in {} \n" " Scanned files: {} ({}) {}B/sec in {}\n" " New data: {} ({:.1f}%)\n" " New blocks: {} ({}) {}B/sec\n" " New chunks: {} ({} -> {}) {}B/sec\n" " Uploaded: {} ({}) {}bits/sec\n" " Blocks: {} ({})\n" " Chunks: {} ({}){}" "{}", BuildPartId, BuildPartName, BuildId, NiceTimeSpanMs(ProcessTimer.GetElapsedTimeMs()), LocalFolderScanStats.FoundFileCount.load(), NiceBytes(LocalFolderScanStats.FoundFileByteCount.load()), NiceNum(GetBytesPerSecond(ChunkingStats.ElapsedWallTimeUS, ChunkingStats.BytesHashed)), NiceTimeSpanMs(ChunkingStats.ElapsedWallTimeUS / 1000), NiceBytes(FindBlocksStats.NewBlocksChunkByteCount + LooseChunksStats.CompressedChunkBytes), DeltaByteCountPercent, GenerateBlocksStats.GeneratedBlockCount.load(), NiceBytes(GenerateBlocksStats.GeneratedBlockByteCount.load()), NiceNum(GetBytesPerSecond(GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS, GenerateBlocksStats.GeneratedBlockByteCount)), LooseChunksStats.CompressedChunkCount.load(), NiceBytes(LooseChunksStats.ChunkByteCount), NiceBytes(LooseChunksStats.CompressedChunkBytes.load()), NiceNum(GetBytesPerSecond(LooseChunksStats.CompressChunksElapsedWallTimeUS, LooseChunksStats.ChunkByteCount)), NiceBytes(UploadStats.BlockCount.load() + UploadStats.ChunkCount.load()), NiceBytes(UploadStats.BlocksBytes + 
UploadStats.ChunksBytes), NiceNum(GetBytesPerSecond(UploadStats.ElapsedWallTimeUS, (UploadStats.ChunksBytes + UploadStats.BlocksBytes * 8))), UploadStats.BlockCount.load(), NiceBytes(UploadStats.BlocksBytes.load()), UploadStats.ChunkCount.load(), NiceBytes(UploadStats.ChunksBytes.load()), MultipartAttachmentStats, ValidateInfo); }

// NOTE(review): several declarations below (e.g. `std::vector Errors`, `gsl::narrow(...)`, `std::atomic&`,
// `template void WriteToFile`) carry no template arguments in this text. They look like they were lost in
// extraction - confirm against the upstream file before compiling.

// Checks the files listed in Content against what is on disk below Path.
//
// One work item per path is scheduled on VerifyPool. Each item checks that the file exists, that its size
// equals Content.RawSizes[PathIndex] and - when VerifyFileHash is set and the file is not empty - that its
// hash equals Content.RawHashes[PathIndex]. Failures are recorded as messages in Errors (guarded by
// ErrorLock) and counted in VerifyFolderStats.FilesFailed.
//
// After all work has finished every recorded message is printed and, if there is at least one,
// std::runtime_error is thrown.
void VerifyFolder(const ChunkedFolderContent& Content, const std::filesystem::path& Path, bool VerifyFileHash, VerifyFolderStatistics& VerifyFolderStats)
{
    ZEN_TRACE_CPU("VerifyFolder");

    Stopwatch Timer;

    ProgressBar ProgressBar(UsePlainProgress);

    WorkerThreadPool& VerifyPool = GetMediumWorkerPool(EWorkloadType::Burst);  // GetSyncWorkerPool();

    ParallellWork Work(AbortFlag);

    const uint32_t PathCount = gsl::narrow(Content.Paths.size());

    // Errors is appended to from the worker lambdas, always under ErrorLock.
    RwLock ErrorLock;
    std::vector Errors;

    // Returns false when the given path equals one of DefaultExcludeFolders or starts with one of them
    // followed by '/'; true otherwise.
    // NOTE(review): the parameter is named RelativePath and the test is a prefix match, but the call site
    // below passes the parent of (Path / Content.Paths[PathIndex]) - confirm excluded folders are matched
    // as intended.
    auto IsAcceptedFolder = [ExcludeFolders = DefaultExcludeFolders](const std::string_view& RelativePath) -> bool {
        for (const std::string_view& ExcludeFolder : ExcludeFolders)
        {
            if (RelativePath.starts_with(ExcludeFolder))
            {
                if (RelativePath.length() == ExcludeFolder.length())
                {
                    return false;
                }
                else if (RelativePath[ExcludeFolder.length()] == '/')
                {
                    return false;
                }
            }
        }
        return true;
    };

    // Used to map a file's expected raw hash to its chunk sequence when a hash mismatch is investigated.
    const ChunkedContentLookup Lookup = BuildChunkedContentLookup(Content);

    for (uint32_t PathIndex = 0; PathIndex < PathCount; PathIndex++)
    {
        if (Work.IsAborted())
        {
            break;
        }

        Work.ScheduleWork(
            VerifyPool,
            [&, PathIndex](std::atomic&) {
                if (!AbortFlag)
                {
                    ZEN_TRACE_CPU("VerifyFile_work");

                    // TODO: Convert ScheduleWork body to function

                    const std::filesystem::path TargetPath = (Path / Content.Paths[PathIndex]).make_preferred();
                    if (IsAcceptedFolder(TargetPath.parent_path().generic_string()))
                    {
                        const uint64_t ExpectedSize = Content.RawSizes[PathIndex];
                        if (!std::filesystem::exists(TargetPath))
                        {
                            ErrorLock.WithExclusiveLock([&]() {
                                Errors.push_back(fmt::format("File {} with expected size {} does not exist", TargetPath, ExpectedSize));
                            });
                            VerifyFolderStats.FilesFailed++;
                        }
                        else
                        {
                            std::error_code Ec;
                            uint64_t SizeOnDisk = gsl::narrow(std::filesystem::file_size(TargetPath, Ec));
                            if (Ec)
                            {
                                ErrorLock.WithExclusiveLock([&]() {
                                    Errors.push_back(
                                        fmt::format("Failed to get size of file {}: {} ({})", TargetPath, Ec.message(), Ec.value()));
                                });
                                VerifyFolderStats.FilesFailed++;
                            }
                            else if (SizeOnDisk < ExpectedSize)
                            {
                                ErrorLock.WithExclusiveLock([&]() {
                                    Errors.push_back(fmt::format("Size of file {} is smaller than expected. Expected: {}, Found: {}",
                                                                 TargetPath,
                                                                 ExpectedSize,
                                                                 SizeOnDisk));
                                });
                                VerifyFolderStats.FilesFailed++;
                            }
                            else if (SizeOnDisk > ExpectedSize)
                            {
                                ErrorLock.WithExclusiveLock([&]() {
                                    Errors.push_back(fmt::format("Size of file {} is bigger than expected. Expected: {}, Found: {}",
                                                                 TargetPath,
                                                                 ExpectedSize,
                                                                 SizeOnDisk));
                                });
                                VerifyFolderStats.FilesFailed++;
                            }
                            else if (SizeOnDisk > 0 && VerifyFileHash)
                            {
                                const IoHash& ExpectedRawHash = Content.RawHashes[PathIndex];
                                IoBuffer Buffer = IoBufferBuilder::MakeFromFile(TargetPath);
                                IoHash RawHash = IoHash::HashBuffer(Buffer);
                                if (RawHash != ExpectedRawHash)
                                {
                                    // The whole-file hash differs: walk the chunks of the file's sequence in order
                                    // and report the first chunk whose hash does not match.
                                    uint64_t FileOffset = 0;
                                    const uint32_t SequenceIndex = Lookup.RawHashToSequenceIndex.at(ExpectedRawHash);
                                    const uint32_t OrderOffset = Lookup.SequenceIndexChunkOrderOffset[SequenceIndex];
                                    for (uint32_t OrderIndex = OrderOffset;
                                         OrderIndex < OrderOffset + Content.ChunkedContent.ChunkCounts[SequenceIndex];
                                         OrderIndex++)
                                    {
                                        uint32_t ChunkIndex = Content.ChunkedContent.ChunkOrders[OrderIndex];
                                        uint64_t ChunkSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
                                        IoHash ChunkHash = Content.ChunkedContent.ChunkHashes[ChunkIndex];
                                        IoBuffer FileChunk = IoBuffer(Buffer, FileOffset, ChunkSize);
                                        if (IoHash::HashBuffer(FileChunk) != ChunkHash)
                                        {
                                            ErrorLock.WithExclusiveLock([&]() {
                                                Errors.push_back(fmt::format(
                                                    "WARNING: Hash of file {} does not match expected hash. Expected: {}, Found: {}. "
                                                    "Mismatch at chunk {}",
                                                    TargetPath,
                                                    ExpectedRawHash,
                                                    RawHash,
                                                    OrderIndex - OrderOffset));
                                            });
                                            break;
                                        }
                                        FileOffset += ChunkSize;
                                    }
                                    // Counted as failed whether or not a mismatching chunk was located above; a
                                    // message is only recorded when one was.
                                    VerifyFolderStats.FilesFailed++;
                                }
                                VerifyFolderStats.ReadBytes += SizeOnDisk;
                            }
                        }
                    }
                    // Incremented for every path processed while not aborted, including failed files and
                    // paths rejected by IsAcceptedFolder.
                    VerifyFolderStats.FilesVerified++;
                }
            },
            // Error callback: records the exception text for this path and counts the file as failed.
            [&, PathIndex](const std::exception& Ex, std::atomic&) {
                ErrorLock.WithExclusiveLock([&]() {
                    Errors.push_back(fmt::format("Failed verifying file '{}'. Reason: {}",
                                                 (Path / Content.Paths[PathIndex]).make_preferred(),
                                                 Ex.what()));
                });
                VerifyFolderStats.FilesFailed++;
            });
    }

    // Blocks until the scheduled work is done, refreshing the progress bar from the callback.
    // The first argument is presumably the callback interval in milliseconds - TODO confirm.
    Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
        ZEN_UNUSED(IsAborted, PendingWork);
        std::string Details = fmt::format("Verified {}/{} ({}). Failed files: {}",
                                          VerifyFolderStats.FilesVerified.load(),
                                          PathCount,
                                          NiceBytes(VerifyFolderStats.ReadBytes.load()),
                                          VerifyFolderStats.FilesFailed.load());
        ProgressBar.UpdateState({.Task = "Verifying files ",
                                 .Details = Details,
                                 .TotalCount = gsl::narrow(PathCount),
                                 .RemainingCount = gsl::narrow(PathCount - VerifyFolderStats.FilesVerified.load())},
                                false);
    });
    VerifyFolderStats.VerifyElapsedWallTimeUs = Timer.GetElapsedTimeUs();
    ProgressBar.Finish();
    for (const std::string& Error : Errors)
    {
        ZEN_CONSOLE("{}", Error);
    }
    if (!Errors.empty())
    {
        throw std::runtime_error(fmt::format("Verify failed with {} errors", Errors.size()));
    }
}

// Keeps at most one output file and its writer open between WriteToFile() calls, so that consecutive writes
// to the same target index reuse the open file. Open/write activity is counted in the DiskStatistics passed
// to the constructor. The destructor calls Flush().
class WriteFileCache
{
public:
    WriteFileCache(DiskStatistics& DiskStats) : m_DiskStats(DiskStats) {}
    ~WriteFileCache() { Flush(); }

    // Writes Buffer at FileOffset into the file that GetTargetPath(TargetIndex) names.
    //
    // If TargetIndex is the most recently cached target, the already open writer is used. Otherwise any open
    // file is flushed, the target is opened, and it is either kept open (when TargetFinalSize is larger than
    // this buffer) or written and closed right away.
    template
    void WriteToFile(uint32_t TargetIndex,
                     std::function&& GetTargetPath,
                     const TBufferType& Buffer,
                     uint64_t FileOffset,
                     uint64_t TargetFinalSize)
    {
        ZEN_TRACE_CPU("WriteFileCache_WriteToFile");
        // NOTE(review): Flush() leaves SeenTargetIndexes untouched, so coming back to an earlier target after
        // another file has been opened trips one of the asserts in this function - presumably callers write
        // each target's chunks contiguously; confirm.
        if (!SeenTargetIndexes.empty() && SeenTargetIndexes.back() == TargetIndex)
        {
            ZEN_TRACE_CPU("WriteFileCache_WriteToFile_CacheWrite");
            ZEN_ASSERT(OpenFileWriter);
            OpenFileWriter->Write(Buffer, FileOffset);
            m_DiskStats.WriteCount++;
            m_DiskStats.WriteByteCount += Buffer.GetSize();
        }
        else
        {
            std::unique_ptr NewOutputFile;
            {
                ZEN_TRACE_CPU("WriteFileCache_WriteToFile_Open");
                Flush();
                const std::filesystem::path& TargetPath = GetTargetPath(TargetIndex);
                CreateDirectories(TargetPath.parent_path());
                uint32_t Tries = 5;
                // The callback is handed an error code on a failed open. It logs once fewer than 3 tries
                // remain, sleeps while more than one try remains, and returns true until the 5 tries are
                // used up - presumably true requests another attempt; TODO confirm.
                NewOutputFile =
                    std::make_unique(TargetPath, BasicFile::Mode::kWrite, [&Tries, TargetPath](std::error_code& Ec) {
                        if (Tries < 3)
                        {
                            ZEN_CONSOLE("Failed opening file '{}': {}{}", TargetPath, Ec.message(), Tries > 1 ? " Retrying"sv : ""sv);
                        }
                        if (Tries > 1)
                        {
                            Sleep(100);
                        }
                        return --Tries > 0;
                    });
                m_DiskStats.OpenWriteCount++;
                m_DiskStats.CurrentOpenFileCount++;
            }

            // Only keep the file open when this buffer does not already cover the whole target.
            const bool CacheWriter = TargetFinalSize > Buffer.GetSize();
            if (CacheWriter)
            {
                ZEN_TRACE_CPU("WriteFileCache_WriteToFile_CacheWrite");
                ZEN_ASSERT_SLOW(std::find(SeenTargetIndexes.begin(), SeenTargetIndexes.end(), TargetIndex) == SeenTargetIndexes.end());

                OutputFile = std::move(NewOutputFile);
                // The second argument is presumably the writer's buffer size, capped at 256 KiB - TODO confirm.
                OpenFileWriter = std::make_unique(*OutputFile, Min(TargetFinalSize, 256u * 1024u));
                OpenFileWriter->Write(Buffer, FileOffset);
                m_DiskStats.WriteCount++;
                m_DiskStats.WriteByteCount += Buffer.GetSize();
                SeenTargetIndexes.push_back(TargetIndex);
            }
            else
            {
                ZEN_TRACE_CPU("WriteFileCache_WriteToFile_Write");
                NewOutputFile->Write(Buffer, FileOffset);
                m_DiskStats.WriteCount++;
                m_DiskStats.WriteByteCount += Buffer.GetSize();
                NewOutputFile = {};
                m_DiskStats.CurrentOpenFileCount--;
            }
        }
    }

    // Releases the cached writer and file, if any, and updates the open-file count.
    void Flush()
    {
        ZEN_TRACE_CPU("WriteFileCache_Flush");
        if (OutputFile)
        {
            m_DiskStats.CurrentOpenFileCount--;
        }

        OpenFileWriter = {};
        OutputFile = {};
    }

    DiskStatistics& m_DiskStats;
    std::vector SeenTargetIndexes;   // target indexes that have been opened as cached files, in order
    std::unique_ptr OutputFile;      // currently cached file, if any
    std::unique_ptr OpenFileWriter;  // writer on top of OutputFile
};

// Returns pointers to the sequence locations of chunk ChunkIndex (as given by GetChunkSequenceLocations)
// whose sequence still has a non-zero counter in SequenceIndexChunksLeftToWriteCounters.
std::vector GetRemainingChunkTargets(
    std::span> SequenceIndexChunksLeftToWriteCounters,
    const ChunkedContentLookup& Lookup,
    uint32_t ChunkIndex)
{
    std::span ChunkSources = GetChunkSequenceLocations(Lookup, ChunkIndex);
    std::vector ChunkTargetPtrs;
    if (!ChunkSources.empty())
    {
        ChunkTargetPtrs.reserve(ChunkSources.size());
        for (const ChunkedContentLookup::ChunkSequenceLocation& Source :
ChunkSources) { if (SequenceIndexChunksLeftToWriteCounters[Source.SequenceIndex].load() > 0) { ChunkTargetPtrs.push_back(&Source); } } } return ChunkTargetPtrs; }; void FinalizeChunkSequence(const std::filesystem::path& TargetFolder, const IoHash& SequenceRawHash) { ZEN_TRACE_CPU("FinalizeChunkSequence"); ZEN_ASSERT_SLOW(!std::filesystem::exists(GetFinalChunkedSequenceFileName(TargetFolder, SequenceRawHash))); std::filesystem::rename(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash), GetFinalChunkedSequenceFileName(TargetFolder, SequenceRawHash)); } void FinalizeChunkSequences(const std::filesystem::path& TargetFolder, const ChunkedFolderContent& RemoteContent, std::span RemoteSequenceIndexes) { ZEN_TRACE_CPU("FinalizeChunkSequences"); for (uint32_t SequenceIndex : RemoteSequenceIndexes) { FinalizeChunkSequence(TargetFolder, RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); } } void VerifyAndCompleteChunkSequencesAsync(const std::filesystem::path& TargetFolder, const ChunkedFolderContent& RemoteContent, std::span RemoteSequenceIndexes, ParallellWork& Work, WorkerThreadPool& VerifyPool) { if (RemoteSequenceIndexes.empty()) { return; } ZEN_TRACE_CPU("VerifyAndCompleteChunkSequence"); for (uint32_t RemoteSequenceIndexOffset = 1; RemoteSequenceIndexOffset < RemoteSequenceIndexes.size(); RemoteSequenceIndexOffset++) { const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset]; Work.ScheduleWork( VerifyPool, [&RemoteContent, TargetFolder, RemoteSequenceIndex](std::atomic&) { if (!AbortFlag) { ZEN_TRACE_CPU("VerifyAndCompleteChunkSequenceAsync"); const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; { ZEN_TRACE_CPU("HashSequence"); const IoHash VerifyChunkHash = IoHash::HashBuffer( IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash))); if (VerifyChunkHash != SequenceRawHash) { throw std::runtime_error(fmt::format("Written chunk 
sequence {} hash does not match expected hash {}", VerifyChunkHash, SequenceRawHash)); } } FinalizeChunkSequence(TargetFolder, SequenceRawHash); } }, Work.DefaultErrorFunction()); } const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[0]; const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; { ZEN_TRACE_CPU("HashSequence"); const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash))); if (VerifyChunkHash != SequenceRawHash) { throw std::runtime_error( fmt::format("Written chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, SequenceRawHash)); } } FinalizeChunkSequence(TargetFolder, SequenceRawHash); } bool CompleteSequenceChunk(uint32_t RemoteSequenceIndex, std::span> SequenceIndexChunksLeftToWriteCounters) { return SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1; } std::vector CompleteChunkTargets(const std::vector& ChunkTargetPtrs, std::span> SequenceIndexChunksLeftToWriteCounters) { ZEN_TRACE_CPU("CompleteChunkTargets"); std::vector CompletedSequenceIndexes; for (const ChunkedContentLookup::ChunkSequenceLocation* Location : ChunkTargetPtrs) { const uint32_t RemoteSequenceIndex = Location->SequenceIndex; if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters)) { CompletedSequenceIndexes.push_back(RemoteSequenceIndex); } } return CompletedSequenceIndexes; } struct BlockWriteOps { std::vector ChunkBuffers; struct WriteOpData { const ChunkedContentLookup::ChunkSequenceLocation* Target = nullptr; size_t ChunkBufferIndex = (size_t)-1; }; std::vector WriteOps; }; void WriteBlockChunkOps(const std::filesystem::path& CacheFolderPath, const ChunkedFolderContent& RemoteContent, const ChunkedContentLookup& Lookup, std::span> SequenceIndexChunksLeftToWriteCounters, const BlockWriteOps& Ops, ParallellWork& Work, WorkerThreadPool& VerifyPool, DiskStatistics& 
DiskStats, WriteChunkStatistics& WriteChunkStats) { ZEN_TRACE_CPU("WriteBlockChunkOps"); { WriteFileCache OpenFileCache(DiskStats); for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps) { if (AbortFlag) { break; } const CompositeBuffer& Chunk = Ops.ChunkBuffers[WriteOp.ChunkBufferIndex]; const uint32_t SequenceIndex = WriteOp.Target->SequenceIndex; ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() <= RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]); ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() > 0); const uint64_t ChunkSize = Chunk.GetSize(); const uint64_t FileOffset = WriteOp.Target->Offset; const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex]; ZEN_ASSERT(FileOffset + ChunkSize <= RemoteContent.RawSizes[PathIndex]); OpenFileCache.WriteToFile( SequenceIndex, [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) { return GetTempChunkedSequenceFileName(CacheFolderPath, RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); }, Chunk, FileOffset, RemoteContent.RawSizes[PathIndex]); } WriteChunkStats.ChunkCountWritten += gsl::narrow(Ops.ChunkBuffers.size()); WriteChunkStats.ChunkBytesWritten += std::accumulate(Ops.ChunkBuffers.begin(), Ops.ChunkBuffers.end(), uint64_t(0), [](uint64_t Current, const CompositeBuffer& Buffer) -> uint64_t { return Current + Buffer.GetSize(); }); } if (!AbortFlag) { // Write tracking, updating this must be done without any files open (WriteFileCache) std::vector CompletedChunkSequences; for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps) { const uint32_t RemoteSequenceIndex = WriteOp.Target->SequenceIndex; if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters)) { CompletedChunkSequences.push_back(RemoteSequenceIndex); } } VerifyAndCompleteChunkSequencesAsync(CacheFolderPath, RemoteContent, CompletedChunkSequences, Work, VerifyPool); } } IoBuffer MakeBufferMemoryBased(const CompositeBuffer& 
PartialBlockBuffer) { ZEN_TRACE_CPU("MakeBufferMemoryBased"); IoBuffer BlockMemoryBuffer; std::span Segments = PartialBlockBuffer.GetSegments(); if (Segments.size() == 1) { IoBufferFileReference FileRef = {}; if (PartialBlockBuffer.GetSegments().front().AsIoBuffer().GetFileReference(FileRef)) { BlockMemoryBuffer = UniqueBuffer::Alloc(FileRef.FileChunkSize).MoveToShared().AsIoBuffer(); BasicFile Reader; Reader.Attach(FileRef.FileHandle); auto _ = MakeGuard([&Reader]() { Reader.Detach(); }); MutableMemoryView ReadMem = BlockMemoryBuffer.GetMutableView(); Reader.Read(ReadMem.GetData(), FileRef.FileChunkSize, FileRef.FileChunkOffset); return BlockMemoryBuffer; } else { return PartialBlockBuffer.GetSegments().front().AsIoBuffer(); } } else { // Not a homogenous memory buffer, read all to memory BlockMemoryBuffer = UniqueBuffer::Alloc(PartialBlockBuffer.GetSize()).MoveToShared().AsIoBuffer(); MutableMemoryView ReadMem = BlockMemoryBuffer.GetMutableView(); for (const SharedBuffer& Segment : Segments) { IoBufferFileReference FileRef = {}; if (Segment.AsIoBuffer().GetFileReference(FileRef)) { BasicFile Reader; Reader.Attach(FileRef.FileHandle); Reader.Read(ReadMem.GetData(), FileRef.FileChunkSize, FileRef.FileChunkOffset); Reader.Detach(); ReadMem = ReadMem.Mid(FileRef.FileChunkSize); } else { ReadMem = ReadMem.CopyFrom(Segment.AsIoBuffer().GetView()); } } return BlockMemoryBuffer; } } bool GetBlockWriteOps(const ChunkedFolderContent& RemoteContent, const ChunkedContentLookup& Lookup, std::span ChunkRawHashes, std::span ChunkCompressedLengths, std::span> SequenceIndexChunksLeftToWriteCounters, std::span> RemoteChunkIndexNeedsCopyFromSourceFlags, const MemoryView BlockView, uint32_t FirstIncludedBlockChunkIndex, uint32_t LastIncludedBlockChunkIndex, BlockWriteOps& OutOps) { ZEN_TRACE_CPU("GetBlockWriteOps"); uint32_t OffsetInBlock = 0; for (uint32_t ChunkBlockIndex = FirstIncludedBlockChunkIndex; ChunkBlockIndex <= LastIncludedBlockChunkIndex; ChunkBlockIndex++) { const 
uint32_t ChunkCompressedSize = ChunkCompressedLengths[ChunkBlockIndex]; const IoHash& ChunkHash = ChunkRawHashes[ChunkBlockIndex]; if (auto It = Lookup.ChunkHashToChunkIndex.find(ChunkHash); It != Lookup.ChunkHashToChunkIndex.end()) { const uint32_t ChunkIndex = It->second; std::vector ChunkTargetPtrs = GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, Lookup, ChunkIndex); if (!ChunkTargetPtrs.empty()) { bool NeedsWrite = true; if (RemoteChunkIndexNeedsCopyFromSourceFlags[ChunkIndex].compare_exchange_strong(NeedsWrite, false)) { MemoryView ChunkMemoryView = BlockView.Mid(OffsetInBlock + CompressedBuffer::GetHeaderSizeForNoneEncoder(), ChunkCompressedSize - CompressedBuffer::GetHeaderSizeForNoneEncoder()); IoBuffer Decompressed(IoBuffer::Wrap, ChunkMemoryView.GetData(), ChunkMemoryView.GetSize()); ZEN_ASSERT_SLOW(ChunkHash == IoHash::HashBuffer(Decompressed)); ZEN_ASSERT(Decompressed.GetSize() == RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]); for (const ChunkedContentLookup::ChunkSequenceLocation* Target : ChunkTargetPtrs) { OutOps.WriteOps.push_back( BlockWriteOps::WriteOpData{.Target = Target, .ChunkBufferIndex = OutOps.ChunkBuffers.size()}); } OutOps.ChunkBuffers.emplace_back(std::move(Decompressed)); } } } OffsetInBlock += ChunkCompressedSize; } { ZEN_TRACE_CPU("GetBlockWriteOps_sort"); std::sort(OutOps.WriteOps.begin(), OutOps.WriteOps.end(), [](const BlockWriteOps::WriteOpData& Lhs, const BlockWriteOps::WriteOpData& Rhs) { if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex) { return true; } if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex) { return false; } return Lhs.Target->Offset < Rhs.Target->Offset; }); } return true; } bool WriteBlockToDisk(const std::filesystem::path& CacheFolderPath, const ChunkedFolderContent& RemoteContent, const ChunkBlockDescription& BlockDescription, std::span> SequenceIndexChunksLeftToWriteCounters, ParallellWork& Work, WorkerThreadPool& VerifyPool, CompositeBuffer&& BlockBuffer, const 
ChunkedContentLookup& Lookup, std::span> RemoteChunkIndexNeedsCopyFromSourceFlags, DiskStatistics& DiskStats, WriteChunkStatistics& WriteChunkStats) { ZEN_TRACE_CPU("WriteBlockToDisk"); IoBuffer BlockMemoryBuffer = MakeBufferMemoryBased(BlockBuffer); const MemoryView BlockView = BlockMemoryBuffer.GetView(); BlockWriteOps Ops; if ((BlockDescription.HeaderSize == 0) || BlockDescription.ChunkCompressedLengths.empty()) { ZEN_TRACE_CPU("WriteBlockToDisk_Legacy"); uint64_t HeaderSize; const std::vector ChunkCompressedLengths = ReadChunkBlockHeader(BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder()), HeaderSize); if (GetBlockWriteOps(RemoteContent, Lookup, BlockDescription.ChunkRawHashes, ChunkCompressedLengths, SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndexNeedsCopyFromSourceFlags, BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder() + HeaderSize), 0, gsl::narrow(BlockDescription.ChunkRawHashes.size() - 1), Ops)) { WriteBlockChunkOps(CacheFolderPath, RemoteContent, Lookup, SequenceIndexChunksLeftToWriteCounters, Ops, Work, VerifyPool, DiskStats, WriteChunkStats); return true; } return false; } if (GetBlockWriteOps(RemoteContent, Lookup, BlockDescription.ChunkRawHashes, BlockDescription.ChunkCompressedLengths, SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndexNeedsCopyFromSourceFlags, BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize), 0, gsl::narrow(BlockDescription.ChunkRawHashes.size() - 1), Ops)) { WriteBlockChunkOps(CacheFolderPath, RemoteContent, Lookup, SequenceIndexChunksLeftToWriteCounters, Ops, Work, VerifyPool, DiskStats, WriteChunkStats); return true; } return false; } bool WritePartialBlockToDisk(const std::filesystem::path& CacheFolderPath, const ChunkedFolderContent& RemoteContent, const ChunkBlockDescription& BlockDescription, std::span> SequenceIndexChunksLeftToWriteCounters, ParallellWork& Work, WorkerThreadPool& VerifyPool, CompositeBuffer&& PartialBlockBuffer, uint32_t 
FirstIncludedBlockChunkIndex, uint32_t LastIncludedBlockChunkIndex, const ChunkedContentLookup& Lookup, std::span> RemoteChunkIndexNeedsCopyFromSourceFlags, DiskStatistics& DiskStats, WriteChunkStatistics& WriteChunkStats) { ZEN_TRACE_CPU("WritePartialBlockToDisk"); IoBuffer BlockMemoryBuffer = MakeBufferMemoryBased(PartialBlockBuffer); const MemoryView BlockView = BlockMemoryBuffer.GetView(); BlockWriteOps Ops; if (GetBlockWriteOps(RemoteContent, Lookup, BlockDescription.ChunkRawHashes, BlockDescription.ChunkCompressedLengths, SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndexNeedsCopyFromSourceFlags, BlockView, FirstIncludedBlockChunkIndex, LastIncludedBlockChunkIndex, Ops)) { WriteBlockChunkOps(CacheFolderPath, RemoteContent, Lookup, SequenceIndexChunksLeftToWriteCounters, Ops, Work, VerifyPool, DiskStats, WriteChunkStats); return true; } else { return false; } } SharedBuffer Decompress(CompositeBuffer&& CompressedChunk, const IoHash& ChunkHash, const uint64_t ChunkRawSize) { ZEN_TRACE_CPU("Decompress"); IoHash RawHash; uint64_t RawSize; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompressedChunk, RawHash, RawSize); if (!Compressed) { throw std::runtime_error(fmt::format("Invalid build blob format for chunk {}", ChunkHash)); } if (RawHash != ChunkHash) { throw std::runtime_error(fmt::format("Mismatching build blob {}, but compressed header rawhash is {}", ChunkHash, RawHash)); } if (RawSize != ChunkRawSize) { throw std::runtime_error( fmt::format("Mismatching build blob {}, expected raw size {} but recevied raw size {}", ChunkHash, ChunkRawSize, RawSize)); } if (!Compressed) { throw std::runtime_error(fmt::format("Invalid build blob {}, not a compressed buffer", ChunkHash)); } SharedBuffer Decompressed = Compressed.Decompress(); if (!Decompressed) { throw std::runtime_error(fmt::format("Decompression of build blob {} failed", ChunkHash)); } return Decompressed; } void WriteChunkToDisk(const std::filesystem::path& CacheFolderPath, const 
ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, std::span ChunkTargets, CompositeBuffer&& ChunkData, WriteFileCache& OpenFileCache) { ZEN_TRACE_CPU("WriteChunkToDisk"); for (const ChunkedContentLookup::ChunkSequenceLocation* TargetPtr : ChunkTargets) { const auto& Target = *TargetPtr; const uint64_t FileOffset = Target.Offset; const uint32_t SequenceIndex = Target.SequenceIndex; const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex]; OpenFileCache.WriteToFile( SequenceIndex, [&CacheFolderPath, &Content](uint32_t SequenceIndex) { return GetTempChunkedSequenceFileName(CacheFolderPath, Content.ChunkedContent.SequenceRawHashes[SequenceIndex]); }, ChunkData, FileOffset, Content.RawSizes[PathIndex]); } } bool CanDecompressDirectToSequence(const ChunkedFolderContent& RemoteContent, const std::vector Locations) { if (Locations.size() == 1) { const uint32_t FirstSequenceIndex = Locations[0]->SequenceIndex; if (Locations[0]->Offset == 0 && RemoteContent.ChunkedContent.ChunkCounts[FirstSequenceIndex] == 1) { return true; } } return false; } void StreamDecompress(const std::filesystem::path& CacheFolderPath, const IoHash& SequenceRawHash, CompositeBuffer&& CompressedPart, DiskStatistics& DiskStats, WriteChunkStatistics& WriteChunkStats) { ZEN_TRACE_CPU("StreamDecompress"); const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash); TemporaryFile DecompressedTemp; std::error_code Ec; DecompressedTemp.CreateTemporary(TempChunkSequenceFileName.parent_path(), Ec); if (Ec) { throw std::runtime_error( fmt::format("Failed creating temporary file for decompressing large blob {}. 
Reason: {}", SequenceRawHash, Ec.message())); } IoHash RawHash; uint64_t RawSize; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompressedPart, RawHash, RawSize); if (!Compressed) { throw std::runtime_error(fmt::format("Failed to parse header of compressed large blob {}", SequenceRawHash)); } if (RawHash != SequenceRawHash) { throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, SequenceRawHash)); } IoHashStream Hash; bool CouldDecompress = Compressed.DecompressToStream( 0, (uint64_t)-1, [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { ZEN_UNUSED(SourceOffset); ZEN_TRACE_CPU("StreamDecompress_Write"); DiskStats.ReadByteCount += SourceSize; if (!AbortFlag) { WriteChunkStats.ChunkBytesWritten += RangeBuffer.GetSize(); DecompressedTemp.Write(RangeBuffer, Offset); for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) { Hash.Append(Segment.GetView()); } DiskStats.WriteByteCount += RangeBuffer.GetSize(); DiskStats.WriteCount++; return true; } return false; }); if (AbortFlag) { return; } if (!CouldDecompress) { throw std::runtime_error(fmt::format("Failed to decompress large blob {}", SequenceRawHash)); } const IoHash VerifyHash = Hash.GetHash(); if (VerifyHash != SequenceRawHash) { throw std::runtime_error( fmt::format("Decompressed blob payload hash {} does not match expected hash {}", VerifyHash, SequenceRawHash)); } DecompressedTemp.MoveTemporaryIntoPlace(TempChunkSequenceFileName, Ec); if (Ec) { throw std::runtime_error( fmt::format("Failed moving temporary file for decompressing large blob {}. 
Reason: {}", SequenceRawHash, Ec.message())); } WriteChunkStats.ChunkCountWritten++; } bool WriteCompressedChunk(const std::filesystem::path& TargetFolder, const ChunkedFolderContent& RemoteContent, const ChunkedContentLookup& RemoteLookup, const IoHash& ChunkHash, const std::vector& ChunkTargetPtrs, IoBuffer&& CompressedPart, DiskStatistics& DiskStats, WriteChunkStatistics& WriteChunkStats) { auto ChunkHashToChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); ZEN_ASSERT(ChunkHashToChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end()); const uint32_t ChunkIndex = ChunkHashToChunkIndexIt->second; const uint64_t ChunkRawSize = RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; if (CanDecompressDirectToSequence(RemoteContent, ChunkTargetPtrs)) { const std::uint32_t SequenceIndex = ChunkTargetPtrs.front()->SequenceIndex; const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]; StreamDecompress(TargetFolder, SequenceRawHash, CompositeBuffer(std::move(CompressedPart)), DiskStats, WriteChunkStats); } else { SharedBuffer Chunk = Decompress(CompositeBuffer(std::move(CompressedPart)), ChunkHash, ChunkRawSize); if (!AbortFlag) { WriteFileCache OpenFileCache(DiskStats); WriteChunkToDisk(TargetFolder, RemoteContent, RemoteLookup, ChunkTargetPtrs, CompositeBuffer(std::move(Chunk)), OpenFileCache); WriteChunkStats.ChunkCountWritten++; WriteChunkStats.ChunkBytesWritten += ChunkRawSize; return true; } } return false; } void AsyncWriteDownloadedChunk(const std::filesystem::path& Path, const ChunkedFolderContent& RemoteContent, const ChunkedContentLookup& RemoteLookup, uint32_t RemoteChunkIndex, std::vector&& ChunkTargetPtrs, ParallellWork& Work, WorkerThreadPool& WritePool, IoBuffer&& Payload, std::span> SequenceIndexChunksLeftToWriteCounters, std::atomic& WritePartsComplete, const uint64_t TotalPartWriteCount, FilteredRate& FilteredWrittenBytesPerSecond, DiskStatistics& DiskStats, WriteChunkStatistics& WriteChunkStats) { 
ZEN_TRACE_CPU("AsyncWriteDownloadedChunk"); const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; const uint64_t Size = Payload.GetSize(); std::filesystem::path CompressedChunkPath; // Check if the dowloaded chunk is file based and we can move it directly without rewriting it { IoBufferFileReference FileRef; if (Payload.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == Size)) { ZEN_TRACE_CPU("MoveTempChunk"); std::error_code Ec; std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); if (!Ec) { Payload.SetDeleteOnClose(false); Payload = {}; CompressedChunkPath = Path / ZenTempDownloadFolderName / ChunkHash.ToHexString(); std::filesystem::rename(TempBlobPath, CompressedChunkPath, Ec); if (Ec) { CompressedChunkPath = std::filesystem::path{}; // Re-open the temp file again BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); Payload = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, Size, true); Payload.SetDeleteOnClose(true); } } } } if (CompressedChunkPath.empty() && (Size > 512u * 1024u)) { ZEN_TRACE_CPU("WriteTempChunk"); // Could not be moved and rather large, lets store it on disk CompressedChunkPath = Path / ZenTempDownloadFolderName / ChunkHash.ToHexString(); TemporaryFile::SafeWriteFile(CompressedChunkPath, Payload); Payload = {}; } Work.ScheduleWork( WritePool, // GetSyncWorkerPool(),// [&, SequenceIndexChunksLeftToWriteCounters, CompressedChunkPath, RemoteChunkIndex, TotalPartWriteCount, ChunkTargetPtrs = std::move(ChunkTargetPtrs), CompressedPart = std::move(Payload)](std::atomic&) mutable { ZEN_TRACE_CPU("UpdateFolder_WriteChunk"); if (!AbortFlag) { FilteredWrittenBytesPerSecond.Start(); const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; if (CompressedChunkPath.empty()) { ZEN_ASSERT(CompressedPart); } else { ZEN_ASSERT(!CompressedPart); CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); if (!CompressedPart) 
{ throw std::runtime_error( fmt::format("Could not open dowloaded compressed chunk {} from {}", ChunkHash, CompressedChunkPath)); } } std::filesystem::path TargetFolder = Path / ZenTempCacheFolderName; bool NeedHashVerify = WriteCompressedChunk(TargetFolder, RemoteContent, RemoteLookup, ChunkHash, ChunkTargetPtrs, std::move(CompressedPart), DiskStats, WriteChunkStats); if (!AbortFlag) { WritePartsComplete++; if (WritePartsComplete == TotalPartWriteCount) { FilteredWrittenBytesPerSecond.Stop(); } std::filesystem::remove(CompressedChunkPath); std::vector CompletedSequences = CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); if (NeedHashVerify) { VerifyAndCompleteChunkSequencesAsync(TargetFolder, RemoteContent, CompletedSequences, Work, WritePool); } else { FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences); } } } }, Work.DefaultErrorFunction()); }; void UpdateFolder(BuildStorage& Storage, const Oid& BuildId, const std::filesystem::path& Path, const std::uint64_t LargeAttachmentSize, const std::uint64_t PreferredMultipartChunkSize, const ChunkedFolderContent& LocalContent, const ChunkedFolderContent& RemoteContent, const std::vector& BlockDescriptions, const std::vector& LooseChunkHashes, bool AllowPartialBlockRequests, bool WipeTargetFolder, FolderContent& OutLocalFolderState, DiskStatistics& DiskStats, CacheMappingStatistics& CacheMappingStats, DownloadStatistics& DownloadStats, WriteChunkStatistics& WriteChunkStats, RebuildFolderStateStatistics& RebuildFolderStateStats) { ZEN_TRACE_CPU("UpdateFolder"); ZEN_UNUSED(WipeTargetFolder); Stopwatch IndexTimer; const ChunkedContentLookup LocalLookup = BuildChunkedContentLookup(LocalContent); const ChunkedContentLookup RemoteLookup = BuildChunkedContentLookup(RemoteContent); ZEN_CONSOLE("Indexed local and remote content in {}", NiceTimeSpanMs(IndexTimer.GetElapsedTimeMs())); const std::filesystem::path CacheFolderPath = Path / ZenTempCacheFolderName; Stopwatch 
CacheMappingTimer; std::vector> SequenceIndexChunksLeftToWriteCounters(RemoteContent.ChunkedContent.SequenceRawHashes.size()); std::vector RemoteChunkIndexNeedsCopyFromLocalFileFlags(RemoteContent.ChunkedContent.ChunkHashes.size()); std::vector> RemoteChunkIndexNeedsCopyFromSourceFlags(RemoteContent.ChunkedContent.ChunkHashes.size()); tsl::robin_map CachedChunkHashesFound; tsl::robin_map CachedSequenceHashesFound; { ZEN_TRACE_CPU("UpdateFolder_CheckChunkCache"); DirectoryContent CacheDirContent; GetDirectoryContent(CacheFolderPath, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeFileSizes, CacheDirContent); for (size_t Index = 0; Index < CacheDirContent.Files.size(); Index++) { IoHash FileHash; if (IoHash::TryParse(CacheDirContent.Files[Index].filename().string(), FileHash)) { if (auto ChunkIt = RemoteLookup.ChunkHashToChunkIndex.find(FileHash); ChunkIt != RemoteLookup.ChunkHashToChunkIndex.end()) { const uint32_t ChunkIndex = ChunkIt->second; const uint64_t ChunkSize = RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; if (ChunkSize == CacheDirContent.FileSizes[Index]) { CachedChunkHashesFound.insert({FileHash, ChunkIndex}); CacheMappingStats.CacheChunkCount++; CacheMappingStats.CacheChunkByteCount += ChunkSize; continue; } } else if (auto SequenceIt = RemoteLookup.RawHashToSequenceIndex.find(FileHash); SequenceIt != RemoteLookup.RawHashToSequenceIndex.end()) { const uint32_t SequenceIndex = SequenceIt->second; const uint32_t PathIndex = RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; const uint64_t SequenceSize = RemoteContent.RawSizes[PathIndex]; if (SequenceSize == CacheDirContent.FileSizes[Index]) { CachedSequenceHashesFound.insert({FileHash, SequenceIndex}); CacheMappingStats.CacheSequenceHashesCount += SequenceSize; CacheMappingStats.CacheSequenceHashesByteCount++; continue; } } } std::filesystem::remove(CacheDirContent.Files[Index]); } } tsl::robin_map CachedBlocksFound; { ZEN_TRACE_CPU("UpdateFolder_CheckBlockCache"); 
tsl::robin_map AllBlockSizes; AllBlockSizes.reserve(BlockDescriptions.size()); for (uint32_t BlockIndex = 0; BlockIndex < BlockDescriptions.size(); BlockIndex++) { const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; AllBlockSizes.insert({BlockDescription.BlockHash, BlockIndex}); } DirectoryContent BlockDirContent; GetDirectoryContent(Path / ZenTempBlockFolderName, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeFileSizes, BlockDirContent); CachedBlocksFound.reserve(BlockDirContent.Files.size()); for (size_t Index = 0; Index < BlockDirContent.Files.size(); Index++) { IoHash FileHash; if (IoHash::TryParse(BlockDirContent.Files[Index].filename().string(), FileHash)) { if (auto BlockIt = AllBlockSizes.find(FileHash); BlockIt != AllBlockSizes.end()) { const uint32_t BlockIndex = BlockIt->second; const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; uint64_t BlockSize = CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize; for (uint64_t ChunkSize : BlockDescription.ChunkCompressedLengths) { BlockSize += ChunkSize; } if (BlockSize == BlockDirContent.FileSizes[Index]) { CachedBlocksFound.insert({FileHash, BlockIndex}); CacheMappingStats.CacheBlockCount++; CacheMappingStats.CacheBlocksByteCount += BlockSize; continue; } } } std::filesystem::remove(BlockDirContent.Files[Index]); } } std::vector LocalPathIndexesMatchingSequenceIndexes; // Pick up all whole files we can use from current local state { ZEN_TRACE_CPU("UpdateFolder_CheckLocalChunks"); for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < RemoteContent.ChunkedContent.SequenceRawHashes.size(); RemoteSequenceIndex++) { const IoHash& RemoteSequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; if (auto CacheSequenceIt = CachedSequenceHashesFound.find(RemoteSequenceRawHash); CacheSequenceIt != CachedSequenceHashesFound.end()) { // const uint32_t RemoteSequenceIndex = 
CacheSequenceIt->second; // const uint32_t RemotePathIndex = GetFirstPathIndexForSeqeuenceIndex(RemoteLookup, RemoteSequenceIndex); // RemoteSequenceByteCountFoundInCache += RemoteContent.RawSizes[RemotePathIndex]; } else if (auto CacheChunkIt = CachedChunkHashesFound.find(RemoteSequenceRawHash); CacheChunkIt != CachedChunkHashesFound.end()) { // const uint32_t RemoteChunkIndex = CacheChunkIt->second; // const uint32_t RemotePathIndex = GetFirstPathIndexForSeqeuenceIndex(RemoteLookup, RemoteSequenceIndex); // RemoteSequenceByteCountFoundInCache += RemoteContent.RawSizes[RemotePathIndex]; } else if (auto It = LocalLookup.RawHashToSequenceIndex.find(RemoteSequenceRawHash); It != LocalLookup.RawHashToSequenceIndex.end()) { const uint32_t LocalSequenceIndex = It->second; const uint32_t LocalPathIndex = GetFirstPathIndexForSeqeuenceIndex(LocalLookup, LocalSequenceIndex); uint64_t RawSize = LocalContent.RawSizes[LocalPathIndex]; LocalPathIndexesMatchingSequenceIndexes.push_back(LocalPathIndex); CacheMappingStats.LocalPathsMatchingSequencesCount++; CacheMappingStats.LocalPathsMatchingSequencesByteCount += RawSize; } else { // We must write the sequence const uint32_t ChunkCount = RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]; SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = ChunkCount; } } } // Pick up all chunks in current local state struct CacheCopyData { uint32_t LocalSequenceIndex = (uint32_t)-1; std::vector TargetChunkLocationPtrs; struct ChunkTarget { uint32_t TargetChunkLocationCount = (uint32_t)-1; uint32_t RemoteChunkIndex = (uint32_t)-1; uint64_t CacheFileOffset = (uint64_t)-1; }; std::vector ChunkTargets; }; tsl::robin_map RawHashToCacheCopyDataIndex; std::vector CacheCopyDatas; { ZEN_TRACE_CPU("UpdateFolder_GetLocalChunks"); for (uint32_t LocalSequenceIndex = 0; LocalSequenceIndex < LocalContent.ChunkedContent.SequenceRawHashes.size(); LocalSequenceIndex++) { const IoHash& LocalSequenceRawHash = 
LocalContent.ChunkedContent.SequenceRawHashes[LocalSequenceIndex]; const uint32_t LocalOrderOffset = LocalLookup.SequenceIndexChunkOrderOffset[LocalSequenceIndex]; { uint64_t SourceOffset = 0; const uint32_t LocalChunkCount = LocalContent.ChunkedContent.ChunkCounts[LocalSequenceIndex]; for (uint32_t LocalOrderIndex = 0; LocalOrderIndex < LocalChunkCount; LocalOrderIndex++) { const uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[LocalOrderOffset + LocalOrderIndex]; const IoHash& LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; const uint64_t LocalChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[LocalChunkIndex]; if (auto RemoteChunkIt = RemoteLookup.ChunkHashToChunkIndex.find(LocalChunkHash); RemoteChunkIt != RemoteLookup.ChunkHashToChunkIndex.end()) { const uint32_t RemoteChunkIndex = RemoteChunkIt->second; if (!RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) { std::vector ChunkTargetPtrs = GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex); if (!ChunkTargetPtrs.empty()) { CacheCopyData::ChunkTarget Target = { .TargetChunkLocationCount = gsl::narrow(ChunkTargetPtrs.size()), .RemoteChunkIndex = RemoteChunkIndex, .CacheFileOffset = SourceOffset}; if (auto CopySourceIt = RawHashToCacheCopyDataIndex.find(LocalSequenceRawHash); CopySourceIt != RawHashToCacheCopyDataIndex.end()) { CacheCopyData& Data = CacheCopyDatas[CopySourceIt->second]; if (Data.TargetChunkLocationPtrs.size() > 1024) { RawHashToCacheCopyDataIndex.insert_or_assign(LocalSequenceRawHash, CacheCopyDatas.size()); CacheCopyDatas.push_back( CacheCopyData{.LocalSequenceIndex = LocalSequenceIndex, .TargetChunkLocationPtrs = ChunkTargetPtrs, .ChunkTargets = std::vector{Target}}); } else { Data.TargetChunkLocationPtrs.insert(Data.TargetChunkLocationPtrs.end(), ChunkTargetPtrs.begin(), ChunkTargetPtrs.end()); Data.ChunkTargets.push_back(Target); } } else { 
RawHashToCacheCopyDataIndex.insert_or_assign(LocalSequenceRawHash, CacheCopyDatas.size()); CacheCopyDatas.push_back( CacheCopyData{.LocalSequenceIndex = LocalSequenceIndex, .TargetChunkLocationPtrs = ChunkTargetPtrs, .ChunkTargets = std::vector{Target}}); } CacheMappingStats.LocalChunkMatchingRemoteCount++; CacheMappingStats.LocalChunkMatchingRemoteByteCount += LocalChunkRawSize; RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex] = true; } } } SourceOffset += LocalChunkRawSize; } } } } if (!CachedSequenceHashesFound.empty() || !CachedChunkHashesFound.empty() || !CachedBlocksFound.empty() || !LocalPathIndexesMatchingSequenceIndexes.empty() || CacheMappingStats.LocalChunkMatchingRemoteCount > 0) { ZEN_CONSOLE( "Cache: {} ({}) chunk sequences, {} ({}) chunks, {} ({}) blocks. Local state: {} ({}) chunk sequences, {} ({}) chunks", CachedSequenceHashesFound.size(), NiceBytes(CacheMappingStats.CacheSequenceHashesByteCount), CachedChunkHashesFound.size(), NiceBytes(CacheMappingStats.CacheChunkByteCount), CachedBlocksFound.size(), NiceBytes(CacheMappingStats.CacheBlocksByteCount), LocalPathIndexesMatchingSequenceIndexes.size(), NiceBytes(CacheMappingStats.LocalPathsMatchingSequencesByteCount), CacheMappingStats.LocalChunkMatchingRemoteCount, NiceBytes(CacheMappingStats.LocalChunkMatchingRemoteByteCount)); } uint32_t ChunkCountToWrite = 0; for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++) { if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) { ChunkCountToWrite++; } else { std::vector ChunkTargetPtrs = GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex); if (!ChunkTargetPtrs.empty()) { RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex] = true; ChunkCountToWrite++; } } } uint64_t TotalRequestCount = 0; uint64_t TotalPartWriteCount = 0; std::atomic WritePartsComplete = 0; { ZEN_TRACE_CPU("WriteChunks"); Stopwatch WriteTimer; 
FilteredRate FilteredDownloadedBytesPerSecond; FilteredRate FilteredWrittenBytesPerSecond; WorkerThreadPool& NetworkPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // WorkerThreadPool& WritePool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // ProgressBar WriteProgressBar(UsePlainProgress); ParallellWork Work(AbortFlag); struct LooseChunkHashWorkData { std::vector ChunkTargetPtrs; uint32_t RemoteChunkIndex = (uint32_t)-1; }; std::vector LooseChunkHashWorks; TotalPartWriteCount += CacheCopyDatas.size(); for (const IoHash ChunkHash : LooseChunkHashes) { auto RemoteChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); ZEN_ASSERT(RemoteChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end()); const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second; if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) { ZEN_DEBUG("Skipping chunk {} due to cache reuse", ChunkHash); continue; } bool NeedsCopy = true; if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex].compare_exchange_strong(NeedsCopy, false)) { std::vector ChunkTargetPtrs = GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex); if (ChunkTargetPtrs.empty()) { ZEN_DEBUG("Skipping chunk {} due to cache reuse", ChunkHash); } else { TotalRequestCount++; TotalPartWriteCount++; LooseChunkHashWorks.push_back( LooseChunkHashWorkData{.ChunkTargetPtrs = ChunkTargetPtrs, .RemoteChunkIndex = RemoteChunkIndex}); } } } uint32_t BlockCount = gsl::narrow(BlockDescriptions.size()); std::vector ChunkIsPickedUpByBlock(RemoteContent.ChunkedContent.ChunkHashes.size(), false); auto GetNeededChunkBlockIndexes = [&RemoteContent, &RemoteLookup, &RemoteChunkIndexNeedsCopyFromSourceFlags, &ChunkIsPickedUpByBlock](const ChunkBlockDescription& BlockDescription) { ZEN_TRACE_CPU("UpdateFolder_GetNeededChunkBlockIndexes"); std::vector NeededBlockChunkIndexes; for (uint32_t ChunkBlockIndex = 0; ChunkBlockIndex < 
BlockDescription.ChunkRawHashes.size(); ChunkBlockIndex++) { const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex]; if (auto It = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); It != RemoteLookup.ChunkHashToChunkIndex.end()) { const uint32_t RemoteChunkIndex = It->second; if (!ChunkIsPickedUpByBlock[RemoteChunkIndex]) { if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex]) { ChunkIsPickedUpByBlock[RemoteChunkIndex] = true; NeededBlockChunkIndexes.push_back(ChunkBlockIndex); } } } } return NeededBlockChunkIndexes; }; std::vector CachedChunkBlockIndexes; struct BlockRangeDescriptor { uint32_t BlockIndex = (uint32_t)-1; uint64_t RangeStart = 0; uint64_t RangeLength = 0; uint32_t ChunkBlockIndexStart = 0; uint32_t ChunkBlockIndexCount = 0; }; std::vector BlockRangeWorks; std::vector FullBlockWorks; for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; BlockIndex++) { const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; const std::vector BlockChunkIndexNeeded = GetNeededChunkBlockIndexes(BlockDescription); if (!BlockChunkIndexNeeded.empty()) { bool UsingCachedBlock = false; if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end()) { ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_CacheGet"); TotalPartWriteCount++; std::filesystem::path BlockPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString(); if (std::filesystem::exists(BlockPath)) { CachedChunkBlockIndexes.push_back(BlockIndex); UsingCachedBlock = true; } } if (!UsingCachedBlock) { bool WantsToDoPartialBlockDownload = BlockChunkIndexNeeded.size() < BlockDescription.ChunkRawHashes.size(); bool CanDoPartialBlockDownload = (BlockDescription.HeaderSize > 0) && (BlockDescription.ChunkCompressedLengths.size() == BlockDescription.ChunkRawHashes.size()); if (AllowPartialBlockRequests && WantsToDoPartialBlockDownload && CanDoPartialBlockDownload) { std::vector BlockRanges; 
ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_PartialAnalysis"); uint32_t NeedBlockChunkIndexOffset = 0; uint32_t ChunkBlockIndex = 0; uint32_t CurrentOffset = gsl::narrow(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); BlockRangeDescriptor NextRange{.BlockIndex = BlockIndex}; while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() && ChunkBlockIndex < BlockDescription.ChunkRawHashes.size()) { const uint32_t ChunkCompressedLength = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex]; if (ChunkBlockIndex < BlockChunkIndexNeeded[NeedBlockChunkIndexOffset]) { if (NextRange.RangeLength > 0) { BlockRanges.push_back(NextRange); NextRange = {.BlockIndex = BlockIndex}; } ChunkBlockIndex++; CurrentOffset += ChunkCompressedLength; } else if (ChunkBlockIndex == BlockChunkIndexNeeded[NeedBlockChunkIndexOffset]) { if (NextRange.RangeLength == 0) { NextRange.RangeStart = CurrentOffset; NextRange.ChunkBlockIndexStart = ChunkBlockIndex; } NextRange.RangeLength += ChunkCompressedLength; NextRange.ChunkBlockIndexCount++; ChunkBlockIndex++; CurrentOffset += ChunkCompressedLength; NeedBlockChunkIndexOffset++; } else { ZEN_ASSERT(false); } } if (NextRange.RangeLength > 0) { BlockRanges.push_back(NextRange); } ZEN_ASSERT(!BlockRanges.empty()); std::vector CollapsedBlockRanges; auto It = BlockRanges.begin(); CollapsedBlockRanges.push_back(*It++); while (It != BlockRanges.end()) { BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back(); uint64_t Slack = It->RangeStart - (LastRange.RangeStart + LastRange.RangeLength); uint64_t BothRangeSize = It->RangeLength + LastRange.RangeLength; if (Slack <= Max(BothRangeSize / 8, 64u * 1024u)) // Made up heuristic - we'll see how it pans out { LastRange.ChunkBlockIndexCount = (It->ChunkBlockIndexStart + It->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart; LastRange.RangeLength = (It->RangeStart + It->RangeLength) - LastRange.RangeStart; } else { CollapsedBlockRanges.push_back(*It); } ++It; } 
TotalRequestCount += CollapsedBlockRanges.size(); TotalPartWriteCount += CollapsedBlockRanges.size(); BlockRangeWorks.insert(BlockRangeWorks.end(), CollapsedBlockRanges.begin(), CollapsedBlockRanges.end()); } else { TotalRequestCount++; TotalPartWriteCount++; FullBlockWorks.push_back(BlockIndex); } } } else { ZEN_DEBUG("Skipping block {} due to cache reuse", BlockDescriptions[BlockIndex].BlockHash); } } for (uint32_t LooseChunkHashWorkIndex = 0; LooseChunkHashWorkIndex < LooseChunkHashWorks.size(); LooseChunkHashWorkIndex++) { if (AbortFlag) { break; } LooseChunkHashWorkData& LooseChunkHashWork = LooseChunkHashWorks[LooseChunkHashWorkIndex]; std::vector ChunkTargetPtrs = std::move(LooseChunkHashWork.ChunkTargetPtrs); const uint32_t RemoteChunkIndex = LooseChunkHashWork.RemoteChunkIndex; Work.ScheduleWork( WritePool, // NetworkPool, // GetSyncWorkerPool(),// [&, RemoteChunkIndex, ChunkTargetPtrs](std::atomic&) mutable { if (!AbortFlag) { ZEN_TRACE_CPU("UpdateFolder_ReadPreDownloaded"); std::filesystem::path ExistingCompressedChunkPath; { const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; std::filesystem::path CompressedChunkPath = Path / ZenTempDownloadFolderName / ChunkHash.ToHexString(); if (std::filesystem::exists(CompressedChunkPath)) { IoBuffer ExistingCompressedPart = IoBufferBuilder::MakeFromFile(ExistingCompressedChunkPath); if (ExistingCompressedPart) { IoHash RawHash; uint64_t RawSize; if (CompressedBuffer::ValidateCompressedHeader(ExistingCompressedPart, RawHash, RawSize)) { DownloadStats.RequestsCompleteCount++; if (DownloadStats.RequestsCompleteCount == TotalRequestCount) { FilteredDownloadedBytesPerSecond.Stop(); } ExistingCompressedChunkPath = std::move(CompressedChunkPath); } else { std::error_code DummyEc; std::filesystem::remove(CompressedChunkPath, DummyEc); } } } } if (!ExistingCompressedChunkPath.empty()) { Work.ScheduleWork( WritePool, // WritePool, GetSyncWorkerPool() [&Path, &RemoteContent, &RemoteLookup, 
&CacheFolderPath, &SequenceIndexChunksLeftToWriteCounters, &Work, &WritePool, &DiskStats, &WriteChunkStats, &WritePartsComplete, &TotalPartWriteCount, &FilteredWrittenBytesPerSecond, RemoteChunkIndex, ChunkTargetPtrs, CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic&) mutable { if (!AbortFlag) { ZEN_TRACE_CPU("UpdateFolder_WritePreDownloaded"); FilteredWrittenBytesPerSecond.Start(); const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); if (!CompressedPart) { throw std::runtime_error(fmt::format("Could not open dowloaded compressed chunk {} from {}", ChunkHash, CompressedChunkPath)); } std::filesystem::path TargetFolder = Path / ZenTempCacheFolderName; bool NeedHashVerify = WriteCompressedChunk(TargetFolder, RemoteContent, RemoteLookup, ChunkHash, ChunkTargetPtrs, std::move(CompressedPart), DiskStats, WriteChunkStats); WriteChunkStats.ChunkCountWritten++; WriteChunkStats.ChunkBytesWritten += RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex]; WritePartsComplete++; if (!AbortFlag) { if (WritePartsComplete == TotalPartWriteCount) { FilteredWrittenBytesPerSecond.Stop(); } std::filesystem::remove(CompressedChunkPath); std::vector CompletedSequences = CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); if (NeedHashVerify) { VerifyAndCompleteChunkSequencesAsync(TargetFolder, RemoteContent, CompletedSequences, Work, WritePool); } else { FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences); } } } }, Work.DefaultErrorFunction()); } else { FilteredDownloadedBytesPerSecond.Start(); const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize) { ZEN_TRACE_CPU("UpdateFolder_GetLargeChunk"); DownloadLargeBlob(Storage, Path / ZenTempDownloadFolderName, BuildId, ChunkHash, 
PreferredMultipartChunkSize, Work, NetworkPool, DownloadStats, [&, RemoteChunkIndex, ChunkTargetPtrs](IoBuffer&& Payload) mutable { DownloadStats.RequestsCompleteCount++; if (DownloadStats.RequestsCompleteCount == TotalRequestCount) { FilteredDownloadedBytesPerSecond.Stop(); } AsyncWriteDownloadedChunk(Path, RemoteContent, RemoteLookup, RemoteChunkIndex, std::move(ChunkTargetPtrs), Work, WritePool, std::move(Payload), SequenceIndexChunksLeftToWriteCounters, WritePartsComplete, TotalPartWriteCount, FilteredWrittenBytesPerSecond, DiskStats, WriteChunkStats); }); } else { ZEN_TRACE_CPU("UpdateFolder_GetChunk"); IoBuffer BuildBlob = Storage.GetBuildBlob(BuildId, ChunkHash); if (!BuildBlob) { throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash)); } uint64_t BlobSize = BuildBlob.GetSize(); DownloadStats.DownloadedChunkCount++; DownloadStats.DownloadedChunkByteCount += BlobSize; DownloadStats.RequestsCompleteCount++; if (DownloadStats.RequestsCompleteCount == TotalRequestCount) { FilteredDownloadedBytesPerSecond.Stop(); } AsyncWriteDownloadedChunk(Path, RemoteContent, RemoteLookup, RemoteChunkIndex, std::move(ChunkTargetPtrs), Work, WritePool, std::move(BuildBlob), SequenceIndexChunksLeftToWriteCounters, WritePartsComplete, TotalPartWriteCount, FilteredWrittenBytesPerSecond, DiskStats, WriteChunkStats); } } } }, Work.DefaultErrorFunction()); } for (size_t CopyDataIndex = 0; CopyDataIndex < CacheCopyDatas.size(); CopyDataIndex++) { if (AbortFlag) { break; } Work.ScheduleWork( WritePool, // GetSyncWorkerPool(),// [&, CopyDataIndex](std::atomic&) { if (!AbortFlag) { ZEN_TRACE_CPU("UpdateFolder_CopyLocal"); FilteredWrittenBytesPerSecond.Start(); const CacheCopyData& CopyData = CacheCopyDatas[CopyDataIndex]; const uint32_t LocalPathIndex = LocalLookup.SequenceIndexFirstPathIndex[CopyData.LocalSequenceIndex]; const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred(); 
ZEN_ASSERT(!CopyData.TargetChunkLocationPtrs.empty()); uint64_t CacheLocalFileBytesRead = 0; size_t TargetStart = 0; const std::span AllTargets( CopyData.TargetChunkLocationPtrs); struct WriteOp { const ChunkedContentLookup::ChunkSequenceLocation* Target = nullptr; uint64_t CacheFileOffset = (uint64_t)-1; uint32_t ChunkIndex = (uint32_t)-1; }; std::vector WriteOps; if (!AbortFlag) { ZEN_TRACE_CPU("Sort"); WriteOps.reserve(AllTargets.size()); for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets) { std::span TargetRange = AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount); for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange) { WriteOps.push_back(WriteOp{.Target = Target, .CacheFileOffset = ChunkTarget.CacheFileOffset, .ChunkIndex = ChunkTarget.RemoteChunkIndex}); } TargetStart += ChunkTarget.TargetChunkLocationCount; } std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) { if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex) { return true; } else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex) { return false; } if (Lhs.Target->Offset < Rhs.Target->Offset) { return true; } return false; }); } if (!AbortFlag) { ZEN_TRACE_CPU("Write"); tsl::robin_set ChunkIndexesWritten; BufferedOpenFile SourceFile(LocalFilePath, DiskStats); WriteFileCache OpenFileCache(DiskStats); for (size_t WriteOpIndex = 0; WriteOpIndex < WriteOps.size();) { if (AbortFlag) { break; } const WriteOp& Op = WriteOps[WriteOpIndex]; const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex; ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() <= RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]); ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() > 0); const uint32_t RemotePathIndex = RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]; const uint64_t ChunkSize = 
RemoteContent.ChunkedContent.ChunkRawSizes[Op.ChunkIndex]; uint64_t ReadLength = ChunkSize; size_t WriteCount = 1; uint64_t OpSourceEnd = Op.CacheFileOffset + ChunkSize; uint64_t OpTargetEnd = Op.Target->Offset + ChunkSize; while ((WriteOpIndex + WriteCount) < WriteOps.size()) { const WriteOp& NextOp = WriteOps[WriteOpIndex + WriteCount]; if (NextOp.Target->SequenceIndex != Op.Target->SequenceIndex) { break; } if (NextOp.Target->Offset != OpTargetEnd) { break; } if (NextOp.CacheFileOffset != OpSourceEnd) { break; } const uint64_t NextChunkLength = RemoteContent.ChunkedContent.ChunkRawSizes[NextOp.ChunkIndex]; if (ReadLength + NextChunkLength > 512u * 1024u) { break; } ReadLength += NextChunkLength; OpSourceEnd += NextChunkLength; OpTargetEnd += NextChunkLength; WriteCount++; } CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ReadLength); ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]); OpenFileCache.WriteToFile( RemoteSequenceIndex, [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) { return GetTempChunkedSequenceFileName( CacheFolderPath, RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); }, ChunkSource, Op.Target->Offset, RemoteContent.RawSizes[RemotePathIndex]); for (size_t WrittenOpIndex = WriteOpIndex; WrittenOpIndex < WriteOpIndex + WriteCount; WrittenOpIndex++) { const WriteOp& WrittenOp = WriteOps[WrittenOpIndex]; if (ChunkIndexesWritten.insert(WrittenOp.ChunkIndex).second) { WriteChunkStats.ChunkCountWritten++; WriteChunkStats.ChunkBytesWritten += RemoteContent.ChunkedContent.ChunkRawSizes[WrittenOp.ChunkIndex]; } } CacheLocalFileBytesRead += ReadLength; // TODO: This should be the sum of unique chunk sizes? 
WriteOpIndex += WriteCount; } } if (!AbortFlag) { // Write tracking, updating this must be done without any files open (WriteFileCache) std::vector CompletedChunkSequences; for (const WriteOp& Op : WriteOps) { const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex; if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters)) { CompletedChunkSequences.push_back(RemoteSequenceIndex); } } VerifyAndCompleteChunkSequencesAsync(CacheFolderPath, RemoteContent, CompletedChunkSequences, Work, WritePool); ZEN_DEBUG("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), LocalContent.Paths[LocalPathIndex]); } WritePartsComplete++; if (WritePartsComplete == TotalPartWriteCount) { FilteredWrittenBytesPerSecond.Stop(); } } }, Work.DefaultErrorFunction()); } for (uint32_t BlockIndex : CachedChunkBlockIndexes) { if (AbortFlag) { break; } Work.ScheduleWork( WritePool, // GetSyncWorkerPool(), // WritePool, [&, BlockIndex](std::atomic&) mutable { if (!AbortFlag) { ZEN_TRACE_CPU("UpdateFolder_WriteCachedBlock"); const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; FilteredWrittenBytesPerSecond.Start(); std::filesystem::path BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString(); IoBuffer BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); if (!BlockBuffer) { throw std::runtime_error( fmt::format("Can not read block {} at {}", BlockDescription.BlockHash, BlockChunkPath)); } if (!WriteBlockToDisk(CacheFolderPath, RemoteContent, BlockDescription, SequenceIndexChunksLeftToWriteCounters, Work, WritePool, CompositeBuffer(std::move(BlockBuffer)), RemoteLookup, RemoteChunkIndexNeedsCopyFromSourceFlags, DiskStats, WriteChunkStats)) { std::error_code DummyEc; std::filesystem::remove(BlockChunkPath, DummyEc); throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash)); } WritePartsComplete++; std::filesystem::remove(BlockChunkPath); if (WritePartsComplete == 
TotalPartWriteCount) { FilteredWrittenBytesPerSecond.Stop(); } } }, Work.DefaultErrorFunction()); } for (size_t BlockRangeIndex = 0; BlockRangeIndex < BlockRangeWorks.size(); BlockRangeIndex++) { if (AbortFlag) { break; } const BlockRangeDescriptor BlockRange = BlockRangeWorks[BlockRangeIndex]; ZEN_ASSERT(BlockRange.BlockIndex != (uint32_t)-1); const uint32_t BlockIndex = BlockRange.BlockIndex; Work.ScheduleWork( NetworkPool, // NetworkPool, // GetSyncWorkerPool() [&, BlockIndex, BlockRange](std::atomic&) { if (!AbortFlag) { ZEN_TRACE_CPU("UpdateFolder_GetPartialBlock"); const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; FilteredDownloadedBytesPerSecond.Start(); IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength); if (!BlockBuffer) { throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); } uint64_t BlockSize = BlockBuffer.GetSize(); DownloadStats.DownloadedBlockCount++; DownloadStats.DownloadedBlockByteCount += BlockSize; DownloadStats.RequestsCompleteCount++; if (DownloadStats.RequestsCompleteCount == TotalRequestCount) { FilteredDownloadedBytesPerSecond.Stop(); } std::filesystem::path BlockChunkPath; // Check if the dowloaded block is file based and we can move it directly without rewriting it { IoBufferFileReference FileRef; if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == BlockSize)) { ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock"); std::error_code Ec; std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); if (!Ec) { BlockBuffer.SetDeleteOnClose(false); BlockBuffer = {}; BlockChunkPath = Path / ZenTempBlockFolderName / fmt::format("{}_{:x}_{:x}", BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength); std::filesystem::rename(TempBlobPath, BlockChunkPath, Ec); if (Ec) { BlockChunkPath = std::filesystem::path{}; // Re-open the temp 
file again BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); BlockBuffer.SetDeleteOnClose(true); } } } } if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u)) { ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock"); // Could not be moved and rather large, lets store it on disk BlockChunkPath = Path / ZenTempBlockFolderName / fmt::format("{}_{:x}_{:x}", BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength); TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); BlockBuffer = {}; } if (!AbortFlag) { Work.ScheduleWork( WritePool, // WritePool, // GetSyncWorkerPool(), [&, BlockIndex, BlockRange, BlockChunkPath, BlockPartialBuffer = std::move(BlockBuffer)]( std::atomic&) mutable { if (!AbortFlag) { ZEN_TRACE_CPU("UpdateFolder_WritePartialBlock"); const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; if (BlockChunkPath.empty()) { ZEN_ASSERT(BlockPartialBuffer); } else { ZEN_ASSERT(!BlockPartialBuffer); BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); if (!BlockPartialBuffer) { throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}", BlockDescription.BlockHash, BlockChunkPath)); } } FilteredWrittenBytesPerSecond.Start(); if (!WritePartialBlockToDisk( CacheFolderPath, RemoteContent, BlockDescription, SequenceIndexChunksLeftToWriteCounters, Work, WritePool, CompositeBuffer(std::move(BlockPartialBuffer)), BlockRange.ChunkBlockIndexStart, BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1, RemoteLookup, RemoteChunkIndexNeedsCopyFromSourceFlags, DiskStats, WriteChunkStats)) { std::error_code DummyEc; std::filesystem::remove(BlockChunkPath, DummyEc); throw std::runtime_error( fmt::format("Partial block {} is malformed", BlockDescription.BlockHash)); } if (!BlockChunkPath.empty()) { std::filesystem::remove(BlockChunkPath); } WritePartsComplete++; if (WritePartsComplete == 
TotalPartWriteCount) { FilteredWrittenBytesPerSecond.Stop(); } } }, Work.DefaultErrorFunction()); } } }, Work.DefaultErrorFunction()); } for (uint32_t BlockIndex : FullBlockWorks) { if (AbortFlag) { break; } Work.ScheduleWork( NetworkPool, // GetSyncWorkerPool(), // NetworkPool, [&, BlockIndex](std::atomic&) { if (!AbortFlag) { ZEN_TRACE_CPU("UpdateFolder_GetFullBlock"); const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; FilteredDownloadedBytesPerSecond.Start(); IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockDescription.BlockHash); if (!BlockBuffer) { throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); } uint64_t BlockSize = BlockBuffer.GetSize(); DownloadStats.DownloadedBlockCount++; DownloadStats.DownloadedBlockByteCount += BlockSize; DownloadStats.RequestsCompleteCount++; if (DownloadStats.RequestsCompleteCount == TotalRequestCount) { FilteredDownloadedBytesPerSecond.Stop(); } std::filesystem::path BlockChunkPath; // Check if the dowloaded block is file based and we can move it directly without rewriting it { IoBufferFileReference FileRef; if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == BlockSize)) { ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock"); std::error_code Ec; std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); if (!Ec) { BlockBuffer.SetDeleteOnClose(false); BlockBuffer = {}; BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString(); std::filesystem::rename(TempBlobPath, BlockChunkPath, Ec); if (Ec) { BlockChunkPath = std::filesystem::path{}; // Re-open the temp file again BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); BlockBuffer.SetDeleteOnClose(true); } } } } if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u)) { ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock"); // Could not 
be moved and rather large, lets store it on disk BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString(); TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); BlockBuffer = {}; } if (!AbortFlag) { Work.ScheduleWork( WritePool, // WritePool, GetSyncWorkerPool() [&Work, &WritePool, &RemoteContent, &RemoteLookup, CacheFolderPath, &RemoteChunkIndexNeedsCopyFromSourceFlags, &SequenceIndexChunksLeftToWriteCounters, BlockIndex, &BlockDescriptions, &WriteChunkStats, &DiskStats, &WritePartsComplete, &TotalPartWriteCount, &FilteredWrittenBytesPerSecond, BlockChunkPath, BlockBuffer = std::move(BlockBuffer)](std::atomic&) mutable { if (!AbortFlag) { ZEN_TRACE_CPU("UpdateFolder_WriteFullBlock"); const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; if (BlockChunkPath.empty()) { ZEN_ASSERT(BlockBuffer); } else { ZEN_ASSERT(!BlockBuffer); BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); if (!BlockBuffer) { throw std::runtime_error(fmt::format("Could not open dowloaded block {} from {}", BlockDescription.BlockHash, BlockChunkPath)); } } FilteredWrittenBytesPerSecond.Start(); if (!WriteBlockToDisk(CacheFolderPath, RemoteContent, BlockDescription, SequenceIndexChunksLeftToWriteCounters, Work, WritePool, CompositeBuffer(std::move(BlockBuffer)), RemoteLookup, RemoteChunkIndexNeedsCopyFromSourceFlags, DiskStats, WriteChunkStats)) { std::error_code DummyEc; std::filesystem::remove(BlockChunkPath, DummyEc); throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash)); } if (!BlockChunkPath.empty()) { std::filesystem::remove(BlockChunkPath); } WritePartsComplete++; if (WritePartsComplete == TotalPartWriteCount) { FilteredWrittenBytesPerSecond.Stop(); } } }, Work.DefaultErrorFunction()); } } }, Work.DefaultErrorFunction()); } { ZEN_TRACE_CPU("WriteChunks_Wait"); Work.Wait(UsePlainProgress ? 
5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) { ZEN_UNUSED(IsAborted, PendingWork); ZEN_ASSERT(ChunkCountToWrite >= WriteChunkStats.ChunkCountWritten.load()); uint64_t DownloadedBytes = DownloadStats.DownloadedChunkByteCount.load() + DownloadStats.DownloadedBlockByteCount.load() + +DownloadStats.DownloadedPartialBlockByteCount.load(); FilteredWrittenBytesPerSecond.Update(DiskStats.WriteByteCount.load()); FilteredDownloadedBytesPerSecond.Update(DownloadedBytes); std::string Details = fmt::format("{}/{} ({} {}bits/s) downloaded. {}/{} ({} {}B/s) written.", DownloadStats.RequestsCompleteCount.load(), TotalRequestCount, NiceBytes(DownloadedBytes), NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8), WriteChunkStats.ChunkCountWritten.load(), ChunkCountToWrite, NiceBytes(DiskStats.WriteByteCount.load()), NiceNum(FilteredWrittenBytesPerSecond.GetCurrent())); WriteProgressBar.UpdateState( {.Task = "Writing chunks ", .Details = Details, .TotalCount = gsl::narrow(ChunkCountToWrite), .RemainingCount = gsl::narrow(ChunkCountToWrite - WriteChunkStats.ChunkCountWritten.load())}, false); }); } FilteredWrittenBytesPerSecond.Stop(); FilteredDownloadedBytesPerSecond.Stop(); if (AbortFlag) { return; } WriteProgressBar.Finish(); uint32_t RawSequencesMissingWriteCount = 0; for (uint32_t SequenceIndex = 0; SequenceIndex < SequenceIndexChunksLeftToWriteCounters.size(); SequenceIndex++) { const auto& SequenceIndexChunksLeftToWriteCounter = SequenceIndexChunksLeftToWriteCounters[SequenceIndex]; if (SequenceIndexChunksLeftToWriteCounter.load() != 0) { RawSequencesMissingWriteCount++; const uint32_t PathIndex = RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; const std::filesystem::path& IncompletePath = RemoteContent.Paths[PathIndex]; ZEN_ASSERT(!IncompletePath.empty()); const uint32_t ExpectedSequenceCount = RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]; ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounter.load() <= ExpectedSequenceCount); } } 
ZEN_ASSERT(RawSequencesMissingWriteCount == 0); const uint64_t DownloadedBytes = DownloadStats.DownloadedChunkByteCount.load() + DownloadStats.DownloadedBlockByteCount.load() + +DownloadStats.DownloadedPartialBlockByteCount.load(); ZEN_CONSOLE("Downloaded {} ({}bits/s) in {}. Wrote {} ({}B/s) in {}. Completed in {}", NiceBytes(DownloadedBytes), NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), DownloadedBytes * 8)), NiceTimeSpanMs(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS() / 1000), NiceBytes(DiskStats.WriteByteCount.load()), NiceNum(GetBytesPerSecond(FilteredWrittenBytesPerSecond.GetElapsedTimeUS(), DiskStats.WriteByteCount.load())), NiceTimeSpanMs(FilteredWrittenBytesPerSecond.GetElapsedTimeUS() / 1000), NiceTimeSpanMs(WriteTimer.GetElapsedTimeMs())); WriteChunkStats.WriteChunksElapsedWallTimeUs = WriteTimer.GetElapsedTimeUs(); WriteChunkStats.DownloadTimeUs = FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(); WriteChunkStats.WriteTimeUs = FilteredWrittenBytesPerSecond.GetElapsedTimeUS(); } // Move all files we will reuse to cache folder // TODO: If WipeTargetFolder is false we could check which files are already correct and leave them in place if (!LocalPathIndexesMatchingSequenceIndexes.empty()) { ZEN_TRACE_CPU("UpdateFolder_CacheReused"); uint64_t TotalFullFileSizeCached = 0; for (uint32_t LocalPathIndex : LocalPathIndexesMatchingSequenceIndexes) { const IoHash& RawHash = LocalContent.RawHashes[LocalPathIndex]; const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred(); const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash); ZEN_ASSERT_SLOW(std::filesystem::exists(LocalFilePath)); SetFileReadOnly(LocalFilePath, false); ZEN_ASSERT_SLOW(!std::filesystem::exists(CacheFilePath)); std::filesystem::rename(LocalFilePath, CacheFilePath); TotalFullFileSizeCached += std::filesystem::file_size(CacheFilePath); } ZEN_CONSOLE("Saved {} ({}) 
unchanged files in cache", LocalPathIndexesMatchingSequenceIndexes.size(), NiceBytes(TotalFullFileSizeCached)); } if (WipeTargetFolder) { ZEN_TRACE_CPU("UpdateFolder_WipeTarget"); Stopwatch Timer; // Clean target folder ZEN_CONSOLE("Wiping {}", Path); if (!CleanDirectory(Path, DefaultExcludeFolders)) { ZEN_WARN("Some files in {} could not be removed", Path); } RebuildFolderStateStats.CleanFolderElapsedWallTimeUs = Timer.GetElapsedTimeUs(); } else { ZEN_TRACE_CPU("UpdateFolder_RemoveUnused"); Stopwatch Timer; // Remove unused tracked files tsl::robin_map RemotePathToRemoteIndex; RemotePathToRemoteIndex.reserve(RemoteContent.Paths.size()); for (uint32_t RemotePathIndex = 0; RemotePathIndex < RemoteContent.Paths.size(); RemotePathIndex++) { RemotePathToRemoteIndex.insert({RemoteContent.Paths[RemotePathIndex].generic_string(), RemotePathIndex}); } std::vector LocalFilesToRemove; for (uint32_t LocalPathIndex = 0; LocalPathIndex < LocalContent.Paths.size(); LocalPathIndex++) { if (!RemotePathToRemoteIndex.contains(LocalContent.Paths[LocalPathIndex].generic_string())) { const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred(); if (std::filesystem::exists(LocalFilePath)) { LocalFilesToRemove.emplace_back(std::move(LocalFilePath)); } } } if (!LocalFilesToRemove.empty()) { ZEN_CONSOLE("Cleaning {} removed files from {}", LocalFilesToRemove.size(), Path); for (const std::filesystem::path& LocalFilePath : LocalFilesToRemove) { SetFileReadOnly(LocalFilePath, false); std::filesystem::remove(LocalFilePath); } } RebuildFolderStateStats.CleanFolderElapsedWallTimeUs = Timer.GetElapsedTimeUs(); } { ZEN_TRACE_CPU("UpdateFolder_FinalizeTree"); Stopwatch Timer; WorkerThreadPool& WritePool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // ProgressBar RebuildProgressBar(UsePlainProgress); ParallellWork Work(AbortFlag); OutLocalFolderState.Paths.resize(RemoteContent.Paths.size()); 
OutLocalFolderState.RawSizes.resize(RemoteContent.Paths.size()); OutLocalFolderState.Attributes.resize(RemoteContent.Paths.size()); OutLocalFolderState.ModificationTicks.resize(RemoteContent.Paths.size()); std::atomic TargetsComplete = 0; std::vector> Targets; Targets.reserve(RemoteContent.Paths.size()); for (uint32_t RemotePathIndex = 0; RemotePathIndex < RemoteContent.Paths.size(); RemotePathIndex++) { Targets.push_back(std::make_pair(RemoteContent.RawHashes[RemotePathIndex], RemotePathIndex)); } std::sort(Targets.begin(), Targets.end(), [](const std::pair& Lhs, const std::pair& Rhs) { return Lhs.first < Rhs.first; }); size_t TargetOffset = 0; while (TargetOffset < Targets.size()) { if (AbortFlag) { break; } size_t TargetCount = 1; const IoHash& RawHash = Targets[TargetOffset].first; while (Targets[TargetOffset + TargetCount].first == RawHash) { TargetCount++; } Work.ScheduleWork( WritePool, // GetSyncWorkerPool(),// [&, BaseTargetOffset = TargetOffset, TargetCount](std::atomic&) { if (!AbortFlag) { ZEN_TRACE_CPU("FinalizeTree_Work"); size_t TargetOffset = BaseTargetOffset; const IoHash& RawHash = Targets[TargetOffset].first; const uint32_t FirstTargetPathIndex = Targets[TargetOffset].second; const std::filesystem::path& FirstTargetPath = RemoteContent.Paths[FirstTargetPathIndex]; OutLocalFolderState.Paths[FirstTargetPathIndex] = FirstTargetPath; OutLocalFolderState.RawSizes[FirstTargetPathIndex] = RemoteContent.RawSizes[FirstTargetPathIndex]; const std::filesystem::path FirstTargetFilePath = (Path / FirstTargetPath).make_preferred(); if (RawHash == IoHash::Zero) { if (std::filesystem::exists(FirstTargetFilePath)) { SetFileReadOnly(FirstTargetFilePath, false); } CreateDirectories(FirstTargetFilePath.parent_path()); { BasicFile OutputFile; OutputFile.Open(FirstTargetFilePath, BasicFile::Mode::kTruncate); } } else { ZEN_TRACE_CPU("FinalizeTree_MoveIntoPlace"); const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash); 
ZEN_ASSERT_SLOW(std::filesystem::exists(CacheFilePath)); CreateDirectories(FirstTargetFilePath.parent_path()); if (std::filesystem::exists(FirstTargetFilePath)) { SetFileReadOnly(FirstTargetFilePath, false); } std::filesystem::rename(CacheFilePath, FirstTargetFilePath); RebuildFolderStateStats.FinalizeTreeFilesMovedCount++; } OutLocalFolderState.Attributes[FirstTargetPathIndex] = RemoteContent.Attributes.empty() ? GetNativeFileAttributes(FirstTargetFilePath) : SetNativeFileAttributes(FirstTargetFilePath, RemoteContent.Platform, RemoteContent.Attributes[FirstTargetPathIndex]); OutLocalFolderState.ModificationTicks[FirstTargetPathIndex] = GetModificationTickFromPath(FirstTargetFilePath); TargetOffset++; TargetsComplete++; while (TargetOffset < (BaseTargetOffset + TargetCount)) { ZEN_TRACE_CPU("FinalizeTree_Copy"); ZEN_ASSERT(Targets[TargetOffset].first == RawHash); ZEN_ASSERT_SLOW(std::filesystem::exists(FirstTargetFilePath)); const uint32_t ExtraTargetPathIndex = Targets[TargetOffset].second; const std::filesystem::path& ExtraTargetPath = RemoteContent.Paths[ExtraTargetPathIndex]; const std::filesystem::path ExtraTargetFilePath = (Path / ExtraTargetPath).make_preferred(); OutLocalFolderState.Paths[ExtraTargetPathIndex] = ExtraTargetPath; OutLocalFolderState.RawSizes[ExtraTargetPathIndex] = RemoteContent.RawSizes[ExtraTargetPathIndex]; CreateDirectories(ExtraTargetFilePath.parent_path()); if (std::filesystem::exists(ExtraTargetFilePath)) { SetFileReadOnly(ExtraTargetFilePath, false); } CopyFile(FirstTargetFilePath, ExtraTargetFilePath, {.EnableClone = false}); RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++; OutLocalFolderState.Attributes[ExtraTargetPathIndex] = RemoteContent.Attributes.empty() ? 
GetNativeFileAttributes(ExtraTargetFilePath) : SetNativeFileAttributes(ExtraTargetFilePath, RemoteContent.Platform, RemoteContent.Attributes[ExtraTargetPathIndex]); OutLocalFolderState.ModificationTicks[ExtraTargetPathIndex] = GetModificationTickFromPath(ExtraTargetFilePath); TargetOffset++; TargetsComplete++; } } }, Work.DefaultErrorFunction()); TargetOffset += TargetCount; } { ZEN_TRACE_CPU("FinalizeTree_Wait"); Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) { ZEN_UNUSED(IsAborted, PendingWork); std::string Details = fmt::format("{}/{} files", TargetsComplete.load(), Targets.size()); RebuildProgressBar.UpdateState({.Task = "Rebuilding state ", .Details = Details, .TotalCount = gsl::narrow(Targets.size()), .RemainingCount = gsl::narrow(Targets.size() - TargetsComplete.load())}, false); }); } RebuildFolderStateStats.FinalizeTreeElapsedWallTimeUs = Timer.GetElapsedTimeUs(); if (AbortFlag) { return; } RebuildProgressBar.Finish(); } } std::vector> ResolveBuildPartNames(BuildStorage& Storage, const Oid& BuildId, const std::vector& BuildPartIds, std::span BuildPartNames, std::uint64_t& OutPreferredMultipartChunkSize) { std::vector> Result; { Stopwatch GetBuildTimer; std::vector> AvailableParts; CbObject BuildObject = Storage.GetBuild(BuildId); ZEN_CONSOLE("GetBuild took {}. 
Name: '{}', Payload size: {}", NiceTimeSpanMs(GetBuildTimer.GetElapsedTimeMs()), BuildObject["BuildName"sv].AsString(), NiceBytes(BuildObject.GetSize())); ZEN_DEBUG("Build object: {}", BuildObject); CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView(); if (!PartsObject) { throw std::runtime_error("Build object does not have a 'parts' object"); } OutPreferredMultipartChunkSize = BuildObject["chunkSize"sv].AsUInt64(OutPreferredMultipartChunkSize); for (CbFieldView PartView : PartsObject) { const std::string BuildPartName = std::string(PartView.GetName()); const Oid BuildPartId = PartView.AsObjectId(); if (BuildPartId == Oid::Zero) { ExtendableStringBuilder<128> SB; for (CbFieldView ScanPartView : PartsObject) { SB.Append(fmt::format("\n {}: {}", ScanPartView.GetName(), ScanPartView.AsObjectId())); } throw std::runtime_error( fmt::format("Build object parts does not have a '{}' object id{}", BuildPartName, SB.ToView())); } AvailableParts.push_back({BuildPartId, BuildPartName}); } if (BuildPartIds.empty() && BuildPartNames.empty()) { Result = AvailableParts; } else { for (const std::string& BuildPartName : BuildPartNames) { if (auto It = std::find_if(AvailableParts.begin(), AvailableParts.end(), [&BuildPartName](const auto& Part) { return Part.second == BuildPartName; }); It != AvailableParts.end()) { Result.push_back(*It); } else { throw std::runtime_error(fmt::format("Build {} object does not have a part named '{}'", BuildId, BuildPartName)); } } for (const Oid& BuildPartId : BuildPartIds) { if (auto It = std::find_if(AvailableParts.begin(), AvailableParts.end(), [&BuildPartId](const auto& Part) { return Part.first == BuildPartId; }); It != AvailableParts.end()) { Result.push_back(*It); } else { throw std::runtime_error(fmt::format("Build {} object does not have a part with id '{}'", BuildId, BuildPartId)); } } } if (Result.empty()) { throw std::runtime_error(fmt::format("Build object does not have any parts", BuildId)); } } return Result; } 
ChunkedFolderContent GetRemoteContent(BuildStorage& Storage, const Oid& BuildId, const std::vector>& BuildParts, std::unique_ptr& OutChunkController, std::vector& OutPartContents, std::vector& OutBlockDescriptions, std::vector& OutLooseChunkHashes) { ZEN_TRACE_CPU("GetRemoteContent"); Stopwatch GetBuildPartTimer; const Oid BuildPartId = BuildParts[0].first; const std::string_view BuildPartName = BuildParts[0].second; CbObject BuildPartManifest = Storage.GetBuildPart(BuildId, BuildPartId); ZEN_CONSOLE("GetBuildPart {} ('{}') took {}. Payload size: {}", BuildPartId, BuildPartName, NiceTimeSpanMs(GetBuildPartTimer.GetElapsedTimeMs()), NiceBytes(BuildPartManifest.GetSize())); { CbObjectView Chunker = BuildPartManifest["chunker"sv].AsObjectView(); std::string_view ChunkerName = Chunker["name"sv].AsString(); CbObjectView Parameters = Chunker["parameters"sv].AsObjectView(); OutChunkController = CreateChunkingController(ChunkerName, Parameters); } auto ParseBuildPartManifest = [](BuildStorage& Storage, const Oid& BuildId, const Oid& BuildPartId, CbObject BuildPartManifest, ChunkedFolderContent& OutRemoteContent, std::vector& OutBlockDescriptions, std::vector& OutLooseChunkHashes) { std::vector AbsoluteChunkOrders; std::vector LooseChunkRawSizes; std::vector BlockRawHashes; ReadBuildContentFromCompactBinary(BuildPartManifest, OutRemoteContent.Platform, OutRemoteContent.Paths, OutRemoteContent.RawHashes, OutRemoteContent.RawSizes, OutRemoteContent.Attributes, OutRemoteContent.ChunkedContent.SequenceRawHashes, OutRemoteContent.ChunkedContent.ChunkCounts, AbsoluteChunkOrders, OutLooseChunkHashes, LooseChunkRawSizes, BlockRawHashes); // TODO: GetBlockDescriptions for all BlockRawHashes in one go - check for local block descriptions when we cache them Stopwatch GetBlockMetadataTimer; OutBlockDescriptions = Storage.GetBlockMetadata(BuildId, BlockRawHashes); ZEN_CONSOLE("GetBlockMetadata for {} took {}. 
Found {} blocks", BuildPartId, NiceTimeSpanMs(GetBlockMetadataTimer.GetElapsedTimeMs()), OutBlockDescriptions.size()); if (OutBlockDescriptions.size() != BlockRawHashes.size()) { bool AttemptFallback = false; std::string ErrorDescription = fmt::format("All required blocks could not be found, {} blocks does not have metadata in this context.", BlockRawHashes.size() - OutBlockDescriptions.size()); if (AttemptFallback) { ZEN_CONSOLE("{} Attemping fallback options.", ErrorDescription); std::vector AugmentedBlockDescriptions; AugmentedBlockDescriptions.reserve(BlockRawHashes.size()); std::vector FoundBlocks = Storage.FindBlocks(BuildId); for (const IoHash& BlockHash : BlockRawHashes) { if (auto It = std::find_if( OutBlockDescriptions.begin(), OutBlockDescriptions.end(), [BlockHash](const ChunkBlockDescription& Description) { return Description.BlockHash == BlockHash; }); It != OutBlockDescriptions.end()) { AugmentedBlockDescriptions.emplace_back(std::move(*It)); } else if (auto ListBlocksIt = std::find_if( FoundBlocks.begin(), FoundBlocks.end(), [BlockHash](const ChunkBlockDescription& Description) { return Description.BlockHash == BlockHash; }); ListBlocksIt != FoundBlocks.end()) { ZEN_CONSOLE("Found block {} via context find successfully", BlockHash); AugmentedBlockDescriptions.emplace_back(std::move(*ListBlocksIt)); } else { IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockHash); if (!BlockBuffer) { throw std::runtime_error(fmt::format("Block {} could not be found", BlockHash)); } IoHash BlockRawHash; uint64_t BlockRawSize; CompressedBuffer CompressedBlockBuffer = CompressedBuffer::FromCompressed(SharedBuffer(std::move(BlockBuffer)), BlockRawHash, BlockRawSize); if (!CompressedBlockBuffer) { throw std::runtime_error(fmt::format("Block {} is not a compressed buffer", BlockHash)); } if (BlockRawHash != BlockHash) { throw std::runtime_error( fmt::format("Block {} header has a mismatching raw hash {}", BlockHash, BlockRawHash)); } CompositeBuffer 
DecompressedBlockBuffer = CompressedBlockBuffer.DecompressToComposite(); if (!DecompressedBlockBuffer) { throw std::runtime_error(fmt::format("Block {} failed to decompress", BlockHash)); } ChunkBlockDescription MissingChunkDescription = GetChunkBlockDescription(DecompressedBlockBuffer.Flatten(), BlockHash); AugmentedBlockDescriptions.emplace_back(std::move(MissingChunkDescription)); } } OutBlockDescriptions.swap(AugmentedBlockDescriptions); } else { throw std::runtime_error(ErrorDescription); } } CalculateLocalChunkOrders(AbsoluteChunkOrders, OutLooseChunkHashes, LooseChunkRawSizes, OutBlockDescriptions, OutRemoteContent.ChunkedContent.ChunkHashes, OutRemoteContent.ChunkedContent.ChunkRawSizes, OutRemoteContent.ChunkedContent.ChunkOrders); }; OutPartContents.resize(1); ParseBuildPartManifest(Storage, BuildId, BuildPartId, BuildPartManifest, OutPartContents[0], OutBlockDescriptions, OutLooseChunkHashes); ChunkedFolderContent RemoteContent; if (BuildParts.size() > 1) { std::vector OverlayBlockDescriptions; std::vector OverlayLooseChunkHashes; for (size_t PartIndex = 1; PartIndex < BuildParts.size(); PartIndex++) { const Oid& OverlayBuildPartId = BuildParts[PartIndex].first; const std::string& OverlayBuildPartName = BuildParts[PartIndex].second; Stopwatch GetOverlayBuildPartTimer; CbObject OverlayBuildPartManifest = Storage.GetBuildPart(BuildId, OverlayBuildPartId); ZEN_CONSOLE("GetBuildPart {} ('{}') took {}. 
Payload size: {}", OverlayBuildPartId, OverlayBuildPartName, NiceTimeSpanMs(GetOverlayBuildPartTimer.GetElapsedTimeMs()), NiceBytes(OverlayBuildPartManifest.GetSize())); ChunkedFolderContent OverlayPartContent; std::vector OverlayPartBlockDescriptions; std::vector OverlayPartLooseChunkHashes; ParseBuildPartManifest(Storage, BuildId, OverlayBuildPartId, OverlayBuildPartManifest, OverlayPartContent, OverlayPartBlockDescriptions, OverlayPartLooseChunkHashes); OutPartContents.push_back(OverlayPartContent); OverlayBlockDescriptions.insert(OverlayBlockDescriptions.end(), OverlayPartBlockDescriptions.begin(), OverlayPartBlockDescriptions.end()); OverlayLooseChunkHashes.insert(OverlayLooseChunkHashes.end(), OverlayPartLooseChunkHashes.begin(), OverlayPartLooseChunkHashes.end()); } RemoteContent = MergeChunkedFolderContents(OutPartContents[0], std::span(OutPartContents).subspan(1)); { tsl::robin_set AllBlockHashes; for (const ChunkBlockDescription& Description : OutBlockDescriptions) { AllBlockHashes.insert(Description.BlockHash); } for (const ChunkBlockDescription& Description : OverlayBlockDescriptions) { if (!AllBlockHashes.contains(Description.BlockHash)) { AllBlockHashes.insert(Description.BlockHash); OutBlockDescriptions.push_back(Description); } } } { tsl::robin_set AllLooseChunkHashes(OutLooseChunkHashes.begin(), OutLooseChunkHashes.end()); for (const IoHash& OverlayLooseChunkHash : OverlayLooseChunkHashes) { if (!AllLooseChunkHashes.contains(OverlayLooseChunkHash)) { AllLooseChunkHashes.insert(OverlayLooseChunkHash); OutLooseChunkHashes.push_back(OverlayLooseChunkHash); } } } } else { RemoteContent = OutPartContents[0]; } return RemoteContent; } ChunkedFolderContent GetLocalContent(GetFolderContentStatistics& LocalFolderScanStats, ChunkingStatistics& ChunkingStats, const std::filesystem::path& Path, ChunkingController& ChunkController) { ChunkedFolderContent LocalContent; auto IsAcceptedFolder = [ExcludeFolders = DefaultExcludeFolders](const std::string_view& 
RelativePath) -> bool { for (const std::string_view& ExcludeFolder : ExcludeFolders) { if (RelativePath.starts_with(ExcludeFolder)) { if (RelativePath.length() == ExcludeFolder.length()) { return false; } else if (RelativePath[ExcludeFolder.length()] == '/') { return false; } } } return true; }; auto IsAcceptedFile = [ExcludeExtensions = DefaultExcludeExtensions](const std::string_view& RelativePath, uint64_t, uint32_t) -> bool { for (const std::string_view& ExcludeExtension : ExcludeExtensions) { if (RelativePath.ends_with(ExcludeExtension)) { return false; } } return true; }; FolderContent CurrentLocalFolderContent = GetFolderContent( LocalFolderScanStats, Path, std::move(IsAcceptedFolder), std::move(IsAcceptedFile), GetMediumWorkerPool(EWorkloadType::Burst), UsePlainProgress ? 5000 : 200, [&](bool, std::ptrdiff_t) { ZEN_DEBUG("Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), Path); }, AbortFlag); if (AbortFlag) { return {}; } FolderContent LocalFolderState; bool ScanContent = true; std::vector PathIndexesOufOfDate; if (std::filesystem::is_regular_file(Path / ZenStateFilePath)) { try { Stopwatch ReadStateTimer; CbObject CurrentStateObject = LoadCompactBinaryObject(Path / ZenStateFilePath).Object; if (CurrentStateObject) { Oid CurrentBuildId; std::vector SavedBuildPartIds; std::vector SavedBuildPartsNames; std::vector SavedPartContents; if (ReadStateObject(CurrentStateObject, CurrentBuildId, SavedBuildPartIds, SavedBuildPartsNames, SavedPartContents, LocalFolderState)) { if (!SavedPartContents.empty()) { if (SavedPartContents.size() == 1) { LocalContent = std::move(SavedPartContents[0]); } else { LocalContent = MergeChunkedFolderContents(SavedPartContents[0], std::span(SavedPartContents).subspan(1)); } if (!LocalFolderState.AreKnownFilesEqual(CurrentLocalFolderContent)) { const size_t LocaStatePathCount = LocalFolderState.Paths.size(); std::vector DeletedPaths; FolderContent UpdatedContent = GetUpdatedContent(LocalFolderState, 
CurrentLocalFolderContent, DeletedPaths); if (!DeletedPaths.empty()) { LocalContent = DeletePathsFromChunkedContent(LocalContent, DeletedPaths); } ZEN_CONSOLE("Updating state, {} local files deleted and {} local files updated out of {}", DeletedPaths.size(), UpdatedContent.Paths.size(), LocaStatePathCount); if (UpdatedContent.Paths.size() > 0) { uint64_t ByteCountToScan = 0; for (const uint64_t RawSize : UpdatedContent.RawSizes) { ByteCountToScan += RawSize; } ProgressBar ProgressBar(false); FilteredRate FilteredBytesHashed; FilteredBytesHashed.Start(); ChunkedFolderContent UpdatedLocalContent = ChunkFolderContent( ChunkingStats, GetMediumWorkerPool(EWorkloadType::Burst), Path, UpdatedContent, ChunkController, UsePlainProgress ? 5000 : 200, [&](bool, std::ptrdiff_t) { FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load()); std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found", ChunkingStats.FilesProcessed.load(), UpdatedContent.Paths.size(), NiceBytes(ChunkingStats.BytesHashed.load()), NiceBytes(ByteCountToScan), NiceNum(FilteredBytesHashed.GetCurrent()), ChunkingStats.UniqueChunksFound.load(), NiceBytes(ChunkingStats.UniqueBytesFound.load())); ProgressBar.UpdateState({.Task = "Scanning files ", .Details = Details, .TotalCount = ByteCountToScan, .RemainingCount = ByteCountToScan - ChunkingStats.BytesHashed.load()}, false); }, AbortFlag); if (AbortFlag) { return {}; } FilteredBytesHashed.Stop(); ProgressBar.Finish(); LocalContent = MergeChunkedFolderContents(LocalContent, {{UpdatedLocalContent}}); } } else { // Remove files from LocalContent no longer in LocalFolderState tsl::robin_set LocalFolderPaths; LocalFolderPaths.reserve(LocalFolderState.Paths.size()); for (const std::filesystem::path& LocalFolderPath : LocalFolderState.Paths) { LocalFolderPaths.insert(LocalFolderPath.generic_string()); } std::vector DeletedPaths; for (const std::filesystem::path& LocalContentPath : LocalContent.Paths) { if 
(!LocalFolderPaths.contains(LocalContentPath.generic_string())) { DeletedPaths.push_back(LocalContentPath); } } if (!DeletedPaths.empty()) { LocalContent = DeletePathsFromChunkedContent(LocalContent, DeletedPaths); } ZEN_CONSOLE("Using cached local state"); } ZEN_CONSOLE("Read local state in {}", NiceTimeSpanMs(ReadStateTimer.GetElapsedTimeMs())); ScanContent = false; } } } } catch (const std::exception& Ex) { ZEN_CONSOLE("Failed reading state file, falling back to scannning. Reason: {}", Ex.what()); } } if (ScanContent) { uint64_t ByteCountToScan = 0; for (const uint64_t RawSize : CurrentLocalFolderContent.RawSizes) { ByteCountToScan += RawSize; } ProgressBar ProgressBar(false); FilteredRate FilteredBytesHashed; FilteredBytesHashed.Start(); ChunkedFolderContent UpdatedLocalContent = ChunkFolderContent( ChunkingStats, GetMediumWorkerPool(EWorkloadType::Burst), Path, CurrentLocalFolderContent, ChunkController, UsePlainProgress ? 5000 : 200, [&](bool, std::ptrdiff_t) { FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load()); std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found", ChunkingStats.FilesProcessed.load(), CurrentLocalFolderContent.Paths.size(), NiceBytes(ChunkingStats.BytesHashed.load()), ByteCountToScan, NiceNum(FilteredBytesHashed.GetCurrent()), ChunkingStats.UniqueChunksFound.load(), NiceBytes(ChunkingStats.UniqueBytesFound.load())); ProgressBar.UpdateState({.Task = "Scanning files ", .Details = Details, .TotalCount = ByteCountToScan, .RemainingCount = (ByteCountToScan - ChunkingStats.BytesHashed.load())}, false); }, AbortFlag); if (AbortFlag) { return {}; } FilteredBytesHashed.Stop(); ProgressBar.Finish(); } return LocalContent; } void DownloadFolder(BuildStorage& Storage, const Oid& BuildId, const std::vector& BuildPartIds, std::span BuildPartNames, const std::filesystem::path& Path, bool AllowMultiparts, bool AllowPartialBlockRequests, bool WipeTargetFolder, bool PostDownloadVerify) { 
ZEN_TRACE_CPU("DownloadFolder"); Stopwatch DownloadTimer; const std::filesystem::path ZenTempFolder = Path / ZenTempFolderName; CreateDirectories(ZenTempFolder); CreateDirectories(Path / ZenTempBlockFolderName); CreateDirectories(Path / ZenTempCacheFolderName); CreateDirectories(Path / ZenTempDownloadFolderName); std::uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u; std::vector> AllBuildParts = ResolveBuildPartNames(Storage, BuildId, BuildPartIds, BuildPartNames, PreferredMultipartChunkSize); std::vector PartContents; std::unique_ptr ChunkController; std::vector BlockDescriptions; std::vector LooseChunkHashes; ChunkedFolderContent RemoteContent = GetRemoteContent(Storage, BuildId, AllBuildParts, ChunkController, PartContents, BlockDescriptions, LooseChunkHashes); const std::uint64_t LargeAttachmentSize = AllowMultiparts ? PreferredMultipartChunkSize * 4u : (std::uint64_t)-1; if (!ChunkController) { ZEN_CONSOLE("Warning: Unspecified chunking algorith, using default"); ChunkController = CreateBasicChunkingController(); } GetFolderContentStatistics LocalFolderScanStats; ChunkingStatistics ChunkingStats; ChunkedFolderContent LocalContent; if (std::filesystem::is_directory(Path)) { if (!WipeTargetFolder) { LocalContent = GetLocalContent(LocalFolderScanStats, ChunkingStats, Path, *ChunkController); } } else { CreateDirectories(Path); } if (AbortFlag) { return; } auto CompareContent = [](const ChunkedFolderContent& Lhs, const ChunkedFolderContent& Rhs) { tsl::robin_map RhsPathToIndex; const size_t RhsPathCount = Rhs.Paths.size(); RhsPathToIndex.reserve(RhsPathCount); for (size_t RhsPathIndex = 0; RhsPathIndex < RhsPathCount; RhsPathIndex++) { RhsPathToIndex.insert({Rhs.Paths[RhsPathIndex].generic_string(), RhsPathIndex}); } const size_t LhsPathCount = Lhs.Paths.size(); for (size_t LhsPathIndex = 0; LhsPathIndex < LhsPathCount; LhsPathIndex++) { if (auto It = RhsPathToIndex.find(Lhs.Paths[LhsPathIndex].generic_string()); It != RhsPathToIndex.end()) { const 
size_t RhsPathIndex = It->second; if ((Lhs.RawHashes[LhsPathIndex] != Rhs.RawHashes[RhsPathIndex]) || (!FolderContent::AreFileAttributesEqual(Lhs.Attributes[LhsPathIndex], Rhs.Attributes[RhsPathIndex]))) { return false; } } else { return false; } } tsl::robin_set LhsPathExists; LhsPathExists.reserve(LhsPathCount); for (size_t LhsPathIndex = 0; LhsPathIndex < LhsPathCount; LhsPathIndex++) { LhsPathExists.insert({Lhs.Paths[LhsPathIndex].generic_string()}); } for (size_t RhsPathIndex = 0; RhsPathIndex < RhsPathCount; RhsPathIndex++) { if (!LhsPathExists.contains(Rhs.Paths[RhsPathIndex].generic_string())) { return false; } } return true; }; if (CompareContent(RemoteContent, LocalContent)) { ZEN_CONSOLE("Local state is identical to build to download. All done. Completed in {}.", NiceTimeSpanMs(DownloadTimer.GetElapsedTimeMs())); } else { ExtendableStringBuilder<128> BuildPartString; for (const std::pair& BuildPart : AllBuildParts) { BuildPartString.Append(fmt::format(" {} ({})", BuildPart.second, BuildPart.first)); } ZEN_CONSOLE("Downloading build {}, parts:{}", BuildId, BuildPartString.ToView()); FolderContent LocalFolderState; DiskStatistics DiskStats; CacheMappingStatistics CacheMappingStats; DownloadStatistics DownloadStats; WriteChunkStatistics WriteChunkStats; RebuildFolderStateStatistics RebuildFolderStateStats; VerifyFolderStatistics VerifyFolderStats; UpdateFolder(Storage, BuildId, Path, LargeAttachmentSize, PreferredMultipartChunkSize, LocalContent, RemoteContent, BlockDescriptions, LooseChunkHashes, AllowPartialBlockRequests, WipeTargetFolder, LocalFolderState, DiskStats, CacheMappingStats, DownloadStats, WriteChunkStats, RebuildFolderStateStats); if (!AbortFlag) { VerifyFolder(RemoteContent, Path, PostDownloadVerify, VerifyFolderStats); Stopwatch WriteStateTimer; CbObject StateObject = CreateStateObject(BuildId, AllBuildParts, PartContents, LocalFolderState); CreateDirectories((Path / ZenStateFilePath).parent_path()); TemporaryFile::SafeWriteFile(Path / 
ZenStateFilePath, StateObject.GetView()); ZEN_CONSOLE("Wrote local state in {}", NiceTimeSpanMs(WriteStateTimer.GetElapsedTimeMs())); #if 0 ExtendableStringBuilder<1024> SB; CompactBinaryToJson(StateObject, SB); WriteFile(Path / ZenStateFileJsonPath, IoBuffer(IoBuffer::Wrap, SB.Data(), SB.Size())); #endif // 0 const uint64_t DownloadCount = DownloadStats.DownloadedChunkCount.load() + DownloadStats.DownloadedBlockCount.load() + DownloadStats.DownloadedPartialBlockCount.load(); const uint64_t DownloadByteCount = DownloadStats.DownloadedChunkByteCount.load() + DownloadStats.DownloadedBlockByteCount.load() + DownloadStats.DownloadedPartialBlockByteCount.load(); const uint64_t DownloadTimeMs = DownloadTimer.GetElapsedTimeMs(); ZEN_CONSOLE( "Downloaded build {}, parts:{} in {}\n" " Download: {} ({}) {}bits/s\n" " Write: {} ({}) {}B/s\n" " Clean: {}\n" " Finalize: {}\n" " Verify: {}", BuildId, BuildPartString.ToView(), NiceTimeSpanMs(DownloadTimeMs), DownloadCount, NiceBytes(DownloadByteCount), NiceNum(GetBytesPerSecond(WriteChunkStats.DownloadTimeUs, DownloadByteCount * 8)), DiskStats.WriteCount.load(), NiceBytes(DiskStats.WriteByteCount.load()), NiceNum(GetBytesPerSecond(WriteChunkStats.WriteTimeUs, DiskStats.WriteByteCount.load())), NiceTimeSpanMs(RebuildFolderStateStats.CleanFolderElapsedWallTimeUs / 1000), NiceTimeSpanMs(RebuildFolderStateStats.FinalizeTreeElapsedWallTimeUs / 1000), NiceTimeSpanMs(VerifyFolderStats.VerifyElapsedWallTimeUs / 1000)); } } if (CleanDirectory(ZenTempFolder, {})) { std::filesystem::remove(ZenTempFolder); } } void DiffFolders(const std::filesystem::path& BasePath, const std::filesystem::path& ComparePath, bool OnlyChunked) { ChunkedFolderContent BaseFolderContent; ChunkedFolderContent CompareFolderContent; { std::unique_ptr ChunkController = CreateBasicChunkingController(); std::vector ExcludeExtensions = DefaultExcludeExtensions; if (OnlyChunked) { ExcludeExtensions.insert(ExcludeExtensions.end(), DefaultChunkingExcludeExtensions.begin(), 
DefaultChunkingExcludeExtensions.end()); } auto IsAcceptedFolder = [ExcludeFolders = DefaultExcludeFolders](const std::string_view& RelativePath) -> bool { for (const std::string_view& ExcludeFolder : ExcludeFolders) { if (RelativePath.starts_with(ExcludeFolder)) { if (RelativePath.length() == ExcludeFolder.length()) { return false; } else if (RelativePath[ExcludeFolder.length()] == '/') { return false; } } } return true; }; auto IsAcceptedFile = [ExcludeExtensions](const std::string_view& RelativePath, uint64_t, uint32_t) -> bool { for (const std::string_view& ExcludeExtension : ExcludeExtensions) { if (RelativePath.ends_with(ExcludeExtension)) { return false; } } return true; }; GetFolderContentStatistics BaseGetFolderContentStats; ChunkingStatistics BaseChunkingStats; BaseFolderContent = ScanAndChunkFolder(BaseGetFolderContentStats, BaseChunkingStats, BasePath, IsAcceptedFolder, IsAcceptedFile, *ChunkController); if (AbortFlag) { return; } GetFolderContentStatistics CompareGetFolderContentStats; ChunkingStatistics CompareChunkingStats; CompareFolderContent = ScanAndChunkFolder(CompareGetFolderContentStats, CompareChunkingStats, ComparePath, IsAcceptedFolder, IsAcceptedFile, *ChunkController); if (AbortFlag) { return; } } std::vector AddedHashes; std::vector RemovedHashes; uint64_t RemovedSize = 0; uint64_t AddedSize = 0; tsl::robin_map BaseRawHashLookup; for (size_t PathIndex = 0; PathIndex < BaseFolderContent.RawHashes.size(); PathIndex++) { const IoHash& RawHash = BaseFolderContent.RawHashes[PathIndex]; BaseRawHashLookup.insert_or_assign(RawHash, PathIndex); } tsl::robin_map CompareRawHashLookup; for (size_t PathIndex = 0; PathIndex < CompareFolderContent.RawHashes.size(); PathIndex++) { const IoHash& RawHash = CompareFolderContent.RawHashes[PathIndex]; if (!BaseRawHashLookup.contains(RawHash)) { AddedHashes.push_back(RawHash); AddedSize += CompareFolderContent.RawSizes[PathIndex]; } CompareRawHashLookup.insert_or_assign(RawHash, PathIndex); } for (uint32_t 
PathIndex = 0; PathIndex < BaseFolderContent.Paths.size(); PathIndex++) { const IoHash& RawHash = BaseFolderContent.RawHashes[PathIndex]; if (!CompareRawHashLookup.contains(RawHash)) { RemovedHashes.push_back(RawHash); RemovedSize += BaseFolderContent.RawSizes[PathIndex]; } } uint64_t BaseTotalRawSize = 0; for (uint32_t PathIndex = 0; PathIndex < BaseFolderContent.Paths.size(); PathIndex++) { BaseTotalRawSize += BaseFolderContent.RawSizes[PathIndex]; } double KeptPercent = BaseTotalRawSize > 0 ? (100.0 * (BaseTotalRawSize - RemovedSize)) / BaseTotalRawSize : 0; ZEN_CONSOLE("{} ({}) files removed, {} ({}) files added, {} ({} {:.1f}%) files kept", RemovedHashes.size(), NiceBytes(RemovedSize), AddedHashes.size(), NiceBytes(AddedSize), BaseFolderContent.Paths.size() - RemovedHashes.size(), NiceBytes(BaseTotalRawSize - RemovedSize), KeptPercent); uint64_t CompareTotalRawSize = 0; uint64_t FoundChunkCount = 0; uint64_t FoundChunkSize = 0; uint64_t NewChunkCount = 0; uint64_t NewChunkSize = 0; const ChunkedContentLookup BaseFolderLookup = BuildChunkedContentLookup(BaseFolderContent); for (uint32_t ChunkIndex = 0; ChunkIndex < CompareFolderContent.ChunkedContent.ChunkHashes.size(); ChunkIndex++) { const IoHash& ChunkHash = CompareFolderContent.ChunkedContent.ChunkHashes[ChunkIndex]; if (BaseFolderLookup.ChunkHashToChunkIndex.contains(ChunkHash)) { FoundChunkCount++; FoundChunkSize += CompareFolderContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; } else { NewChunkCount++; NewChunkSize += CompareFolderContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; } CompareTotalRawSize += CompareFolderContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; } double FoundPercent = CompareTotalRawSize > 0 ? (100.0 * FoundChunkSize) / CompareTotalRawSize : 0; double NewPercent = CompareTotalRawSize > 0 ? (100.0 * NewChunkSize) / CompareTotalRawSize : 0; ZEN_CONSOLE("Found {} ({} {:.1f}%) out of {} ({}) chunks in {} ({}) base chunks. 
Added {} ({} {:.1f}%) chunks.", FoundChunkCount, NiceBytes(FoundChunkSize), FoundPercent, CompareFolderContent.ChunkedContent.ChunkHashes.size(), NiceBytes(CompareTotalRawSize), BaseFolderContent.ChunkedContent.ChunkHashes.size(), NiceBytes(BaseTotalRawSize), NewChunkCount, NiceBytes(NewChunkSize), NewPercent); } } // namespace ////////////////////////////////////////////////////////////////////////////////////////////////////// BuildsCommand::BuildsCommand() { m_Options.add_options()("h,help", "Print help"); auto AddAuthOptions = [this](cxxopts::Options& Ops) { Ops.add_option("", "", "system-dir", "Specify system root", cxxopts::value(m_SystemRootDir), ""); // Direct access token (may expire) Ops.add_option("auth-token", "", "access-token", "Cloud/Builds Storage access token", cxxopts::value(m_AccessToken), ""); Ops.add_option("auth-token", "", "access-token-env", "Name of environment variable that holds the cloud/builds Storage access token", cxxopts::value(m_AccessTokenEnv)->default_value(DefaultAccessTokenEnvVariableName), ""); Ops.add_option("auth-token", "", "access-token-path", "Path to json file that holds the cloud/builds Storage access token", cxxopts::value(m_AccessTokenPath), ""); // Auth manager token encryption Ops.add_option("security", "", "encryption-aes-key", "256 bit AES encryption key", cxxopts::value(m_EncryptionKey), ""); Ops.add_option("security", "", "encryption-aes-iv", "128 bit AES encryption initialization vector", cxxopts::value(m_EncryptionIV), ""); // OpenId acccess token Ops.add_option("openid", "", "openid-provider-name", "Open ID provider name", cxxopts::value(m_OpenIdProviderName), "Default"); Ops.add_option("openid", "", "openid-provider-url", "Open ID provider url", cxxopts::value(m_OpenIdProviderUrl), ""); Ops.add_option("openid", "", "openid-client-id", "Open ID client id", cxxopts::value(m_OpenIdClientId), ""); Ops.add_option("openid", "", "openid-refresh-token", "Open ID refresh token", cxxopts::value(m_OpenIdRefreshToken), 
""); // OAuth acccess token Ops.add_option("oauth", "", "oauth-url", "OAuth provier url", cxxopts::value(m_OAuthUrl)->default_value(""), ""); Ops.add_option("oauth", "", "oauth-clientid", "OAuth client id", cxxopts::value(m_OAuthClientId)->default_value(""), ""); Ops.add_option("oauth", "", "oauth-clientsecret", "OAuth client secret", cxxopts::value(m_OAuthClientSecret)->default_value(""), ""); }; auto AddCloudOptions = [this, &AddAuthOptions](cxxopts::Options& Ops) { AddAuthOptions(Ops); Ops.add_option("cloud build", "", "url", "Cloud Builds URL", cxxopts::value(m_BuildsUrl), ""); Ops.add_option("cloud build", "", "assume-http2", "Assume that the builds endpoint is a HTTP/2 endpoint skipping HTTP/1.1 upgrade handshake", cxxopts::value(m_AssumeHttp2), ""); Ops.add_option("cloud build", "", "namespace", "Builds Storage namespace", cxxopts::value(m_Namespace), ""); Ops.add_option("cloud build", "", "bucket", "Builds Storage bucket", cxxopts::value(m_Bucket), ""); }; auto AddFileOptions = [this](cxxopts::Options& Ops) { Ops.add_option("filestorage", "", "storage-path", "Builds Storage Path", cxxopts::value(m_StoragePath), ""); Ops.add_option("filestorage", "", "json-metadata", "Write build, part and block metadata as .json files in addition to .cb files", cxxopts::value(m_WriteMetadataAsJson), ""); }; auto AddOutputOptions = [this](cxxopts::Options& Ops) { Ops.add_option("output", "", "plain-progress", "Show progress using plain output", cxxopts::value(m_PlainProgress), ""); Ops.add_option("output", "", "verbose", "Enable verbose console output", cxxopts::value(m_Verbose), ""); }; m_Options.add_option("", "v", "verb", "Verb for build - list, upload, download, diff", cxxopts::value(m_Verb), ""); m_Options.parse_positional({"verb"}); m_Options.positional_help("verb"); // list AddCloudOptions(m_ListOptions); AddFileOptions(m_ListOptions); AddOutputOptions(m_ListOptions); m_ListOptions.add_options()("h,help", "Print help"); // upload AddCloudOptions(m_UploadOptions); 
AddFileOptions(m_UploadOptions); AddOutputOptions(m_UploadOptions); m_UploadOptions.add_options()("h,help", "Print help"); m_UploadOptions.add_option("", "l", "local-path", "Root file system folder for build", cxxopts::value(m_Path), ""); m_UploadOptions.add_option("", "", "create-build", "Set to true to create the containing build, if unset a builds-id must be given and the build already exist", cxxopts::value(m_CreateBuild), ""); m_UploadOptions.add_option("", "", "build-id", "Build Id", cxxopts::value(m_BuildId), ""); m_UploadOptions.add_option("", "", "build-part-id", "Build part Id, if not given it will be auto generated", cxxopts::value(m_BuildPartId), ""); m_UploadOptions.add_option("", "", "build-part-name", "Name of the build part, if not given it will be be named after the directory name at end of local-path", cxxopts::value(m_BuildPartName), ""); m_UploadOptions.add_option("", "", "metadata-path", "Path to json file that holds the metadata for the build. Requires the create-build option to be set", cxxopts::value(m_BuildMetadataPath), ""); m_UploadOptions.add_option( "", "", "metadata", "Key-value pairs separated by ';' with build meta data. (key1=value1;key2=value2). Requires the create-build option to be set", cxxopts::value(m_BuildMetadata), ""); m_UploadOptions.add_option("", "", "clean", "Ignore existing blocks", cxxopts::value(m_Clean), ""); m_UploadOptions.add_option("", "", "block-min-reuse", "Percent of an existing block that must be relevant for it to be resused. Defaults to 85.", cxxopts::value(m_BlockReuseMinPercentLimit), ""); m_UploadOptions.add_option("", "", "allow-multipart", "Allow large attachments to be transfered using multipart protocol. 
Defaults to true.", cxxopts::value(m_AllowMultiparts), ""); m_UploadOptions.add_option("", "", "manifest-path", "Path to a text file with one line of [TAB] per file to include.", cxxopts::value(m_ManifestPath), ""); m_UploadOptions .add_option("", "", "verify", "Enable post upload verify of all uploaded data", cxxopts::value(m_PostUploadVerify), ""); m_UploadOptions.add_option("", "", "allow-deltaencoding", "Allow efficient encoding of build manifest. Defaults to true.", cxxopts::value(g_UseDeltaEncoding), ""); m_UploadOptions.parse_positional({"local-path", "build-id"}); m_UploadOptions.positional_help("local-path build-id"); // download AddCloudOptions(m_DownloadOptions); AddFileOptions(m_DownloadOptions); AddOutputOptions(m_DownloadOptions); m_DownloadOptions.add_options()("h,help", "Print help"); m_DownloadOptions.add_option("", "l", "local-path", "Root file system folder for build", cxxopts::value(m_Path), ""); m_DownloadOptions.add_option("", "", "build-id", "Build Id", cxxopts::value(m_BuildId), ""); m_DownloadOptions.add_option( "", "", "build-part-id", "Build part Ids list separated by ',', if no build-part-ids or build-part-names are given all parts will be downloaded", cxxopts::value(m_BuildPartIds), ""); m_DownloadOptions.add_option("", "", "build-part-name", "Name of the build parts list separated by ',', if no build-part-ids or build-part-names are given " "all parts will be downloaded", cxxopts::value(m_BuildPartNames), ""); m_DownloadOptions .add_option("", "", "clean", "Delete all data in target folder before downloading", cxxopts::value(m_Clean), ""); m_DownloadOptions.add_option("", "", "allow-multipart", "Allow large attachments to be transfered using multipart protocol. Defaults to true.", cxxopts::value(m_AllowMultiparts), ""); m_DownloadOptions.add_option("", "", "allow-partial-block-requests", "Allow request for partial chunk blocks. 
Defaults to true.", cxxopts::value(m_AllowPartialBlockRequests), ""); m_DownloadOptions .add_option("", "", "verify", "Enable post download verify of all tracked files", cxxopts::value(m_PostDownloadVerify), ""); m_DownloadOptions.parse_positional({"local-path", "build-id", "build-part-name"}); m_DownloadOptions.positional_help("local-path build-id build-part-name"); AddOutputOptions(m_DiffOptions); m_DiffOptions.add_options()("h,help", "Print help"); m_DiffOptions.add_option("", "l", "local-path", "Root file system folder used as base", cxxopts::value(m_Path), ""); m_DiffOptions.add_option("", "c", "compare-path", "Root file system folder used as diff", cxxopts::value(m_DiffPath), ""); m_DiffOptions.add_option("", "", "only-chunked", "Skip files from diff summation that are not processed with chunking", cxxopts::value(m_OnlyChunked), ""); m_DiffOptions.parse_positional({"local-path", "compare-path"}); m_DiffOptions.positional_help("local-path compare-path"); AddCloudOptions(m_TestOptions); AddFileOptions(m_TestOptions); AddOutputOptions(m_TestOptions); m_TestOptions.add_options()("h,help", "Print help"); m_TestOptions.add_option("", "l", "local-path", "Root file system folder used as base", cxxopts::value(m_Path), ""); m_TestOptions.add_option("", "", "allow-multipart", "Allow large attachments to be transfered using multipart protocol. Defaults to true.", cxxopts::value(m_AllowMultiparts), ""); m_TestOptions.add_option("", "", "allow-partial-block-requests", "Allow request for partial chunk blocks. 
Defaults to true.", cxxopts::value(m_AllowPartialBlockRequests), ""); m_TestOptions.parse_positional({"local-path"}); m_TestOptions.positional_help("local-path"); AddCloudOptions(m_FetchBlobOptions); AddFileOptions(m_FetchBlobOptions); AddOutputOptions(m_FetchBlobOptions); m_FetchBlobOptions.add_option("", "", "build-id", "Build Id", cxxopts::value(m_BuildId), ""); m_FetchBlobOptions .add_option("", "", "blob-hash", "IoHash in hex form identifying the blob to download", cxxopts::value(m_BlobHash), ""); m_FetchBlobOptions.parse_positional({"build-id", "blob-hash"}); m_FetchBlobOptions.positional_help("build-id blob-hash"); AddCloudOptions(m_ValidateBuildPartOptions); AddFileOptions(m_ValidateBuildPartOptions); AddOutputOptions(m_ValidateBuildPartOptions); m_ValidateBuildPartOptions.add_option("", "", "build-id", "Build Id", cxxopts::value(m_BuildId), ""); m_ValidateBuildPartOptions.add_option("", "", "build-part-id", "Build part Id, if not given it will be auto generated", cxxopts::value(m_BuildPartId), ""); m_ValidateBuildPartOptions.add_option( "", "", "build-part-name", "Name of the build part, if not given it will be be named after the directory name at end of local-path", cxxopts::value(m_BuildPartName), ""); m_ValidateBuildPartOptions.parse_positional({"build-id", "build-part-id"}); m_ValidateBuildPartOptions.positional_help("build-id build-part-id"); AddCloudOptions(m_MultiTestDownloadOptions); AddFileOptions(m_MultiTestDownloadOptions); AddOutputOptions(m_MultiTestDownloadOptions); m_MultiTestDownloadOptions .add_option("", "l", "local-path", "Root file system folder used as base", cxxopts::value(m_Path), ""); m_MultiTestDownloadOptions.add_option("", "", "build-ids", "Build Ids list separated by ','", cxxopts::value(m_BuildIds), ""); m_MultiTestDownloadOptions.parse_positional({"local-path"}); m_MultiTestDownloadOptions.positional_help("local-path"); } BuildsCommand::~BuildsCommand() = default; int BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int 
argc, char** argv) { ZEN_UNUSED(GlobalOptions); signal(SIGINT, SignalCallbackHandler); #if ZEN_PLATFORM_WINDOWS signal(SIGBREAK, SignalCallbackHandler); #endif // ZEN_PLATFORM_WINDOWS using namespace std::literals; std::vector SubCommandArguments; cxxopts::Options* SubOption = nullptr; int ParentCommandArgCount = GetSubCommand(m_Options, argc, argv, m_SubCommands, SubOption, SubCommandArguments); if (!ParseOptions(ParentCommandArgCount, argv)) { return 0; } if (SubOption == nullptr) { throw zen::OptionParseException("command verb is missing"); } if (!ParseOptions(*SubOption, gsl::narrow(SubCommandArguments.size()), SubCommandArguments.data())) { return 0; } auto ParseStorageOptions = [&]() { if (!m_BuildsUrl.empty()) { if (!m_StoragePath.empty()) { throw zen::OptionParseException(fmt::format("url is not compatible with the storage-path option\n{}", m_Options.help())); } if (m_Namespace.empty() || m_Bucket.empty()) { throw zen::OptionParseException( fmt::format("namespace and bucket options are required for url option\n{}", m_Options.help())); } } }; std::unique_ptr Auth; HttpClientSettings ClientSettings{.AssumeHttp2 = m_AssumeHttp2, .AllowResume = true, .RetryCount = 2}; auto CreateAuthMgr = [&]() { if (!Auth) { std::filesystem::path DataRoot = m_SystemRootDir.empty() ? 
PickDefaultSystemRootDirectory() : StringToPath(m_SystemRootDir); if (m_EncryptionKey.empty()) { m_EncryptionKey = "abcdefghijklmnopqrstuvxyz0123456"; ZEN_CONSOLE("Warning: Using default encryption key"); } if (m_EncryptionIV.empty()) { m_EncryptionIV = "0123456789abcdef"; ZEN_CONSOLE("Warning: Using default encryption initialization vector"); } AuthConfig AuthMgrConfig = {.RootDirectory = DataRoot / "auth", .EncryptionKey = AesKey256Bit::FromString(m_EncryptionKey), .EncryptionIV = AesIV128Bit::FromString(m_EncryptionIV)}; if (!AuthMgrConfig.EncryptionKey.IsValid()) { throw zen::OptionParseException("Invalid AES encryption key"); } if (!AuthMgrConfig.EncryptionIV.IsValid()) { throw zen::OptionParseException("Invalid AES initialization vector"); } Auth = AuthMgr::Create(AuthMgrConfig); } }; auto ParseAuthOptions = [&]() { if (!m_OpenIdProviderUrl.empty() && !m_OpenIdClientId.empty()) { CreateAuthMgr(); std::string ProviderName = m_OpenIdProviderName.empty() ? "Default" : m_OpenIdProviderName; Auth->AddOpenIdProvider({.Name = ProviderName, .Url = m_OpenIdProviderUrl, .ClientId = m_OpenIdClientId}); if (!m_OpenIdRefreshToken.empty()) { Auth->AddOpenIdToken({.ProviderName = ProviderName, .RefreshToken = m_OpenIdRefreshToken}); } } if (!m_AccessToken.empty()) { ClientSettings.AccessTokenProvider = httpclientauth::CreateFromStaticToken(m_AccessToken); } else if (!m_AccessTokenPath.empty()) { std::string ResolvedAccessToken = ReadAccessTokenFromFile(m_AccessTokenPath); if (!ResolvedAccessToken.empty()) { ClientSettings.AccessTokenProvider = httpclientauth::CreateFromStaticToken(ResolvedAccessToken); } } else if (!m_AccessTokenEnv.empty()) { std::string ResolvedAccessToken = GetEnvVariable(m_AccessTokenEnv); if (!ResolvedAccessToken.empty()) { ClientSettings.AccessTokenProvider = httpclientauth::CreateFromStaticToken(ResolvedAccessToken); } } else if (!m_OAuthUrl.empty()) { ClientSettings.AccessTokenProvider = httpclientauth::CreateFromOAuthClientCredentials( {.Url = 
m_OAuthUrl, .ClientId = m_OAuthClientId, .ClientSecret = m_OAuthClientSecret}); } else if (!m_OpenIdProviderName.empty()) { CreateAuthMgr(); ClientSettings.AccessTokenProvider = httpclientauth::CreateFromOpenIdProvider(*Auth, m_OpenIdProviderName); } else { CreateAuthMgr(); ClientSettings.AccessTokenProvider = httpclientauth::CreateFromDefaultOpenIdProvider(*Auth); } if (!m_BuildsUrl.empty() && !ClientSettings.AccessTokenProvider) { ZEN_CONSOLE("Warning: No auth provider given, attempting operation without credentials."); } }; auto ParseOutputOptions = [&]() { IsVerbose = m_Verbose; UsePlainProgress = IsVerbose || m_PlainProgress; }; ParseOutputOptions(); try { if (SubOption == &m_ListOptions) { ParseStorageOptions(); ParseAuthOptions(); HttpClient Http(m_BuildsUrl, ClientSettings); CbObjectWriter QueryWriter; QueryWriter.BeginObject("query"); { // QueryWriter.BeginObject("platform"); // { // QueryWriter.AddString("$eq", "Windows"); // } // QueryWriter.EndObject(); // changelist } QueryWriter.EndObject(); // query BuildStorage::Statistics StorageStats; std::unique_ptr Storage; if (!m_BuildsUrl.empty()) { ZEN_CONSOLE("Querying builds in cloud endpoint '{}'. SessionId: '{}'. 
Namespace '{}', Bucket '{}'", m_BuildsUrl, Http.GetSessionId(), m_Namespace, m_Bucket); Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, std::filesystem::path{}); } else if (!m_StoragePath.empty()) { std::filesystem::path StoragePath = StringToPath(m_StoragePath); ZEN_CONSOLE("Querying builds in folder '{}'.", StoragePath); Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); } else { throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); } CbObject Response = Storage->ListBuilds(QueryWriter.Save()); ExtendableStringBuilder<1024> SB; CompactBinaryToJson(Response.GetView(), SB); ZEN_CONSOLE("{}", SB.ToView()); return 0; } if (SubOption == &m_UploadOptions) { ParseStorageOptions(); ParseAuthOptions(); HttpClient Http(m_BuildsUrl, ClientSettings); if (m_Path.empty()) { throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_UploadOptions.help())); } if (m_CreateBuild) { if (m_BuildMetadataPath.empty() && m_BuildMetadata.empty()) { throw zen::OptionParseException(fmt::format("Options for builds target are missing\n{}", m_UploadOptions.help())); } if (!m_BuildMetadataPath.empty() && !m_BuildMetadata.empty()) { throw zen::OptionParseException(fmt::format("Conflicting options for builds target\n{}", m_UploadOptions.help())); } } else { if (!m_BuildMetadataPath.empty()) { throw zen::OptionParseException( fmt::format("metadata-path option is only valid if creating a build\n{}", m_UploadOptions.help())); } if (!m_BuildMetadata.empty()) { throw zen::OptionParseException( fmt::format("metadata option is only valid if creating a build\n{}", m_UploadOptions.help())); } } std::filesystem::path Path = StringToPath(m_Path); if (m_BuildPartName.empty()) { m_BuildPartName = Path.filename().string(); } const bool GeneratedBuildId = m_BuildId.empty(); if (GeneratedBuildId) { m_BuildId = Oid::NewOid().ToString(); } else if 
(m_BuildId.length() != Oid::StringLength) { throw zen::OptionParseException(fmt::format("Invalid build id\n{}", m_UploadOptions.help())); } else if (Oid::FromHexString(m_BuildId) == Oid::Zero) { throw zen::OptionParseException(fmt::format("Invalid build id\n{}", m_UploadOptions.help())); } const bool GeneratedBuildPartId = m_BuildPartId.empty(); if (GeneratedBuildPartId) { m_BuildPartId = Oid::NewOid().ToString(); } else if (m_BuildPartId.length() != Oid::StringLength) { throw zen::OptionParseException(fmt::format("Invalid build id\n{}", m_UploadOptions.help())); } else if (Oid::FromHexString(m_BuildPartId) == Oid::Zero) { throw zen::OptionParseException(fmt::format("Invalid build part id\n{}", m_UploadOptions.help())); } BuildStorage::Statistics StorageStats; const Oid BuildId = Oid::FromHexString(m_BuildId); const Oid BuildPartId = Oid::FromHexString(m_BuildPartId); std::unique_ptr Storage; std::string StorageName; if (!m_BuildsUrl.empty()) { ZEN_CONSOLE("Uploading '{}' from '{}' to cloud endpoint '{}'. SessionId: '{}'. Namespace '{}', Bucket '{}', {}BuildId '{}'", m_BuildPartName, Path, m_BuildsUrl, Http.GetSessionId(), m_Namespace, m_Bucket, GeneratedBuildId ? "Generated " : "", BuildId); Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName); StorageName = "Cloud DDC"; } else if (!m_StoragePath.empty()) { std::filesystem::path StoragePath = StringToPath(m_StoragePath); ZEN_CONSOLE("Uploading '{}' from '{}' to folder '{}'. {}BuildId '{}'", m_BuildPartName, Path, StoragePath, GeneratedBuildId ? 
"Generated " : "", BuildId); Storage = CreateFileBuildStorage(StoragePath, StorageStats, m_WriteMetadataAsJson, DefaultLatency, DefaultDelayPerKBSec); StorageName = fmt::format("Disk {}", StoragePath.stem()); } else { throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); } CbObject MetaData; if (m_CreateBuild) { if (!m_BuildMetadataPath.empty()) { std::filesystem::path MetadataPath(m_BuildMetadataPath); IoBuffer MetaDataJson = ReadFile(MetadataPath).Flatten(); std::string_view Json(reinterpret_cast(MetaDataJson.GetData()), MetaDataJson.GetSize()); std::string JsonError; MetaData = LoadCompactBinaryFromJson(Json, JsonError).AsObject(); if (!JsonError.empty()) { throw std::runtime_error( fmt::format("build metadata file '{}' is malformed. Reason: '{}'", m_BuildMetadataPath, JsonError)); } } if (!m_BuildMetadata.empty()) { CbObjectWriter MetaDataWriter(1024); ForEachStrTok(m_BuildMetadata, ';', [&](std::string_view Pair) { size_t SplitPos = Pair.find('='); if (SplitPos == std::string::npos || SplitPos == 0) { throw std::runtime_error(fmt::format("build metadata key-value pair '{}' is malformed", Pair)); } MetaDataWriter.AddString(Pair.substr(0, SplitPos), Pair.substr(SplitPos + 1)); return true; }); MetaData = MetaDataWriter.Save(); } } UploadFolder(*Storage, BuildId, BuildPartId, m_BuildPartName, Path, m_ManifestPath, m_BlockReuseMinPercentLimit, m_AllowMultiparts, MetaData, m_CreateBuild, m_Clean, m_PostUploadVerify); if (false) { ZEN_CONSOLE( "{}:\n" "Read: {}\n" "Write: {}\n" "Requests: {}\n" "Avg Request Time: {}\n" "Avg I/O Time: {}", StorageName, NiceBytes(StorageStats.TotalBytesRead.load()), NiceBytes(StorageStats.TotalBytesWritten.load()), StorageStats.TotalRequestCount.load(), StorageStats.TotalExecutionTimeUs.load() > 0 ? NiceTimeSpanMs(StorageStats.TotalExecutionTimeUs.load() / 1000 / StorageStats.TotalRequestCount.load()) : 0, StorageStats.TotalRequestCount.load() > 0 ? 
NiceTimeSpanMs(StorageStats.TotalRequestTimeUs.load() / 1000 / StorageStats.TotalRequestCount.load()) : 0); } return AbortFlag ? 11 : 0; } if (SubOption == &m_DownloadOptions) { ParseStorageOptions(); ParseAuthOptions(); HttpClient Http(m_BuildsUrl, ClientSettings); if (m_Path.empty()) { throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_DownloadOptions.help())); } if (m_BuildId.empty()) { throw zen::OptionParseException(fmt::format("build-id is required\n{}", m_DownloadOptions.help())); } Oid BuildId = Oid::TryFromHexString(m_BuildId); if (BuildId == Oid::Zero) { throw zen::OptionParseException(fmt::format("build-id is invalid\n{}", m_DownloadOptions.help())); } if (!m_BuildPartName.empty() && !m_BuildPartId.empty()) { throw zen::OptionParseException(fmt::format("build-part-id conflicts with build-part-name\n{}", m_DownloadOptions.help())); } std::vector BuildPartIds; for (const std::string& BuildPartId : m_BuildPartIds) { BuildPartIds.push_back(Oid::TryFromHexString(BuildPartId)); if (BuildPartIds.back() == Oid::Zero) { throw zen::OptionParseException( fmt::format("build-part-id '{}' is invalid\n{}", BuildPartId, m_DownloadOptions.help())); } } std::filesystem::path Path = StringToPath(m_Path); BuildStorage::Statistics StorageStats; std::unique_ptr Storage; std::string StorageName; if (!m_BuildsUrl.empty()) { ZEN_CONSOLE("Downloading Build '{}' to '{}' from {}. SessionId: '{}'. 
Namespace '{}', Bucket '{}'", BuildId, Path, m_BuildsUrl, Http.GetSessionId(), m_Namespace, m_Bucket); Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName); StorageName = ZEN_CLOUD_STORAGE; } else if (!m_StoragePath.empty()) { std::filesystem::path StoragePath = StringToPath(m_StoragePath); ZEN_CONSOLE("Downloading Build '{}' to '{}' from folder {}", BuildId, Path, StoragePath); Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); StorageName = fmt::format("Disk {}", StoragePath.stem()); } else { throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); } DownloadFolder(*Storage, BuildId, BuildPartIds, m_BuildPartNames, Path, m_AllowMultiparts, m_AllowPartialBlockRequests, m_Clean, m_PostDownloadVerify); if (false) { ZEN_CONSOLE( "{}:\n" "Read: {}\n" "Write: {}\n" "Requests: {}\n" "Avg Request Time: {}\n" "Avg I/O Time: {}", StorageName, NiceBytes(StorageStats.TotalBytesRead.load()), NiceBytes(StorageStats.TotalBytesWritten.load()), StorageStats.TotalRequestCount.load(), StorageStats.TotalExecutionTimeUs.load() > 0 ? NiceTimeSpanMs(StorageStats.TotalExecutionTimeUs.load() / 1000 / StorageStats.TotalRequestCount.load()) : 0, StorageStats.TotalRequestCount.load() > 0 ? NiceTimeSpanMs(StorageStats.TotalRequestTimeUs.load() / 1000 / StorageStats.TotalRequestCount.load()) : 0); } return AbortFlag ? 11 : 0; } if (SubOption == &m_DiffOptions) { if (m_Path.empty()) { throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_DownloadOptions.help())); } if (m_DiffPath.empty()) { throw zen::OptionParseException(fmt::format("compare-path is required\n{}", m_DownloadOptions.help())); } std::filesystem::path Path = StringToPath(m_Path); DiffFolders(Path, m_DiffPath, m_OnlyChunked); return AbortFlag ? 
11 : 0; } if (SubOption == &m_MultiTestDownloadOptions) { if (m_Path.empty()) { throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_DownloadOptions.help())); } ParseStorageOptions(); ParseAuthOptions(); HttpClient Http(m_BuildsUrl, ClientSettings); // m_StoragePath = "D:\\buildstorage"; // m_Path = "F:\\Saved\\DownloadedBuilds\\++Fortnite+Main-CL-XXXXXXXX\\WindowsClient"; // std::vector BuildIdStrings{"07d3942f0e7f4ca1b13b0587", // "07d394eed89d769f2254e75d", // "07d3953f22fa3f8000fa6f0a", // "07d3959df47ed1f42ddbe44c", // "07d395fa7803d50804f14417", // "07d3964f919d577a321a1fdd", // "07d396a6ce875004e16b9528"}; std::filesystem::path Path = StringToPath(m_Path); BuildStorage::Statistics StorageStats; std::unique_ptr Storage; std::string StorageName; if (!m_BuildsUrl.empty()) { ZEN_CONSOLE("Downloading {} to '{}' from {}. SessionId: '{}'. Namespace '{}', Bucket '{}'", FormatArray(m_BuildIds, " "sv), Path, m_BuildsUrl, Http.GetSessionId(), m_Namespace, m_Bucket); Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName); StorageName = ZEN_CLOUD_STORAGE; } else if (!m_StoragePath.empty()) { std::filesystem::path StoragePath = StringToPath(m_StoragePath); ZEN_CONSOLE("Downloading '{}' to '{}' from folder '{}'", FormatArray(m_BuildIds, " "sv), Path, StoragePath); Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); StorageName = fmt::format("Disk {}", StoragePath.stem()); } else { throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); } Stopwatch Timer; for (const std::string& BuildIdString : m_BuildIds) { Oid BuildId = Oid::FromHexString(BuildIdString); if (BuildId == Oid::Zero) { throw zen::OptionParseException(fmt::format("invalid build id {}\n{}", BuildIdString, m_DownloadOptions.help())); } DownloadFolder(*Storage, BuildId, {}, {}, Path, m_AllowMultiparts, 
m_AllowPartialBlockRequests, BuildIdString == m_BuildIds.front(), true); if (AbortFlag) { ZEN_CONSOLE("Download cancelled"); return 11; } ZEN_CONSOLE("\n"); } ZEN_CONSOLE("Completed in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); return 0; } if (SubOption == &m_TestOptions) { ParseStorageOptions(); ParseAuthOptions(); HttpClient Http(m_BuildsUrl, ClientSettings); if (m_Path.empty()) { throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_DownloadOptions.help())); } std::filesystem::path Path = StringToPath(m_Path); m_BuildId = Oid::NewOid().ToString(); m_BuildPartName = Path.filename().string(); m_BuildPartId = Oid::NewOid().ToString(); m_CreateBuild = true; BuildStorage::Statistics StorageStats; const Oid BuildId = Oid::FromHexString(m_BuildId); const Oid BuildPartId = Oid::FromHexString(m_BuildPartId); std::unique_ptr Storage; std::string StorageName; std::filesystem::path StoragePath = StringToPath(m_StoragePath); if (m_BuildsUrl.empty() && StoragePath.empty()) { m_StoragePath = (GetRunningExecutablePath().parent_path() / ".tmpstore").generic_string(); CreateDirectories(StoragePath); CleanDirectory(StoragePath, {}); } auto _ = MakeGuard([&]() { if (m_BuildsUrl.empty() && StoragePath.empty()) { DeleteDirectories(StoragePath); } }); if (!m_BuildsUrl.empty()) { ZEN_CONSOLE("Using '{}' to '{}' from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'", m_BuildPartName.empty() ? m_BuildPartId : m_BuildPartName, Path, m_BuildsUrl, Http.GetSessionId(), m_Namespace, m_Bucket, BuildId); Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName); StorageName = ZEN_CLOUD_STORAGE; } else if (!StoragePath.empty()) { ZEN_CONSOLE("Using '{}' to '{}' from folder {}. BuildId '{}'", m_BuildPartName.empty() ? 
m_BuildPartId : m_BuildPartName, Path, StoragePath, BuildId); Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); StorageName = fmt::format("Disk {}", StoragePath.stem()); } else { throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); } auto MakeMetaData = [](const Oid& BuildId) -> CbObject { CbObjectWriter BuildMetaDataWriter; { const uint32_t CL = BuildId.OidBits[2]; BuildMetaDataWriter.AddString("name", fmt::format("++Test+Main-CL-{}", CL)); BuildMetaDataWriter.AddString("branch", "ZenTestBuild"); BuildMetaDataWriter.AddString("baselineBranch", "ZenTestBuild"); BuildMetaDataWriter.AddString("platform", "Windows"); BuildMetaDataWriter.AddString("project", "Test"); BuildMetaDataWriter.AddInteger("changelist", CL); BuildMetaDataWriter.AddString("buildType", "test-folder"); } return BuildMetaDataWriter.Save(); }; CbObject MetaData = MakeMetaData(Oid::TryFromHexString(m_BuildId)); { ExtendableStringBuilder<256> SB; CompactBinaryToJson(MetaData, SB); ZEN_CONSOLE("Upload Build {}, Part {} ({})\n{}", m_BuildId, BuildPartId, m_BuildPartName, SB.ToView()); } UploadFolder(*Storage, BuildId, BuildPartId, m_BuildPartName, Path, {}, m_BlockReuseMinPercentLimit, m_AllowMultiparts, MetaData, true, false, true); if (AbortFlag) { ZEN_CONSOLE("Upload failed."); return 11; } const std::filesystem::path DownloadPath = Path.parent_path() / (m_BuildPartName + "_download"); ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}'", BuildId, BuildPartId, m_BuildPartName, DownloadPath); DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, true, true); if (AbortFlag) { ZEN_CONSOLE("Download failed."); return 11; } ZEN_CONSOLE("\nRe-download Build {}, Part {} ({}) to '{}' (identical target)", BuildId, BuildPartId, m_BuildPartName, DownloadPath); DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, 
m_AllowPartialBlockRequests, false, true); if (AbortFlag) { ZEN_CONSOLE("Re-download failed. (identical target)"); return 11; } auto ScrambleDir = [](const std::filesystem::path& Path) { ZEN_CONSOLE("\nScrambling '{}'", Path); Stopwatch Timer; DirectoryContent DownloadContent; GetDirectoryContent( Path, DirectoryContentFlags::Recursive | DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeFileSizes, DownloadContent); auto IsAcceptedFolder = [ExcludeFolders = DefaultExcludeFolders, Path](const std::filesystem::path& AbsolutePath) -> bool { std::string RelativePath = std::filesystem::relative(AbsolutePath, Path).generic_string(); for (const std::string_view& ExcludeFolder : ExcludeFolders) { if (RelativePath.starts_with(ExcludeFolder)) { if (RelativePath.length() == ExcludeFolder.length()) { return false; } else if (RelativePath[ExcludeFolder.length()] == '/') { return false; } } } return true; }; ParallellWork Work(AbortFlag); uint32_t Randomizer = 0; auto FileSizeIt = DownloadContent.FileSizes.begin(); for (const std::filesystem::path& FilePath : DownloadContent.Files) { if (IsAcceptedFolder(FilePath)) { uint32_t Case = (Randomizer++) % 7; switch (Case) { case 0: { uint64_t SourceSize = *FileSizeIt; if (SourceSize > 256) { Work.ScheduleWork( GetMediumWorkerPool(EWorkloadType::Burst), [SourceSize, FilePath](std::atomic&) { if (!AbortFlag) { bool IsReadOnly = SetFileReadOnly(FilePath, false); { BasicFile Source(FilePath, BasicFile::Mode::kWrite); uint64_t RangeSize = Min(SourceSize / 3, 512u * 1024u); IoBuffer TempBuffer1(RangeSize); IoBuffer TempBuffer2(RangeSize); IoBuffer TempBuffer3(RangeSize); Source.Read(TempBuffer1.GetMutableView().GetData(), RangeSize, 0); Source.Read(TempBuffer2.GetMutableView().GetData(), RangeSize, SourceSize / 2); Source.Read(TempBuffer3.GetMutableView().GetData(), RangeSize, SourceSize - RangeSize); Source.Write(TempBuffer1, SourceSize / 2); Source.Write(TempBuffer2, SourceSize - RangeSize); Source.Write(TempBuffer3, 
SourceSize - 0); } if (IsReadOnly) { SetFileReadOnly(FilePath, true); } } }, Work.DefaultErrorFunction()); } } break; case 1: std::filesystem::remove(FilePath); break; default: break; } } FileSizeIt++; } Work.Wait(5000, [&](bool IsAborted, std::ptrdiff_t PendingWork) { ZEN_UNUSED(IsAborted); ZEN_CONSOLE("Scrambling files, {} remaining", PendingWork); }); ZEN_ASSERT(!AbortFlag.load()); ZEN_CONSOLE("Scrambled files in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); }; ScrambleDir(DownloadPath); ZEN_CONSOLE("\nRe-download Build {}, Part {} ({}) to '{}' (scrambled target)", BuildId, BuildPartId, m_BuildPartName, DownloadPath); DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true); if (AbortFlag) { ZEN_CONSOLE("Re-download failed. (scrambled target)"); return 11; } ScrambleDir(DownloadPath); Oid BuildId2 = Oid::NewOid(); Oid BuildPartId2 = Oid::NewOid(); CbObject MetaData2 = MakeMetaData(BuildId2); { ExtendableStringBuilder<256> SB; CompactBinaryToJson(MetaData, SB); ZEN_CONSOLE("\nUpload scrambled Build {}, Part {} ({})\n{}\n", BuildId2, BuildPartId2, m_BuildPartName, SB.ToView()); } UploadFolder(*Storage, BuildId2, BuildPartId2, m_BuildPartName, DownloadPath, {}, m_BlockReuseMinPercentLimit, m_AllowMultiparts, MetaData2, true, false, true); if (AbortFlag) { ZEN_CONSOLE("Upload of scrambled failed."); return 11; } ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (original)", BuildId, BuildPartId, m_BuildPartName, DownloadPath); DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true); if (AbortFlag) { ZEN_CONSOLE("Re-download failed."); return 11; } ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath); DownloadFolder(*Storage, BuildId2, {BuildPartId2}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true); if (AbortFlag) { 
ZEN_CONSOLE("Re-download failed."); return 11; } ZEN_CONSOLE("\nRe-download Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath); DownloadFolder(*Storage, BuildId2, {BuildPartId2}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true); if (AbortFlag) { ZEN_CONSOLE("Re-download failed."); return 11; } return 0; } if (SubOption == &m_FetchBlobOptions) { ParseStorageOptions(); ParseAuthOptions(); HttpClient Http(m_BuildsUrl, ClientSettings); if (m_BlobHash.empty()) { throw zen::OptionParseException(fmt::format("Blob hash string is missing\n{}", m_UploadOptions.help())); } IoHash BlobHash; if (!IoHash::TryParse(m_BlobHash, BlobHash)) { throw zen::OptionParseException(fmt::format("Blob hash string is invalid\n{}", m_UploadOptions.help())); } if (m_BuildsUrl.empty() && m_StoragePath.empty()) { throw zen::OptionParseException(fmt::format("At least one storage option is required\n{}", m_UploadOptions.help())); } BuildStorage::Statistics StorageStats; const Oid BuildId = Oid::FromHexString(m_BuildId); std::unique_ptr Storage; std::string StorageName; std::filesystem::path Path = StringToPath(m_Path); if (!m_BuildsUrl.empty()) { ZEN_CONSOLE("Using from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'", m_BuildsUrl, Http.GetSessionId(), m_Namespace, m_Bucket, BuildId); Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName); StorageName = ZEN_CLOUD_STORAGE; } else if (!m_StoragePath.empty()) { std::filesystem::path StoragePath = StringToPath(m_StoragePath); ZEN_CONSOLE("Using folder {}. 
BuildId '{}'", StoragePath, BuildId); Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); StorageName = fmt::format("Disk {}", StoragePath.stem()); } else { throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); } uint64_t CompressedSize; uint64_t DecompressedSize; ValidateBlob(*Storage, BuildId, BlobHash, CompressedSize, DecompressedSize); if (AbortFlag) { return 11; } ZEN_CONSOLE("Blob '{}' has a compressed size {} and a decompressed size of {} bytes", BlobHash, CompressedSize, DecompressedSize); return 0; } if (SubOption == &m_ValidateBuildPartOptions) { ParseStorageOptions(); ParseAuthOptions(); HttpClient Http(m_BuildsUrl, ClientSettings); if (m_BuildsUrl.empty() && m_StoragePath.empty()) { throw zen::OptionParseException(fmt::format("At least one storage option is required\n{}", m_UploadOptions.help())); } if (m_BuildId.empty()) { throw zen::OptionParseException(fmt::format("build-id is required\n{}", m_DownloadOptions.help())); } Oid BuildId = Oid::TryFromHexString(m_BuildId); if (BuildId == Oid::Zero) { throw zen::OptionParseException(fmt::format("build-id is invalid\n{}", m_DownloadOptions.help())); } if (!m_BuildPartName.empty() && !m_BuildPartId.empty()) { throw zen::OptionParseException(fmt::format("build-part-id conflicts with build-part-name\n{}", m_DownloadOptions.help())); } BuildStorage::Statistics StorageStats; std::unique_ptr Storage; std::string StorageName; std::filesystem::path Path = StringToPath(m_Path); if (!m_BuildsUrl.empty()) { ZEN_CONSOLE("Using from cloud endpoint {}. SessionId: '{}'. 
Namespace '{}', Bucket '{}', BuildId '{}'", m_BuildsUrl, Http.GetSessionId(), m_Namespace, m_Bucket, BuildId); Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, Path / ZenTempStorageFolderName); StorageName = ZEN_CLOUD_STORAGE; } else if (!m_StoragePath.empty()) { std::filesystem::path StoragePath = StringToPath(m_StoragePath); ZEN_CONSOLE("Using folder {}. BuildId '{}'", StoragePath, BuildId); Storage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); StorageName = fmt::format("Disk {}", StoragePath.stem()); } else { throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); } Oid BuildPartId = Oid::TryFromHexString(m_BuildPartId); ValidateStatistics ValidateStats; DownloadStatistics DownloadStats; ValidateBuildPart(*Storage, BuildId, BuildPartId, m_BuildPartName, ValidateStats, DownloadStats); return AbortFlag ? 13 : 0; } } catch (const ParallellWorkException& Ex) { for (const std::string& Error : Ex.m_Errors) { ZEN_ERROR("{}", Error); } return 3; } catch (const std::exception& Ex) { ZEN_ERROR("{}", Ex.what()); return 3; } ZEN_ASSERT(false); } } // namespace zen