// Copyright Epic Games, Inc. All Rights Reserved. #include "builds_cmd.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include #include #include ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_PLATFORM_WINDOWS # include #else # include # include # include # include #endif #define EXTRA_VERIFY 0 namespace zen { namespace { static std::atomic AbortFlag = false; static void SignalCallbackHandler(int SigNum) { if (SigNum == SIGINT) { AbortFlag = true; } #if ZEN_PLATFORM_WINDOWS if (SigNum == SIGBREAK) { AbortFlag = true; } #endif // ZEN_PLATFORM_WINDOWS } using namespace std::literals; static const size_t DefaultMaxBlockSize = 64u * 1024u * 1024u; static const size_t DefaultMaxChunkEmbedSize = 3u * 512u * 1024u; struct ChunksBlockParameters { size_t MaxBlockSize = DefaultMaxBlockSize; size_t MaxChunkEmbedSize = DefaultMaxChunkEmbedSize; }; const ChunksBlockParameters DefaultChunksBlockParams{.MaxBlockSize = 32u * 1024u * 1024u, .MaxChunkEmbedSize = DefaultChunkedParams.MaxSize}; const double DefaultLatency = 0; // .0010; const double DefaultDelayPerKBSec = 0; // 0.00005; const std::string ZenFolderName = ".zen"; const std::string ZenStateFilePath = fmt::format("{}/current_state.cbo", ZenFolderName); const std::string ZenStateFileJsonPath = fmt::format("{}/current_state.json", ZenFolderName); const std::string ZenTempFolderName = fmt::format("{}/tmp", ZenFolderName); const std::string ZenTempCacheFolderName = fmt::format("{}/cache", ZenTempFolderName); const std::string ZenTempStorageFolderName = fmt::format("{}/storage", ZenTempFolderName); const std::string ZenTempBlockFolderName = fmt::format("{}/blocks", ZenTempFolderName); const std::string ZenTempChunkFolderName = fmt::format("{}/chunks", ZenTempFolderName); const std::string 
ZenExcludeManifestName = ".zen_exclude_manifest.txt"; const std::string UnsyncFolderName = ".unsync"; const std::string UGSFolderName = ".ugs"; const std::string LegacyZenTempFolderName = ".zen-tmp"; const eastl::vector DefaultExcludeFolders({UnsyncFolderName, ZenFolderName, UGSFolderName, LegacyZenTempFolderName}); const eastl::vector DefaultExcludeExtensions({}); static bool IsVerbose = false; static bool UsePlainProgress = false; #define ZEN_CONSOLE_VERBOSE(fmtstr, ...) \ if (IsVerbose) \ { \ ZEN_CONSOLE_LOG(zen::logging::level::Info, fmtstr, ##__VA_ARGS__); \ } const std::string DefaultAccessTokenEnvVariableName( #if ZEN_PLATFORM_WINDOWS "UE-CloudDataCacheAccessToken"sv #endif #if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC "UE_CloudDataCacheAccessToken"sv #endif ); uint32_t SetNativeFileAttributes(const std::filesystem::path FilePath, SourcePlatform SourcePlatform, uint32_t Attributes) { #if ZEN_PLATFORM_WINDOWS if (SourcePlatform == SourcePlatform::Windows) { SetFileAttributes(FilePath, Attributes); return Attributes; } else { uint32_t CurrentAttributes = GetFileAttributes(FilePath); uint32_t NewAttributes = MakeFileAttributeReadOnly(CurrentAttributes, IsFileModeReadOnly(Attributes)); if (CurrentAttributes != NewAttributes) { SetFileAttributes(FilePath, NewAttributes); } return NewAttributes; } #endif // ZEN_PLATFORM_WINDOWS #if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC if (SourcePlatform != SourcePlatform::Windows) { SetFileMode(FilePath, Attributes); return Attributes; } else { uint32_t CurrentMode = GetFileMode(FilePath); uint32_t NewMode = MakeFileModeReadOnly(CurrentMode, IsFileAttributeReadOnly(Attributes)); if (CurrentMode != NewMode) { SetFileMode(FilePath, NewMode); } return NewMode; } #endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC }; uint32_t GetNativeFileAttributes(const std::filesystem::path FilePath) { #if ZEN_PLATFORM_WINDOWS return GetFileAttributes(FilePath); #endif // ZEN_PLATFORM_WINDOWS #if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC return 
GetFileMode(FilePath); #endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC } template std::string FormatArray(eastl::span Items, std::string_view Prefix) { ExtendableStringBuilder<512> SB; for (const T& Item : Items) { SB.Append(fmt::format("{}{}", Prefix, Item)); } return SB.ToString(); } void CleanDirectory(const std::filesystem::path& Path, eastl::span ExcludeDirectories) { ZEN_TRACE_CPU("CleanDirectory"); DirectoryContent LocalDirectoryContent; GetDirectoryContent(Path, DirectoryContentFlags::IncludeDirs | DirectoryContentFlags::IncludeFiles, LocalDirectoryContent); for (const std::filesystem::path& LocalFilePath : LocalDirectoryContent.Files) { std::filesystem::remove(LocalFilePath); } for (const std::filesystem::path& LocalDirPath : LocalDirectoryContent.Directories) { bool Leave = false; for (const std::string_view ExcludeDirectory : ExcludeDirectories) { if (LocalDirPath == (Path / ExcludeDirectory)) { Leave = true; break; } } if (!Leave) { zen::CleanDirectory(LocalDirPath); std::filesystem::remove(LocalDirPath); } } } std::string ReadAccessTokenFromFile(const std::filesystem::path& Path) { if (!std::filesystem::is_regular_file(Path)) { throw std::runtime_error(fmt::format("the file '{}' does not exist", Path)); } IoBuffer Body = IoBufferBuilder::MakeFromFile(Path); std::string JsonText(reinterpret_cast(Body.GetData()), Body.GetSize()); std::string JsonError; json11::Json TokenInfo = json11::Json::parse(JsonText, JsonError); if (!JsonError.empty()) { throw std::runtime_error(fmt::format("failed parsing json file '{}'. 
Reason: '{}'", Path, JsonError)); } const std::string AuthToken = TokenInfo["Token"].string_value(); if (AuthToken.empty()) { throw std::runtime_error(fmt::format("the json file '{}' does not contain a value for \"Token\"", Path)); } return AuthToken; } CompositeBuffer WriteToTempFileIfNeeded(const CompositeBuffer& Buffer, const std::filesystem::path& TempFolderPath, const IoHash& Hash) { // If this is a file based buffer or a compressed buffer with a memory-based header, we don't need to rewrite to disk to save memory eastl::span Segments = Buffer.GetSegments(); ZEN_ASSERT(Buffer.GetSegments().size() > 0); IoBufferFileReference FileRef; if (Segments.back().GetFileReference(FileRef)) { return Buffer; } std::filesystem::path TempFilePath = (TempFolderPath / Hash.ToHexString()).make_preferred(); return CompositeBuffer(WriteToTempFile(Buffer, TempFilePath)); } class FilteredRate { public: FilteredRate() {} void Start() { if (StartTimeUS == (uint64_t)-1) { uint64_t Expected = (uint64_t)-1; if (StartTimeUS.compare_exchange_weak(Expected, Timer.GetElapsedTimeUs())) { LastTimeUS = StartTimeUS.load(); } } } void Stop() { if (EndTimeUS == (uint64_t)-1) { uint64_t Expected = (uint64_t)-1; EndTimeUS.compare_exchange_weak(Expected, Timer.GetElapsedTimeUs()); } } void Update(uint64_t Count) { if (LastTimeUS == (uint64_t)-1) { return; } uint64_t TimeUS = Timer.GetElapsedTimeUs(); uint64_t TimeDeltaUS = TimeUS - LastTimeUS; if (TimeDeltaUS >= 2000000) { uint64_t Delta = Count - LastCount; uint64_t PerSecond = (Delta * 1000000) / TimeDeltaUS; LastPerSecond = PerSecond; LastCount = Count; FilteredPerSecond = (PerSecond + (LastPerSecond * 7)) / 8; LastTimeUS = TimeUS; } } uint64_t GetCurrent() const // If Stopped - return total count / total time { if (LastTimeUS == (uint64_t)-1) { return 0; } return FilteredPerSecond; } uint64_t GetElapsedTime() const { if (StartTimeUS == (uint64_t)-1) { return 0; } if (EndTimeUS == (uint64_t)-1) { return 0; } uint64_t TimeDeltaUS = EndTimeUS - 
StartTimeUS; return TimeDeltaUS; } bool IsActive() const { return (StartTimeUS != (uint64_t)-1) && (EndTimeUS == (uint64_t)-1); } private: Stopwatch Timer; std::atomic StartTimeUS = (uint64_t)-1; std::atomic EndTimeUS = (uint64_t)-1; std::atomic LastTimeUS = (uint64_t)-1; uint64_t LastCount = 0; uint64_t LastPerSecond = 0; uint64_t FilteredPerSecond = 0; }; uint64_t GetBytesPerSecond(uint64_t ElapsedWallTimeUS, uint64_t Count) { if (ElapsedWallTimeUS == 0) { return 0; } return Count * 1000000 / ElapsedWallTimeUS; } std::filesystem::path GetTempChunkedSequenceFileName(const std::filesystem::path& CacheFolderPath, const IoHash& RawHash) { return (CacheFolderPath / (RawHash.ToHexString() + ".tmp")).make_preferred(); } std::filesystem::path GetFinalChunkedSequenceFileName(const std::filesystem::path& CacheFolderPath, const IoHash& RawHash) { return (CacheFolderPath / RawHash.ToHexString()).make_preferred(); } ChunkedFolderContent ScanAndChunkFolder( GetFolderContentStatistics& GetFolderContentStats, ChunkingStatistics& ChunkingStats, const std::filesystem::path& Path, std::function&& IsAcceptedFolder, std::function&& IsAcceptedFile, ChunkingController& ChunkController) { ZEN_TRACE_CPU("ScanAndChunkFolder"); FolderContent Content = GetFolderContent( GetFolderContentStats, Path, std::move(IsAcceptedFolder), std::move(IsAcceptedFile), GetMediumWorkerPool(EWorkloadType::Burst), UsePlainProgress ? 5000 : 200, [](bool, std::ptrdiff_t) {}, AbortFlag); if (AbortFlag) { return {}; } ProgressBar ProgressBar(UsePlainProgress); FilteredRate FilteredBytesHashed; FilteredBytesHashed.Start(); ChunkedFolderContent FolderContent = ChunkFolderContent( ChunkingStats, GetMediumWorkerPool(EWorkloadType::Burst), Path, Content, ChunkController, UsePlainProgress ? 
5000 : 200, [&](bool, std::ptrdiff_t) { FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load()); std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found", ChunkingStats.FilesProcessed.load(), GetFolderContentStats.AcceptedFileCount.load(), NiceBytes(ChunkingStats.BytesHashed.load()), NiceBytes(GetFolderContentStats.FoundFileByteCount), NiceNum(FilteredBytesHashed.GetCurrent()), ChunkingStats.UniqueChunksFound.load(), NiceBytes(ChunkingStats.UniqueBytesFound.load())); ProgressBar.UpdateState({.Task = "Scanning files ", .Details = Details, .TotalCount = GetFolderContentStats.AcceptedFileByteCount, .RemainingCount = GetFolderContentStats.AcceptedFileByteCount - ChunkingStats.BytesHashed.load()}, false); }, AbortFlag); if (AbortFlag) { return {}; } FilteredBytesHashed.Stop(); ProgressBar.Finish(); ZEN_CONSOLE("Found {} ({}) files divided into {} ({}) unique chunks in '{}' in {}. Average hash rate {}B/sec", ChunkingStats.FilesProcessed.load(), NiceBytes(ChunkingStats.BytesHashed.load()), ChunkingStats.UniqueChunksFound.load(), NiceBytes(ChunkingStats.UniqueBytesFound.load()), Path, NiceTimeSpanMs((GetFolderContentStats.ElapsedWallTimeUS + ChunkingStats.ElapsedWallTimeUS) / 1000), NiceNum(GetBytesPerSecond(ChunkingStats.ElapsedWallTimeUS, ChunkingStats.BytesHashed))); return FolderContent; }; struct DiskStatistics { std::atomic OpenReadCount = 0; std::atomic OpenWriteCount = 0; std::atomic ReadCount = 0; std::atomic ReadByteCount = 0; std::atomic WriteCount = 0; std::atomic WriteByteCount = 0; std::atomic CurrentOpenFileCount = 0; }; struct FindBlocksStatistics { uint64_t FindBlockTimeMS = 0; uint64_t PotentialChunkCount = 0; uint64_t PotentialChunkByteCount = 0; uint64_t FoundBlockCount = 0; uint64_t FoundBlockChunkCount = 0; uint64_t FoundBlockByteCount = 0; uint64_t AcceptedBlockCount = 0; uint64_t AcceptedChunkCount = 0; uint64_t AcceptedByteCount = 0; uint64_t RejectedBlockCount = 0; uint64_t RejectedChunkCount = 0; uint64_t 
RejectedByteCount = 0;
    uint64_t AcceptedReduntantChunkCount = 0;
    uint64_t AcceptedReduntantByteCount = 0;
    uint64_t NewBlocksCount = 0;
    uint64_t NewBlocksChunkCount = 0;
    uint64_t NewBlocksChunkByteCount = 0;
};

// Upload counters. operator+= adds every field of Rhs to this instance.
struct UploadStatistics
{
    std::atomic BlockCount = 0;
    std::atomic BlocksBytes = 0;
    std::atomic ChunkCount = 0;
    std::atomic ChunksBytes = 0;
    std::atomic ReadFromDiskBytes = 0;
    std::atomic MultipartAttachmentCount = 0;
    uint64_t ElapsedWallTimeUS = 0;

    UploadStatistics& operator+=(const UploadStatistics& Rhs)
    {
        BlockCount += Rhs.BlockCount;
        BlocksBytes += Rhs.BlocksBytes;
        ChunkCount += Rhs.ChunkCount;
        ChunksBytes += Rhs.ChunksBytes;
        ReadFromDiskBytes += Rhs.ReadFromDiskBytes;
        MultipartAttachmentCount += Rhs.MultipartAttachmentCount;
        ElapsedWallTimeUS += Rhs.ElapsedWallTimeUS;
        return *this;
    }
};

// Counters for loose chunks and their compression. operator+= adds every field of Rhs to this instance.
struct LooseChunksStatistics
{
    uint64_t ChunkCount = 0;
    uint64_t ChunkByteCount = 0;
    std::atomic CompressedChunkCount = 0;
    std::atomic CompressedChunkBytes = 0;
    uint64_t CompressChunksElapsedWallTimeUS = 0;

    LooseChunksStatistics& operator+=(const LooseChunksStatistics& Rhs)
    {
        ChunkCount += Rhs.ChunkCount;
        ChunkByteCount += Rhs.ChunkByteCount;
        CompressedChunkCount += Rhs.CompressedChunkCount;
        CompressedChunkBytes += Rhs.CompressedChunkBytes;
        CompressChunksElapsedWallTimeUS += Rhs.CompressChunksElapsedWallTimeUS;
        return *this;
    }
};

// Counters for block generation. operator+= adds every field of Rhs to this instance.
struct GenerateBlocksStatistics
{
    std::atomic GeneratedBlockByteCount = 0;
    std::atomic GeneratedBlockCount = 0;
    uint64_t GenerateBlocksElapsedWallTimeUS = 0;

    GenerateBlocksStatistics& operator+=(const GenerateBlocksStatistics& Rhs)
    {
        GeneratedBlockByteCount += Rhs.GeneratedBlockByteCount;
        GeneratedBlockCount += Rhs.GeneratedBlockCount;
        GenerateBlocksElapsedWallTimeUS += Rhs.GenerateBlocksElapsedWallTimeUS;
        return *this;
    }
};

// Translates LocalChunkOrder (indexes into LocalChunkHashes) into "absolute" chunk indexes.
// The absolute numbering lists the loose chunks first, in LooseChunkIndexes order, followed by every chunk of every
// entry in BlockDescriptions in order. Block chunks that are not present in ChunkHashToLocalChunkIndex still occupy
// an absolute index.
// Returns one absolute index per entry of LocalChunkOrder.
eastl::vector CalculateAbsoluteChunkOrders(const eastl::span LocalChunkHashes,
                                           const eastl::span LocalChunkOrder,
                                           const tsl::robin_map& ChunkHashToLocalChunkIndex,
                                           const eastl::span& LooseChunkIndexes,
                                           const eastl::span&
BlockDescriptions) { ZEN_TRACE_CPU("CalculateAbsoluteChunkOrders"); #if EXTRA_VERIFY eastl::vector TmpAbsoluteChunkHashes; TmpAbsoluteChunkHashes.reserve(LocalChunkHashes.size()); #endif // EXTRA_VERIFY eastl::vector LocalChunkIndexToAbsoluteChunkIndex; LocalChunkIndexToAbsoluteChunkIndex.resize(LocalChunkHashes.size(), (uint32_t)-1); std::uint32_t AbsoluteChunkCount = 0; for (uint32_t ChunkIndex : LooseChunkIndexes) { LocalChunkIndexToAbsoluteChunkIndex[ChunkIndex] = AbsoluteChunkCount; #if EXTRA_VERIFY TmpAbsoluteChunkHashes.push_back(LocalChunkHashes[ChunkIndex]); #endif // EXTRA_VERIFY AbsoluteChunkCount++; } for (const ChunkBlockDescription& Block : BlockDescriptions) { for (const IoHash& ChunkHash : Block.ChunkRawHashes) { if (auto It = ChunkHashToLocalChunkIndex.find(ChunkHash); It != ChunkHashToLocalChunkIndex.end()) { const uint32_t LocalChunkIndex = It->second; ZEN_ASSERT_SLOW(LocalChunkHashes[LocalChunkIndex] == ChunkHash); LocalChunkIndexToAbsoluteChunkIndex[LocalChunkIndex] = AbsoluteChunkCount; } #if EXTRA_VERIFY TmpAbsoluteChunkHashes.push_back(ChunkHash); #endif // EXTRA_VERIFY AbsoluteChunkCount++; } } eastl::vector AbsoluteChunkOrder; AbsoluteChunkOrder.reserve(LocalChunkHashes.size()); for (const uint32_t LocalChunkIndex : LocalChunkOrder) { const uint32_t AbsoluteChunkIndex = LocalChunkIndexToAbsoluteChunkIndex[LocalChunkIndex]; #if EXTRA_VERIFY ZEN_ASSERT(LocalChunkHashes[LocalChunkIndex] == TmpAbsoluteChunkHashes[AbsoluteChunkIndex]); #endif // EXTRA_VERIFY AbsoluteChunkOrder.push_back(AbsoluteChunkIndex); } #if EXTRA_VERIFY { uint32_t OrderIndex = 0; while (OrderIndex < LocalChunkOrder.size()) { const uint32_t LocalChunkIndex = LocalChunkOrder[OrderIndex]; const IoHash& LocalChunkHash = LocalChunkHashes[LocalChunkIndex]; const uint32_t AbsoluteChunkIndex = AbsoluteChunkOrder[OrderIndex]; const IoHash& AbsoluteChunkHash = TmpAbsoluteChunkHashes[AbsoluteChunkIndex]; ZEN_ASSERT(LocalChunkHash == AbsoluteChunkHash); OrderIndex++; } } #endif // 
EXTRA_VERIFY return AbsoluteChunkOrder; } void CalculateLocalChunkOrders(const eastl::span& AbsoluteChunkOrders, const eastl::span LooseChunkHashes, const eastl::span LooseChunkRawSizes, const eastl::span& BlockDescriptions, eastl::vector& OutLocalChunkHashes, eastl::vector& OutLocalChunkRawSizes, eastl::vector& OutLocalChunkOrders) { ZEN_TRACE_CPU("CalculateLocalChunkOrders"); eastl::vector AbsoluteChunkHashes; eastl::vector AbsoluteChunkRawSizes; AbsoluteChunkHashes.insert(AbsoluteChunkHashes.end(), LooseChunkHashes.begin(), LooseChunkHashes.end()); AbsoluteChunkRawSizes.insert(AbsoluteChunkRawSizes.end(), LooseChunkRawSizes.begin(), LooseChunkRawSizes.end()); for (const ChunkBlockDescription& Block : BlockDescriptions) { AbsoluteChunkHashes.insert(AbsoluteChunkHashes.end(), Block.ChunkRawHashes.begin(), Block.ChunkRawHashes.end()); AbsoluteChunkRawSizes.insert(AbsoluteChunkRawSizes.end(), Block.ChunkRawLengths.begin(), Block.ChunkRawLengths.end()); } OutLocalChunkHashes.reserve(AbsoluteChunkHashes.size()); OutLocalChunkRawSizes.reserve(AbsoluteChunkRawSizes.size()); OutLocalChunkOrders.reserve(AbsoluteChunkOrders.size()); tsl::robin_map ChunkHashToChunkIndex; ChunkHashToChunkIndex.reserve(AbsoluteChunkHashes.size()); for (uint32_t AbsoluteChunkOrderIndex = 0; AbsoluteChunkOrderIndex < AbsoluteChunkOrders.size(); AbsoluteChunkOrderIndex++) { const uint32_t AbsoluteChunkIndex = AbsoluteChunkOrders[AbsoluteChunkOrderIndex]; const IoHash& AbsoluteChunkHash = AbsoluteChunkHashes[AbsoluteChunkIndex]; const uint64_t AbsoluteChunkRawSize = AbsoluteChunkRawSizes[AbsoluteChunkIndex]; if (auto It = ChunkHashToChunkIndex.find(AbsoluteChunkHash); It != ChunkHashToChunkIndex.end()) { const uint32_t LocalChunkIndex = It->second; OutLocalChunkOrders.push_back(LocalChunkIndex); } else { uint32_t LocalChunkIndex = gsl::narrow(OutLocalChunkHashes.size()); OutLocalChunkHashes.push_back(AbsoluteChunkHash); OutLocalChunkRawSizes.push_back(AbsoluteChunkRawSize); 
OutLocalChunkOrders.push_back(LocalChunkIndex); ChunkHashToChunkIndex.insert_or_assign(AbsoluteChunkHash, LocalChunkIndex); } #if EXTRA_VERIFY const uint32_t LocalChunkIndex = OutLocalChunkOrders[AbsoluteChunkOrderIndex]; const IoHash& LocalChunkHash = OutLocalChunkHashes[LocalChunkIndex]; const uint64_t& LocalChunkRawSize = OutLocalChunkRawSizes[LocalChunkIndex]; ZEN_ASSERT(LocalChunkHash == AbsoluteChunkHash); ZEN_ASSERT(LocalChunkRawSize == AbsoluteChunkRawSize); #endif // EXTRA_VERIFY } #if EXTRA_VERIFY for (uint32_t OrderIndex = 0; OrderIndex < OutLocalChunkOrders.size(); OrderIndex++) { uint32_t LocalChunkIndex = OutLocalChunkOrders[OrderIndex]; const IoHash LocalChunkHash = OutLocalChunkHashes[LocalChunkIndex]; uint64_t LocalChunkRawSize = OutLocalChunkRawSizes[LocalChunkIndex]; uint32_t VerifyChunkIndex = AbsoluteChunkOrders[OrderIndex]; const IoHash VerifyChunkHash = AbsoluteChunkHashes[VerifyChunkIndex]; uint64_t VerifyChunkRawSize = AbsoluteChunkRawSizes[VerifyChunkIndex]; ZEN_ASSERT(LocalChunkHash == VerifyChunkHash); ZEN_ASSERT(LocalChunkRawSize == VerifyChunkRawSize); } #endif // EXTRA_VERIFY } void WriteBuildContentToCompactBinary(CbObjectWriter& PartManifestWriter, const SourcePlatform Platform, eastl::span Paths, eastl::span RawHashes, eastl::span RawSizes, eastl::span Attributes, eastl::span SequenceRawHashes, eastl::span ChunkCounts, eastl::span LocalChunkHashes, eastl::span LocalChunkRawSizes, eastl::vector AbsoluteChunkOrders, const eastl::span LooseLocalChunkIndexes, const eastl::span BlockHashes) { ZEN_ASSERT(Platform != SourcePlatform::_Count); PartManifestWriter.AddString("platform"sv, ToString(Platform)); uint64_t TotalSize = 0; for (const uint64_t Size : RawSizes) { TotalSize += Size; } PartManifestWriter.AddInteger("totalSize", TotalSize); PartManifestWriter.BeginObject("files"sv); { compactbinary_helpers::WriteArray(Paths, "paths"sv, PartManifestWriter); compactbinary_helpers::WriteArray(RawHashes, "rawhashes"sv, PartManifestWriter); 
compactbinary_helpers::WriteArray(RawSizes, "rawsizes"sv, PartManifestWriter); if (Platform == SourcePlatform::Windows) { compactbinary_helpers::WriteArray(Attributes, "attributes"sv, PartManifestWriter); } if (Platform == SourcePlatform::Linux || Platform == SourcePlatform::MacOS) { compactbinary_helpers::WriteArray(Attributes, "mode"sv, PartManifestWriter); } } PartManifestWriter.EndObject(); // files PartManifestWriter.BeginObject("chunkedContent"); { compactbinary_helpers::WriteArray(SequenceRawHashes, "sequenceRawHashes"sv, PartManifestWriter); compactbinary_helpers::WriteArray(ChunkCounts, "chunkcounts"sv, PartManifestWriter); compactbinary_helpers::WriteArray(AbsoluteChunkOrders, "chunkorders"sv, PartManifestWriter); } PartManifestWriter.EndObject(); // chunkedContent size_t LooseChunkCount = LooseLocalChunkIndexes.size(); if (LooseChunkCount > 0) { PartManifestWriter.BeginObject("chunkAttachments"); { PartManifestWriter.BeginArray("rawHashes"sv); for (uint32_t ChunkIndex : LooseLocalChunkIndexes) { PartManifestWriter.AddBinaryAttachment(LocalChunkHashes[ChunkIndex]); } PartManifestWriter.EndArray(); // rawHashes PartManifestWriter.BeginArray("chunkRawSizes"sv); for (uint32_t ChunkIndex : LooseLocalChunkIndexes) { PartManifestWriter.AddInteger(LocalChunkRawSizes[ChunkIndex]); } PartManifestWriter.EndArray(); // chunkSizes } PartManifestWriter.EndObject(); // } if (BlockHashes.size() > 0) { PartManifestWriter.BeginObject("blockAttachments"); { compactbinary_helpers::WriteBinaryAttachmentArray(BlockHashes, "rawHashes"sv, PartManifestWriter); } PartManifestWriter.EndObject(); // blocks } } void ReadBuildContentFromCompactBinary(CbObjectView BuildPartManifest, SourcePlatform& OutPlatform, eastl::vector& OutPaths, eastl::vector& OutRawHashes, eastl::vector& OutRawSizes, eastl::vector& OutAttributes, eastl::vector& OutSequenceRawHashes, eastl::vector& OutChunkCounts, eastl::vector& OutAbsoluteChunkOrders, eastl::vector& OutLooseChunkHashes, eastl::vector& 
OutLooseChunkRawSizes, eastl::vector& OutBlockRawHashes) { OutPlatform = FromString(BuildPartManifest["platform"sv].AsString(), SourcePlatform::_Count); CbObjectView FilesObject = BuildPartManifest["files"sv].AsObjectView(); compactbinary_helpers::ReadArray("paths"sv, FilesObject, OutPaths); compactbinary_helpers::ReadArray("rawhashes"sv, FilesObject, OutRawHashes); compactbinary_helpers::ReadArray("rawsizes"sv, FilesObject, OutRawSizes); uint64_t PathCount = OutPaths.size(); if (OutRawHashes.size() != PathCount) { throw std::runtime_error(fmt::format("Number of raw hashes entries does not match number of paths")); } if (OutRawSizes.size() != PathCount) { throw std::runtime_error(fmt::format("Number of raw sizes entries does not match number of paths")); } eastl::vector ModeArray; compactbinary_helpers::ReadArray("mode"sv, FilesObject, ModeArray); if (ModeArray.size() != PathCount && ModeArray.size() != 0) { throw std::runtime_error(fmt::format("Number of attribute entries does not match number of paths")); } eastl::vector AttributeArray; compactbinary_helpers::ReadArray("attributes"sv, FilesObject, ModeArray); if (AttributeArray.size() != PathCount && AttributeArray.size() != 0) { throw std::runtime_error(fmt::format("Number of attribute entries does not match number of paths")); } if (ModeArray.size() > 0) { if (OutPlatform == SourcePlatform::_Count) { OutPlatform = SourcePlatform::Linux; // Best guess - under dev format } OutAttributes = std::move(ModeArray); } else if (AttributeArray.size() > 0) { if (OutPlatform == SourcePlatform::_Count) { OutPlatform = SourcePlatform::Windows; } OutAttributes = std::move(AttributeArray); } else { if (OutPlatform == SourcePlatform::_Count) { OutPlatform = GetSourceCurrentPlatform(); } } if (CbObjectView ChunkContentView = BuildPartManifest["chunkedContent"sv].AsObjectView(); ChunkContentView) { compactbinary_helpers::ReadArray("sequenceRawHashes"sv, ChunkContentView, OutSequenceRawHashes); 
compactbinary_helpers::ReadArray("chunkcounts"sv, ChunkContentView, OutChunkCounts); if (OutChunkCounts.size() != OutSequenceRawHashes.size()) { throw std::runtime_error(fmt::format("Number of chunk count entries does not match number of paths")); } compactbinary_helpers::ReadArray("chunkorders"sv, ChunkContentView, OutAbsoluteChunkOrders); } else if (FilesObject["chunkcounts"sv]) { // Legacy zen style eastl::vector LegacyChunkCounts; compactbinary_helpers::ReadArray("chunkcounts"sv, FilesObject, LegacyChunkCounts); if (LegacyChunkCounts.size() != PathCount) { throw std::runtime_error(fmt::format("Number of chunk count entries does not match number of paths")); } eastl::vector LegacyAbsoluteChunkOrders; compactbinary_helpers::ReadArray("chunkorders"sv, FilesObject, LegacyAbsoluteChunkOrders); CbArrayView ChunkOrdersArray = BuildPartManifest["chunkorders"sv].AsArrayView(); const uint64_t ChunkOrdersCount = ChunkOrdersArray.Num(); tsl::robin_set FoundRawHashes; FoundRawHashes.reserve(PathCount); OutChunkCounts.reserve(PathCount); OutAbsoluteChunkOrders.reserve(ChunkOrdersCount); uint32_t OrderIndexOffset = 0; for (uint32_t PathIndex = 0; PathIndex < OutPaths.size(); PathIndex++) { const IoHash& PathRawHash = OutRawHashes[PathIndex]; uint32_t LegacyChunkCount = LegacyChunkCounts[PathIndex]; if (FoundRawHashes.insert(PathRawHash).second) { OutSequenceRawHashes.push_back(PathRawHash); OutChunkCounts.push_back(LegacyChunkCount); eastl::span AbsoluteChunkOrder = eastl::span(LegacyAbsoluteChunkOrders).subspan(OrderIndexOffset, LegacyChunkCount); OutAbsoluteChunkOrders.insert(OutAbsoluteChunkOrders.end(), AbsoluteChunkOrder.begin(), AbsoluteChunkOrder.end()); } OrderIndexOffset += LegacyChunkCounts[PathIndex]; } } else { // Legacy C# style tsl::robin_set FoundRawHashes; FoundRawHashes.reserve(PathCount); uint32_t OrderIndexOffset = 0; for (uint32_t PathIndex = 0; PathIndex < OutPaths.size(); PathIndex++) { if (OutRawSizes[PathIndex] > 0) { const IoHash& PathRawHash = 
OutRawHashes[PathIndex]; if (FoundRawHashes.insert(PathRawHash).second) { OutSequenceRawHashes.push_back(PathRawHash); OutChunkCounts.push_back(1); OutAbsoluteChunkOrders.push_back(OrderIndexOffset); OutLooseChunkHashes.push_back(PathRawHash); OutLooseChunkRawSizes.push_back(OutRawSizes[PathIndex]); OrderIndexOffset += 1; } } } } CbObjectView ChunkAttachmentsView = BuildPartManifest["chunkAttachments"sv].AsObjectView(); { compactbinary_helpers::ReadBinaryAttachmentArray("rawHashes"sv, ChunkAttachmentsView, OutLooseChunkHashes); compactbinary_helpers::ReadArray("chunkRawSizes"sv, ChunkAttachmentsView, OutLooseChunkRawSizes); if (OutLooseChunkHashes.size() != OutLooseChunkRawSizes.size()) { throw std::runtime_error( fmt::format("Number of attachment chunk hashes does not match number of attachemnt chunk raw sizes")); } } CbObjectView BlocksView = BuildPartManifest["blockAttachments"sv].AsObjectView(); { compactbinary_helpers::ReadBinaryAttachmentArray("rawHashes"sv, BlocksView, OutBlockRawHashes); } } bool ReadStateObject(CbObjectView StateView, Oid& OutBuildId, eastl::vector& BuildPartsIds, eastl::vector& BuildPartsNames, eastl::vector& OutPartContents, FolderContent& OutLocalFolderState) { try { CbObjectView BuildView = StateView["builds"sv].AsArrayView().CreateViewIterator().AsObjectView(); OutBuildId = BuildView["buildId"sv].AsObjectId(); for (CbFieldView PartView : BuildView["parts"sv].AsArrayView()) { CbObjectView PartObjectView = PartView.AsObjectView(); BuildPartsIds.push_back(PartObjectView["partId"sv].AsObjectId()); BuildPartsNames.push_back(std::string(PartObjectView["partName"sv].AsString())); OutPartContents.push_back(LoadChunkedFolderContentToCompactBinary(PartObjectView["content"sv].AsObjectView())); } OutLocalFolderState = LoadFolderContentToCompactBinary(StateView["localFolderState"sv].AsObjectView()); return true; } catch (const std::exception& Ex) { ZEN_CONSOLE("Unable to read local state: ", Ex.what()); return false; } } CbObject 
CreateStateObject(const Oid& BuildId, eastl::vector> AllBuildParts, eastl::span PartContents, const FolderContent& LocalFolderState) { CbObjectWriter CurrentStateWriter; CurrentStateWriter.BeginArray("builds"sv); { CurrentStateWriter.BeginObject(); { CurrentStateWriter.AddObjectId("buildId"sv, BuildId); CurrentStateWriter.BeginArray("parts"sv); for (size_t PartIndex = 0; PartIndex < AllBuildParts.size(); PartIndex++) { const Oid BuildPartId = AllBuildParts[PartIndex].first; CurrentStateWriter.BeginObject(); { CurrentStateWriter.AddObjectId("partId"sv, BuildPartId); CurrentStateWriter.AddString("partName"sv, AllBuildParts[PartIndex].second); CurrentStateWriter.BeginObject("content"); { SaveChunkedFolderContentToCompactBinary(PartContents[PartIndex], CurrentStateWriter); } CurrentStateWriter.EndObject(); } CurrentStateWriter.EndObject(); } CurrentStateWriter.EndArray(); // parts } CurrentStateWriter.EndObject(); } CurrentStateWriter.EndArray(); // builds CurrentStateWriter.BeginObject("localFolderState"sv); { SaveFolderContentToCompactBinary(LocalFolderState, CurrentStateWriter); } CurrentStateWriter.EndObject(); // localFolderState return CurrentStateWriter.Save(); } class BufferedOpenFile { public: BufferedOpenFile(const std::filesystem::path Path) : Source(Path, BasicFile::Mode::kRead), SourceSize(Source.FileSize()) {} BufferedOpenFile() = delete; BufferedOpenFile(const BufferedOpenFile&) = delete; BufferedOpenFile(BufferedOpenFile&&) = delete; BufferedOpenFile& operator=(BufferedOpenFile&&) = delete; BufferedOpenFile& operator=(const BufferedOpenFile&) = delete; const uint64_t BlockSize = 256u * 1024u; CompositeBuffer GetRange(uint64_t Offset, uint64_t Size) { ZEN_TRACE_CPU("BufferedOpenFile::GetRange"); ZEN_ASSERT((CacheBlockIndex == (uint64_t)-1) || Cache); auto _ = MakeGuard([&]() { ZEN_ASSERT((CacheBlockIndex == (uint64_t)-1) || Cache); }); ZEN_ASSERT((Offset + Size) <= SourceSize); const uint64_t BlockIndexStart = Offset / BlockSize; const uint64_t 
BlockIndexEnd = (Offset + Size - 1) / BlockSize; eastl::vector BufferRanges; BufferRanges.reserve(BlockIndexEnd - BlockIndexStart + 1); uint64_t ReadOffset = Offset; for (uint64_t BlockIndex = BlockIndexStart; BlockIndex <= BlockIndexEnd; BlockIndex++) { const uint64_t BlockStartOffset = BlockIndex * BlockSize; if (CacheBlockIndex != BlockIndex) { uint64_t CacheSize = Min(BlockSize, SourceSize - BlockStartOffset); ZEN_ASSERT(CacheSize > 0); Cache = IoBuffer(CacheSize); Source.Read(Cache.GetMutableView().GetData(), CacheSize, BlockStartOffset); CacheBlockIndex = BlockIndex; } const uint64_t BytesRead = ReadOffset - Offset; ZEN_ASSERT(BlockStartOffset <= ReadOffset); const uint64_t OffsetIntoBlock = ReadOffset - BlockStartOffset; ZEN_ASSERT(OffsetIntoBlock < Cache.GetSize()); const uint64_t BlockBytes = Min(Cache.GetSize() - OffsetIntoBlock, Size - BytesRead); BufferRanges.emplace_back(SharedBuffer(IoBuffer(Cache, OffsetIntoBlock, BlockBytes))); ReadOffset += BlockBytes; } CompositeBuffer Result(std::move(BufferRanges)); ZEN_ASSERT(Result.GetSize() == Size); return Result; } private: BasicFile Source; const uint64_t SourceSize; uint64_t CacheBlockIndex = (uint64_t)-1; IoBuffer Cache; }; class ReadFileCache { public: // A buffered file reader that provides CompositeBuffer where the buffers are owned and the memory never overwritten ReadFileCache(DiskStatistics& DiskStats, const std::filesystem::path& Path, const ChunkedFolderContent& LocalContent, const ChunkedContentLookup& LocalLookup, size_t MaxOpenFileCount) : m_Path(Path) , m_LocalContent(LocalContent) , m_LocalLookup(LocalLookup) , m_DiskStats(DiskStats) { m_OpenFiles.reserve(MaxOpenFileCount); } ~ReadFileCache() { m_DiskStats.CurrentOpenFileCount -= m_OpenFiles.size(); m_OpenFiles.clear(); } CompositeBuffer GetRange(uint32_t SequenceIndex, uint64_t Offset, uint64_t Size) { ZEN_TRACE_CPU("ReadFileCache::GetRange"); auto CacheIt = std::find_if(m_OpenFiles.begin(), m_OpenFiles.end(), [SequenceIndex](const auto& 
Lhs) { return Lhs.first == SequenceIndex; }); if (CacheIt != m_OpenFiles.end()) { if (CacheIt != m_OpenFiles.begin()) { auto CachedFile(std::move(CacheIt->second)); m_OpenFiles.erase(CacheIt); m_OpenFiles.insert(m_OpenFiles.begin(), std::make_pair(SequenceIndex, std::move(CachedFile))); } CompositeBuffer Result = m_OpenFiles.front().second->GetRange(Offset, Size); m_DiskStats.ReadByteCount += Result.GetSize(); return Result; } const uint32_t LocalPathIndex = m_LocalLookup.SequenceIndexFirstPathIndex[SequenceIndex]; const std::filesystem::path LocalFilePath = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred(); if (Size == m_LocalContent.RawSizes[LocalPathIndex]) { IoBuffer Result = IoBufferBuilder::MakeFromFile(LocalFilePath); m_DiskStats.OpenReadCount++; m_DiskStats.ReadByteCount += Result.GetSize(); return CompositeBuffer(SharedBuffer(Result)); } if (m_OpenFiles.size() == m_OpenFiles.capacity()) { m_OpenFiles.pop_back(); m_DiskStats.CurrentOpenFileCount--; } m_OpenFiles.insert(m_OpenFiles.begin(), std::make_pair(SequenceIndex, std::make_unique(LocalFilePath))); CompositeBuffer Result = m_OpenFiles.front().second->GetRange(Offset, Size); m_DiskStats.ReadByteCount += Result.GetSize(); m_DiskStats.OpenReadCount++; m_DiskStats.CurrentOpenFileCount++; return Result; } private: const std::filesystem::path m_Path; const ChunkedFolderContent& m_LocalContent; const ChunkedContentLookup& m_LocalLookup; eastl::vector>> m_OpenFiles; DiskStatistics& m_DiskStats; }; CompositeBuffer ValidateBlob(IoBuffer&& Payload, const IoHash& BlobHash, uint64_t& OutCompressedSize, uint64_t& OutDecompressedSize) { ZEN_TRACE_CPU("ValidateBlob"); if (Payload.GetContentType() != ZenContentType::kCompressedBinary) { throw std::runtime_error(fmt::format("Blob {} ({} bytes) has unexpected content type '{}'", BlobHash, Payload.GetSize(), ToString(Payload.GetContentType()))); } IoHash RawHash; uint64_t RawSize; CompressedBuffer Compressed = 
CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, RawSize);
    if (!Compressed)
    {
        throw std::runtime_error(fmt::format("Blob {} ({} bytes) compressed header is invalid", BlobHash, Payload.GetSize()));
    }
    // The raw hash recorded in the compressed header must be the hash the blob is addressed by
    if (RawHash != BlobHash)
    {
        throw std::runtime_error(
            fmt::format("Blob {} ({} bytes) compressed header has a mismatching raw hash {}", BlobHash, Payload.GetSize(), RawHash));
    }

    // Decompress the full raw range and hash the output segment by segment as it is produced
    IoHashStream Hash;
    bool         CouldDecompress = Compressed.DecompressToStream(0, RawSize, [&Hash](uint64_t, const CompositeBuffer& RangeBuffer) {
        for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
        {
            Hash.Append(Segment.GetView());
        }
    });

    if (!CouldDecompress)
    {
        throw std::runtime_error(
            fmt::format("Blob {} ({} bytes) failed to decompress - header information mismatch", BlobHash, Payload.GetSize()));
    }
    // The hash of the data that was actually decompressed must match as well
    IoHash ValidateRawHash = Hash.GetHash();
    if (ValidateRawHash != BlobHash)
    {
        throw std::runtime_error(fmt::format("Blob {} ({} bytes) decompressed hash {} does not match header information",
                                             BlobHash,
                                             Payload.GetSize(),
                                             ValidateRawHash));
    }
    // Only CompressionLevel is inspected below; Compressor and BlockSize are fetched but not used
    OodleCompressor       Compressor;
    OodleCompressionLevel CompressionLevel;
    uint64_t              BlockSize;
    if (!Compressed.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize))
    {
        throw std::runtime_error(fmt::format("Blob {} ({} bytes) failed to get compression details", BlobHash, Payload.GetSize()));
    }
    OutCompressedSize   = Payload.GetSize();
    OutDecompressedSize = RawSize;
    if (CompressionLevel == OodleCompressionLevel::None)
    {
        // Only decompress to composite if we need it for block verification
        CompositeBuffer DecompressedComposite = Compressed.DecompressToComposite();
        if (!DecompressedComposite)
        {
            throw std::runtime_error(fmt::format("Blob {} ({} bytes) failed to decompress to composite", BlobHash, Payload.GetSize()));
        }
        return DecompressedComposite;
    }
    // Any other compression level: the blob is valid but no decompressed content is handed back
    return CompositeBuffer{};
}

