diff options
| author | Zousar Shaker <[email protected]> | 2025-03-27 08:44:43 -0600 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-03-27 08:44:43 -0600 |
| commit | 697ba032e65248352305920fb90024e7ba6885f3 (patch) | |
| tree | 055e4369f3691d32068407ca72c07d4f67d2303a /src/zen/cmds/builds_cmd.cpp | |
| parent | Merge branch 'main' into zs/ui-show-cook-artifacts (diff) | |
| parent | optional compress of block chunks (#326) (diff) | |
| download | archived-zen-697ba032e65248352305920fb90024e7ba6885f3.tar.xz archived-zen-697ba032e65248352305920fb90024e7ba6885f3.zip | |
Merge branch 'main' into zs/ui-show-cook-artifacts
Diffstat (limited to 'src/zen/cmds/builds_cmd.cpp')
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 187 |
1 files changed, 131 insertions, 56 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index b2ad579f1..08d30948b 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -82,9 +82,10 @@ namespace { size_t MaxChunkEmbedSize = DefaultMaxChunkEmbedSize; }; - const ChunksBlockParameters DefaultChunksBlockParams{.MaxBlockSize = 32u * 1024u * 1024u, - .MaxChunkEmbedSize = DefaultChunkedParams.MaxSize}; - + const ChunksBlockParameters DefaultChunksBlockParams{ + .MaxBlockSize = 32u * 1024u * 1024u, + .MaxChunkEmbedSize = 2u * 1024u * 1024u // DefaultChunkedParams.MaxSize + }; const uint64_t DefaultPreferredMultipartChunkSize = 32u * 1024u * 1024u; const double DefaultLatency = 0; // .0010; @@ -92,6 +93,8 @@ namespace { const bool SingleThreaded = false; + const uint64_t MinimumSizeForCompressInBlock = 2u * 1024u; + const std::string ZenFolderName = ".zen"; const std::string ZenStateFilePath = fmt::format("{}/current_state.cbo", ZenFolderName); const std::string ZenStateFileJsonPath = fmt::format("{}/current_state.json", ZenFolderName); @@ -511,6 +514,7 @@ namespace { uint64_t AcceptedBlockCount = 0; uint64_t AcceptedChunkCount = 0; uint64_t AcceptedByteCount = 0; + uint64_t AcceptedRawByteCount = 0; uint64_t RejectedBlockCount = 0; uint64_t RejectedChunkCount = 0; uint64_t RejectedByteCount = 0; @@ -549,6 +553,7 @@ namespace { uint64_t ChunkCount = 0; uint64_t ChunkByteCount = 0; std::atomic<uint64_t> CompressedChunkCount = 0; + std::atomic<uint64_t> CompressedChunkRawBytes = 0; std::atomic<uint64_t> CompressedChunkBytes = 0; uint64_t CompressChunksElapsedWallTimeUS = 0; @@ -557,6 +562,7 @@ namespace { ChunkCount += Rhs.ChunkCount; ChunkByteCount += Rhs.ChunkByteCount; CompressedChunkCount += Rhs.CompressedChunkCount; + CompressedChunkRawBytes += Rhs.CompressedChunkRawBytes; CompressedChunkBytes += Rhs.CompressedChunkBytes; CompressChunksElapsedWallTimeUS += Rhs.CompressChunksElapsedWallTimeUS; return *this; @@ -1369,7 +1375,15 @@ namespace { ZEN_ASSERT(false); } uint64_t RawSize = Chunk.GetSize(); - return {RawSize, CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid, OodleCompressionLevel::None)}; + if (Lookup.RawHashToSequenceIndex.contains(ChunkHash) && RawSize >= MinimumSizeForCompressInBlock) + { + // Standalone chunk, not part of a sequence + return {RawSize, CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid)}; + } + else + { + return {RawSize, CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid, OodleCompressionLevel::None)}; + } })); } @@ -1393,13 +1407,24 @@ namespace { { std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkLocations = GetChunkSequenceLocations(Lookup, ChunkIndex); ZEN_ASSERT(!ChunkLocations.empty()); - CompositeBuffer Chunk = OpenFileCache.GetRange(ChunkLocations[0].SequenceIndex, + const IoHash& ChunkHash = Content.ChunkedContent.ChunkHashes[ChunkIndex]; + CompositeBuffer Chunk = OpenFileCache.GetRange(ChunkLocations[0].SequenceIndex, ChunkLocations[0].Offset, Content.ChunkedContent.ChunkRawSizes[ChunkIndex]); - ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == Content.ChunkedContent.ChunkHashes[ChunkIndex]); - CompositeBuffer CompressedChunk = - CompressedBuffer::Compress(std::move(Chunk), OodleCompressor::Mermaid, OodleCompressionLevel::None).GetCompressed(); - ResultBuffers.insert(ResultBuffers.end(), CompressedChunk.GetSegments().begin(), CompressedChunk.GetSegments().end()); + ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == ChunkHash); + + const uint64_t RawSize = Chunk.GetSize(); + if (Lookup.RawHashToSequenceIndex.contains(ChunkHash) && RawSize >= MinimumSizeForCompressInBlock) + { + CompositeBuffer CompressedChunk = CompressedBuffer::Compress(std::move(Chunk), OodleCompressor::Mermaid).GetCompressed(); + ResultBuffers.insert(ResultBuffers.end(), CompressedChunk.GetSegments().begin(), CompressedChunk.GetSegments().end()); + } + else + { + CompositeBuffer CompressedChunk = + CompressedBuffer::Compress(std::move(Chunk), OodleCompressor::Mermaid, OodleCompressionLevel::None).GetCompressed(); + ResultBuffers.insert(ResultBuffers.end(), CompressedChunk.GetSegments().begin(), CompressedChunk.GetSegments().end()); + } } return CompressedBuffer::FromCompressedNoValidate(CompositeBuffer(std::move(ResultBuffers))); }; @@ -1805,7 +1830,6 @@ namespace { const ChunkedContentLookup& Lookup, uint32_t ChunkIndex, const std::filesystem::path& TempFolderPath, - std::atomic<uint64_t>& ReadRawBytes, LooseChunksStatistics& LooseChunksStats) { ZEN_TRACE_CPU("CompressChunk"); @@ -1841,7 +1865,7 @@ namespace { CompositeBuffer(SharedBuffer(RawSource)), [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { ZEN_UNUSED(SourceOffset); - ReadRawBytes += SourceSize; + LooseChunksStats.CompressedChunkRawBytes += SourceSize; CompressedFile.Write(RangeBuffer, Offset); LooseChunksStats.CompressedChunkBytes += RangeBuffer.GetSize(); }); @@ -2330,16 +2354,8 @@ namespace { std::atomic<uint64_t> GeneratedBlockCount = 0; std::atomic<uint64_t> GeneratedBlockByteCount = 0; - std::vector<uint32_t> CompressLooseChunkOrderIndexes; - std::atomic<uint64_t> QueuedPendingInMemoryBlocksForUpload = 0; - // Start upload of any pre-compressed loose chunks - for (const uint32_t LooseChunkOrderIndex : LooseChunkOrderIndexes) - { - CompressLooseChunkOrderIndexes.push_back(LooseChunkOrderIndex); - } - // Start generation of any non-prebuilt blocks and schedule upload for (const size_t BlockIndex : BlockIndexes) { @@ -2399,12 +2415,10 @@ namespace { } } - std::atomic<uint64_t> RawLooseChunkByteCount = 0; - // Start compression of any non-precompressed loose chunks and schedule upload - for (const uint32_t CompressLooseChunkOrderIndex : CompressLooseChunkOrderIndexes) + for (const uint32_t LooseChunkOrderIndex : LooseChunkOrderIndexes) { - const uint32_t ChunkIndex = LooseChunkIndexes[CompressLooseChunkOrderIndex]; + const uint32_t ChunkIndex = LooseChunkIndexes[LooseChunkOrderIndex]; Work.ScheduleWork( SingleThreaded ? GetSyncWorkerPool() : ReadChunkPool, [&, ChunkIndex](std::atomic<bool>&) { @@ -2413,20 +2427,15 @@ namespace { ZEN_TRACE_CPU("UploadPartBlobs_CompressChunk"); FilteredCompressedBytesPerSecond.Start(); - CompositeBuffer Payload = CompressChunk(Path, - Content, - Lookup, - ChunkIndex, - Path / ZenTempChunkFolderName, - RawLooseChunkByteCount, - LooseChunksStats); + CompositeBuffer Payload = + CompressChunk(Path, Content, Lookup, ChunkIndex, Path / ZenTempChunkFolderName, LooseChunksStats); ZEN_CONSOLE_VERBOSE("Compressed chunk {} ({} -> {})", Content.ChunkedContent.ChunkHashes[ChunkIndex], NiceBytes(Content.ChunkedContent.ChunkRawSizes[ChunkIndex]), NiceBytes(Payload.GetSize())); const uint64_t ChunkRawSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; UploadStats.ReadFromDiskBytes += ChunkRawSize; - if (LooseChunksStats.CompressedChunkCount == CompressLooseChunkOrderIndexes.size()) + if (LooseChunksStats.CompressedChunkCount == LooseChunkOrderIndexes.size()) { FilteredCompressedBytesPerSecond.Stop(); } @@ -2441,7 +2450,7 @@ namespace { Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) { ZEN_UNUSED(IsAborted, PendingWork); - FilteredCompressedBytesPerSecond.Update(LooseChunksStats.CompressedChunkBytes.load()); + FilteredCompressedBytesPerSecond.Update(LooseChunksStats.CompressedChunkRawBytes.load()); FilteredGenerateBlockBytesPerSecond.Update(GeneratedBlockByteCount.load()); FilteredUploadedBytesPerSecond.Update(UploadedCompressedChunkSize.load() + UploadedBlockSize.load()); uint64_t UploadedRawSize = UploadedRawChunkSize.load() + UploadedBlockSize.load(); @@ -2452,8 +2461,8 @@ namespace { "Uploaded {}/{} ({}/{}) blobs " "({} {}bits/s)", LooseChunksStats.CompressedChunkCount.load(), - CompressLooseChunkOrderIndexes.size(), - NiceBytes(RawLooseChunkByteCount), + LooseChunkOrderIndexes.size(), + NiceBytes(LooseChunksStats.CompressedChunkRawBytes), NiceBytes(TotalLooseChunksSize), NiceNum(FilteredCompressedBytesPerSecond.GetCurrent()), @@ -2582,9 +2591,10 @@ namespace { for (size_t KnownBlockIndex : ReuseBlockIndexes) { std::vector<uint32_t> FoundChunkIndexes; - size_t BlockSize = 0; - size_t AdjustedReuseSize = 0; - const ChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex]; + size_t BlockSize = 0; + size_t AdjustedReuseSize = 0; + size_t AdjustedRawReuseSize = 0; + const ChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex]; for (size_t BlockChunkIndex = 0; BlockChunkIndex < KnownBlock.ChunkRawHashes.size(); BlockChunkIndex++) { const IoHash& BlockChunkHash = KnownBlock.ChunkRawHashes[BlockChunkIndex]; @@ -2597,6 +2607,7 @@ namespace { { FoundChunkIndexes.push_back(ChunkIndex); AdjustedReuseSize += KnownBlock.ChunkCompressedLengths[BlockChunkIndex]; + AdjustedRawReuseSize += KnownBlock.ChunkRawLengths[BlockChunkIndex]; } } } @@ -2617,6 +2628,7 @@ namespace { } FindBlocksStats.AcceptedChunkCount += FoundChunkIndexes.size(); FindBlocksStats.AcceptedByteCount += AdjustedReuseSize; + FindBlocksStats.AcceptedRawByteCount += AdjustedRawReuseSize; FindBlocksStats.AcceptedReduntantChunkCount += KnownBlock.ChunkRawHashes.size() - FoundChunkIndexes.size(); FindBlocksStats.AcceptedReduntantByteCount += BlockSize - AdjustedReuseSize; } @@ -3020,9 +3032,10 @@ namespace { } FindBlocksStats.NewBlocksChunkCount = NewBlockChunkIndexes.size(); - const double AcceptedByteCountPercent = FindBlocksStats.PotentialChunkByteCount > 0 - ? (100.0 * FindBlocksStats.AcceptedByteCount / FindBlocksStats.PotentialChunkByteCount) - : 0.0; + const double AcceptedByteCountPercent = + FindBlocksStats.PotentialChunkByteCount > 0 + ? (100.0 * FindBlocksStats.AcceptedRawByteCount / FindBlocksStats.PotentialChunkByteCount) + : 0.0; const double AcceptedReduntantByteCountPercent = FindBlocksStats.AcceptedByteCount > 0 ? (100.0 * FindBlocksStats.AcceptedReduntantByteCount) / @@ -3042,7 +3055,7 @@ namespace { NiceTimeSpanMs(FindBlocksStats.FindBlockTimeMS), FindBlocksStats.AcceptedChunkCount, - NiceBytes(FindBlocksStats.AcceptedByteCount), + NiceBytes(FindBlocksStats.AcceptedRawByteCount), FindBlocksStats.AcceptedBlockCount, AcceptedByteCountPercent, @@ -3276,8 +3289,8 @@ namespace { TempLooseChunksStats.CompressedChunkCount.load(), NiceBytes(TempLooseChunksStats.CompressedChunkBytes.load()), - NiceNum(GetBytesPerSecond(TempLooseChunksStats.CompressChunksElapsedWallTimeUS, - TempLooseChunksStats.CompressedChunkBytes)), + NiceNum( + GetBytesPerSecond(TempLooseChunksStats.CompressChunksElapsedWallTimeUS, TempLooseChunksStats.ChunkByteCount)), TempUploadStats.ChunkCount.load(), NiceBytes(TempUploadStats.ChunksBytes), @@ -3435,6 +3448,7 @@ namespace { "\n AcceptedBlockCount: {}" "\n AcceptedChunkCount: {}" "\n AcceptedByteCount: {}" + "\n AcceptedRawByteCount: {}" "\n RejectedBlockCount: {}" "\n RejectedChunkCount: {}" "\n RejectedByteCount: {}" @@ -3452,6 +3466,7 @@ namespace { FindBlocksStats.AcceptedBlockCount, FindBlocksStats.AcceptedChunkCount, NiceBytes(FindBlocksStats.AcceptedByteCount), + NiceBytes(FindBlocksStats.AcceptedRawByteCount), FindBlocksStats.RejectedBlockCount, FindBlocksStats.RejectedChunkCount, NiceBytes(FindBlocksStats.RejectedByteCount), @@ -3627,7 +3642,7 @@ namespace { {{"totalSize", double(LocalFolderScanStats.FoundFileByteCount.load())}, {"reusedRatio", AcceptedByteCountPercent / 100.0}, {"reusedBlockCount", double(FindBlocksStats.AcceptedBlockCount)}, - {"reusedBlockByteCount", double(FindBlocksStats.AcceptedByteCount)}, + {"reusedBlockByteCount", double(FindBlocksStats.AcceptedRawByteCount)}, {"newBlockCount", double(FindBlocksStats.NewBlocksCount)}, {"newBlockByteCount", double(FindBlocksStats.NewBlocksChunkByteCount)}, {"uploadedCount", double(UploadStats.BlockCount.load() + UploadStats.ChunkCount.load())}, @@ -4192,11 +4207,36 @@ namespace { bool NeedsWrite = true; if (RemoteChunkIndexNeedsCopyFromSourceFlags[ChunkIndex].compare_exchange_strong(NeedsWrite, false)) { - MemoryView ChunkMemoryView = BlockView.Mid(OffsetInBlock + CompressedBuffer::GetHeaderSizeForNoneEncoder(), - ChunkCompressedSize - CompressedBuffer::GetHeaderSizeForNoneEncoder()); - IoBuffer Decompressed(IoBuffer::Wrap, ChunkMemoryView.GetData(), ChunkMemoryView.GetSize()); - ZEN_ASSERT_SLOW(ChunkHash == IoHash::HashBuffer(Decompressed)); + MemoryView ChunkMemoryView = BlockView.Mid(OffsetInBlock, ChunkCompressedSize); + IoHash VerifyChunkHash; + uint64_t VerifyChunkSize; + CompressedBuffer CompressedChunk = + CompressedBuffer::FromCompressed(SharedBuffer::MakeView(ChunkMemoryView), VerifyChunkHash, VerifyChunkSize); + ZEN_ASSERT(CompressedChunk); + ZEN_ASSERT(VerifyChunkHash == ChunkHash); + ZEN_ASSERT(VerifyChunkSize == RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]); + + OodleCompressor ChunkCompressor; + OodleCompressionLevel ChunkCompressionLevel; + uint64_t ChunkBlockSize; + + bool GetCompressParametersSuccess = + CompressedChunk.TryGetCompressParameters(ChunkCompressor, ChunkCompressionLevel, ChunkBlockSize); + ZEN_ASSERT(GetCompressParametersSuccess); + + IoBuffer Decompressed; + if (ChunkCompressionLevel == OodleCompressionLevel::None) + { + MemoryView ChunkDecompressedMemoryView = ChunkMemoryView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder()); + Decompressed = + IoBuffer(IoBuffer::Wrap, ChunkDecompressedMemoryView.GetData(), ChunkDecompressedMemoryView.GetSize()); + } + else + { + Decompressed = CompressedChunk.Decompress().AsIoBuffer(); + } ZEN_ASSERT(Decompressed.GetSize() == RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]); + ZEN_ASSERT_SLOW(ChunkHash == IoHash::HashBuffer(Decompressed)); for (const ChunkedContentLookup::ChunkSequenceLocation* Target : ChunkTargetPtrs) { OutOps.WriteOps.push_back( @@ -6208,7 +6248,17 @@ namespace { ZEN_ASSERT_SLOW(std::filesystem::exists(LocalFilePath)); SetFileReadOnly(LocalFilePath, false); ZEN_ASSERT_SLOW(!std::filesystem::exists(CacheFilePath)); - std::filesystem::rename(LocalFilePath, CacheFilePath); + std::error_code Ec; + std::filesystem::rename(LocalFilePath, CacheFilePath, Ec); + for (size_t Retries = 0; Ec && Retries < 3; Retries++) + { + Sleep(100); + std::filesystem::rename(LocalFilePath, CacheFilePath, Ec); + } + if (Ec) + { + zen::ThrowSystemError(Ec.value(), Ec.message()); + } TotalFullFileSizeCached += std::filesystem::file_size(CacheFilePath); } ZEN_CONSOLE("Saved {} ({}) unchanged files in cache", @@ -6343,7 +6393,17 @@ namespace { { SetFileReadOnly(FirstTargetFilePath, false); } - std::filesystem::rename(CacheFilePath, FirstTargetFilePath); + std::error_code Ec; + std::filesystem::rename(CacheFilePath, FirstTargetFilePath, Ec); + for (size_t Retries = 0; Ec && Retries < 3; Retries++) + { + Sleep(100); + std::filesystem::rename(CacheFilePath, FirstTargetFilePath, Ec); + } + if (Ec) + { + zen::ThrowSystemError(Ec.value(), Ec.message()); + } RebuildFolderStateStats.FinalizeTreeFilesMovedCount++; } @@ -7893,15 +7953,18 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) StorageInstance Result; + std::string StorageDescription; + std::string CacheDescription; + if (!m_BuildsUrl.empty()) { ParseAuthOptions(); Result.BuildStorageHttp = std::make_unique<HttpClient>(m_BuildsUrl, ClientSettings); - ZEN_CONSOLE("Using cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}'", - m_BuildsUrl, - Result.BuildStorageHttp->GetSessionId(), - m_Namespace, - m_Bucket); + StorageDescription = fmt::format("cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}'", + m_BuildsUrl, + Result.BuildStorageHttp->GetSessionId(), + m_Namespace, + m_Bucket); Result.BuildStorage = CreateJupiterBuildStorage(Log(), *Result.BuildStorageHttp, StorageStats, m_Namespace, m_Bucket, TempPath / "storage"); Result.StorageName = ZEN_CLOUD_STORAGE; @@ -7909,7 +7972,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) else if (!m_StoragePath.empty()) { std::filesystem::path StoragePath = std::filesystem::absolute(StringToPath(m_StoragePath)).make_preferred(); - ZEN_CONSOLE("Using folder {}", StoragePath); + StorageDescription = fmt::format("folder {}", StoragePath); Result.BuildStorage = CreateFileBuildStorage(StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); Result.StorageName = fmt::format("Disk {}", StoragePath.stem()); } @@ -7930,12 +7993,24 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { Result.BuildCacheStorage = CreateZenBuildStorageCache(*Result.CacheHttp, StorageCacheStats, m_Namespace, m_Bucket, TempPath / "zencache"); + CacheDescription = fmt::format("zen cache {}. SessionId: '{}'", m_ZenCacheHost, Result.CacheHttp->GetSessionId()); + if (!m_Namespace.empty()) + { + CacheDescription += fmt::format(" {}.", m_Namespace); + } + if (!m_Bucket.empty()) + { + CacheDescription += fmt::format(" {}.", m_Bucket); + } } else { Result.CacheHttp.reset(); } } + ZEN_CONSOLE("Remote: {}.{}", + StorageDescription, + CacheDescription.empty() ? std::string("") : fmt::format(" Cache: {}", CacheDescription)); return Result; }; |