diff options
| author | Dan Engelbrecht <[email protected]> | 2025-10-14 13:13:59 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-14 13:13:59 +0200 |
| commit | 9b7580230798d83d9bb36d40150913af69a13929 (patch) | |
| tree | 73552ec1d3e9d955ce391cad894c637b74be91d4 | |
| parent | move all storage-related services into storage tree (#571) (diff) | |
| download | zen-9b7580230798d83d9bb36d40150913af69a13929.tar.xz zen-9b7580230798d83d9bb36d40150913af69a13929.zip | |
refactor builds cmd part2 (#572)
* fix metadata info in filebuildstorage GetBuild
* move MakeSafeAbsolutePathÃnPlace to filesystem.h/cpp
* add BuildsOperationUploadFolder op moving code from builds_cmd.cpp
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 2961 | ||||
| -rw-r--r-- | src/zencore/filesystem.cpp | 27 | ||||
| -rw-r--r-- | src/zencore/include/zencore/filesystem.h | 3 | ||||
| -rw-r--r-- | src/zenremotestore/builds/buildstorageoperations.cpp | 2429 | ||||
| -rw-r--r-- | src/zenremotestore/builds/filebuildstorage.cpp | 6 | ||||
| -rw-r--r-- | src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h | 245 | ||||
| -rw-r--r-- | src/zenutil/commandlineoptions.cpp | 27 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/commandlineoptions.h | 10 |
8 files changed, 2982 insertions, 2726 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index 00bba6f4f..b60844ea1 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -263,24 +263,6 @@ namespace { } } - static const size_t DefaultMaxBlockSize = 64u * 1024u * 1024u; - static const size_t DefaultMaxChunksPerBlock = 4u * 1000u; - static const size_t DefaultMaxChunkEmbedSize = 3u * 512u * 1024u; - - struct ChunksBlockParameters - { - size_t MaxBlockSize = DefaultMaxBlockSize; - size_t MaxChunksPerBlock = DefaultMaxChunksPerBlock; - size_t MaxChunkEmbedSize = DefaultMaxChunkEmbedSize; - }; - - const ChunksBlockParameters DefaultChunksBlockParams{ - .MaxBlockSize = 32u * 1024u * 1024u, - .MaxChunksPerBlock = DefaultMaxChunksPerBlock, - .MaxChunkEmbedSize = 2u * 1024u * 1024u // DefaultChunkedParams.MaxSize - }; - const uint64_t DefaultPreferredMultipartChunkSize = 32u * 1024u * 1024u; - const double DefaultLatency = 0; // .0010; const double DefaultDelayPerKBSec = 0; // 0.00005; @@ -296,42 +278,6 @@ namespace { } WorkerThreadPool& GetNetworkPool() { return SingleThreaded ? GetSyncWorkerPool() : GetSmallWorkerPool(EWorkloadType::Burst); } - static const std::vector<uint32_t> NonCompressableExtensions({HashStringDjb2(".mp4"sv), - HashStringDjb2(".zip"sv), - HashStringDjb2(".7z"sv), - HashStringDjb2(".bzip"sv), - HashStringDjb2(".rar"sv), - HashStringDjb2(".gzip"sv), - HashStringDjb2(".apk"sv), - HashStringDjb2(".nsp"sv), - HashStringDjb2(".xvc"sv), - HashStringDjb2(".pkg"sv), - HashStringDjb2(".dmg"sv), - HashStringDjb2(".ipa"sv)}); - - static const tsl::robin_set<uint32_t> NonCompressableExtensionSet(NonCompressableExtensions.begin(), NonCompressableExtensions.end()); - - static bool IsExtensionHashCompressable(const uint32_t PathHash) { return !NonCompressableExtensionSet.contains(PathHash); } - - static bool IsChunkCompressable(const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, uint32_t ChunkIndex) - { - ZEN_UNUSED(Content); - const uint32_t ChunkLocationCount = Lookup.ChunkSequenceLocationCounts[ChunkIndex]; - if (ChunkLocationCount == 0) - { - return false; - } - const size_t ChunkLocationOffset = Lookup.ChunkSequenceLocationOffset[ChunkIndex]; - const uint32_t SequenceIndex = Lookup.ChunkSequenceLocations[ChunkLocationOffset].SequenceIndex; - const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex]; - const uint32_t ExtensionHash = Lookup.PathExtensionHash[PathIndex]; - - const bool IsCompressable = IsExtensionHashCompressable(ExtensionHash); - return IsCompressable; - } - - const uint64_t MinimumSizeForCompressInBlock = 2u * 1024u; - const std::string ZenFolderName = ".zen"; std::filesystem::path ZenStateFilePath(const std::filesystem::path& ZenFolderPath) { return ZenFolderPath / "current_state.cbo"; } // std::filesystem::path ZenStateFileJsonPath(const std::filesystem::path& ZenFolderPath) { return ZenFolderPath / "current_state.json"; @@ -575,17 +521,6 @@ namespace { } } - template<typename T> - std::string FormatArray(std::span<const T> Items, std::string_view Prefix) - { - ExtendableStringBuilder<512> SB; - for (const T& Item : Items) - { - SB.Append(fmt::format("{}{}", Prefix, Item)); - } - return SB.ToString(); - } - bool CleanDirectory(const std::filesystem::path& Path, std::span<const std::string> ExcludeDirectories) { ZEN_TRACE_CPU("CleanDirectory"); @@ -785,15 +720,6 @@ namespace { return CleanWipe; } - IoBuffer WriteToTempFile(CompositeBuffer&& Buffer, - const std::filesystem::path& TempFolderPath, - const IoHash& Hash, - const std::string& Suffix = {}) - { - std::filesystem::path TempFilePath = TempFolderPath / (Hash.ToHexString() + Suffix); - return WriteToTempFile(std::move(Buffer), TempFilePath); - } - class FilteredRate { public: @@ -886,161 +812,6 @@ namespace { return Count * 1000000 / ElapsedWallTimeUS; } - struct FindBlocksStatistics - { - uint64_t FindBlockTimeMS = 0; - uint64_t PotentialChunkCount = 0; - uint64_t PotentialChunkByteCount = 0; - uint64_t FoundBlockCount = 0; - uint64_t FoundBlockChunkCount = 0; - uint64_t FoundBlockByteCount = 0; - uint64_t AcceptedBlockCount = 0; - uint64_t AcceptedChunkCount = 0; - uint64_t AcceptedByteCount = 0; - uint64_t AcceptedRawByteCount = 0; - uint64_t RejectedBlockCount = 0; - uint64_t RejectedChunkCount = 0; - uint64_t RejectedByteCount = 0; - uint64_t AcceptedReduntantChunkCount = 0; - uint64_t AcceptedReduntantByteCount = 0; - uint64_t NewBlocksCount = 0; - uint64_t NewBlocksChunkCount = 0; - uint64_t NewBlocksChunkByteCount = 0; - }; - - struct UploadStatistics - { - std::atomic<uint64_t> BlockCount = 0; - std::atomic<uint64_t> BlocksBytes = 0; - std::atomic<uint64_t> ChunkCount = 0; - std::atomic<uint64_t> ChunksBytes = 0; - std::atomic<uint64_t> ReadFromDiskBytes = 0; - std::atomic<uint64_t> MultipartAttachmentCount = 0; - uint64_t ElapsedWallTimeUS = 0; - - UploadStatistics& operator+=(const UploadStatistics& Rhs) - { - BlockCount += Rhs.BlockCount; - BlocksBytes += Rhs.BlocksBytes; - ChunkCount += Rhs.ChunkCount; - ChunksBytes += Rhs.ChunksBytes; - ReadFromDiskBytes += Rhs.ReadFromDiskBytes; - MultipartAttachmentCount += Rhs.MultipartAttachmentCount; - ElapsedWallTimeUS += Rhs.ElapsedWallTimeUS; - return *this; - } - }; - - struct LooseChunksStatistics - { - uint64_t ChunkCount = 0; - uint64_t ChunkByteCount = 0; - std::atomic<uint64_t> CompressedChunkCount = 0; - std::atomic<uint64_t> CompressedChunkRawBytes = 0; - std::atomic<uint64_t> CompressedChunkBytes = 0; - uint64_t CompressChunksElapsedWallTimeUS = 0; - - LooseChunksStatistics& operator+=(const LooseChunksStatistics& Rhs) - { - ChunkCount += Rhs.ChunkCount; - ChunkByteCount += Rhs.ChunkByteCount; - CompressedChunkCount += Rhs.CompressedChunkCount; - CompressedChunkRawBytes += Rhs.CompressedChunkRawBytes; - CompressedChunkBytes += Rhs.CompressedChunkBytes; - CompressChunksElapsedWallTimeUS += Rhs.CompressChunksElapsedWallTimeUS; - return *this; - } - }; - - struct GenerateBlocksStatistics - { - std::atomic<uint64_t> GeneratedBlockByteCount = 0; - std::atomic<uint64_t> GeneratedBlockCount = 0; - uint64_t GenerateBlocksElapsedWallTimeUS = 0; - - GenerateBlocksStatistics& operator+=(const GenerateBlocksStatistics& Rhs) - { - GeneratedBlockByteCount += Rhs.GeneratedBlockByteCount; - GeneratedBlockCount += Rhs.GeneratedBlockCount; - GenerateBlocksElapsedWallTimeUS += Rhs.GenerateBlocksElapsedWallTimeUS; - return *this; - } - }; - - struct VerifyFolderStatistics - { - std::atomic<uint64_t> FilesVerified = 0; - std::atomic<uint64_t> FilesFailed = 0; - std::atomic<uint64_t> ReadBytes = 0; - uint64_t VerifyElapsedWallTimeUs = 0; - }; - - std::vector<uint32_t> CalculateAbsoluteChunkOrders(const std::span<const IoHash> LocalChunkHashes, - const std::span<const uint32_t> LocalChunkOrder, - const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& ChunkHashToLocalChunkIndex, - const std::span<const uint32_t>& LooseChunkIndexes, - const std::span<const ChunkBlockDescription>& BlockDescriptions) - { - ZEN_TRACE_CPU("CalculateAbsoluteChunkOrders"); - -#if EXTRA_VERIFY - std::vector<IoHash> TmpAbsoluteChunkHashes; - TmpAbsoluteChunkHashes.reserve(LocalChunkHashes.size()); -#endif // EXTRA_VERIFY - std::vector<uint32_t> LocalChunkIndexToAbsoluteChunkIndex; - LocalChunkIndexToAbsoluteChunkIndex.resize(LocalChunkHashes.size(), (uint32_t)-1); - std::uint32_t AbsoluteChunkCount = 0; - for (uint32_t ChunkIndex : LooseChunkIndexes) - { - LocalChunkIndexToAbsoluteChunkIndex[ChunkIndex] = AbsoluteChunkCount; -#if EXTRA_VERIFY - TmpAbsoluteChunkHashes.push_back(LocalChunkHashes[ChunkIndex]); -#endif // EXTRA_VERIFY - AbsoluteChunkCount++; - } - for (const ChunkBlockDescription& Block : BlockDescriptions) - { - for (const IoHash& ChunkHash : Block.ChunkRawHashes) - { - if (auto It = ChunkHashToLocalChunkIndex.find(ChunkHash); It != ChunkHashToLocalChunkIndex.end()) - { - const uint32_t LocalChunkIndex = It->second; - ZEN_ASSERT_SLOW(LocalChunkHashes[LocalChunkIndex] == ChunkHash); - LocalChunkIndexToAbsoluteChunkIndex[LocalChunkIndex] = AbsoluteChunkCount; - } -#if EXTRA_VERIFY - TmpAbsoluteChunkHashes.push_back(ChunkHash); -#endif // EXTRA_VERIFY - AbsoluteChunkCount++; - } - } - std::vector<uint32_t> AbsoluteChunkOrder; - AbsoluteChunkOrder.reserve(LocalChunkHashes.size()); - for (const uint32_t LocalChunkIndex : LocalChunkOrder) - { - const uint32_t AbsoluteChunkIndex = LocalChunkIndexToAbsoluteChunkIndex[LocalChunkIndex]; -#if EXTRA_VERIFY - ZEN_ASSERT(LocalChunkHashes[LocalChunkIndex] == TmpAbsoluteChunkHashes[AbsoluteChunkIndex]); -#endif // EXTRA_VERIFY - AbsoluteChunkOrder.push_back(AbsoluteChunkIndex); - } -#if EXTRA_VERIFY - { - uint32_t OrderIndex = 0; - while (OrderIndex < LocalChunkOrder.size()) - { - const uint32_t LocalChunkIndex = LocalChunkOrder[OrderIndex]; - const IoHash& LocalChunkHash = LocalChunkHashes[LocalChunkIndex]; - const uint32_t AbsoluteChunkIndex = AbsoluteChunkOrder[OrderIndex]; - const IoHash& AbsoluteChunkHash = TmpAbsoluteChunkHashes[AbsoluteChunkIndex]; - ZEN_ASSERT(LocalChunkHash == AbsoluteChunkHash); - OrderIndex++; - } - } -#endif // EXTRA_VERIFY - return AbsoluteChunkOrder; - } - void CalculateLocalChunkOrders(const std::span<const uint32_t>& AbsoluteChunkOrders, const std::span<const IoHash> LooseChunkHashes, const std::span<const uint64_t> LooseChunkRawSizes, @@ -1111,86 +882,6 @@ namespace { #endif // EXTRA_VERIFY } - void WriteBuildContentToCompactBinary(CbObjectWriter& PartManifestWriter, - const SourcePlatform Platform, - std::span<const std::filesystem::path> Paths, - std::span<const IoHash> RawHashes, - std::span<const uint64_t> RawSizes, - std::span<const uint32_t> Attributes, - std::span<const IoHash> SequenceRawHashes, - std::span<const uint32_t> ChunkCounts, - std::span<const IoHash> LocalChunkHashes, - std::span<const uint64_t> LocalChunkRawSizes, - const std::vector<uint32_t>& AbsoluteChunkOrders, - const std::span<const uint32_t> LooseLocalChunkIndexes, - const std::span<IoHash> BlockHashes) - { - ZEN_ASSERT(Platform != SourcePlatform::_Count); - PartManifestWriter.AddString("platform"sv, ToString(Platform)); - - uint64_t TotalSize = 0; - for (const uint64_t Size : RawSizes) - { - TotalSize += Size; - } - PartManifestWriter.AddInteger("totalSize", TotalSize); - - PartManifestWriter.BeginObject("files"sv); - { - compactbinary_helpers::WriteArray(Paths, "paths"sv, PartManifestWriter); - compactbinary_helpers::WriteArray(RawHashes, "rawhashes"sv, PartManifestWriter); - compactbinary_helpers::WriteArray(RawSizes, "rawsizes"sv, PartManifestWriter); - if (Platform == SourcePlatform::Windows) - { - compactbinary_helpers::WriteArray(Attributes, "attributes"sv, PartManifestWriter); - } - if (Platform == SourcePlatform::Linux || Platform == SourcePlatform::MacOS) - { - compactbinary_helpers::WriteArray(Attributes, "mode"sv, PartManifestWriter); - } - } - PartManifestWriter.EndObject(); // files - - PartManifestWriter.BeginObject("chunkedContent"); - { - compactbinary_helpers::WriteArray(SequenceRawHashes, "sequenceRawHashes"sv, PartManifestWriter); - compactbinary_helpers::WriteArray(ChunkCounts, "chunkcounts"sv, PartManifestWriter); - compactbinary_helpers::WriteArray(AbsoluteChunkOrders, "chunkorders"sv, PartManifestWriter); - } - PartManifestWriter.EndObject(); // chunkedContent - - size_t LooseChunkCount = LooseLocalChunkIndexes.size(); - if (LooseChunkCount > 0) - { - PartManifestWriter.BeginObject("chunkAttachments"); - { - PartManifestWriter.BeginArray("rawHashes"sv); - for (uint32_t ChunkIndex : LooseLocalChunkIndexes) - { - PartManifestWriter.AddBinaryAttachment(LocalChunkHashes[ChunkIndex]); - } - PartManifestWriter.EndArray(); // rawHashes - - PartManifestWriter.BeginArray("chunkRawSizes"sv); - for (uint32_t ChunkIndex : LooseLocalChunkIndexes) - { - PartManifestWriter.AddInteger(LocalChunkRawSizes[ChunkIndex]); - } - PartManifestWriter.EndArray(); // chunkSizes - } - PartManifestWriter.EndObject(); // - } - - if (BlockHashes.size() > 0) - { - PartManifestWriter.BeginObject("blockAttachments"); - { - compactbinary_helpers::WriteBinaryAttachmentArray(BlockHashes, "rawHashes"sv, PartManifestWriter); - } - PartManifestWriter.EndObject(); // blocks - } - } - void ReadBuildContentFromCompactBinary(CbObjectView BuildPartManifest, SourcePlatform& OutPlatform, std::vector<std::filesystem::path>& OutPaths, @@ -1468,72 +1159,6 @@ namespace { TemporaryFile::SafeWriteFile(WritePath, JsonPayload); } - class ReadFileCache - { - public: - // A buffered file reader that provides CompositeBuffer where the buffers are owned and the memory never overwritten - ReadFileCache(DiskStatistics& DiskStats, - const std::filesystem::path& Path, - const ChunkedFolderContent& LocalContent, - const ChunkedContentLookup& LocalLookup, - size_t MaxOpenFileCount) - : m_Path(Path) - , m_LocalContent(LocalContent) - , m_LocalLookup(LocalLookup) - , m_DiskStats(DiskStats) - { - m_OpenFiles.reserve(MaxOpenFileCount); - } - ~ReadFileCache() { m_OpenFiles.clear(); } - - CompositeBuffer GetRange(uint32_t SequenceIndex, uint64_t Offset, uint64_t Size) - { - ZEN_TRACE_CPU("ReadFileCache::GetRange"); - - auto CacheIt = std::find_if(m_OpenFiles.begin(), m_OpenFiles.end(), [SequenceIndex](const auto& Lhs) { - return Lhs.first == SequenceIndex; - }); - if (CacheIt != m_OpenFiles.end()) - { - if (CacheIt != m_OpenFiles.begin()) - { - auto CachedFile(std::move(CacheIt->second)); - m_OpenFiles.erase(CacheIt); - m_OpenFiles.insert(m_OpenFiles.begin(), std::make_pair(SequenceIndex, std::move(CachedFile))); - } - CompositeBuffer Result = m_OpenFiles.front().second->GetRange(Offset, Size); - return Result; - } - const uint32_t LocalPathIndex = m_LocalLookup.SequenceIndexFirstPathIndex[SequenceIndex]; - const std::filesystem::path LocalFilePath = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred(); - if (Size == m_LocalContent.RawSizes[LocalPathIndex]) - { - IoBuffer Result = IoBufferBuilder::MakeFromFile(LocalFilePath); - return CompositeBuffer(SharedBuffer(Result)); - } - if (m_OpenFiles.size() == m_OpenFiles.capacity()) - { - m_OpenFiles.pop_back(); - } - m_OpenFiles.insert(m_OpenFiles.begin(), - std::make_pair(SequenceIndex, - std::make_unique<BufferedOpenFile>(LocalFilePath, - m_DiskStats.OpenReadCount, - m_DiskStats.CurrentOpenFileCount, - m_DiskStats.ReadCount, - m_DiskStats.ReadByteCount))); - CompositeBuffer Result = m_OpenFiles.front().second->GetRange(Offset, Size); - return Result; - } - - private: - const std::filesystem::path m_Path; - const ChunkedFolderContent& m_LocalContent; - const ChunkedContentLookup& m_LocalLookup; - std::vector<std::pair<uint32_t, std::unique_ptr<BufferedOpenFile>>> m_OpenFiles; - DiskStatistics& m_DiskStats; - }; - CompositeBuffer ValidateBlob(IoBuffer&& Payload, const IoHash& BlobHash, uint64_t& OutCompressedSize, uint64_t& OutDecompressedSize) { ZEN_TRACE_CPU("ValidateBlob"); @@ -1643,95 +1268,6 @@ namespace { return GetChunkBlockDescription(BlockBuffer.Flatten(), BlobHash); } - CompositeBuffer FetchChunk(const ChunkedFolderContent& Content, - const ChunkedContentLookup& Lookup, - const IoHash& ChunkHash, - ReadFileCache& OpenFileCache) - { - ZEN_TRACE_CPU("FetchChunk"); - auto It = Lookup.ChunkHashToChunkIndex.find(ChunkHash); - ZEN_ASSERT(It != Lookup.ChunkHashToChunkIndex.end()); - uint32_t ChunkIndex = It->second; - std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkLocations = GetChunkSequenceLocations(Lookup, ChunkIndex); - ZEN_ASSERT(!ChunkLocations.empty()); - CompositeBuffer Chunk = OpenFileCache.GetRange(ChunkLocations[0].SequenceIndex, - ChunkLocations[0].Offset, - Content.ChunkedContent.ChunkRawSizes[ChunkIndex]); - ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == ChunkHash); - return Chunk; - }; - - CompressedBuffer GenerateBlock(const std::filesystem::path& Path, - const ChunkedFolderContent& Content, - const ChunkedContentLookup& Lookup, - const std::vector<uint32_t>& ChunksInBlock, - ChunkBlockDescription& OutBlockDescription, - DiskStatistics& DiskStats) - { - ZEN_TRACE_CPU("GenerateBlock"); - ReadFileCache OpenFileCache(DiskStats, Path, Content, Lookup, 4); - - std::vector<std::pair<IoHash, FetchChunkFunc>> BlockContent; - BlockContent.reserve(ChunksInBlock.size()); - for (uint32_t ChunkIndex : ChunksInBlock) - { - BlockContent.emplace_back(std::make_pair( - Content.ChunkedContent.ChunkHashes[ChunkIndex], - [&Content, &Lookup, &OpenFileCache, ChunkIndex](const IoHash& ChunkHash) -> std::pair<uint64_t, CompressedBuffer> { - CompositeBuffer Chunk = FetchChunk(Content, Lookup, ChunkHash, OpenFileCache); - if (!Chunk) - { - ZEN_ASSERT(false); - } - uint64_t RawSize = Chunk.GetSize(); - const bool ShouldCompressChunk = Lookup.RawHashToSequenceIndex.contains(ChunkHash) && - (RawSize >= MinimumSizeForCompressInBlock) && - IsChunkCompressable(Content, Lookup, ChunkIndex); - const OodleCompressionLevel CompressionLevel = - ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None; - return {RawSize, CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid, CompressionLevel)}; - })); - } - - return GenerateChunkBlock(std::move(BlockContent), OutBlockDescription); - }; - - CompressedBuffer RebuildBlock(const std::filesystem::path& Path, - const ChunkedFolderContent& Content, - const ChunkedContentLookup& Lookup, - CompositeBuffer&& HeaderBuffer, - const std::vector<uint32_t>& ChunksInBlock, - DiskStatistics& DiskStats) - { - ZEN_TRACE_CPU("RebuildBlock"); - ReadFileCache OpenFileCache(DiskStats, Path, Content, Lookup, 4); - - std::vector<SharedBuffer> ResultBuffers; - ResultBuffers.reserve(HeaderBuffer.GetSegments().size() + ChunksInBlock.size()); - ResultBuffers.insert(ResultBuffers.end(), HeaderBuffer.GetSegments().begin(), HeaderBuffer.GetSegments().end()); - for (uint32_t ChunkIndex : ChunksInBlock) - { - std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkLocations = GetChunkSequenceLocations(Lookup, ChunkIndex); - ZEN_ASSERT(!ChunkLocations.empty()); - const IoHash& ChunkHash = Content.ChunkedContent.ChunkHashes[ChunkIndex]; - CompositeBuffer Chunk = OpenFileCache.GetRange(ChunkLocations[0].SequenceIndex, - ChunkLocations[0].Offset, - Content.ChunkedContent.ChunkRawSizes[ChunkIndex]); - ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == ChunkHash); - - const uint64_t RawSize = Chunk.GetSize(); - const bool ShouldCompressChunk = Lookup.RawHashToSequenceIndex.contains(ChunkHash) && - (RawSize >= MinimumSizeForCompressInBlock) && IsChunkCompressable(Content, Lookup, ChunkIndex); - const OodleCompressionLevel CompressionLevel = - ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None; - - CompositeBuffer CompressedChunk = - CompressedBuffer::Compress(std::move(Chunk), OodleCompressor::Mermaid, CompressionLevel).GetCompressed(); - ResultBuffers.insert(ResultBuffers.end(), CompressedChunk.GetSegments().begin(), CompressedChunk.GetSegments().end()); - } - return CompressedBuffer::FromCompressedNoValidate(CompositeBuffer(std::move(ResultBuffers))); - }; - struct ValidateStatistics { uint64_t BuildBlobSize = 0; @@ -1790,7 +1326,7 @@ namespace { } } ValidateStats.BuildBlobSize = Build.GetSize(); - uint64_t PreferredMultipartChunkSize = DefaultPreferredMultipartChunkSize; + uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u; if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0) { PreferredMultipartChunkSize = ChunkSize; @@ -2025,1079 +1561,6 @@ namespace { ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::Cleanup, TaskSteps::StepCount); } - void ArrangeChunksIntoBlocks(const ChunkedFolderContent& Content, - const ChunkedContentLookup& Lookup, - uint64_t MaxBlockSize, - uint64_t MaxChunksPerBlock, - std::vector<uint32_t>& ChunkIndexes, - std::vector<std::vector<uint32_t>>& OutBlocks) - { - ZEN_TRACE_CPU("ArrangeChunksIntoBlocks"); - std::sort(ChunkIndexes.begin(), ChunkIndexes.end(), [&Content, &Lookup](uint32_t Lhs, uint32_t Rhs) { - const ChunkedContentLookup::ChunkSequenceLocation& LhsLocation = GetChunkSequenceLocations(Lookup, Lhs)[0]; - const ChunkedContentLookup::ChunkSequenceLocation& RhsLocation = GetChunkSequenceLocations(Lookup, Rhs)[0]; - if (LhsLocation.SequenceIndex < RhsLocation.SequenceIndex) - { - return true; - } - else if (LhsLocation.SequenceIndex > RhsLocation.SequenceIndex) - { - return false; - } - return LhsLocation.Offset < RhsLocation.Offset; - }); - - uint64_t MaxBlockSizeLowThreshold = MaxBlockSize - (MaxBlockSize / 16); - - uint64_t BlockSize = 0; - - uint32_t ChunkIndexStart = 0; - for (uint32_t ChunkIndexOffset = 0; ChunkIndexOffset < ChunkIndexes.size();) - { - const uint32_t ChunkIndex = ChunkIndexes[ChunkIndexOffset]; - const uint64_t ChunkSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; - - if (((BlockSize + ChunkSize) > MaxBlockSize) || (ChunkIndexOffset - ChunkIndexStart) > MaxChunksPerBlock) - { - // Within the span of MaxBlockSizeLowThreshold and MaxBlockSize, see if there is a break - // between source paths for chunks. Break the block at the last such break if any. - ZEN_ASSERT(ChunkIndexOffset > ChunkIndexStart); - - const uint32_t ChunkSequenceIndex = - Lookup.ChunkSequenceLocations[Lookup.ChunkSequenceLocationOffset[ChunkIndex]].SequenceIndex; - - uint64_t ScanBlockSize = BlockSize; - - uint32_t ScanChunkIndexOffset = ChunkIndexOffset - 1; - while (ScanChunkIndexOffset > (ChunkIndexStart + 2)) - { - const uint32_t TestChunkIndex = ChunkIndexes[ScanChunkIndexOffset]; - const uint64_t TestChunkSize = Content.ChunkedContent.ChunkRawSizes[TestChunkIndex]; - if ((ScanBlockSize - TestChunkSize) < MaxBlockSizeLowThreshold) - { - break; - } - - const uint32_t TestSequenceIndex = - Lookup.ChunkSequenceLocations[Lookup.ChunkSequenceLocationOffset[TestChunkIndex]].SequenceIndex; - if (ChunkSequenceIndex != TestSequenceIndex) - { - ChunkIndexOffset = ScanChunkIndexOffset + 1; - break; - } - - ScanBlockSize -= TestChunkSize; - ScanChunkIndexOffset--; - } - - std::vector<uint32_t> ChunksInBlock; - ChunksInBlock.reserve(ChunkIndexOffset - ChunkIndexStart); - for (uint32_t AddIndexOffset = ChunkIndexStart; AddIndexOffset < ChunkIndexOffset; AddIndexOffset++) - { - const uint32_t AddChunkIndex = ChunkIndexes[AddIndexOffset]; - ChunksInBlock.push_back(AddChunkIndex); - } - OutBlocks.emplace_back(std::move(ChunksInBlock)); - BlockSize = 0; - ChunkIndexStart = ChunkIndexOffset; - } - else - { - ChunkIndexOffset++; - BlockSize += ChunkSize; - } - } - if (ChunkIndexStart < ChunkIndexes.size()) - { - std::vector<uint32_t> ChunksInBlock; - ChunksInBlock.reserve(ChunkIndexes.size() - ChunkIndexStart); - for (uint32_t AddIndexOffset = ChunkIndexStart; AddIndexOffset < ChunkIndexes.size(); AddIndexOffset++) - { - const uint32_t AddChunkIndex = ChunkIndexes[AddIndexOffset]; - ChunksInBlock.push_back(AddChunkIndex); - } - OutBlocks.emplace_back(std::move(ChunksInBlock)); - } - } - - CompositeBuffer CompressChunk(const std::filesystem::path& Path, - const ChunkedFolderContent& Content, - const ChunkedContentLookup& Lookup, - uint32_t ChunkIndex, - const std::filesystem::path& TempFolderPath, - LooseChunksStatistics& LooseChunksStats) - { - ZEN_TRACE_CPU("CompressChunk"); - ZEN_ASSERT(!TempFolderPath.empty()); - const IoHash& ChunkHash = Content.ChunkedContent.ChunkHashes[ChunkIndex]; - const uint64_t ChunkSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; - - const ChunkedContentLookup::ChunkSequenceLocation& Source = GetChunkSequenceLocations(Lookup, ChunkIndex)[0]; - const std::uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[Source.SequenceIndex]; - IoBuffer RawSource = IoBufferBuilder::MakeFromFile((Path / Content.Paths[PathIndex]).make_preferred(), Source.Offset, ChunkSize); - if (!RawSource) - { - throw std::runtime_error(fmt::format("Failed fetching chunk {}", ChunkHash)); - } - if (RawSource.GetSize() != ChunkSize) - { - throw std::runtime_error(fmt::format("Fetched chunk {} has invalid size", ChunkHash)); - } - - const bool ShouldCompressChunk = IsChunkCompressable(Content, Lookup, ChunkIndex); - const OodleCompressionLevel CompressionLevel = ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None; - - if (ShouldCompressChunk) - { - std::filesystem::path TempFilePath = TempFolderPath / ChunkHash.ToHexString(); - - BasicFile CompressedFile; - std::error_code Ec; - CompressedFile.Open(TempFilePath, BasicFile::Mode::kTruncateDelete, Ec); - if (Ec) - { - throw std::runtime_error( - fmt::format("Failed creating temporary file for compressing blob {}. Reason: {}", ChunkHash, Ec.message())); - } - - uint64_t StreamRawBytes = 0; - uint64_t StreamCompressedBytes = 0; - - bool CouldCompress = CompressedBuffer::CompressToStream( - CompositeBuffer(SharedBuffer(RawSource)), - [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { - ZEN_UNUSED(SourceOffset); - LooseChunksStats.CompressedChunkRawBytes += SourceSize; - CompressedFile.Write(RangeBuffer, Offset); - LooseChunksStats.CompressedChunkBytes += RangeBuffer.GetSize(); - StreamRawBytes += SourceSize; - StreamCompressedBytes += RangeBuffer.GetSize(); - }, - OodleCompressor::Mermaid, - CompressionLevel); - if (CouldCompress) - { - uint64_t CompressedSize = CompressedFile.FileSize(); - void* FileHandle = CompressedFile.Detach(); - IoBuffer TempPayload = IoBuffer(IoBuffer::File, - FileHandle, - 0, - CompressedSize, - /*IsWholeFile*/ true); - ZEN_ASSERT(TempPayload); - TempPayload.SetDeleteOnClose(true); - IoHash RawHash; - uint64_t RawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(TempPayload), RawHash, RawSize); - ZEN_ASSERT(Compressed); - ZEN_ASSERT(RawHash == ChunkHash); - ZEN_ASSERT(RawSize == ChunkSize); - - LooseChunksStats.CompressedChunkCount++; - - return Compressed.GetCompressed(); - } - else - { - LooseChunksStats.CompressedChunkRawBytes -= StreamRawBytes; - LooseChunksStats.CompressedChunkBytes -= StreamCompressedBytes; - } - CompressedFile.Close(); - RemoveFile(TempFilePath, Ec); - ZEN_UNUSED(Ec); - } - - CompressedBuffer CompressedBlob = - CompressedBuffer::Compress(SharedBuffer(std::move(RawSource)), OodleCompressor::Mermaid, CompressionLevel); - if (!CompressedBlob) - { - throw std::runtime_error(fmt::format("Failed to compress large blob {}", ChunkHash)); - } - ZEN_ASSERT_SLOW(CompressedBlob.DecodeRawHash() == ChunkHash); - ZEN_ASSERT_SLOW(CompressedBlob.DecodeRawSize() == ChunkSize); - - LooseChunksStats.CompressedChunkRawBytes += ChunkSize; - LooseChunksStats.CompressedChunkBytes += CompressedBlob.GetCompressedSize(); - - // If we use none-compression, the compressed blob references the data and has 64 kb in memory so we don't need to write it to disk - if (ShouldCompressChunk) - { - IoBuffer TempPayload = WriteToTempFile(std::move(CompressedBlob).GetCompressed(), TempFolderPath, ChunkHash); - CompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(TempPayload)); - } - - LooseChunksStats.CompressedChunkCount++; - return std::move(CompressedBlob).GetCompressed(); - } - - struct GeneratedBlocks - { - std::vector<ChunkBlockDescription> BlockDescriptions; - std::vector<uint64_t> BlockSizes; - std::vector<CompositeBuffer> BlockHeaders; - std::vector<CbObject> BlockMetaDatas; - std::vector<uint8_t> - MetaDataHasBeenUploaded; // NOTE: Do not use std::vector<bool> here as this vector is modified by multiple threads - tsl::robin_map<IoHash, size_t, IoHash::Hasher> BlockHashToBlockIndex; - }; - - void GenerateBuildBlocks(const std::filesystem::path& Path, - const ChunkedFolderContent& Content, - const ChunkedContentLookup& Lookup, - StorageInstance& Storage, - const Oid& BuildId, - const std::vector<std::vector<uint32_t>>& NewBlockChunks, - GeneratedBlocks& OutBlocks, - DiskStatistics& DiskStats, - UploadStatistics& UploadStats, - GenerateBlocksStatistics& GenerateBlocksStats) - { - ZEN_TRACE_CPU("GenerateBuildBlocks"); - const std::size_t NewBlockCount = NewBlockChunks.size(); - if (NewBlockCount > 0) - { - ProgressBar ProgressBar(ProgressMode, "Generate Blocks"); - - OutBlocks.BlockDescriptions.resize(NewBlockCount); - OutBlocks.BlockSizes.resize(NewBlockCount); - OutBlocks.BlockMetaDatas.resize(NewBlockCount); - OutBlocks.BlockHeaders.resize(NewBlockCount); - OutBlocks.MetaDataHasBeenUploaded.resize(NewBlockCount, 0); - OutBlocks.BlockHashToBlockIndex.reserve(NewBlockCount); - - RwLock Lock; - - WorkerThreadPool& GenerateBlobsPool = GetIOWorkerPool(); - WorkerThreadPool& UploadBlocksPool = GetNetworkPool(); - - FilteredRate FilteredGeneratedBytesPerSecond; - FilteredRate FilteredUploadedBytesPerSecond; - - ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - - std::atomic<uint64_t> QueuedPendingBlocksForUpload = 0; - - for (size_t BlockIndex = 0; BlockIndex < NewBlockCount; BlockIndex++) - { - if (Work.IsAborted()) - { - break; - } - const std::vector<uint32_t>& ChunksInBlock = NewBlockChunks[BlockIndex]; - Work.ScheduleWork( - GenerateBlobsPool, - [&Storage, - BuildId, - &Path, - &Content, - &Lookup, - &Work, - &UploadBlocksPool, - NewBlockCount, - ChunksInBlock, - &Lock, - &OutBlocks, - &DiskStats, - &GenerateBlocksStats, - &UploadStats, - &FilteredGeneratedBytesPerSecond, - &QueuedPendingBlocksForUpload, - &FilteredUploadedBytesPerSecond, - BlockIndex](std::atomic<bool>&) { - if (!AbortFlag) - { - ZEN_TRACE_CPU("GenerateBuildBlocks_Generate"); - - FilteredGeneratedBytesPerSecond.Start(); - // TODO: Convert ScheduleWork body to function - - Stopwatch GenerateTimer; - CompressedBuffer CompressedBlock = - GenerateBlock(Path, Content, Lookup, ChunksInBlock, OutBlocks.BlockDescriptions[BlockIndex], DiskStats); - ZEN_CONSOLE_VERBOSE("Generated block {} ({}) containing {} chunks in {}", - OutBlocks.BlockDescriptions[BlockIndex].BlockHash, - NiceBytes(CompressedBlock.GetCompressedSize()), - OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size(), - NiceTimeSpanMs(GenerateTimer.GetElapsedTimeMs())); - - OutBlocks.BlockSizes[BlockIndex] = CompressedBlock.GetCompressedSize(); - { - CbObjectWriter Writer; - Writer.AddString("createdBy", "zen"); - OutBlocks.BlockMetaDatas[BlockIndex] = Writer.Save(); - } - GenerateBlocksStats.GeneratedBlockByteCount += OutBlocks.BlockSizes[BlockIndex]; - GenerateBlocksStats.GeneratedBlockCount++; - - Lock.WithExclusiveLock([&]() { - OutBlocks.BlockHashToBlockIndex.insert_or_assign(OutBlocks.BlockDescriptions[BlockIndex].BlockHash, - BlockIndex); - }); - - { - std::span<const SharedBuffer> Segments = CompressedBlock.GetCompressed().GetSegments(); - ZEN_ASSERT(Segments.size() >= 2); - OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]); - } - - if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount) - { - FilteredGeneratedBytesPerSecond.Stop(); - } - - if (QueuedPendingBlocksForUpload.load() > 16) - { - std::span<const SharedBuffer> Segments = CompressedBlock.GetCompressed().GetSegments(); - ZEN_ASSERT(Segments.size() >= 2); - OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]); - } - else - { - if (!AbortFlag) - { - QueuedPendingBlocksForUpload++; - - Work.ScheduleWork( - UploadBlocksPool, - [&Storage, - BuildId, - NewBlockCount, - &FilteredUploadedBytesPerSecond, - &QueuedPendingBlocksForUpload, - &GenerateBlocksStats, - &UploadStats, - &OutBlocks, - BlockIndex, - Payload = std::move(CompressedBlock)](std::atomic<bool>&) mutable { - auto _ = MakeGuard([&QueuedPendingBlocksForUpload] { QueuedPendingBlocksForUpload--; }); - if (!AbortFlag) - { - if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount) - { - ZEN_TRACE_CPU("GenerateBuildBlocks_Save"); - - FilteredUploadedBytesPerSecond.Stop(); - std::span<const SharedBuffer> Segments = Payload.GetCompressed().GetSegments(); - ZEN_ASSERT(Segments.size() >= 2); - OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]); - } - else - { - ZEN_TRACE_CPU("GenerateBuildBlocks_Upload"); - - FilteredUploadedBytesPerSecond.Start(); - // TODO: Convert ScheduleWork body to function - - const CbObject BlockMetaData = - BuildChunkBlockDescription(OutBlocks.BlockDescriptions[BlockIndex], - OutBlocks.BlockMetaDatas[BlockIndex]); - - const IoHash& BlockHash = OutBlocks.BlockDescriptions[BlockIndex].BlockHash; - const uint64_t CompressedBlockSize = Payload.GetCompressedSize(); - - if (Storage.BuildCacheStorage) - { - Storage.BuildCacheStorage->PutBuildBlob(BuildId, - BlockHash, - ZenContentType::kCompressedBinary, - Payload.GetCompressed()); - } - - Storage.BuildStorage->PutBuildBlob(BuildId, - BlockHash, - ZenContentType::kCompressedBinary, - std::move(Payload).GetCompressed()); - UploadStats.BlocksBytes += CompressedBlockSize; - - ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks", - BlockHash, - NiceBytes(CompressedBlockSize), - OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); - - if (Storage.BuildCacheStorage) - { - Storage.BuildCacheStorage->PutBlobMetadatas(BuildId, - std::vector<IoHash>({BlockHash}), - std::vector<CbObject>({BlockMetaData})); - } - - bool MetadataSucceeded = - Storage.BuildStorage->PutBlockMetadata(BuildId, BlockHash, BlockMetaData); - if (MetadataSucceeded) - { - ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})", - BlockHash, - NiceBytes(BlockMetaData.GetSize())); - - OutBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; - UploadStats.BlocksBytes += BlockMetaData.GetSize(); - } - - UploadStats.BlockCount++; - if (UploadStats.BlockCount == NewBlockCount) - { - FilteredUploadedBytesPerSecond.Stop(); - } - } - } - }); - } - } - } - }); - } - - Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(PendingWork); - - FilteredGeneratedBytesPerSecond.Update(GenerateBlocksStats.GeneratedBlockByteCount.load()); - FilteredUploadedBytesPerSecond.Update(UploadStats.BlocksBytes.load()); - - std::string Details = fmt::format("Generated {}/{} ({}, {}B/s). Uploaded {}/{} ({}, {}bits/s)", - GenerateBlocksStats.GeneratedBlockCount.load(), - NewBlockCount, - NiceBytes(GenerateBlocksStats.GeneratedBlockByteCount.load()), - NiceNum(FilteredGeneratedBytesPerSecond.GetCurrent()), - UploadStats.BlockCount.load(), - NewBlockCount, - NiceBytes(UploadStats.BlocksBytes.load()), - NiceNum(FilteredUploadedBytesPerSecond.GetCurrent() * 8)); - - ProgressBar.UpdateState( - {.Task = "Generating blocks", - .Details = Details, - .TotalCount = gsl::narrow<uint64_t>(NewBlockCount), - .RemainingCount = gsl::narrow<uint64_t>(NewBlockCount - GenerateBlocksStats.GeneratedBlockCount.load()), - .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, - false); - }); - - ZEN_ASSERT(AbortFlag || QueuedPendingBlocksForUpload.load() == 0); - - ProgressBar.Finish(); - - GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGeneratedBytesPerSecond.GetElapsedTimeUS(); - UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS(); - } - } - - void UploadPartBlobs(StorageInstance& Storage, - const Oid& BuildId, - const std::filesystem::path& Path, - const std::filesystem::path& TempDir, - const ChunkedFolderContent& Content, - const ChunkedContentLookup& Lookup, - std::span<IoHash> RawHashes, - const std::vector<std::vector<uint32_t>>& NewBlockChunks, - GeneratedBlocks& NewBlocks, - std::span<const uint32_t> LooseChunkIndexes, - const std::uint64_t LargeAttachmentSize, - DiskStatistics& DiskStats, - UploadStatistics& UploadStats, - LooseChunksStatistics& LooseChunksStats) - { - ZEN_TRACE_CPU("UploadPartBlobs"); - { - ProgressBar ProgressBar(ProgressMode, "Upload Blobs"); - - WorkerThreadPool& ReadChunkPool = GetIOWorkerPool(); - WorkerThreadPool& UploadChunkPool = GetNetworkPool(); - - FilteredRate FilteredGenerateBlockBytesPerSecond; - FilteredRate FilteredCompressedBytesPerSecond; - FilteredRate FilteredUploadedBytesPerSecond; - - ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - - std::atomic<size_t> UploadedBlockSize = 0; - std::atomic<size_t> UploadedBlockCount = 0; - std::atomic<size_t> UploadedRawChunkSize = 0; - std::atomic<size_t> UploadedCompressedChunkSize = 0; - std::atomic<uint32_t> UploadedChunkCount = 0; - - tsl::robin_map<uint32_t, uint32_t> ChunkIndexToLooseChunkOrderIndex; - ChunkIndexToLooseChunkOrderIndex.reserve(LooseChunkIndexes.size()); - for (uint32_t OrderIndex = 0; OrderIndex < LooseChunkIndexes.size(); OrderIndex++) - { - ChunkIndexToLooseChunkOrderIndex.insert_or_assign(LooseChunkIndexes[OrderIndex], OrderIndex); - } - - std::vector<size_t> BlockIndexes; - std::vector<uint32_t> LooseChunkOrderIndexes; - - uint64_t TotalLooseChunksSize = 0; - uint64_t TotalBlocksSize = 0; - for (const IoHash& RawHash : RawHashes) - { - if (auto It = NewBlocks.BlockHashToBlockIndex.find(RawHash); It != NewBlocks.BlockHashToBlockIndex.end()) - { - BlockIndexes.push_back(It->second); - TotalBlocksSize += NewBlocks.BlockSizes[It->second]; - } - else if (auto ChunkIndexIt = Lookup.ChunkHashToChunkIndex.find(RawHash); ChunkIndexIt != Lookup.ChunkHashToChunkIndex.end()) - { - const uint32_t ChunkIndex = ChunkIndexIt->second; - if (auto LooseOrderIndexIt = ChunkIndexToLooseChunkOrderIndex.find(ChunkIndex); - LooseOrderIndexIt != ChunkIndexToLooseChunkOrderIndex.end()) - { - LooseChunkOrderIndexes.push_back(LooseOrderIndexIt->second); - TotalLooseChunksSize += Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; - } - } - else - { - ZEN_CONSOLE_WARN( - "Build blob {} was reported as needed for upload but it was reported as existing at the start of the " - "operation. Treating it as a transient inconsistent issue and will attempt to retry finalization", - RawHash); - } - } - uint64_t TotalRawSize = TotalLooseChunksSize + TotalBlocksSize; - - const size_t UploadBlockCount = BlockIndexes.size(); - const uint32_t UploadChunkCount = gsl::narrow<uint32_t>(LooseChunkOrderIndexes.size()); - - auto AsyncUploadBlock = [&Storage, - &BuildId, - &Work, - &TempDir, - &NewBlocks, - UploadBlockCount, - &UploadedBlockCount, - UploadChunkCount, - &UploadedChunkCount, - &UploadedBlockSize, - &UploadStats, - &FilteredUploadedBytesPerSecond, - &UploadChunkPool](const size_t BlockIndex, - const IoHash BlockHash, - CompositeBuffer&& Payload, - std::atomic<uint64_t>& QueuedPendingInMemoryBlocksForUpload) { - bool IsInMemoryBlock = true; - if (QueuedPendingInMemoryBlocksForUpload.load() > 16) - { - ZEN_TRACE_CPU("AsyncUploadBlock_WriteTempBlock"); - Payload = CompositeBuffer(WriteToTempFile(std::move(Payload), TempDir, BlockHash)); - IsInMemoryBlock = false; - } - else - { - QueuedPendingInMemoryBlocksForUpload++; - } - - Work.ScheduleWork( - UploadChunkPool, - [&Storage, - &BuildId, - &QueuedPendingInMemoryBlocksForUpload, - &NewBlocks, - UploadBlockCount, - &UploadedBlockCount, - UploadChunkCount, - &UploadedChunkCount, - &UploadedBlockSize, - &UploadStats, - &FilteredUploadedBytesPerSecond, - IsInMemoryBlock, - BlockIndex, - BlockHash, - Payload = std::move(Payload)](std::atomic<bool>&) mutable { - auto _ = MakeGuard([IsInMemoryBlock, &QueuedPendingInMemoryBlocksForUpload] { - if (IsInMemoryBlock) - { - QueuedPendingInMemoryBlocksForUpload--; - } - }); - if (!AbortFlag) - { - ZEN_TRACE_CPU("AsyncUploadBlock"); - - const uint64_t PayloadSize = Payload.GetSize(); - - FilteredUploadedBytesPerSecond.Start(); - const CbObject BlockMetaData = - BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]); - - if (Storage.BuildCacheStorage) - { - Storage.BuildCacheStorage->PutBuildBlob(BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload); - } - Storage.BuildStorage->PutBuildBlob(BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload); - ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks", - BlockHash, - NiceBytes(PayloadSize), - NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); - UploadedBlockSize += PayloadSize; - UploadStats.BlocksBytes += PayloadSize; - - if (Storage.BuildCacheStorage) - { - Storage.BuildCacheStorage->PutBlobMetadatas(BuildId, - std::vector<IoHash>({BlockHash}), - std::vector<CbObject>({BlockMetaData})); - } - bool MetadataSucceeded = Storage.BuildStorage->PutBlockMetadata(BuildId, BlockHash, BlockMetaData); - if (MetadataSucceeded) - { - ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})", BlockHash, NiceBytes(BlockMetaData.GetSize())); - - NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; - UploadStats.BlocksBytes += BlockMetaData.GetSize(); - } - - UploadStats.BlockCount++; - - UploadedBlockCount++; - if (UploadedBlockCount == UploadBlockCount && UploadedChunkCount == UploadChunkCount) - { - FilteredUploadedBytesPerSecond.Stop(); - } - } - }); - }; - - auto AsyncUploadLooseChunk = [&Storage, - BuildId, - LargeAttachmentSize, - &Work, - &UploadChunkPool, - &FilteredUploadedBytesPerSecond, - &UploadedBlockCount, - &UploadedChunkCount, - UploadBlockCount, - UploadChunkCount, - &UploadedCompressedChunkSize, - &UploadedRawChunkSize, - &UploadStats](const IoHash& RawHash, uint64_t RawSize, CompositeBuffer&& Payload) { - Work.ScheduleWork( - UploadChunkPool, - [&Storage, - BuildId, - &Work, - LargeAttachmentSize, - &FilteredUploadedBytesPerSecond, - &UploadChunkPool, - &UploadedBlockCount, - &UploadedChunkCount, - UploadBlockCount, - UploadChunkCount, - &UploadedCompressedChunkSize, - &UploadedRawChunkSize, - &UploadStats, - RawHash, - RawSize, - Payload = CompositeBuffer(std::move(Payload))](std::atomic<bool>&) mutable { - if (!AbortFlag) - { - ZEN_TRACE_CPU("AsyncUploadLooseChunk"); - - const uint64_t PayloadSize = Payload.GetSize(); - - if (Storage.BuildCacheStorage) - { - Storage.BuildCacheStorage->PutBuildBlob(BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); - } - - if (PayloadSize >= LargeAttachmentSize) - { - ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart"); - UploadStats.MultipartAttachmentCount++; - std::vector<std::function<void()>> MultipartWork = Storage.BuildStorage->PutLargeBuildBlob( - BuildId, - RawHash, - ZenContentType::kCompressedBinary, - PayloadSize, - [Payload = std::move(Payload), &FilteredUploadedBytesPerSecond](uint64_t Offset, - uint64_t Size) mutable -> IoBuffer { - FilteredUploadedBytesPerSecond.Start(); - - IoBuffer PartPayload = Payload.Mid(Offset, Size).Flatten().AsIoBuffer(); - PartPayload.SetContentType(ZenContentType::kBinary); - return PartPayload; - }, - [RawSize, - &UploadStats, - &UploadedCompressedChunkSize, - &UploadChunkPool, - &UploadedBlockCount, - UploadBlockCount, - &UploadedChunkCount, - UploadChunkCount, - &FilteredUploadedBytesPerSecond, - &UploadedRawChunkSize](uint64_t SentBytes, bool IsComplete) { - UploadStats.ChunksBytes += SentBytes; - UploadedCompressedChunkSize += SentBytes; - if (IsComplete) - { - UploadStats.ChunkCount++; - UploadedChunkCount++; - if (UploadedBlockCount == UploadBlockCount && UploadedChunkCount == UploadChunkCount) - { - FilteredUploadedBytesPerSecond.Stop(); - } - UploadedRawChunkSize += RawSize; - } - }); - for (auto& WorkPart : MultipartWork) - { - Work.ScheduleWork(UploadChunkPool, [Work = std::move(WorkPart)](std::atomic<bool>&) { - ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart_Work"); - if (!AbortFlag) - { - Work(); - } - }); - } - ZEN_CONSOLE_VERBOSE("Uploaded multipart chunk {} ({})", RawHash, NiceBytes(PayloadSize)); - } - else - { - ZEN_TRACE_CPU("AsyncUploadLooseChunk_Singlepart"); - Storage.BuildStorage->PutBuildBlob(BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); - ZEN_CONSOLE_VERBOSE("Uploaded chunk {} ({})", RawHash, NiceBytes(PayloadSize)); - UploadStats.ChunksBytes += Payload.GetSize(); - UploadStats.ChunkCount++; - UploadedCompressedChunkSize += Payload.GetSize(); - UploadedRawChunkSize += RawSize; - UploadedChunkCount++; - if (UploadedChunkCount == UploadChunkCount) - { - FilteredUploadedBytesPerSecond.Stop(); - } - } - } - }); - }; - - std::vector<size_t> GenerateBlockIndexes; - - std::atomic<uint64_t> GeneratedBlockCount = 0; - std::atomic<uint64_t> GeneratedBlockByteCount = 0; - - std::atomic<uint64_t> QueuedPendingInMemoryBlocksForUpload = 0; - - // Start generation of any non-prebuilt blocks and schedule upload - for (const size_t BlockIndex : BlockIndexes) - { - const IoHash& BlockHash = NewBlocks.BlockDescriptions[BlockIndex].BlockHash; - if (!AbortFlag) - { - Work.ScheduleWork( - ReadChunkPool, - [BlockHash = IoHash(BlockHash), - BlockIndex, - &FilteredGenerateBlockBytesPerSecond, - Path, - &Content, - &Lookup, - &NewBlocks, - &NewBlockChunks, - &GenerateBlockIndexes, - &GeneratedBlockCount, - &GeneratedBlockByteCount, - &DiskStats, - &AsyncUploadBlock, - &QueuedPendingInMemoryBlocksForUpload](std::atomic<bool>&) { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UploadPartBlobs_GenerateBlock"); - - FilteredGenerateBlockBytesPerSecond.Start(); - - Stopwatch GenerateTimer; - CompositeBuffer Payload; - if (NewBlocks.BlockHeaders[BlockIndex]) - { - Payload = RebuildBlock(Path, - Content, - Lookup, - std::move(NewBlocks.BlockHeaders[BlockIndex]), - NewBlockChunks[BlockIndex], - DiskStats) - .GetCompressed(); - } - else - { - ChunkBlockDescription BlockDescription; - CompressedBuffer CompressedBlock = - GenerateBlock(Path, Content, Lookup, NewBlockChunks[BlockIndex], BlockDescription, DiskStats); - if (!CompressedBlock) - { - throw std::runtime_error(fmt::format("Failed generating block {}", BlockHash)); - } - ZEN_ASSERT(BlockDescription.BlockHash == BlockHash); - Payload = std::move(CompressedBlock).GetCompressed(); - } - - GeneratedBlockByteCount += NewBlocks.BlockSizes[BlockIndex]; - GeneratedBlockCount++; - if (GeneratedBlockCount == GenerateBlockIndexes.size()) - { - FilteredGenerateBlockBytesPerSecond.Stop(); - } - ZEN_CONSOLE_VERBOSE("{} block {} ({}) containing {} chunks in {}", - NewBlocks.BlockHeaders[BlockIndex] ? "Regenerated" : "Generated", - NewBlocks.BlockDescriptions[BlockIndex].BlockHash, - NiceBytes(NewBlocks.BlockSizes[BlockIndex]), - NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size(), - NiceTimeSpanMs(GenerateTimer.GetElapsedTimeMs())); - if (!AbortFlag) - { - AsyncUploadBlock(BlockIndex, BlockHash, std::move(Payload), QueuedPendingInMemoryBlocksForUpload); - } - } - }); - } - } - - // Start compression of any non-precompressed loose chunks and schedule upload - for (const uint32_t LooseChunkOrderIndex : LooseChunkOrderIndexes) - { - const uint32_t ChunkIndex = LooseChunkIndexes[LooseChunkOrderIndex]; - Work.ScheduleWork( - ReadChunkPool, - [&Path, - &Content, - &Lookup, - &TempDir, - &LooseChunksStats, - &LooseChunkOrderIndexes, - &FilteredCompressedBytesPerSecond, - &UploadStats, - &AsyncUploadLooseChunk, - ChunkIndex](std::atomic<bool>&) { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UploadPartBlobs_CompressChunk"); - - FilteredCompressedBytesPerSecond.Start(); - Stopwatch CompressTimer; - CompositeBuffer Payload = CompressChunk(Path, Content, Lookup, ChunkIndex, TempDir, LooseChunksStats); - ZEN_CONSOLE_VERBOSE("Compressed chunk {} ({} -> {}) in {}", - Content.ChunkedContent.ChunkHashes[ChunkIndex], - NiceBytes(Content.ChunkedContent.ChunkRawSizes[ChunkIndex]), - NiceBytes(Payload.GetSize()), - NiceTimeSpanMs(CompressTimer.GetElapsedTimeMs())); - const uint64_t ChunkRawSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; - UploadStats.ReadFromDiskBytes += ChunkRawSize; - if (LooseChunksStats.CompressedChunkCount == LooseChunkOrderIndexes.size()) - { - FilteredCompressedBytesPerSecond.Stop(); - } - if (!AbortFlag) - { - AsyncUploadLooseChunk(Content.ChunkedContent.ChunkHashes[ChunkIndex], ChunkRawSize, std::move(Payload)); - } - } - }); - } - - Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(PendingWork); - FilteredCompressedBytesPerSecond.Update(LooseChunksStats.CompressedChunkRawBytes.load()); - FilteredGenerateBlockBytesPerSecond.Update(GeneratedBlockByteCount.load()); - FilteredUploadedBytesPerSecond.Update(UploadedCompressedChunkSize.load() + UploadedBlockSize.load()); - uint64_t UploadedRawSize = UploadedRawChunkSize.load() + UploadedBlockSize.load(); - uint64_t UploadedCompressedSize = UploadedCompressedChunkSize.load() + UploadedBlockSize.load(); - - std::string Details = fmt::format( - "Compressed {}/{} ({}/{} {}B/s) chunks. " - "Uploaded {}/{} ({}/{}) blobs " - "({} {}bits/s)", - LooseChunksStats.CompressedChunkCount.load(), - LooseChunkOrderIndexes.size(), - NiceBytes(LooseChunksStats.CompressedChunkRawBytes), - NiceBytes(TotalLooseChunksSize), - NiceNum(FilteredCompressedBytesPerSecond.GetCurrent()), - - UploadedBlockCount.load() + UploadedChunkCount.load(), - UploadBlockCount + UploadChunkCount, - NiceBytes(UploadedRawSize), - NiceBytes(TotalRawSize), - - NiceBytes(UploadedCompressedSize), - NiceNum(FilteredUploadedBytesPerSecond.GetCurrent())); - - ProgressBar.UpdateState({.Task = "Uploading blobs ", - .Details = Details, - .TotalCount = gsl::narrow<uint64_t>(TotalRawSize), - .RemainingCount = gsl::narrow<uint64_t>(TotalRawSize - UploadedRawSize), - .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, - false); - }); - - ZEN_ASSERT(AbortFlag || QueuedPendingInMemoryBlocksForUpload.load() == 0); - - ProgressBar.Finish(); - - UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS(); - LooseChunksStats.CompressChunksElapsedWallTimeUS = FilteredCompressedBytesPerSecond.GetElapsedTimeUS(); - } - } - - std::vector<size_t> FindReuseBlocks(const std::vector<ChunkBlockDescription>& KnownBlocks, - std::span<const IoHash> ChunkHashes, - std::span<const uint32_t> ChunkIndexes, - uint8_t MinPercentLimit, - std::vector<uint32_t>& OutUnusedChunkIndexes, - FindBlocksStatistics& FindBlocksStats) - { - ZEN_TRACE_CPU("FindReuseBlocks"); - - // Find all blocks with a usage level higher than MinPercentLimit - // Pick out the blocks with usage higher or equal to MinPercentLimit - // Sort them with highest size usage - most usage first - // Make a list of all chunks and mark them as not found - // For each block, recalculate the block has usage percent based on the chunks marked as not found - // If the block still reaches MinPercentLimit, keep it and remove the matching chunks from the not found list - // Repeat for following all remaining block that initially matched MinPercentLimit - - std::vector<size_t> FilteredReuseBlockIndexes; - - uint32_t ChunkCount = gsl::narrow<uint32_t>(ChunkHashes.size()); - std::vector<bool> ChunkFound(ChunkCount, false); - - if (ChunkCount > 0) - { - if (!KnownBlocks.empty()) - { - Stopwatch ReuseTimer; - - tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkHashToChunkIndex; - ChunkHashToChunkIndex.reserve(ChunkIndexes.size()); - for (uint32_t ChunkIndex : ChunkIndexes) - { - ChunkHashToChunkIndex.insert_or_assign(ChunkHashes[ChunkIndex], ChunkIndex); - } - - std::vector<size_t> BlockSizes(KnownBlocks.size(), 0); - std::vector<size_t> BlockUseSize(KnownBlocks.size(), 0); - - std::vector<size_t> ReuseBlockIndexes; - - for (size_t KnownBlockIndex = 0; KnownBlockIndex < KnownBlocks.size(); KnownBlockIndex++) - { - const ChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex]; - if (KnownBlock.BlockHash != IoHash::Zero && - KnownBlock.ChunkRawHashes.size() == KnownBlock.ChunkCompressedLengths.size()) - { - size_t BlockAttachmentCount = KnownBlock.ChunkRawHashes.size(); - if (BlockAttachmentCount == 0) - { - continue; - } - size_t ReuseSize = 0; - size_t BlockSize = 0; - size_t FoundAttachmentCount = 0; - size_t BlockChunkCount = KnownBlock.ChunkRawHashes.size(); - for (size_t BlockChunkIndex = 0; BlockChunkIndex < BlockChunkCount; BlockChunkIndex++) - { - const IoHash& BlockChunkHash = KnownBlock.ChunkRawHashes[BlockChunkIndex]; - const uint32_t BlockChunkSize = KnownBlock.ChunkCompressedLengths[BlockChunkIndex]; - BlockSize += BlockChunkSize; - if (ChunkHashToChunkIndex.contains(BlockChunkHash)) - { - ReuseSize += BlockChunkSize; - FoundAttachmentCount++; - } - } - - size_t ReusePercent = (ReuseSize * 100) / BlockSize; - - if (ReusePercent >= MinPercentLimit) - { - ZEN_CONSOLE_VERBOSE("Reusing block {}. {} attachments found, usage level: {}%", - KnownBlock.BlockHash, - FoundAttachmentCount, - ReusePercent); - ReuseBlockIndexes.push_back(KnownBlockIndex); - - BlockSizes[KnownBlockIndex] = BlockSize; - BlockUseSize[KnownBlockIndex] = ReuseSize; - } - else if (FoundAttachmentCount > 0) - { - // ZEN_CONSOLE_VERBOSE("Skipping block {}. {} attachments found, usage level: {}%", KnownBlock.BlockHash, - // FoundAttachmentCount, ReusePercent); - FindBlocksStats.RejectedBlockCount++; - FindBlocksStats.RejectedChunkCount += FoundAttachmentCount; - FindBlocksStats.RejectedByteCount += ReuseSize; - } - } - } - - if (!ReuseBlockIndexes.empty()) - { - std::sort(ReuseBlockIndexes.begin(), ReuseBlockIndexes.end(), [&](size_t Lhs, size_t Rhs) { - return BlockUseSize[Lhs] > BlockUseSize[Rhs]; - }); - - for (size_t KnownBlockIndex : ReuseBlockIndexes) - { - std::vector<uint32_t> FoundChunkIndexes; - size_t BlockSize = 0; - size_t AdjustedReuseSize = 0; - size_t AdjustedRawReuseSize = 0; - const ChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex]; - for (size_t BlockChunkIndex = 0; BlockChunkIndex < KnownBlock.ChunkRawHashes.size(); BlockChunkIndex++) - { - const IoHash& BlockChunkHash = KnownBlock.ChunkRawHashes[BlockChunkIndex]; - const uint32_t BlockChunkSize = KnownBlock.ChunkCompressedLengths[BlockChunkIndex]; - BlockSize += BlockChunkSize; - if (auto It = ChunkHashToChunkIndex.find(BlockChunkHash); It != ChunkHashToChunkIndex.end()) - { - const uint32_t ChunkIndex = It->second; - if (!ChunkFound[ChunkIndex]) - { - FoundChunkIndexes.push_back(ChunkIndex); - AdjustedReuseSize += KnownBlock.ChunkCompressedLengths[BlockChunkIndex]; - AdjustedRawReuseSize += KnownBlock.ChunkRawLengths[BlockChunkIndex]; - } - } - } - - size_t ReusePercent = (AdjustedReuseSize * 100) / BlockSize; - - if (ReusePercent >= MinPercentLimit) - { - ZEN_CONSOLE_VERBOSE("Reusing block {}. {} attachments found, usage level: {}%", - KnownBlock.BlockHash, - FoundChunkIndexes.size(), - ReusePercent); - FilteredReuseBlockIndexes.push_back(KnownBlockIndex); - - for (uint32_t ChunkIndex : FoundChunkIndexes) - { - ChunkFound[ChunkIndex] = true; - } - FindBlocksStats.AcceptedChunkCount += FoundChunkIndexes.size(); - FindBlocksStats.AcceptedByteCount += AdjustedReuseSize; - FindBlocksStats.AcceptedRawByteCount += AdjustedRawReuseSize; - FindBlocksStats.AcceptedReduntantChunkCount += KnownBlock.ChunkRawHashes.size() - FoundChunkIndexes.size(); - FindBlocksStats.AcceptedReduntantByteCount += BlockSize - AdjustedReuseSize; - } - else - { - // ZEN_CONSOLE_VERBOSE("Skipping block {}. filtered usage level: {}%", KnownBlock.BlockHash, ReusePercent); - FindBlocksStats.RejectedBlockCount++; - FindBlocksStats.RejectedChunkCount += FoundChunkIndexes.size(); - FindBlocksStats.RejectedByteCount += AdjustedReuseSize; - } - } - } - } - OutUnusedChunkIndexes.reserve(ChunkIndexes.size() - FindBlocksStats.AcceptedChunkCount); - for (uint32_t ChunkIndex : ChunkIndexes) - { - if (!ChunkFound[ChunkIndex]) - { - OutUnusedChunkIndexes.push_back(ChunkIndex); - } - } - } - return FilteredReuseBlockIndexes; - }; - void UploadFolder(StorageInstance& Storage, const Oid& BuildId, const Oid& BuildPartId, @@ -3110,1104 +1573,242 @@ namespace { bool AllowMultiparts, const CbObject& MetaData, bool CreateBuild, - bool IgnoreExistingBlocks, - bool PostUploadVerify) + bool IgnoreExistingBlocks) { - ZEN_TRACE_CPU("UploadFolder"); - ProgressBar::SetLogOperationName(ProgressMode, "Upload Folder"); - - enum TaskSteps : uint32_t { - PrepareBuild, - CalculateDelta, - Upload, - Validate, - Cleanup, - StepCount - }; - - auto EndProgress = - MakeGuard([&]() { ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::StepCount, TaskSteps::StepCount); }); + Stopwatch UploadTimer; - Stopwatch ProcessTimer; + ConsoleOpLogOutput Output(ProgressMode); - CreateDirectories(TempDir); - CleanDirectory(TempDir, {}); - auto _ = MakeGuard([&]() { - if (CleanDirectory(TempDir, {})) + BuildsOperationUploadFolder UploadOp( + Output, + Storage, + AbortFlag, + PauseFlag, + GetIOWorkerPool(), + GetNetworkPool(), + BuildId, + BuildPartId, + BuildPartName, + Path, + ManifestPath, + CreateBuild, + std::move(MetaData), + BuildsOperationUploadFolder::Options{.IsQuiet = IsQuiet, + .IsVerbose = IsVerbose, + .FindBlockMaxCount = FindBlockMaxCount, + .BlockReuseMinPercentLimit = BlockReuseMinPercentLimit, + .AllowMultiparts = AllowMultiparts, + .IgnoreExistingBlocks = IgnoreExistingBlocks, + .TempDir = TempDir, + .ExcludeFolders = DefaultExcludeFolders, + .ExcludeExtensions = DefaultExcludeExtensions, + .ZenExcludeManifestName = ZenExcludeManifestName}); + UploadOp.Execute(); + if (AbortFlag) { - std::error_code DummyEc; - RemoveDir(TempDir, DummyEc); + return; } - }); - - ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::PrepareBuild, TaskSteps::StepCount); - - std::uint64_t TotalRawSize = 0; - - CbObject ChunkerParameters; - - struct PrepareBuildResult - { - std::vector<ChunkBlockDescription> KnownBlocks; - uint64_t PreferredMultipartChunkSize = DefaultPreferredMultipartChunkSize; - uint64_t PayloadSize = 0; - uint64_t PrepareBuildTimeMs = 0; - uint64_t FindBlocksTimeMs = 0; - uint64_t ElapsedTimeMs = 0; - }; - - FindBlocksStatistics FindBlocksStats; - - std::future<PrepareBuildResult> PrepBuildResultFuture = GetNetworkPool().EnqueueTask( - std::packaged_task<PrepareBuildResult()>{ - [&Storage, BuildId, FindBlockMaxCount, &MetaData, CreateBuild, AllowMultiparts, IgnoreExistingBlocks, &FindBlocksStats] { - ZEN_TRACE_CPU("PrepareBuild"); - - PrepareBuildResult Result; - Stopwatch Timer; - if (CreateBuild) - { - ZEN_TRACE_CPU("CreateBuild"); - - Stopwatch PutBuildTimer; - CbObject PutBuildResult = Storage.BuildStorage->PutBuild(BuildId, MetaData); - Result.PrepareBuildTimeMs = PutBuildTimer.GetElapsedTimeMs(); - Result.PreferredMultipartChunkSize = PutBuildResult["chunkSize"sv].AsUInt64(Result.PreferredMultipartChunkSize); - Result.PayloadSize = MetaData.GetSize(); - } - else - { - ZEN_TRACE_CPU("PutBuild"); - Stopwatch GetBuildTimer; - CbObject Build = Storage.BuildStorage->GetBuild(BuildId); - Result.PrepareBuildTimeMs = GetBuildTimer.GetElapsedTimeMs(); - Result.PayloadSize = Build.GetSize(); - if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0) - { - Result.PreferredMultipartChunkSize = ChunkSize; - } - else if (AllowMultiparts) - { - ZEN_CONSOLE_WARN("PreferredMultipartChunkSize is unknown. Defaulting to '{}'", - NiceBytes(Result.PreferredMultipartChunkSize)); - } - } - if (!IgnoreExistingBlocks) - { - ZEN_TRACE_CPU("FindBlocks"); - Stopwatch KnownBlocksTimer; - CbObject BlockDescriptionList = Storage.BuildStorage->FindBlocks(BuildId, FindBlockMaxCount); - if (BlockDescriptionList) - { - Result.KnownBlocks = ParseChunkBlockDescriptionList(BlockDescriptionList); - } - FindBlocksStats.FindBlockTimeMS = KnownBlocksTimer.GetElapsedTimeMs(); - FindBlocksStats.FoundBlockCount = Result.KnownBlocks.size(); - Result.FindBlocksTimeMs = KnownBlocksTimer.GetElapsedTimeMs(); - } - Result.ElapsedTimeMs = Timer.GetElapsedTimeMs(); - return Result; - }}, - WorkerThreadPool::EMode::EnableBacklog); - - ChunkedFolderContent LocalContent; + ZEN_CONSOLE_VERBOSE( + "Folder scanning stats:" + "\n FoundFileCount: {}" + "\n FoundFileByteCount: {}" + "\n AcceptedFileCount: {}" + "\n AcceptedFileByteCount: {}" + "\n ElapsedWallTimeUS: {}", + UploadOp.m_LocalFolderScanStats.FoundFileCount.load(), + NiceBytes(UploadOp.m_LocalFolderScanStats.FoundFileByteCount.load()), + UploadOp.m_LocalFolderScanStats.AcceptedFileCount.load(), + NiceBytes(UploadOp.m_LocalFolderScanStats.AcceptedFileByteCount.load()), + NiceLatencyNs(UploadOp.m_LocalFolderScanStats.ElapsedWallTimeUS * 1000)); - GetFolderContentStatistics LocalFolderScanStats; - ChunkingStatistics ChunkingStats; - { - auto IsAcceptedFolder = [ExcludeFolders = DefaultExcludeFolders](const std::string_view& RelativePath) -> bool { - for (const std::string& ExcludeFolder : ExcludeFolders) - { - if (RelativePath.starts_with(ExcludeFolder)) - { - if (RelativePath.length() == ExcludeFolder.length()) - { - return false; - } - else if (RelativePath[ExcludeFolder.length()] == '/') - { - return false; - } - } - } - return true; - }; + ZEN_CONSOLE_VERBOSE( + "Chunking stats:" + "\n FilesProcessed: {}" + "\n FilesChunked: {}" + "\n BytesHashed: {}" + "\n UniqueChunksFound: {}" + "\n UniqueSequencesFound: {}" + "\n UniqueBytesFound: {}" + "\n ElapsedWallTimeUS: {}", + UploadOp.m_ChunkingStats.FilesProcessed.load(), + UploadOp.m_ChunkingStats.FilesChunked.load(), + NiceBytes(UploadOp.m_ChunkingStats.BytesHashed.load()), + UploadOp.m_ChunkingStats.UniqueChunksFound.load(), + UploadOp.m_ChunkingStats.UniqueSequencesFound.load(), + NiceBytes(UploadOp.m_ChunkingStats.UniqueBytesFound.load()), + NiceLatencyNs(UploadOp.m_ChunkingStats.ElapsedWallTimeUS * 1000)); - auto IsAcceptedFile = [ExcludeExtensions = - DefaultExcludeExtensions](const std::string_view& RelativePath, uint64_t, uint32_t) -> bool { - for (const std::string& ExcludeExtension : ExcludeExtensions) - { - if (RelativePath.ends_with(ExcludeExtension)) - { - return false; - } - } - return true; - }; + ZEN_CONSOLE_VERBOSE( + "Find block stats:" + "\n FindBlockTimeMS: {}" + "\n PotentialChunkCount: {}" + "\n PotentialChunkByteCount: {}" + "\n FoundBlockCount: {}" + "\n FoundBlockChunkCount: {}" + "\n FoundBlockByteCount: {}" + "\n AcceptedBlockCount: {}" + "\n AcceptedChunkCount: {}" + "\n AcceptedByteCount: {}" + "\n AcceptedRawByteCount: {}" + "\n RejectedBlockCount: {}" + "\n RejectedChunkCount: {}" + "\n RejectedByteCount: {}" + "\n AcceptedReduntantChunkCount: {}" + "\n AcceptedReduntantByteCount: {}" + "\n NewBlocksCount: {}" + "\n NewBlocksChunkCount: {}" + "\n NewBlocksChunkByteCount: {}", + NiceTimeSpanMs(UploadOp.m_FindBlocksStats.FindBlockTimeMS), + UploadOp.m_FindBlocksStats.PotentialChunkCount, + NiceBytes(UploadOp.m_FindBlocksStats.PotentialChunkByteCount), + UploadOp.m_FindBlocksStats.FoundBlockCount, + UploadOp.m_FindBlocksStats.FoundBlockChunkCount, + NiceBytes(UploadOp.m_FindBlocksStats.FoundBlockByteCount), + UploadOp.m_FindBlocksStats.AcceptedBlockCount, + UploadOp.m_FindBlocksStats.AcceptedChunkCount, + NiceBytes(UploadOp.m_FindBlocksStats.AcceptedByteCount), + NiceBytes(UploadOp.m_FindBlocksStats.AcceptedRawByteCount), + UploadOp.m_FindBlocksStats.RejectedBlockCount, + UploadOp.m_FindBlocksStats.RejectedChunkCount, + NiceBytes(UploadOp.m_FindBlocksStats.RejectedByteCount), + UploadOp.m_FindBlocksStats.AcceptedReduntantChunkCount, + NiceBytes(UploadOp.m_FindBlocksStats.AcceptedReduntantByteCount), + UploadOp.m_FindBlocksStats.NewBlocksCount, + UploadOp.m_FindBlocksStats.NewBlocksChunkCount, + NiceBytes(UploadOp.m_FindBlocksStats.NewBlocksChunkByteCount)); - auto ParseManifest = [](const std::filesystem::path& Path, - const std::filesystem::path& ManifestPath) -> std::vector<std::filesystem::path> { - std::vector<std::filesystem::path> AssetPaths; - std::filesystem::path AbsoluteManifestPath = - MakeSafeAbsolutePath(ManifestPath.is_absolute() ? ManifestPath : Path / ManifestPath); - IoBuffer ManifestContent = ReadFile(AbsoluteManifestPath).Flatten(); - std::string_view ManifestString((const char*)ManifestContent.GetView().GetData(), ManifestContent.GetSize()); - std::string_view::size_type Offset = 0; - while (Offset < ManifestContent.GetSize()) - { - size_t PathBreakOffset = ManifestString.find_first_of("\t\r\n", Offset); - if (PathBreakOffset == std::string_view::npos) - { - PathBreakOffset = ManifestContent.GetSize(); - } - std::string_view AssetPath = ManifestString.substr(Offset, PathBreakOffset - Offset); - if (!AssetPath.empty()) - { - AssetPaths.emplace_back(std::filesystem::path(AssetPath)); - } - Offset = PathBreakOffset; - size_t EolOffset = ManifestString.find_first_of("\r\n", Offset); - if (EolOffset == std::string_view::npos) - { - break; - } - Offset = EolOffset; - size_t LineBreakOffset = ManifestString.find_first_not_of("\t\r\n", Offset); - if (LineBreakOffset == std::string_view::npos) - { - break; - } - Offset = LineBreakOffset; - } - return AssetPaths; - }; + ZEN_CONSOLE_VERBOSE( + "Generate blocks stats:" + "\n GeneratedBlockByteCount: {}" + "\n GeneratedBlockCount: {}" + "\n GenerateBlocksElapsedWallTimeUS: {}", + NiceBytes(UploadOp.m_GenerateBlocksStats.GeneratedBlockByteCount.load()), + UploadOp.m_GenerateBlocksStats.GeneratedBlockCount.load(), + NiceLatencyNs(UploadOp.m_GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS * 1000)); - Stopwatch ScanTimer; - FolderContent Content; - if (ManifestPath.empty()) - { - std::filesystem::path ExcludeManifestPath = Path / ZenExcludeManifestName; - tsl::robin_set<std::string> ExcludeAssetPaths; - if (IsFile(ExcludeManifestPath)) - { - std::vector<std::filesystem::path> AssetPaths = ParseManifest(Path, ExcludeManifestPath); - ExcludeAssetPaths.reserve(AssetPaths.size()); - for (const std::filesystem::path& AssetPath : AssetPaths) - { - ExcludeAssetPaths.insert(AssetPath.generic_string()); - } - } - Content = GetFolderContent( - LocalFolderScanStats, - Path, - std::move(IsAcceptedFolder), - [&IsAcceptedFile, - &ExcludeAssetPaths](const std::string_view& RelativePath, uint64_t Size, uint32_t Attributes) -> bool { - if (RelativePath == ZenExcludeManifestName) - { - return false; - } - if (!IsAcceptedFile(RelativePath, Size, Attributes)) - { - return false; - } - if (ExcludeAssetPaths.contains(std::filesystem::path(RelativePath).generic_string())) - { - return false; - } - return true; - }, - GetIOWorkerPool(), - GetUpdateDelayMS(ProgressMode), - [&](bool, std::ptrdiff_t) { - ZEN_CONSOLE_VERBOSE("Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), Path); - }, - AbortFlag); - } - else - { - Stopwatch ManifestParseTimer; - std::vector<std::filesystem::path> AssetPaths = ParseManifest(Path, ManifestPath); - for (const std::filesystem::path& AssetPath : AssetPaths) - { - Content.Paths.push_back(AssetPath); - const std::filesystem::path AssetFilePath = (Path / AssetPath).make_preferred(); - Content.RawSizes.push_back(FileSizeFromPath(AssetFilePath)); -#if ZEN_PLATFORM_WINDOWS - Content.Attributes.push_back(GetFileAttributesFromPath(AssetFilePath)); -#endif // ZEN_PLATFORM_WINDOWS -#if ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX - Content.Attributes.push_back(GetFileMode(AssetFilePath)); -#endif // ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX - LocalFolderScanStats.AcceptedFileByteCount += Content.RawSizes.back(); - LocalFolderScanStats.AcceptedFileCount++; - } - if (ManifestPath.is_relative()) - { - Content.Paths.push_back(ManifestPath); - const std::filesystem::path ManifestFilePath = (Path / ManifestPath).make_preferred(); - Content.RawSizes.push_back(FileSizeFromPath(ManifestFilePath)); -#if ZEN_PLATFORM_WINDOWS - Content.Attributes.push_back(GetFileAttributesFromPath(ManifestFilePath)); -#endif // ZEN_PLATFORM_WINDOWS -#if ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX - Content.Attributes.push_back(GetFileMode(ManifestFilePath)); -#endif // ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX + ZEN_CONSOLE_VERBOSE( + "Generate blocks stats:" + "\n ChunkCount: {}" + "\n ChunkByteCount: {}" + "\n CompressedChunkCount: {}" + "\n CompressChunksElapsedWallTimeUS: {}", + UploadOp.m_LooseChunksStats.ChunkCount, + NiceBytes(UploadOp.m_LooseChunksStats.ChunkByteCount), + UploadOp.m_LooseChunksStats.CompressedChunkCount.load(), + NiceBytes(UploadOp.m_LooseChunksStats.CompressedChunkBytes.load()), + NiceLatencyNs(UploadOp.m_LooseChunksStats.CompressChunksElapsedWallTimeUS * 1000)); - LocalFolderScanStats.AcceptedFileByteCount += Content.RawSizes.back(); - LocalFolderScanStats.AcceptedFileCount++; - } - LocalFolderScanStats.FoundFileByteCount.store(LocalFolderScanStats.AcceptedFileByteCount); - LocalFolderScanStats.FoundFileCount.store(LocalFolderScanStats.AcceptedFileCount); - LocalFolderScanStats.ElapsedWallTimeUS = ManifestParseTimer.GetElapsedTimeUs(); - } + ZEN_CONSOLE_VERBOSE( + "Disk stats:" + "\n OpenReadCount: {}" + "\n OpenWriteCount: {}" + "\n ReadCount: {}" + "\n ReadByteCount: {}" + "\n WriteCount: {} ({} cloned)" + "\n WriteByteCount: {} ({} cloned)" + "\n CurrentOpenFileCount: {}", + UploadOp.m_DiskStats.OpenReadCount.load(), + UploadOp.m_DiskStats.OpenWriteCount.load(), + UploadOp.m_DiskStats.ReadCount.load(), + NiceBytes(UploadOp.m_DiskStats.ReadByteCount.load()), + UploadOp.m_DiskStats.WriteCount.load(), + UploadOp.m_DiskStats.CloneCount.load(), + NiceBytes(UploadOp.m_DiskStats.WriteByteCount.load()), + NiceBytes(UploadOp.m_DiskStats.CloneByteCount.load()), + UploadOp.m_DiskStats.CurrentOpenFileCount.load()); - std::unique_ptr<ChunkingController> ChunkController = - CreateChunkingControllerWithFixedChunking(ChunkingControllerWithFixedChunkingSettings{}); - { - CbObjectWriter ChunkParametersWriter; - ChunkParametersWriter.AddString("name"sv, ChunkController->GetName()); - ChunkParametersWriter.AddObject("parameters"sv, ChunkController->GetParameters()); - ChunkerParameters = ChunkParametersWriter.Save(); - } - - TotalRawSize = std::accumulate(Content.RawSizes.begin(), Content.RawSizes.end(), std::uint64_t(0)); - - { - ProgressBar ProgressBar(ProgressMode, "Scan Files"); - FilteredRate FilteredBytesHashed; - FilteredBytesHashed.Start(); - LocalContent = ChunkFolderContent( - ChunkingStats, - GetIOWorkerPool(), - Path, - Content, - *ChunkController, - GetUpdateDelayMS(ProgressMode), - [&](bool IsAborted, bool IsPaused, std::ptrdiff_t) { - FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load()); - std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found", - ChunkingStats.FilesProcessed.load(), - Content.Paths.size(), - NiceBytes(ChunkingStats.BytesHashed.load()), - NiceBytes(TotalRawSize), - NiceNum(FilteredBytesHashed.GetCurrent()), - ChunkingStats.UniqueChunksFound.load(), - NiceBytes(ChunkingStats.UniqueBytesFound.load())); - ProgressBar.UpdateState({.Task = "Scanning files ", - .Details = Details, - .TotalCount = TotalRawSize, - .RemainingCount = TotalRawSize - ChunkingStats.BytesHashed.load(), - .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, - false); - }, - AbortFlag, - PauseFlag); - FilteredBytesHashed.Stop(); - ProgressBar.Finish(); - if (AbortFlag) - { - return; - } - } + ZEN_CONSOLE_VERBOSE( + "Upload stats:" + "\n BlockCount: {}" + "\n BlocksBytes: {}" + "\n ChunkCount: {}" + "\n ChunksBytes: {}" + "\n ReadFromDiskBytes: {}" + "\n MultipartAttachmentCount: {}" + "\n ElapsedWallTimeUS: {}", + UploadOp.m_UploadStats.BlockCount.load(), + NiceBytes(UploadOp.m_UploadStats.BlocksBytes.load()), + UploadOp.m_UploadStats.ChunkCount.load(), + NiceBytes(UploadOp.m_UploadStats.ChunksBytes.load()), + NiceBytes(UploadOp.m_UploadStats.ReadFromDiskBytes.load()), + UploadOp.m_UploadStats.MultipartAttachmentCount.load(), + NiceLatencyNs(UploadOp.m_UploadStats.ElapsedWallTimeUS * 1000)); + + const double DeltaByteCountPercent = + UploadOp.m_ChunkingStats.BytesHashed > 0 + ? (100.0 * (UploadOp.m_FindBlocksStats.NewBlocksChunkByteCount + UploadOp.m_LooseChunksStats.CompressedChunkBytes)) / + (UploadOp.m_ChunkingStats.BytesHashed) + : 0.0; + + const std::string MultipartAttachmentStats = + AllowMultiparts ? fmt::format(" ({} as multipart)", UploadOp.m_UploadStats.MultipartAttachmentCount.load()) : ""; if (!IsQuiet) { - ZEN_CONSOLE("Found {} ({}) files divided into {} ({}) unique chunks in '{}' in {}. Average hash rate {}B/sec", - LocalContent.Paths.size(), - NiceBytes(TotalRawSize), - ChunkingStats.UniqueChunksFound.load(), - NiceBytes(ChunkingStats.UniqueBytesFound.load()), - Path, - NiceTimeSpanMs(ScanTimer.GetElapsedTimeMs()), - NiceNum(GetBytesPerSecond(ChunkingStats.ElapsedWallTimeUS, ChunkingStats.BytesHashed))); - } - } - - const ChunkedContentLookup LocalLookup = BuildChunkedContentLookup(LocalContent); - - GenerateBlocksStatistics GenerateBlocksStats; - LooseChunksStatistics LooseChunksStats; - - std::vector<size_t> ReuseBlockIndexes; - std::vector<uint32_t> NewBlockChunkIndexes; - - PrepareBuildResult PrepBuildResult = PrepBuildResultFuture.get(); - - if (!IsQuiet) - { - ZEN_CONSOLE("Build prepare took {}. {} took {}, payload size {}{}", - NiceTimeSpanMs(PrepBuildResult.ElapsedTimeMs), - CreateBuild ? "PutBuild" : "GetBuild", - NiceTimeSpanMs(PrepBuildResult.PrepareBuildTimeMs), - NiceBytes(PrepBuildResult.PayloadSize), - IgnoreExistingBlocks ? "" - : fmt::format(". Found {} blocks in {}", - PrepBuildResult.KnownBlocks.size(), - NiceTimeSpanMs(PrepBuildResult.FindBlocksTimeMs))); - } - - ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::CalculateDelta, TaskSteps::StepCount); - - const std::uint64_t LargeAttachmentSize = AllowMultiparts ? PrepBuildResult.PreferredMultipartChunkSize * 4u : (std::uint64_t)-1; - - Stopwatch BlockArrangeTimer; - - std::vector<std::uint32_t> LooseChunkIndexes; - { - bool EnableBlocks = true; - std::vector<std::uint32_t> BlockChunkIndexes; - for (uint32_t ChunkIndex = 0; ChunkIndex < LocalContent.ChunkedContent.ChunkHashes.size(); ChunkIndex++) - { - const uint64_t ChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; - if (!EnableBlocks || ChunkRawSize == 0 || ChunkRawSize > DefaultChunksBlockParams.MaxChunkEmbedSize) - { - LooseChunkIndexes.push_back(ChunkIndex); - LooseChunksStats.ChunkByteCount += ChunkRawSize; - } - else - { - BlockChunkIndexes.push_back(ChunkIndex); - FindBlocksStats.PotentialChunkByteCount += ChunkRawSize; - } - } - FindBlocksStats.PotentialChunkCount = BlockChunkIndexes.size(); - LooseChunksStats.ChunkCount = LooseChunkIndexes.size(); - - if (IgnoreExistingBlocks) - { - if (!IsQuiet) - { - ZEN_CONSOLE("Ignoring any existing blocks in store"); - } - NewBlockChunkIndexes = std::move(BlockChunkIndexes); - } - else - { - ReuseBlockIndexes = FindReuseBlocks(PrepBuildResult.KnownBlocks, - LocalContent.ChunkedContent.ChunkHashes, - BlockChunkIndexes, - BlockReuseMinPercentLimit, - NewBlockChunkIndexes, - FindBlocksStats); - FindBlocksStats.AcceptedBlockCount = ReuseBlockIndexes.size(); - - for (const ChunkBlockDescription& Description : PrepBuildResult.KnownBlocks) - { - for (uint32_t ChunkRawLength : Description.ChunkRawLengths) - { - FindBlocksStats.FoundBlockByteCount += ChunkRawLength; - } - FindBlocksStats.FoundBlockChunkCount += Description.ChunkRawHashes.size(); - } - } - } - - std::vector<std::vector<uint32_t>> NewBlockChunks; - ArrangeChunksIntoBlocks(LocalContent, - LocalLookup, - DefaultChunksBlockParams.MaxBlockSize, - DefaultChunksBlockParams.MaxChunksPerBlock, - NewBlockChunkIndexes, - NewBlockChunks); - - FindBlocksStats.NewBlocksCount = NewBlockChunks.size(); - for (uint32_t ChunkIndex : NewBlockChunkIndexes) - { - FindBlocksStats.NewBlocksChunkByteCount += LocalContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; - } - FindBlocksStats.NewBlocksChunkCount = NewBlockChunkIndexes.size(); - - const double AcceptedByteCountPercent = - FindBlocksStats.PotentialChunkByteCount > 0 - ? (100.0 * FindBlocksStats.AcceptedRawByteCount / FindBlocksStats.PotentialChunkByteCount) - : 0.0; - - const double AcceptedReduntantByteCountPercent = - FindBlocksStats.AcceptedByteCount > 0 ? (100.0 * FindBlocksStats.AcceptedReduntantByteCount) / - (FindBlocksStats.AcceptedByteCount + FindBlocksStats.AcceptedReduntantByteCount) - : 0.0; - if (!IsQuiet) - { - ZEN_CONSOLE( - "Found {} chunks in {} ({}) blocks eligible for reuse in {}\n" - " Reusing {} ({}) matching chunks in {} blocks ({:.1f}%)\n" - " Accepting {} ({}) redundant chunks ({:.1f}%)\n" - " Rejected {} ({}) chunks in {} blocks\n" - " Arranged {} ({}) chunks in {} new blocks\n" - " Keeping {} ({}) chunks as loose chunks\n" - " Discovery completed in {}", - FindBlocksStats.FoundBlockChunkCount, - FindBlocksStats.FoundBlockCount, - NiceBytes(FindBlocksStats.FoundBlockByteCount), - NiceTimeSpanMs(FindBlocksStats.FindBlockTimeMS), - - FindBlocksStats.AcceptedChunkCount, - NiceBytes(FindBlocksStats.AcceptedRawByteCount), - FindBlocksStats.AcceptedBlockCount, - AcceptedByteCountPercent, - - FindBlocksStats.AcceptedReduntantChunkCount, - NiceBytes(FindBlocksStats.AcceptedReduntantByteCount), - AcceptedReduntantByteCountPercent, - - FindBlocksStats.RejectedChunkCount, - NiceBytes(FindBlocksStats.RejectedByteCount), - FindBlocksStats.RejectedBlockCount, - - FindBlocksStats.NewBlocksChunkCount, - NiceBytes(FindBlocksStats.NewBlocksChunkByteCount), - FindBlocksStats.NewBlocksCount, - - LooseChunksStats.ChunkCount, - NiceBytes(LooseChunksStats.ChunkByteCount), - - NiceTimeSpanMs(BlockArrangeTimer.GetElapsedTimeMs())); - } - - DiskStatistics DiskStats; - UploadStatistics UploadStats; - GeneratedBlocks NewBlocks; - - ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::Upload, TaskSteps::StepCount); - - if (!NewBlockChunks.empty()) - { - Stopwatch GenerateBuildBlocksTimer; - auto __ = MakeGuard([&]() { - uint64_t BlockGenerateTimeUs = GenerateBuildBlocksTimer.GetElapsedTimeUs(); - if (!IsQuiet) - { - ZEN_CONSOLE("Generated {} ({}) and uploaded {} ({}) blocks in {}. Generate speed: {}B/sec. Transfer speed {}bits/sec.", - GenerateBlocksStats.GeneratedBlockCount.load(), - NiceBytes(GenerateBlocksStats.GeneratedBlockByteCount), - UploadStats.BlockCount.load(), - NiceBytes(UploadStats.BlocksBytes.load()), - NiceTimeSpanMs(BlockGenerateTimeUs / 1000), - NiceNum(GetBytesPerSecond(GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS, - GenerateBlocksStats.GeneratedBlockByteCount)), - NiceNum(GetBytesPerSecond(UploadStats.ElapsedWallTimeUS, UploadStats.BlocksBytes * 8))); - } - }); - GenerateBuildBlocks(Path, - LocalContent, - LocalLookup, - Storage, - BuildId, - NewBlockChunks, - NewBlocks, - DiskStats, - UploadStats, - GenerateBlocksStats); - } - - CbObject PartManifest; - { - CbObjectWriter PartManifestWriter; - Stopwatch ManifestGenerationTimer; - auto __ = MakeGuard([&]() { - if (!IsQuiet) - { - ZEN_CONSOLE("Generated build part manifest in {} ({})", - NiceTimeSpanMs(ManifestGenerationTimer.GetElapsedTimeMs()), - NiceBytes(PartManifestWriter.GetSaveSize())); - } - }); - PartManifestWriter.AddObject("chunker"sv, ChunkerParameters); - - std::vector<IoHash> AllChunkBlockHashes; - std::vector<ChunkBlockDescription> AllChunkBlockDescriptions; - AllChunkBlockHashes.reserve(ReuseBlockIndexes.size() + NewBlocks.BlockDescriptions.size()); - AllChunkBlockDescriptions.reserve(ReuseBlockIndexes.size() + NewBlocks.BlockDescriptions.size()); - for (size_t ReuseBlockIndex : ReuseBlockIndexes) - { - AllChunkBlockDescriptions.push_back(PrepBuildResult.KnownBlocks[ReuseBlockIndex]); - AllChunkBlockHashes.push_back(PrepBuildResult.KnownBlocks[ReuseBlockIndex].BlockHash); - } - AllChunkBlockDescriptions.insert(AllChunkBlockDescriptions.end(), - NewBlocks.BlockDescriptions.begin(), - NewBlocks.BlockDescriptions.end()); - for (const ChunkBlockDescription& BlockDescription : NewBlocks.BlockDescriptions) - { - AllChunkBlockHashes.push_back(BlockDescription.BlockHash); - } -#if EXTRA_VERIFY - tsl::robin_map<IoHash, size_t, IoHash::Hasher> ChunkHashToAbsoluteChunkIndex; - std::vector<IoHash> AbsoluteChunkHashes; - AbsoluteChunkHashes.reserve(LocalContent.ChunkedContent.ChunkHashes.size()); - for (uint32_t ChunkIndex : LooseChunkIndexes) - { - ChunkHashToAbsoluteChunkIndex.insert({LocalContent.ChunkedContent.ChunkHashes[ChunkIndex], AbsoluteChunkHashes.size()}); - AbsoluteChunkHashes.push_back(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]); - } - for (const ChunkBlockDescription& Block : AllChunkBlockDescriptions) - { - for (const IoHash& ChunkHash : Block.ChunkHashes) - { - ChunkHashToAbsoluteChunkIndex.insert({ChunkHash, AbsoluteChunkHashes.size()}); - AbsoluteChunkHashes.push_back(ChunkHash); - } - } - for (const IoHash& ChunkHash : LocalContent.ChunkedContent.ChunkHashes) - { - ZEN_ASSERT(AbsoluteChunkHashes[ChunkHashToAbsoluteChunkIndex.at(ChunkHash)] == ChunkHash); - ZEN_ASSERT(LocalContent.ChunkedContent.ChunkHashes[LocalLookup.ChunkHashToChunkIndex.at(ChunkHash)] == ChunkHash); - } - for (const uint32_t ChunkIndex : LocalContent.ChunkedContent.ChunkOrders) - { - ZEN_ASSERT(AbsoluteChunkHashes[ChunkHashToAbsoluteChunkIndex.at(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex])] == - LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]); - ZEN_ASSERT(LocalLookup.ChunkHashToChunkIndex.at(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]) == ChunkIndex); - } -#endif // EXTRA_VERIFY - std::vector<uint32_t> AbsoluteChunkOrders = CalculateAbsoluteChunkOrders(LocalContent.ChunkedContent.ChunkHashes, - LocalContent.ChunkedContent.ChunkOrders, - LocalLookup.ChunkHashToChunkIndex, - LooseChunkIndexes, - AllChunkBlockDescriptions); - -#if EXTRA_VERIFY - for (uint32_t ChunkOrderIndex = 0; ChunkOrderIndex < LocalContent.ChunkedContent.ChunkOrders.size(); ChunkOrderIndex++) - { - uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[ChunkOrderIndex]; - uint32_t AbsoluteChunkIndex = AbsoluteChunkOrders[ChunkOrderIndex]; - const IoHash& LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; - const IoHash& AbsoluteChunkHash = AbsoluteChunkHashes[AbsoluteChunkIndex]; - ZEN_ASSERT(LocalChunkHash == AbsoluteChunkHash); - } -#endif // EXTRA_VERIFY - - WriteBuildContentToCompactBinary(PartManifestWriter, - LocalContent.Platform, - LocalContent.Paths, - LocalContent.RawHashes, - LocalContent.RawSizes, - LocalContent.Attributes, - LocalContent.ChunkedContent.SequenceRawHashes, - LocalContent.ChunkedContent.ChunkCounts, - LocalContent.ChunkedContent.ChunkHashes, - LocalContent.ChunkedContent.ChunkRawSizes, - AbsoluteChunkOrders, - LooseChunkIndexes, - AllChunkBlockHashes); - -#if EXTRA_VERIFY - { - ChunkedFolderContent VerifyFolderContent; - - std::vector<uint32_t> OutAbsoluteChunkOrders; - std::vector<IoHash> OutLooseChunkHashes; - std::vector<uint64_t> OutLooseChunkRawSizes; - std::vector<IoHash> OutBlockRawHashes; - ReadBuildContentFromCompactBinary(PartManifestWriter.Save(), - VerifyFolderContent.Platform, - VerifyFolderContent.Paths, - VerifyFolderContent.RawHashes, - VerifyFolderContent.RawSizes, - VerifyFolderContent.Attributes, - VerifyFolderContent.ChunkedContent.SequenceRawHashes, - VerifyFolderContent.ChunkedContent.ChunkCounts, - OutAbsoluteChunkOrders, - OutLooseChunkHashes, - OutLooseChunkRawSizes, - OutBlockRawHashes); - ZEN_ASSERT(OutBlockRawHashes == AllChunkBlockHashes); - - for (uint32_t OrderIndex = 0; OrderIndex < OutAbsoluteChunkOrders.size(); OrderIndex++) - { - uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[OrderIndex]; - const IoHash LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; - - uint32_t VerifyChunkIndex = OutAbsoluteChunkOrders[OrderIndex]; - const IoHash VerifyChunkHash = AbsoluteChunkHashes[VerifyChunkIndex]; - - ZEN_ASSERT(LocalChunkHash == VerifyChunkHash); - } - - CalculateLocalChunkOrders(OutAbsoluteChunkOrders, - OutLooseChunkHashes, - OutLooseChunkRawSizes, - AllChunkBlockDescriptions, - VerifyFolderContent.ChunkedContent.ChunkHashes, - VerifyFolderContent.ChunkedContent.ChunkRawSizes, - VerifyFolderContent.ChunkedContent.ChunkOrders); - - ZEN_ASSERT(LocalContent.Paths == VerifyFolderContent.Paths); - ZEN_ASSERT(LocalContent.RawHashes == VerifyFolderContent.RawHashes); - ZEN_ASSERT(LocalContent.RawSizes == VerifyFolderContent.RawSizes); - ZEN_ASSERT(LocalContent.Attributes == VerifyFolderContent.Attributes); - ZEN_ASSERT(LocalContent.ChunkedContent.SequenceRawHashes == VerifyFolderContent.ChunkedContent.SequenceRawHashes); - ZEN_ASSERT(LocalContent.ChunkedContent.ChunkCounts == VerifyFolderContent.ChunkedContent.ChunkCounts); - - for (uint32_t OrderIndex = 0; OrderIndex < LocalContent.ChunkedContent.ChunkOrders.size(); OrderIndex++) - { - uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[OrderIndex]; - const IoHash LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; - uint64_t LocalChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[LocalChunkIndex]; - - uint32_t VerifyChunkIndex = VerifyFolderContent.ChunkedContent.ChunkOrders[OrderIndex]; - const IoHash VerifyChunkHash = VerifyFolderContent.ChunkedContent.ChunkHashes[VerifyChunkIndex]; - uint64_t VerifyChunkRawSize = VerifyFolderContent.ChunkedContent.ChunkRawSizes[VerifyChunkIndex]; - - ZEN_ASSERT(LocalChunkHash == VerifyChunkHash); - ZEN_ASSERT(LocalChunkRawSize == VerifyChunkRawSize); - } - } -#endif // EXTRA_VERIFY - PartManifest = PartManifestWriter.Save(); - } - - Stopwatch PutBuildPartResultTimer; - std::pair<IoHash, std::vector<IoHash>> PutBuildPartResult = - Storage.BuildStorage->PutBuildPart(BuildId, BuildPartId, BuildPartName, PartManifest); - if (!IsQuiet) - { - ZEN_CONSOLE("PutBuildPart took {}, payload size {}. {} attachments are needed.", - NiceTimeSpanMs(PutBuildPartResultTimer.GetElapsedTimeMs()), - NiceBytes(PartManifest.GetSize()), - PutBuildPartResult.second.size()); - } - IoHash PartHash = PutBuildPartResult.first; - - auto UploadAttachments = [&Storage, - &BuildId, - &Path, - &TempDir, - &LocalContent, - &LocalLookup, - &NewBlockChunks, - &NewBlocks, - &LooseChunkIndexes, - &LargeAttachmentSize, - &DiskStats, - &UploadStats, - &LooseChunksStats](std::span<IoHash> RawHashes) { - if (!AbortFlag) - { - UploadStatistics TempUploadStats; - LooseChunksStatistics TempLooseChunksStats; - - Stopwatch TempUploadTimer; - auto __ = MakeGuard([&]() { - if (!IsQuiet) - { - uint64_t TempChunkUploadTimeUs = TempUploadTimer.GetElapsedTimeUs(); - ZEN_CONSOLE( - "Uploaded {} ({}) blocks. " - "Compressed {} ({} {}B/s) and uploaded {} ({}) chunks. " - "Transferred {} ({}bits/s) in {}", - TempUploadStats.BlockCount.load(), - NiceBytes(TempUploadStats.BlocksBytes), - - TempLooseChunksStats.CompressedChunkCount.load(), - NiceBytes(TempLooseChunksStats.CompressedChunkBytes.load()), - NiceNum(GetBytesPerSecond(TempLooseChunksStats.CompressChunksElapsedWallTimeUS, - TempLooseChunksStats.ChunkByteCount)), - TempUploadStats.ChunkCount.load(), - NiceBytes(TempUploadStats.ChunksBytes), - - NiceBytes(TempUploadStats.BlocksBytes + TempUploadStats.ChunksBytes), - NiceNum(GetBytesPerSecond(TempUploadStats.ElapsedWallTimeUS, TempUploadStats.ChunksBytes * 8)), - NiceTimeSpanMs(TempChunkUploadTimeUs / 1000)); - } - }); - UploadPartBlobs(Storage, - BuildId, - Path, - TempDir, - LocalContent, - LocalLookup, - RawHashes, - NewBlockChunks, - NewBlocks, - LooseChunkIndexes, - LargeAttachmentSize, - DiskStats, - TempUploadStats, - TempLooseChunksStats); - UploadStats += TempUploadStats; - LooseChunksStats += TempLooseChunksStats; - } - }; - if (IgnoreExistingBlocks) - { - ZEN_CONSOLE_VERBOSE("PutBuildPart uploading all attachments, needs are: {}", - FormatArray<IoHash>(PutBuildPartResult.second, "\n "sv)); + ZEN_CONSOLE( + "Uploaded part {} ('{}') to build {}, {}\n" + " Scanned files: {:>8} ({}), {}B/sec, {}\n" + " New data: {:>8} ({}) {:.1f}%\n" + " New blocks: {:>8} ({} -> {}), {}B/sec, {}\n" + " New chunks: {:>8} ({} -> {}), {}B/sec, {}\n" + " Uploaded: {:>8} ({}), {}bits/sec, {}\n" + " Blocks: {:>8} ({})\n" + " Chunks: {:>8} ({}){}", + BuildPartId, + BuildPartName, + BuildId, + NiceTimeSpanMs(UploadTimer.GetElapsedTimeMs()), - std::vector<IoHash> ForceUploadChunkHashes; - ForceUploadChunkHashes.reserve(LooseChunkIndexes.size()); + UploadOp.m_LocalFolderScanStats.FoundFileCount.load(), + NiceBytes(UploadOp.m_LocalFolderScanStats.FoundFileByteCount.load()), + NiceNum(GetBytesPerSecond(UploadOp.m_ChunkingStats.ElapsedWallTimeUS, UploadOp.m_ChunkingStats.BytesHashed)), + NiceTimeSpanMs(UploadOp.m_ChunkingStats.ElapsedWallTimeUS / 1000), - for (uint32_t ChunkIndex : LooseChunkIndexes) - { - ForceUploadChunkHashes.push_back(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]); - } + UploadOp.m_FindBlocksStats.NewBlocksChunkCount + UploadOp.m_LooseChunksStats.CompressedChunkCount, + NiceBytes(UploadOp.m_FindBlocksStats.NewBlocksChunkByteCount + UploadOp.m_LooseChunksStats.CompressedChunkBytes), + DeltaByteCountPercent, - for (size_t BlockIndex = 0; BlockIndex < NewBlocks.BlockHeaders.size(); BlockIndex++) - { - if (NewBlocks.BlockHeaders[BlockIndex]) - { - // Block was not uploaded during generation - ForceUploadChunkHashes.push_back(NewBlocks.BlockDescriptions[BlockIndex].BlockHash); - } - } - UploadAttachments(ForceUploadChunkHashes); - } - else if (!PutBuildPartResult.second.empty()) - { - ZEN_CONSOLE_VERBOSE("PutBuildPart needs attachments: {}", FormatArray<IoHash>(PutBuildPartResult.second, "\n "sv)); - UploadAttachments(PutBuildPartResult.second); - } + UploadOp.m_GenerateBlocksStats.GeneratedBlockCount.load(), + NiceBytes(UploadOp.m_FindBlocksStats.NewBlocksChunkByteCount), + NiceBytes(UploadOp.m_GenerateBlocksStats.GeneratedBlockByteCount.load()), + NiceNum(GetBytesPerSecond(UploadOp.m_GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS, + UploadOp.m_GenerateBlocksStats.GeneratedBlockByteCount)), + NiceTimeSpanMs(UploadOp.m_GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS / 1000), - uint32_t FinalizeBuildPartRetryCount = 5; - while (!AbortFlag && (FinalizeBuildPartRetryCount--) > 0) - { - Stopwatch FinalizeBuildPartTimer; - std::vector<IoHash> Needs = Storage.BuildStorage->FinalizeBuildPart(BuildId, BuildPartId, PartHash); - if (!IsQuiet) - { - ZEN_CONSOLE("FinalizeBuildPart took {}. {} attachments are missing.", - NiceTimeSpanMs(FinalizeBuildPartTimer.GetElapsedTimeMs()), - Needs.size()); - } - if (Needs.empty()) - { - break; - } - ZEN_CONSOLE_VERBOSE("FinalizeBuildPart needs attachments: {}", FormatArray<IoHash>(Needs, "\n "sv)); - UploadAttachments(Needs); - } + UploadOp.m_LooseChunksStats.CompressedChunkCount.load(), + NiceBytes(UploadOp.m_LooseChunksStats.ChunkByteCount), + NiceBytes(UploadOp.m_LooseChunksStats.CompressedChunkBytes.load()), + NiceNum(GetBytesPerSecond(UploadOp.m_LooseChunksStats.CompressChunksElapsedWallTimeUS, + UploadOp.m_LooseChunksStats.ChunkByteCount)), + NiceTimeSpanMs(UploadOp.m_LooseChunksStats.CompressChunksElapsedWallTimeUS / 1000), - if (CreateBuild && !AbortFlag) - { - Stopwatch FinalizeBuildTimer; - Storage.BuildStorage->FinalizeBuild(BuildId); - if (!IsQuiet) - { - ZEN_CONSOLE("FinalizeBuild took {}", NiceTimeSpanMs(FinalizeBuildTimer.GetElapsedTimeMs())); - } - } + UploadOp.m_UploadStats.BlockCount.load() + UploadOp.m_UploadStats.ChunkCount.load(), + NiceBytes(UploadOp.m_UploadStats.BlocksBytes + UploadOp.m_UploadStats.ChunksBytes), + NiceNum(GetBytesPerSecond(UploadOp.m_UploadStats.ElapsedWallTimeUS, + (UploadOp.m_UploadStats.ChunksBytes + UploadOp.m_UploadStats.BlocksBytes) * 8)), + NiceTimeSpanMs(UploadOp.m_UploadStats.ElapsedWallTimeUS / 1000), - if (!NewBlocks.BlockDescriptions.empty() && !AbortFlag) - { - uint64_t UploadBlockMetadataCount = 0; - Stopwatch UploadBlockMetadataTimer; + UploadOp.m_UploadStats.BlockCount.load(), + NiceBytes(UploadOp.m_UploadStats.BlocksBytes.load()), - uint32_t FailedMetadataUploadCount = 1; - int32_t MetadataUploadRetryCount = 3; - while ((MetadataUploadRetryCount-- > 0) && (FailedMetadataUploadCount > 0)) - { - FailedMetadataUploadCount = 0; - for (size_t BlockIndex = 0; BlockIndex < NewBlocks.BlockDescriptions.size(); BlockIndex++) - { - if (AbortFlag) - { - break; - } - const IoHash& BlockHash = NewBlocks.BlockDescriptions[BlockIndex].BlockHash; - if (!NewBlocks.MetaDataHasBeenUploaded[BlockIndex]) - { - const CbObject BlockMetaData = - BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]); - if (Storage.BuildCacheStorage) - { - Storage.BuildCacheStorage->PutBlobMetadatas(BuildId, - std::vector<IoHash>({BlockHash}), - std::vector<CbObject>({BlockMetaData})); - } - bool MetadataSucceeded = Storage.BuildStorage->PutBlockMetadata(BuildId, BlockHash, BlockMetaData); - if (MetadataSucceeded) - { - UploadStats.BlocksBytes += BlockMetaData.GetSize(); - NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; - UploadBlockMetadataCount++; - } - else - { - FailedMetadataUploadCount++; - } - } - } + UploadOp.m_UploadStats.ChunkCount.load(), + NiceBytes(UploadOp.m_UploadStats.ChunksBytes.load()), + MultipartAttachmentStats); } - if (UploadBlockMetadataCount > 0) - { - uint64_t ElapsedUS = UploadBlockMetadataTimer.GetElapsedTimeUs(); - UploadStats.ElapsedWallTimeUS += ElapsedUS; - if (!IsQuiet) - { - ZEN_CONSOLE("Uploaded metadata for {} blocks in {}", UploadBlockMetadataCount, NiceTimeSpanMs(ElapsedUS / 1000)); - } - } - } - - ValidateStatistics ValidateStats; - DownloadStatistics ValidateDownloadStats; - if (PostUploadVerify && !AbortFlag) - { - ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::Validate, TaskSteps::StepCount); - ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, BuildPartName, ValidateStats, ValidateDownloadStats); - } - - ZEN_CONSOLE_VERBOSE( - "Folder scanning stats:" - "\n FoundFileCount: {}" - "\n FoundFileByteCount: {}" - "\n AcceptedFileCount: {}" - "\n AcceptedFileByteCount: {}" - "\n ElapsedWallTimeUS: {}", - LocalFolderScanStats.FoundFileCount.load(), - NiceBytes(LocalFolderScanStats.FoundFileByteCount.load()), - LocalFolderScanStats.AcceptedFileCount.load(), - NiceBytes(LocalFolderScanStats.AcceptedFileByteCount.load()), - NiceLatencyNs(LocalFolderScanStats.ElapsedWallTimeUS * 1000)); - - ZEN_CONSOLE_VERBOSE( - "Chunking stats:" - "\n FilesProcessed: {}" - "\n FilesChunked: {}" - "\n BytesHashed: {}" - "\n UniqueChunksFound: {}" - "\n UniqueSequencesFound: {}" - "\n UniqueBytesFound: {}" - "\n ElapsedWallTimeUS: {}", - ChunkingStats.FilesProcessed.load(), - ChunkingStats.FilesChunked.load(), - NiceBytes(ChunkingStats.BytesHashed.load()), - ChunkingStats.UniqueChunksFound.load(), - ChunkingStats.UniqueSequencesFound.load(), - NiceBytes(ChunkingStats.UniqueBytesFound.load()), - NiceLatencyNs(ChunkingStats.ElapsedWallTimeUS * 1000)); - - ZEN_CONSOLE_VERBOSE( - "Find block stats:" - "\n FindBlockTimeMS: {}" - "\n PotentialChunkCount: {}" - "\n PotentialChunkByteCount: {}" - "\n FoundBlockCount: {}" - "\n FoundBlockChunkCount: {}" - "\n FoundBlockByteCount: {}" - "\n AcceptedBlockCount: {}" - "\n AcceptedChunkCount: {}" - "\n AcceptedByteCount: {}" - "\n AcceptedRawByteCount: {}" - "\n RejectedBlockCount: {}" - "\n RejectedChunkCount: {}" - "\n RejectedByteCount: {}" - "\n AcceptedReduntantChunkCount: {}" - "\n AcceptedReduntantByteCount: {}" - "\n NewBlocksCount: {}" - "\n NewBlocksChunkCount: {}" - "\n NewBlocksChunkByteCount: {}", - NiceTimeSpanMs(FindBlocksStats.FindBlockTimeMS), - FindBlocksStats.PotentialChunkCount, - NiceBytes(FindBlocksStats.PotentialChunkByteCount), - FindBlocksStats.FoundBlockCount, - FindBlocksStats.FoundBlockChunkCount, - NiceBytes(FindBlocksStats.FoundBlockByteCount), - FindBlocksStats.AcceptedBlockCount, - FindBlocksStats.AcceptedChunkCount, - NiceBytes(FindBlocksStats.AcceptedByteCount), - NiceBytes(FindBlocksStats.AcceptedRawByteCount), - FindBlocksStats.RejectedBlockCount, - FindBlocksStats.RejectedChunkCount, - NiceBytes(FindBlocksStats.RejectedByteCount), - FindBlocksStats.AcceptedReduntantChunkCount, - NiceBytes(FindBlocksStats.AcceptedReduntantByteCount), - FindBlocksStats.NewBlocksCount, - FindBlocksStats.NewBlocksChunkCount, - NiceBytes(FindBlocksStats.NewBlocksChunkByteCount)); - - ZEN_CONSOLE_VERBOSE( - "Generate blocks stats:" - "\n GeneratedBlockByteCount: {}" - "\n GeneratedBlockCount: {}" - "\n GenerateBlocksElapsedWallTimeUS: {}", - NiceBytes(GenerateBlocksStats.GeneratedBlockByteCount.load()), - GenerateBlocksStats.GeneratedBlockCount.load(), - NiceLatencyNs(GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS * 1000)); - - ZEN_CONSOLE_VERBOSE( - "Generate blocks stats:" - "\n ChunkCount: {}" - "\n ChunkByteCount: {}" - "\n CompressedChunkCount: {}" - "\n CompressChunksElapsedWallTimeUS: {}", - LooseChunksStats.ChunkCount, - NiceBytes(LooseChunksStats.ChunkByteCount), - LooseChunksStats.CompressedChunkCount.load(), - NiceBytes(LooseChunksStats.CompressedChunkBytes.load()), - NiceLatencyNs(LooseChunksStats.CompressChunksElapsedWallTimeUS * 1000)); - - ZEN_CONSOLE_VERBOSE( - "Disk stats:" - "\n OpenReadCount: {}" - "\n OpenWriteCount: {}" - "\n ReadCount: {}" - "\n ReadByteCount: {}" - "\n WriteCount: {} ({} cloned)" - "\n WriteByteCount: {} ({} cloned)" - "\n CurrentOpenFileCount: {}", - DiskStats.OpenReadCount.load(), - DiskStats.OpenWriteCount.load(), - DiskStats.ReadCount.load(), - NiceBytes(DiskStats.ReadByteCount.load()), - DiskStats.WriteCount.load(), - DiskStats.CloneCount.load(), - NiceBytes(DiskStats.WriteByteCount.load()), - NiceBytes(DiskStats.CloneByteCount.load()), - DiskStats.CurrentOpenFileCount.load()); - - ZEN_CONSOLE_VERBOSE( - "Upload stats:" - "\n BlockCount: {}" - "\n BlocksBytes: {}" - "\n ChunkCount: {}" - "\n ChunksBytes: {}" - "\n ReadFromDiskBytes: {}" - "\n MultipartAttachmentCount: {}" - "\n ElapsedWallTimeUS: {}", - UploadStats.BlockCount.load(), - NiceBytes(UploadStats.BlocksBytes.load()), - UploadStats.ChunkCount.load(), - NiceBytes(UploadStats.ChunksBytes.load()), - NiceBytes(UploadStats.ReadFromDiskBytes.load()), - UploadStats.MultipartAttachmentCount.load(), - NiceLatencyNs(UploadStats.ElapsedWallTimeUS * 1000)); - - if (PostUploadVerify) - { - ZEN_CONSOLE_VERBOSE( - "Validate stats:" - "\n BuildBlobSize: {}" - "\n BuildPartSize: {}" - "\n ChunkAttachmentCount: {}" - "\n BlockAttachmentCount: {}" - "\n VerifiedAttachmentCount: {}" - "\n VerifiedByteCount: {}" - "\n ElapsedWallTimeUS: {}", - NiceBytes(ValidateStats.BuildBlobSize), - NiceBytes(ValidateStats.BuildPartSize), - ValidateStats.ChunkAttachmentCount, - ValidateStats.BlockAttachmentCount, - ValidateStats.VerifiedAttachmentCount.load(), - NiceBytes(ValidateStats.VerifiedByteCount.load()), - NiceLatencyNs(ValidateStats.ElapsedWallTimeUS * 1000)); - - ZEN_CONSOLE_VERBOSE( - "Validate download stats:" - "\n RequestsCompleteCount: {}" - "\n DownloadedChunkCount: {}" - "\n DownloadedChunkByteCount: {}" - "\n MultipartAttachmentCount: {}" - "\n DownloadedBlockCount: {}" - "\n DownloadedBlockByteCount: {}" - "\n DownloadedPartialBlockCount: {}" - "\n DownloadedPartialBlockByteCount: {}", - ValidateDownloadStats.RequestsCompleteCount.load(), - ValidateDownloadStats.DownloadedChunkCount.load(), - NiceBytes(ValidateDownloadStats.DownloadedChunkByteCount.load()), - ValidateDownloadStats.MultipartAttachmentCount.load(), - ValidateDownloadStats.DownloadedBlockCount.load(), - NiceBytes(ValidateDownloadStats.DownloadedBlockByteCount.load()), - ValidateDownloadStats.DownloadedPartialBlockCount.load(), - NiceBytes(ValidateDownloadStats.DownloadedPartialBlockByteCount.load())); - } - - const double DeltaByteCountPercent = - ChunkingStats.BytesHashed > 0 - ? (100.0 * (FindBlocksStats.NewBlocksChunkByteCount + LooseChunksStats.CompressedChunkBytes)) / (ChunkingStats.BytesHashed) - : 0.0; - - const std::string MultipartAttachmentStats = - (LargeAttachmentSize != (uint64_t)-1) ? fmt::format(" ({} as multipart)", UploadStats.MultipartAttachmentCount.load()) : ""; - - std::string ValidateInfo; - if (PostUploadVerify) - { - const uint64_t DownloadedCount = ValidateDownloadStats.DownloadedChunkCount + ValidateDownloadStats.DownloadedBlockCount; - const uint64_t DownloadedByteCount = - ValidateDownloadStats.DownloadedChunkByteCount + ValidateDownloadStats.DownloadedBlockByteCount; - ValidateInfo = fmt::format("\n Verified: {:>8} ({}), {}B/sec, {}", - DownloadedCount, - NiceBytes(DownloadedByteCount), - NiceNum(GetBytesPerSecond(ValidateStats.ElapsedWallTimeUS, DownloadedByteCount)), - NiceTimeSpanMs(ValidateStats.ElapsedWallTimeUS / 1000)); } - - if (!IsQuiet) - { - ZEN_CONSOLE( - "Uploaded part {} ('{}') to build {}, {}\n" - " Scanned files: {:>8} ({}), {}B/sec, {}\n" - " New data: {:>8} ({}) {:.1f}%\n" - " New blocks: {:>8} ({} -> {}), {}B/sec, {}\n" - " New chunks: {:>8} ({} -> {}), {}B/sec, {}\n" - " Uploaded: {:>8} ({}), {}bits/sec, {}\n" - " Blocks: {:>8} ({})\n" - " Chunks: {:>8} ({}){}" - "{}", - BuildPartId, - BuildPartName, - BuildId, - NiceTimeSpanMs(ProcessTimer.GetElapsedTimeMs()), - - LocalFolderScanStats.FoundFileCount.load(), - NiceBytes(LocalFolderScanStats.FoundFileByteCount.load()), - NiceNum(GetBytesPerSecond(ChunkingStats.ElapsedWallTimeUS, ChunkingStats.BytesHashed)), - NiceTimeSpanMs(ChunkingStats.ElapsedWallTimeUS / 1000), - - FindBlocksStats.NewBlocksChunkCount + LooseChunksStats.CompressedChunkCount, - NiceBytes(FindBlocksStats.NewBlocksChunkByteCount + LooseChunksStats.CompressedChunkBytes), - DeltaByteCountPercent, - - GenerateBlocksStats.GeneratedBlockCount.load(), - NiceBytes(FindBlocksStats.NewBlocksChunkByteCount), - NiceBytes(GenerateBlocksStats.GeneratedBlockByteCount.load()), - NiceNum( - GetBytesPerSecond(GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS, GenerateBlocksStats.GeneratedBlockByteCount)), - NiceTimeSpanMs(GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS / 1000), - - LooseChunksStats.CompressedChunkCount.load(), - NiceBytes(LooseChunksStats.ChunkByteCount), - NiceBytes(LooseChunksStats.CompressedChunkBytes.load()), - NiceNum(GetBytesPerSecond(LooseChunksStats.CompressChunksElapsedWallTimeUS, LooseChunksStats.ChunkByteCount)), - NiceTimeSpanMs(LooseChunksStats.CompressChunksElapsedWallTimeUS / 1000), - - UploadStats.BlockCount.load() + UploadStats.ChunkCount.load(), - NiceBytes(UploadStats.BlocksBytes + UploadStats.ChunksBytes), - NiceNum(GetBytesPerSecond(UploadStats.ElapsedWallTimeUS, (UploadStats.ChunksBytes + UploadStats.BlocksBytes) * 8)), - NiceTimeSpanMs(UploadStats.ElapsedWallTimeUS / 1000), - - UploadStats.BlockCount.load(), - NiceBytes(UploadStats.BlocksBytes.load()), - - UploadStats.ChunkCount.load(), - NiceBytes(UploadStats.ChunksBytes.load()), - MultipartAttachmentStats, - - ValidateInfo); - } - - Storage.BuildStorage->PutBuildPartStats( - BuildId, - BuildPartId, - {{"totalSize", double(LocalFolderScanStats.FoundFileByteCount.load())}, - {"reusedRatio", AcceptedByteCountPercent / 100.0}, - {"reusedBlockCount", double(FindBlocksStats.AcceptedBlockCount)}, - {"reusedBlockByteCount", double(FindBlocksStats.AcceptedRawByteCount)}, - {"newBlockCount", double(FindBlocksStats.NewBlocksCount)}, - {"newBlockByteCount", double(FindBlocksStats.NewBlocksChunkByteCount)}, - {"uploadedCount", double(UploadStats.BlockCount.load() + UploadStats.ChunkCount.load())}, - {"uploadedByteCount", double(UploadStats.BlocksBytes.load() + UploadStats.ChunksBytes.load())}, - {"uploadedBytesPerSec", - double(GetBytesPerSecond(UploadStats.ElapsedWallTimeUS, UploadStats.ChunksBytes + UploadStats.BlocksBytes))}, - {"elapsedTimeSec", double(ProcessTimer.GetElapsedTimeMs() / 1000.0)}}); - - ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::Cleanup, TaskSteps::StepCount); } + struct VerifyFolderStatistics + { + std::atomic<uint64_t> FilesVerified = 0; + std::atomic<uint64_t> FilesFailed = 0; + std::atomic<uint64_t> ReadBytes = 0; + uint64_t VerifyElapsedWallTimeUs = 0; + }; + void VerifyFolder(const ChunkedFolderContent& Content, const std::filesystem::path& Path, bool VerifyFileHash, @@ -5634,54 +3235,45 @@ namespace { } FolderContent LocalFolderState; - DiskStatistics DiskStats; - CacheMappingStatistics CacheMappingStats; - DownloadStatistics DownloadStats; - WriteChunkStatistics WriteChunkStats; - RebuildFolderStateStatistics RebuildFolderStateStats; - VerifyFolderStatistics VerifyFolderStats; - const ChunkedContentLookup LocalLookup = BuildChunkedContentLookup(LocalContent); const ChunkedContentLookup RemoteLookup = BuildChunkedContentLookup(RemoteContent); ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::Download, TaskSteps::StepCount); - { - ConsoleOpLogOutput Output(ProgressMode); - BuildsOperationUpdateFolder Updater( - Output, - Storage, - AbortFlag, - PauseFlag, - GetIOWorkerPool(), - GetNetworkPool(), - BuildId, - Path, - LocalContent, - LocalLookup, - RemoteContent, - RemoteLookup, - BlockDescriptions, - LooseChunkHashes, - BuildsOperationUpdateFolder::Options{.IsQuiet = IsQuiet, - .IsVerbose = IsVerbose, - .AllowFileClone = AllowFileClone, - .UseSparseFiles = UseSparseFiles, - .SystemRootDir = Options.SystemRootDir, - .ZenFolderPath = Options.ZenFolderPath, - .LargeAttachmentSize = LargeAttachmentSize, - .PreferredMultipartChunkSize = PreferredMultipartChunkSize, - .PartialBlockRequestMode = Options.PartialBlockRequestMode, - .WipeTargetFolder = Options.CleanTargetFolder, - .PrimeCacheOnly = Options.PrimeCacheOnly, - .EnableOtherDownloadsScavenging = Options.EnableOtherDownloadsScavenging, - .EnableTargetFolderScavenging = Options.EnableTargetFolderScavenging, - .DefaultExcludeFolders = DefaultExcludeFolders, - .DefaultExcludeExtensions = DefaultExcludeExtensions}, - DiskStats); - Updater.Execute(LocalFolderState); - } - + ConsoleOpLogOutput Output(ProgressMode); + BuildsOperationUpdateFolder Updater( + Output, + Storage, + AbortFlag, + PauseFlag, + GetIOWorkerPool(), + GetNetworkPool(), + BuildId, + Path, + LocalContent, + LocalLookup, + RemoteContent, + RemoteLookup, + BlockDescriptions, + LooseChunkHashes, + BuildsOperationUpdateFolder::Options{.IsQuiet = IsQuiet, + .IsVerbose = IsVerbose, + .AllowFileClone = AllowFileClone, + .UseSparseFiles = UseSparseFiles, + .SystemRootDir = Options.SystemRootDir, + .ZenFolderPath = Options.ZenFolderPath, + .LargeAttachmentSize = LargeAttachmentSize, + .PreferredMultipartChunkSize = PreferredMultipartChunkSize, + .PartialBlockRequestMode = Options.PartialBlockRequestMode, + .WipeTargetFolder = Options.CleanTargetFolder, + .PrimeCacheOnly = Options.PrimeCacheOnly, + .EnableOtherDownloadsScavenging = Options.EnableOtherDownloadsScavenging, + .EnableTargetFolderScavenging = Options.EnableTargetFolderScavenging, + .ExcludeFolders = DefaultExcludeFolders, + .ExcludeExtensions = DefaultExcludeExtensions}); + Updater.Execute(LocalFolderState); + + VerifyFolderStatistics VerifyFolderStats; if (!AbortFlag) { if (!Options.PrimeCacheOnly) @@ -5708,11 +3300,12 @@ namespace { WriteFile(ZenStateFileJsonPath(Options.ZenFolderPath), IoBuffer(IoBuffer::Wrap, SB.Data(), SB.Size())); #endif // 0 } - const uint64_t DownloadCount = DownloadStats.DownloadedChunkCount.load() + DownloadStats.DownloadedBlockCount.load() + - DownloadStats.DownloadedPartialBlockCount.load(); - const uint64_t DownloadByteCount = DownloadStats.DownloadedChunkByteCount.load() + - DownloadStats.DownloadedBlockByteCount.load() + - DownloadStats.DownloadedPartialBlockByteCount.load(); + const uint64_t DownloadCount = Updater.m_DownloadStats.DownloadedChunkCount.load() + + Updater.m_DownloadStats.DownloadedBlockCount.load() + + Updater.m_DownloadStats.DownloadedPartialBlockCount.load(); + const uint64_t DownloadByteCount = Updater.m_DownloadStats.DownloadedChunkByteCount.load() + + Updater.m_DownloadStats.DownloadedBlockByteCount.load() + + Updater.m_DownloadStats.DownloadedPartialBlockByteCount.load(); const uint64_t DownloadTimeMs = DownloadTimer.GetElapsedTimeMs(); if (!IsQuiet) @@ -5730,15 +3323,15 @@ namespace { DownloadCount, NiceBytes(DownloadByteCount), - NiceNum(GetBytesPerSecond(WriteChunkStats.DownloadTimeUs, DownloadByteCount * 8)), + NiceNum(GetBytesPerSecond(Updater.m_WriteChunkStats.DownloadTimeUs, DownloadByteCount * 8)), - DiskStats.WriteCount.load(), - NiceBytes(DiskStats.WriteByteCount.load()), - NiceNum(GetBytesPerSecond(WriteChunkStats.WriteTimeUs, DiskStats.WriteByteCount.load())), + Updater.m_DiskStats.WriteCount.load(), + NiceBytes(Updater.m_DiskStats.WriteByteCount.load()), + NiceNum(GetBytesPerSecond(Updater.m_WriteChunkStats.WriteTimeUs, Updater.m_DiskStats.WriteByteCount.load())), - NiceTimeSpanMs(RebuildFolderStateStats.CleanFolderElapsedWallTimeUs / 1000), + NiceTimeSpanMs(Updater.m_RebuildFolderStateStats.CleanFolderElapsedWallTimeUs / 1000), - NiceTimeSpanMs(RebuildFolderStateStats.FinalizeTreeElapsedWallTimeUs / 1000), + NiceTimeSpanMs(Updater.m_RebuildFolderStateStats.FinalizeTreeElapsedWallTimeUs / 1000), NiceTimeSpanMs(VerifyFolderStats.VerifyElapsedWallTimeUs / 1000)); } @@ -7377,8 +4970,30 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) m_AllowMultiparts, MetaData, m_CreateBuild, - m_Clean, - m_PostUploadVerify); + m_Clean); + + if (!AbortFlag) + { + if (m_PostUploadVerify) + { + ValidateStatistics ValidateStats; + DownloadStatistics ValidateDownloadStats; + ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName, ValidateStats, ValidateDownloadStats); + std::string ValidateInfo; + if (m_PostUploadVerify) + { + const uint64_t DownloadedCount = + ValidateDownloadStats.DownloadedChunkCount + ValidateDownloadStats.DownloadedBlockCount; + const uint64_t DownloadedByteCount = + ValidateDownloadStats.DownloadedChunkByteCount + ValidateDownloadStats.DownloadedBlockByteCount; + ValidateInfo = fmt::format("Verified: {:>8} ({}), {}B/sec, {}", + DownloadedCount, + NiceBytes(DownloadedByteCount), + NiceNum(GetBytesPerSecond(ValidateStats.ElapsedWallTimeUS, DownloadedByteCount)), + NiceTimeSpanMs(ValidateStats.ElapsedWallTimeUS / 1000)); + } + } + } if (true) { @@ -7864,13 +5479,18 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) m_AllowMultiparts, MetaData, true, - false, - true); + false); if (AbortFlag) { throw std::runtime_error("Test aborted. (Upload build)"); } + { + ValidateStatistics ValidateStats; + DownloadStatistics ValidateDownloadStats; + ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName, ValidateStats, ValidateDownloadStats); + } + ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}'", BuildId, BuildPartId, m_BuildPartName, DownloadPath); DownloadFolder(Storage, BuildId, @@ -8050,13 +5670,18 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) m_AllowMultiparts, MetaData2, true, - false, - true); + false); if (AbortFlag) { throw std::runtime_error("Test aborted. (Upload scrambled)"); } + { + ValidateStatistics ValidateStats; + DownloadStatistics ValidateDownloadStats; + ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName, ValidateStats, ValidateDownloadStats); + } + ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (original)", BuildId, BuildPartId, m_BuildPartName, DownloadPath); DownloadFolder(Storage, BuildId, diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp index 8c5040397..c7afe702c 100644 --- a/src/zencore/filesystem.cpp +++ b/src/zencore/filesystem.cpp @@ -3063,6 +3063,33 @@ SetFileReadOnly(const std::filesystem::path& Filename, bool ReadOnly) return Result; } +void +MakeSafeAbsolutePathÍnPlace(std::filesystem::path& Path) +{ + if (!Path.empty()) + { + std::filesystem::path AbsolutePath = std::filesystem::absolute(Path).make_preferred(); +#if ZEN_PLATFORM_WINDOWS + const std::string_view Prefix = "\\\\?\\"; + const std::u8string PrefixU8(Prefix.begin(), Prefix.end()); + std::u8string PathString = AbsolutePath.u8string(); + if (!PathString.empty() && !PathString.starts_with(PrefixU8)) + { + PathString.insert(0, PrefixU8); + Path = PathString; + } +#endif // ZEN_PLATFORM_WINDOWS + } +} + +std::filesystem::path +MakeSafeAbsolutePath(const std::filesystem::path& Path) +{ + std::filesystem::path Tmp(Path); + MakeSafeAbsolutePathÍnPlace(Tmp); + return Tmp; +} + class SharedMemoryImpl : public SharedMemory { public: diff --git a/src/zencore/include/zencore/filesystem.h b/src/zencore/include/zencore/filesystem.h index 3bfc3b540..b4906aebf 100644 --- a/src/zencore/include/zencore/filesystem.h +++ b/src/zencore/include/zencore/filesystem.h @@ -408,6 +408,9 @@ uint32_t MakeFileModeReadOnly(uint32_t FileMode, bool ReadOnly); bool SetFileReadOnly(const std::filesystem::path& Filename, bool ReadOnly, std::error_code& Ec); bool SetFileReadOnly(const std::filesystem::path& Filename, bool ReadOnly); +void MakeSafeAbsolutePathÍnPlace(std::filesystem::path& Path); +[[nodiscard]] std::filesystem::path MakeSafeAbsolutePath(const std::filesystem::path& Path); + class SharedMemory { public: diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp index f3a585737..a4f4237cd 100644 --- a/src/zenremotestore/builds/buildstorageoperations.cpp +++ b/src/zenremotestore/builds/buildstorageoperations.cpp @@ -5,6 +5,7 @@ #include <zenremotestore/builds/buildstorage.h> #include <zenremotestore/builds/buildstoragecache.h> #include <zenremotestore/chunking/chunkblock.h> +#include <zenremotestore/chunking/chunkingcontroller.h> #include <zencore/basicfile.h> #include <zencore/compactbinary.h> @@ -25,6 +26,8 @@ ZEN_THIRD_PARTY_INCLUDES_START #include <tsl/robin_set.h> ZEN_THIRD_PARTY_INCLUDES_END +#define EXTRA_VERIFY 0 + namespace zen { using namespace std::literals; @@ -496,8 +499,118 @@ namespace { } } + const std::vector<uint32_t> NonCompressableExtensions({HashStringDjb2(".mp4"sv), + HashStringDjb2(".zip"sv), + HashStringDjb2(".7z"sv), + HashStringDjb2(".bzip"sv), + HashStringDjb2(".rar"sv), + HashStringDjb2(".gzip"sv), + HashStringDjb2(".apk"sv), + HashStringDjb2(".nsp"sv), + HashStringDjb2(".xvc"sv), + HashStringDjb2(".pkg"sv), + HashStringDjb2(".dmg"sv), + HashStringDjb2(".ipa"sv)}); + + const tsl::robin_set<uint32_t> NonCompressableExtensionSet(NonCompressableExtensions.begin(), NonCompressableExtensions.end()); + + bool IsExtensionHashCompressable(const uint32_t PathHash) { return !NonCompressableExtensionSet.contains(PathHash); } + + bool IsChunkCompressable(const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, uint32_t ChunkIndex) + { + ZEN_UNUSED(Content); + const uint32_t ChunkLocationCount = Lookup.ChunkSequenceLocationCounts[ChunkIndex]; + if (ChunkLocationCount == 0) + { + return false; + } + const size_t ChunkLocationOffset = Lookup.ChunkSequenceLocationOffset[ChunkIndex]; + const uint32_t SequenceIndex = Lookup.ChunkSequenceLocations[ChunkLocationOffset].SequenceIndex; + const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex]; + const uint32_t ExtensionHash = Lookup.PathExtensionHash[PathIndex]; + + const bool IsCompressable = IsExtensionHashCompressable(ExtensionHash); + return IsCompressable; + } + + template<typename T> + std::string FormatArray(std::span<const T> Items, std::string_view Prefix) + { + ExtendableStringBuilder<512> SB; + for (const T& Item : Items) + { + SB.Append(fmt::format("{}{}", Prefix, Item)); + } + return SB.ToString(); + } + } // namespace +class ReadFileCache +{ +public: + // A buffered file reader that provides CompositeBuffer where the buffers are owned and the memory never overwritten + ReadFileCache(DiskStatistics& DiskStats, + const std::filesystem::path& Path, + const ChunkedFolderContent& LocalContent, + const ChunkedContentLookup& LocalLookup, + size_t MaxOpenFileCount) + : m_Path(Path) + , m_LocalContent(LocalContent) + , m_LocalLookup(LocalLookup) + , m_DiskStats(DiskStats) + { + m_OpenFiles.reserve(MaxOpenFileCount); + } + ~ReadFileCache() { m_OpenFiles.clear(); } + + CompositeBuffer GetRange(uint32_t SequenceIndex, uint64_t Offset, uint64_t Size) + { + ZEN_TRACE_CPU("ReadFileCache::GetRange"); + + auto CacheIt = + std::find_if(m_OpenFiles.begin(), m_OpenFiles.end(), [SequenceIndex](const auto& Lhs) { return Lhs.first == SequenceIndex; }); + if (CacheIt != m_OpenFiles.end()) + { + if (CacheIt != m_OpenFiles.begin()) + { + auto CachedFile(std::move(CacheIt->second)); + m_OpenFiles.erase(CacheIt); + m_OpenFiles.insert(m_OpenFiles.begin(), std::make_pair(SequenceIndex, std::move(CachedFile))); + } + CompositeBuffer Result = m_OpenFiles.front().second->GetRange(Offset, Size); + return Result; + } + const uint32_t LocalPathIndex = m_LocalLookup.SequenceIndexFirstPathIndex[SequenceIndex]; + const std::filesystem::path LocalFilePath = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred(); + if (Size == m_LocalContent.RawSizes[LocalPathIndex]) + { + IoBuffer Result = IoBufferBuilder::MakeFromFile(LocalFilePath); + return CompositeBuffer(SharedBuffer(Result)); + } + if (m_OpenFiles.size() == m_OpenFiles.capacity()) + { + m_OpenFiles.pop_back(); + } + m_OpenFiles.insert(m_OpenFiles.begin(), + std::make_pair(SequenceIndex, + std::make_unique<BufferedOpenFile>(LocalFilePath, + m_DiskStats.OpenReadCount, + m_DiskStats.CurrentOpenFileCount, + m_DiskStats.ReadCount, + m_DiskStats.ReadByteCount))); + CompositeBuffer Result = m_OpenFiles.front().second->GetRange(Offset, Size); + return Result; + } + +private: + const std::filesystem::path m_Path; + const ChunkedFolderContent& m_LocalContent; + const ChunkedContentLookup& m_LocalLookup; + std::vector<std::pair<uint32_t, std::unique_ptr<BufferedOpenFile>>> m_OpenFiles; + DiskStatistics& m_DiskStats; +}; + bool ReadStateObject(BuildOpLogOutput& LogOutput, CbObjectView StateView, @@ -1118,6 +1231,8 @@ ZenTempFolderPath(const std::filesystem::path& ZenFolderPath) return ZenFolderPath / "tmp"; } +////////////////////// BuildsOperationUpdateFolder + BuildsOperationUpdateFolder::BuildsOperationUpdateFolder(BuildOpLogOutput& LogOutput, StorageInstance& Storage, std::atomic<bool>& AbortFlag, @@ -1132,8 +1247,7 @@ BuildsOperationUpdateFolder::BuildsOperationUpdateFolder(BuildOpLogOutput& const ChunkedContentLookup& RemoteLookup, const std::vector<ChunkBlockDescription>& BlockDescriptions, const std::vector<IoHash>& LooseChunkHashes, - const Options& Options, - DiskStatistics& DiskStats) + const Options& Options) : m_LogOutput(LogOutput) , m_Storage(Storage) , m_AbortFlag(AbortFlag) @@ -1149,7 +1263,6 @@ BuildsOperationUpdateFolder::BuildsOperationUpdateFolder(BuildOpLogOutput& , m_BlockDescriptions(BlockDescriptions) , m_LooseChunkHashes(LooseChunkHashes) , m_Options(Options) -, m_DiskStats(DiskStats) , m_CacheFolderPath(ZenTempCacheFolderPath(m_Options.ZenFolderPath)) { } @@ -1159,9 +1272,22 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) { ZEN_TRACE_CPU("BuildsOperationUpdateFolder::Execute"); + enum TaskSteps : uint32_t + { + ScanExistingData, + WriteChunks, + PrepareTarget, + FinalizeTarget, + StepCount + }; + + auto EndProgress = MakeGuard([&]() { m_LogOutput.SetLogOperationProgress(TaskSteps::StepCount, TaskSteps::StepCount); }); + ZEN_ASSERT((!m_Options.PrimeCacheOnly) || (m_Options.PrimeCacheOnly && (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Off))); + m_LogOutput.SetLogOperationProgress(TaskSteps::ScanExistingData, TaskSteps::StepCount); + Stopwatch IndexTimer; if (!m_Options.IsQuiet) @@ -1604,6 +1730,8 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) { ZEN_TRACE_CPU("WriteChunks"); + m_LogOutput.SetLogOperationProgress(TaskSteps::WriteChunks, TaskSteps::StepCount); + Stopwatch WriteTimer; FilteredRate FilteredDownloadedBytesPerSecond; @@ -2853,6 +2981,8 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) return; } + m_LogOutput.SetLogOperationProgress(TaskSteps::PrepareTarget, TaskSteps::StepCount); + tsl::robin_map<uint32_t, uint32_t> RemotePathIndexToLocalPathIndex; RemotePathIndexToLocalPathIndex.reserve(m_RemoteContent.Paths.size()); @@ -3031,13 +3161,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) Stopwatch Timer; // Clean target folder - if (!CleanDirectory(m_IOWorkerPool, - m_AbortFlag, - m_PauseFlag, - m_LogOutput, - m_Options.IsQuiet, - m_Path, - m_Options.DefaultExcludeFolders)) + if (!CleanDirectory(m_IOWorkerPool, m_AbortFlag, m_PauseFlag, m_LogOutput, m_Options.IsQuiet, m_Path, m_Options.ExcludeFolders)) { LOG_OUTPUT_WARN(m_LogOutput, "Some files in {} could not be removed", m_Path); } @@ -3051,6 +3175,9 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) { ZEN_TRACE_CPU("FinalizeTree"); + + m_LogOutput.SetLogOperationProgress(TaskSteps::FinalizeTarget, TaskSteps::StepCount); + Stopwatch Timer; std::unique_ptr<BuildOpLogOutput::ProgressBar> RebuildProgressBarPtr(m_LogOutput.CreateProgressBar("Rebuild State")); @@ -3688,9 +3815,9 @@ BuildsOperationUpdateFolder::FindDownloadedChunk(const IoHash& ChunkHash) } FolderContent -BuildsOperationUpdateFolder::GetValidFolderContent(GetFolderContentStatistics& LocalFolderScanStats, - const std::filesystem::path& Path, - std::span<const std::filesystem::path> PathsToCheck, +BuildsOperationUpdateFolder::GetValidFolderContent(GetFolderContentStatistics& FolderScanStats, + const std::filesystem::path& Path, + std::span<const std::filesystem::path> PathsToCheck, std::function<void(uint64_t PathCount, uint64_t CompletedPathCount)>&& ProgressCallback) { ZEN_TRACE_CPU("GetValidFolderContent"); @@ -3705,7 +3832,7 @@ BuildsOperationUpdateFolder::GetValidFolderContent(GetFolderContentStatistics& { Stopwatch Timer; - auto _ = MakeGuard([&LocalFolderScanStats, &Timer]() { LocalFolderScanStats.ElapsedWallTimeUS = Timer.GetElapsedTimeUs(); }); + auto _ = MakeGuard([&FolderScanStats, &Timer]() { FolderScanStats.ElapsedWallTimeUS = Timer.GetElapsedTimeUs(); }); ParallelWork Work(m_AbortFlag, m_PauseFlag, @@ -3717,7 +3844,7 @@ BuildsOperationUpdateFolder::GetValidFolderContent(GetFolderContentStatistics& { uint32_t PathRangeCount = Min(128u, PathCount - PathIndex); Work.ScheduleWork(m_IOWorkerPool, - [PathIndex, PathRangeCount, &PathsToCheck, &Path, &Result, &CompletedPathCount, &LocalFolderScanStats]( + [PathIndex, PathRangeCount, &PathsToCheck, &Path, &Result, &CompletedPathCount, &FolderScanStats]( std::atomic<bool>& AbortFlag) { if (!AbortFlag) { @@ -3734,10 +3861,10 @@ BuildsOperationUpdateFolder::GetValidFolderContent(GetFolderContentStatistics& Result.Attributes[PathRangeIndex])) { Result.Paths[PathRangeIndex] = std::move(FilePath); - LocalFolderScanStats.FoundFileCount++; - LocalFolderScanStats.FoundFileByteCount += Result.RawSizes[PathRangeIndex]; - LocalFolderScanStats.AcceptedFileCount++; - LocalFolderScanStats.AcceptedFileByteCount += Result.RawSizes[PathRangeIndex]; + FolderScanStats.FoundFileCount++; + FolderScanStats.FoundFileByteCount += Result.RawSizes[PathRangeIndex]; + FolderScanStats.AcceptedFileCount++; + FolderScanStats.AcceptedFileByteCount += Result.RawSizes[PathRangeIndex]; } CompletedPathCount++; } @@ -4669,6 +4796,884 @@ BuildsOperationUpdateFolder::VerifySequence(uint32_t RemoteSequenceIndex) } } +////////////////////// BuildsOperationUploadFolder + +BuildsOperationUploadFolder::BuildsOperationUploadFolder(BuildOpLogOutput& LogOutput, + StorageInstance& Storage, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag, + WorkerThreadPool& IOWorkerPool, + WorkerThreadPool& NetworkPool, + const Oid& BuildId, + const Oid& BuildPartId, + const std::string_view BuildPartName, + const std::filesystem::path& Path, + const std::filesystem::path& ManifestPath, + bool CreateBuild, + const CbObject& MetaData, + const Options& Options) +: m_LogOutput(LogOutput) +, m_Storage(Storage) +, m_AbortFlag(AbortFlag) +, m_PauseFlag(PauseFlag) +, m_IOWorkerPool(IOWorkerPool) +, m_NetworkPool(NetworkPool) +, m_BuildId(BuildId) +, m_BuildPartId(BuildPartId) +, m_BuildPartName(BuildPartName) +, m_Path(Path) +, m_ManifestPath(ManifestPath) +, m_CreateBuild(CreateBuild) +, m_MetaData(MetaData) +, m_Options(Options) +{ +} + +void +BuildsOperationUploadFolder::Execute() +{ + enum TaskSteps : uint32_t + { + PrepareBuild, + CalculateDelta, + Upload, + // Validate, + Cleanup, + StepCount + }; + + auto EndProgress = MakeGuard([&]() { m_LogOutput.SetLogOperationProgress(TaskSteps::StepCount, TaskSteps::StepCount); }); + + Stopwatch ProcessTimer; + + CreateDirectories(m_Options.TempDir); + CleanDirectory(m_Options.TempDir, {}); + auto _ = MakeGuard([&]() { + if (CleanDirectory(m_Options.TempDir, {})) + { + std::error_code DummyEc; + RemoveDir(m_Options.TempDir, DummyEc); + } + }); + + m_LogOutput.SetLogOperationProgress(TaskSteps::PrepareBuild, TaskSteps::StepCount); + + std::uint64_t TotalRawSize = 0; + + CbObject ChunkerParameters; + + struct PrepareBuildResult + { + std::vector<ChunkBlockDescription> KnownBlocks; + uint64_t PreferredMultipartChunkSize = 0; + uint64_t PayloadSize = 0; + uint64_t PrepareBuildTimeMs = 0; + uint64_t FindBlocksTimeMs = 0; + uint64_t ElapsedTimeMs = 0; + }; + + std::future<PrepareBuildResult> PrepBuildResultFuture = m_NetworkPool.EnqueueTask( + std::packaged_task<PrepareBuildResult()>{[this] { + ZEN_TRACE_CPU("PrepareBuild"); + + PrepareBuildResult Result; + Result.PreferredMultipartChunkSize = m_Options.PreferredMultipartChunkSize; + Stopwatch Timer; + if (m_CreateBuild) + { + ZEN_TRACE_CPU("CreateBuild"); + + Stopwatch PutBuildTimer; + CbObject PutBuildResult = m_Storage.BuildStorage->PutBuild(m_BuildId, m_MetaData); + Result.PrepareBuildTimeMs = PutBuildTimer.GetElapsedTimeMs(); + Result.PreferredMultipartChunkSize = PutBuildResult["chunkSize"sv].AsUInt64(Result.PreferredMultipartChunkSize); + Result.PayloadSize = m_MetaData.GetSize(); + } + else + { + ZEN_TRACE_CPU("PutBuild"); + Stopwatch GetBuildTimer; + CbObject Build = m_Storage.BuildStorage->GetBuild(m_BuildId); + Result.PrepareBuildTimeMs = GetBuildTimer.GetElapsedTimeMs(); + Result.PayloadSize = Build.GetSize(); + if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0) + { + Result.PreferredMultipartChunkSize = ChunkSize; + } + else if (m_Options.AllowMultiparts) + { + LOG_OUTPUT_WARN(m_LogOutput, + "PreferredMultipartChunkSize is unknown. Defaulting to '{}'", + NiceBytes(Result.PreferredMultipartChunkSize)); + } + } + + if (!m_Options.IgnoreExistingBlocks) + { + ZEN_TRACE_CPU("FindBlocks"); + Stopwatch KnownBlocksTimer; + CbObject BlockDescriptionList = m_Storage.BuildStorage->FindBlocks(m_BuildId, m_Options.FindBlockMaxCount); + if (BlockDescriptionList) + { + Result.KnownBlocks = ParseChunkBlockDescriptionList(BlockDescriptionList); + } + m_FindBlocksStats.FindBlockTimeMS = KnownBlocksTimer.GetElapsedTimeMs(); + m_FindBlocksStats.FoundBlockCount = Result.KnownBlocks.size(); + Result.FindBlocksTimeMs = KnownBlocksTimer.GetElapsedTimeMs(); + } + Result.ElapsedTimeMs = Timer.GetElapsedTimeMs(); + return Result; + }}, + WorkerThreadPool::EMode::EnableBacklog); + + ChunkedFolderContent LocalContent; + + { + auto IsAcceptedFolder = [this](const std::string_view& RelativePath) -> bool { + for (const std::string& ExcludeFolder : m_Options.ExcludeFolders) + { + if (RelativePath.starts_with(ExcludeFolder)) + { + if (RelativePath.length() == ExcludeFolder.length()) + { + return false; + } + else if (RelativePath[ExcludeFolder.length()] == '/') + { + return false; + } + } + } + return true; + }; + + auto IsAcceptedFile = [this](const std::string_view& RelativePath, uint64_t, uint32_t) -> bool { + for (const std::string& ExcludeExtension : m_Options.ExcludeExtensions) + { + if (RelativePath.ends_with(ExcludeExtension)) + { + return false; + } + } + return true; + }; + + auto ParseManifest = [](const std::filesystem::path& Path, + const std::filesystem::path& ManifestPath) -> std::vector<std::filesystem::path> { + std::vector<std::filesystem::path> AssetPaths; + std::filesystem::path AbsoluteManifestPath = + MakeSafeAbsolutePath(ManifestPath.is_absolute() ? ManifestPath : Path / ManifestPath); + IoBuffer ManifestContent = ReadFile(AbsoluteManifestPath).Flatten(); + std::string_view ManifestString((const char*)ManifestContent.GetView().GetData(), ManifestContent.GetSize()); + std::string_view::size_type Offset = 0; + while (Offset < ManifestContent.GetSize()) + { + size_t PathBreakOffset = ManifestString.find_first_of("\t\r\n", Offset); + if (PathBreakOffset == std::string_view::npos) + { + PathBreakOffset = ManifestContent.GetSize(); + } + std::string_view AssetPath = ManifestString.substr(Offset, PathBreakOffset - Offset); + if (!AssetPath.empty()) + { + AssetPaths.emplace_back(std::filesystem::path(AssetPath)); + } + Offset = PathBreakOffset; + size_t EolOffset = ManifestString.find_first_of("\r\n", Offset); + if (EolOffset == std::string_view::npos) + { + break; + } + Offset = EolOffset; + size_t LineBreakOffset = ManifestString.find_first_not_of("\t\r\n", Offset); + if (LineBreakOffset == std::string_view::npos) + { + break; + } + Offset = LineBreakOffset; + } + return AssetPaths; + }; + + Stopwatch ScanTimer; + FolderContent Content; + if (m_ManifestPath.empty()) + { + std::filesystem::path ExcludeManifestPath = m_Path / m_Options.ZenExcludeManifestName; + tsl::robin_set<std::string> ExcludeAssetPaths; + if (IsFile(ExcludeManifestPath)) + { + std::vector<std::filesystem::path> AssetPaths = ParseManifest(m_Path, ExcludeManifestPath); + ExcludeAssetPaths.reserve(AssetPaths.size()); + for (const std::filesystem::path& AssetPath : AssetPaths) + { + ExcludeAssetPaths.insert(AssetPath.generic_string()); + } + } + Content = GetFolderContent( + m_LocalFolderScanStats, + m_Path, + std::move(IsAcceptedFolder), + [this, &IsAcceptedFile, &ExcludeAssetPaths](const std::string_view& RelativePath, + uint64_t Size, + uint32_t Attributes) -> bool { + if (RelativePath == m_Options.ZenExcludeManifestName) + { + return false; + } + if (!IsAcceptedFile(RelativePath, Size, Attributes)) + { + return false; + } + if (ExcludeAssetPaths.contains(std::filesystem::path(RelativePath).generic_string())) + { + return false; + } + return true; + }, + m_IOWorkerPool, + m_LogOutput.GetProgressUpdateDelayMS(), + [&](bool, std::ptrdiff_t) { + LOG_OUTPUT(m_LogOutput, "Found {} files in '{}'...", m_LocalFolderScanStats.AcceptedFileCount.load(), m_Path); + }, + m_AbortFlag); + } + else + { + Stopwatch ManifestParseTimer; + std::vector<std::filesystem::path> AssetPaths = ParseManifest(m_Path, m_ManifestPath); + for (const std::filesystem::path& AssetPath : AssetPaths) + { + Content.Paths.push_back(AssetPath); + const std::filesystem::path AssetFilePath = (m_Path / AssetPath).make_preferred(); + Content.RawSizes.push_back(FileSizeFromPath(AssetFilePath)); +#if ZEN_PLATFORM_WINDOWS + Content.Attributes.push_back(GetFileAttributesFromPath(AssetFilePath)); +#endif // ZEN_PLATFORM_WINDOWS +#if ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX + Content.Attributes.push_back(GetFileMode(AssetFilePath)); +#endif // ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX + m_LocalFolderScanStats.AcceptedFileByteCount += Content.RawSizes.back(); + m_LocalFolderScanStats.AcceptedFileCount++; + } + if (m_ManifestPath.is_relative()) + { + Content.Paths.push_back(m_ManifestPath); + const std::filesystem::path ManifestFilePath = (m_Path / m_ManifestPath).make_preferred(); + Content.RawSizes.push_back(FileSizeFromPath(ManifestFilePath)); +#if ZEN_PLATFORM_WINDOWS + Content.Attributes.push_back(GetFileAttributesFromPath(ManifestFilePath)); +#endif // ZEN_PLATFORM_WINDOWS +#if ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX + Content.Attributes.push_back(GetFileMode(ManifestFilePath)); +#endif // ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX + + m_LocalFolderScanStats.AcceptedFileByteCount += Content.RawSizes.back(); + m_LocalFolderScanStats.AcceptedFileCount++; + } + m_LocalFolderScanStats.FoundFileByteCount.store(m_LocalFolderScanStats.AcceptedFileByteCount); + m_LocalFolderScanStats.FoundFileCount.store(m_LocalFolderScanStats.AcceptedFileCount); + m_LocalFolderScanStats.ElapsedWallTimeUS = ManifestParseTimer.GetElapsedTimeUs(); + } + + std::unique_ptr<ChunkingController> ChunkController = + CreateChunkingControllerWithFixedChunking(ChunkingControllerWithFixedChunkingSettings{}); + { + CbObjectWriter ChunkParametersWriter; + ChunkParametersWriter.AddString("name"sv, ChunkController->GetName()); + ChunkParametersWriter.AddObject("parameters"sv, ChunkController->GetParameters()); + ChunkerParameters = ChunkParametersWriter.Save(); + } + + TotalRawSize = std::accumulate(Content.RawSizes.begin(), Content.RawSizes.end(), std::uint64_t(0)); + + { + std::unique_ptr<BuildOpLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Scan Folder")); + BuildOpLogOutput::ProgressBar& Progress(*ProgressBarPtr); + + FilteredRate FilteredBytesHashed; + FilteredBytesHashed.Start(); + LocalContent = ChunkFolderContent( + m_ChunkingStats, + m_IOWorkerPool, + m_Path, + Content, + *ChunkController, + m_LogOutput.GetProgressUpdateDelayMS(), + [&](bool IsAborted, bool IsPaused, std::ptrdiff_t) { + FilteredBytesHashed.Update(m_ChunkingStats.BytesHashed.load()); + std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found", + m_ChunkingStats.FilesProcessed.load(), + Content.Paths.size(), + NiceBytes(m_ChunkingStats.BytesHashed.load()), + NiceBytes(TotalRawSize), + NiceNum(FilteredBytesHashed.GetCurrent()), + m_ChunkingStats.UniqueChunksFound.load(), + NiceBytes(m_ChunkingStats.UniqueBytesFound.load())); + Progress.UpdateState({.Task = "Scanning files ", + .Details = Details, + .TotalCount = TotalRawSize, + .RemainingCount = TotalRawSize - m_ChunkingStats.BytesHashed.load(), + .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, + false); + }, + m_AbortFlag, + m_PauseFlag); + FilteredBytesHashed.Stop(); + Progress.Finish(); + if (m_AbortFlag) + { + return; + } + } + + if (!m_Options.IsQuiet) + { + LOG_OUTPUT(m_LogOutput, + "Found {} ({}) files divided into {} ({}) unique chunks in '{}' in {}. Average hash rate {}B/sec", + LocalContent.Paths.size(), + NiceBytes(TotalRawSize), + m_ChunkingStats.UniqueChunksFound.load(), + NiceBytes(m_ChunkingStats.UniqueBytesFound.load()), + m_Path, + NiceTimeSpanMs(ScanTimer.GetElapsedTimeMs()), + NiceNum(GetBytesPerSecond(m_ChunkingStats.ElapsedWallTimeUS, m_ChunkingStats.BytesHashed))); + } + } + + const ChunkedContentLookup LocalLookup = BuildChunkedContentLookup(LocalContent); + + std::vector<size_t> ReuseBlockIndexes; + std::vector<uint32_t> NewBlockChunkIndexes; + + PrepareBuildResult PrepBuildResult = PrepBuildResultFuture.get(); + + if (!m_Options.IsQuiet) + { + LOG_OUTPUT(m_LogOutput, + "Build prepare took {}. {} took {}, payload size {}{}", + NiceTimeSpanMs(PrepBuildResult.ElapsedTimeMs), + m_CreateBuild ? "PutBuild" : "GetBuild", + NiceTimeSpanMs(PrepBuildResult.PrepareBuildTimeMs), + NiceBytes(PrepBuildResult.PayloadSize), + m_Options.IgnoreExistingBlocks ? "" + : fmt::format(". Found {} blocks in {}", + PrepBuildResult.KnownBlocks.size(), + NiceTimeSpanMs(PrepBuildResult.FindBlocksTimeMs))); + } + + m_LogOutput.SetLogOperationProgress(TaskSteps::CalculateDelta, TaskSteps::StepCount); + + const std::uint64_t LargeAttachmentSize = + m_Options.AllowMultiparts ? PrepBuildResult.PreferredMultipartChunkSize * 4u : (std::uint64_t)-1; + + Stopwatch BlockArrangeTimer; + + std::vector<std::uint32_t> LooseChunkIndexes; + { + bool EnableBlocks = true; + std::vector<std::uint32_t> BlockChunkIndexes; + for (uint32_t ChunkIndex = 0; ChunkIndex < LocalContent.ChunkedContent.ChunkHashes.size(); ChunkIndex++) + { + const uint64_t ChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; + if (!EnableBlocks || ChunkRawSize == 0 || ChunkRawSize > m_Options.BlockParameters.MaxChunkEmbedSize) + { + LooseChunkIndexes.push_back(ChunkIndex); + m_LooseChunksStats.ChunkByteCount += ChunkRawSize; + } + else + { + BlockChunkIndexes.push_back(ChunkIndex); + m_FindBlocksStats.PotentialChunkByteCount += ChunkRawSize; + } + } + m_FindBlocksStats.PotentialChunkCount = BlockChunkIndexes.size(); + m_LooseChunksStats.ChunkCount = LooseChunkIndexes.size(); + + if (m_Options.IgnoreExistingBlocks) + { + if (!m_Options.IsQuiet) + { + LOG_OUTPUT(m_LogOutput, "Ignoring any existing blocks in store"); + } + NewBlockChunkIndexes = std::move(BlockChunkIndexes); + } + else + { + ReuseBlockIndexes = FindReuseBlocks(PrepBuildResult.KnownBlocks, + LocalContent.ChunkedContent.ChunkHashes, + BlockChunkIndexes, + NewBlockChunkIndexes); + m_FindBlocksStats.AcceptedBlockCount = ReuseBlockIndexes.size(); + + for (const ChunkBlockDescription& Description : PrepBuildResult.KnownBlocks) + { + for (uint32_t ChunkRawLength : Description.ChunkRawLengths) + { + m_FindBlocksStats.FoundBlockByteCount += ChunkRawLength; + } + m_FindBlocksStats.FoundBlockChunkCount += Description.ChunkRawHashes.size(); + } + } + } + + std::vector<std::vector<uint32_t>> NewBlockChunks; + ArrangeChunksIntoBlocks(LocalContent, LocalLookup, NewBlockChunkIndexes, NewBlockChunks); + + m_FindBlocksStats.NewBlocksCount = NewBlockChunks.size(); + for (uint32_t ChunkIndex : NewBlockChunkIndexes) + { + m_FindBlocksStats.NewBlocksChunkByteCount += LocalContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; + } + m_FindBlocksStats.NewBlocksChunkCount = NewBlockChunkIndexes.size(); + + const double AcceptedByteCountPercent = + m_FindBlocksStats.PotentialChunkByteCount > 0 + ? (100.0 * m_FindBlocksStats.AcceptedRawByteCount / m_FindBlocksStats.PotentialChunkByteCount) + : 0.0; + + const double AcceptedReduntantByteCountPercent = + m_FindBlocksStats.AcceptedByteCount > 0 ? (100.0 * m_FindBlocksStats.AcceptedReduntantByteCount) / + (m_FindBlocksStats.AcceptedByteCount + m_FindBlocksStats.AcceptedReduntantByteCount) + : 0.0; + if (!m_Options.IsQuiet) + { + LOG_OUTPUT(m_LogOutput, + "Found {} chunks in {} ({}) blocks eligible for reuse in {}\n" + " Reusing {} ({}) matching chunks in {} blocks ({:.1f}%)\n" + " Accepting {} ({}) redundant chunks ({:.1f}%)\n" + " Rejected {} ({}) chunks in {} blocks\n" + " Arranged {} ({}) chunks in {} new blocks\n" + " Keeping {} ({}) chunks as loose chunks\n" + " Discovery completed in {}", + m_FindBlocksStats.FoundBlockChunkCount, + m_FindBlocksStats.FoundBlockCount, + NiceBytes(m_FindBlocksStats.FoundBlockByteCount), + NiceTimeSpanMs(m_FindBlocksStats.FindBlockTimeMS), + + m_FindBlocksStats.AcceptedChunkCount, + NiceBytes(m_FindBlocksStats.AcceptedRawByteCount), + m_FindBlocksStats.AcceptedBlockCount, + AcceptedByteCountPercent, + + m_FindBlocksStats.AcceptedReduntantChunkCount, + NiceBytes(m_FindBlocksStats.AcceptedReduntantByteCount), + AcceptedReduntantByteCountPercent, + + m_FindBlocksStats.RejectedChunkCount, + NiceBytes(m_FindBlocksStats.RejectedByteCount), + m_FindBlocksStats.RejectedBlockCount, + + m_FindBlocksStats.NewBlocksChunkCount, + NiceBytes(m_FindBlocksStats.NewBlocksChunkByteCount), + m_FindBlocksStats.NewBlocksCount, + + m_LooseChunksStats.ChunkCount, + NiceBytes(m_LooseChunksStats.ChunkByteCount), + + NiceTimeSpanMs(BlockArrangeTimer.GetElapsedTimeMs())); + } + + GeneratedBlocks NewBlocks; + + m_LogOutput.SetLogOperationProgress(TaskSteps::Upload, TaskSteps::StepCount); + + if (!NewBlockChunks.empty()) + { + Stopwatch GenerateBuildBlocksTimer; + auto __ = MakeGuard([&]() { + uint64_t BlockGenerateTimeUs = GenerateBuildBlocksTimer.GetElapsedTimeUs(); + if (!m_Options.IsQuiet) + { + LOG_OUTPUT(m_LogOutput, + "Generated {} ({}) and uploaded {} ({}) blocks in {}. Generate speed: {}B/sec. Transfer speed {}bits/sec.", + m_GenerateBlocksStats.GeneratedBlockCount.load(), + NiceBytes(m_GenerateBlocksStats.GeneratedBlockByteCount), + m_UploadStats.BlockCount.load(), + NiceBytes(m_UploadStats.BlocksBytes.load()), + NiceTimeSpanMs(BlockGenerateTimeUs / 1000), + NiceNum(GetBytesPerSecond(m_GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS, + m_GenerateBlocksStats.GeneratedBlockByteCount)), + NiceNum(GetBytesPerSecond(m_UploadStats.ElapsedWallTimeUS, m_UploadStats.BlocksBytes * 8))); + } + }); + GenerateBuildBlocks(LocalContent, LocalLookup, NewBlockChunks, NewBlocks); + } + + CbObject PartManifest; + { + CbObjectWriter PartManifestWriter; + Stopwatch ManifestGenerationTimer; + auto __ = MakeGuard([&]() { + if (!m_Options.IsQuiet) + { + LOG_OUTPUT(m_LogOutput, + "Generated build part manifest in {} ({})", + NiceTimeSpanMs(ManifestGenerationTimer.GetElapsedTimeMs()), + NiceBytes(PartManifestWriter.GetSaveSize())); + } + }); + PartManifestWriter.AddObject("chunker"sv, ChunkerParameters); + + std::vector<IoHash> AllChunkBlockHashes; + std::vector<ChunkBlockDescription> AllChunkBlockDescriptions; + AllChunkBlockHashes.reserve(ReuseBlockIndexes.size() + NewBlocks.BlockDescriptions.size()); + AllChunkBlockDescriptions.reserve(ReuseBlockIndexes.size() + NewBlocks.BlockDescriptions.size()); + for (size_t ReuseBlockIndex : ReuseBlockIndexes) + { + AllChunkBlockDescriptions.push_back(PrepBuildResult.KnownBlocks[ReuseBlockIndex]); + AllChunkBlockHashes.push_back(PrepBuildResult.KnownBlocks[ReuseBlockIndex].BlockHash); + } + AllChunkBlockDescriptions.insert(AllChunkBlockDescriptions.end(), + NewBlocks.BlockDescriptions.begin(), + NewBlocks.BlockDescriptions.end()); + for (const ChunkBlockDescription& BlockDescription : NewBlocks.BlockDescriptions) + { + AllChunkBlockHashes.push_back(BlockDescription.BlockHash); + } +#if EXTRA_VERIFY + tsl::robin_map<IoHash, size_t, IoHash::Hasher> ChunkHashToAbsoluteChunkIndex; + std::vector<IoHash> AbsoluteChunkHashes; + AbsoluteChunkHashes.reserve(LocalContent.ChunkedContent.ChunkHashes.size()); + for (uint32_t ChunkIndex : LooseChunkIndexes) + { + ChunkHashToAbsoluteChunkIndex.insert({LocalContent.ChunkedContent.ChunkHashes[ChunkIndex], AbsoluteChunkHashes.size()}); + AbsoluteChunkHashes.push_back(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]); + } + for (const ChunkBlockDescription& Block : AllChunkBlockDescriptions) + { + for (const IoHash& ChunkHash : Block.ChunkHashes) + { + ChunkHashToAbsoluteChunkIndex.insert({ChunkHash, AbsoluteChunkHashes.size()}); + AbsoluteChunkHashes.push_back(ChunkHash); + } + } + for (const IoHash& ChunkHash : LocalContent.ChunkedContent.ChunkHashes) + { + ZEN_ASSERT(AbsoluteChunkHashes[ChunkHashToAbsoluteChunkIndex.at(ChunkHash)] == ChunkHash); + ZEN_ASSERT(LocalContent.ChunkedContent.ChunkHashes[LocalLookup.ChunkHashToChunkIndex.at(ChunkHash)] == ChunkHash); + } + for (const uint32_t ChunkIndex : LocalContent.ChunkedContent.ChunkOrders) + { + ZEN_ASSERT(AbsoluteChunkHashes[ChunkHashToAbsoluteChunkIndex.at(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex])] == + LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]); + ZEN_ASSERT(LocalLookup.ChunkHashToChunkIndex.at(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]) == ChunkIndex); + } +#endif // EXTRA_VERIFY + std::vector<uint32_t> AbsoluteChunkOrders = CalculateAbsoluteChunkOrders(LocalContent.ChunkedContent.ChunkHashes, + LocalContent.ChunkedContent.ChunkOrders, + LocalLookup.ChunkHashToChunkIndex, + LooseChunkIndexes, + AllChunkBlockDescriptions); + +#if EXTRA_VERIFY + for (uint32_t ChunkOrderIndex = 0; ChunkOrderIndex < LocalContent.ChunkedContent.ChunkOrders.size(); ChunkOrderIndex++) + { + uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[ChunkOrderIndex]; + uint32_t AbsoluteChunkIndex = AbsoluteChunkOrders[ChunkOrderIndex]; + const IoHash& LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; + const IoHash& AbsoluteChunkHash = AbsoluteChunkHashes[AbsoluteChunkIndex]; + ZEN_ASSERT(LocalChunkHash == AbsoluteChunkHash); + } +#endif // EXTRA_VERIFY + + WriteBuildContentToCompactBinary(PartManifestWriter, + LocalContent.Platform, + LocalContent.Paths, + LocalContent.RawHashes, + LocalContent.RawSizes, + LocalContent.Attributes, + LocalContent.ChunkedContent.SequenceRawHashes, + LocalContent.ChunkedContent.ChunkCounts, + LocalContent.ChunkedContent.ChunkHashes, + LocalContent.ChunkedContent.ChunkRawSizes, + AbsoluteChunkOrders, + LooseChunkIndexes, + AllChunkBlockHashes); + +#if EXTRA_VERIFY + { + ChunkedFolderContent VerifyFolderContent; + + std::vector<uint32_t> OutAbsoluteChunkOrders; + std::vector<IoHash> OutLooseChunkHashes; + std::vector<uint64_t> OutLooseChunkRawSizes; + std::vector<IoHash> OutBlockRawHashes; + ReadBuildContentFromCompactBinary(PartManifestWriter.Save(), + VerifyFolderContent.Platform, + VerifyFolderContent.Paths, + VerifyFolderContent.RawHashes, + VerifyFolderContent.RawSizes, + VerifyFolderContent.Attributes, + VerifyFolderContent.ChunkedContent.SequenceRawHashes, + VerifyFolderContent.ChunkedContent.ChunkCounts, + OutAbsoluteChunkOrders, + OutLooseChunkHashes, + OutLooseChunkRawSizes, + OutBlockRawHashes); + ZEN_ASSERT(OutBlockRawHashes == AllChunkBlockHashes); + + for (uint32_t OrderIndex = 0; OrderIndex < OutAbsoluteChunkOrders.size(); OrderIndex++) + { + uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[OrderIndex]; + const IoHash LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; + + uint32_t VerifyChunkIndex = OutAbsoluteChunkOrders[OrderIndex]; + const IoHash VerifyChunkHash = AbsoluteChunkHashes[VerifyChunkIndex]; + + ZEN_ASSERT(LocalChunkHash == VerifyChunkHash); + } + + CalculateLocalChunkOrders(OutAbsoluteChunkOrders, + OutLooseChunkHashes, + OutLooseChunkRawSizes, + AllChunkBlockDescriptions, + VerifyFolderContent.ChunkedContent.ChunkHashes, + VerifyFolderContent.ChunkedContent.ChunkRawSizes, + VerifyFolderContent.ChunkedContent.ChunkOrders); + + ZEN_ASSERT(LocalContent.Paths == VerifyFolderContent.Paths); + ZEN_ASSERT(LocalContent.RawHashes == VerifyFolderContent.RawHashes); + ZEN_ASSERT(LocalContent.RawSizes == VerifyFolderContent.RawSizes); + ZEN_ASSERT(LocalContent.Attributes == VerifyFolderContent.Attributes); + ZEN_ASSERT(LocalContent.ChunkedContent.SequenceRawHashes == VerifyFolderContent.ChunkedContent.SequenceRawHashes); + ZEN_ASSERT(LocalContent.ChunkedContent.ChunkCounts == VerifyFolderContent.ChunkedContent.ChunkCounts); + + for (uint32_t OrderIndex = 0; OrderIndex < LocalContent.ChunkedContent.ChunkOrders.size(); OrderIndex++) + { + uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[OrderIndex]; + const IoHash LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; + uint64_t LocalChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[LocalChunkIndex]; + + uint32_t VerifyChunkIndex = VerifyFolderContent.ChunkedContent.ChunkOrders[OrderIndex]; + const IoHash VerifyChunkHash = VerifyFolderContent.ChunkedContent.ChunkHashes[VerifyChunkIndex]; + uint64_t VerifyChunkRawSize = VerifyFolderContent.ChunkedContent.ChunkRawSizes[VerifyChunkIndex]; + + ZEN_ASSERT(LocalChunkHash == VerifyChunkHash); + ZEN_ASSERT(LocalChunkRawSize == VerifyChunkRawSize); + } + } +#endif // EXTRA_VERIFY + PartManifest = PartManifestWriter.Save(); + } + + Stopwatch PutBuildPartResultTimer; + std::pair<IoHash, std::vector<IoHash>> PutBuildPartResult = + m_Storage.BuildStorage->PutBuildPart(m_BuildId, m_BuildPartId, m_BuildPartName, PartManifest); + if (!m_Options.IsQuiet) + { + LOG_OUTPUT(m_LogOutput, + "PutBuildPart took {}, payload size {}. {} attachments are needed.", + NiceTimeSpanMs(PutBuildPartResultTimer.GetElapsedTimeMs()), + NiceBytes(PartManifest.GetSize()), + PutBuildPartResult.second.size()); + } + IoHash PartHash = PutBuildPartResult.first; + + auto UploadAttachments = [this, &LocalContent, &LocalLookup, &NewBlockChunks, &NewBlocks, &LooseChunkIndexes, &LargeAttachmentSize]( + std::span<IoHash> RawHashes) { + if (!m_AbortFlag) + { + UploadStatistics TempUploadStats; + LooseChunksStatistics TempLooseChunksStats; + + Stopwatch TempUploadTimer; + auto __ = MakeGuard([&]() { + if (!m_Options.IsQuiet) + { + uint64_t TempChunkUploadTimeUs = TempUploadTimer.GetElapsedTimeUs(); + LOG_OUTPUT(m_LogOutput, + "Uploaded {} ({}) blocks. " + "Compressed {} ({} {}B/s) and uploaded {} ({}) chunks. " + "Transferred {} ({}bits/s) in {}", + TempUploadStats.BlockCount.load(), + NiceBytes(TempUploadStats.BlocksBytes), + + TempLooseChunksStats.CompressedChunkCount.load(), + NiceBytes(TempLooseChunksStats.CompressedChunkBytes.load()), + NiceNum(GetBytesPerSecond(TempLooseChunksStats.CompressChunksElapsedWallTimeUS, + TempLooseChunksStats.ChunkByteCount)), + TempUploadStats.ChunkCount.load(), + NiceBytes(TempUploadStats.ChunksBytes), + + NiceBytes(TempUploadStats.BlocksBytes + TempUploadStats.ChunksBytes), + NiceNum(GetBytesPerSecond(TempUploadStats.ElapsedWallTimeUS, TempUploadStats.ChunksBytes * 8)), + NiceTimeSpanMs(TempChunkUploadTimeUs / 1000)); + } + }); + UploadPartBlobs(LocalContent, + LocalLookup, + RawHashes, + NewBlockChunks, + NewBlocks, + LooseChunkIndexes, + LargeAttachmentSize, + TempUploadStats, + TempLooseChunksStats); + m_UploadStats += TempUploadStats; + m_LooseChunksStats += TempLooseChunksStats; + } + }; + if (m_Options.IgnoreExistingBlocks) + { + if (m_Options.IsVerbose) + { + LOG_OUTPUT(m_LogOutput, + "PutBuildPart uploading all attachments, needs are: {}", + FormatArray<IoHash>(PutBuildPartResult.second, "\n "sv)); + } + + std::vector<IoHash> ForceUploadChunkHashes; + ForceUploadChunkHashes.reserve(LooseChunkIndexes.size()); + + for (uint32_t ChunkIndex : LooseChunkIndexes) + { + ForceUploadChunkHashes.push_back(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]); + } + + for (size_t BlockIndex = 0; BlockIndex < NewBlocks.BlockHeaders.size(); BlockIndex++) + { + if (NewBlocks.BlockHeaders[BlockIndex]) + { + // Block was not uploaded during generation + ForceUploadChunkHashes.push_back(NewBlocks.BlockDescriptions[BlockIndex].BlockHash); + } + } + UploadAttachments(ForceUploadChunkHashes); + } + else if (!PutBuildPartResult.second.empty()) + { + if (m_Options.IsVerbose) + { + LOG_OUTPUT(m_LogOutput, "PutBuildPart needs attachments: {}", FormatArray<IoHash>(PutBuildPartResult.second, "\n "sv)); + } + UploadAttachments(PutBuildPartResult.second); + } + + uint32_t FinalizeBuildPartRetryCount = 5; + while (!m_AbortFlag && (FinalizeBuildPartRetryCount--) > 0) + { + Stopwatch FinalizeBuildPartTimer; + std::vector<IoHash> Needs = m_Storage.BuildStorage->FinalizeBuildPart(m_BuildId, m_BuildPartId, PartHash); + if (!m_Options.IsQuiet) + { + LOG_OUTPUT(m_LogOutput, + "FinalizeBuildPart took {}. {} attachments are missing.", + NiceTimeSpanMs(FinalizeBuildPartTimer.GetElapsedTimeMs()), + Needs.size()); + } + if (Needs.empty()) + { + break; + } + if (m_Options.IsVerbose) + { + LOG_OUTPUT(m_LogOutput, "FinalizeBuildPart needs attachments: {}", FormatArray<IoHash>(Needs, "\n "sv)); + } + UploadAttachments(Needs); + } + + if (m_CreateBuild && !m_AbortFlag) + { + Stopwatch FinalizeBuildTimer; + m_Storage.BuildStorage->FinalizeBuild(m_BuildId); + if (!m_Options.IsQuiet) + { + LOG_OUTPUT(m_LogOutput, "FinalizeBuild took {}", NiceTimeSpanMs(FinalizeBuildTimer.GetElapsedTimeMs())); + } + } + + if (!NewBlocks.BlockDescriptions.empty() && !m_AbortFlag) + { + uint64_t UploadBlockMetadataCount = 0; + Stopwatch UploadBlockMetadataTimer; + + uint32_t FailedMetadataUploadCount = 1; + int32_t MetadataUploadRetryCount = 3; + while ((MetadataUploadRetryCount-- > 0) && (FailedMetadataUploadCount > 0)) + { + FailedMetadataUploadCount = 0; + for (size_t BlockIndex = 0; BlockIndex < NewBlocks.BlockDescriptions.size(); BlockIndex++) + { + if (m_AbortFlag) + { + break; + } + const IoHash& BlockHash = NewBlocks.BlockDescriptions[BlockIndex].BlockHash; + if (!NewBlocks.MetaDataHasBeenUploaded[BlockIndex]) + { + const CbObject BlockMetaData = + BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]); + if (m_Storage.BuildCacheStorage) + { + m_Storage.BuildCacheStorage->PutBlobMetadatas(m_BuildId, + std::vector<IoHash>({BlockHash}), + std::vector<CbObject>({BlockMetaData})); + } + bool MetadataSucceeded = m_Storage.BuildStorage->PutBlockMetadata(m_BuildId, BlockHash, BlockMetaData); + if (MetadataSucceeded) + { + m_UploadStats.BlocksBytes += BlockMetaData.GetSize(); + NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; + UploadBlockMetadataCount++; + } + else + { + FailedMetadataUploadCount++; + } + } + } + } + if (UploadBlockMetadataCount > 0) + { + uint64_t ElapsedUS = UploadBlockMetadataTimer.GetElapsedTimeUs(); + m_UploadStats.ElapsedWallTimeUS += ElapsedUS; + if (!m_Options.IsQuiet) + { + LOG_OUTPUT(m_LogOutput, + "Uploaded metadata for {} blocks in {}", + UploadBlockMetadataCount, + NiceTimeSpanMs(ElapsedUS / 1000)); + } + } + } + + // if (m_PostUploadVerify && !m_AbortFlag) + // { + // //m_LogOutput.SetLogOperationProgress(TaskSteps::Validate, TaskSteps::StepCount); + // ValidateBuildPart(*m_Storage.BuildStorage, m_BuildId, m_BuildPartId, m_BuildPartName, ValidateStats, ValidateDownloadStats); + // std::string ValidateInfo; + // if (m_PostUploadVerify) + // { + // const uint64_t DownloadedCount = ValidateDownloadStats.DownloadedChunkCount + ValidateDownloadStats.DownloadedBlockCount; + // const uint64_t DownloadedByteCount = + // ValidateDownloadStats.DownloadedChunkByteCount + ValidateDownloadStats.DownloadedBlockByteCount; + // ValidateInfo = fmt::format("\n Verified: {:>8} ({}), {}B/sec, {}", + // DownloadedCount, + // NiceBytes(DownloadedByteCount), + // NiceNum(GetBytesPerSecond(ValidateStats.ElapsedWallTimeUS, DownloadedByteCount)), + // NiceTimeSpanMs(ValidateStats.ElapsedWallTimeUS / 1000)); + // } + // } + + m_Storage.BuildStorage->PutBuildPartStats( + m_BuildId, + m_BuildPartId, + {{"totalSize", double(m_LocalFolderScanStats.FoundFileByteCount.load())}, + {"reusedRatio", AcceptedByteCountPercent / 100.0}, + {"reusedBlockCount", double(m_FindBlocksStats.AcceptedBlockCount)}, + {"reusedBlockByteCount", double(m_FindBlocksStats.AcceptedRawByteCount)}, + {"newBlockCount", double(m_FindBlocksStats.NewBlocksCount)}, + {"newBlockByteCount", double(m_FindBlocksStats.NewBlocksChunkByteCount)}, + {"uploadedCount", double(m_UploadStats.BlockCount.load() + m_UploadStats.ChunkCount.load())}, + {"uploadedByteCount", double(m_UploadStats.BlocksBytes.load() + m_UploadStats.ChunksBytes.load())}, + {"uploadedBytesPerSec", + double(GetBytesPerSecond(m_UploadStats.ElapsedWallTimeUS, m_UploadStats.ChunksBytes + m_UploadStats.BlocksBytes))}, + {"elapsedTimeSec", double(ProcessTimer.GetElapsedTimeMs() / 1000.0)}}); + + m_LogOutput.SetLogOperationProgress(TaskSteps::Cleanup, TaskSteps::StepCount); +} + void DownloadLargeBlob(BuildStorage& Storage, const std::filesystem::path& DownloadFolder, @@ -4739,4 +5744,1390 @@ DownloadLargeBlob(BuildStorage& Storage, } } +std::vector<size_t> +BuildsOperationUploadFolder::FindReuseBlocks(const std::vector<ChunkBlockDescription>& KnownBlocks, + std::span<const IoHash> ChunkHashes, + std::span<const uint32_t> ChunkIndexes, + std::vector<uint32_t>& OutUnusedChunkIndexes) +{ + ZEN_TRACE_CPU("FindReuseBlocks"); + + // Find all blocks with a usage level higher than MinPercentLimit + // Pick out the blocks with usage higher or equal to MinPercentLimit + // Sort them with highest size usage - most usage first + // Make a list of all chunks and mark them as not found + // For each block, recalculate the block has usage percent based on the chunks marked as not found + // If the block still reaches MinPercentLimit, keep it and remove the matching chunks from the not found list + // Repeat for following all remaining block that initially matched MinPercentLimit + + std::vector<size_t> FilteredReuseBlockIndexes; + + uint32_t ChunkCount = gsl::narrow<uint32_t>(ChunkHashes.size()); + std::vector<bool> ChunkFound(ChunkCount, false); + + if (ChunkCount > 0) + { + if (!KnownBlocks.empty()) + { + Stopwatch ReuseTimer; + + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkHashToChunkIndex; + ChunkHashToChunkIndex.reserve(ChunkIndexes.size()); + for (uint32_t ChunkIndex : ChunkIndexes) + { + ChunkHashToChunkIndex.insert_or_assign(ChunkHashes[ChunkIndex], ChunkIndex); + } + + std::vector<size_t> BlockSizes(KnownBlocks.size(), 0); + std::vector<size_t> BlockUseSize(KnownBlocks.size(), 0); + + std::vector<size_t> ReuseBlockIndexes; + + for (size_t KnownBlockIndex = 0; KnownBlockIndex < KnownBlocks.size(); KnownBlockIndex++) + { + const ChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex]; + if (KnownBlock.BlockHash != IoHash::Zero && KnownBlock.ChunkRawHashes.size() == KnownBlock.ChunkCompressedLengths.size()) + { + size_t BlockAttachmentCount = KnownBlock.ChunkRawHashes.size(); + if (BlockAttachmentCount == 0) + { + continue; + } + size_t ReuseSize = 0; + size_t BlockSize = 0; + size_t FoundAttachmentCount = 0; + size_t BlockChunkCount = KnownBlock.ChunkRawHashes.size(); + for (size_t BlockChunkIndex = 0; BlockChunkIndex < BlockChunkCount; BlockChunkIndex++) + { + const IoHash& BlockChunkHash = KnownBlock.ChunkRawHashes[BlockChunkIndex]; + const uint32_t BlockChunkSize = KnownBlock.ChunkCompressedLengths[BlockChunkIndex]; + BlockSize += BlockChunkSize; + if (ChunkHashToChunkIndex.contains(BlockChunkHash)) + { + ReuseSize += BlockChunkSize; + FoundAttachmentCount++; + } + } + + size_t ReusePercent = (ReuseSize * 100) / BlockSize; + + if (ReusePercent >= m_Options.BlockReuseMinPercentLimit) + { + if (m_Options.IsVerbose) + { + LOG_OUTPUT(m_LogOutput, + "Reusing block {}. {} attachments found, usage level: {}%", + KnownBlock.BlockHash, + FoundAttachmentCount, + ReusePercent); + } + ReuseBlockIndexes.push_back(KnownBlockIndex); + + BlockSizes[KnownBlockIndex] = BlockSize; + BlockUseSize[KnownBlockIndex] = ReuseSize; + } + else if (FoundAttachmentCount > 0) + { + // if (m_Options.IsVerbose) + //{ + // LOG_OUTPUT(m_LogOutput, "Skipping block {}. {} attachments found, usage level: {}%", KnownBlock.BlockHash, + // FoundAttachmentCount, ReusePercent); + //} + m_FindBlocksStats.RejectedBlockCount++; + m_FindBlocksStats.RejectedChunkCount += FoundAttachmentCount; + m_FindBlocksStats.RejectedByteCount += ReuseSize; + } + } + } + + if (!ReuseBlockIndexes.empty()) + { + std::sort(ReuseBlockIndexes.begin(), ReuseBlockIndexes.end(), [&](size_t Lhs, size_t Rhs) { + return BlockUseSize[Lhs] > BlockUseSize[Rhs]; + }); + + for (size_t KnownBlockIndex : ReuseBlockIndexes) + { + std::vector<uint32_t> FoundChunkIndexes; + size_t BlockSize = 0; + size_t AdjustedReuseSize = 0; + size_t AdjustedRawReuseSize = 0; + const ChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex]; + for (size_t BlockChunkIndex = 0; BlockChunkIndex < KnownBlock.ChunkRawHashes.size(); BlockChunkIndex++) + { + const IoHash& BlockChunkHash = KnownBlock.ChunkRawHashes[BlockChunkIndex]; + const uint32_t BlockChunkSize = KnownBlock.ChunkCompressedLengths[BlockChunkIndex]; + BlockSize += BlockChunkSize; + if (auto It = ChunkHashToChunkIndex.find(BlockChunkHash); It != ChunkHashToChunkIndex.end()) + { + const uint32_t ChunkIndex = It->second; + if (!ChunkFound[ChunkIndex]) + { + FoundChunkIndexes.push_back(ChunkIndex); + AdjustedReuseSize += KnownBlock.ChunkCompressedLengths[BlockChunkIndex]; + AdjustedRawReuseSize += KnownBlock.ChunkRawLengths[BlockChunkIndex]; + } + } + } + + size_t ReusePercent = (AdjustedReuseSize * 100) / BlockSize; + + if (ReusePercent >= m_Options.BlockReuseMinPercentLimit) + { + if (m_Options.IsVerbose) + { + LOG_OUTPUT(m_LogOutput, + "Reusing block {}. {} attachments found, usage level: {}%", + KnownBlock.BlockHash, + FoundChunkIndexes.size(), + ReusePercent); + } + FilteredReuseBlockIndexes.push_back(KnownBlockIndex); + + for (uint32_t ChunkIndex : FoundChunkIndexes) + { + ChunkFound[ChunkIndex] = true; + } + m_FindBlocksStats.AcceptedChunkCount += FoundChunkIndexes.size(); + m_FindBlocksStats.AcceptedByteCount += AdjustedReuseSize; + m_FindBlocksStats.AcceptedRawByteCount += AdjustedRawReuseSize; + m_FindBlocksStats.AcceptedReduntantChunkCount += KnownBlock.ChunkRawHashes.size() - FoundChunkIndexes.size(); + m_FindBlocksStats.AcceptedReduntantByteCount += BlockSize - AdjustedReuseSize; + } + else + { + // if (m_Options.IsVerbose) + //{ + // LOG_OUTPUT(m_LogOutput, "Skipping block {}. filtered usage level: {}%", KnownBlock.BlockHash, ReusePercent); + //} + m_FindBlocksStats.RejectedBlockCount++; + m_FindBlocksStats.RejectedChunkCount += FoundChunkIndexes.size(); + m_FindBlocksStats.RejectedByteCount += AdjustedReuseSize; + } + } + } + } + OutUnusedChunkIndexes.reserve(ChunkIndexes.size() - m_FindBlocksStats.AcceptedChunkCount); + for (uint32_t ChunkIndex : ChunkIndexes) + { + if (!ChunkFound[ChunkIndex]) + { + OutUnusedChunkIndexes.push_back(ChunkIndex); + } + } + } + return FilteredReuseBlockIndexes; +} + +void +BuildsOperationUploadFolder::ArrangeChunksIntoBlocks(const ChunkedFolderContent& Content, + const ChunkedContentLookup& Lookup, + std::vector<uint32_t>& ChunkIndexes, + std::vector<std::vector<uint32_t>>& OutBlocks) +{ + ZEN_TRACE_CPU("ArrangeChunksIntoBlocks"); + std::sort(ChunkIndexes.begin(), ChunkIndexes.end(), [&Content, &Lookup](uint32_t Lhs, uint32_t Rhs) { + const ChunkedContentLookup::ChunkSequenceLocation& LhsLocation = GetChunkSequenceLocations(Lookup, Lhs)[0]; + const ChunkedContentLookup::ChunkSequenceLocation& RhsLocation = GetChunkSequenceLocations(Lookup, Rhs)[0]; + if (LhsLocation.SequenceIndex < RhsLocation.SequenceIndex) + { + return true; + } + else if (LhsLocation.SequenceIndex > RhsLocation.SequenceIndex) + { + return false; + } + return LhsLocation.Offset < RhsLocation.Offset; + }); + + uint64_t MaxBlockSizeLowThreshold = m_Options.BlockParameters.MaxBlockSize - (m_Options.BlockParameters.MaxBlockSize / 16); + + uint64_t BlockSize = 0; + + uint32_t ChunkIndexStart = 0; + for (uint32_t ChunkIndexOffset = 0; ChunkIndexOffset < ChunkIndexes.size();) + { + const uint32_t ChunkIndex = ChunkIndexes[ChunkIndexOffset]; + const uint64_t ChunkSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; + + if (((BlockSize + ChunkSize) > m_Options.BlockParameters.MaxBlockSize) || + (ChunkIndexOffset - ChunkIndexStart) > m_Options.BlockParameters.MaxChunksPerBlock) + { + // Within the span of MaxBlockSizeLowThreshold and MaxBlockSize, see if there is a break + // between source paths for chunks. Break the block at the last such break if any. + ZEN_ASSERT(ChunkIndexOffset > ChunkIndexStart); + + const uint32_t ChunkSequenceIndex = Lookup.ChunkSequenceLocations[Lookup.ChunkSequenceLocationOffset[ChunkIndex]].SequenceIndex; + + uint64_t ScanBlockSize = BlockSize; + + uint32_t ScanChunkIndexOffset = ChunkIndexOffset - 1; + while (ScanChunkIndexOffset > (ChunkIndexStart + 2)) + { + const uint32_t TestChunkIndex = ChunkIndexes[ScanChunkIndexOffset]; + const uint64_t TestChunkSize = Content.ChunkedContent.ChunkRawSizes[TestChunkIndex]; + if ((ScanBlockSize - TestChunkSize) < MaxBlockSizeLowThreshold) + { + break; + } + + const uint32_t TestSequenceIndex = + Lookup.ChunkSequenceLocations[Lookup.ChunkSequenceLocationOffset[TestChunkIndex]].SequenceIndex; + if (ChunkSequenceIndex != TestSequenceIndex) + { + ChunkIndexOffset = ScanChunkIndexOffset + 1; + break; + } + + ScanBlockSize -= TestChunkSize; + ScanChunkIndexOffset--; + } + + std::vector<uint32_t> ChunksInBlock; + ChunksInBlock.reserve(ChunkIndexOffset - ChunkIndexStart); + for (uint32_t AddIndexOffset = ChunkIndexStart; AddIndexOffset < ChunkIndexOffset; AddIndexOffset++) + { + const uint32_t AddChunkIndex = ChunkIndexes[AddIndexOffset]; + ChunksInBlock.push_back(AddChunkIndex); + } + OutBlocks.emplace_back(std::move(ChunksInBlock)); + BlockSize = 0; + ChunkIndexStart = ChunkIndexOffset; + } + else + { + ChunkIndexOffset++; + BlockSize += ChunkSize; + } + } + if (ChunkIndexStart < ChunkIndexes.size()) + { + std::vector<uint32_t> ChunksInBlock; + ChunksInBlock.reserve(ChunkIndexes.size() - ChunkIndexStart); + for (uint32_t AddIndexOffset = ChunkIndexStart; AddIndexOffset < ChunkIndexes.size(); AddIndexOffset++) + { + const uint32_t AddChunkIndex = ChunkIndexes[AddIndexOffset]; + ChunksInBlock.push_back(AddChunkIndex); + } + OutBlocks.emplace_back(std::move(ChunksInBlock)); + } +} + +void +BuildsOperationUploadFolder::GenerateBuildBlocks(const ChunkedFolderContent& Content, + const ChunkedContentLookup& Lookup, + const std::vector<std::vector<uint32_t>>& NewBlockChunks, + GeneratedBlocks& OutBlocks) +{ + ZEN_TRACE_CPU("GenerateBuildBlocks"); + const std::size_t NewBlockCount = NewBlockChunks.size(); + if (NewBlockCount > 0) + { + std::unique_ptr<BuildOpLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Generate Blocks")); + BuildOpLogOutput::ProgressBar& Progress(*ProgressBarPtr); + + OutBlocks.BlockDescriptions.resize(NewBlockCount); + OutBlocks.BlockSizes.resize(NewBlockCount); + OutBlocks.BlockMetaDatas.resize(NewBlockCount); + OutBlocks.BlockHeaders.resize(NewBlockCount); + OutBlocks.MetaDataHasBeenUploaded.resize(NewBlockCount, 0); + OutBlocks.BlockHashToBlockIndex.reserve(NewBlockCount); + + RwLock Lock; + + WorkerThreadPool& GenerateBlobsPool = m_IOWorkerPool; + WorkerThreadPool& UploadBlocksPool = m_NetworkPool; + + FilteredRate FilteredGeneratedBytesPerSecond; + FilteredRate FilteredUploadedBytesPerSecond; + + ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + std::atomic<uint64_t> QueuedPendingBlocksForUpload = 0; + + for (size_t BlockIndex = 0; BlockIndex < NewBlockCount; BlockIndex++) + { + if (Work.IsAborted()) + { + break; + } + const std::vector<uint32_t>& ChunksInBlock = NewBlockChunks[BlockIndex]; + Work.ScheduleWork( + GenerateBlobsPool, + [this, + &Content, + &Lookup, + &Work, + &UploadBlocksPool, + NewBlockCount, + ChunksInBlock, + &Lock, + &OutBlocks, + &FilteredGeneratedBytesPerSecond, + &QueuedPendingBlocksForUpload, + &FilteredUploadedBytesPerSecond, + BlockIndex](std::atomic<bool>&) { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("GenerateBuildBlocks_Generate"); + + FilteredGeneratedBytesPerSecond.Start(); + // TODO: Convert ScheduleWork body to function + + Stopwatch GenerateTimer; + CompressedBuffer CompressedBlock = + GenerateBlock(Content, Lookup, ChunksInBlock, OutBlocks.BlockDescriptions[BlockIndex]); + if (m_Options.IsVerbose) + { + LOG_OUTPUT(m_LogOutput, + "Generated block {} ({}) containing {} chunks in {}", + OutBlocks.BlockDescriptions[BlockIndex].BlockHash, + NiceBytes(CompressedBlock.GetCompressedSize()), + OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size(), + NiceTimeSpanMs(GenerateTimer.GetElapsedTimeMs())); + } + + OutBlocks.BlockSizes[BlockIndex] = CompressedBlock.GetCompressedSize(); + { + CbObjectWriter Writer; + Writer.AddString("createdBy", "zen"); + OutBlocks.BlockMetaDatas[BlockIndex] = Writer.Save(); + } + m_GenerateBlocksStats.GeneratedBlockByteCount += OutBlocks.BlockSizes[BlockIndex]; + m_GenerateBlocksStats.GeneratedBlockCount++; + + Lock.WithExclusiveLock([&]() { + OutBlocks.BlockHashToBlockIndex.insert_or_assign(OutBlocks.BlockDescriptions[BlockIndex].BlockHash, BlockIndex); + }); + + { + std::span<const SharedBuffer> Segments = CompressedBlock.GetCompressed().GetSegments(); + ZEN_ASSERT(Segments.size() >= 2); + OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]); + } + + if (m_GenerateBlocksStats.GeneratedBlockCount == NewBlockCount) + { + FilteredGeneratedBytesPerSecond.Stop(); + } + + if (QueuedPendingBlocksForUpload.load() > 16) + { + std::span<const SharedBuffer> Segments = CompressedBlock.GetCompressed().GetSegments(); + ZEN_ASSERT(Segments.size() >= 2); + OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]); + } + else + { + if (!m_AbortFlag) + { + QueuedPendingBlocksForUpload++; + + Work.ScheduleWork( + UploadBlocksPool, + [this, + NewBlockCount, + &FilteredUploadedBytesPerSecond, + &QueuedPendingBlocksForUpload, + &OutBlocks, + BlockIndex, + Payload = std::move(CompressedBlock)](std::atomic<bool>&) mutable { + auto _ = MakeGuard([&QueuedPendingBlocksForUpload] { QueuedPendingBlocksForUpload--; }); + if (!m_AbortFlag) + { + if (m_GenerateBlocksStats.GeneratedBlockCount == NewBlockCount) + { + ZEN_TRACE_CPU("GenerateBuildBlocks_Save"); + + FilteredUploadedBytesPerSecond.Stop(); + std::span<const SharedBuffer> Segments = Payload.GetCompressed().GetSegments(); + ZEN_ASSERT(Segments.size() >= 2); + OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]); + } + else + { + ZEN_TRACE_CPU("GenerateBuildBlocks_Upload"); + + FilteredUploadedBytesPerSecond.Start(); + // TODO: Convert ScheduleWork body to function + + const CbObject BlockMetaData = + BuildChunkBlockDescription(OutBlocks.BlockDescriptions[BlockIndex], + OutBlocks.BlockMetaDatas[BlockIndex]); + + const IoHash& BlockHash = OutBlocks.BlockDescriptions[BlockIndex].BlockHash; + const uint64_t CompressedBlockSize = Payload.GetCompressedSize(); + + if (m_Storage.BuildCacheStorage) + { + m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, + BlockHash, + ZenContentType::kCompressedBinary, + Payload.GetCompressed()); + } + + m_Storage.BuildStorage->PutBuildBlob(m_BuildId, + BlockHash, + ZenContentType::kCompressedBinary, + std::move(Payload).GetCompressed()); + m_UploadStats.BlocksBytes += CompressedBlockSize; + + if (m_Options.IsVerbose) + { + LOG_OUTPUT(m_LogOutput, + "Uploaded block {} ({}) containing {} chunks", + BlockHash, + NiceBytes(CompressedBlockSize), + OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); + } + + if (m_Storage.BuildCacheStorage) + { + m_Storage.BuildCacheStorage->PutBlobMetadatas(m_BuildId, + std::vector<IoHash>({BlockHash}), + std::vector<CbObject>({BlockMetaData})); + } + + bool MetadataSucceeded = + m_Storage.BuildStorage->PutBlockMetadata(m_BuildId, BlockHash, BlockMetaData); + if (MetadataSucceeded) + { + if (m_Options.IsVerbose) + { + LOG_OUTPUT(m_LogOutput, + "Uploaded block {} metadata ({})", + BlockHash, + NiceBytes(BlockMetaData.GetSize())); + } + + OutBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; + m_UploadStats.BlocksBytes += BlockMetaData.GetSize(); + } + + m_UploadStats.BlockCount++; + if (m_UploadStats.BlockCount == NewBlockCount) + { + FilteredUploadedBytesPerSecond.Stop(); + } + } + } + }); + } + } + } + }); + } + + Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); + + FilteredGeneratedBytesPerSecond.Update(m_GenerateBlocksStats.GeneratedBlockByteCount.load()); + FilteredUploadedBytesPerSecond.Update(m_UploadStats.BlocksBytes.load()); + + std::string Details = fmt::format("Generated {}/{} ({}, {}B/s). Uploaded {}/{} ({}, {}bits/s)", + m_GenerateBlocksStats.GeneratedBlockCount.load(), + NewBlockCount, + NiceBytes(m_GenerateBlocksStats.GeneratedBlockByteCount.load()), + NiceNum(FilteredGeneratedBytesPerSecond.GetCurrent()), + m_UploadStats.BlockCount.load(), + NewBlockCount, + NiceBytes(m_UploadStats.BlocksBytes.load()), + NiceNum(FilteredUploadedBytesPerSecond.GetCurrent() * 8)); + + Progress.UpdateState({.Task = "Generating blocks", + .Details = Details, + .TotalCount = gsl::narrow<uint64_t>(NewBlockCount), + .RemainingCount = gsl::narrow<uint64_t>(NewBlockCount - m_GenerateBlocksStats.GeneratedBlockCount.load()), + .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, + false); + }); + + ZEN_ASSERT(m_AbortFlag || QueuedPendingBlocksForUpload.load() == 0); + + Progress.Finish(); + + m_GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGeneratedBytesPerSecond.GetElapsedTimeUS(); + m_UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS(); + } +} + +std::vector<uint32_t> +BuildsOperationUploadFolder::CalculateAbsoluteChunkOrders( + const std::span<const IoHash> LocalChunkHashes, + const std::span<const uint32_t> LocalChunkOrder, + const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& ChunkHashToLocalChunkIndex, + const std::span<const uint32_t>& LooseChunkIndexes, + const std::span<const ChunkBlockDescription>& BlockDescriptions) +{ + ZEN_TRACE_CPU("CalculateAbsoluteChunkOrders"); + +#if EXTRA_VERIFY + std::vector<IoHash> TmpAbsoluteChunkHashes; + TmpAbsoluteChunkHashes.reserve(LocalChunkHashes.size()); +#endif // EXTRA_VERIFY + std::vector<uint32_t> LocalChunkIndexToAbsoluteChunkIndex; + LocalChunkIndexToAbsoluteChunkIndex.resize(LocalChunkHashes.size(), (uint32_t)-1); + std::uint32_t AbsoluteChunkCount = 0; + for (uint32_t ChunkIndex : LooseChunkIndexes) + { + LocalChunkIndexToAbsoluteChunkIndex[ChunkIndex] = AbsoluteChunkCount; +#if EXTRA_VERIFY + TmpAbsoluteChunkHashes.push_back(LocalChunkHashes[ChunkIndex]); +#endif // EXTRA_VERIFY + AbsoluteChunkCount++; + } + for (const ChunkBlockDescription& Block : BlockDescriptions) + { + for (const IoHash& ChunkHash : Block.ChunkRawHashes) + { + if (auto It = ChunkHashToLocalChunkIndex.find(ChunkHash); It != ChunkHashToLocalChunkIndex.end()) + { + const uint32_t LocalChunkIndex = It->second; + ZEN_ASSERT_SLOW(LocalChunkHashes[LocalChunkIndex] == ChunkHash); + LocalChunkIndexToAbsoluteChunkIndex[LocalChunkIndex] = AbsoluteChunkCount; + } +#if EXTRA_VERIFY + TmpAbsoluteChunkHashes.push_back(ChunkHash); +#endif // EXTRA_VERIFY + AbsoluteChunkCount++; + } + } + std::vector<uint32_t> AbsoluteChunkOrder; + AbsoluteChunkOrder.reserve(LocalChunkHashes.size()); + for (const uint32_t LocalChunkIndex : LocalChunkOrder) + { + const uint32_t AbsoluteChunkIndex = LocalChunkIndexToAbsoluteChunkIndex[LocalChunkIndex]; +#if EXTRA_VERIFY + ZEN_ASSERT(LocalChunkHashes[LocalChunkIndex] == TmpAbsoluteChunkHashes[AbsoluteChunkIndex]); +#endif // EXTRA_VERIFY + AbsoluteChunkOrder.push_back(AbsoluteChunkIndex); + } +#if EXTRA_VERIFY + { + uint32_t OrderIndex = 0; + while (OrderIndex < LocalChunkOrder.size()) + { + const uint32_t LocalChunkIndex = LocalChunkOrder[OrderIndex]; + const IoHash& LocalChunkHash = LocalChunkHashes[LocalChunkIndex]; + const uint32_t AbsoluteChunkIndex = AbsoluteChunkOrder[OrderIndex]; + const IoHash& AbsoluteChunkHash = TmpAbsoluteChunkHashes[AbsoluteChunkIndex]; + ZEN_ASSERT(LocalChunkHash == AbsoluteChunkHash); + OrderIndex++; + } + } +#endif // EXTRA_VERIFY + return AbsoluteChunkOrder; +} + +void +BuildsOperationUploadFolder::CalculateLocalChunkOrders(const std::span<const uint32_t>& AbsoluteChunkOrders, + const std::span<const IoHash> LooseChunkHashes, + const std::span<const uint64_t> LooseChunkRawSizes, + const std::span<const ChunkBlockDescription>& BlockDescriptions, + std::vector<IoHash>& OutLocalChunkHashes, + std::vector<uint64_t>& OutLocalChunkRawSizes, + std::vector<uint32_t>& OutLocalChunkOrders) +{ + ZEN_TRACE_CPU("CalculateLocalChunkOrders"); + + std::vector<IoHash> AbsoluteChunkHashes; + std::vector<uint64_t> AbsoluteChunkRawSizes; + AbsoluteChunkHashes.insert(AbsoluteChunkHashes.end(), LooseChunkHashes.begin(), LooseChunkHashes.end()); + AbsoluteChunkRawSizes.insert(AbsoluteChunkRawSizes.end(), LooseChunkRawSizes.begin(), LooseChunkRawSizes.end()); + for (const ChunkBlockDescription& Block : BlockDescriptions) + { + AbsoluteChunkHashes.insert(AbsoluteChunkHashes.end(), Block.ChunkRawHashes.begin(), Block.ChunkRawHashes.end()); + AbsoluteChunkRawSizes.insert(AbsoluteChunkRawSizes.end(), Block.ChunkRawLengths.begin(), Block.ChunkRawLengths.end()); + } + OutLocalChunkHashes.reserve(AbsoluteChunkHashes.size()); + OutLocalChunkRawSizes.reserve(AbsoluteChunkRawSizes.size()); + OutLocalChunkOrders.reserve(AbsoluteChunkOrders.size()); + + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkHashToChunkIndex; + ChunkHashToChunkIndex.reserve(AbsoluteChunkHashes.size()); + + for (uint32_t AbsoluteChunkOrderIndex = 0; AbsoluteChunkOrderIndex < AbsoluteChunkOrders.size(); AbsoluteChunkOrderIndex++) + { + const uint32_t AbsoluteChunkIndex = AbsoluteChunkOrders[AbsoluteChunkOrderIndex]; + const IoHash& AbsoluteChunkHash = AbsoluteChunkHashes[AbsoluteChunkIndex]; + const uint64_t AbsoluteChunkRawSize = AbsoluteChunkRawSizes[AbsoluteChunkIndex]; + + if (auto It = ChunkHashToChunkIndex.find(AbsoluteChunkHash); It != ChunkHashToChunkIndex.end()) + { + const uint32_t LocalChunkIndex = It->second; + OutLocalChunkOrders.push_back(LocalChunkIndex); + } + else + { + uint32_t LocalChunkIndex = gsl::narrow<uint32_t>(OutLocalChunkHashes.size()); + OutLocalChunkHashes.push_back(AbsoluteChunkHash); + OutLocalChunkRawSizes.push_back(AbsoluteChunkRawSize); + OutLocalChunkOrders.push_back(LocalChunkIndex); + ChunkHashToChunkIndex.insert_or_assign(AbsoluteChunkHash, LocalChunkIndex); + } +#if EXTRA_VERIFY + const uint32_t LocalChunkIndex = OutLocalChunkOrders[AbsoluteChunkOrderIndex]; + const IoHash& LocalChunkHash = OutLocalChunkHashes[LocalChunkIndex]; + const uint64_t& LocalChunkRawSize = OutLocalChunkRawSizes[LocalChunkIndex]; + ZEN_ASSERT(LocalChunkHash == AbsoluteChunkHash); + ZEN_ASSERT(LocalChunkRawSize == AbsoluteChunkRawSize); +#endif // EXTRA_VERIFY + } +#if EXTRA_VERIFY + for (uint32_t OrderIndex = 0; OrderIndex < OutLocalChunkOrders.size(); OrderIndex++) + { + uint32_t LocalChunkIndex = OutLocalChunkOrders[OrderIndex]; + const IoHash LocalChunkHash = OutLocalChunkHashes[LocalChunkIndex]; + uint64_t LocalChunkRawSize = OutLocalChunkRawSizes[LocalChunkIndex]; + + uint32_t VerifyChunkIndex = AbsoluteChunkOrders[OrderIndex]; + const IoHash VerifyChunkHash = AbsoluteChunkHashes[VerifyChunkIndex]; + uint64_t VerifyChunkRawSize = AbsoluteChunkRawSizes[VerifyChunkIndex]; + + ZEN_ASSERT(LocalChunkHash == VerifyChunkHash); + ZEN_ASSERT(LocalChunkRawSize == VerifyChunkRawSize); + } +#endif // EXTRA_VERIFY +} + +void +BuildsOperationUploadFolder::WriteBuildContentToCompactBinary(CbObjectWriter& PartManifestWriter, + const SourcePlatform Platform, + std::span<const std::filesystem::path> Paths, + std::span<const IoHash> RawHashes, + std::span<const uint64_t> RawSizes, + std::span<const uint32_t> Attributes, + std::span<const IoHash> SequenceRawHashes, + std::span<const uint32_t> ChunkCounts, + std::span<const IoHash> LocalChunkHashes, + std::span<const uint64_t> LocalChunkRawSizes, + const std::vector<uint32_t>& AbsoluteChunkOrders, + const std::span<const uint32_t> LooseLocalChunkIndexes, + const std::span<IoHash> BlockHashes) +{ + ZEN_ASSERT(Platform != SourcePlatform::_Count); + PartManifestWriter.AddString("platform"sv, ToString(Platform)); + + uint64_t TotalSize = 0; + for (const uint64_t Size : RawSizes) + { + TotalSize += Size; + } + PartManifestWriter.AddInteger("totalSize", TotalSize); + + PartManifestWriter.BeginObject("files"sv); + { + compactbinary_helpers::WriteArray(Paths, "paths"sv, PartManifestWriter); + compactbinary_helpers::WriteArray(RawHashes, "rawhashes"sv, PartManifestWriter); + compactbinary_helpers::WriteArray(RawSizes, "rawsizes"sv, PartManifestWriter); + if (Platform == SourcePlatform::Windows) + { + compactbinary_helpers::WriteArray(Attributes, "attributes"sv, PartManifestWriter); + } + if (Platform == SourcePlatform::Linux || Platform == SourcePlatform::MacOS) + { + compactbinary_helpers::WriteArray(Attributes, "mode"sv, PartManifestWriter); + } + } + PartManifestWriter.EndObject(); // files + + PartManifestWriter.BeginObject("chunkedContent"); + { + compactbinary_helpers::WriteArray(SequenceRawHashes, "sequenceRawHashes"sv, PartManifestWriter); + compactbinary_helpers::WriteArray(ChunkCounts, "chunkcounts"sv, PartManifestWriter); + compactbinary_helpers::WriteArray(AbsoluteChunkOrders, "chunkorders"sv, PartManifestWriter); + } + PartManifestWriter.EndObject(); // chunkedContent + + size_t LooseChunkCount = LooseLocalChunkIndexes.size(); + if (LooseChunkCount > 0) + { + PartManifestWriter.BeginObject("chunkAttachments"); + { + PartManifestWriter.BeginArray("rawHashes"sv); + for (uint32_t ChunkIndex : LooseLocalChunkIndexes) + { + PartManifestWriter.AddBinaryAttachment(LocalChunkHashes[ChunkIndex]); + } + PartManifestWriter.EndArray(); // rawHashes + + PartManifestWriter.BeginArray("chunkRawSizes"sv); + for (uint32_t ChunkIndex : LooseLocalChunkIndexes) + { + PartManifestWriter.AddInteger(LocalChunkRawSizes[ChunkIndex]); + } + PartManifestWriter.EndArray(); // chunkSizes + } + PartManifestWriter.EndObject(); // + } + + if (BlockHashes.size() > 0) + { + PartManifestWriter.BeginObject("blockAttachments"); + { + compactbinary_helpers::WriteBinaryAttachmentArray(BlockHashes, "rawHashes"sv, PartManifestWriter); + } + PartManifestWriter.EndObject(); // blocks + } +} + +CompositeBuffer +BuildsOperationUploadFolder::FetchChunk(const ChunkedFolderContent& Content, + const ChunkedContentLookup& Lookup, + const IoHash& ChunkHash, + ReadFileCache& OpenFileCache) +{ + ZEN_TRACE_CPU("FetchChunk"); + auto It = Lookup.ChunkHashToChunkIndex.find(ChunkHash); + ZEN_ASSERT(It != Lookup.ChunkHashToChunkIndex.end()); + uint32_t ChunkIndex = It->second; + std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkLocations = GetChunkSequenceLocations(Lookup, ChunkIndex); + ZEN_ASSERT(!ChunkLocations.empty()); + CompositeBuffer Chunk = + OpenFileCache.GetRange(ChunkLocations[0].SequenceIndex, ChunkLocations[0].Offset, Content.ChunkedContent.ChunkRawSizes[ChunkIndex]); + ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == ChunkHash); + return Chunk; +}; + +CompressedBuffer +BuildsOperationUploadFolder::GenerateBlock(const ChunkedFolderContent& Content, + const ChunkedContentLookup& Lookup, + const std::vector<uint32_t>& ChunksInBlock, + ChunkBlockDescription& OutBlockDescription) +{ + ZEN_TRACE_CPU("GenerateBlock"); + ReadFileCache OpenFileCache(m_DiskStats, m_Path, Content, Lookup, 4); + + std::vector<std::pair<IoHash, FetchChunkFunc>> BlockContent; + BlockContent.reserve(ChunksInBlock.size()); + for (uint32_t ChunkIndex : ChunksInBlock) + { + BlockContent.emplace_back(std::make_pair( + Content.ChunkedContent.ChunkHashes[ChunkIndex], + [this, &Content, &Lookup, &OpenFileCache, ChunkIndex](const IoHash& ChunkHash) -> std::pair<uint64_t, CompressedBuffer> { + CompositeBuffer Chunk = FetchChunk(Content, Lookup, ChunkHash, OpenFileCache); + if (!Chunk) + { + ZEN_ASSERT(false); + } + uint64_t RawSize = Chunk.GetSize(); + const bool ShouldCompressChunk = Lookup.RawHashToSequenceIndex.contains(ChunkHash) && + (RawSize >= m_Options.MinimumSizeForCompressInBlock) && + IsChunkCompressable(Content, Lookup, ChunkIndex); + const OodleCompressionLevel CompressionLevel = + ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None; + return {RawSize, CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid, CompressionLevel)}; + })); + } + + return GenerateChunkBlock(std::move(BlockContent), OutBlockDescription); +}; + +CompressedBuffer +BuildsOperationUploadFolder::RebuildBlock(const ChunkedFolderContent& Content, + const ChunkedContentLookup& Lookup, + CompositeBuffer&& HeaderBuffer, + const std::vector<uint32_t>& ChunksInBlock) +{ + ZEN_TRACE_CPU("RebuildBlock"); + ReadFileCache OpenFileCache(m_DiskStats, m_Path, Content, Lookup, 4); + + std::vector<SharedBuffer> ResultBuffers; + ResultBuffers.reserve(HeaderBuffer.GetSegments().size() + ChunksInBlock.size()); + ResultBuffers.insert(ResultBuffers.end(), HeaderBuffer.GetSegments().begin(), HeaderBuffer.GetSegments().end()); + for (uint32_t ChunkIndex : ChunksInBlock) + { + std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkLocations = GetChunkSequenceLocations(Lookup, ChunkIndex); + ZEN_ASSERT(!ChunkLocations.empty()); + const IoHash& ChunkHash = Content.ChunkedContent.ChunkHashes[ChunkIndex]; + CompositeBuffer Chunk = OpenFileCache.GetRange(ChunkLocations[0].SequenceIndex, + ChunkLocations[0].Offset, + Content.ChunkedContent.ChunkRawSizes[ChunkIndex]); + ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == ChunkHash); + + const uint64_t RawSize = Chunk.GetSize(); + const bool ShouldCompressChunk = Lookup.RawHashToSequenceIndex.contains(ChunkHash) && + (RawSize >= m_Options.MinimumSizeForCompressInBlock) && + IsChunkCompressable(Content, Lookup, ChunkIndex); + const OodleCompressionLevel CompressionLevel = ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None; + + CompositeBuffer CompressedChunk = + CompressedBuffer::Compress(std::move(Chunk), OodleCompressor::Mermaid, CompressionLevel).GetCompressed(); + ResultBuffers.insert(ResultBuffers.end(), CompressedChunk.GetSegments().begin(), CompressedChunk.GetSegments().end()); + } + return CompressedBuffer::FromCompressedNoValidate(CompositeBuffer(std::move(ResultBuffers))); +}; + +void +BuildsOperationUploadFolder::UploadPartBlobs(const ChunkedFolderContent& Content, + const ChunkedContentLookup& Lookup, + std::span<IoHash> RawHashes, + const std::vector<std::vector<uint32_t>>& NewBlockChunks, + GeneratedBlocks& NewBlocks, + std::span<const uint32_t> LooseChunkIndexes, + const std::uint64_t LargeAttachmentSize, + UploadStatistics& TempUploadStats, + LooseChunksStatistics& TempLooseChunksStats) +{ + ZEN_TRACE_CPU("UploadPartBlobs"); + { + std::unique_ptr<BuildOpLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Upload Blobs")); + BuildOpLogOutput::ProgressBar& Progress(*ProgressBarPtr); + + WorkerThreadPool& ReadChunkPool = m_IOWorkerPool; + WorkerThreadPool& UploadChunkPool = m_NetworkPool; + + FilteredRate FilteredGenerateBlockBytesPerSecond; + FilteredRate FilteredCompressedBytesPerSecond; + FilteredRate FilteredUploadedBytesPerSecond; + + ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + std::atomic<size_t> UploadedBlockSize = 0; + std::atomic<size_t> UploadedBlockCount = 0; + std::atomic<size_t> UploadedRawChunkSize = 0; + std::atomic<size_t> UploadedCompressedChunkSize = 0; + std::atomic<uint32_t> UploadedChunkCount = 0; + + tsl::robin_map<uint32_t, uint32_t> ChunkIndexToLooseChunkOrderIndex; + ChunkIndexToLooseChunkOrderIndex.reserve(LooseChunkIndexes.size()); + for (uint32_t OrderIndex = 0; OrderIndex < LooseChunkIndexes.size(); OrderIndex++) + { + ChunkIndexToLooseChunkOrderIndex.insert_or_assign(LooseChunkIndexes[OrderIndex], OrderIndex); + } + + std::vector<size_t> BlockIndexes; + std::vector<uint32_t> LooseChunkOrderIndexes; + + uint64_t TotalLooseChunksSize = 0; + uint64_t TotalBlocksSize = 0; + for (const IoHash& RawHash : RawHashes) + { + if (auto It = NewBlocks.BlockHashToBlockIndex.find(RawHash); It != NewBlocks.BlockHashToBlockIndex.end()) + { + BlockIndexes.push_back(It->second); + TotalBlocksSize += NewBlocks.BlockSizes[It->second]; + } + else if (auto ChunkIndexIt = Lookup.ChunkHashToChunkIndex.find(RawHash); ChunkIndexIt != Lookup.ChunkHashToChunkIndex.end()) + { + const uint32_t ChunkIndex = ChunkIndexIt->second; + if (auto LooseOrderIndexIt = ChunkIndexToLooseChunkOrderIndex.find(ChunkIndex); + LooseOrderIndexIt != ChunkIndexToLooseChunkOrderIndex.end()) + { + LooseChunkOrderIndexes.push_back(LooseOrderIndexIt->second); + TotalLooseChunksSize += Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; + } + } + else + { + ZEN_CONSOLE_WARN( + "Build blob {} was reported as needed for upload but it was reported as existing at the start of the " + "operation. Treating it as a transient inconsistent issue and will attempt to retry finalization", + RawHash); + } + } + uint64_t TotalRawSize = TotalLooseChunksSize + TotalBlocksSize; + + const size_t UploadBlockCount = BlockIndexes.size(); + const uint32_t UploadChunkCount = gsl::narrow<uint32_t>(LooseChunkOrderIndexes.size()); + + auto AsyncUploadBlock = [this, + &Work, + &NewBlocks, + UploadBlockCount, + &UploadedBlockCount, + UploadChunkCount, + &UploadedChunkCount, + &UploadedBlockSize, + &TempUploadStats, + &FilteredUploadedBytesPerSecond, + &UploadChunkPool](const size_t BlockIndex, + const IoHash BlockHash, + CompositeBuffer&& Payload, + std::atomic<uint64_t>& QueuedPendingInMemoryBlocksForUpload) { + bool IsInMemoryBlock = true; + if (QueuedPendingInMemoryBlocksForUpload.load() > 16) + { + ZEN_TRACE_CPU("AsyncUploadBlock_WriteTempBlock"); + std::filesystem::path TempFilePath = m_Options.TempDir / (BlockHash.ToHexString()); + Payload = CompositeBuffer(WriteToTempFile(std::move(Payload), TempFilePath)); + IsInMemoryBlock = false; + } + else + { + QueuedPendingInMemoryBlocksForUpload++; + } + + Work.ScheduleWork( + UploadChunkPool, + [this, + &QueuedPendingInMemoryBlocksForUpload, + &NewBlocks, + UploadBlockCount, + &UploadedBlockCount, + UploadChunkCount, + &UploadedChunkCount, + &UploadedBlockSize, + &TempUploadStats, + &FilteredUploadedBytesPerSecond, + IsInMemoryBlock, + BlockIndex, + BlockHash, + Payload = std::move(Payload)](std::atomic<bool>&) mutable { + auto _ = MakeGuard([IsInMemoryBlock, &QueuedPendingInMemoryBlocksForUpload] { + if (IsInMemoryBlock) + { + QueuedPendingInMemoryBlocksForUpload--; + } + }); + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("AsyncUploadBlock"); + + const uint64_t PayloadSize = Payload.GetSize(); + + FilteredUploadedBytesPerSecond.Start(); + const CbObject BlockMetaData = + BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]); + + if (m_Storage.BuildCacheStorage) + { + m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload); + } + m_Storage.BuildStorage->PutBuildBlob(m_BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload); + if (m_Options.IsVerbose) + { + LOG_OUTPUT(m_LogOutput, + "Uploaded block {} ({}) containing {} chunks", + BlockHash, + NiceBytes(PayloadSize), + NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); + } + UploadedBlockSize += PayloadSize; + TempUploadStats.BlocksBytes += PayloadSize; + + if (m_Storage.BuildCacheStorage) + { + m_Storage.BuildCacheStorage->PutBlobMetadatas(m_BuildId, + std::vector<IoHash>({BlockHash}), + std::vector<CbObject>({BlockMetaData})); + } + bool MetadataSucceeded = m_Storage.BuildStorage->PutBlockMetadata(m_BuildId, BlockHash, BlockMetaData); + if (MetadataSucceeded) + { + if (m_Options.IsVerbose) + { + LOG_OUTPUT(m_LogOutput, "Uploaded block {} metadata ({})", BlockHash, NiceBytes(BlockMetaData.GetSize())); + } + + NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; + TempUploadStats.BlocksBytes += BlockMetaData.GetSize(); + } + + TempUploadStats.BlockCount++; + + UploadedBlockCount++; + if (UploadedBlockCount == UploadBlockCount && UploadedChunkCount == UploadChunkCount) + { + FilteredUploadedBytesPerSecond.Stop(); + } + } + }); + }; + + auto AsyncUploadLooseChunk = [this, + LargeAttachmentSize, + &Work, + &UploadChunkPool, + &FilteredUploadedBytesPerSecond, + &UploadedBlockCount, + &UploadedChunkCount, + UploadBlockCount, + UploadChunkCount, + &UploadedCompressedChunkSize, + &UploadedRawChunkSize, + &TempUploadStats](const IoHash& RawHash, uint64_t RawSize, CompositeBuffer&& Payload) { + Work.ScheduleWork( + UploadChunkPool, + [this, + &Work, + LargeAttachmentSize, + &FilteredUploadedBytesPerSecond, + &UploadChunkPool, + &UploadedBlockCount, + &UploadedChunkCount, + UploadBlockCount, + UploadChunkCount, + &UploadedCompressedChunkSize, + &UploadedRawChunkSize, + &TempUploadStats, + RawHash, + RawSize, + Payload = CompositeBuffer(std::move(Payload))](std::atomic<bool>&) mutable { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("AsyncUploadLooseChunk"); + + const uint64_t PayloadSize = Payload.GetSize(); + + if (m_Storage.BuildCacheStorage) + { + m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); + } + + if (PayloadSize >= LargeAttachmentSize) + { + ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart"); + TempUploadStats.MultipartAttachmentCount++; + std::vector<std::function<void()>> MultipartWork = m_Storage.BuildStorage->PutLargeBuildBlob( + m_BuildId, + RawHash, + ZenContentType::kCompressedBinary, + PayloadSize, + [Payload = std::move(Payload), &FilteredUploadedBytesPerSecond](uint64_t Offset, + uint64_t Size) mutable -> IoBuffer { + FilteredUploadedBytesPerSecond.Start(); + + IoBuffer PartPayload = Payload.Mid(Offset, Size).Flatten().AsIoBuffer(); + PartPayload.SetContentType(ZenContentType::kBinary); + return PartPayload; + }, + [RawSize, + &TempUploadStats, + &UploadedCompressedChunkSize, + &UploadChunkPool, + &UploadedBlockCount, + UploadBlockCount, + &UploadedChunkCount, + UploadChunkCount, + &FilteredUploadedBytesPerSecond, + &UploadedRawChunkSize](uint64_t SentBytes, bool IsComplete) { + TempUploadStats.ChunksBytes += SentBytes; + UploadedCompressedChunkSize += SentBytes; + if (IsComplete) + { + TempUploadStats.ChunkCount++; + UploadedChunkCount++; + if (UploadedBlockCount == UploadBlockCount && UploadedChunkCount == UploadChunkCount) + { + FilteredUploadedBytesPerSecond.Stop(); + } + UploadedRawChunkSize += RawSize; + } + }); + for (auto& WorkPart : MultipartWork) + { + Work.ScheduleWork(UploadChunkPool, [Work = std::move(WorkPart)](std::atomic<bool>& AbortFlag) { + ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart_Work"); + if (!AbortFlag) + { + Work(); + } + }); + } + if (m_Options.IsVerbose) + { + LOG_OUTPUT(m_LogOutput, "Uploaded multipart chunk {} ({})", RawHash, NiceBytes(PayloadSize)); + } + } + else + { + ZEN_TRACE_CPU("AsyncUploadLooseChunk_Singlepart"); + m_Storage.BuildStorage->PutBuildBlob(m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); + if (m_Options.IsVerbose) + { + LOG_OUTPUT(m_LogOutput, "Uploaded chunk {} ({})", RawHash, NiceBytes(PayloadSize)); + } + TempUploadStats.ChunksBytes += Payload.GetSize(); + TempUploadStats.ChunkCount++; + UploadedCompressedChunkSize += Payload.GetSize(); + UploadedRawChunkSize += RawSize; + UploadedChunkCount++; + if (UploadedChunkCount == UploadChunkCount) + { + FilteredUploadedBytesPerSecond.Stop(); + } + } + } + }); + }; + + std::vector<size_t> GenerateBlockIndexes; + + std::atomic<uint64_t> GeneratedBlockCount = 0; + std::atomic<uint64_t> GeneratedBlockByteCount = 0; + + std::atomic<uint64_t> QueuedPendingInMemoryBlocksForUpload = 0; + + // Start generation of any non-prebuilt blocks and schedule upload + for (const size_t BlockIndex : BlockIndexes) + { + const IoHash& BlockHash = NewBlocks.BlockDescriptions[BlockIndex].BlockHash; + if (!m_AbortFlag) + { + Work.ScheduleWork( + ReadChunkPool, + [this, + BlockHash = IoHash(BlockHash), + BlockIndex, + &FilteredGenerateBlockBytesPerSecond, + &Content, + &Lookup, + &NewBlocks, + &NewBlockChunks, + &GenerateBlockIndexes, + &GeneratedBlockCount, + &GeneratedBlockByteCount, + &AsyncUploadBlock, + &QueuedPendingInMemoryBlocksForUpload](std::atomic<bool>&) { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("UploadPartBlobs_GenerateBlock"); + + FilteredGenerateBlockBytesPerSecond.Start(); + + Stopwatch GenerateTimer; + CompositeBuffer Payload; + if (NewBlocks.BlockHeaders[BlockIndex]) + { + Payload = + RebuildBlock(Content, Lookup, std::move(NewBlocks.BlockHeaders[BlockIndex]), NewBlockChunks[BlockIndex]) + .GetCompressed(); + } + else + { + ChunkBlockDescription BlockDescription; + CompressedBuffer CompressedBlock = + GenerateBlock(Content, Lookup, NewBlockChunks[BlockIndex], BlockDescription); + if (!CompressedBlock) + { + throw std::runtime_error(fmt::format("Failed generating block {}", BlockHash)); + } + ZEN_ASSERT(BlockDescription.BlockHash == BlockHash); + Payload = std::move(CompressedBlock).GetCompressed(); + } + + GeneratedBlockByteCount += NewBlocks.BlockSizes[BlockIndex]; + GeneratedBlockCount++; + if (GeneratedBlockCount == GenerateBlockIndexes.size()) + { + FilteredGenerateBlockBytesPerSecond.Stop(); + } + if (m_Options.IsVerbose) + { + LOG_OUTPUT(m_LogOutput, + "{} block {} ({}) containing {} chunks in {}", + NewBlocks.BlockHeaders[BlockIndex] ? "Regenerated" : "Generated", + NewBlocks.BlockDescriptions[BlockIndex].BlockHash, + NiceBytes(NewBlocks.BlockSizes[BlockIndex]), + NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size(), + NiceTimeSpanMs(GenerateTimer.GetElapsedTimeMs())); + } + if (!m_AbortFlag) + { + AsyncUploadBlock(BlockIndex, BlockHash, std::move(Payload), QueuedPendingInMemoryBlocksForUpload); + } + } + }); + } + } + + // Start compression of any non-precompressed loose chunks and schedule upload + for (const uint32_t LooseChunkOrderIndex : LooseChunkOrderIndexes) + { + const uint32_t ChunkIndex = LooseChunkIndexes[LooseChunkOrderIndex]; + Work.ScheduleWork( + ReadChunkPool, + [this, + &Content, + &Lookup, + &TempLooseChunksStats, + &LooseChunkOrderIndexes, + &FilteredCompressedBytesPerSecond, + &TempUploadStats, + &AsyncUploadLooseChunk, + ChunkIndex](std::atomic<bool>&) { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("UploadPartBlobs_CompressChunk"); + + FilteredCompressedBytesPerSecond.Start(); + Stopwatch CompressTimer; + CompositeBuffer Payload = CompressChunk(Content, Lookup, ChunkIndex, TempLooseChunksStats); + if (m_Options.IsVerbose) + { + LOG_OUTPUT(m_LogOutput, + "Compressed chunk {} ({} -> {}) in {}", + Content.ChunkedContent.ChunkHashes[ChunkIndex], + NiceBytes(Content.ChunkedContent.ChunkRawSizes[ChunkIndex]), + NiceBytes(Payload.GetSize()), + NiceTimeSpanMs(CompressTimer.GetElapsedTimeMs())); + } + const uint64_t ChunkRawSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; + TempUploadStats.ReadFromDiskBytes += ChunkRawSize; + if (TempLooseChunksStats.CompressedChunkCount == LooseChunkOrderIndexes.size()) + { + FilteredCompressedBytesPerSecond.Stop(); + } + if (!m_AbortFlag) + { + AsyncUploadLooseChunk(Content.ChunkedContent.ChunkHashes[ChunkIndex], ChunkRawSize, std::move(Payload)); + } + } + }); + } + + Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); + FilteredCompressedBytesPerSecond.Update(TempLooseChunksStats.CompressedChunkRawBytes.load()); + FilteredGenerateBlockBytesPerSecond.Update(GeneratedBlockByteCount.load()); + FilteredUploadedBytesPerSecond.Update(UploadedCompressedChunkSize.load() + UploadedBlockSize.load()); + uint64_t UploadedRawSize = UploadedRawChunkSize.load() + UploadedBlockSize.load(); + uint64_t UploadedCompressedSize = UploadedCompressedChunkSize.load() + UploadedBlockSize.load(); + + std::string Details = fmt::format( + "Compressed {}/{} ({}/{} {}B/s) chunks. " + "Uploaded {}/{} ({}/{}) blobs " + "({} {}bits/s)", + TempLooseChunksStats.CompressedChunkCount.load(), + LooseChunkOrderIndexes.size(), + NiceBytes(TempLooseChunksStats.CompressedChunkRawBytes), + NiceBytes(TotalLooseChunksSize), + NiceNum(FilteredCompressedBytesPerSecond.GetCurrent()), + + UploadedBlockCount.load() + UploadedChunkCount.load(), + UploadBlockCount + UploadChunkCount, + NiceBytes(UploadedRawSize), + NiceBytes(TotalRawSize), + + NiceBytes(UploadedCompressedSize), + NiceNum(FilteredUploadedBytesPerSecond.GetCurrent())); + + Progress.UpdateState({.Task = "Uploading blobs ", + .Details = Details, + .TotalCount = gsl::narrow<uint64_t>(TotalRawSize), + .RemainingCount = gsl::narrow<uint64_t>(TotalRawSize - UploadedRawSize), + .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, + false); + }); + + ZEN_ASSERT(m_AbortFlag || QueuedPendingInMemoryBlocksForUpload.load() == 0); + + Progress.Finish(); + + TempUploadStats.ElapsedWallTimeUS += FilteredUploadedBytesPerSecond.GetElapsedTimeUS(); + TempLooseChunksStats.CompressChunksElapsedWallTimeUS += FilteredCompressedBytesPerSecond.GetElapsedTimeUS(); + } +} + +CompositeBuffer +BuildsOperationUploadFolder::CompressChunk(const ChunkedFolderContent& Content, + const ChunkedContentLookup& Lookup, + uint32_t ChunkIndex, + LooseChunksStatistics& TempLooseChunksStats) +{ + ZEN_TRACE_CPU("CompressChunk"); + ZEN_ASSERT(!m_Options.TempDir.empty()); + const IoHash& ChunkHash = Content.ChunkedContent.ChunkHashes[ChunkIndex]; + const uint64_t ChunkSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; + + const ChunkedContentLookup::ChunkSequenceLocation& Source = GetChunkSequenceLocations(Lookup, ChunkIndex)[0]; + const std::uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[Source.SequenceIndex]; + IoBuffer RawSource = IoBufferBuilder::MakeFromFile((m_Path / Content.Paths[PathIndex]).make_preferred(), Source.Offset, ChunkSize); + if (!RawSource) + { + throw std::runtime_error(fmt::format("Failed fetching chunk {}", ChunkHash)); + } + if (RawSource.GetSize() != ChunkSize) + { + throw std::runtime_error(fmt::format("Fetched chunk {} has invalid size", ChunkHash)); + } + + const bool ShouldCompressChunk = IsChunkCompressable(Content, Lookup, ChunkIndex); + const OodleCompressionLevel CompressionLevel = ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None; + + if (ShouldCompressChunk) + { + std::filesystem::path TempFilePath = m_Options.TempDir / ChunkHash.ToHexString(); + + BasicFile CompressedFile; + std::error_code Ec; + CompressedFile.Open(TempFilePath, BasicFile::Mode::kTruncateDelete, Ec); + if (Ec) + { + throw std::runtime_error( + fmt::format("Failed creating temporary file for compressing blob {}. Reason: {}", ChunkHash, Ec.message())); + } + + uint64_t StreamRawBytes = 0; + uint64_t StreamCompressedBytes = 0; + + bool CouldCompress = CompressedBuffer::CompressToStream( + CompositeBuffer(SharedBuffer(RawSource)), + [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { + ZEN_UNUSED(SourceOffset); + TempLooseChunksStats.CompressedChunkRawBytes += SourceSize; + CompressedFile.Write(RangeBuffer, Offset); + TempLooseChunksStats.CompressedChunkBytes += RangeBuffer.GetSize(); + StreamRawBytes += SourceSize; + StreamCompressedBytes += RangeBuffer.GetSize(); + }, + OodleCompressor::Mermaid, + CompressionLevel); + if (CouldCompress) + { + uint64_t CompressedSize = CompressedFile.FileSize(); + void* FileHandle = CompressedFile.Detach(); + IoBuffer TempPayload = IoBuffer(IoBuffer::File, + FileHandle, + 0, + CompressedSize, + /*IsWholeFile*/ true); + ZEN_ASSERT(TempPayload); + TempPayload.SetDeleteOnClose(true); + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(TempPayload), RawHash, RawSize); + ZEN_ASSERT(Compressed); + ZEN_ASSERT(RawHash == ChunkHash); + ZEN_ASSERT(RawSize == ChunkSize); + + TempLooseChunksStats.CompressedChunkCount++; + + return Compressed.GetCompressed(); + } + else + { + TempLooseChunksStats.CompressedChunkRawBytes -= StreamRawBytes; + TempLooseChunksStats.CompressedChunkBytes -= StreamCompressedBytes; + } + CompressedFile.Close(); + RemoveFile(TempFilePath, Ec); + ZEN_UNUSED(Ec); + } + + CompressedBuffer CompressedBlob = + CompressedBuffer::Compress(SharedBuffer(std::move(RawSource)), OodleCompressor::Mermaid, CompressionLevel); + if (!CompressedBlob) + { + throw std::runtime_error(fmt::format("Failed to compress large blob {}", ChunkHash)); + } + ZEN_ASSERT_SLOW(CompressedBlob.DecodeRawHash() == ChunkHash); + ZEN_ASSERT_SLOW(CompressedBlob.DecodeRawSize() == ChunkSize); + + TempLooseChunksStats.CompressedChunkRawBytes += ChunkSize; + TempLooseChunksStats.CompressedChunkBytes += CompressedBlob.GetCompressedSize(); + + // If we use none-compression, the compressed blob references the data and has 64 kb in memory so we don't need to write it to disk + if (ShouldCompressChunk) + { + std::filesystem::path TempFilePath = m_Options.TempDir / (ChunkHash.ToHexString()); + IoBuffer TempPayload = WriteToTempFile(std::move(CompressedBlob).GetCompressed(), TempFilePath); + CompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(TempPayload)); + } + + TempLooseChunksStats.CompressedChunkCount++; + return std::move(CompressedBlob).GetCompressed(); +} + } // namespace zen diff --git a/src/zenremotestore/builds/filebuildstorage.cpp b/src/zenremotestore/builds/filebuildstorage.cpp index 5cfd80666..3cda5f00f 100644 --- a/src/zenremotestore/builds/filebuildstorage.cpp +++ b/src/zenremotestore/builds/filebuildstorage.cpp @@ -187,7 +187,11 @@ public: CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView(); CbObjectView MetaDataView = BuildObject["metadata"sv].AsObjectView(); - Writer.AddObject("metadata"sv, MetaDataView); + for (CbFieldView InnerField : MetaDataView) + { + Writer.AddField(InnerField.GetName(), InnerField); + } + Writer.BeginObject("parts"sv); { for (CbFieldView PartView : PartsObject) diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h index 0e719edc6..8ba32127a 100644 --- a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h +++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h @@ -22,6 +22,7 @@ class HttpClient; class ParallelWork; class WorkerThreadPool; class FilteredRate; +class ReadFileCache; class BufferedWriteFileCache; struct ChunkBlockDescription; @@ -192,8 +193,8 @@ public: bool PrimeCacheOnly = false; bool EnableOtherDownloadsScavenging = true; bool EnableTargetFolderScavenging = true; - std::vector<std::string> DefaultExcludeFolders; - std::vector<std::string> DefaultExcludeExtensions; + std::vector<std::string> ExcludeFolders; + std::vector<std::string> ExcludeExtensions; }; BuildsOperationUpdateFolder(BuildOpLogOutput& LogOutput, @@ -210,11 +211,11 @@ public: const ChunkedContentLookup& RemoteLookup, const std::vector<ChunkBlockDescription>& BlockDescriptions, const std::vector<IoHash>& LooseChunkHashes, - const Options& Options, - DiskStatistics& DiskStats); + const Options& Options); void Execute(FolderContent& OutLocalFolderState); + DiskStatistics m_DiskStats; CacheMappingStatistics m_CacheMappingStats; GetFolderContentStatistics m_ScavengedFolderScanStats; DownloadStatistics m_DownloadStats; @@ -372,10 +373,244 @@ private: const std::vector<ChunkBlockDescription>& m_BlockDescriptions; const std::vector<IoHash>& m_LooseChunkHashes; const Options m_Options; - DiskStatistics& m_DiskStats; const std::filesystem::path m_CacheFolderPath; }; +struct FindBlocksStatistics +{ + uint64_t FindBlockTimeMS = 0; + uint64_t PotentialChunkCount = 0; + uint64_t PotentialChunkByteCount = 0; + uint64_t FoundBlockCount = 0; + uint64_t FoundBlockChunkCount = 0; + uint64_t FoundBlockByteCount = 0; + uint64_t AcceptedBlockCount = 0; + uint64_t AcceptedChunkCount = 0; + uint64_t AcceptedByteCount = 0; + uint64_t AcceptedRawByteCount = 0; + uint64_t RejectedBlockCount = 0; + uint64_t RejectedChunkCount = 0; + uint64_t RejectedByteCount = 0; + uint64_t AcceptedReduntantChunkCount = 0; + uint64_t AcceptedReduntantByteCount = 0; + uint64_t NewBlocksCount = 0; + uint64_t NewBlocksChunkCount = 0; + uint64_t NewBlocksChunkByteCount = 0; +}; + +struct UploadStatistics +{ + std::atomic<uint64_t> BlockCount = 0; + std::atomic<uint64_t> BlocksBytes = 0; + std::atomic<uint64_t> ChunkCount = 0; + std::atomic<uint64_t> ChunksBytes = 0; + std::atomic<uint64_t> ReadFromDiskBytes = 0; + std::atomic<uint64_t> MultipartAttachmentCount = 0; + uint64_t ElapsedWallTimeUS = 0; + + UploadStatistics& operator+=(const UploadStatistics& Rhs) + { + BlockCount += Rhs.BlockCount; + BlocksBytes += Rhs.BlocksBytes; + ChunkCount += Rhs.ChunkCount; + ChunksBytes += Rhs.ChunksBytes; + ReadFromDiskBytes += Rhs.ReadFromDiskBytes; + MultipartAttachmentCount += Rhs.MultipartAttachmentCount; + ElapsedWallTimeUS += Rhs.ElapsedWallTimeUS; + return *this; + } +}; + +struct LooseChunksStatistics +{ + uint64_t ChunkCount = 0; + uint64_t ChunkByteCount = 0; + std::atomic<uint64_t> CompressedChunkCount = 0; + std::atomic<uint64_t> CompressedChunkRawBytes = 0; + std::atomic<uint64_t> CompressedChunkBytes = 0; + uint64_t CompressChunksElapsedWallTimeUS = 0; + + LooseChunksStatistics& operator+=(const LooseChunksStatistics& Rhs) + { + ChunkCount += Rhs.ChunkCount; + ChunkByteCount += Rhs.ChunkByteCount; + CompressedChunkCount += Rhs.CompressedChunkCount; + CompressedChunkRawBytes += Rhs.CompressedChunkRawBytes; + CompressedChunkBytes += Rhs.CompressedChunkBytes; + CompressChunksElapsedWallTimeUS += Rhs.CompressChunksElapsedWallTimeUS; + return *this; + } +}; + +struct GenerateBlocksStatistics +{ + std::atomic<uint64_t> GeneratedBlockByteCount = 0; + std::atomic<uint64_t> GeneratedBlockCount = 0; + uint64_t GenerateBlocksElapsedWallTimeUS = 0; + + GenerateBlocksStatistics& operator+=(const GenerateBlocksStatistics& Rhs) + { + GeneratedBlockByteCount += Rhs.GeneratedBlockByteCount; + GeneratedBlockCount += Rhs.GeneratedBlockCount; + GenerateBlocksElapsedWallTimeUS += Rhs.GenerateBlocksElapsedWallTimeUS; + return *this; + } +}; + +class BuildsOperationUploadFolder +{ +public: + struct ChunksBlockParameters + { + size_t MaxBlockSize = 64u * 1024u * 1024u; + size_t MaxChunksPerBlock = 4u * 1000u; + size_t MaxChunkEmbedSize = 3u * 512u * 1024u; + }; + + struct Options + { + bool IsQuiet = false; + bool IsVerbose = false; + + const uint64_t FindBlockMaxCount = 10000; + const uint8_t BlockReuseMinPercentLimit = 85; + bool AllowMultiparts = true; + bool IgnoreExistingBlocks = false; + ChunksBlockParameters BlockParameters; + + uint32_t PreferredMultipartChunkSize = 32u * 1024u * 1024u; + + const uint64_t MinimumSizeForCompressInBlock = 2u * 1024u; + + std::filesystem::path TempDir; + std::vector<std::string> ExcludeFolders; + std::vector<std::string> ExcludeExtensions; + std::string ZenExcludeManifestName = ".zen_exclude_manifest.txt"; + }; + BuildsOperationUploadFolder(BuildOpLogOutput& LogOutput, + StorageInstance& Storage, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag, + WorkerThreadPool& IOWorkerPool, + WorkerThreadPool& NetworkPool, + const Oid& BuildId, + const Oid& BuildPartId, + const std::string_view BuildPartName, + const std::filesystem::path& Path, + const std::filesystem::path& ManifestPath, + bool CreateBuild, + const CbObject& MetaData, + const Options& Options); + + void Execute(); + + DiskStatistics m_DiskStats; + GetFolderContentStatistics m_LocalFolderScanStats; + ChunkingStatistics m_ChunkingStats; + FindBlocksStatistics m_FindBlocksStats; + UploadStatistics m_UploadStats; + GenerateBlocksStatistics m_GenerateBlocksStats; + LooseChunksStatistics m_LooseChunksStats; + +private: + std::vector<size_t> FindReuseBlocks(const std::vector<ChunkBlockDescription>& KnownBlocks, + std::span<const IoHash> ChunkHashes, + std::span<const uint32_t> ChunkIndexes, + std::vector<uint32_t>& OutUnusedChunkIndexes); + void ArrangeChunksIntoBlocks(const ChunkedFolderContent& Content, + const ChunkedContentLookup& Lookup, + std::vector<uint32_t>& ChunkIndexes, + std::vector<std::vector<uint32_t>>& OutBlocks); + struct GeneratedBlocks + { + std::vector<ChunkBlockDescription> BlockDescriptions; + std::vector<uint64_t> BlockSizes; + std::vector<CompositeBuffer> BlockHeaders; + std::vector<CbObject> BlockMetaDatas; + std::vector<uint8_t> + MetaDataHasBeenUploaded; // NOTE: Do not use std::vector<bool> here as this vector is modified by multiple threads + tsl::robin_map<IoHash, size_t, IoHash::Hasher> BlockHashToBlockIndex; + }; + + void GenerateBuildBlocks(const ChunkedFolderContent& Content, + const ChunkedContentLookup& Lookup, + const std::vector<std::vector<uint32_t>>& NewBlockChunks, + GeneratedBlocks& OutBlocks); + + std::vector<uint32_t> CalculateAbsoluteChunkOrders(const std::span<const IoHash> LocalChunkHashes, + const std::span<const uint32_t> LocalChunkOrder, + const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& ChunkHashToLocalChunkIndex, + const std::span<const uint32_t>& LooseChunkIndexes, + const std::span<const ChunkBlockDescription>& BlockDescriptions); + void CalculateLocalChunkOrders(const std::span<const uint32_t>& AbsoluteChunkOrders, + const std::span<const IoHash> LooseChunkHashes, + const std::span<const uint64_t> LooseChunkRawSizes, + const std::span<const ChunkBlockDescription>& BlockDescriptions, + std::vector<IoHash>& OutLocalChunkHashes, + std::vector<uint64_t>& OutLocalChunkRawSizes, + std::vector<uint32_t>& OutLocalChunkOrders); + + void WriteBuildContentToCompactBinary(CbObjectWriter& PartManifestWriter, + const SourcePlatform Platform, + std::span<const std::filesystem::path> Paths, + std::span<const IoHash> RawHashes, + std::span<const uint64_t> RawSizes, + std::span<const uint32_t> Attributes, + std::span<const IoHash> SequenceRawHashes, + std::span<const uint32_t> ChunkCounts, + std::span<const IoHash> LocalChunkHashes, + std::span<const uint64_t> LocalChunkRawSizes, + const std::vector<uint32_t>& AbsoluteChunkOrders, + const std::span<const uint32_t> LooseLocalChunkIndexes, + const std::span<IoHash> BlockHashes); + + CompositeBuffer FetchChunk(const ChunkedFolderContent& Content, + const ChunkedContentLookup& Lookup, + const IoHash& ChunkHash, + ReadFileCache& OpenFileCache); + + CompressedBuffer GenerateBlock(const ChunkedFolderContent& Content, + const ChunkedContentLookup& Lookup, + const std::vector<uint32_t>& ChunksInBlock, + ChunkBlockDescription& OutBlockDescription); + + CompressedBuffer RebuildBlock(const ChunkedFolderContent& Content, + const ChunkedContentLookup& Lookup, + CompositeBuffer&& HeaderBuffer, + const std::vector<uint32_t>& ChunksInBlock); + + void UploadPartBlobs(const ChunkedFolderContent& Content, + const ChunkedContentLookup& Lookup, + std::span<IoHash> RawHashes, + const std::vector<std::vector<uint32_t>>& NewBlockChunks, + GeneratedBlocks& NewBlocks, + std::span<const uint32_t> LooseChunkIndexes, + const std::uint64_t LargeAttachmentSize, + UploadStatistics& TempUploadStats, + LooseChunksStatistics& TempLooseChunksStats); + + CompositeBuffer CompressChunk(const ChunkedFolderContent& Content, + const ChunkedContentLookup& Lookup, + uint32_t ChunkIndex, + LooseChunksStatistics& TempLooseChunksStats); + + BuildOpLogOutput& m_LogOutput; + StorageInstance& m_Storage; + std::atomic<bool>& m_AbortFlag; + std::atomic<bool>& m_PauseFlag; + WorkerThreadPool& m_IOWorkerPool; + WorkerThreadPool& m_NetworkPool; + const Oid m_BuildId; + const Oid m_BuildPartId; + const std::string m_BuildPartName; + + const std::filesystem::path m_Path; + const std::filesystem::path m_ManifestPath; + const bool m_CreateBuild; // ?? Member? + const CbObject m_MetaData; // ?? Member + const Options m_Options; +}; + void DownloadLargeBlob(BuildStorage& Storage, const std::filesystem::path& DownloadFolder, const Oid& BuildId, diff --git a/src/zenutil/commandlineoptions.cpp b/src/zenutil/commandlineoptions.cpp index 040726c77..5db6d8c04 100644 --- a/src/zenutil/commandlineoptions.cpp +++ b/src/zenutil/commandlineoptions.cpp @@ -132,33 +132,6 @@ StripCommandlineQuotes(std::vector<std::string>& InOutArgs) return RawArgs; } -void -MakeSafeAbsolutePathÍnPlace(std::filesystem::path& Path) -{ - if (!Path.empty()) - { - std::filesystem::path AbsolutePath = std::filesystem::absolute(Path).make_preferred(); -#if ZEN_PLATFORM_WINDOWS - const std::string_view Prefix = "\\\\?\\"; - const std::u8string PrefixU8(Prefix.begin(), Prefix.end()); - std::u8string PathString = AbsolutePath.u8string(); - if (!PathString.empty() && !PathString.starts_with(PrefixU8)) - { - PathString.insert(0, PrefixU8); - Path = PathString; - } -#endif // ZEN_PLATFORM_WINDOWS - } -} - -std::filesystem::path -MakeSafeAbsolutePath(const std::filesystem::path& Path) -{ - std::filesystem::path Tmp(Path); - MakeSafeAbsolutePathÍnPlace(Tmp); - return Tmp; -} - std::filesystem::path StringToPath(const std::string_view& Path) { diff --git a/src/zenutil/include/zenutil/commandlineoptions.h b/src/zenutil/include/zenutil/commandlineoptions.h index f927d41e5..d6a171242 100644 --- a/src/zenutil/include/zenutil/commandlineoptions.h +++ b/src/zenutil/include/zenutil/commandlineoptions.h @@ -17,12 +17,10 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { -std::vector<std::string> ParseCommandLine(std::string_view CommandLine); -std::vector<char*> StripCommandlineQuotes(std::vector<std::string>& InOutArgs); -void MakeSafeAbsolutePathÍnPlace(std::filesystem::path& Path); -[[nodiscard]] std::filesystem::path MakeSafeAbsolutePath(const std::filesystem::path& Path); -std::filesystem::path StringToPath(const std::string_view& Path); -std::string_view RemoveQuotes(const std::string_view& Arg); +std::vector<std::string> ParseCommandLine(std::string_view CommandLine); +std::vector<char*> StripCommandlineQuotes(std::vector<std::string>& InOutArgs); +std::filesystem::path StringToPath(const std::string_view& Path); +std::string_view RemoveQuotes(const std::string_view& Arg); void commandlineoptions_forcelink(); // internal |