// Fetches blob BlobHash of build BuildId from Storage and validates it with the payload overload of
// ValidateBlob. Throws std::runtime_error if the storage returns no payload for the blob.
CompositeBuffer
ValidateBlob(BuildStorage& Storage, const Oid& BuildId, const IoHash& BlobHash, uint64_t& OutCompressedSize, uint64_t& OutDecompressedSize)
{
    IoBuffer Payload = Storage.GetBuildBlob(BuildId, BlobHash);
    if (!Payload)
    {
        throw std::runtime_error(fmt::format("Blob {} could not be found", BlobHash));
    }
    return ValidateBlob(std::move(Payload), BlobHash, OutCompressedSize, OutDecompressedSize);
}

// Validates a chunk block blob and returns the description parsed from its content.
//
// ValidateBlob only returns decompressed content for blobs stored with OodleCompressionLevel::None, so an
// empty result means the block was stored with some other compression level and is rejected.
ChunkBlockDescription
ValidateChunkBlock(IoBuffer&& Payload, const IoHash& BlobHash, uint64_t& OutCompressedSize, uint64_t& OutDecompressedSize)
{
    CompositeBuffer BlockBuffer = ValidateBlob(std::move(Payload), BlobHash, OutCompressedSize, OutDecompressedSize);
    if (!BlockBuffer)
    {
        throw std::runtime_error(fmt::format("Chunk block blob {} is not compressed using 'None' compression level", BlobHash));
    }
    return GetChunkBlockDescription(BlockBuffer.Flatten(), BlobHash);
}

// Reads the raw bytes of the chunk identified by ChunkHash from disk through OpenFileCache.
//
// The chunk must be present in Lookup and have at least one location (both are asserted); the first
// location is the one that is read.
CompositeBuffer
FetchChunk(const ChunkedFolderContent& Content,
           const ChunkedContentLookup& Lookup,
           const IoHash&               ChunkHash,
           ReadFileCache&              OpenFileCache)
{
    auto It = Lookup.ChunkHashToChunkIndex.find(ChunkHash);
    ZEN_ASSERT(It != Lookup.ChunkHashToChunkIndex.end());
    uint32_t    ChunkIndex     = It->second;
    eastl::span ChunkLocations = GetChunkSequenceLocations(Lookup, ChunkIndex);
    ZEN_ASSERT(!ChunkLocations.empty());
    CompositeBuffer Chunk = OpenFileCache.GetRange(ChunkLocations[0].SequenceIndex,
                                                   ChunkLocations[0].Offset,
                                                   Content.ChunkedContent.ChunkRawSizes[ChunkIndex]);
    // Verify that the data read back still hashes to the expected chunk hash
    ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == ChunkHash);
    return Chunk;
};

// Builds a chunk block out of the chunks listed in ChunksInBlock and fills in OutBlockDescription.
//
// Each chunk is handed to GenerateChunkBlock as its hash plus a callback; the callback reads the chunk from
// disk via FetchChunk and wraps it with OodleCompressionLevel::None. A ReadFileCache limited to 4 open files
// is shared by all callbacks of the block.
CompressedBuffer
GenerateBlock(const std::filesystem::path& Path,
              const ChunkedFolderContent&  Content,
              const ChunkedContentLookup&  Lookup,
              const eastl::vector&         ChunksInBlock,
              ChunkBlockDescription&       OutBlockDescription,
              DiskStatistics&              DiskStats)
{
    ReadFileCache OpenFileCache(DiskStats, Path, Content, Lookup, 4);

    eastl::vector> BlockContent;
    BlockContent.reserve(ChunksInBlock.size());
    for (uint32_t ChunkIndex : ChunksInBlock)
    {
        BlockContent.emplace_back(std::make_pair(
            Content.ChunkedContent.ChunkHashes[ChunkIndex],
            [&Content, &Lookup, &OpenFileCache, ChunkIndex](const IoHash& ChunkHash) -> std::pair {
                CompositeBuffer Chunk =
FetchChunk(Content, Lookup, ChunkHash, OpenFileCache);
                if (!Chunk)
                {
                    ZEN_ASSERT(false);
                }
                uint64_t RawSize = Chunk.GetSize();
                // Hand back the raw size together with the chunk wrapped at compression level None
                return {RawSize, CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid, OodleCompressionLevel::None)};
            }));
    }

    return GenerateChunkBlock(std::move(BlockContent), OutBlockDescription);
};

// Downloads and validates every chunk attachment and block attachment referenced by a build part.
//
// If BuildPartName is not empty the part id is looked up by name in the "parts" object of the build,
// replacing the BuildPartId argument; std::runtime_error is thrown if the name resolves to Oid::Zero.
// std::runtime_error is also thrown if block metadata can not be fetched for all block attachments.
//
// Downloads are scheduled on NetworkPool and each completed download schedules its verification on
// VerifyPool. Chunk attachments are checked with ValidateBlob, block attachments with ValidateChunkBlock.
// A line with the total elapsed time is written when the function exits, whether it returns or throws.
void
ValidateBuildPart(BuildStorage& Storage, const Oid& BuildId, Oid BuildPartId, const std::string_view BuildPartName)
{
    Stopwatch Timer;
    // Runs when the guard goes out of scope, so the timing line is printed on every exit path
    auto _ = MakeGuard([&]() {
        ZEN_CONSOLE("Validated build part {}/{} ('{}') in {}",
                    BuildId,
                    BuildPartId,
                    BuildPartName,
                    NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
    });
    CbObject Build = Storage.GetBuild(BuildId);
    if (!BuildPartName.empty())
    {
        BuildPartId = Build["parts"sv].AsObjectView()[BuildPartName].AsObjectId();
        if (BuildPartId == Oid::Zero)
        {
            throw std::runtime_error(fmt::format("Build {} does not have a part named '{}'", BuildId, BuildPartName));
        }
    }
    CbObject BuildPart = Storage.GetBuildPart(BuildId, BuildPartId);
    ZEN_CONSOLE("Validating build part {}/{} ({})", BuildId, BuildPartId, NiceBytes(BuildPart.GetSize()));

    // Collect the hashes of all loose chunk attachments and all block attachments of the part
    eastl::vector ChunkAttachments;
    for (CbFieldView LooseFileView : BuildPart["chunkAttachments"sv].AsObjectView()["rawHashes"sv])
    {
        ChunkAttachments.push_back(LooseFileView.AsBinaryAttachment());
    }
    eastl::vector BlockAttachments;
    for (CbFieldView BlocksView : BuildPart["blockAttachments"sv].AsObjectView()["rawHashes"sv])
    {
        BlockAttachments.push_back(BlocksView.AsBinaryAttachment());
    }

    // Every block attachment is expected to have metadata stored alongside it
    eastl::vector VerifyBlockDescriptions = Storage.GetBlockMetadata(BuildId, BlockAttachments);
    if (VerifyBlockDescriptions.size() != BlockAttachments.size())
    {
        throw std::runtime_error(fmt::format("Uploaded blocks metadata could not all be found, {} blocks metadata is missing",
                                             BlockAttachments.size() - VerifyBlockDescriptions.size()));
    }

    WorkerThreadPool& NetworkPool = GetSmallWorkerPool(EWorkloadType::Burst);   // GetSyncWorkerPool(); //
    WorkerThreadPool& VerifyPool  = GetMediumWorkerPool(EWorkloadType::Burst);  // GetSyncWorkerPool(); //
    ParallellWork     Work(AbortFlag);

    ProgressBar ProgressBar(UsePlainProgress);

    // Each attachment is counted once when downloaded and once when verified
    uint64_t    AttachmentsToVerifyCount  = ChunkAttachments.size() + BlockAttachments.size();
    std::atomic DownloadedAttachmentCount = 0;
    std::atomic VerifiedAttachmentCount   = 0;
    std::atomic DownloadedByteCount       = 0;
    std::atomic VerifiedByteCount         = 0;
    FilteredRate FilteredDownloadedBytesPerSecond;
    FilteredRate FilteredVerifiedBytesPerSecond;

    for (const IoHash& ChunkAttachment : ChunkAttachments)
    {
        Work.ScheduleWork(
            NetworkPool,
            [&, ChunkAttachment](std::atomic&) {
                if (!AbortFlag)
                {
                    FilteredDownloadedBytesPerSecond.Start();

                    IoBuffer Payload = Storage.GetBuildBlob(BuildId, ChunkAttachment);
                    // The download counters are updated before the payload is checked, so a missing blob
                    // is still counted as a finished download
                    DownloadedAttachmentCount++;
                    DownloadedByteCount += Payload.GetSize();
                    if (DownloadedAttachmentCount.load() == AttachmentsToVerifyCount)
                    {
                        FilteredDownloadedBytesPerSecond.Stop();
                    }
                    if (!Payload)
                    {
                        throw std::runtime_error(fmt::format("Chunk attachment {} could not be found", ChunkAttachment));
                    }
                    if (!AbortFlag)
                    {
                        // Hand the payload over to the verification pool
                        Work.ScheduleWork(
                            VerifyPool,
                            [&, Payload = std::move(Payload), ChunkAttachment](std::atomic&) {
                                if (!AbortFlag)
                                {
                                    FilteredVerifiedBytesPerSecond.Start();

                                    uint64_t CompressedSize;
                                    uint64_t DecompressedSize;
                                    // The returned content is not needed for loose chunks, only the validation
                                    ValidateBlob(IoBuffer(Payload), ChunkAttachment, CompressedSize, DecompressedSize);
                                    ZEN_CONSOLE_VERBOSE("Chunk attachment {} ({} -> {}) is valid",
                                                        ChunkAttachment,
                                                        NiceBytes(CompressedSize),
                                                        NiceBytes(DecompressedSize));
                                    VerifiedAttachmentCount++;
                                    VerifiedByteCount += DecompressedSize;
                                    if (VerifiedAttachmentCount.load() == AttachmentsToVerifyCount)
                                    {
                                        FilteredVerifiedBytesPerSecond.Stop();
                                    }
                                }
                            },
                            Work.DefaultErrorFunction());
                    }
                }
            },
            Work.DefaultErrorFunction());
    }

    // Same download-then-verify pipeline for blocks, verified with ValidateChunkBlock instead
    for (const IoHash& BlockAttachment : BlockAttachments)
    {
        Work.ScheduleWork(
            NetworkPool,
            [&, BlockAttachment](std::atomic&) {
                if (!AbortFlag)
                {
                    FilteredDownloadedBytesPerSecond.Start();

                    IoBuffer Payload = Storage.GetBuildBlob(BuildId, BlockAttachment);
                    DownloadedAttachmentCount++;
                    DownloadedByteCount += Payload.GetSize();
                    if (DownloadedAttachmentCount.load() == AttachmentsToVerifyCount)
                    {
                        FilteredDownloadedBytesPerSecond.Stop();
                    }
                    if (!Payload)
                    {
                        throw std::runtime_error(fmt::format("Block attachment {} could not be found", BlockAttachment));
                    }
                    if (!AbortFlag)
                    {
                        Work.ScheduleWork(
                            VerifyPool,
                            [&, Payload = std::move(Payload), BlockAttachment](std::atomic&) {
                                if (!AbortFlag)
                                {
                                    FilteredVerifiedBytesPerSecond.Start();

                                    uint64_t CompressedSize;
                                    uint64_t DecompressedSize;
                                    ValidateChunkBlock(IoBuffer(Payload), BlockAttachment, CompressedSize, DecompressedSize);
                                    ZEN_CONSOLE_VERBOSE("Chunk block {} ({} -> {}) is valid",
                                                        BlockAttachment,
                                                        NiceBytes(CompressedSize),
                                                        NiceBytes(DecompressedSize));
                                    VerifiedAttachmentCount++;
                                    VerifiedByteCount += DecompressedSize;
                                    if (VerifiedAttachmentCount.load() == AttachmentsToVerifyCount)
                                    {
                                        FilteredVerifiedBytesPerSecond.Stop();
                                    }
                                }
                            },
                            Work.DefaultErrorFunction());
                    }
                }
            },
            Work.DefaultErrorFunction());
    }

    // Wait for all scheduled work; the callback refreshes the progress display
    Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
        ZEN_UNUSED(IsAborted, PendingWork);

        FilteredDownloadedBytesPerSecond.Update(DownloadedByteCount);
        FilteredVerifiedBytesPerSecond.Update(VerifiedByteCount);

        // The download rate is multiplied by 8 to match its bits/s label; the verify rate is shown as B/s
        std::string Details = fmt::format("Downloaded {}/{} ({}, {}bits/s). Verified {}/{} ({}, {}B/s)",
                                          DownloadedAttachmentCount.load(),
                                          AttachmentsToVerifyCount,
                                          NiceBytes(DownloadedByteCount.load()),
                                          NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8),
                                          VerifiedAttachmentCount.load(),
                                          AttachmentsToVerifyCount,
                                          NiceBytes(VerifiedByteCount.load()),
                                          NiceNum(FilteredVerifiedBytesPerSecond.GetCurrent()));

        // Two progress steps per attachment: one for the download and one for the verification
        ProgressBar.UpdateState(
            {.Task           = "Validating blobs ",
             .Details        = Details,
             .TotalCount     = gsl::narrow(AttachmentsToVerifyCount * 2),
             .RemainingCount = gsl::narrow(AttachmentsToVerifyCount * 2 -
                                           (DownloadedAttachmentCount.load() + VerifiedAttachmentCount.load()))},
            false);
    });

    ProgressBar.Finish();
}

// Groups the chunks in ChunkIndexes into blocks of at most MaxBlockSize raw bytes, appended to OutBlocks.
//
// ChunkIndexes is sorted in place by the first location of each chunk (sequence index, then offset) so
// that chunks from the same source sequence end up next to each other. Chunks are then accumulated in that
// order and a block is emitted whenever adding the next chunk would exceed MaxBlockSize. Whatever remains
// after the loop is emitted as a final block.
void
ArrangeChunksIntoBlocks(const ChunkedFolderContent& Content,
                        const ChunkedContentLookup& Lookup,
                        uint64_t                    MaxBlockSize,
                        eastl::vector&              ChunkIndexes,
                        eastl::vector>&             OutBlocks)
{
    std::sort(ChunkIndexes.begin(), ChunkIndexes.end(), [&Content, &Lookup](uint32_t Lhs, uint32_t Rhs) {
        const ChunkedContentLookup::ChunkSequenceLocation& LhsLocation = GetChunkSequenceLocations(Lookup, Lhs)[0];
        const ChunkedContentLookup::ChunkSequenceLocation& RhsLocation = GetChunkSequenceLocations(Lookup, Rhs)[0];
        if (LhsLocation.SequenceIndex < RhsLocation.SequenceIndex)
        {
            return true;
        }
        else if (LhsLocation.SequenceIndex > RhsLocation.SequenceIndex)
        {
            return false;
        }
        return LhsLocation.Offset < RhsLocation.Offset;
    });

    // Lower bound for a block that is cut short at a sequence boundary: 15/16 of MaxBlockSize
    uint64_t MaxBlockSizeLowThreshold = MaxBlockSize - (MaxBlockSize / 16);

    uint64_t BlockSize = 0;

    // ChunkIndexStart is the first chunk of the block being built, ChunkIndexOffset the next candidate
    uint32_t ChunkIndexStart = 0;
    for (uint32_t ChunkIndexOffset = 0; ChunkIndexOffset < ChunkIndexes.size();)
    {
        const uint32_t ChunkIndex = ChunkIndexes[ChunkIndexOffset];
        const uint64_t ChunkSize  = Content.ChunkedContent.ChunkRawSizes[ChunkIndex];

        if ((BlockSize + ChunkSize) > MaxBlockSize)
        {
            // Within the span of MaxBlockSizeLowThreshold and MaxBlockSize, see if there is a break
            // between source paths for chunks. Break the block at the last such break if any.
ZEN_ASSERT(ChunkIndexOffset > ChunkIndexStart);

            const uint32_t ChunkSequenceIndex = Lookup.ChunkSequenceLocations[Lookup.ChunkSequenceLocationOffset[ChunkIndex]].SequenceIndex;

            // Walk backwards from the last chunk that fit, giving up as soon as removing another chunk
            // would take the block below the low threshold
            uint64_t ScanBlockSize        = BlockSize;
            uint32_t ScanChunkIndexOffset = ChunkIndexOffset - 1;
            while (ScanChunkIndexOffset > (ChunkIndexStart + 2))
            {
                const uint32_t TestChunkIndex = ChunkIndexes[ScanChunkIndexOffset];
                const uint64_t TestChunkSize  = Content.ChunkedContent.ChunkRawSizes[TestChunkIndex];
                if ((ScanBlockSize - TestChunkSize) < MaxBlockSizeLowThreshold)
                {
                    break;
                }

                const uint32_t TestSequenceIndex =
                    Lookup.ChunkSequenceLocations[Lookup.ChunkSequenceLocationOffset[TestChunkIndex]].SequenceIndex;
                if (ChunkSequenceIndex != TestSequenceIndex)
                {
                    // Found a chunk from a different sequence than the one that overflowed the block:
                    // end the block right after it
                    ChunkIndexOffset = ScanChunkIndexOffset + 1;
                    break;
                }

                ScanBlockSize -= TestChunkSize;
                ScanChunkIndexOffset--;
            }

            // Emit the chunks [ChunkIndexStart, ChunkIndexOffset) as one block and start a new one
            eastl::vector ChunksInBlock;
            ChunksInBlock.reserve(ChunkIndexOffset - ChunkIndexStart);
            for (uint32_t AddIndexOffset = ChunkIndexStart; AddIndexOffset < ChunkIndexOffset; AddIndexOffset++)
            {
                const uint32_t AddChunkIndex = ChunkIndexes[AddIndexOffset];
                ChunksInBlock.push_back(AddChunkIndex);
            }
            OutBlocks.emplace_back(std::move(ChunksInBlock));
            BlockSize       = 0;
            ChunkIndexStart = ChunkIndexOffset;
        }
        else
        {
            // The chunk fits - add it to the block being built
            ChunkIndexOffset++;
            BlockSize += ChunkSize;
        }
    }
    // Emit whatever is left as the final block
    if (ChunkIndexStart < ChunkIndexes.size())
    {
        eastl::vector ChunksInBlock;
        ChunksInBlock.reserve(ChunkIndexes.size() - ChunkIndexStart);
        for (uint32_t AddIndexOffset = ChunkIndexStart; AddIndexOffset < ChunkIndexes.size(); AddIndexOffset++)
        {
            const uint32_t AddChunkIndex = ChunkIndexes[AddIndexOffset];
            ChunksInBlock.push_back(AddChunkIndex);
        }
        OutBlocks.emplace_back(std::move(ChunksInBlock));
    }
}

// Reads chunk ChunkIndex from its first source location under Path and returns it in compressed form.
//
// The compressed data is first streamed into a temporary file in TempFolderPath named after the hex string
// of the chunk hash; on success the returned buffer is backed by that file, which is flagged for deletion
// on close. If streaming compression reports failure the temporary file is removed and the chunk is
// compressed through CompressedBuffer::Compress and WriteToTempFileIfNeeded instead.
//
// Throws std::runtime_error if the chunk can not be read, has an unexpected size, the temporary file can
// not be created, or the fallback compression fails. TempFolderPath must not be empty (asserted).
CompositeBuffer
CompressChunk(const std::filesystem::path& Path,
              const ChunkedFolderContent&  Content,
              const ChunkedContentLookup&  Lookup,
              uint32_t                     ChunkIndex,
              const std::filesystem::path& TempFolderPath)
{
    ZEN_ASSERT(!TempFolderPath.empty());
    const IoHash&  ChunkHash = Content.ChunkedContent.ChunkHashes[ChunkIndex];
    const uint64_t ChunkSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex];

    // Read the chunk from the first place it occurs in the source content
    const ChunkedContentLookup::ChunkSequenceLocation& Source    = GetChunkSequenceLocations(Lookup, ChunkIndex)[0];
    const std::uint32_t                                PathIndex = Lookup.SequenceIndexFirstPathIndex[Source.SequenceIndex];
    IoBuffer RawSource = IoBufferBuilder::MakeFromFile((Path / Content.Paths[PathIndex]).make_preferred(), Source.Offset, ChunkSize);
    if (!RawSource)
    {
        throw std::runtime_error(fmt::format("Failed fetching chunk {}", ChunkHash));
    }
    if (RawSource.GetSize() != ChunkSize)
    {
        throw std::runtime_error(fmt::format("Fetched chunk {} has invalid size", ChunkHash));
    }
    ZEN_ASSERT_SLOW(IoHash::HashBuffer(RawSource) == ChunkHash);
    {
        std::filesystem::path TempFilePath = (TempFolderPath / ChunkHash.ToHexString()).make_preferred();

        BasicFile       CompressedFile;
        std::error_code Ec;
        CompressedFile.Open(TempFilePath, BasicFile::Mode::kTruncate, Ec);
        if (Ec)
        {
            throw std::runtime_error(
                fmt::format("Failed creating temporary file for compressing blob {}. Reason: {}", ChunkHash, Ec.message()));
        }

        // Stream the compressed output straight into the temporary file at the offsets reported
        bool CouldCompress = CompressedBuffer::CompressToStream(
            CompositeBuffer(SharedBuffer(RawSource)),
            [&](uint64_t Offset, const CompositeBuffer& RangeBuffer) { CompressedFile.Write(RangeBuffer, Offset); });
        if (CouldCompress)
        {
            // Hand the file handle over to an IoBuffer that deletes the file when it is closed
            uint64_t CompressedSize = CompressedFile.FileSize();
            void*    FileHandle     = CompressedFile.Detach();
            IoBuffer TempPayload    = IoBuffer(IoBuffer::File,
                                            FileHandle,
                                            0,
                                            CompressedSize,
                                            /*IsWholeFile*/ true);
            ZEN_ASSERT(TempPayload);
            TempPayload.SetDeleteOnClose(true);

            // Sanity check that the header of what was written describes this chunk
            IoHash           RawHash;
            uint64_t         RawSize;
            CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(TempPayload), RawHash, RawSize);
            ZEN_ASSERT(Compressed);
            ZEN_ASSERT(RawHash == ChunkHash);
            ZEN_ASSERT(RawSize == ChunkSize);
            return Compressed.GetCompressed();
        }
        // Streaming compression did not succeed - discard the temporary file (best effort, error ignored)
        CompressedFile.Close();
        std::filesystem::remove(TempFilePath, Ec);
        ZEN_UNUSED(Ec);
    }

    // Try regular compress - decompress may fail if compressed data is larger than non-compressed
    CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(RawSource)));
    if (!CompressedBlob)
    {
        throw std::runtime_error(fmt::format("Failed to compress large blob {}", ChunkHash));
    }
    CompositeBuffer TempPayload = WriteToTempFileIfNeeded(CompressedBlob.GetCompressed(), TempFolderPath, ChunkHash);
    return CompressedBuffer::FromCompressedNoValidate(std::move(TempPayload)).GetCompressed();
}

// Result of block generation. GenerateBuildBlocks sizes every vector to the number of blocks and uses the
// block index as the index into each of them.
struct GeneratedBlocks
{
    eastl::vector BlockDescriptions;        // description of each generated block, including its hash
    eastl::vector BlockSizes;               // compressed size of each block
    eastl::vector BlockBuffers;             // payload of blocks that were generated but not uploaded yet
    eastl::vector BlockMetaDatas;           // metadata object stored alongside each block
    eastl::vector MetaDataHasBeenUploaded;  // set once the metadata of a block has been uploaded
    tsl::robin_map BlockHashToBlockIndex;   // block hash -> index into the vectors above
};

// Generates one block per entry in NewBlockChunks and records the results in OutBlocks.
//
// Generation runs on a medium worker pool. Every generated block schedules a follow-up task on a small
// worker pool which either uploads the block and its metadata to Storage or, if all blocks have been
// generated by the time the task runs, keeps the payload in OutBlocks.BlockBuffers without uploading it.
// Does nothing when NewBlockChunks is empty.
void
GenerateBuildBlocks(const std::filesystem::path& Path,
                    const ChunkedFolderContent&  Content,
                    const ChunkedContentLookup&  Lookup,
                    BuildStorage&                Storage,
                    const Oid&                   BuildId,
                    const eastl::vector>&        NewBlockChunks,
                    GeneratedBlocks&             OutBlocks,
                    DiskStatistics&              DiskStats,
                    UploadStatistics&            UploadStats,
                    GenerateBlocksStatistics&    GenerateBlocksStats)
{
    const std::size_t NewBlockCount
= NewBlockChunks.size();
    if (NewBlockCount > 0)
    {
        ProgressBar ProgressBar(UsePlainProgress);

        // Size all per-block outputs up front so that each task only writes to its own BlockIndex slot
        OutBlocks.BlockDescriptions.resize(NewBlockCount);
        OutBlocks.BlockSizes.resize(NewBlockCount);
        OutBlocks.BlockBuffers.resize(NewBlockCount);
        OutBlocks.BlockMetaDatas.resize(NewBlockCount);
        OutBlocks.MetaDataHasBeenUploaded.resize(NewBlockCount, false);
        OutBlocks.BlockHashToBlockIndex.reserve(NewBlockCount);

        // Guards OutBlocks.BlockHashToBlockIndex, the one output shared between the generation tasks
        RwLock Lock;

        WorkerThreadPool& GenerateBlobsPool = GetMediumWorkerPool(EWorkloadType::Burst);  // GetSyncWorkerPool();//
        WorkerThreadPool& UploadBlocksPool  = GetSmallWorkerPool(EWorkloadType::Burst);   // GetSyncWorkerPool();//

        FilteredRate FilteredGeneratedBytesPerSecond;
        FilteredRate FilteredUploadedBytesPerSecond;

        ParallellWork Work(AbortFlag);

        std::atomic PendingUploadCount(0);

        for (size_t BlockIndex = 0; BlockIndex < NewBlockCount; BlockIndex++)
        {
            if (Work.IsAborted())
            {
                break;
            }
            const eastl::vector& ChunksInBlock = NewBlockChunks[BlockIndex];
            Work.ScheduleWork(
                GenerateBlobsPool,
                [&, BlockIndex](std::atomic&) {
                    if (!AbortFlag)
                    {
                        FilteredGeneratedBytesPerSecond.Start();
                        // TODO: Convert ScheduleWork body to function

                        CompressedBuffer CompressedBlock =
                            GenerateBlock(Path, Content, Lookup, ChunksInBlock, OutBlocks.BlockDescriptions[BlockIndex], DiskStats);
                        ZEN_CONSOLE_VERBOSE("Generated block {} ({}) containing {} chunks",
                                            OutBlocks.BlockDescriptions[BlockIndex].BlockHash,
                                            NiceBytes(CompressedBlock.GetCompressedSize()),
                                            OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size());

                        OutBlocks.BlockSizes[BlockIndex] = CompressedBlock.GetCompressedSize();

                        CompositeBuffer Payload = WriteToTempFileIfNeeded(CompressedBlock.GetCompressed(),
                                                                          Path / ZenTempBlockFolderName,
                                                                          OutBlocks.BlockDescriptions[BlockIndex].BlockHash);
                        {
                            // Metadata object for the block, holding a single "createdBy" field
                            CbObjectWriter Writer;
                            Writer.AddString("createdBy", "zen");
                            OutBlocks.BlockMetaDatas[BlockIndex] = Writer.Save();
                        }
                        GenerateBlocksStats.GeneratedBlockByteCount += OutBlocks.BlockSizes[BlockIndex];
                        GenerateBlocksStats.GeneratedBlockCount++;

                        Lock.WithExclusiveLock([&]() {
                            OutBlocks.BlockHashToBlockIndex.insert_or_assign(OutBlocks.BlockDescriptions[BlockIndex].BlockHash, BlockIndex);
                        });

                        if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount)
                        {
                            FilteredGeneratedBytesPerSecond.Stop();
                        }

                        if (!AbortFlag)
                        {
                            PendingUploadCount++;
                            Work.ScheduleWork(
                                UploadBlocksPool,
                                [&, BlockIndex, Payload = std::move(Payload)](std::atomic&) {
                                    if (!AbortFlag)
                                    {
                                        if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount)
                                        {
                                            // All blocks have been generated - keep the payload in
                                            // OutBlocks instead of uploading it from here
                                            FilteredUploadedBytesPerSecond.Stop();
                                            OutBlocks.BlockBuffers[BlockIndex] = std::move(Payload);
                                        }
                                        else
                                        {
                                            FilteredUploadedBytesPerSecond.Start();
                                            // TODO: Convert ScheduleWork body to function

                                            // NOTE(review): the counter is only decremented on this branch
                                            // and is not read anywhere in this function
                                            PendingUploadCount--;

                                            const CbObject BlockMetaData =
                                                BuildChunkBlockDescription(OutBlocks.BlockDescriptions[BlockIndex],
                                                                           OutBlocks.BlockMetaDatas[BlockIndex]);

                                            // Upload the block payload first, then its metadata
                                            const IoHash& BlockHash = OutBlocks.BlockDescriptions[BlockIndex].BlockHash;
                                            Storage.PutBuildBlob(BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload);
                                            UploadStats.BlocksBytes += Payload.GetSize();
                                            ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks",
                                                                OutBlocks.BlockDescriptions[BlockIndex].BlockHash,
                                                                NiceBytes(Payload.GetSize()),
                                                                OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size());

                                            Storage.PutBlockMetadata(BuildId,
                                                                     OutBlocks.BlockDescriptions[BlockIndex].BlockHash,
                                                                     BlockMetaData);
                                            ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})",
                                                                OutBlocks.BlockDescriptions[BlockIndex].BlockHash,
                                                                NiceBytes(BlockMetaData.GetSize()));

                                            OutBlocks.MetaDataHasBeenUploaded[BlockIndex] = true;

                                            UploadStats.BlocksBytes += BlockMetaData.GetSize();
                                            UploadStats.BlockCount++;
                                            if (UploadStats.BlockCount == NewBlockCount)
                                            {
                                                FilteredUploadedBytesPerSecond.Stop();
                                            }
                                        }
                                    }
                                },
                                Work.DefaultErrorFunction());
                        }
                    }
                },
                Work.DefaultErrorFunction());
        }

        // Wait for all scheduled work; the callback refreshes the progress display
        Work.Wait(UsePlainProgress ?
5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) { ZEN_UNUSED(IsAborted, PendingWork); FilteredGeneratedBytesPerSecond.Update(GenerateBlocksStats.GeneratedBlockByteCount.load()); FilteredUploadedBytesPerSecond.Update(UploadStats.BlocksBytes.load()); std::string Details = fmt::format("Generated {}/{} ({}, {}B/s). Uploaded {}/{} ({}, {}bits/s)", GenerateBlocksStats.GeneratedBlockCount.load(), NewBlockCount, NiceBytes(GenerateBlocksStats.GeneratedBlockByteCount.load()), NiceNum(FilteredGeneratedBytesPerSecond.GetCurrent()), UploadStats.BlockCount.load(), NewBlockCount, NiceBytes(UploadStats.BlocksBytes.load()), NiceNum(FilteredUploadedBytesPerSecond.GetCurrent() * 8)); ProgressBar.UpdateState( {.Task = "Generating blocks", .Details = Details, .TotalCount = gsl::narrow(NewBlockCount), .RemainingCount = gsl::narrow(NewBlockCount - GenerateBlocksStats.GeneratedBlockCount.load())}, false); }); ProgressBar.Finish(); GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGeneratedBytesPerSecond.GetElapsedTime(); UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTime(); } } void UploadPartBlobs(BuildStorage& Storage, const Oid& BuildId, const std::filesystem::path& Path, const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, eastl::span RawHashes, const eastl::vector>& NewBlockChunks, GeneratedBlocks& NewBlocks, eastl::span LooseChunkIndexes, const std::uint64_t LargeAttachmentSize, DiskStatistics& DiskStats, UploadStatistics& UploadStats, GenerateBlocksStatistics& GenerateBlocksStats, LooseChunksStatistics& LooseChunksStats) { { ProgressBar ProgressBar(UsePlainProgress); WorkerThreadPool& ReadChunkPool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // WorkerThreadPool& UploadChunkPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // FilteredRate FilteredGenerateBlockBytesPerSecond; FilteredRate FilteredCompressedBytesPerSecond; FilteredRate FilteredUploadedBytesPerSecond; 
ParallellWork Work(AbortFlag); std::atomic UploadedBlockSize = 0; std::atomic UploadedBlockCount = 0; std::atomic UploadedRawChunkSize = 0; std::atomic UploadedCompressedChunkSize = 0; std::atomic UploadedChunkCount = 0; tsl::robin_map ChunkIndexToLooseChunkOrderIndex; ChunkIndexToLooseChunkOrderIndex.reserve(LooseChunkIndexes.size()); for (uint32_t OrderIndex = 0; OrderIndex < LooseChunkIndexes.size(); OrderIndex++) { ChunkIndexToLooseChunkOrderIndex.insert_or_assign(LooseChunkIndexes[OrderIndex], OrderIndex); } eastl::vector BlockIndexes; eastl::vector LooseChunkOrderIndexes; uint64_t TotalLooseChunksSize = 0; uint64_t TotalBlocksSize = 0; for (const IoHash& RawHash : RawHashes) { if (auto It = NewBlocks.BlockHashToBlockIndex.find(RawHash); It != NewBlocks.BlockHashToBlockIndex.end()) { BlockIndexes.push_back(It->second); TotalBlocksSize += NewBlocks.BlockSizes[It->second]; } if (auto ChunkIndexIt = Lookup.ChunkHashToChunkIndex.find(RawHash); ChunkIndexIt != Lookup.ChunkHashToChunkIndex.end()) { const uint32_t ChunkIndex = ChunkIndexIt->second; if (auto LooseOrderIndexIt = ChunkIndexToLooseChunkOrderIndex.find(ChunkIndex); LooseOrderIndexIt != ChunkIndexToLooseChunkOrderIndex.end()) { LooseChunkOrderIndexes.push_back(LooseOrderIndexIt->second); TotalLooseChunksSize += Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; } } } uint64_t TotalRawSize = TotalLooseChunksSize + TotalBlocksSize; const size_t UploadBlockCount = BlockIndexes.size(); const uint32_t UploadChunkCount = gsl::narrow(LooseChunkOrderIndexes.size()); auto AsyncUploadBlock = [&](const size_t BlockIndex, const IoHash BlockHash, CompositeBuffer&& Payload) { Work.ScheduleWork( UploadChunkPool, [&, BlockIndex, BlockHash, Payload = std::move(Payload)](std::atomic&) { if (!AbortFlag) { FilteredUploadedBytesPerSecond.Start(); const CbObject BlockMetaData = BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]); Storage.PutBuildBlob(BuildId, BlockHash, 
ZenContentType::kCompressedBinary, Payload); ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks", NewBlocks.BlockDescriptions[BlockIndex].BlockHash, NiceBytes(Payload.GetSize()), NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); UploadedBlockSize += Payload.GetSize(); UploadStats.BlocksBytes += Payload.GetSize(); Storage.PutBlockMetadata(BuildId, BlockHash, BlockMetaData); ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})", NewBlocks.BlockDescriptions[BlockIndex].BlockHash, NiceBytes(BlockMetaData.GetSize())); NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; UploadStats.BlockCount++; UploadStats.BlocksBytes += BlockMetaData.GetSize(); UploadedBlockCount++; if (UploadedBlockCount == UploadBlockCount && UploadedChunkCount == UploadChunkCount) { FilteredUploadedBytesPerSecond.Stop(); } } }, Work.DefaultErrorFunction()); }; auto AsyncUploadLooseChunk = [&](const IoHash& RawHash, uint64_t RawSize, CompositeBuffer&& Payload) { Work.ScheduleWork( UploadChunkPool, [&, RawHash, RawSize, Payload = CompositeBuffer(std::move(Payload))](std::atomic&) { if (!AbortFlag) { const uint64_t PayloadSize = Payload.GetSize(); if (PayloadSize >= LargeAttachmentSize) { UploadStats.MultipartAttachmentCount++; eastl::vector> MultipartWork = Storage.PutLargeBuildBlob( BuildId, RawHash, ZenContentType::kCompressedBinary, PayloadSize, [Payload = std::move(Payload), &FilteredUploadedBytesPerSecond](uint64_t Offset, uint64_t Size) -> IoBuffer { FilteredUploadedBytesPerSecond.Start(); IoBuffer PartPayload = Payload.Mid(Offset, Size).Flatten().AsIoBuffer(); PartPayload.SetContentType(ZenContentType::kBinary); return PartPayload; }, [&, RawSize](uint64_t SentBytes, bool IsComplete) { UploadStats.ChunksBytes += SentBytes; UploadedCompressedChunkSize += SentBytes; if (IsComplete) { UploadStats.ChunkCount++; UploadedChunkCount++; if (UploadedBlockCount == UploadBlockCount && UploadedChunkCount == UploadChunkCount) { FilteredUploadedBytesPerSecond.Stop(); } 
UploadedRawChunkSize += RawSize; } }); for (auto& WorkPart : MultipartWork) { Work.ScheduleWork( UploadChunkPool, [Work = std::move(WorkPart)](std::atomic&) { if (!AbortFlag) { Work(); } }, Work.DefaultErrorFunction()); } ZEN_CONSOLE_VERBOSE("Uploaded multipart chunk {} ({})", RawHash, NiceBytes(PayloadSize)); } else { Storage.PutBuildBlob(BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); ZEN_CONSOLE_VERBOSE("Uploaded chunk {} ({})", RawHash, NiceBytes(PayloadSize)); UploadStats.ChunksBytes += Payload.GetSize(); UploadStats.ChunkCount++; UploadedCompressedChunkSize += Payload.GetSize(); UploadedRawChunkSize += RawSize; UploadedChunkCount++; if (UploadedChunkCount == UploadChunkCount) { FilteredUploadedBytesPerSecond.Stop(); } } } }, Work.DefaultErrorFunction()); }; eastl::vector GenerateBlockIndexes; std::atomic GeneratedBlockCount = 0; std::atomic GeneratedBlockByteCount = 0; // Start upload of any pre-built blocks for (const size_t BlockIndex : BlockIndexes) { if (CompositeBuffer BlockPayload = std::move(NewBlocks.BlockBuffers[BlockIndex]); BlockPayload) { const IoHash& BlockHash = NewBlocks.BlockDescriptions[BlockIndex].BlockHash; if (!AbortFlag) { AsyncUploadBlock(BlockIndex, BlockHash, std::move(BlockPayload)); } // GeneratedBlockCount++; } else { GenerateBlockIndexes.push_back(BlockIndex); } } eastl::vector CompressLooseChunkOrderIndexes; // Start upload of any pre-compressed loose chunks for (const uint32_t LooseChunkOrderIndex : LooseChunkOrderIndexes) { CompressLooseChunkOrderIndexes.push_back(LooseChunkOrderIndex); } // Start generation of any non-prebuilt blocks and schedule upload for (const size_t BlockIndex : GenerateBlockIndexes) { const IoHash& BlockHash = NewBlocks.BlockDescriptions[BlockIndex].BlockHash; if (!AbortFlag) { Work.ScheduleWork( ReadChunkPool, [&, BlockIndex](std::atomic&) { if (!AbortFlag) { FilteredGenerateBlockBytesPerSecond.Start(); ChunkBlockDescription BlockDescription; CompressedBuffer CompressedBlock = 
GenerateBlock(Path, Content, Lookup, NewBlockChunks[BlockIndex], BlockDescription, DiskStats); if (!CompressedBlock) { throw std::runtime_error(fmt::format("Failed generating block {}", BlockHash)); } ZEN_ASSERT(BlockDescription.BlockHash == BlockHash); CompositeBuffer Payload = WriteToTempFileIfNeeded(CompressedBlock.GetCompressed(), Path / ZenTempBlockFolderName, BlockDescription.BlockHash); GenerateBlocksStats.GeneratedBlockByteCount += NewBlocks.BlockSizes[BlockIndex]; GenerateBlocksStats.GeneratedBlockCount++; GeneratedBlockByteCount += NewBlocks.BlockSizes[BlockIndex]; GeneratedBlockCount++; if (GeneratedBlockCount == GenerateBlockIndexes.size()) { FilteredGenerateBlockBytesPerSecond.Stop(); } if (!AbortFlag) { AsyncUploadBlock(BlockIndex, BlockHash, std::move(Payload)); } ZEN_CONSOLE_VERBOSE("Regenerated block {} ({}) containing {} chunks", NewBlocks.BlockDescriptions[BlockIndex].BlockHash, NiceBytes(CompressedBlock.GetCompressedSize()), NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); } }, Work.DefaultErrorFunction()); } } std::atomic CompressedLooseChunkCount = 0; std::atomic CompressedLooseChunkByteCount = 0; std::atomic RawLooseChunkByteCount = 0; // Start compression of any non-precompressed loose chunks and schedule upload for (const uint32_t CompressLooseChunkOrderIndex : CompressLooseChunkOrderIndexes) { const uint32_t ChunkIndex = LooseChunkIndexes[CompressLooseChunkOrderIndex]; Work.ScheduleWork( ReadChunkPool, // GetSyncWorkerPool(),// ReadChunkPool, [&, ChunkIndex](std::atomic&) { if (!AbortFlag) { FilteredCompressedBytesPerSecond.Start(); CompositeBuffer Payload = CompressChunk(Path, Content, Lookup, ChunkIndex, Path / ZenTempChunkFolderName); ZEN_CONSOLE_VERBOSE("Compressed chunk {} ({} -> {})", Content.ChunkedContent.ChunkHashes[ChunkIndex], NiceBytes(Content.ChunkedContent.ChunkRawSizes[ChunkIndex]), NiceBytes(Payload.GetSize())); const uint64_t ChunkRawSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; 
UploadStats.ReadFromDiskBytes += ChunkRawSize; LooseChunksStats.CompressedChunkBytes += Payload.GetSize(); LooseChunksStats.CompressedChunkCount++; CompressedLooseChunkByteCount += Payload.GetSize(); CompressedLooseChunkCount++; RawLooseChunkByteCount += ChunkRawSize; if (CompressedLooseChunkCount == CompressLooseChunkOrderIndexes.size()) { FilteredCompressedBytesPerSecond.Stop(); } if (!AbortFlag) { AsyncUploadLooseChunk(Content.ChunkedContent.ChunkHashes[ChunkIndex], ChunkRawSize, std::move(Payload)); } } }, Work.DefaultErrorFunction()); } Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) { ZEN_UNUSED(IsAborted, PendingWork); FilteredCompressedBytesPerSecond.Update(CompressedLooseChunkByteCount.load()); FilteredGenerateBlockBytesPerSecond.Update(GeneratedBlockByteCount.load()); FilteredUploadedBytesPerSecond.Update(UploadedCompressedChunkSize.load() + UploadedBlockSize.load()); uint64_t UploadedRawSize = UploadedRawChunkSize.load() + UploadedBlockSize.load(); uint64_t UploadedCompressedSize = UploadedCompressedChunkSize.load() + UploadedBlockSize.load(); std::string Details = fmt::format( "Compressed {}/{} ({}/{}) chunks. 
" "Uploaded {}/{} ({}/{}) blobs " "({} {}bits/s)", CompressedLooseChunkCount.load(), CompressLooseChunkOrderIndexes.size(), NiceBytes(RawLooseChunkByteCount), NiceBytes(TotalLooseChunksSize), UploadedBlockCount.load() + UploadedChunkCount.load(), UploadBlockCount + UploadChunkCount, NiceBytes(UploadedRawSize), NiceBytes(TotalRawSize), NiceBytes(UploadedCompressedSize), NiceNum(FilteredUploadedBytesPerSecond.GetCurrent())); ProgressBar.UpdateState({.Task = "Uploading blobs ", .Details = Details, .TotalCount = gsl::narrow(TotalRawSize), .RemainingCount = gsl::narrow(TotalRawSize - UploadedRawSize)}, false); }); ProgressBar.Finish(); UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTime(); GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGenerateBlockBytesPerSecond.GetElapsedTime(); LooseChunksStats.CompressChunksElapsedWallTimeUS = FilteredCompressedBytesPerSecond.GetElapsedTime(); } } eastl::vector FindReuseBlocks(const eastl::vector& KnownBlocks, eastl::span ChunkHashes, eastl::span ChunkIndexes, uint8_t MinPercentLimit, eastl::vector& OutUnusedChunkIndexes, FindBlocksStatistics& FindBlocksStats) { // Find all blocks with a usage level higher than MinPercentLimit // Pick out the blocks with usage higher or equal to MinPercentLimit // Sort them with highest size usage - most usage first // Make a list of all chunks and mark them as not found // For each block, recalculate the block has usage percent based on the chunks marked as not found // If the block still reaches MinPercentLimit, keep it and remove the matching chunks from the not found list // Repeat for following all remaining block that initially matched MinPercentLimit eastl::vector FilteredReuseBlockIndexes; uint32_t ChunkCount = gsl::narrow(ChunkHashes.size()); eastl::vector ChunkFound(ChunkCount, false); if (ChunkCount > 0) { if (!KnownBlocks.empty()) { Stopwatch ReuseTimer; tsl::robin_map ChunkHashToChunkIndex; ChunkHashToChunkIndex.reserve(ChunkIndexes.size()); for 
(uint32_t ChunkIndex : ChunkIndexes) { ChunkHashToChunkIndex.insert_or_assign(ChunkHashes[ChunkIndex], ChunkIndex); } eastl::vector BlockSizes(KnownBlocks.size(), 0); eastl::vector BlockUseSize(KnownBlocks.size(), 0); eastl::vector ReuseBlockIndexes; for (size_t KnownBlockIndex = 0; KnownBlockIndex < KnownBlocks.size(); KnownBlockIndex++) { const ChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex]; if (KnownBlock.BlockHash != IoHash::Zero && KnownBlock.ChunkRawHashes.size() == KnownBlock.ChunkCompressedLengths.size()) { size_t BlockAttachmentCount = KnownBlock.ChunkRawHashes.size(); if (BlockAttachmentCount == 0) { continue; } size_t ReuseSize = 0; size_t BlockSize = 0; size_t FoundAttachmentCount = 0; size_t BlockChunkCount = KnownBlock.ChunkRawHashes.size(); for (size_t BlockChunkIndex = 0; BlockChunkIndex < BlockChunkCount; BlockChunkIndex++) { const IoHash& BlockChunkHash = KnownBlock.ChunkRawHashes[BlockChunkIndex]; const uint32_t BlockChunkSize = KnownBlock.ChunkCompressedLengths[BlockChunkIndex]; BlockSize += BlockChunkSize; if (ChunkHashToChunkIndex.contains(BlockChunkHash)) { ReuseSize += BlockChunkSize; FoundAttachmentCount++; } } size_t ReusePercent = (ReuseSize * 100) / BlockSize; if (ReusePercent >= MinPercentLimit) { ZEN_CONSOLE_VERBOSE("Reusing block {}. {} attachments found, usage level: {}%", KnownBlock.BlockHash, FoundAttachmentCount, ReusePercent); ReuseBlockIndexes.push_back(KnownBlockIndex); BlockSizes[KnownBlockIndex] = BlockSize; BlockUseSize[KnownBlockIndex] = ReuseSize; } else if (FoundAttachmentCount > 0) { ZEN_CONSOLE_VERBOSE("Skipping block {}. 
{} attachments found, usage level: {}%", KnownBlock.BlockHash, FoundAttachmentCount, ReusePercent); FindBlocksStats.RejectedBlockCount++; FindBlocksStats.RejectedChunkCount += FoundAttachmentCount; FindBlocksStats.RejectedByteCount += ReuseSize; } } } if (!ReuseBlockIndexes.empty()) { std::sort(ReuseBlockIndexes.begin(), ReuseBlockIndexes.end(), [&](size_t Lhs, size_t Rhs) { return BlockUseSize[Lhs] > BlockUseSize[Rhs]; }); for (size_t KnownBlockIndex : ReuseBlockIndexes) { eastl::vector FoundChunkIndexes; size_t BlockSize = 0; size_t AdjustedReuseSize = 0; const ChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex]; for (size_t BlockChunkIndex = 0; BlockChunkIndex < KnownBlock.ChunkRawHashes.size(); BlockChunkIndex++) { const IoHash& BlockChunkHash = KnownBlock.ChunkRawHashes[BlockChunkIndex]; const uint32_t BlockChunkSize = KnownBlock.ChunkCompressedLengths[BlockChunkIndex]; BlockSize += BlockChunkSize; if (auto It = ChunkHashToChunkIndex.find(BlockChunkHash); It != ChunkHashToChunkIndex.end()) { const uint32_t ChunkIndex = It->second; if (!ChunkFound[ChunkIndex]) { FoundChunkIndexes.push_back(ChunkIndex); AdjustedReuseSize += KnownBlock.ChunkCompressedLengths[BlockChunkIndex]; } } } size_t ReusePercent = (AdjustedReuseSize * 100) / BlockSize; if (ReusePercent >= MinPercentLimit) { ZEN_CONSOLE_VERBOSE("Reusing block {}. {} attachments found, usage level: {}%", KnownBlock.BlockHash, FoundChunkIndexes.size(), ReusePercent); FilteredReuseBlockIndexes.push_back(KnownBlockIndex); for (uint32_t ChunkIndex : FoundChunkIndexes) { ChunkFound[ChunkIndex] = true; } FindBlocksStats.AcceptedChunkCount += FoundChunkIndexes.size(); FindBlocksStats.AcceptedByteCount += AdjustedReuseSize; FindBlocksStats.AcceptedReduntantChunkCount += KnownBlock.ChunkRawHashes.size() - FoundChunkIndexes.size(); FindBlocksStats.AcceptedReduntantByteCount += BlockSize - AdjustedReuseSize; } else { ZEN_CONSOLE_VERBOSE("Skipping block {}. 
filtered usage level: {}%", KnownBlock.BlockHash, ReusePercent); FindBlocksStats.RejectedBlockCount++; FindBlocksStats.RejectedChunkCount += FoundChunkIndexes.size(); FindBlocksStats.RejectedByteCount += AdjustedReuseSize; } } } } OutUnusedChunkIndexes.reserve(ChunkIndexes.size() - FindBlocksStats.AcceptedChunkCount); for (uint32_t ChunkIndex : ChunkIndexes) { if (!ChunkFound[ChunkIndex]) { OutUnusedChunkIndexes.push_back(ChunkIndex); } } } return FilteredReuseBlockIndexes; }; void UploadFolder(BuildStorage& Storage, const Oid& BuildId, const Oid& BuildPartId, const std::string_view BuildPartName, const std::filesystem::path& Path, const std::filesystem::path& ManifestPath, const uint8_t BlockReuseMinPercentLimit, bool AllowMultiparts, const CbObject& MetaData, bool CreateBuild, bool IgnoreExistingBlocks, bool PostUploadVerify) { Stopwatch ProcessTimer; const std::filesystem::path ZenTempFolder = Path / ZenTempFolderName; CreateDirectories(ZenTempFolder); CleanDirectory(ZenTempFolder, {}); auto _ = MakeGuard([&]() { CleanDirectory(ZenTempFolder, {}); std::filesystem::remove(ZenTempFolder); }); CreateDirectories(Path / ZenTempBlockFolderName); CreateDirectories(Path / ZenTempChunkFolderName); CbObject ChunkerParameters; ChunkedFolderContent LocalContent; GetFolderContentStatistics LocalFolderScanStats; ChunkingStatistics ChunkingStats; { auto IsAcceptedFolder = [ExcludeFolders = DefaultExcludeFolders](const std::string_view& RelativePath) -> bool { for (const std::string_view& ExcludeFolder : ExcludeFolders) { if (RelativePath.starts_with(ExcludeFolder)) { if (RelativePath.length() == ExcludeFolder.length()) { return false; } else if (RelativePath[ExcludeFolder.length()] == '/') { return false; } } } return true; }; auto IsAcceptedFile = [ExcludeExtensions = DefaultExcludeExtensions](const std::string_view& RelativePath, uint64_t, uint32_t) -> bool { for (const std::string_view& ExcludeExtension : ExcludeExtensions) { if (RelativePath.ends_with(ExcludeExtension)) { 
return false; } } return true; }; auto ParseManifest = [](const std::filesystem::path& Path, const std::filesystem::path& ManifestPath) -> eastl::vector { eastl::vector AssetPaths; std::filesystem::path AbsoluteManifestPath = ManifestPath.is_absolute() ? ManifestPath : Path / ManifestPath; IoBuffer ManifestContent = ReadFile(AbsoluteManifestPath).Flatten(); std::string_view ManifestString((const char*)ManifestContent.GetView().GetData(), ManifestContent.GetSize()); std::string_view::size_type Offset = 0; while (Offset < ManifestContent.GetSize()) { size_t PathBreakOffset = ManifestString.find_first_of("\t\r\n", Offset); if (PathBreakOffset == std::string_view::npos) { PathBreakOffset = ManifestContent.GetSize(); } std::string_view AssetPath = ManifestString.substr(Offset, PathBreakOffset - Offset); if (!AssetPath.empty()) { AssetPaths.emplace_back(std::filesystem::path(AssetPath)); } Offset = PathBreakOffset; size_t EolOffset = ManifestString.find_first_of("\r\n", Offset); if (EolOffset == std::string_view::npos) { break; } Offset = EolOffset; size_t LineBreakOffset = ManifestString.find_first_not_of("\t\r\n", Offset); if (LineBreakOffset == std::string_view::npos) { break; } Offset = LineBreakOffset; } return AssetPaths; }; Stopwatch ScanTimer; FolderContent Content; if (ManifestPath.empty()) { std::filesystem::path ExcludeManifestPath = Path / ZenExcludeManifestName; tsl::robin_set ExcludeAssetPaths; if (std::filesystem::is_regular_file(ExcludeManifestPath)) { eastl::vector AssetPaths = ParseManifest(Path, ExcludeManifestPath); ExcludeAssetPaths.reserve(AssetPaths.size()); for (const std::filesystem::path& AssetPath : AssetPaths) { ExcludeAssetPaths.insert(AssetPath.generic_string()); } } Content = GetFolderContent( LocalFolderScanStats, Path, std::move(IsAcceptedFolder), [&IsAcceptedFile, &ExcludeAssetPaths](const std::string_view& RelativePath, uint64_t Size, uint32_t Attributes) -> bool { if (RelativePath == ZenExcludeManifestName) { return false; } if 
(!IsAcceptedFile(RelativePath, Size, Attributes)) { return false; } if (ExcludeAssetPaths.contains(std::filesystem::path(RelativePath).generic_string())) { return false; } return true; }, GetMediumWorkerPool(EWorkloadType::Burst), UsePlainProgress ? 5000 : 200, [&](bool, std::ptrdiff_t) { ZEN_DEBUG("Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), Path); }, AbortFlag); } else { Stopwatch ManifestParseTimer; eastl::vector AssetPaths = ParseManifest(Path, ManifestPath); for (const std::filesystem::path& AssetPath : AssetPaths) { Content.Paths.push_back(AssetPath); Content.RawSizes.push_back(std::filesystem::file_size(Path / AssetPath)); #if ZEN_PLATFORM_WINDOWS Content.Attributes.push_back(GetFileAttributes(Path / AssetPath)); #endif // ZEN_PLATFORM_WINDOWS #if ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX Content.Attributes.push_back(GetFileMode(Path / AssetPath)); #endif // ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX LocalFolderScanStats.AcceptedFileByteCount += Content.RawSizes.back(); LocalFolderScanStats.AcceptedFileCount++; } if (ManifestPath.is_relative()) { Content.Paths.push_back(ManifestPath); Content.RawSizes.push_back(std::filesystem::file_size(ManifestPath)); #if ZEN_PLATFORM_WINDOWS Content.Attributes.push_back(GetFileAttributes(ManifestPath)); #endif // ZEN_PLATFORM_WINDOWS #if ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX Content.Attributes.push_back(GetFileMode(ManifestPath)); #endif // ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX LocalFolderScanStats.AcceptedFileByteCount += Content.RawSizes.back(); LocalFolderScanStats.AcceptedFileCount++; } LocalFolderScanStats.FoundFileByteCount.store(LocalFolderScanStats.AcceptedFileByteCount); LocalFolderScanStats.FoundFileCount.store(LocalFolderScanStats.AcceptedFileCount); LocalFolderScanStats.ElapsedWallTimeUS = ManifestParseTimer.GetElapsedTimeUs(); } std::unique_ptr ChunkController = CreateBasicChunkingController(); { CbObjectWriter ChunkParametersWriter; ChunkParametersWriter.AddString("name"sv, 
ChunkController->GetName()); ChunkParametersWriter.AddObject("parameters"sv, ChunkController->GetParameters()); ChunkerParameters = ChunkParametersWriter.Save(); } std::uint64_t TotalRawSize = 0; for (uint64_t RawSize : Content.RawSizes) { TotalRawSize += RawSize; } { ProgressBar ProgressBar(UsePlainProgress); FilteredRate FilteredBytesHashed; FilteredBytesHashed.Start(); LocalContent = ChunkFolderContent( ChunkingStats, GetMediumWorkerPool(EWorkloadType::Burst), Path, Content, *ChunkController, UsePlainProgress ? 5000 : 200, [&](bool, std::ptrdiff_t) { FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load()); std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found", ChunkingStats.FilesProcessed.load(), Content.Paths.size(), NiceBytes(ChunkingStats.BytesHashed.load()), NiceBytes(TotalRawSize), NiceNum(FilteredBytesHashed.GetCurrent()), ChunkingStats.UniqueChunksFound.load(), NiceBytes(ChunkingStats.UniqueBytesFound.load())); ProgressBar.UpdateState({.Task = "Scanning files ", .Details = Details, .TotalCount = TotalRawSize, .RemainingCount = TotalRawSize - ChunkingStats.BytesHashed.load()}, false); }, AbortFlag); if (AbortFlag) { return; } FilteredBytesHashed.Stop(); ProgressBar.Finish(); } ZEN_CONSOLE("Found {} ({}) files divided into {} ({}) unique chunks in '{}' in {}. Average hash rate {}B/sec", LocalContent.Paths.size(), NiceBytes(TotalRawSize), ChunkingStats.UniqueChunksFound.load(), NiceBytes(ChunkingStats.UniqueBytesFound.load()), Path, NiceTimeSpanMs(ScanTimer.GetElapsedTimeMs()), NiceNum(GetBytesPerSecond(ChunkingStats.ElapsedWallTimeUS, ChunkingStats.BytesHashed))); } const ChunkedContentLookup LocalLookup = BuildChunkedContentLookup(LocalContent); std::uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u; if (CreateBuild) { Stopwatch PutBuildTimer; CbObject PutBuildResult = Storage.PutBuild(BuildId, MetaData); ZEN_CONSOLE("PutBuild took {}. 
Payload size: {}", NiceTimeSpanMs(PutBuildTimer.GetElapsedTimeMs()), NiceBytes(MetaData.GetSize())); PreferredMultipartChunkSize = PutBuildResult["chunkSize"sv].AsUInt64(PreferredMultipartChunkSize); } else { Stopwatch GetBuildTimer; CbObject Build = Storage.GetBuild(BuildId); ZEN_CONSOLE("GetBuild took {}. Payload size: {}", NiceTimeSpanMs(GetBuildTimer.GetElapsedTimeMs()), NiceBytes(Build.GetSize())); if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0) { PreferredMultipartChunkSize = ChunkSize; } else if (AllowMultiparts) { ZEN_WARN("PreferredMultipartChunkSize is unknown. Defaulting to '{}'", NiceBytes(PreferredMultipartChunkSize)); } } const std::uint64_t LargeAttachmentSize = AllowMultiparts ? PreferredMultipartChunkSize * 4u : (std::uint64_t)-1; FindBlocksStatistics FindBlocksStats; GenerateBlocksStatistics GenerateBlocksStats; LooseChunksStatistics LooseChunksStats; eastl::vector KnownBlocks; eastl::vector ReuseBlockIndexes; eastl::vector NewBlockChunkIndexes; Stopwatch BlockArrangeTimer; eastl::vector LooseChunkIndexes; { bool EnableBlocks = true; eastl::vector BlockChunkIndexes; for (uint32_t ChunkIndex = 0; ChunkIndex < LocalContent.ChunkedContent.ChunkHashes.size(); ChunkIndex++) { const uint64_t ChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; if (!EnableBlocks || ChunkRawSize == 0 || ChunkRawSize > DefaultChunksBlockParams.MaxChunkEmbedSize) { LooseChunkIndexes.push_back(ChunkIndex); LooseChunksStats.ChunkByteCount += ChunkRawSize; } else { BlockChunkIndexes.push_back(ChunkIndex); FindBlocksStats.PotentialChunkByteCount += ChunkRawSize; } } FindBlocksStats.PotentialChunkCount = BlockChunkIndexes.size(); LooseChunksStats.ChunkCount = LooseChunkIndexes.size(); if (IgnoreExistingBlocks) { ZEN_CONSOLE("Ignoring any existing blocks in store"); NewBlockChunkIndexes = std::move(BlockChunkIndexes); } else { Stopwatch KnownBlocksTimer; KnownBlocks = Storage.FindBlocks(BuildId); FindBlocksStats.FindBlockTimeMS = 
KnownBlocksTimer.GetElapsedTimeMs(); FindBlocksStats.FoundBlockCount = KnownBlocks.size(); ReuseBlockIndexes = FindReuseBlocks(KnownBlocks, LocalContent.ChunkedContent.ChunkHashes, BlockChunkIndexes, BlockReuseMinPercentLimit, NewBlockChunkIndexes, FindBlocksStats); FindBlocksStats.AcceptedBlockCount = ReuseBlockIndexes.size(); for (const ChunkBlockDescription& Description : KnownBlocks) { for (uint32_t ChunkRawLength : Description.ChunkRawLengths) { FindBlocksStats.FoundBlockByteCount += ChunkRawLength; } FindBlocksStats.FoundBlockChunkCount += Description.ChunkRawHashes.size(); } } } eastl::vector> NewBlockChunks; ArrangeChunksIntoBlocks(LocalContent, LocalLookup, DefaultChunksBlockParams.MaxBlockSize, NewBlockChunkIndexes, NewBlockChunks); FindBlocksStats.NewBlocksCount = NewBlockChunks.size(); for (uint32_t ChunkIndex : NewBlockChunkIndexes) { FindBlocksStats.NewBlocksChunkByteCount += LocalContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; } FindBlocksStats.NewBlocksChunkCount = NewBlockChunkIndexes.size(); const double AcceptedByteCountPercent = FindBlocksStats.PotentialChunkByteCount > 0 ? (100.0 * FindBlocksStats.AcceptedByteCount / FindBlocksStats.PotentialChunkByteCount) : 0.0; const double AcceptedReduntantByteCountPercent = FindBlocksStats.AcceptedByteCount > 0 ? 
(100.0 * FindBlocksStats.AcceptedReduntantByteCount) / (FindBlocksStats.AcceptedByteCount + FindBlocksStats.AcceptedReduntantByteCount) : 0.0; ZEN_CONSOLE( "Found {} chunks in {} ({}) blocks eligeble for reuse in {}\n" " Reusing {} ({}) matching chunks in {} blocks ({:.1f}%)\n" " Accepting {} ({}) redundant chunks ({:.1f}%)\n" " Rejected {} ({}) chunks in {} blocks\n" " Arranged {} ({}) chunks in {} new blocks\n" " Keeping {} ({}) chunks as loose chunks\n" " Discovery completed in {}", FindBlocksStats.FoundBlockChunkCount, FindBlocksStats.FoundBlockCount, NiceBytes(FindBlocksStats.FoundBlockByteCount), NiceTimeSpanMs(FindBlocksStats.FindBlockTimeMS), FindBlocksStats.AcceptedChunkCount, NiceBytes(FindBlocksStats.AcceptedByteCount), FindBlocksStats.AcceptedBlockCount, AcceptedByteCountPercent, FindBlocksStats.AcceptedReduntantChunkCount, NiceBytes(FindBlocksStats.AcceptedReduntantByteCount), AcceptedReduntantByteCountPercent, FindBlocksStats.RejectedChunkCount, NiceBytes(FindBlocksStats.RejectedByteCount), FindBlocksStats.RejectedBlockCount, FindBlocksStats.NewBlocksChunkCount, NiceBytes(FindBlocksStats.NewBlocksChunkByteCount), FindBlocksStats.NewBlocksCount, LooseChunksStats.ChunkCount, NiceBytes(LooseChunksStats.ChunkByteCount), NiceTimeSpanMs(BlockArrangeTimer.GetElapsedTimeMs())); DiskStatistics DiskStats; UploadStatistics UploadStats; GeneratedBlocks NewBlocks; if (!NewBlockChunks.empty()) { Stopwatch GenerateBuildBlocksTimer; auto __ = MakeGuard([&]() { uint64_t BlockGenerateTimeUs = GenerateBuildBlocksTimer.GetElapsedTimeUs(); ZEN_CONSOLE("Generated {} ({}) and uploaded {} ({}) blocks in {}. Generate speed: {}B/sec. 
Transfer speed {}bits/sec.", GenerateBlocksStats.GeneratedBlockCount.load(), NiceBytes(GenerateBlocksStats.GeneratedBlockByteCount), UploadStats.BlockCount.load(), NiceBytes(UploadStats.BlocksBytes.load()), NiceTimeSpanMs(BlockGenerateTimeUs / 1000), NiceNum(GetBytesPerSecond(GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS, GenerateBlocksStats.GeneratedBlockByteCount)), NiceNum(GetBytesPerSecond(UploadStats.ElapsedWallTimeUS, UploadStats.BlocksBytes * 8))); }); GenerateBuildBlocks(Path, LocalContent, LocalLookup, Storage, BuildId, NewBlockChunks, NewBlocks, DiskStats, UploadStats, GenerateBlocksStats); } CbObject PartManifest; { CbObjectWriter PartManifestWriter; Stopwatch ManifestGenerationTimer; auto __ = MakeGuard([&]() { ZEN_CONSOLE("Generated build part manifest in {} ({})", NiceTimeSpanMs(ManifestGenerationTimer.GetElapsedTimeMs()), NiceBytes(PartManifestWriter.GetSaveSize())); }); PartManifestWriter.AddObject("chunker"sv, ChunkerParameters); eastl::vector AllChunkBlockHashes; eastl::vector AllChunkBlockDescriptions; AllChunkBlockHashes.reserve(ReuseBlockIndexes.size() + NewBlocks.BlockDescriptions.size()); AllChunkBlockDescriptions.reserve(ReuseBlockIndexes.size() + NewBlocks.BlockDescriptions.size()); for (size_t ReuseBlockIndex : ReuseBlockIndexes) { AllChunkBlockDescriptions.push_back(KnownBlocks[ReuseBlockIndex]); AllChunkBlockHashes.push_back(KnownBlocks[ReuseBlockIndex].BlockHash); } AllChunkBlockDescriptions.insert(AllChunkBlockDescriptions.end(), NewBlocks.BlockDescriptions.begin(), NewBlocks.BlockDescriptions.end()); for (const ChunkBlockDescription& BlockDescription : NewBlocks.BlockDescriptions) { AllChunkBlockHashes.push_back(BlockDescription.BlockHash); } #if EXTRA_VERIFY tsl::robin_map ChunkHashToAbsoluteChunkIndex; eastl::vector AbsoluteChunkHashes; AbsoluteChunkHashes.reserve(LocalContent.ChunkedContent.ChunkHashes.size()); for (uint32_t ChunkIndex : LooseChunkIndexes) { 
ChunkHashToAbsoluteChunkIndex.insert({LocalContent.ChunkedContent.ChunkHashes[ChunkIndex], AbsoluteChunkHashes.size()}); AbsoluteChunkHashes.push_back(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]); } for (const ChunkBlockDescription& Block : AllChunkBlockDescriptions) { for (const IoHash& ChunkHash : Block.ChunkHashes) { ChunkHashToAbsoluteChunkIndex.insert({ChunkHash, AbsoluteChunkHashes.size()}); AbsoluteChunkHashes.push_back(ChunkHash); } } for (const IoHash& ChunkHash : LocalContent.ChunkedContent.ChunkHashes) { ZEN_ASSERT(AbsoluteChunkHashes[ChunkHashToAbsoluteChunkIndex.at(ChunkHash)] == ChunkHash); ZEN_ASSERT(LocalContent.ChunkedContent.ChunkHashes[LocalLookup.ChunkHashToChunkIndex.at(ChunkHash)] == ChunkHash); } for (const uint32_t ChunkIndex : LocalContent.ChunkedContent.ChunkOrders) { ZEN_ASSERT(AbsoluteChunkHashes[ChunkHashToAbsoluteChunkIndex.at(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex])] == LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]); ZEN_ASSERT(LocalLookup.ChunkHashToChunkIndex.at(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]) == ChunkIndex); } #endif // EXTRA_VERIFY eastl::vector AbsoluteChunkOrders = CalculateAbsoluteChunkOrders(LocalContent.ChunkedContent.ChunkHashes, LocalContent.ChunkedContent.ChunkOrders, LocalLookup.ChunkHashToChunkIndex, LooseChunkIndexes, AllChunkBlockDescriptions); #if EXTRA_VERIFY for (uint32_t ChunkOrderIndex = 0; ChunkOrderIndex < LocalContent.ChunkedContent.ChunkOrders.size(); ChunkOrderIndex++) { uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[ChunkOrderIndex]; uint32_t AbsoluteChunkIndex = AbsoluteChunkOrders[ChunkOrderIndex]; const IoHash& LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; const IoHash& AbsoluteChunkHash = AbsoluteChunkHashes[AbsoluteChunkIndex]; ZEN_ASSERT(LocalChunkHash == AbsoluteChunkHash); } #endif // EXTRA_VERIFY WriteBuildContentToCompactBinary(PartManifestWriter, LocalContent.Platform, LocalContent.Paths, 
LocalContent.RawHashes, LocalContent.RawSizes, LocalContent.Attributes, LocalContent.ChunkedContent.SequenceRawHashes, LocalContent.ChunkedContent.ChunkCounts, LocalContent.ChunkedContent.ChunkHashes, LocalContent.ChunkedContent.ChunkRawSizes, AbsoluteChunkOrders, LooseChunkIndexes, AllChunkBlockHashes); #if EXTRA_VERIFY { ChunkedFolderContent VerifyFolderContent; eastl::vector OutAbsoluteChunkOrders; eastl::vector OutLooseChunkHashes; eastl::vector OutLooseChunkRawSizes; eastl::vector OutBlockRawHashes; 4 ReadBuildContentFromCompactBinary(PartManifestWriter.Save(), VerifyFolderContent.Platform, VerifyFolderContent.Paths, VerifyFolderContent.RawHashes, VerifyFolderContent.RawSizes, VerifyFolderContent.Attributes, VerifyFolderContent.ChunkedContent.SequenceRawHashes, VerifyFolderContent.ChunkedContent.ChunkCounts, OutAbsoluteChunkOrders, OutLooseChunkHashes, OutLooseChunkRawSizes, OutBlockRawHashes); ZEN_ASSERT(OutBlockRawHashes == AllChunkBlockHashes); for (uint32_t OrderIndex = 0; OrderIndex < OutAbsoluteChunkOrders.size(); OrderIndex++) { uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[OrderIndex]; const IoHash LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; uint32_t VerifyChunkIndex = OutAbsoluteChunkOrders[OrderIndex]; const IoHash VerifyChunkHash = AbsoluteChunkHashes[VerifyChunkIndex]; ZEN_ASSERT(LocalChunkHash == VerifyChunkHash); } CalculateLocalChunkOrders(OutAbsoluteChunkOrders, OutLooseChunkHashes, OutLooseChunkRawSizes, AllChunkBlockDescriptions, VerifyFolderContent.ChunkedContent.ChunkHashes, VerifyFolderContent.ChunkedContent.ChunkRawSizes, VerifyFolderContent.ChunkedContent.ChunkOrders); ZEN_ASSERT(LocalContent.Paths == VerifyFolderContent.Paths); ZEN_ASSERT(LocalContent.RawHashes == VerifyFolderContent.RawHashes); ZEN_ASSERT(LocalContent.RawSizes == VerifyFolderContent.RawSizes); ZEN_ASSERT(LocalContent.Attributes == VerifyFolderContent.Attributes); 
ZEN_ASSERT(LocalContent.ChunkedContent.SequenceRawHashes == VerifyFolderContent.ChunkedContent.SequenceRawHashes); ZEN_ASSERT(LocalContent.ChunkedContent.ChunkCounts == VerifyFolderContent.ChunkedContent.ChunkCounts); for (uint32_t OrderIndex = 0; OrderIndex < LocalContent.ChunkedContent.ChunkOrders.size(); OrderIndex++) { uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[OrderIndex]; const IoHash LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; uint64_t LocalChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[LocalChunkIndex]; uint32_t VerifyChunkIndex = VerifyFolderContent.ChunkedContent.ChunkOrders[OrderIndex]; const IoHash VerifyChunkHash = VerifyFolderContent.ChunkedContent.ChunkHashes[VerifyChunkIndex]; uint64_t VerifyChunkRawSize = VerifyFolderContent.ChunkedContent.ChunkRawSizes[VerifyChunkIndex]; ZEN_ASSERT(LocalChunkHash == VerifyChunkHash); ZEN_ASSERT(LocalChunkRawSize == VerifyChunkRawSize); } } #endif // EXTRA_VERIFY PartManifest = PartManifestWriter.Save(); } Stopwatch PutBuildPartResultTimer; std::pair> PutBuildPartResult = Storage.PutBuildPart(BuildId, BuildPartId, BuildPartName, PartManifest); ZEN_CONSOLE("PutBuildPart took {}, payload size {}. {} attachments are needed.", NiceTimeSpanMs(PutBuildPartResultTimer.GetElapsedTimeMs()), NiceBytes(PartManifest.GetSize()), PutBuildPartResult.second.size()); IoHash PartHash = PutBuildPartResult.first; auto UploadAttachments = [&](eastl::span RawHashes) { if (!AbortFlag) { ZEN_CONSOLE_VERBOSE("Uploading attachments: {}", FormatArray(RawHashes, "\n "sv)); UploadStatistics TempUploadStats; GenerateBlocksStatistics TempGenerateBlocksStats; LooseChunksStatistics TempLooseChunksStats; Stopwatch TempUploadTimer; auto __ = MakeGuard([&]() { uint64_t TempChunkUploadTimeUs = TempUploadTimer.GetElapsedTimeUs(); ZEN_CONSOLE( "Generated {} ({} {}B/s) and uploaded {} ({}) blocks. " "Compressed {} ({} {}B/s) and uploaded {} ({}) chunks. 
" "Transferred {} ({}bits/s) in {}", TempGenerateBlocksStats.GeneratedBlockCount.load(), NiceBytes(TempGenerateBlocksStats.GeneratedBlockByteCount.load()), NiceNum(GetBytesPerSecond(TempGenerateBlocksStats.GenerateBlocksElapsedWallTimeUS, TempGenerateBlocksStats.GeneratedBlockByteCount)), TempUploadStats.BlockCount.load(), NiceBytes(TempUploadStats.BlocksBytes), TempLooseChunksStats.CompressedChunkCount.load(), NiceBytes(TempLooseChunksStats.CompressedChunkBytes.load()), NiceNum(GetBytesPerSecond(TempLooseChunksStats.CompressChunksElapsedWallTimeUS, TempLooseChunksStats.CompressedChunkBytes)), TempUploadStats.ChunkCount.load(), NiceBytes(TempUploadStats.ChunksBytes), NiceBytes(TempUploadStats.BlocksBytes + TempUploadStats.ChunksBytes), NiceNum(GetBytesPerSecond(TempUploadStats.ElapsedWallTimeUS, TempUploadStats.ChunksBytes * 8)), NiceTimeSpanMs(TempChunkUploadTimeUs / 1000)); }); UploadPartBlobs(Storage, BuildId, Path, LocalContent, LocalLookup, RawHashes, NewBlockChunks, NewBlocks, LooseChunkIndexes, LargeAttachmentSize, DiskStats, TempUploadStats, TempGenerateBlocksStats, TempLooseChunksStats); UploadStats += TempUploadStats; LooseChunksStats += TempLooseChunksStats; GenerateBlocksStats += TempGenerateBlocksStats; } }; if (IgnoreExistingBlocks) { ZEN_CONSOLE_VERBOSE("PutBuildPart uploading all attachments, needs are: {}", FormatArray(PutBuildPartResult.second, "\n "sv)); eastl::vector ForceUploadChunkHashes; ForceUploadChunkHashes.reserve(LooseChunkIndexes.size()); for (uint32_t ChunkIndex : LooseChunkIndexes) { ForceUploadChunkHashes.push_back(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]); } for (size_t BlockIndex = 0; BlockIndex < NewBlocks.BlockBuffers.size(); BlockIndex++) { if (NewBlocks.BlockBuffers[BlockIndex]) { // Block was not uploaded during generation ForceUploadChunkHashes.push_back(NewBlocks.BlockDescriptions[BlockIndex].BlockHash); } } UploadAttachments(ForceUploadChunkHashes); } else if (!PutBuildPartResult.second.empty()) { 
ZEN_CONSOLE_VERBOSE("PutBuildPart needs attachments: {}", FormatArray(PutBuildPartResult.second, "\n "sv)); UploadAttachments(PutBuildPartResult.second); } while (!AbortFlag) { Stopwatch FinalizeBuildPartTimer; eastl::vector Needs = Storage.FinalizeBuildPart(BuildId, BuildPartId, PartHash); ZEN_CONSOLE("FinalizeBuildPart took {}. {} attachments are missing.", NiceTimeSpanMs(FinalizeBuildPartTimer.GetElapsedTimeMs()), Needs.size()); if (Needs.empty()) { break; } ZEN_CONSOLE_VERBOSE("FinalizeBuildPart needs attachments: {}", FormatArray(Needs, "\n "sv)); UploadAttachments(Needs); } if (CreateBuild && !AbortFlag) { Stopwatch FinalizeBuildTimer; Storage.FinalizeBuild(BuildId); ZEN_CONSOLE("FinalizeBuild took {}", NiceTimeSpanMs(FinalizeBuildTimer.GetElapsedTimeMs())); } if (!NewBlocks.BlockDescriptions.empty() && !AbortFlag) { uint64_t UploadBlockMetadataCount = 0; eastl::vector BlockHashes; BlockHashes.reserve(NewBlocks.BlockDescriptions.size()); Stopwatch UploadBlockMetadataTimer; for (size_t BlockIndex = 0; BlockIndex < NewBlocks.BlockDescriptions.size(); BlockIndex++) { const IoHash& BlockHash = NewBlocks.BlockDescriptions[BlockIndex].BlockHash; if (!NewBlocks.MetaDataHasBeenUploaded[BlockIndex]) { const CbObject BlockMetaData = BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]); Storage.PutBlockMetadata(BuildId, BlockHash, BlockMetaData); UploadStats.BlocksBytes += BlockMetaData.GetSize(); NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; UploadBlockMetadataCount++; } BlockHashes.push_back(BlockHash); } if (UploadBlockMetadataCount > 0) { uint64_t ElapsedUS = UploadBlockMetadataTimer.GetElapsedTimeUs(); UploadStats.ElapsedWallTimeUS += ElapsedUS; ZEN_CONSOLE("Uploaded metadata for {} blocks in {}", UploadBlockMetadataCount, NiceTimeSpanMs(ElapsedUS / 1000)); } } if (PostUploadVerify && !AbortFlag) { ValidateBuildPart(Storage, BuildId, BuildPartId, BuildPartName); } const double DeltaByteCountPercent = 
ChunkingStats.BytesHashed > 0 ? (100.0 * (FindBlocksStats.NewBlocksChunkByteCount + LooseChunksStats.CompressedChunkBytes)) / (ChunkingStats.BytesHashed) : 0.0; const std::string LargeAttachmentStats = (LargeAttachmentSize != (uint64_t)-1) ? fmt::format(" ({} as multipart)", UploadStats.MultipartAttachmentCount.load()) : ""; ZEN_CONSOLE_VERBOSE( "Folder scanning stats:" "\n FoundFileCount: {}" "\n FoundFileByteCount: {}" "\n AcceptedFileCount: {}" "\n AcceptedFileByteCount: {}" "\n ElapsedWallTimeUS: {}", LocalFolderScanStats.FoundFileCount.load(), NiceBytes(LocalFolderScanStats.FoundFileByteCount.load()), LocalFolderScanStats.AcceptedFileCount.load(), NiceBytes(LocalFolderScanStats.AcceptedFileByteCount.load()), NiceLatencyNs(LocalFolderScanStats.ElapsedWallTimeUS * 1000)); ZEN_CONSOLE_VERBOSE( "Chunking stats:" "\n FilesProcessed: {}" "\n FilesChunked: {}" "\n BytesHashed: {}" "\n UniqueChunksFound: {}" "\n UniqueSequencesFound: {}" "\n UniqueBytesFound: {}" "\n ElapsedWallTimeUS: {}", ChunkingStats.FilesProcessed.load(), ChunkingStats.FilesChunked.load(), NiceBytes(ChunkingStats.BytesHashed.load()), ChunkingStats.UniqueChunksFound.load(), ChunkingStats.UniqueSequencesFound.load(), NiceBytes(ChunkingStats.UniqueBytesFound.load()), NiceLatencyNs(ChunkingStats.ElapsedWallTimeUS * 1000)); ZEN_CONSOLE_VERBOSE( "Find block stats:" "\n FindBlockTimeMS: {}" "\n PotentialChunkCount: {}" "\n PotentialChunkByteCount: {}" "\n FoundBlockCount: {}" "\n FoundBlockChunkCount: {}" "\n FoundBlockByteCount: {}" "\n AcceptedBlockCount: {}" "\n AcceptedChunkCount: {}" "\n AcceptedByteCount: {}" "\n RejectedBlockCount: {}" "\n RejectedChunkCount: {}" "\n RejectedByteCount: {}" "\n AcceptedReduntantChunkCount: {}" "\n AcceptedReduntantByteCount: {}" "\n NewBlocksCount: {}" "\n NewBlocksChunkCount: {}" "\n NewBlocksChunkByteCount: {}", NiceTimeSpanMs(FindBlocksStats.FindBlockTimeMS), FindBlocksStats.PotentialChunkCount, NiceBytes(FindBlocksStats.PotentialChunkByteCount), 
FindBlocksStats.FoundBlockCount, FindBlocksStats.FoundBlockChunkCount, NiceBytes(FindBlocksStats.FoundBlockByteCount), FindBlocksStats.AcceptedBlockCount, FindBlocksStats.AcceptedChunkCount, NiceBytes(FindBlocksStats.AcceptedByteCount), FindBlocksStats.RejectedBlockCount, FindBlocksStats.RejectedChunkCount, NiceBytes(FindBlocksStats.RejectedByteCount), FindBlocksStats.AcceptedReduntantChunkCount, NiceBytes(FindBlocksStats.AcceptedReduntantByteCount), FindBlocksStats.NewBlocksCount, FindBlocksStats.NewBlocksChunkCount, NiceBytes(FindBlocksStats.NewBlocksChunkByteCount)); ZEN_CONSOLE_VERBOSE( "Generate blocks stats:" "\n GeneratedBlockByteCount: {}" "\n GeneratedBlockCount: {}" "\n GenerateBlocksElapsedWallTimeUS: {}", NiceBytes(GenerateBlocksStats.GeneratedBlockByteCount.load()), GenerateBlocksStats.GeneratedBlockCount.load(), NiceLatencyNs(GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS * 1000)); ZEN_CONSOLE_VERBOSE( "Generate blocks stats:" "\n ChunkCount: {}" "\n ChunkByteCount: {}" "\n CompressedChunkCount: {}" "\n CompressChunksElapsedWallTimeUS: {}", LooseChunksStats.ChunkCount, NiceBytes(LooseChunksStats.ChunkByteCount), LooseChunksStats.CompressedChunkCount.load(), NiceBytes(LooseChunksStats.CompressedChunkBytes.load()), NiceLatencyNs(LooseChunksStats.CompressChunksElapsedWallTimeUS * 1000)); ZEN_CONSOLE_VERBOSE( "Disk stats:" "\n OpenReadCount: {}" "\n OpenWriteCount: {}" "\n ReadCount: {}" "\n ReadByteCount: {}" "\n WriteCount: {}" "\n WriteByteCount: {}" "\n CurrentOpenFileCount: {}", DiskStats.OpenReadCount.load(), DiskStats.OpenWriteCount.load(), DiskStats.ReadCount.load(), NiceBytes(DiskStats.ReadByteCount.load()), DiskStats.WriteCount.load(), NiceBytes(DiskStats.WriteByteCount.load()), DiskStats.CurrentOpenFileCount.load()); ZEN_CONSOLE_VERBOSE( "Upload stats:" "\n BlockCount: {}" "\n BlocksBytes: {}" "\n ChunkCount: {}" "\n ChunksBytes: {}" "\n ReadFromDiskBytes: {}" "\n MultipartAttachmentCount: {}" "\n ElapsedWallTimeUS: {}", 
UploadStats.BlockCount.load(),
NiceBytes(UploadStats.BlocksBytes.load()),
UploadStats.ChunkCount.load(),
NiceBytes(UploadStats.ChunksBytes.load()),
NiceBytes(UploadStats.ReadFromDiskBytes.load()),
UploadStats.MultipartAttachmentCount.load(),
NiceLatencyNs(UploadStats.ElapsedWallTimeUS * 1000));

ZEN_CONSOLE(
    "Uploaded {}\n"
    " Delta: {}/{} ({:.1f}%)\n"
    " Blocks: {} ({})\n"
    " Chunks: {} ({}){}\n"
    " Rate: {}bits/sec",
    NiceBytes(UploadStats.BlocksBytes + UploadStats.ChunksBytes),
    NiceBytes(FindBlocksStats.NewBlocksChunkByteCount + LooseChunksStats.CompressedChunkBytes),
    NiceBytes(ChunkingStats.BytesHashed),
    DeltaByteCountPercent,
    UploadStats.BlockCount.load(),
    NiceBytes(UploadStats.BlocksBytes),
    UploadStats.ChunkCount.load(),
    NiceBytes(UploadStats.ChunksBytes),
    LargeAttachmentStats,
    NiceNum(GetBytesPerSecond(UploadStats.ElapsedWallTimeUS, (UploadStats.ChunksBytes + UploadStats.BlocksBytes * 8))));

ZEN_CONSOLE("Uploaded ({}) build {} part {} ({}) in {}",
            NiceBytes(FindBlocksStats.NewBlocksChunkByteCount + LooseChunksStats.CompressedChunkBytes),
            BuildId,
            BuildPartName,
            BuildPartId,
            NiceTimeSpanMs(ProcessTimer.GetElapsedTimeMs()));
}

// Verifies the files on disk under Path against the manifest in Content.
//
// One work item per entry in Content.Paths is scheduled on a worker pool. Each item checks that the
// file exists, that its size on disk equals Content.RawSizes[PathIndex] and, when VerifyFileHash is
// set and the file is non-empty, that the hash of the whole file equals Content.RawHashes[PathIndex].
// On a hash mismatch the file is re-hashed chunk by chunk to report the first chunk that differs.
//
// Error messages are collected in Errors (guarded by ErrorLock). After all work has finished they are
// printed, and std::runtime_error is thrown if at least one error was recorded.
void VerifyFolder(const ChunkedFolderContent& Content, const std::filesystem::path& Path, bool VerifyFileHash)
{
    ZEN_TRACE_CPU("VerifyFolder");

    ProgressBar ProgressBar(UsePlainProgress);
    // Counters shared between the worker lambdas and the progress callback.
    std::atomic FilesVerified(0);
    std::atomic FilesFailed(0);
    std::atomic ReadBytes(0);

    WorkerThreadPool& VerifyPool = GetMediumWorkerPool(EWorkloadType::Burst);  // GetSyncWorkerPool(); //

    ParallellWork Work(AbortFlag);

    const uint32_t PathCount = gsl::narrow(Content.Paths.size());

    RwLock ErrorLock;
    eastl::vector Errors;

    // Returns false when RelativePath is exactly one of the default exclude folders, or starts with one
    // of them followed by '/'. Returns true otherwise.
    auto IsAcceptedFolder = [ExcludeFolders = DefaultExcludeFolders](const std::string_view& RelativePath) -> bool {
        for (const std::string_view& ExcludeFolder : ExcludeFolders)
        {
            if (RelativePath.starts_with(ExcludeFolder))
            {
                if (RelativePath.length() == ExcludeFolder.length())
                {
                    return false;
                }
                else if (RelativePath[ExcludeFolder.length()] == '/')
                {
                    return false;
                }
            }
        }
        return true;
    };

    const ChunkedContentLookup Lookup = BuildChunkedContentLookup(Content);

    for (uint32_t PathIndex = 0; PathIndex < PathCount; PathIndex++)
    {
        if (Work.IsAborted())
        {
            break;
        }

        Work.ScheduleWork(
            VerifyPool,
            [&, PathIndex](std::atomic&) {
                if (!AbortFlag)
                {
                    ZEN_TRACE_CPU("VerifyFile_work");

                    // TODO: Convert ScheduleWork body to function

                    const std::filesystem::path TargetPath = (Path / Content.Paths[PathIndex]).make_preferred();
                    // NOTE(review): the argument here is the parent of TargetPath, which includes Path,
                    // while the lambda parameter is named RelativePath and is prefix-matched against bare
                    // folder names - confirm whether a path relative to Path was intended.
                    if (IsAcceptedFolder(TargetPath.parent_path().generic_string()))
                    {
                        const uint64_t ExpectedSize = Content.RawSizes[PathIndex];
                        if (!std::filesystem::exists(TargetPath))
                        {
                            ErrorLock.WithExclusiveLock([&]() {
                                Errors.push_back(fmt::format("File {} with expected size {} does not exist", TargetPath, ExpectedSize));
                            });
                            FilesFailed++;
                        }
                        else
                        {
                            std::error_code Ec;
                            uint64_t        SizeOnDisk = gsl::narrow(std::filesystem::file_size(TargetPath, Ec));
                            if (Ec)
                            {
                                ErrorLock.WithExclusiveLock([&]() {
                                    Errors.push_back(
                                        fmt::format("Failed to get size of file {}: {} ({})", TargetPath, Ec.message(), Ec.value()));
                                });
                                FilesFailed++;
                            }
                            else if (SizeOnDisk < ExpectedSize)
                            {
                                ErrorLock.WithExclusiveLock([&]() {
                                    Errors.push_back(fmt::format("Size of file {} is smaller than expected. Expected: {}, Found: {}",
                                                                 TargetPath,
                                                                 ExpectedSize,
                                                                 SizeOnDisk));
                                });
                                FilesFailed++;
                            }
                            else if (SizeOnDisk > ExpectedSize)
                            {
                                ErrorLock.WithExclusiveLock([&]() {
                                    Errors.push_back(fmt::format("Size of file {} is bigger than expected. Expected: {}, Found: {}",
                                                                 TargetPath,
                                                                 ExpectedSize,
                                                                 SizeOnDisk));
                                });
                                FilesFailed++;
                            }
                            else if (SizeOnDisk > 0 && VerifyFileHash)
                            {
                                // Size matches; compare the hash of the whole file first.
                                const IoHash& ExpectedRawHash = Content.RawHashes[PathIndex];
                                IoBuffer      Buffer          = IoBufferBuilder::MakeFromFile(TargetPath);
                                IoHash        RawHash         = IoHash::HashBuffer(Buffer);
                                if (RawHash != ExpectedRawHash)
                                {
                                    // Walk the chunks of the expected sequence in order and hash the matching
                                    // byte range of the file to find the first chunk that differs.
                                    uint64_t       FileOffset    = 0;
                                    const uint32_t SequenceIndex = Lookup.RawHashToSequenceIndex.at(ExpectedRawHash);
                                    const uint32_t OrderOffset   = Lookup.SequenceIndexChunkOrderOffset[SequenceIndex];
                                    for (uint32_t OrderIndex = OrderOffset;
                                         OrderIndex < OrderOffset + Content.ChunkedContent.ChunkCounts[SequenceIndex];
                                         OrderIndex++)
                                    {
                                        uint32_t ChunkIndex = Content.ChunkedContent.ChunkOrders[OrderIndex];
                                        uint64_t ChunkSize  = Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
                                        IoHash   ChunkHash  = Content.ChunkedContent.ChunkHashes[ChunkIndex];
                                        IoBuffer FileChunk  = IoBuffer(Buffer, FileOffset, ChunkSize);
                                        if (IoHash::HashBuffer(FileChunk) != ChunkHash)
                                        {
                                            ErrorLock.WithExclusiveLock([&]() {
                                                Errors.push_back(fmt::format(
                                                    "WARNING: Hash of file {} does not match expected hash. Expected: {}, Found: {}. "
                                                    "Mismatch at chunk {}",
                                                    TargetPath,
                                                    ExpectedRawHash,
                                                    RawHash,
                                                    OrderIndex - OrderOffset));
                                            });
                                            break;
                                        }
                                        FileOffset += ChunkSize;
                                    }
                                    // The file is counted as failed whether or not a differing chunk was located.
                                    FilesFailed++;
                                }
                                ReadBytes += SizeOnDisk;
                            }
                        }
                    }
                    // Counted for every path that was processed, including failed and skipped ones.
                    FilesVerified++;
                }
            },
            // Error handler: an exception thrown by the work item above is recorded as a failed file.
            [&, PathIndex](const std::exception& Ex, std::atomic&) {
                ErrorLock.WithExclusiveLock([&]() {
                    Errors.push_back(fmt::format("Failed verifying file '{}'. Reason: {}",
                                                 (Path / Content.Paths[PathIndex]).make_preferred(),
                                                 Ex.what()));
                });
                FilesFailed++;
            });
    }

    // Wait for all scheduled work; the callback refreshes the progress bar from the shared counters.
    Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
        ZEN_UNUSED(IsAborted, PendingWork);
        std::string Details = fmt::format("Verified {}/{} ({}). Failed files: {}",
                                          FilesVerified.load(),
                                          PathCount,
                                          NiceBytes(ReadBytes.load()),
                                          FilesFailed.load());
        ProgressBar.UpdateState({.Task           = "Verifying files ",
                                 .Details        = Details,
                                 .TotalCount     = gsl::narrow(PathCount),
                                 .RemainingCount = gsl::narrow(PathCount - FilesVerified.load())},
                                false);
    });
    ProgressBar.Finish();
    for (const std::string& Error : Errors)
    {
        ZEN_CONSOLE("{}", Error);
    }
    if (!Errors.empty())
    {
        throw std::runtime_error(fmt::format("Verify failed with {} errors", Errors.size()));
    }
}

// Keeps at most one output file open, together with a buffered writer, so that consecutive writes to
// the same target index reuse the open file instead of reopening it for every write.
class WriteFileCache
{
public:
    WriteFileCache() {}
    // Releases the writer and the file that may still be open.
    ~WriteFileCache() { Flush(); }

    // Writes Buffer at FileOffset into the file identified by TargetIndex.
    //
    // TargetIndex      Identifies the target; also the argument passed to GetTargetPath.
    // GetTargetPath    Resolves a target index to the path of the file to write.
    // Buffer           Data to write.
    // FileOffset       Offset in the target file at which Buffer is written.
    // TargetFinalSize  Expected final size of the target file. When it is larger than Buffer the file is
    //                  kept open for further writes, otherwise the write is done on a file that is not cached.
    template void WriteToFile(uint32_t           TargetIndex,
                              std::function&&    GetTargetPath,
                              const TBufferType& Buffer,
                              uint64_t           FileOffset,
                              uint64_t           TargetFinalSize)
    {
        if (!SeenTargetIndexes.empty() && SeenTargetIndexes.back() == TargetIndex)
        {
            // Same target as the most recently cached one: go through the writer that is already open.
            ZEN_ASSERT(OpenFileWriter);
            OpenFileWriter->Write(Buffer, FileOffset);
        }
        else
        {
            // Different target: drop whatever is open before opening the new file.
            Flush();
            const std::filesystem::path& TargetPath = GetTargetPath(TargetIndex);
            CreateDirectories(TargetPath.parent_path());
            uint32_t Tries = 5;
            // The callback receives the error code of a failed open and returns true while tries remain
            // (presumably this makes the file object retry the open - confirm against its constructor).
            // A console message is printed once fewer than three tries are left.
            std::unique_ptr NewOutputFile(
                std::make_unique(TargetPath, BasicFile::Mode::kWrite, [&Tries, TargetPath](std::error_code& Ec) {
                    if (Tries < 3)
                    {
                        ZEN_CONSOLE("Failed opening file '{}': {}{}", TargetPath, Ec.message(), Tries > 1 ? " Retrying"sv : ""sv);
                    }
                    if (Tries > 1)
                    {
                        Sleep(100);
                    }
                    return --Tries > 0;
                }));

            const bool CacheWriter = TargetFinalSize > Buffer.GetSize();
            if (CacheWriter)
            {
                // More data is expected for this target: keep the file open behind a buffered writer.
                // A target index is only expected to be cached once per WriteFileCache instance.
                ZEN_ASSERT_SLOW(std::find(SeenTargetIndexes.begin(), SeenTargetIndexes.end(), TargetIndex) == SeenTargetIndexes.end());

                OutputFile     = std::move(NewOutputFile);
                OpenFileWriter = std::make_unique(*OutputFile, Min(TargetFinalSize, 256u * 1024u));
                OpenFileWriter->Write(Buffer, FileOffset);
                SeenTargetIndexes.push_back(TargetIndex);
            }
            else
            {
                // Buffer covers the whole file: write it directly; NewOutputFile is released at scope exit.
                NewOutputFile->Write(Buffer, FileOffset);
            }
        }
    }

    // Releases the buffered writer first and then the file it writes to.
    void Flush()
    {
        OpenFileWriter = {};
        OutputFile     = {};
    }
    eastl::vector   SeenTargetIndexes;  // Target indexes that have been opened through the cache, in order
    std::unique_ptr OutputFile;         // Currently open output file, if any
    std::unique_ptr OpenFileWriter;     // Buffered writer on top of OutputFile, if any
};

// Returns pointers to those locations of chunk ChunkIndex (as reported by GetChunkSequenceLocations)
// whose sequence still has a non-zero counter in SequenceIndexChunksLeftToWriteCounters.
// The returned pointers refer to elements of the span returned by GetChunkSequenceLocations.
eastl::vector GetRemainingChunkTargets(
    eastl::span>                SequenceIndexChunksLeftToWriteCounters,
    const ChunkedContentLookup& Lookup,
    uint32_t                    ChunkIndex)
{
    eastl::span   ChunkSources = GetChunkSequenceLocations(Lookup, ChunkIndex);
    eastl::vector ChunkTargetPtrs;
    if (!ChunkSources.empty())
    {
        ChunkTargetPtrs.reserve(ChunkSources.size());
        for (const ChunkedContentLookup::ChunkSequenceLocation& Source : ChunkSources)
        {
            if (SequenceIndexChunksLeftToWriteCounters[Source.SequenceIndex].load() > 0)
            {
                ChunkTargetPtrs.push_back(&Source);
            }
        }
    }
    return ChunkTargetPtrs;
};

// Writes the chunks of a decompressed block that are still needed into the temporary chunk sequence
// files under CacheFolderPath, then verifies and renames every sequence file that became complete.
//
// OutBytesWritten is increased by the size of every chunk written and OutChunksComplete by the number
// of chunks taken from the block. Returns true when IterateChunkBlock succeeds and false otherwise;
// throws std::runtime_error when a chunk fails to decompress or a finished sequence file has the wrong hash.
bool WriteBlockToDisk(const std::filesystem::path& CacheFolderPath,
                      const ChunkedFolderContent&  RemoteContent,
                      eastl::span>                 SequenceIndexChunksLeftToWriteCounters,
                      const CompositeBuffer&       DecompressedBlockBuffer,
                      const ChunkedContentLookup&  Lookup,
                      std::atomic*                 RemoteChunkIndexNeedsCopyFromSourceFlags,
                      std::atomic&                 OutChunksComplete,
                      std::atomic&                 OutBytesWritten)
{
    ZEN_TRACE_CPU("WriteBlockToDisk");

    // Decompressed chunks taken from this block; WriteOps refer to them by index.
    eastl::vector ChunkBuffers;
    struct WriteOpData
    {
        const ChunkedContentLookup::ChunkSequenceLocation* Target;
        size_t                                             ChunkBufferIndex;
    };
    eastl::vector WriteOps;

    SharedBuffer BlockBuffer = DecompressedBlockBuffer.Flatten();
    uint64_t     HeaderSize  = 0;
    if (IterateChunkBlock(
            BlockBuffer,
            [&](CompressedBuffer&& Chunk, const IoHash& ChunkHash) {
// Continuation of WriteBlockToDisk: body of the per-chunk callback handed to IterateChunkBlock.
                // Only chunks known to the lookup that still have targets left to write are considered.
                if (auto It = Lookup.ChunkHashToChunkIndex.find(ChunkHash); It != Lookup.ChunkHashToChunkIndex.end())
                {
                    const uint32_t ChunkIndex = It->second;
                    eastl::vector  ChunkTargetPtrs =
                        GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, Lookup, ChunkIndex);

                    if (!ChunkTargetPtrs.empty())
                    {
                        // The exchange succeeds only while the flag is still true and clears it, so a chunk
                        // is picked up by a single caller even if several of them see the same chunk.
                        bool NeedsWrite = true;
                        if (RemoteChunkIndexNeedsCopyFromSourceFlags[ChunkIndex].compare_exchange_strong(NeedsWrite, false))
                        {
                            CompositeBuffer Decompressed = Chunk.DecompressToComposite();
                            if (!Decompressed)
                            {
                                throw std::runtime_error(fmt::format("Decompression of build blob {} failed", ChunkHash));
                            }
                            ZEN_ASSERT_SLOW(ChunkHash == IoHash::HashBuffer(Decompressed));
                            ZEN_ASSERT(Decompressed.GetSize() == RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]);
                            // One write operation per remaining target, all sharing the same decompressed buffer.
                            for (const ChunkedContentLookup::ChunkSequenceLocation* Target : ChunkTargetPtrs)
                            {
                                WriteOps.push_back(WriteOpData{.Target = Target, .ChunkBufferIndex = ChunkBuffers.size()});
                            }
                            ChunkBuffers.emplace_back(std::move(Decompressed));
                        }
                    }
                }
            },
            HeaderSize))
    {
        if (!WriteOps.empty())
        {
            if (!AbortFlag)
            {
                // Order by sequence and then by offset so that all writes to one sequence file are
                // adjacent; WriteFileCache only reuses its open file for consecutive writes to the same index.
                std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOpData& Lhs, const WriteOpData& Rhs) {
                    if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
                    {
                        return true;
                    }
                    if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
                    {
                        return false;
                    }
                    return Lhs.Target->Offset < Rhs.Target->Offset;
                });

                {
                    // Scoped so that the cache releases its open file before the rename pass below.
                    WriteFileCache OpenFileCache;
                    for (const WriteOpData& WriteOp : WriteOps)
                    {
                        if (AbortFlag)
                        {
                            break;
                        }
                        const CompositeBuffer& Chunk         = ChunkBuffers[WriteOp.ChunkBufferIndex];
                        const uint32_t         SequenceIndex = WriteOp.Target->SequenceIndex;
                        ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() <=
                                   RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]);
                        ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() > 0);
                        const uint64_t ChunkSize  = Chunk.GetSize();
                        const uint64_t FileOffset = WriteOp.Target->Offset;
                        const uint32_t PathIndex  = Lookup.SequenceIndexFirstPathIndex[SequenceIndex];
                        ZEN_ASSERT(FileOffset + ChunkSize <= RemoteContent.RawSizes[PathIndex]);

                        OpenFileCache.WriteToFile(
                            SequenceIndex,
                            [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) {
                                return GetTempChunkedSequenceFileName(CacheFolderPath,
                                                                      RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
                            },
                            Chunk,
                            FileOffset,
                            RemoteContent.RawSizes[PathIndex]);
                        OutBytesWritten += ChunkSize;
                    }
                }
                if (!AbortFlag)
                {
                    ZEN_TRACE_CPU("WriteBlockToDisk_VerifyHash");

                    // Write tracking, updating this must be done without any files open (WriteFileCache)
                    for (const WriteOpData& WriteOp : WriteOps)
                    {
                        const uint32_t RemoteSequenceIndex = WriteOp.Target->SequenceIndex;
                        // fetch_sub returns the previous value: 1 means this was the last outstanding chunk
                        // of the sequence, so the temp file is complete and can be verified and renamed.
                        if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1)
                        {
                            const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
                            const IoHash  VerifyChunkHash = IoHash::HashBuffer(
                                IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
                            if (VerifyChunkHash != SequenceRawHash)
                            {
                                throw std::runtime_error(fmt::format("Written chunk sequence {} hash does not match expected hash {}",
                                                                     VerifyChunkHash,
                                                                     SequenceRawHash));
                            }
                            std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash),
                                                    GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash));
                        }
                    }
                }
                OutChunksComplete += gsl::narrow(ChunkBuffers.size());
            }
        }
        return true;
    }
    return false;
}

// Parses CompressedChunk as a compressed buffer, checks that the raw hash and raw size stored in its
// header equal ChunkHash and ChunkRawSize, and returns the decompressed payload.
// Throws std::runtime_error when parsing, either header check, or decompression fails.
SharedBuffer Decompress(const CompositeBuffer& CompressedChunk, const IoHash& ChunkHash, const uint64_t ChunkRawSize)
{
    ZEN_TRACE_CPU("Decompress");

    IoHash           RawHash;
    uint64_t         RawSize;
    CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompressedChunk, RawHash, RawSize);
    if (!Compressed)
    {
        throw std::runtime_error(fmt::format("Invalid build blob format for chunk {}", ChunkHash));
    }
    if (RawHash != ChunkHash)
    {
        throw std::runtime_error(fmt::format("Mismatching build blob {}, but compressed header rawhash is {}", ChunkHash, RawHash));
    }
    if (RawSize != ChunkRawSize)
    {
        throw
std::runtime_error( fmt::format("Mismatching build blob {}, expected raw size {} but recevied raw size {}", ChunkHash, ChunkRawSize, RawSize)); } if (!Compressed) { throw std::runtime_error(fmt::format("Invalid build blob {}, not a compressed buffer", ChunkHash)); } SharedBuffer Decompressed = Compressed.Decompress(); if (!Decompressed) { throw std::runtime_error(fmt::format("Decompression of build blob {} failed", ChunkHash)); } return Decompressed; } void WriteChunkToDisk(const std::filesystem::path& CacheFolderPath, const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, eastl::span ChunkTargets, const CompositeBuffer& ChunkData, WriteFileCache& OpenFileCache, std::atomic& OutBytesWritten) { ZEN_TRACE_CPU("WriteChunkToDisk"); for (const ChunkedContentLookup::ChunkSequenceLocation* TargetPtr : ChunkTargets) { const auto& Target = *TargetPtr; const uint64_t FileOffset = Target.Offset; const uint32_t SequenceIndex = Target.SequenceIndex; const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex]; OpenFileCache.WriteToFile( SequenceIndex, [&CacheFolderPath, &Content](uint32_t SequenceIndex) { return GetTempChunkedSequenceFileName(CacheFolderPath, Content.ChunkedContent.SequenceRawHashes[SequenceIndex]); }, ChunkData, FileOffset, Content.RawSizes[PathIndex]); OutBytesWritten += ChunkData.GetSize(); } } bool CanStreamDecompress(const ChunkedFolderContent& RemoteContent, const eastl::vector Locations) { if (Locations.size() == 1) { const uint32_t FirstSequenceIndex = Locations[0]->SequenceIndex; if (Locations[0]->Offset == 0 && RemoteContent.ChunkedContent.ChunkCounts[FirstSequenceIndex] == 1) { return true; } } return false; } void StreamDecompress(const std::filesystem::path& CacheFolderPath, const IoHash& SequenceRawHash, CompositeBuffer&& CompressedPart, std::atomic& WriteToDiskBytes) { const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash); TemporaryFile 
DecompressedTemp; std::error_code Ec; DecompressedTemp.CreateTemporary(TempChunkSequenceFileName.parent_path(), Ec); if (Ec) { throw std::runtime_error( fmt::format("Failed creating temporary file for decompressing large blob {}. Reason: {}", SequenceRawHash, Ec.message())); } IoHash RawHash; uint64_t RawSize; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompressedPart, RawHash, RawSize); if (!Compressed) { throw std::runtime_error(fmt::format("Failed to parse header of compressed large blob {}", SequenceRawHash)); } if (RawHash != SequenceRawHash) { throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, SequenceRawHash)); } IoHashStream Hash; bool CouldDecompress = Compressed.DecompressToStream(0, (uint64_t)-1, [&](uint64_t Offset, const CompositeBuffer& RangeBuffer) { DecompressedTemp.Write(RangeBuffer, Offset); for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) { Hash.Append(Segment.GetView()); } WriteToDiskBytes += RangeBuffer.GetSize(); }); if (!CouldDecompress) { throw std::runtime_error(fmt::format("Failed to decompress large blob {}", SequenceRawHash)); } const IoHash VerifyHash = Hash.GetHash(); if (VerifyHash != SequenceRawHash) { throw std::runtime_error( fmt::format("Decompressed blob payload hash {} does not match expected hash {}", VerifyHash, SequenceRawHash)); } DecompressedTemp.MoveTemporaryIntoPlace(TempChunkSequenceFileName, Ec); if (Ec) { throw std::runtime_error( fmt::format("Failed moving temporary file for decompressing large blob {}. 
Reason: {}", SequenceRawHash, Ec.message())); } } void DownloadLargeBlob(BuildStorage& Storage, const std::filesystem::path& TempFolderPath, const std::filesystem::path& CacheFolderPath, const ChunkedFolderContent& RemoteContent, const ChunkedContentLookup& RemoteLookup, const Oid& BuildId, const IoHash& ChunkHash, const std::uint64_t PreferredMultipartChunkSize, const eastl::vector& ChunkTargetPtrs, eastl::span> SequenceIndexChunksLeftToWriteCounters, ParallellWork& Work, WorkerThreadPool& WritePool, WorkerThreadPool& NetworkPool, std::atomic& WriteToDiskBytes, std::atomic& BytesDownloaded, std::atomic& LooseChunksBytes, std::atomic& DownloadedChunks, std::atomic& ChunksComplete, std::atomic& MultipartAttachmentCount) { ZEN_TRACE_CPU("DownloadLargeBlob"); struct WorkloadData { TemporaryFile TempFile; }; std::shared_ptr Workload(std::make_shared()); std::error_code Ec; Workload->TempFile.CreateTemporary(TempFolderPath, Ec); if (Ec) { throw std::runtime_error( fmt::format("Failed opening temporary file '{}': {} ({})", Workload->TempFile.GetPath(), Ec.message(), Ec.value())); } eastl::vector> WorkItems = Storage.GetLargeBuildBlob( BuildId, ChunkHash, PreferredMultipartChunkSize, [&CacheFolderPath, &RemoteContent, &RemoteLookup, &Work, &WritePool, Workload, ChunkHash, &BytesDownloaded, &LooseChunksBytes, &WriteToDiskBytes, &DownloadedChunks, &ChunksComplete, SequenceIndexChunksLeftToWriteCounters, ChunkTargetPtrs = eastl::vector( ChunkTargetPtrs)](uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining) { BytesDownloaded += Chunk.GetSize(); LooseChunksBytes += Chunk.GetSize(); if (!AbortFlag.load()) { Workload->TempFile.Write(Chunk.GetView(), Offset); if (Chunk.GetSize() == BytesRemaining) { DownloadedChunks++; Work.ScheduleWork( WritePool, // GetSyncWorkerPool(),// [&CacheFolderPath, &RemoteContent, &RemoteLookup, ChunkHash, Workload, Offset, BytesRemaining, &ChunksComplete, &WriteToDiskBytes, SequenceIndexChunksLeftToWriteCounters, 
ChunkTargetPtrs](std::atomic&) { ZEN_TRACE_CPU("DownloadLargeBlob_Work"); if (!AbortFlag) { uint64_t CompressedSize = Workload->TempFile.FileSize(); void* FileHandle = Workload->TempFile.Detach(); IoBuffer CompressedPart = IoBuffer(IoBuffer::File, FileHandle, 0, CompressedSize, /*IsWholeFile*/ true); if (!CompressedPart) { throw std::runtime_error( fmt::format("Multipart build blob {} is not a compressed buffer", ChunkHash)); } CompressedPart.SetDeleteOnClose(true); auto ChunkHashToChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); ZEN_ASSERT(ChunkHashToChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end()); bool NeedHashVerify = true; if (CanStreamDecompress(RemoteContent, ChunkTargetPtrs)) { const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[ChunkTargetPtrs.front()->SequenceIndex]; StreamDecompress(CacheFolderPath, SequenceRawHash, CompositeBuffer(std::move(CompressedPart)), WriteToDiskBytes); NeedHashVerify = false; ChunksComplete++; } else { const uint32_t ChunkIndex = ChunkHashToChunkIndexIt->second; SharedBuffer Chunk = Decompress(CompositeBuffer(std::move(CompressedPart)), ChunkHash, RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]); // ZEN_ASSERT_SLOW(ChunkHash == // IoHash::HashBuffer(Chunk.AsIoBuffer())); if (!AbortFlag) { WriteFileCache OpenFileCache; WriteChunkToDisk(CacheFolderPath, RemoteContent, RemoteLookup, ChunkTargetPtrs, CompositeBuffer(Chunk), OpenFileCache, WriteToDiskBytes); ChunksComplete++; } } if (!AbortFlag) { // Write tracking, updating this must be done without any files open (WriteFileCache) for (const ChunkedContentLookup::ChunkSequenceLocation* Location : ChunkTargetPtrs) { const uint32_t RemoteSequenceIndex = Location->SequenceIndex; if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1) { ZEN_TRACE_CPU("VerifyChunkHash"); const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; if (NeedHashVerify) { const 
IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile( GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash))); if (VerifyChunkHash != ChunkHash) { throw std::runtime_error( fmt::format("Written chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, ChunkHash)); } } ZEN_TRACE_CPU("VerifyChunkHashes_rename"); std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash), GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)); } } } } }, Work.DefaultErrorFunction()); } } }); if (!WorkItems.empty()) { MultipartAttachmentCount++; } for (auto& WorkItem : WorkItems) { Work.ScheduleWork( NetworkPool, // GetSyncWorkerPool(),// [WorkItem = std::move(WorkItem)](std::atomic&) { if (!AbortFlag) { WorkItem(); } }, Work.DefaultErrorFunction()); } } void UpdateFolder(BuildStorage& Storage, const Oid& BuildId, const std::filesystem::path& Path, const std::uint64_t LargeAttachmentSize, const std::uint64_t PreferredMultipartChunkSize, const ChunkedFolderContent& LocalContent, const ChunkedFolderContent& RemoteContent, const eastl::vector& BlockDescriptions, const eastl::vector& LooseChunkHashes, bool WipeTargetFolder, FolderContent& OutLocalFolderState) { ZEN_TRACE_CPU("UpdateFolder"); ZEN_UNUSED(WipeTargetFolder); std::atomic DownloadedBlocks = 0; std::atomic BlockBytes = 0; std::atomic DownloadedChunks = 0; std::atomic LooseChunksBytes = 0; std::atomic WriteToDiskBytes = 0; std::atomic MultipartAttachmentCount = 0; DiskStatistics DiskStats; Stopwatch IndexTimer; const ChunkedContentLookup LocalLookup = BuildChunkedContentLookup(LocalContent); const ChunkedContentLookup RemoteLookup = BuildChunkedContentLookup(RemoteContent); ZEN_CONSOLE("Indexed local and remote content in {}", NiceTimeSpanMs(IndexTimer.GetElapsedTimeMs())); const std::filesystem::path CacheFolderPath = Path / ZenTempCacheFolderName; Stopwatch CacheMappingTimer; uint64_t CacheMappedBytesForReuse = 0; eastl::vector> 
SequenceIndexChunksLeftToWriteCounters(RemoteContent.ChunkedContent.SequenceRawHashes.size()); // eastl::vector RemoteSequenceIndexIsCachedFlags(RemoteContent.ChunkedContent.SequenceRawHashes.size(), false); eastl::vector RemoteChunkIndexIsCachedFlags(RemoteContent.ChunkedContent.ChunkHashes.size()); // Guard if he same chunks is in multiple blocks (can happen due to block reuse, cache reuse blocks writes directly) eastl::vector> RemoteChunkIndexNeedsCopyFromSourceFlags(RemoteContent.ChunkedContent.ChunkHashes.size()); // Pick up all whole files we can use from current local state for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < RemoteContent.ChunkedContent.SequenceRawHashes.size(); RemoteSequenceIndex++) { const IoHash& RemoteSequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; if (auto It = LocalLookup.RawHashToSequenceIndex.find(RemoteSequenceRawHash); It != LocalLookup.RawHashToSequenceIndex.end()) { SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = 0; const uint32_t RemotePathIndex = GetFirstPathIndexForRawHash(RemoteLookup, RemoteSequenceRawHash); CacheMappedBytesForReuse += RemoteContent.RawSizes[RemotePathIndex]; } else { SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]; } } // Pick up all chunks in current local state struct CacheCopyData { uint32_t LocalSequenceIndex; eastl::vector TargetChunkLocationPtrs; struct ChunkTarget { uint32_t TargetChunkLocationCount; uint64_t ChunkRawSize; uint64_t CacheFileOffset; }; eastl::vector ChunkTargets; }; tsl::robin_map RawHashToCacheCopyDataIndex; eastl::vector CacheCopyDatas; for (uint32_t LocalSequenceIndex = 0; LocalSequenceIndex < LocalContent.ChunkedContent.SequenceRawHashes.size(); LocalSequenceIndex++) { const IoHash& LocalSequenceRawHash = LocalContent.ChunkedContent.SequenceRawHashes[LocalSequenceIndex]; const uint32_t LocalOrderOffset = 
LocalLookup.SequenceIndexChunkOrderOffset[LocalSequenceIndex]; { uint64_t SourceOffset = 0; const uint32_t LocalChunkCount = LocalContent.ChunkedContent.ChunkCounts[LocalSequenceIndex]; for (uint32_t LocalOrderIndex = 0; LocalOrderIndex < LocalChunkCount; LocalOrderIndex++) { const uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[LocalOrderOffset + LocalOrderIndex]; const IoHash& LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; const uint64_t LocalChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[LocalChunkIndex]; if (auto RemoteChunkIt = RemoteLookup.ChunkHashToChunkIndex.find(LocalChunkHash); RemoteChunkIt != RemoteLookup.ChunkHashToChunkIndex.end()) { const uint32_t RemoteChunkIndex = RemoteChunkIt->second; if (!RemoteChunkIndexIsCachedFlags[RemoteChunkIndex]) { eastl::vector ChunkTargetPtrs = GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex); if (!ChunkTargetPtrs.empty()) { CacheCopyData::ChunkTarget Target = { .TargetChunkLocationCount = gsl::narrow(ChunkTargetPtrs.size()), .ChunkRawSize = LocalChunkRawSize, .CacheFileOffset = SourceOffset}; if (auto CopySourceIt = RawHashToCacheCopyDataIndex.find(LocalSequenceRawHash); CopySourceIt != RawHashToCacheCopyDataIndex.end()) { CacheCopyData& Data = CacheCopyDatas[CopySourceIt->second]; Data.TargetChunkLocationPtrs.insert(Data.TargetChunkLocationPtrs.end(), ChunkTargetPtrs.begin(), ChunkTargetPtrs.end()); Data.ChunkTargets.push_back(Target); } else { RawHashToCacheCopyDataIndex.insert_or_assign(LocalSequenceRawHash, CacheCopyDatas.size()); CacheCopyDatas.push_back( CacheCopyData{.LocalSequenceIndex = LocalSequenceIndex, .TargetChunkLocationPtrs = ChunkTargetPtrs, .ChunkTargets = eastl::vector{Target}}); } CacheMappedBytesForReuse += LocalChunkRawSize; RemoteChunkIndexIsCachedFlags[RemoteChunkIndex] = true; } } } SourceOffset += LocalChunkRawSize; } } } if (CacheMappedBytesForReuse > 0) { ZEN_CONSOLE("Mapped {} cached 
data for reuse in {}", NiceBytes(CacheMappedBytesForReuse), NiceTimeSpanMs(CacheMappingTimer.GetElapsedTimeMs())); } uint32_t ChunkCountToWrite = 0; for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++) { if (RemoteChunkIndexIsCachedFlags[RemoteChunkIndex]) { ChunkCountToWrite++; } else { eastl::vector ChunkTargetPtrs = GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex); if (!ChunkTargetPtrs.empty()) { RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex] = true; ChunkCountToWrite++; } } } std::atomic ChunkCountWritten = 0; { ZEN_TRACE_CPU("HandleChunks"); FilteredRate FilteredDownloadedBytesPerSecond; FilteredRate FilteredWrittenBytesPerSecond; WorkerThreadPool& NetworkPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // WorkerThreadPool& WritePool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // ProgressBar WriteProgressBar(UsePlainProgress); ParallellWork Work(AbortFlag); std::atomic BytesDownloaded = 0; for (const IoHash ChunkHash : LooseChunkHashes) { if (AbortFlag) { break; } auto RemoteChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); ZEN_ASSERT(RemoteChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end()); const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second; if (RemoteChunkIndexIsCachedFlags[RemoteChunkIndex]) { ZEN_DEBUG("Skipping chunk {} due to cache reuse", ChunkHash); continue; } bool NeedsCopy = true; if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex].compare_exchange_strong(NeedsCopy, false)) { eastl::vector ChunkTargetPtrs = GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex); if (ChunkTargetPtrs.empty()) { ZEN_DEBUG("Skipping chunk {} due to cache reuse", ChunkHash); } else { Work.ScheduleWork( NetworkPool, // GetSyncWorkerPool(),// [&, ChunkHash, RemoteChunkIndex, ChunkTargetPtrs](std::atomic&) { 
ZEN_TRACE_CPU("UpdateFolder_LooseChunk"); if (!AbortFlag) { FilteredDownloadedBytesPerSecond.Start(); if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize) { DownloadLargeBlob(Storage, Path / ZenTempChunkFolderName, CacheFolderPath, RemoteContent, RemoteLookup, BuildId, ChunkHash, PreferredMultipartChunkSize, ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters, Work, WritePool, NetworkPool, WriteToDiskBytes, BytesDownloaded, LooseChunksBytes, DownloadedChunks, ChunkCountWritten, MultipartAttachmentCount); } else { IoBuffer CompressedPart = Storage.GetBuildBlob(BuildId, ChunkHash); if (!CompressedPart) { throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash)); } BytesDownloaded += CompressedPart.GetSize(); LooseChunksBytes += CompressedPart.GetSize(); CompositeBuffer Payload = WriteToTempFileIfNeeded(CompositeBuffer(std::move(CompressedPart)), Path / ZenTempChunkFolderName, ChunkHash); DownloadedChunks++; if (!AbortFlag) { Work.ScheduleWork( WritePool, [&, ChunkHash, RemoteChunkIndex, ChunkTargetPtrs, CompressedPart = std::move(Payload)]( std::atomic&) { ZEN_TRACE_CPU("UpdateFolder_WriteBlob"); if (!AbortFlag) { FilteredWrittenBytesPerSecond.Start(); bool NeedHashVerify = true; if (CanStreamDecompress(RemoteContent, ChunkTargetPtrs)) { const IoHash& SequenceRawHash = RemoteContent.ChunkedContent .SequenceRawHashes[ChunkTargetPtrs.front()->SequenceIndex]; StreamDecompress(CacheFolderPath, SequenceRawHash, CompositeBuffer(CompressedPart), WriteToDiskBytes); ChunkCountWritten++; NeedHashVerify = false; } else { SharedBuffer Chunk = Decompress(CompressedPart, ChunkHash, RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex]); { WriteFileCache OpenFileCache; WriteChunkToDisk(CacheFolderPath, RemoteContent, RemoteLookup, ChunkTargetPtrs, CompositeBuffer(Chunk), OpenFileCache, WriteToDiskBytes); ChunkCountWritten++; } } if (!AbortFlag) { // Write tracking, updating this must be done without any files open // 
(WriteFileCache) for (const ChunkedContentLookup::ChunkSequenceLocation* Location : ChunkTargetPtrs) { const uint32_t RemoteSequenceIndex = Location->SequenceIndex; if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub( 1) == 1) { ZEN_TRACE_CPU("UpdateFolder_VerifyHash"); const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; if (NeedHashVerify) { const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile( GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash))); if (VerifyChunkHash != SequenceRawHash) { throw std::runtime_error( fmt::format("Written chunk sequence {} hash does not match " "expected hash {}", VerifyChunkHash, SequenceRawHash)); } } ZEN_TRACE_CPU("UpdateFolder_rename"); std::filesystem::rename( GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash), GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)); } } } } }, Work.DefaultErrorFunction()); } } } }, Work.DefaultErrorFunction()); } } } for (size_t CopyDataIndex = 0; CopyDataIndex < CacheCopyDatas.size(); CopyDataIndex++) { if (AbortFlag) { break; } Work.ScheduleWork( WritePool, // GetSyncWorkerPool(),// [&, CopyDataIndex](std::atomic&) { if (!AbortFlag) { ZEN_TRACE_CPU("UpdateFolder_Copy"); FilteredWrittenBytesPerSecond.Start(); const CacheCopyData& CopyData = CacheCopyDatas[CopyDataIndex]; const uint32_t LocalPathIndex = LocalLookup.SequenceIndexFirstPathIndex[CopyData.LocalSequenceIndex]; const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred(); if (!CopyData.TargetChunkLocationPtrs.empty()) { uint64_t CacheLocalFileBytesRead = 0; size_t TargetStart = 0; const eastl::span AllTargets( CopyData.TargetChunkLocationPtrs); struct WriteOp { const ChunkedContentLookup::ChunkSequenceLocation* Target; uint64_t CacheFileOffset; uint64_t ChunkSize; }; eastl::vector WriteOps; WriteOps.reserve(AllTargets.size()); for (const 
CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets) { eastl::span TargetRange = AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount); for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange) { WriteOps.push_back(WriteOp{.Target = Target, .CacheFileOffset = ChunkTarget.CacheFileOffset, .ChunkSize = ChunkTarget.ChunkRawSize}); } TargetStart += ChunkTarget.TargetChunkLocationCount; } std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) { if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex) { return true; } else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex) { return false; } if (Lhs.Target->Offset < Rhs.Target->Offset) { return true; } return false; }); if (!AbortFlag) { BufferedOpenFile SourceFile(LocalFilePath); WriteFileCache OpenFileCache; for (const WriteOp& Op : WriteOps) { if (AbortFlag) { break; } const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex; ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() <= RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]); ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() > 0); const uint32_t RemotePathIndex = RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]; const uint64_t ChunkSize = Op.ChunkSize; CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ChunkSize); ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]); OpenFileCache.WriteToFile( RemoteSequenceIndex, [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) { return GetTempChunkedSequenceFileName( CacheFolderPath, RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); }, ChunkSource, Op.Target->Offset, RemoteContent.RawSizes[RemotePathIndex]); WriteToDiskBytes += ChunkSize; CacheLocalFileBytesRead += ChunkSize; // TODO: This should be the sum of unique chunk sizes? 
} } if (!AbortFlag) { if (!AbortFlag) { // Write tracking, updating this must be done without any files open (WriteFileCache) for (const WriteOp& Op : WriteOps) { ZEN_TRACE_CPU("UpdateFolder_Copy_VerifyHash"); const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex; if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1) { const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile( GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash))); if (VerifyChunkHash != SequenceRawHash) { throw std::runtime_error( fmt::format("Written chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, SequenceRawHash)); } ZEN_TRACE_CPU("UpdateFolder_Copy_rename"); std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash), GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)); } } } ChunkCountWritten += gsl::narrow(CopyData.ChunkTargets.size()); ZEN_DEBUG("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), LocalContent.Paths[LocalPathIndex]); } } } }, Work.DefaultErrorFunction()); } size_t BlockCount = BlockDescriptions.size(); std::atomic BlocksComplete = 0; auto IsBlockNeeded = [&RemoteContent, &RemoteLookup, &RemoteChunkIndexNeedsCopyFromSourceFlags]( const ChunkBlockDescription& BlockDescription) -> bool { for (const IoHash& ChunkHash : BlockDescription.ChunkRawHashes) { if (auto It = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); It != RemoteLookup.ChunkHashToChunkIndex.end()) { const uint32_t RemoteChunkIndex = It->second; if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex]) { return true; } } } return false; }; size_t BlocksNeededCount = 0; for (size_t BlockIndex = 0; BlockIndex < BlockCount; BlockIndex++) { if (Work.IsAborted()) { break; } if (IsBlockNeeded(BlockDescriptions[BlockIndex])) { BlocksNeededCount++; Work.ScheduleWork( 
NetworkPool, [&, BlockIndex](std::atomic&) { if (!AbortFlag) { ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_Read"); FilteredDownloadedBytesPerSecond.Start(); IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockDescriptions[BlockIndex].BlockHash); if (!BlockBuffer) { throw std::runtime_error(fmt::format("Block {} is missing", BlockDescriptions[BlockIndex].BlockHash)); } BytesDownloaded += BlockBuffer.GetSize(); BlockBytes += BlockBuffer.GetSize(); DownloadedBlocks++; CompositeBuffer Payload = WriteToTempFileIfNeeded(CompositeBuffer(std::move(BlockBuffer)), Path / ZenTempBlockFolderName, BlockDescriptions[BlockIndex].BlockHash); if (!AbortFlag) { Work.ScheduleWork( WritePool, [&, BlockIndex, BlockBuffer = std::move(Payload)](std::atomic&) { if (!AbortFlag) { ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_Write"); FilteredWrittenBytesPerSecond.Start(); IoHash BlockRawHash; uint64_t BlockRawSize; CompressedBuffer CompressedBlockBuffer = CompressedBuffer::FromCompressed(std::move(BlockBuffer), BlockRawHash, BlockRawSize); if (!CompressedBlockBuffer) { throw std::runtime_error(fmt::format("Block {} is not a compressed buffer", BlockDescriptions[BlockIndex].BlockHash)); } if (BlockRawHash != BlockDescriptions[BlockIndex].BlockHash) { throw std::runtime_error(fmt::format("Block {} header has a mismatching raw hash {}", BlockDescriptions[BlockIndex].BlockHash, BlockRawHash)); } CompositeBuffer DecompressedBlockBuffer = CompressedBlockBuffer.DecompressToComposite(); if (!DecompressedBlockBuffer) { throw std::runtime_error(fmt::format("Block {} failed to decompress", BlockDescriptions[BlockIndex].BlockHash)); } ZEN_ASSERT_SLOW(BlockDescriptions[BlockIndex].BlockHash == IoHash::HashBuffer(DecompressedBlockBuffer)); if (!WriteBlockToDisk(CacheFolderPath, RemoteContent, SequenceIndexChunksLeftToWriteCounters, DecompressedBlockBuffer, RemoteLookup, RemoteChunkIndexNeedsCopyFromSourceFlags.data(), ChunkCountWritten, WriteToDiskBytes)) { throw std::runtime_error( fmt::format("Block {} 
is malformed", BlockDescriptions[BlockIndex].BlockHash)); } BlocksComplete++; } }, [&, BlockIndex](const std::exception& Ex, std::atomic&) { ZEN_ERROR("Failed writing block {}. Reason: {}", BlockDescriptions[BlockIndex].BlockHash, Ex.what()); AbortFlag = true; }); } } }, Work.DefaultErrorFunction()); } else { ZEN_DEBUG("Skipping block {} due to cache reuse", BlockDescriptions[BlockIndex].BlockHash); } } ZEN_TRACE_CPU("HandleChunks_Wait"); Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) { ZEN_UNUSED(IsAborted, PendingWork); ZEN_ASSERT(ChunkCountToWrite >= ChunkCountWritten.load()); FilteredWrittenBytesPerSecond.Update(WriteToDiskBytes.load()); FilteredDownloadedBytesPerSecond.Update(BytesDownloaded.load()); std::string Details = fmt::format("{}/{} chunks. {}/{} blocks. {} {}bits/s downloaded. {} {}B/s written", ChunkCountWritten.load(), ChunkCountToWrite, BlocksComplete.load(), BlocksNeededCount, NiceBytes(BytesDownloaded.load()), NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8), NiceBytes(WriteToDiskBytes.load()), NiceNum(FilteredWrittenBytesPerSecond.GetCurrent())); WriteProgressBar.UpdateState({.Task = "Writing chunks ", .Details = Details, .TotalCount = gsl::narrow(ChunkCountToWrite), .RemainingCount = gsl::narrow(ChunkCountToWrite - ChunkCountWritten.load())}, false); }); FilteredWrittenBytesPerSecond.Stop(); FilteredDownloadedBytesPerSecond.Stop(); if (AbortFlag) { return; } WriteProgressBar.Finish(); } for (const auto& SequenceIndexChunksLeftToWriteCounter : SequenceIndexChunksLeftToWriteCounters) { ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounter.load() == 0); } eastl::vector> Targets; Targets.reserve(RemoteContent.Paths.size()); for (uint32_t RemotePathIndex = 0; RemotePathIndex < RemoteContent.Paths.size(); RemotePathIndex++) { Targets.push_back(std::make_pair(RemoteContent.RawHashes[RemotePathIndex], RemotePathIndex)); } std::sort(Targets.begin(), Targets.end(), [](const std::pair& Lhs, const std::pair& 
Rhs) { return Lhs.first < Rhs.first; }); // Move all files we will reuse to cache folder for (uint32_t LocalPathIndex = 0; LocalPathIndex < LocalContent.Paths.size(); LocalPathIndex++) { const IoHash& RawHash = LocalContent.RawHashes[LocalPathIndex]; if (RemoteLookup.RawHashToSequenceIndex.contains(RawHash)) { const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred(); const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash); ZEN_ASSERT_SLOW(std::filesystem::exists(LocalFilePath)); SetFileReadOnly(LocalFilePath, false); std::filesystem::rename(LocalFilePath, CacheFilePath); } } if (WipeTargetFolder) { ZEN_TRACE_CPU("UpdateFolder_WipeTarget"); // Clean target folder ZEN_CONSOLE("Wiping {}", Path); CleanDirectory(Path, DefaultExcludeFolders); } else { ZEN_TRACE_CPU("UpdateFolder_RemoveUnused"); // Remove unused tracked files tsl::robin_map RemotePathToRemoteIndex; RemotePathToRemoteIndex.reserve(RemoteContent.Paths.size()); for (uint32_t RemotePathIndex = 0; RemotePathIndex < RemoteContent.Paths.size(); RemotePathIndex++) { RemotePathToRemoteIndex.insert({RemoteContent.Paths[RemotePathIndex].generic_string(), RemotePathIndex}); } eastl::vector LocalFilesToRemove; for (uint32_t LocalPathIndex = 0; LocalPathIndex < LocalContent.Paths.size(); LocalPathIndex++) { if (!RemotePathToRemoteIndex.contains(LocalContent.Paths[LocalPathIndex].generic_string())) { const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred(); if (std::filesystem::exists(LocalFilePath)) { LocalFilesToRemove.emplace_back(std::move(LocalFilePath)); } } } if (!LocalFilesToRemove.empty()) { ZEN_CONSOLE("Cleaning {} removed files from {}", LocalFilesToRemove.size(), Path); for (const std::filesystem::path& LocalFilePath : LocalFilesToRemove) { SetFileReadOnly(LocalFilePath, false); std::filesystem::remove(LocalFilePath); } } } { WorkerThreadPool& WritePool = 
GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // ProgressBar RebuildProgressBar(UsePlainProgress); ParallellWork Work(AbortFlag); OutLocalFolderState.Paths.resize(RemoteContent.Paths.size()); OutLocalFolderState.RawSizes.resize(RemoteContent.Paths.size()); OutLocalFolderState.Attributes.resize(RemoteContent.Paths.size()); OutLocalFolderState.ModificationTicks.resize(RemoteContent.Paths.size()); std::atomic TargetsComplete = 0; size_t TargetOffset = 0; while (TargetOffset < Targets.size()) { if (AbortFlag) { break; } ZEN_TRACE_CPU("UpdateFolder_FinalizeTree"); size_t TargetCount = 1; const IoHash& RawHash = Targets[TargetOffset].first; while (Targets[TargetOffset + TargetCount].first == RawHash) { TargetCount++; } Work.ScheduleWork( WritePool, // GetSyncWorkerPool(),// [&, BaseTargetOffset = TargetOffset, TargetCount](std::atomic&) { if (!AbortFlag) { ZEN_TRACE_CPU("FinalizeTree_Work"); size_t TargetOffset = BaseTargetOffset; const IoHash& RawHash = Targets[TargetOffset].first; const uint32_t FirstTargetPathIndex = Targets[TargetOffset].second; const std::filesystem::path& FirstTargetPath = RemoteContent.Paths[FirstTargetPathIndex]; OutLocalFolderState.Paths[FirstTargetPathIndex] = FirstTargetPath; OutLocalFolderState.RawSizes[FirstTargetPathIndex] = RemoteContent.RawSizes[FirstTargetPathIndex]; const std::filesystem::path FirstTargetFilePath = (Path / FirstTargetPath).make_preferred(); if (RawHash == IoHash::Zero) { if (std::filesystem::exists(FirstTargetFilePath)) { SetFileReadOnly(FirstTargetFilePath, false); } CreateDirectories(FirstTargetFilePath.parent_path()); { BasicFile OutputFile; OutputFile.Open(FirstTargetFilePath, BasicFile::Mode::kTruncate); } } else { ZEN_TRACE_CPU("FinalizeTree_MoveIntoPlace"); const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash); ZEN_ASSERT_SLOW(std::filesystem::exists(CacheFilePath)); CreateDirectories(FirstTargetFilePath.parent_path()); if 
(std::filesystem::exists(FirstTargetFilePath)) { SetFileReadOnly(FirstTargetFilePath, false); } std::filesystem::rename(CacheFilePath, FirstTargetFilePath); } OutLocalFolderState.Attributes[FirstTargetPathIndex] = RemoteContent.Attributes.empty() ? GetNativeFileAttributes(FirstTargetFilePath) : SetNativeFileAttributes(FirstTargetFilePath, RemoteContent.Platform, RemoteContent.Attributes[FirstTargetPathIndex]); OutLocalFolderState.ModificationTicks[FirstTargetPathIndex] = GetModificationTickFromPath(FirstTargetFilePath); TargetOffset++; TargetsComplete++; while (TargetOffset < (BaseTargetOffset + TargetCount)) { ZEN_TRACE_CPU("FinalizeTree_Copy"); ZEN_ASSERT(Targets[TargetOffset].first == RawHash); ZEN_ASSERT_SLOW(std::filesystem::exists(FirstTargetFilePath)); const uint32_t ExtraTargetPathIndex = Targets[TargetOffset].second; const std::filesystem::path& ExtraTargetPath = RemoteContent.Paths[ExtraTargetPathIndex]; const std::filesystem::path ExtraTargetFilePath = (Path / ExtraTargetPath).make_preferred(); OutLocalFolderState.Paths[ExtraTargetPathIndex] = ExtraTargetPath; OutLocalFolderState.RawSizes[ExtraTargetPathIndex] = RemoteContent.RawSizes[ExtraTargetPathIndex]; CreateDirectories(ExtraTargetFilePath.parent_path()); if (std::filesystem::exists(ExtraTargetFilePath)) { SetFileReadOnly(ExtraTargetFilePath, false); } CopyFile(FirstTargetFilePath, ExtraTargetFilePath, {.EnableClone = false}); OutLocalFolderState.Attributes[ExtraTargetPathIndex] = RemoteContent.Attributes.empty() ? GetNativeFileAttributes(ExtraTargetFilePath) : SetNativeFileAttributes(ExtraTargetFilePath, RemoteContent.Platform, RemoteContent.Attributes[ExtraTargetPathIndex]); OutLocalFolderState.ModificationTicks[ExtraTargetPathIndex] = GetModificationTickFromPath(ExtraTargetFilePath); TargetOffset++; TargetsComplete++; } } }, Work.DefaultErrorFunction()); TargetOffset += TargetCount; } ZEN_TRACE_CPU("FinalizeTree_Wait"); Work.Wait(UsePlainProgress ? 
5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) { ZEN_UNUSED(IsAborted, PendingWork); std::string Details = fmt::format("{}/{} files", TargetsComplete.load(), Targets.size()); RebuildProgressBar.UpdateState({.Task = "Rebuilding state ", .Details = Details, .TotalCount = gsl::narrow(Targets.size()), .RemainingCount = gsl::narrow(Targets.size() - TargetsComplete.load())}, false); }); if (AbortFlag) { return; } RebuildProgressBar.Finish(); } } eastl::vector> ResolveBuildPartNames(BuildStorage& Storage, const Oid& BuildId, const eastl::vector& BuildPartIds, eastl::span BuildPartNames, std::uint64_t& OutPreferredMultipartChunkSize) { eastl::vector> Result; { Stopwatch GetBuildTimer; eastl::vector> AvailableParts; CbObject BuildObject = Storage.GetBuild(BuildId); ZEN_CONSOLE("GetBuild took {}. Payload size: {}", NiceTimeSpanMs(GetBuildTimer.GetElapsedTimeMs()), NiceBytes(BuildObject.GetSize())); CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView(); if (!PartsObject) { throw std::runtime_error("Build object does not have a 'parts' object"); } OutPreferredMultipartChunkSize = BuildObject["chunkSize"sv].AsUInt64(OutPreferredMultipartChunkSize); for (CbFieldView PartView : PartsObject) { const std::string BuildPartName = std::string(PartView.GetName()); const Oid BuildPartId = PartView.AsObjectId(); if (BuildPartId == Oid::Zero) { ExtendableStringBuilder<128> SB; for (CbFieldView ScanPartView : PartsObject) { SB.Append(fmt::format("\n {}: {}", ScanPartView.GetName(), ScanPartView.AsObjectId())); } throw std::runtime_error( fmt::format("Build object parts does not have a '{}' object id{}", BuildPartName, SB.ToView())); } AvailableParts.push_back({BuildPartId, BuildPartName}); } if (BuildPartIds.empty() && BuildPartNames.empty()) { Result = AvailableParts; } else { for (const std::string& BuildPartName : BuildPartNames) { if (auto It = std::find_if(AvailableParts.begin(), AvailableParts.end(), [&BuildPartName](const auto& Part) { return Part.second == 
BuildPartName; }); It != AvailableParts.end()) { Result.push_back(*It); } else { throw std::runtime_error(fmt::format("Build {} object does not have a part named '{}'", BuildId, BuildPartName)); } } for (const Oid& BuildPartId : BuildPartIds) { if (auto It = std::find_if(AvailableParts.begin(), AvailableParts.end(), [&BuildPartId](const auto& Part) { return Part.first == BuildPartId; }); It != AvailableParts.end()) { Result.push_back(*It); } else { throw std::runtime_error(fmt::format("Build {} object does not have a part with id '{}'", BuildId, BuildPartId)); } } } if (Result.empty()) { throw std::runtime_error(fmt::format("Build object does not have any parts", BuildId)); } } return Result; } ChunkedFolderContent GetRemoteContent(BuildStorage& Storage, const Oid& BuildId, const eastl::vector>& BuildParts, std::unique_ptr& OutChunkController, eastl::vector& OutPartContents, eastl::vector& OutBlockDescriptions, eastl::vector& OutLooseChunkHashes) { ZEN_TRACE_CPU("GetRemoteContent"); Stopwatch GetBuildPartTimer; CbObject BuildPartManifest = Storage.GetBuildPart(BuildId, BuildParts[0].first); ZEN_CONSOLE("GetBuildPart {} ('{}') took {}. 
Payload size: {}", BuildParts[0].first, BuildParts[0].second, NiceTimeSpanMs(GetBuildPartTimer.GetElapsedTimeMs()), NiceBytes(BuildPartManifest.GetSize())); { CbObjectView Chunker = BuildPartManifest["chunker"sv].AsObjectView(); std::string_view ChunkerName = Chunker["name"sv].AsString(); CbObjectView Parameters = Chunker["parameters"sv].AsObjectView(); OutChunkController = CreateChunkingController(ChunkerName, Parameters); } auto ParseBuildPartManifest = [](BuildStorage& Storage, const Oid& BuildId, const Oid& BuildPartId, CbObject BuildPartManifest, ChunkedFolderContent& OutRemoteContent, eastl::vector& OutBlockDescriptions, eastl::vector& OutLooseChunkHashes) { eastl::vector AbsoluteChunkOrders; eastl::vector LooseChunkRawSizes; eastl::vector BlockRawHashes; ReadBuildContentFromCompactBinary(BuildPartManifest, OutRemoteContent.Platform, OutRemoteContent.Paths, OutRemoteContent.RawHashes, OutRemoteContent.RawSizes, OutRemoteContent.Attributes, OutRemoteContent.ChunkedContent.SequenceRawHashes, OutRemoteContent.ChunkedContent.ChunkCounts, AbsoluteChunkOrders, OutLooseChunkHashes, LooseChunkRawSizes, BlockRawHashes); // TODO: GetBlockDescriptions for all BlockRawHashes in one go - check for local block descriptions when we cache them Stopwatch GetBlockMetadataTimer; OutBlockDescriptions = Storage.GetBlockMetadata(BuildId, BlockRawHashes); ZEN_CONSOLE("GetBlockMetadata for {} took {}. 
Found {} blocks", BuildPartId, NiceTimeSpanMs(GetBlockMetadataTimer.GetElapsedTimeMs()), OutBlockDescriptions.size()); if (OutBlockDescriptions.size() != BlockRawHashes.size()) { bool AttemptFallback = false; std::string ErrorDescription = fmt::format("All required blocks could not be found, {} blocks does not have metadata in this context.", BlockRawHashes.size() - OutBlockDescriptions.size()); if (AttemptFallback) { ZEN_CONSOLE("{} Attemping fallback options.", ErrorDescription); eastl::vector AugmentedBlockDescriptions; AugmentedBlockDescriptions.reserve(BlockRawHashes.size()); eastl::vector FoundBlocks = Storage.FindBlocks(BuildId); for (const IoHash& BlockHash : BlockRawHashes) { if (auto It = std::find_if( OutBlockDescriptions.begin(), OutBlockDescriptions.end(), [BlockHash](const ChunkBlockDescription& Description) { return Description.BlockHash == BlockHash; }); It != OutBlockDescriptions.end()) { AugmentedBlockDescriptions.emplace_back(std::move(*It)); } else if (auto ListBlocksIt = std::find_if( FoundBlocks.begin(), FoundBlocks.end(), [BlockHash](const ChunkBlockDescription& Description) { return Description.BlockHash == BlockHash; }); ListBlocksIt != FoundBlocks.end()) { ZEN_CONSOLE("Found block {} via context find successfully", BlockHash); AugmentedBlockDescriptions.emplace_back(std::move(*ListBlocksIt)); } else { IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockHash); if (!BlockBuffer) { throw std::runtime_error(fmt::format("Block {} could not be found", BlockHash)); } IoHash BlockRawHash; uint64_t BlockRawSize; CompressedBuffer CompressedBlockBuffer = CompressedBuffer::FromCompressed(SharedBuffer(std::move(BlockBuffer)), BlockRawHash, BlockRawSize); if (!CompressedBlockBuffer) { throw std::runtime_error(fmt::format("Block {} is not a compressed buffer", BlockHash)); } if (BlockRawHash != BlockHash) { throw std::runtime_error( fmt::format("Block {} header has a mismatching raw hash {}", BlockHash, BlockRawHash)); } CompositeBuffer 
DecompressedBlockBuffer = CompressedBlockBuffer.DecompressToComposite(); if (!DecompressedBlockBuffer) { throw std::runtime_error(fmt::format("Block {} failed to decompress", BlockHash)); } ChunkBlockDescription MissingChunkDescription = GetChunkBlockDescription(DecompressedBlockBuffer.Flatten(), BlockHash); AugmentedBlockDescriptions.emplace_back(std::move(MissingChunkDescription)); } } OutBlockDescriptions.swap(AugmentedBlockDescriptions); } else { throw std::runtime_error(ErrorDescription); } } CalculateLocalChunkOrders(AbsoluteChunkOrders, OutLooseChunkHashes, LooseChunkRawSizes, OutBlockDescriptions, OutRemoteContent.ChunkedContent.ChunkHashes, OutRemoteContent.ChunkedContent.ChunkRawSizes, OutRemoteContent.ChunkedContent.ChunkOrders); }; OutPartContents.resize(1); ParseBuildPartManifest(Storage, BuildId, BuildParts[0].first, BuildPartManifest, OutPartContents[0], OutBlockDescriptions, OutLooseChunkHashes); ChunkedFolderContent RemoteContent; if (BuildParts.size() > 1) { eastl::vector OverlayBlockDescriptions; eastl::vector OverlayLooseChunkHashes; for (size_t PartIndex = 1; PartIndex < BuildParts.size(); PartIndex++) { const Oid& OverlayBuildPartId = BuildParts[PartIndex].first; const std::string& OverlayBuildPartName = BuildParts[PartIndex].second; Stopwatch GetOverlayBuildPartTimer; CbObject OverlayBuildPartManifest = Storage.GetBuildPart(BuildId, OverlayBuildPartId); ZEN_CONSOLE("GetBuildPart {} ('{}') took {}. 
Payload size: {}", OverlayBuildPartId, OverlayBuildPartName, NiceTimeSpanMs(GetOverlayBuildPartTimer.GetElapsedTimeMs()), NiceBytes(OverlayBuildPartManifest.GetSize())); ChunkedFolderContent OverlayPartContent; eastl::vector OverlayPartBlockDescriptions; eastl::vector OverlayPartLooseChunkHashes; ParseBuildPartManifest(Storage, BuildId, OverlayBuildPartId, OverlayBuildPartManifest, OverlayPartContent, OverlayPartBlockDescriptions, OverlayPartLooseChunkHashes); OutPartContents.push_back(OverlayPartContent); OverlayBlockDescriptions.insert(OverlayBlockDescriptions.end(), OverlayPartBlockDescriptions.begin(), OverlayPartBlockDescriptions.end()); OverlayLooseChunkHashes.insert(OverlayLooseChunkHashes.end(), OverlayPartLooseChunkHashes.begin(), OverlayPartLooseChunkHashes.end()); } RemoteContent = MergeChunkedFolderContents(OutPartContents[0], eastl::span(OutPartContents).subspan(1)); { tsl::robin_set AllBlockHashes; for (const ChunkBlockDescription& Description : OutBlockDescriptions) { AllBlockHashes.insert(Description.BlockHash); } for (const ChunkBlockDescription& Description : OverlayBlockDescriptions) { if (!AllBlockHashes.contains(Description.BlockHash)) { AllBlockHashes.insert(Description.BlockHash); OutBlockDescriptions.push_back(Description); } } } { tsl::robin_set AllLooseChunkHashes(OutLooseChunkHashes.begin(), OutLooseChunkHashes.end()); for (const IoHash& OverlayLooseChunkHash : OverlayLooseChunkHashes) { if (!AllLooseChunkHashes.contains(OverlayLooseChunkHash)) { AllLooseChunkHashes.insert(OverlayLooseChunkHash); OutLooseChunkHashes.push_back(OverlayLooseChunkHash); } } } } else { RemoteContent = OutPartContents[0]; } return RemoteContent; } ChunkedFolderContent GetLocalContent(GetFolderContentStatistics& LocalFolderScanStats, ChunkingStatistics& ChunkingStats, const std::filesystem::path& Path, ChunkingController& ChunkController) { ChunkedFolderContent LocalContent; auto IsAcceptedFolder = [ExcludeFolders = DefaultExcludeFolders](const std::string_view& 
RelativePath) -> bool { for (const std::string_view& ExcludeFolder : ExcludeFolders) { if (RelativePath.starts_with(ExcludeFolder)) { if (RelativePath.length() == ExcludeFolder.length()) { return false; } else if (RelativePath[ExcludeFolder.length()] == '/') { return false; } } } return true; }; auto IsAcceptedFile = [ExcludeExtensions = DefaultExcludeExtensions](const std::string_view& RelativePath, uint64_t, uint32_t) -> bool { for (const std::string_view& ExcludeExtension : ExcludeExtensions) { if (RelativePath.ends_with(ExcludeExtension)) { return false; } } return true; }; FolderContent CurrentLocalFolderContent = GetFolderContent( LocalFolderScanStats, Path, std::move(IsAcceptedFolder), std::move(IsAcceptedFile), GetMediumWorkerPool(EWorkloadType::Burst), UsePlainProgress ? 5000 : 200, [&](bool, std::ptrdiff_t) { ZEN_DEBUG("Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), Path); }, AbortFlag); if (AbortFlag) { return {}; } FolderContent LocalFolderState; bool ScanContent = true; eastl::vector PathIndexesOufOfDate; if (std::filesystem::is_regular_file(Path / ZenStateFilePath)) { try { Stopwatch ReadStateTimer; CbObject CurrentStateObject = LoadCompactBinaryObject(Path / ZenStateFilePath).Object; if (CurrentStateObject) { Oid CurrentBuildId; eastl::vector SavedBuildPartIds; eastl::vector SavedBuildPartsNames; eastl::vector SavedPartContents; if (ReadStateObject(CurrentStateObject, CurrentBuildId, SavedBuildPartIds, SavedBuildPartsNames, SavedPartContents, LocalFolderState)) { if (!SavedPartContents.empty()) { if (SavedPartContents.size() == 1) { LocalContent = std::move(SavedPartContents[0]); } else { LocalContent = MergeChunkedFolderContents(SavedPartContents[0], eastl::span(SavedPartContents).subspan(1)); } if (!LocalFolderState.AreKnownFilesEqual(CurrentLocalFolderContent)) { const size_t LocaStatePathCount = LocalFolderState.Paths.size(); eastl::vector DeletedPaths; FolderContent UpdatedContent = GetUpdatedContent(LocalFolderState, 
CurrentLocalFolderContent, DeletedPaths); if (!DeletedPaths.empty()) { LocalContent = DeletePathsFromChunkedContent(LocalContent, DeletedPaths); } ZEN_CONSOLE("Updating state, {} local files deleted and {} local files updated out of {}", DeletedPaths.size(), UpdatedContent.Paths.size(), LocaStatePathCount); if (UpdatedContent.Paths.size() > 0) { uint64_t ByteCountToScan = 0; for (const uint64_t RawSize : UpdatedContent.RawSizes) { ByteCountToScan += RawSize; } ProgressBar ProgressBar(false); FilteredRate FilteredBytesHashed; FilteredBytesHashed.Start(); ChunkedFolderContent UpdatedLocalContent = ChunkFolderContent( ChunkingStats, GetMediumWorkerPool(EWorkloadType::Burst), Path, UpdatedContent, ChunkController, UsePlainProgress ? 5000 : 200, [&](bool, std::ptrdiff_t) { FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load()); std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found", ChunkingStats.FilesProcessed.load(), UpdatedContent.Paths.size(), NiceBytes(ChunkingStats.BytesHashed.load()), NiceBytes(ByteCountToScan), NiceNum(FilteredBytesHashed.GetCurrent()), ChunkingStats.UniqueChunksFound.load(), NiceBytes(ChunkingStats.UniqueBytesFound.load())); ProgressBar.UpdateState({.Task = "Scanning files ", .Details = Details, .TotalCount = ByteCountToScan, .RemainingCount = ByteCountToScan - ChunkingStats.BytesHashed.load()}, false); }, AbortFlag); if (AbortFlag) { return {}; } FilteredBytesHashed.Stop(); ProgressBar.Finish(); LocalContent = MergeChunkedFolderContents(LocalContent, {{UpdatedLocalContent}}); } } else { // Remove files from LocalContent no longer in LocalFolderState tsl::robin_set LocalFolderPaths; LocalFolderPaths.reserve(LocalFolderState.Paths.size()); for (const std::filesystem::path& LocalFolderPath : LocalFolderState.Paths) { LocalFolderPaths.insert(LocalFolderPath.generic_string()); } eastl::vector DeletedPaths; for (const std::filesystem::path& LocalContentPath : LocalContent.Paths) { if 
(!LocalFolderPaths.contains(LocalContentPath.generic_string())) { DeletedPaths.push_back(LocalContentPath); } } if (!DeletedPaths.empty()) { LocalContent = DeletePathsFromChunkedContent(LocalContent, DeletedPaths); } ZEN_CONSOLE("Using cached local state"); } ZEN_CONSOLE("Read local state in {}", NiceTimeSpanMs(ReadStateTimer.GetElapsedTimeMs())); ScanContent = false; } } } } catch (const std::exception& Ex) { ZEN_CONSOLE("Failed reading state file, falling back to scannning. Reason: {}", Ex.what()); } } if (ScanContent) { uint64_t ByteCountToScan = 0; for (const uint64_t RawSize : CurrentLocalFolderContent.RawSizes) { ByteCountToScan += RawSize; } ProgressBar ProgressBar(false); FilteredRate FilteredBytesHashed; FilteredBytesHashed.Start(); ChunkedFolderContent UpdatedLocalContent = ChunkFolderContent( ChunkingStats, GetMediumWorkerPool(EWorkloadType::Burst), Path, CurrentLocalFolderContent, ChunkController, UsePlainProgress ? 5000 : 200, [&](bool, std::ptrdiff_t) { FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load()); std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found", ChunkingStats.FilesProcessed.load(), CurrentLocalFolderContent.Paths.size(), NiceBytes(ChunkingStats.BytesHashed.load()), ByteCountToScan, NiceNum(FilteredBytesHashed.GetCurrent()), ChunkingStats.UniqueChunksFound.load(), NiceBytes(ChunkingStats.UniqueBytesFound.load())); ProgressBar.UpdateState({.Task = "Scanning files ", .Details = Details, .TotalCount = ByteCountToScan, .RemainingCount = (ByteCountToScan - ChunkingStats.BytesHashed.load())}, false); }, AbortFlag); if (AbortFlag) { return {}; } FilteredBytesHashed.Stop(); ProgressBar.Finish(); } return LocalContent; } void DownloadFolder(BuildStorage& Storage, const Oid& BuildId, const eastl::vector& BuildPartIds, eastl::span BuildPartNames, const std::filesystem::path& Path, bool AllowMultiparts, bool WipeTargetFolder, bool PostDownloadVerify) { ZEN_TRACE_CPU("DownloadFolder"); Stopwatch DownloadTimer; 
const std::filesystem::path ZenTempFolder = Path / ZenTempFolderName; CreateDirectories(ZenTempFolder); auto _ = MakeGuard([&]() { CleanDirectory(ZenTempFolder, {}); std::filesystem::remove(ZenTempFolder); }); CreateDirectories(Path / ZenTempBlockFolderName); CreateDirectories(Path / ZenTempChunkFolderName); // TODO: Don't clear this - pick up files -> chunks to use CreateDirectories(Path / ZenTempCacheFolderName); // TODO: Don't clear this - pick up files and use as sequences (non .tmp extension) and // delete .tmp (maybe?) - chunk them? How do we know the file is worth chunking? std::uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u; eastl::vector> AllBuildParts = ResolveBuildPartNames(Storage, BuildId, BuildPartIds, BuildPartNames, PreferredMultipartChunkSize); eastl::vector PartContents; std::unique_ptr ChunkController; eastl::vector BlockDescriptions; eastl::vector LooseChunkHashes; ChunkedFolderContent RemoteContent = GetRemoteContent(Storage, BuildId, AllBuildParts, ChunkController, PartContents, BlockDescriptions, LooseChunkHashes); const std::uint64_t LargeAttachmentSize = AllowMultiparts ? 
PreferredMultipartChunkSize * 4u : (std::uint64_t)-1; if (!ChunkController) { ZEN_CONSOLE("Warning: Unspecified chunking algorith, using default"); ChunkController = CreateBasicChunkingController(); } GetFolderContentStatistics LocalFolderScanStats; ChunkingStatistics ChunkingStats; ChunkedFolderContent LocalContent; if (std::filesystem::is_directory(Path)) { if (!WipeTargetFolder) { LocalContent = GetLocalContent(LocalFolderScanStats, ChunkingStats, Path, *ChunkController); } } else { CreateDirectories(Path); } if (AbortFlag) { return; } auto CompareContent = [](const ChunkedFolderContent& Lhs, const ChunkedFolderContent& Rhs) { tsl::robin_map RhsPathToIndex; const size_t RhsPathCount = Rhs.Paths.size(); RhsPathToIndex.reserve(RhsPathCount); for (size_t RhsPathIndex = 0; RhsPathIndex < RhsPathCount; RhsPathIndex++) { RhsPathToIndex.insert({Rhs.Paths[RhsPathIndex].generic_string(), RhsPathIndex}); } const size_t LhsPathCount = Lhs.Paths.size(); for (size_t LhsPathIndex = 0; LhsPathIndex < LhsPathCount; LhsPathIndex++) { if (auto It = RhsPathToIndex.find(Lhs.Paths[LhsPathIndex].generic_string()); It != RhsPathToIndex.end()) { const size_t RhsPathIndex = It->second; if ((Lhs.RawHashes[LhsPathIndex] != Rhs.RawHashes[RhsPathIndex]) || (!FolderContent::AreFileAttributesEqual(Lhs.Attributes[LhsPathIndex], Rhs.Attributes[RhsPathIndex]))) { return false; } } else { return false; } } tsl::robin_set LhsPathExists; LhsPathExists.reserve(LhsPathCount); for (size_t LhsPathIndex = 0; LhsPathIndex < LhsPathCount; LhsPathIndex++) { LhsPathExists.insert({Lhs.Paths[LhsPathIndex].generic_string()}); } for (size_t RhsPathIndex = 0; RhsPathIndex < RhsPathCount; RhsPathIndex++) { if (!LhsPathExists.contains(Rhs.Paths[RhsPathIndex].generic_string())) { return false; } } return true; }; if (CompareContent(RemoteContent, LocalContent)) { ZEN_CONSOLE("Local state is identical to build to download. All done. 
Completed in {}.", NiceTimeSpanMs(DownloadTimer.GetElapsedTimeMs())); } else { ExtendableStringBuilder<128> SB; for (const std::pair& BuildPart : AllBuildParts) { SB.Append(fmt::format(" {} ({})", BuildPart.second, BuildPart.first)); } ZEN_CONSOLE("Downloading build {}, parts:{}", BuildId, SB.ToView()); FolderContent LocalFolderState; UpdateFolder(Storage, BuildId, Path, LargeAttachmentSize, PreferredMultipartChunkSize, LocalContent, RemoteContent, BlockDescriptions, LooseChunkHashes, WipeTargetFolder, LocalFolderState); if (!AbortFlag) { VerifyFolder(RemoteContent, Path, PostDownloadVerify); Stopwatch WriteStateTimer; CbObject StateObject = CreateStateObject(BuildId, AllBuildParts, PartContents, LocalFolderState); CreateDirectories((Path / ZenStateFilePath).parent_path()); TemporaryFile::SafeWriteFile(Path / ZenStateFilePath, StateObject.GetView()); ZEN_CONSOLE("Wrote local state in {}", NiceTimeSpanMs(WriteStateTimer.GetElapsedTimeMs())); #if 0 ExtendableStringBuilder<1024> SB; CompactBinaryToJson(StateObject, SB); WriteFile(Path / ZenStateFileJsonPath, IoBuffer(IoBuffer::Wrap, SB.Data(), SB.Size())); #endif // 0 ZEN_CONSOLE("Downloaded build in {}.", NiceTimeSpanMs(DownloadTimer.GetElapsedTimeMs())); } } } void DiffFolders(const std::filesystem::path& BasePath, const std::filesystem::path& ComparePath, bool OnlyChunked) { ChunkedFolderContent BaseFolderContent; ChunkedFolderContent CompareFolderContent; { std::unique_ptr ChunkController = CreateBasicChunkingController(); eastl::vector ExcludeExtensions = DefaultExcludeExtensions; if (OnlyChunked) { ExcludeExtensions.insert(ExcludeExtensions.end(), DefaultChunkingExcludeExtensions.begin(), DefaultChunkingExcludeExtensions.end()); } auto IsAcceptedFolder = [ExcludeFolders = DefaultExcludeFolders](const std::string_view& RelativePath) -> bool { for (const std::string_view& ExcludeFolder : ExcludeFolders) { if (RelativePath.starts_with(ExcludeFolder)) { if (RelativePath.length() == ExcludeFolder.length()) { return 
false; } else if (RelativePath[ExcludeFolder.length()] == '/') { return false; } } } return true; }; auto IsAcceptedFile = [ExcludeExtensions](const std::string_view& RelativePath, uint64_t, uint32_t) -> bool { for (const std::string_view& ExcludeExtension : ExcludeExtensions) { if (RelativePath.ends_with(ExcludeExtension)) { return false; } } return true; }; GetFolderContentStatistics BaseGetFolderContentStats; ChunkingStatistics BaseChunkingStats; BaseFolderContent = ScanAndChunkFolder(BaseGetFolderContentStats, BaseChunkingStats, BasePath, IsAcceptedFolder, IsAcceptedFile, *ChunkController); if (AbortFlag) { return; } GetFolderContentStatistics CompareGetFolderContentStats; ChunkingStatistics CompareChunkingStats; CompareFolderContent = ScanAndChunkFolder(CompareGetFolderContentStats, CompareChunkingStats, ComparePath, IsAcceptedFolder, IsAcceptedFile, *ChunkController); if (AbortFlag) { return; } } eastl::vector AddedHashes; eastl::vector RemovedHashes; uint64_t RemovedSize = 0; uint64_t AddedSize = 0; tsl::robin_map BaseRawHashLookup; for (size_t PathIndex = 0; PathIndex < BaseFolderContent.RawHashes.size(); PathIndex++) { const IoHash& RawHash = BaseFolderContent.RawHashes[PathIndex]; BaseRawHashLookup.insert_or_assign(RawHash, PathIndex); } tsl::robin_map CompareRawHashLookup; for (size_t PathIndex = 0; PathIndex < CompareFolderContent.RawHashes.size(); PathIndex++) { const IoHash& RawHash = CompareFolderContent.RawHashes[PathIndex]; if (!BaseRawHashLookup.contains(RawHash)) { AddedHashes.push_back(RawHash); AddedSize += CompareFolderContent.RawSizes[PathIndex]; } CompareRawHashLookup.insert_or_assign(RawHash, PathIndex); } for (uint32_t PathIndex = 0; PathIndex < BaseFolderContent.Paths.size(); PathIndex++) { const IoHash& RawHash = BaseFolderContent.RawHashes[PathIndex]; if (!CompareRawHashLookup.contains(RawHash)) { RemovedHashes.push_back(RawHash); RemovedSize += BaseFolderContent.RawSizes[PathIndex]; } } uint64_t BaseTotalRawSize = 0; for (uint32_t 
PathIndex = 0; PathIndex < BaseFolderContent.Paths.size(); PathIndex++) { BaseTotalRawSize += BaseFolderContent.RawSizes[PathIndex]; } double KeptPercent = BaseTotalRawSize > 0 ? (100.0 * (BaseTotalRawSize - RemovedSize)) / BaseTotalRawSize : 0; ZEN_CONSOLE("{} ({}) files removed, {} ({}) files added, {} ({} {:.1f}%) files kept", RemovedHashes.size(), NiceBytes(RemovedSize), AddedHashes.size(), NiceBytes(AddedSize), BaseFolderContent.Paths.size() - RemovedHashes.size(), NiceBytes(BaseTotalRawSize - RemovedSize), KeptPercent); uint64_t CompareTotalRawSize = 0; uint64_t FoundChunkCount = 0; uint64_t FoundChunkSize = 0; uint64_t NewChunkCount = 0; uint64_t NewChunkSize = 0; const ChunkedContentLookup BaseFolderLookup = BuildChunkedContentLookup(BaseFolderContent); for (uint32_t ChunkIndex = 0; ChunkIndex < CompareFolderContent.ChunkedContent.ChunkHashes.size(); ChunkIndex++) { const IoHash& ChunkHash = CompareFolderContent.ChunkedContent.ChunkHashes[ChunkIndex]; if (BaseFolderLookup.ChunkHashToChunkIndex.contains(ChunkHash)) { FoundChunkCount++; FoundChunkSize += CompareFolderContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; } else { NewChunkCount++; NewChunkSize += CompareFolderContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; } CompareTotalRawSize += CompareFolderContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; } double FoundPercent = CompareTotalRawSize > 0 ? (100.0 * FoundChunkSize) / CompareTotalRawSize : 0; double NewPercent = CompareTotalRawSize > 0 ? (100.0 * NewChunkSize) / CompareTotalRawSize : 0; ZEN_CONSOLE("Found {} ({} {:.1f}%) out of {} ({}) chunks in {} ({}) base chunks. 
Added {} ({} {:.1f}%) chunks.", FoundChunkCount, NiceBytes(FoundChunkSize), FoundPercent, CompareFolderContent.ChunkedContent.ChunkHashes.size(), NiceBytes(CompareTotalRawSize), BaseFolderContent.ChunkedContent.ChunkHashes.size(), NiceBytes(BaseTotalRawSize), NewChunkCount, NiceBytes(NewChunkSize), NewPercent); } } // namespace ////////////////////////////////////////////////////////////////////////////////////////////////////// BuildsCommand::BuildsCommand() { m_Options.add_options()("h,help", "Print help"); auto AddAuthOptions = [this](cxxopts::Options& Ops) { Ops.add_option("", "", "system-dir", "Specify system root", cxxopts::value(m_SystemRootDir), ""); // Direct access token (may expire) Ops.add_option("auth-token", "", "access-token", "Cloud/Builds Storage access token", cxxopts::value(m_AccessToken), ""); Ops.add_option("auth-token", "", "access-token-env", "Name of environment variable that holds the cloud/builds Storage access token", cxxopts::value(m_AccessTokenEnv)->default_value(DefaultAccessTokenEnvVariableName), ""); Ops.add_option("auth-token", "", "access-token-path", "Path to json file that holds the cloud/builds Storage access token", cxxopts::value(m_AccessTokenPath), ""); // Auth manager token encryption Ops.add_option("security", "", "encryption-aes-key", "256 bit AES encryption key", cxxopts::value(m_EncryptionKey), ""); Ops.add_option("security", "", "encryption-aes-iv", "128 bit AES encryption initialization vector", cxxopts::value(m_EncryptionIV), ""); // OpenId acccess token Ops.add_option("openid", "", "openid-provider-name", "Open ID provider name", cxxopts::value(m_OpenIdProviderName), "Default"); Ops.add_option("openid", "", "openid-provider-url", "Open ID provider url", cxxopts::value(m_OpenIdProviderUrl), ""); Ops.add_option("openid", "", "openid-client-id", "Open ID client id", cxxopts::value(m_OpenIdClientId), ""); Ops.add_option("openid", "", "openid-refresh-token", "Open ID refresh token", cxxopts::value(m_OpenIdRefreshToken), 
""); // OAuth acccess token Ops.add_option("oauth", "", "oauth-url", "OAuth provier url", cxxopts::value(m_OAuthUrl)->default_value(""), ""); Ops.add_option("oauth", "", "oauth-clientid", "OAuth client id", cxxopts::value(m_OAuthClientId)->default_value(""), ""); Ops.add_option("oauth", "", "oauth-clientsecret", "OAuth client secret", cxxopts::value(m_OAuthClientSecret)->default_value(""), ""); }; auto AddCloudOptions = [this, &AddAuthOptions](cxxopts::Options& Ops) { AddAuthOptions(Ops); Ops.add_option("cloud build", "", "url", "Cloud Builds URL", cxxopts::value(m_BuildsUrl), ""); Ops.add_option("cloud build", "", "assume-http2", "Assume that the builds endpoint is a HTTP/2 endpoint skipping HTTP/1.1 upgrade handshake", cxxopts::value(m_AssumeHttp2), ""); Ops.add_option("cloud build", "", "namespace", "Builds Storage namespace", cxxopts::value(m_Namespace), ""); Ops.add_option("cloud build", "", "bucket", "Builds Storage bucket", cxxopts::value(m_Bucket), ""); }; auto AddFileOptions = [this](cxxopts::Options& Ops) { Ops.add_option("filestorage", "", "storage-path", "Builds Storage Path", cxxopts::value(m_StoragePath), ""); Ops.add_option("filestorage", "", "json-metadata", "Write build, part and block metadata as .json files in addition to .cb files", cxxopts::value(m_WriteMetadataAsJson), ""); }; auto AddOutputOptions = [this](cxxopts::Options& Ops) { Ops.add_option("output", "", "plain-progress", "Show progress using plain output", cxxopts::value(m_PlainProgress), ""); Ops.add_option("output", "", "verbose", "Enable verbose console output", cxxopts::value(m_Verbose), ""); }; m_Options.add_option("", "v", "verb", "Verb for build - list, upload, download, diff", cxxopts::value(m_Verb), ""); m_Options.parse_positional({"verb"}); m_Options.positional_help("verb"); // list AddCloudOptions(m_ListOptions); AddFileOptions(m_ListOptions); AddOutputOptions(m_ListOptions); m_ListOptions.add_options()("h,help", "Print help"); // upload AddCloudOptions(m_UploadOptions); 
AddFileOptions(m_UploadOptions); AddOutputOptions(m_UploadOptions); m_UploadOptions.add_options()("h,help", "Print help"); m_UploadOptions.add_option("", "l", "local-path", "Root file system folder for build", cxxopts::value(m_Path), ""); m_UploadOptions.add_option("", "", "create-build", "Set to true to create the containing build, if unset a builds-id must be given and the build already exist", cxxopts::value(m_CreateBuild), ""); m_UploadOptions.add_option("", "", "build-id", "Build Id", cxxopts::value(m_BuildId), ""); m_UploadOptions.add_option("", "", "build-part-id", "Build part Id, if not given it will be auto generated", cxxopts::value(m_BuildPartId), ""); m_UploadOptions.add_option("", "", "build-part-name", "Name of the build part, if not given it will be be named after the directory name at end of local-path", cxxopts::value(m_BuildPartName), ""); m_UploadOptions.add_option("", "", "metadata-path", "Path to json file that holds the metadata for the build. Requires the create-build option to be set", cxxopts::value(m_BuildMetadataPath), ""); m_UploadOptions.add_option( "", "", "metadata", "Key-value pairs separated by ';' with build meta data. (key1=value1;key2=value2). Requires the create-build option to be set", cxxopts::value(m_BuildMetadata), ""); m_UploadOptions.add_option("", "", "clean", "Ignore existing blocks", cxxopts::value(m_Clean), ""); m_UploadOptions.add_option("", "", "block-min-reuse", "Percent of an existing block that must be relevant for it to be resused. Defaults to 85.", cxxopts::value(m_BlockReuseMinPercentLimit), ""); m_UploadOptions.add_option("", "", "allow-multipart", "Allow large attachments to be transfered using multipart protocol. 
Defaults to true.", cxxopts::value(m_AllowMultiparts), ""); m_UploadOptions.add_option("", "", "manifest-path", "Path to a text file with one line of [TAB] per file to include.", cxxopts::value(m_ManifestPath), ""); m_UploadOptions .add_option("", "", "verify", "Enable post upload verify of all uploaded data", cxxopts::value(m_PostUploadVerify), ""); m_UploadOptions.parse_positional({"local-path", "build-id"}); m_UploadOptions.positional_help("local-path build-id"); // download AddCloudOptions(m_DownloadOptions); AddFileOptions(m_DownloadOptions); AddOutputOptions(m_DownloadOptions); m_DownloadOptions.add_options()("h,help", "Print help"); m_DownloadOptions.add_option("", "l", "local-path", "Root file system folder for build", cxxopts::value(m_Path), ""); m_DownloadOptions.add_option("", "", "build-id", "Build Id", cxxopts::value(m_BuildId), ""); m_DownloadOptions.add_option( "", "", "build-part-id", "Build part Ids list separated by ',', if no build-part-ids or build-part-names are given all parts will be downloaded", cxxopts::value(m_BuildPartIds), ""); m_DownloadOptions.add_option("", "", "build-part-name", "Name of the build parts list separated by ',', if no build-part-ids or build-part-names are given " "all parts will be downloaded", cxxopts::value(m_BuildPartNames), ""); m_DownloadOptions .add_option("", "", "clean", "Delete all data in target folder before downloading", cxxopts::value(m_Clean), ""); m_DownloadOptions.add_option("", "", "allow-multipart", "Allow large attachments to be transfered using multipart protocol. 
Defaults to true.", cxxopts::value(m_AllowMultiparts), ""); m_DownloadOptions .add_option("", "", "verify", "Enable post download verify of all tracked files", cxxopts::value(m_PostDownloadVerify), ""); m_DownloadOptions.parse_positional({"local-path", "build-id", "build-part-name"}); m_DownloadOptions.positional_help("local-path build-id build-part-name"); AddOutputOptions(m_DiffOptions); m_DiffOptions.add_options()("h,help", "Print help"); m_DiffOptions.add_option("", "l", "local-path", "Root file system folder used as base", cxxopts::value(m_Path), ""); m_DiffOptions.add_option("", "c", "compare-path", "Root file system folder used as diff", cxxopts::value(m_DiffPath), ""); m_DiffOptions.add_option("", "", "only-chunked", "Skip files from diff summation that are not processed with chunking", cxxopts::value(m_OnlyChunked), ""); m_DiffOptions.parse_positional({"local-path", "compare-path"}); m_DiffOptions.positional_help("local-path compare-path"); AddCloudOptions(m_TestOptions); AddFileOptions(m_TestOptions); AddOutputOptions(m_TestOptions); m_TestOptions.add_options()("h,help", "Print help"); m_TestOptions.add_option("", "l", "local-path", "Root file system folder used as base", cxxopts::value(m_Path), ""); m_TestOptions.parse_positional({"local-path"}); m_TestOptions.positional_help("local-path"); AddCloudOptions(m_FetchBlobOptions); AddFileOptions(m_FetchBlobOptions); AddOutputOptions(m_FetchBlobOptions); m_FetchBlobOptions.add_option("", "", "build-id", "Build Id", cxxopts::value(m_BuildId), ""); m_FetchBlobOptions .add_option("", "", "blob-hash", "IoHash in hex form identifying the blob to download", cxxopts::value(m_BlobHash), ""); m_FetchBlobOptions.parse_positional({"build-id", "blob-hash"}); m_FetchBlobOptions.positional_help("build-id blob-hash"); AddCloudOptions(m_ValidateBuildPartOptions); AddFileOptions(m_ValidateBuildPartOptions); AddOutputOptions(m_ValidateBuildPartOptions); m_ValidateBuildPartOptions.add_option("", "", "build-id", "Build Id", 
cxxopts::value(m_BuildId), ""); m_ValidateBuildPartOptions.add_option("", "", "build-part-id", "Build part Id, if not given it will be auto generated", cxxopts::value(m_BuildPartId), ""); m_ValidateBuildPartOptions.add_option( "", "", "build-part-name", "Name of the build part, if not given it will be be named after the directory name at end of local-path", cxxopts::value(m_BuildPartName), ""); m_ValidateBuildPartOptions.parse_positional({"build-id", "build-part-id"}); m_ValidateBuildPartOptions.positional_help("build-id build-part-id"); } BuildsCommand::~BuildsCommand() = default; int BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { ZEN_UNUSED(GlobalOptions); signal(SIGINT, SignalCallbackHandler); #if ZEN_PLATFORM_WINDOWS signal(SIGBREAK, SignalCallbackHandler); #endif // ZEN_PLATFORM_WINDOWS using namespace std::literals; eastl::vector SubCommandArguments; cxxopts::Options* SubOption = nullptr; int ParentCommandArgCount = GetSubCommand(m_Options, argc, argv, m_SubCommands, SubOption, SubCommandArguments); if (!ParseOptions(ParentCommandArgCount, argv)) { return 0; } if (SubOption == nullptr) { throw zen::OptionParseException("command verb is missing"); } if (!ParseOptions(*SubOption, gsl::narrow(SubCommandArguments.size()), SubCommandArguments.data())) { return 0; } auto ParseStorageOptions = [&]() { if (!m_BuildsUrl.empty()) { if (!m_StoragePath.empty()) { throw zen::OptionParseException(fmt::format("url is not compatible with the storage-path option\n{}", m_Options.help())); } if (m_Namespace.empty() || m_Bucket.empty()) { throw zen::OptionParseException( fmt::format("namespace and bucket options are required for url option\n{}", m_Options.help())); } } }; std::unique_ptr Auth; HttpClientSettings ClientSettings{.AssumeHttp2 = m_AssumeHttp2, .AllowResume = true, .RetryCount = 2}; auto CreateAuthMgr = [&]() { if (!Auth) { std::filesystem::path DataRoot = m_SystemRootDir.empty() ? 
PickDefaultSystemRootDirectory() : m_SystemRootDir; if (m_EncryptionKey.empty()) { m_EncryptionKey = "abcdefghijklmnopqrstuvxyz0123456"; ZEN_CONSOLE("Warning: Using default encryption key"); } if (m_EncryptionIV.empty()) { m_EncryptionIV = "0123456789abcdef"; ZEN_CONSOLE("Warning: Using default encryption initialization vector"); } AuthConfig AuthMgrConfig = {.RootDirectory = DataRoot / "auth", .EncryptionKey = AesKey256Bit::FromString(m_EncryptionKey), .EncryptionIV = AesIV128Bit::FromString(m_EncryptionIV)}; if (!AuthMgrConfig.EncryptionKey.IsValid()) { throw zen::OptionParseException("Invalid AES encryption key"); } if (!AuthMgrConfig.EncryptionIV.IsValid()) { throw zen::OptionParseException("Invalid AES initialization vector"); } Auth = AuthMgr::Create(AuthMgrConfig); } }; auto ParseAuthOptions = [&]() { if (!m_OpenIdProviderUrl.empty() && !m_OpenIdClientId.empty()) { CreateAuthMgr(); std::string ProviderName = m_OpenIdProviderName.empty() ? "Default" : m_OpenIdProviderName; Auth->AddOpenIdProvider({.Name = ProviderName, .Url = m_OpenIdProviderUrl, .ClientId = m_OpenIdClientId}); if (!m_OpenIdRefreshToken.empty()) { Auth->AddOpenIdToken({.ProviderName = ProviderName, .RefreshToken = m_OpenIdRefreshToken}); } } if (!m_AccessToken.empty()) { ClientSettings.AccessTokenProvider = httpclientauth::CreateFromStaticToken(m_AccessToken); } else if (!m_AccessTokenPath.empty()) { std::string ResolvedAccessToken = ReadAccessTokenFromFile(m_AccessTokenPath); if (!ResolvedAccessToken.empty()) { ClientSettings.AccessTokenProvider = httpclientauth::CreateFromStaticToken(ResolvedAccessToken); } } else if (!m_AccessTokenEnv.empty()) { std::string ResolvedAccessToken = GetEnvVariable(m_AccessTokenEnv); if (!ResolvedAccessToken.empty()) { ClientSettings.AccessTokenProvider = httpclientauth::CreateFromStaticToken(ResolvedAccessToken); } } else if (!m_OAuthUrl.empty()) { ClientSettings.AccessTokenProvider = httpclientauth::CreateFromOAuthClientCredentials( {.Url = m_OAuthUrl, 
.ClientId = m_OAuthClientId, .ClientSecret = m_OAuthClientSecret}); } else if (!m_OpenIdProviderName.empty()) { CreateAuthMgr(); ClientSettings.AccessTokenProvider = httpclientauth::CreateFromOpenIdProvider(*Auth, m_OpenIdProviderName); } else { CreateAuthMgr(); ClientSettings.AccessTokenProvider = httpclientauth::CreateFromDefaultOpenIdProvider(*Auth); } if (!m_BuildsUrl.empty() && !ClientSettings.AccessTokenProvider) { ZEN_CONSOLE("Warning: No auth provider given, attempting operation without credentials."); } }; auto ParseOutputOptions = [&]() { IsVerbose = m_Verbose; UsePlainProgress = IsVerbose || m_PlainProgress; }; ParseOutputOptions(); try { if (SubOption == &m_ListOptions) { ParseStorageOptions(); ParseAuthOptions(); HttpClient Http(m_BuildsUrl, ClientSettings); CbObjectWriter QueryWriter; QueryWriter.BeginObject("query"); { // QueryWriter.BeginObject("platform"); // { // QueryWriter.AddString("$eq", "Windows"); // } // QueryWriter.EndObject(); // changelist } QueryWriter.EndObject(); // query BuildStorage::Statistics StorageStats; std::unique_ptr Storage; if (!m_BuildsUrl.empty()) { ZEN_CONSOLE("Querying builds in cloud endpoint '{}'. SessionId: '{}'. 
Namespace '{}', Bucket '{}'", m_BuildsUrl, Http.GetSessionId(), m_Namespace, m_Bucket); Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, std::filesystem::path{}); } else if (!m_StoragePath.empty()) { ZEN_CONSOLE("Querying builds in folder '{}'.", m_StoragePath); Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); } else { throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); } CbObject Response = Storage->ListBuilds(QueryWriter.Save()); ExtendableStringBuilder<1024> SB; CompactBinaryToJson(Response.GetView(), SB); ZEN_CONSOLE("{}", SB.ToView()); return 0; } if (SubOption == &m_UploadOptions) { ParseStorageOptions(); ParseAuthOptions(); HttpClient Http(m_BuildsUrl, ClientSettings); if (m_Path.empty()) { throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_UploadOptions.help())); } if (m_CreateBuild) { if (m_BuildMetadataPath.empty() && m_BuildMetadata.empty()) { throw zen::OptionParseException(fmt::format("Options for builds target are missing\n{}", m_UploadOptions.help())); } if (!m_BuildMetadataPath.empty() && !m_BuildMetadata.empty()) { throw zen::OptionParseException(fmt::format("Conflicting options for builds target\n{}", m_UploadOptions.help())); } } else { if (!m_BuildMetadataPath.empty()) { throw zen::OptionParseException( fmt::format("metadata-path option is only valid if creating a build\n{}", m_UploadOptions.help())); } if (!m_BuildMetadata.empty()) { throw zen::OptionParseException( fmt::format("metadata option is only valid if creating a build\n{}", m_UploadOptions.help())); } } if (m_BuildPartName.empty()) { m_BuildPartName = m_Path.filename().string(); } const bool GeneratedBuildId = m_BuildId.empty(); if (GeneratedBuildId) { m_BuildId = Oid::NewOid().ToString(); } else if (m_BuildId.length() != Oid::StringLength) { throw zen::OptionParseException(fmt::format("Invalid build id\n{}", 
m_UploadOptions.help())); } else if (Oid::FromHexString(m_BuildId) == Oid::Zero) { throw zen::OptionParseException(fmt::format("Invalid build id\n{}", m_UploadOptions.help())); } const bool GeneratedBuildPartId = m_BuildPartId.empty(); if (GeneratedBuildPartId) { m_BuildPartId = Oid::NewOid().ToString(); } else if (m_BuildPartId.length() != Oid::StringLength) { throw zen::OptionParseException(fmt::format("Invalid build id\n{}", m_UploadOptions.help())); } else if (Oid::FromHexString(m_BuildPartId) == Oid::Zero) { throw zen::OptionParseException(fmt::format("Invalid build part id\n{}", m_UploadOptions.help())); } BuildStorage::Statistics StorageStats; const Oid BuildId = Oid::FromHexString(m_BuildId); const Oid BuildPartId = Oid::FromHexString(m_BuildPartId); std::unique_ptr Storage; std::string StorageName; if (!m_BuildsUrl.empty()) { ZEN_CONSOLE("Uploading '{}' from '{}' to cloud endpoint '{}'. SessionId: '{}'. Namespace '{}', Bucket '{}', {}BuildId '{}'", m_BuildPartName, m_Path, m_BuildsUrl, Http.GetSessionId(), m_Namespace, m_Bucket, GeneratedBuildId ? "Generated " : "", BuildId); CreateDirectories(m_Path / ZenTempStorageFolderName); Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName); StorageName = "Cloud DDC"; } else if (!m_StoragePath.empty()) { ZEN_CONSOLE("Uploading '{}' from '{}' to folder '{}'. {}BuildId '{}'", m_BuildPartName, m_Path, m_StoragePath, GeneratedBuildId ? 
"Generated " : "", BuildId); Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, m_WriteMetadataAsJson, DefaultLatency, DefaultDelayPerKBSec); StorageName = fmt::format("Disk {}", m_StoragePath.stem()); } else { throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); } CbObject MetaData; if (m_CreateBuild) { if (!m_BuildMetadataPath.empty()) { std::filesystem::path MetadataPath(m_BuildMetadataPath); IoBuffer MetaDataJson = ReadFile(MetadataPath).Flatten(); std::string_view Json(reinterpret_cast(MetaDataJson.GetData()), MetaDataJson.GetSize()); std::string JsonError; MetaData = LoadCompactBinaryFromJson(Json, JsonError).AsObject(); if (!JsonError.empty()) { throw std::runtime_error( fmt::format("build metadata file '{}' is malformed. Reason: '{}'", m_BuildMetadataPath, JsonError)); } } if (!m_BuildMetadata.empty()) { CbObjectWriter MetaDataWriter(1024); ForEachStrTok(m_BuildMetadata, ';', [&](std::string_view Pair) { size_t SplitPos = Pair.find('='); if (SplitPos == std::string::npos || SplitPos == 0) { throw std::runtime_error(fmt::format("build metadata key-value pair '{}' is malformed", Pair)); } MetaDataWriter.AddString(Pair.substr(0, SplitPos), Pair.substr(SplitPos + 1)); return true; }); MetaData = MetaDataWriter.Save(); } } UploadFolder(*Storage, BuildId, BuildPartId, m_BuildPartName, m_Path, m_ManifestPath, m_BlockReuseMinPercentLimit, m_AllowMultiparts, MetaData, m_CreateBuild, m_Clean, m_PostUploadVerify); if (false) { ZEN_CONSOLE( "{}:\n" "Read: {}\n" "Write: {}\n" "Requests: {}\n" "Avg Request Time: {}\n" "Avg I/O Time: {}", StorageName, NiceBytes(StorageStats.TotalBytesRead.load()), NiceBytes(StorageStats.TotalBytesWritten.load()), StorageStats.TotalRequestCount.load(), StorageStats.TotalExecutionTimeUs.load() > 0 ? NiceTimeSpanMs(StorageStats.TotalExecutionTimeUs.load() / 1000 / StorageStats.TotalRequestCount.load()) : 0, StorageStats.TotalRequestCount.load() > 0 ? 
NiceTimeSpanMs(StorageStats.TotalRequestTimeUs.load() / 1000 / StorageStats.TotalRequestCount.load()) : 0); } return AbortFlag ? 11 : 0; } if (SubOption == &m_DownloadOptions) { ParseStorageOptions(); ParseAuthOptions(); HttpClient Http(m_BuildsUrl, ClientSettings); if (m_Path.empty()) { throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_DownloadOptions.help())); } if (m_BuildId.empty()) { throw zen::OptionParseException(fmt::format("build-id is required\n{}", m_DownloadOptions.help())); } Oid BuildId = Oid::TryFromHexString(m_BuildId); if (BuildId == Oid::Zero) { throw zen::OptionParseException(fmt::format("build-id is invalid\n{}", m_DownloadOptions.help())); } if (!m_BuildPartName.empty() && !m_BuildPartId.empty()) { throw zen::OptionParseException(fmt::format("build-part-id conflicts with build-part-name\n{}", m_DownloadOptions.help())); } eastl::vector BuildPartIds; for (const std::string& BuildPartId : m_BuildPartIds) { BuildPartIds.push_back(Oid::TryFromHexString(BuildPartId)); if (BuildPartIds.back() == Oid::Zero) { throw zen::OptionParseException( fmt::format("build-part-id '{}' is invalid\n{}", BuildPartId, m_DownloadOptions.help())); } } BuildStorage::Statistics StorageStats; std::unique_ptr Storage; std::string StorageName; if (!m_BuildsUrl.empty()) { ZEN_CONSOLE("Downloading '{}' to '{}' from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'", BuildId, m_Path, m_BuildsUrl, Http.GetSessionId(), m_Namespace, m_Bucket, BuildId); CreateDirectories(m_Path / ZenTempStorageFolderName); Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName); StorageName = "Cloud DDC"; } else if (!m_StoragePath.empty()) { ZEN_CONSOLE("Downloading '{}' to '{}' from folder {}. 
BuildId '{}'", BuildId, m_Path, m_StoragePath, BuildId); Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); StorageName = fmt::format("Disk {}", m_StoragePath.stem()); } else { throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); } DownloadFolder(*Storage, BuildId, BuildPartIds, m_BuildPartNames, m_Path, m_AllowMultiparts, m_Clean, m_PostDownloadVerify); if (false) { ZEN_CONSOLE( "{}:\n" "Read: {}\n" "Write: {}\n" "Requests: {}\n" "Avg Request Time: {}\n" "Avg I/O Time: {}", StorageName, NiceBytes(StorageStats.TotalBytesRead.load()), NiceBytes(StorageStats.TotalBytesWritten.load()), StorageStats.TotalRequestCount.load(), StorageStats.TotalExecutionTimeUs.load() > 0 ? NiceTimeSpanMs(StorageStats.TotalExecutionTimeUs.load() / 1000 / StorageStats.TotalRequestCount.load()) : 0, StorageStats.TotalRequestCount.load() > 0 ? NiceTimeSpanMs(StorageStats.TotalRequestTimeUs.load() / 1000 / StorageStats.TotalRequestCount.load()) : 0); } return AbortFlag ? 11 : 0; } if (SubOption == &m_DiffOptions) { if (m_Path.empty()) { throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_DownloadOptions.help())); } if (m_DiffPath.empty()) { throw zen::OptionParseException(fmt::format("compare-path is required\n{}", m_DownloadOptions.help())); } DiffFolders(m_Path, m_DiffPath, m_OnlyChunked); return AbortFlag ? 
11 : 0;
}

// "test" sub-command: round-trips the folder at m_Path through the selected storage backend.
// Steps: upload -> clean download -> identical re-download -> scramble + re-download ->
// scramble + upload as a second build -> switch the download folder between both builds.
// After each upload/download step the branch returns exit code 11 when AbortFlag is set.
if (SubOption == &m_TestOptions)
{
    ParseStorageOptions();
    ParseAuthOptions();

    HttpClient Http(m_BuildsUrl, ClientSettings);

    if (m_Path.empty())
    {
        // Show the usage text of the sub-command that is actually running.
        throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_TestOptions.help()));
    }

    // The test always runs against freshly generated ids; the part is named after the source folder.
    m_BuildId       = Oid::NewOid().ToString();
    m_BuildPartName = m_Path.filename().string();
    m_BuildPartId   = Oid::NewOid().ToString();
    m_CreateBuild   = true;

    BuildStorage::Statistics StorageStats;
    const Oid BuildId     = Oid::FromHexString(m_BuildId);
    const Oid BuildPartId = Oid::FromHexString(m_BuildPartId);

    std::unique_ptr Storage;
    std::string StorageName;

    // Decide up front whether we fall back to a throw-away local store. m_StoragePath is assigned
    // right below, so re-testing it for emptiness later (as the cleanup guard used to do) can never
    // succeed and the temporary store would be left behind.
    const bool UseTempStorage = m_BuildsUrl.empty() && m_StoragePath.empty();
    if (UseTempStorage)
    {
        m_StoragePath = GetRunningExecutablePath().parent_path() / ".tmpstore";
        CreateDirectories(m_StoragePath);
        CleanDirectory(m_StoragePath);
    }
    auto _ = MakeGuard([&]() {
        if (UseTempStorage)
        {
            // The guard is declared after Storage and therefore runs before Storage is destroyed;
            // release the backend first so nothing refers to the folder while it is removed.
            Storage.reset();
            DeleteDirectories(m_StoragePath);
        }
    });

    if (!m_BuildsUrl.empty())
    {
        ZEN_CONSOLE("Using '{}' to '{}' from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'",
                    m_BuildPartName.empty() ? m_BuildPartId : m_BuildPartName,
                    m_Path,
                    m_BuildsUrl,
                    Http.GetSessionId(),
                    m_Namespace,
                    m_Bucket,
                    BuildId);
        CreateDirectories(m_Path / ZenTempStorageFolderName);
        Storage     = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName);
        StorageName = "Cloud DDC";
    }
    else if (!m_StoragePath.empty())
    {
        ZEN_CONSOLE("Using '{}' to '{}' from folder {}. BuildId '{}'",
                    m_BuildPartName.empty() ? m_BuildPartId : m_BuildPartName,
                    m_Path,
                    m_StoragePath,
                    BuildId);
        Storage     = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
        StorageName = fmt::format("Disk {}", m_StoragePath.stem());
    }
    else
    {
        throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_TestOptions.help()));
    }

    // Builds synthetic build metadata; the third 32-bit word of the build id doubles as a fake changelist.
    auto MakeMetaData = [](const Oid& BuildId) -> CbObject {
        CbObjectWriter BuildMetaDataWriter;
        {
            const uint32_t CL = BuildId.OidBits[2];
            BuildMetaDataWriter.AddString("name", fmt::format("++Test+Main-CL-{}", CL));
            BuildMetaDataWriter.AddString("branch", "ZenTestBuild");
            BuildMetaDataWriter.AddString("baselineBranch", "ZenTestBuild");
            BuildMetaDataWriter.AddString("platform", "Windows");
            BuildMetaDataWriter.AddString("project", "Test");
            BuildMetaDataWriter.AddInteger("changelist", CL);
            BuildMetaDataWriter.AddString("buildType", "test-folder");
        }
        return BuildMetaDataWriter.Save();
    };

    CbObject MetaData = MakeMetaData(Oid::TryFromHexString(m_BuildId));
    {
        ExtendableStringBuilder<256> SB;
        CompactBinaryToJson(MetaData, SB);
        ZEN_CONSOLE("Upload Build {}, Part {} ({})\n{}", m_BuildId, BuildPartId, m_BuildPartName, SB.ToView());
    }

    UploadFolder(*Storage,
                 BuildId,
                 BuildPartId,
                 m_BuildPartName,
                 m_Path,
                 {},
                 m_BlockReuseMinPercentLimit,
                 m_AllowMultiparts,
                 MetaData,
                 true,
                 false,
                 true);
    if (AbortFlag)
    {
        ZEN_CONSOLE("Upload failed.");
        return 11;
    }

    // Downloads go to a sibling folder "<part name>_download" next to the source folder.
    // The two trailing DownloadFolder flags are passed in the same positions where the download
    // sub-command passes m_Clean and m_PostDownloadVerify.
    const std::filesystem::path DownloadPath = m_Path.parent_path() / (m_BuildPartName + "_download");
    ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}'", BuildId, BuildPartId, m_BuildPartName, DownloadPath);
    DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, true, true);
    if (AbortFlag)
    {
        ZEN_CONSOLE("Download failed.");
        return 11;
    }

    ZEN_CONSOLE("\nRe-download Build {}, Part {} ({}) to '{}' (identical target)",
                BuildId,
                BuildPartId,
                m_BuildPartName,
                DownloadPath);
    DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false, true);
    if (AbortFlag)
    {
        ZEN_CONSOLE("Re-download failed. (identical target)");
        return 11;
    }

    // Deterministically damages a folder: of the files outside the default exclude folders, every
    // 7th one has its content shuffled in place and the one following it is deleted; the remaining
    // five of each group of seven are left untouched.
    auto ScrambleDir = [](const std::filesystem::path& Path) {
        ZEN_CONSOLE("\nScrambling '{}'", Path);
        Stopwatch Timer;
        DirectoryContent DownloadContent;
        GetDirectoryContent(
            Path,
            DirectoryContentFlags::Recursive | DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeFileSizes,
            DownloadContent);

        // A path is rejected when its path relative to Path equals an excluded folder name or
        // starts with "<excluded folder>/".
        auto IsAcceptedFolder = [ExcludeFolders = DefaultExcludeFolders, Path](const std::filesystem::path& AbsolutePath) -> bool {
            std::string RelativePath = std::filesystem::relative(AbsolutePath, Path).generic_string();
            for (const std::string_view& ExcludeFolder : ExcludeFolders)
            {
                if (RelativePath.starts_with(ExcludeFolder))
                {
                    if (RelativePath.length() == ExcludeFolder.length())
                    {
                        return false;
                    }
                    else if (RelativePath[ExcludeFolder.length()] == '/')
                    {
                        return false;
                    }
                }
            }
            return true;
        };

        ParallellWork Work(AbortFlag);

        uint32_t Randomizer = 0;
        // FileSizes is walked in lock-step with Files, so the iterator advances for every file,
        // accepted or not.
        auto FileSizeIt = DownloadContent.FileSizes.begin();
        for (const std::filesystem::path& FilePath : DownloadContent.Files)
        {
            if (IsAcceptedFolder(FilePath))
            {
                uint32_t Case = (Randomizer++) % 7;
                switch (Case)
                {
                    case 0:
                        {
                            // Rewrite the file with its thirds rearranged; empty files are skipped.
                            uint64_t SourceSize = *FileSizeIt;
                            if (SourceSize > 0)
                            {
                                Work.ScheduleWork(
                                    GetMediumWorkerPool(EWorkloadType::Burst),
                                    [SourceSize, FilePath](std::atomic&) {
                                        if (!AbortFlag)
                                        {
                                            IoBuffer Scrambled(SourceSize);
                                            {
                                                IoBuffer Source = IoBufferBuilder::MakeFromFile(FilePath);
                                                Scrambled.GetMutableView().CopyFrom(
                                                    Source.GetView().Mid(SourceSize / 3, SourceSize / 3));
                                                Scrambled.GetMutableView()
                                                    .Mid(SourceSize / 3)
                                                    .CopyFrom(Source.GetView().Mid(0, SourceSize / 3));
                                                Scrambled.GetMutableView()
                                                    .Mid((SourceSize / 3) * 2)
                                                    .CopyFrom(Source.GetView().Mid(SourceSize / 2, SourceSize / 3));
                                            }
                                            // Temporarily lift read-only so the file can be overwritten,
                                            // then restore the flag if it was set.
                                            bool IsReadOnly = SetFileReadOnly(FilePath, false);
                                            WriteFile(FilePath, Scrambled);
                                            if (IsReadOnly)
                                            {
                                                SetFileReadOnly(FilePath, true);
                                            }
                                        }
                                    },
                                    Work.DefaultErrorFunction());
                            }
                        }
                        break;
                    case 1:
                        std::filesystem::remove(FilePath);
                        break;
                    default:
                        break;
                }
            }
            FileSizeIt++;
        }
        Work.Wait(5000, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
            ZEN_UNUSED(IsAborted);
            ZEN_CONSOLE("Scrambling files, {} remaining", PendingWork);
        });
        // NOTE(review): this asserts when AbortFlag was raised while scrambling (which includes a
        // user interrupt via the signal handler) - confirm that is intended rather than a clean exit.
        ZEN_ASSERT(!AbortFlag.load());
        ZEN_CONSOLE("Scrambled files in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
    };

    ScrambleDir(DownloadPath);
    ZEN_CONSOLE("\nRe-download Build {}, Part {} ({}) to '{}' (scrambled target)",
                BuildId,
                BuildPartId,
                m_BuildPartName,
                DownloadPath);
    DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false, true);
    if (AbortFlag)
    {
        ZEN_CONSOLE("Re-download failed. (scrambled target)");
        return 11;
    }

    // Scramble again and upload the damaged folder as a second, independent build.
    ScrambleDir(DownloadPath);

    Oid BuildId2     = Oid::NewOid();
    Oid BuildPartId2 = Oid::NewOid();

    CbObject MetaData2 = MakeMetaData(BuildId2);
    {
        ExtendableStringBuilder<256> SB;
        // Log the metadata that is actually uploaded with the second build (was MetaData).
        CompactBinaryToJson(MetaData2, SB);
        ZEN_CONSOLE("\nUpload scrambled Build {}, Part {} ({})\n{}\n", BuildId2, BuildPartId2, m_BuildPartName, SB.ToView());
    }

    UploadFolder(*Storage,
                 BuildId2,
                 BuildPartId2,
                 m_BuildPartName,
                 DownloadPath,
                 {},
                 m_BlockReuseMinPercentLimit,
                 m_AllowMultiparts,
                 MetaData2,
                 true,
                 false,
                 true);
    if (AbortFlag)
    {
        ZEN_CONSOLE("Upload of scrambled failed.");
        return 11;
    }

    // Switch the same target folder back and forth between the two builds.
    ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (original)", BuildId, BuildPartId, m_BuildPartName, DownloadPath);
    DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false, true);
    if (AbortFlag)
    {
        ZEN_CONSOLE("Re-download failed.");
        return 11;
    }

    ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath);
    DownloadFolder(*Storage, BuildId2, {BuildPartId2}, {}, DownloadPath, m_AllowMultiparts, false, true);
    if (AbortFlag)
    {
        ZEN_CONSOLE("Re-download failed.");
        return 11;
    }

    ZEN_CONSOLE("\nRe-download Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath);
    DownloadFolder(*Storage, BuildId2, {BuildPartId2}, {}, DownloadPath, m_AllowMultiparts, false, true);
if (AbortFlag) { ZEN_CONSOLE("Re-download failed."); return 11; } return 0; } if (SubOption == &m_FetchBlobOptions) { ParseStorageOptions(); ParseAuthOptions(); HttpClient Http(m_BuildsUrl, ClientSettings); if (m_BlobHash.empty()) { throw zen::OptionParseException(fmt::format("Blob hash string is missing\n{}", m_UploadOptions.help())); } IoHash BlobHash; if (!IoHash::TryParse(m_BlobHash, BlobHash)) { throw zen::OptionParseException(fmt::format("Blob hash string is invalid\n{}", m_UploadOptions.help())); } if (m_BuildsUrl.empty() && m_StoragePath.empty()) { throw zen::OptionParseException(fmt::format("At least one storage option is required\n{}", m_UploadOptions.help())); } BuildStorage::Statistics StorageStats; const Oid BuildId = Oid::FromHexString(m_BuildId); std::unique_ptr Storage; std::string StorageName; if (!m_BuildsUrl.empty()) { ZEN_CONSOLE("Using from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'", m_BuildsUrl, Http.GetSessionId(), m_Namespace, m_Bucket, BuildId); CreateDirectories(m_Path / ZenTempStorageFolderName); Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName); StorageName = "Cloud DDC"; } else if (!m_StoragePath.empty()) { ZEN_CONSOLE("Using folder {}. 
BuildId '{}'", m_StoragePath, BuildId); Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); StorageName = fmt::format("Disk {}", m_StoragePath.stem()); } else { throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); } uint64_t CompressedSize; uint64_t DecompressedSize; ValidateBlob(*Storage, BuildId, BlobHash, CompressedSize, DecompressedSize); if (AbortFlag) { return 11; } ZEN_CONSOLE("Blob '{}' has a compressed size {} and a decompressed size of {} bytes", BlobHash, CompressedSize, DecompressedSize); return 0; } if (SubOption == &m_ValidateBuildPartOptions) { ParseStorageOptions(); ParseAuthOptions(); HttpClient Http(m_BuildsUrl, ClientSettings); if (m_BuildsUrl.empty() && m_StoragePath.empty()) { throw zen::OptionParseException(fmt::format("At least one storage option is required\n{}", m_UploadOptions.help())); } if (m_BuildId.empty()) { throw zen::OptionParseException(fmt::format("build-id is required\n{}", m_DownloadOptions.help())); } Oid BuildId = Oid::TryFromHexString(m_BuildId); if (BuildId == Oid::Zero) { throw zen::OptionParseException(fmt::format("build-id is invalid\n{}", m_DownloadOptions.help())); } if (!m_BuildPartName.empty() && !m_BuildPartId.empty()) { throw zen::OptionParseException(fmt::format("build-part-id conflicts with build-part-name\n{}", m_DownloadOptions.help())); } BuildStorage::Statistics StorageStats; std::unique_ptr Storage; std::string StorageName; if (!m_BuildsUrl.empty()) { ZEN_CONSOLE("Using from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'", m_BuildsUrl, Http.GetSessionId(), m_Namespace, m_Bucket, BuildId); CreateDirectories(m_Path / ZenTempStorageFolderName); Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName); StorageName = "Cloud DDC"; } else if (!m_StoragePath.empty()) { ZEN_CONSOLE("Using folder {}. 
BuildId '{}'", m_StoragePath, BuildId); Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); StorageName = fmt::format("Disk {}", m_StoragePath.stem()); } else { throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); } Oid BuildPartId = Oid::TryFromHexString(m_BuildPartId); ValidateBuildPart(*Storage, BuildId, BuildPartId, m_BuildPartName); return AbortFlag ? 13 : 0; } } catch (const std::exception& Ex) { ZEN_ERROR("{}", Ex.what()); return 3; } ZEN_ASSERT(false); } } // namespace zen