diff options
| author | Dan Engelbrecht <[email protected]> | 2025-03-18 08:56:40 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-03-18 08:56:40 +0100 |
| commit | fa4ef162b1dd53cbad135850a8f9cf8fb532f395 (patch) | |
| tree | 53d0da390d49a7e6a8dd1f3dcc14d8d48a8acbba /src | |
| parent | fix quoted command lines arguments (#306) (diff) | |
| download | zen-fa4ef162b1dd53cbad135850a8f9cf8fb532f395.tar.xz zen-fa4ef162b1dd53cbad135850a8f9cf8fb532f395.zip | |
improved post upload/download summary (#308)
* added ValidateStatistics and improved post upload summary
* improved download statistics
* smoother stats update when compressing
* better feedback during stream compress/decompress
* don't capture TotalPartWriteCount by reference
* disk stats cleanup
* multi-test-download overall timer
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 962 | ||||
| -rw-r--r-- | src/zencore/compress.cpp | 130 | ||||
| -rw-r--r-- | src/zencore/include/zencore/compress.h | 18 |
3 files changed, 690 insertions, 420 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index f0ee4904e..61e3c0fab 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -567,6 +567,64 @@ namespace { } }; + struct CacheMappingStatistics + { + uint64_t CacheChunkCount = 0; + uint64_t CacheChunkByteCount = 0; + + uint64_t CacheBlockCount = 0; + uint64_t CacheBlocksByteCount = 0; + + uint64_t CacheSequenceHashesCount = 0; + uint64_t CacheSequenceHashesByteCount = 0; + + uint32_t LocalPathsMatchingSequencesCount = 0; + uint64_t LocalPathsMatchingSequencesByteCount = 0; + + uint64_t LocalChunkMatchingRemoteCount = 0; + uint64_t LocalChunkMatchingRemoteByteCount = 0; + }; + + struct DownloadStatistics + { + std::atomic<uint64_t> RequestsCompleteCount = 0; + + std::atomic<uint64_t> DownloadedChunkCount = 0; + std::atomic<uint64_t> DownloadedChunkByteCount = 0; + std::atomic<uint64_t> MultipartAttachmentCount = 0; + + std::atomic<uint64_t> DownloadedBlockCount = 0; + std::atomic<uint64_t> DownloadedBlockByteCount = 0; + + std::atomic<uint64_t> DownloadedPartialBlockCount = 0; + std::atomic<uint64_t> DownloadedPartialBlockByteCount = 0; + }; + + struct WriteChunkStatistics + { + std::atomic<uint32_t> ChunkCountWritten = 0; + std::atomic<uint64_t> ChunkBytesWritten = 0; + uint64_t DownloadTimeUs = 0; + uint64_t WriteTimeUs = 0; + uint64_t WriteChunksElapsedWallTimeUs = 0; + }; + + struct RebuildFolderStateStatistics + { + uint64_t CleanFolderElapsedWallTimeUs = 0; + std::atomic<uint32_t> FinalizeTreeFilesMovedCount = 0; + std::atomic<uint32_t> FinalizeTreeFilesCopiedCount = 0; + uint64_t FinalizeTreeElapsedWallTimeUs = 0; + }; + + struct VerifyFolderStatistics + { + std::atomic<uint64_t> FilesVerified = 0; + std::atomic<uint64_t> FilesFailed = 0; + std::atomic<uint64_t> ReadBytes = 0; + uint64_t VerifyElapsedWallTimeUs = 0; + }; + std::vector<uint32_t> CalculateAbsoluteChunkOrders(const std::span<const IoHash> LocalChunkHashes, const std::span<const uint32_t> 
LocalChunkOrder, const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& ChunkHashToLocalChunkIndex, @@ -1016,7 +1074,15 @@ namespace { class BufferedOpenFile { public: - BufferedOpenFile(const std::filesystem::path Path) : Source(Path, BasicFile::Mode::kRead), SourceSize(Source.FileSize()) {} + BufferedOpenFile(const std::filesystem::path Path, DiskStatistics& DiskStats) + : m_Source(Path, BasicFile::Mode::kRead) + , m_SourceSize(m_Source.FileSize()) + , m_DiskStats(DiskStats) + { + m_DiskStats.OpenReadCount++; + m_DiskStats.CurrentOpenFileCount++; + } + ~BufferedOpenFile() { m_DiskStats.CurrentOpenFileCount--; } BufferedOpenFile() = delete; BufferedOpenFile(const BufferedOpenFile&) = delete; BufferedOpenFile(BufferedOpenFile&&) = delete; @@ -1028,10 +1094,10 @@ namespace { { ZEN_TRACE_CPU("BufferedOpenFile::GetRange"); - ZEN_ASSERT((CacheBlockIndex == (uint64_t)-1) || Cache); - auto _ = MakeGuard([&]() { ZEN_ASSERT((CacheBlockIndex == (uint64_t)-1) || Cache); }); + ZEN_ASSERT((m_CacheBlockIndex == (uint64_t)-1) || m_Cache); + auto _ = MakeGuard([&]() { ZEN_ASSERT((m_CacheBlockIndex == (uint64_t)-1) || m_Cache); }); - ZEN_ASSERT((Offset + Size) <= SourceSize); + ZEN_ASSERT((Offset + Size) <= m_SourceSize); const uint64_t BlockIndexStart = Offset / BlockSize; const uint64_t BlockIndexEnd = (Offset + Size - 1) / BlockSize; @@ -1042,21 +1108,23 @@ namespace { for (uint64_t BlockIndex = BlockIndexStart; BlockIndex <= BlockIndexEnd; BlockIndex++) { const uint64_t BlockStartOffset = BlockIndex * BlockSize; - if (CacheBlockIndex != BlockIndex) + if (m_CacheBlockIndex != BlockIndex) { - uint64_t CacheSize = Min(BlockSize, SourceSize - BlockStartOffset); + uint64_t CacheSize = Min(BlockSize, m_SourceSize - BlockStartOffset); ZEN_ASSERT(CacheSize > 0); - Cache = IoBuffer(CacheSize); - Source.Read(Cache.GetMutableView().GetData(), CacheSize, BlockStartOffset); - CacheBlockIndex = BlockIndex; + m_Cache = IoBuffer(CacheSize); + m_Source.Read(m_Cache.GetMutableView().GetData(), 
CacheSize, BlockStartOffset); + m_DiskStats.ReadCount++; + m_DiskStats.ReadByteCount += CacheSize; + m_CacheBlockIndex = BlockIndex; } const uint64_t BytesRead = ReadOffset - Offset; ZEN_ASSERT(BlockStartOffset <= ReadOffset); const uint64_t OffsetIntoBlock = ReadOffset - BlockStartOffset; - ZEN_ASSERT(OffsetIntoBlock < Cache.GetSize()); - const uint64_t BlockBytes = Min(Cache.GetSize() - OffsetIntoBlock, Size - BytesRead); - BufferRanges.emplace_back(SharedBuffer(IoBuffer(Cache, OffsetIntoBlock, BlockBytes))); + ZEN_ASSERT(OffsetIntoBlock < m_Cache.GetSize()); + const uint64_t BlockBytes = Min(m_Cache.GetSize() - OffsetIntoBlock, Size - BytesRead); + BufferRanges.emplace_back(SharedBuffer(IoBuffer(m_Cache, OffsetIntoBlock, BlockBytes))); ReadOffset += BlockBytes; } CompositeBuffer Result(std::move(BufferRanges)); @@ -1065,10 +1133,11 @@ namespace { } private: - BasicFile Source; - const uint64_t SourceSize; - uint64_t CacheBlockIndex = (uint64_t)-1; - IoBuffer Cache; + BasicFile m_Source; + const uint64_t m_SourceSize; + DiskStatistics& m_DiskStats; + uint64_t m_CacheBlockIndex = (uint64_t)-1; + IoBuffer m_Cache; }; class ReadFileCache @@ -1087,11 +1156,7 @@ namespace { { m_OpenFiles.reserve(MaxOpenFileCount); } - ~ReadFileCache() - { - m_DiskStats.CurrentOpenFileCount -= m_OpenFiles.size(); - m_OpenFiles.clear(); - } + ~ReadFileCache() { m_OpenFiles.clear(); } CompositeBuffer GetRange(uint32_t SequenceIndex, uint64_t Offset, uint64_t Size) { @@ -1109,7 +1174,6 @@ namespace { m_OpenFiles.insert(m_OpenFiles.begin(), std::make_pair(SequenceIndex, std::move(CachedFile))); } CompositeBuffer Result = m_OpenFiles.front().second->GetRange(Offset, Size); - m_DiskStats.ReadByteCount += Result.GetSize(); return Result; } const uint32_t LocalPathIndex = m_LocalLookup.SequenceIndexFirstPathIndex[SequenceIndex]; @@ -1117,20 +1181,15 @@ namespace { if (Size == m_LocalContent.RawSizes[LocalPathIndex]) { IoBuffer Result = IoBufferBuilder::MakeFromFile(LocalFilePath); - 
m_DiskStats.OpenReadCount++; - m_DiskStats.ReadByteCount += Result.GetSize(); return CompositeBuffer(SharedBuffer(Result)); } if (m_OpenFiles.size() == m_OpenFiles.capacity()) { m_OpenFiles.pop_back(); - m_DiskStats.CurrentOpenFileCount--; } - m_OpenFiles.insert(m_OpenFiles.begin(), std::make_pair(SequenceIndex, std::make_unique<BufferedOpenFile>(LocalFilePath))); + m_OpenFiles.insert(m_OpenFiles.begin(), + std::make_pair(SequenceIndex, std::make_unique<BufferedOpenFile>(LocalFilePath, m_DiskStats))); CompositeBuffer Result = m_OpenFiles.front().second->GetRange(Offset, Size); - m_DiskStats.ReadByteCount += Result.GetSize(); - m_DiskStats.OpenReadCount++; - m_DiskStats.CurrentOpenFileCount++; return Result; } @@ -1167,17 +1226,21 @@ namespace { } IoHashStream Hash; - bool CouldDecompress = Compressed.DecompressToStream(0, RawSize, [&Hash](uint64_t, const CompositeBuffer& RangeBuffer) { - if (!AbortFlag) - { - for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) + bool CouldDecompress = Compressed.DecompressToStream( + 0, + RawSize, + [&Hash](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { + ZEN_UNUSED(SourceOffset, SourceSize, Offset); + if (!AbortFlag) { - Hash.Append(Segment.GetView()); + for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) + { + Hash.Append(Segment.GetView()); + } + return true; } - return true; - } - return false; - }); + return false; + }); if (AbortFlag) { @@ -1330,8 +1393,7 @@ namespace { const std::uint64_t PreferredMultipartChunkSize, ParallellWork& Work, WorkerThreadPool& NetworkPool, - std::atomic<uint64_t>& BytesDownloaded, - std::atomic<uint64_t>& MultipartAttachmentCount, + DownloadStatistics& DownloadStats, std::function<void(IoBuffer&& Payload)>&& OnDownloadComplete) { ZEN_TRACE_CPU("DownloadLargeBlob"); @@ -1353,10 +1415,10 @@ namespace { BuildId, ChunkHash, PreferredMultipartChunkSize, - [Workload, &BytesDownloaded, OnDownloadComplete = 
std::move(OnDownloadComplete)](uint64_t Offset, - const IoBuffer& Chunk, - uint64_t BytesRemaining) { - BytesDownloaded += Chunk.GetSize(); + [Workload, &DownloadStats, OnDownloadComplete = std::move(OnDownloadComplete)](uint64_t Offset, + const IoBuffer& Chunk, + uint64_t BytesRemaining) { + DownloadStats.DownloadedChunkByteCount += Chunk.GetSize(); if (!AbortFlag.load()) { @@ -1364,6 +1426,7 @@ namespace { Workload->TempFile.Write(Chunk.GetView(), Offset); if (Chunk.GetSize() == BytesRemaining) { + DownloadStats.DownloadedChunkCount++; uint64_t PayloadSize = Workload->TempFile.FileSize(); void* FileHandle = Workload->TempFile.Detach(); ZEN_ASSERT(FileHandle != nullptr); @@ -1375,7 +1438,7 @@ namespace { }); if (!WorkItems.empty()) { - MultipartAttachmentCount++; + DownloadStats.MultipartAttachmentCount++; } for (auto& WorkItem : WorkItems) { @@ -1392,7 +1455,23 @@ namespace { } } - void ValidateBuildPart(BuildStorage& Storage, const Oid& BuildId, Oid BuildPartId, const std::string_view BuildPartName) + struct ValidateStatistics + { + uint64_t BuildBlobSize = 0; + uint64_t BuildPartSize = 0; + uint64_t ChunkAttachmentCount = 0; + uint64_t BlockAttachmentCount = 0; + std::atomic<uint64_t> VerifiedAttachmentCount = 0; + std::atomic<uint64_t> VerifiedByteCount = 0; + uint64_t ElapsedWallTimeUS = 0; + }; + + void ValidateBuildPart(BuildStorage& Storage, + const Oid& BuildId, + Oid BuildPartId, + const std::string_view BuildPartName, + ValidateStatistics& ValidateStats, + DownloadStatistics& DownloadStats) { Stopwatch Timer; auto _ = MakeGuard([&]() { @@ -1411,23 +1490,27 @@ namespace { throw std::runtime_error(fmt::format("Build {} does not have a part named '{}'", BuildId, BuildPartName)); } } + ValidateStats.BuildBlobSize = Build.GetSize(); uint64_t PreferredMultipartChunkSize = DefaultPreferredMultipartChunkSize; if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0) { PreferredMultipartChunkSize = ChunkSize; } - CbObject BuildPart = 
Storage.GetBuildPart(BuildId, BuildPartId); + CbObject BuildPart = Storage.GetBuildPart(BuildId, BuildPartId); + ValidateStats.BuildPartSize = BuildPart.GetSize(); ZEN_CONSOLE("Validating build part {}/{} ({})", BuildId, BuildPartId, NiceBytes(BuildPart.GetSize())); std::vector<IoHash> ChunkAttachments; for (CbFieldView LooseFileView : BuildPart["chunkAttachments"sv].AsObjectView()["rawHashes"sv]) { ChunkAttachments.push_back(LooseFileView.AsBinaryAttachment()); } + ValidateStats.ChunkAttachmentCount = ChunkAttachments.size(); std::vector<IoHash> BlockAttachments; for (CbFieldView BlocksView : BuildPart["blockAttachments"sv].AsObjectView()["rawHashes"sv]) { BlockAttachments.push_back(BlocksView.AsBinaryAttachment()); } + ValidateStats.BlockAttachmentCount = BlockAttachments.size(); std::vector<ChunkBlockDescription> VerifyBlockDescriptions = Storage.GetBlockMetadata(BuildId, BlockAttachments); if (VerifyBlockDescriptions.size() != BlockAttachments.size()) @@ -1453,13 +1536,9 @@ namespace { ProgressBar ProgressBar(UsePlainProgress); - uint64_t AttachmentsToVerifyCount = ChunkAttachments.size() + BlockAttachments.size(); - std::atomic<uint64_t> DownloadedAttachmentCount = 0; - std::atomic<uint64_t> VerifiedAttachmentCount = 0; - std::atomic<uint64_t> DownloadedByteCount = 0; - std::atomic<uint64_t> VerifiedByteCount = 0; - FilteredRate FilteredDownloadedBytesPerSecond; - FilteredRate FilteredVerifiedBytesPerSecond; + uint64_t AttachmentsToVerifyCount = ChunkAttachments.size() + BlockAttachments.size(); + FilteredRate FilteredDownloadedBytesPerSecond; + FilteredRate FilteredVerifiedBytesPerSecond; std::atomic<uint64_t> MultipartAttachmentCount = 0; @@ -1480,8 +1559,7 @@ namespace { PreferredMultipartChunkSize, Work, NetworkPool, - DownloadedByteCount, - MultipartAttachmentCount, + DownloadStats, [&, ChunkHash = ChunkAttachment](IoBuffer&& Payload) { Payload.SetContentType(ZenContentType::kCompressedBinary); if (!AbortFlag) @@ -1493,6 +1571,12 @@ namespace { { 
ZEN_TRACE_CPU("ValidateBuildPart_Validate"); + if (DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount == + AttachmentsToVerifyCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + FilteredVerifiedBytesPerSecond.Start(); uint64_t CompressedSize; @@ -1502,9 +1586,9 @@ namespace { ChunkHash, NiceBytes(CompressedSize), NiceBytes(DecompressedSize)); - VerifiedAttachmentCount++; - VerifiedByteCount += DecompressedSize; - if (VerifiedAttachmentCount.load() == AttachmentsToVerifyCount) + ValidateStats.VerifiedAttachmentCount++; + ValidateStats.VerifiedByteCount += DecompressedSize; + if (ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount) { FilteredVerifiedBytesPerSecond.Stop(); } @@ -1529,9 +1613,9 @@ namespace { FilteredDownloadedBytesPerSecond.Start(); IoBuffer Payload = Storage.GetBuildBlob(BuildId, BlockAttachment); - DownloadedAttachmentCount++; - DownloadedByteCount += Payload.GetSize(); - if (DownloadedAttachmentCount.load() == AttachmentsToVerifyCount) + DownloadStats.DownloadedBlockCount++; + DownloadStats.DownloadedBlockByteCount += Payload.GetSize(); + if (DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount == AttachmentsToVerifyCount) { FilteredDownloadedBytesPerSecond.Stop(); } @@ -1557,9 +1641,9 @@ namespace { BlockAttachment, NiceBytes(CompressedSize), NiceBytes(DecompressedSize)); - VerifiedAttachmentCount++; - VerifiedByteCount += DecompressedSize; - if (VerifiedAttachmentCount.load() == AttachmentsToVerifyCount) + ValidateStats.VerifiedAttachmentCount++; + ValidateStats.VerifiedByteCount += DecompressedSize; + if (ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount) { FilteredVerifiedBytesPerSecond.Stop(); } @@ -1575,17 +1659,20 @@ namespace { Work.Wait(UsePlainProgress ? 
5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) { ZEN_UNUSED(IsAborted, PendingWork); + const uint64_t DownloadedAttachmentCount = DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount; + const uint64_t DownloadedByteCount = DownloadStats.DownloadedChunkByteCount + DownloadStats.DownloadedBlockByteCount; + FilteredDownloadedBytesPerSecond.Update(DownloadedByteCount); - FilteredVerifiedBytesPerSecond.Update(VerifiedByteCount); + FilteredVerifiedBytesPerSecond.Update(ValidateStats.VerifiedByteCount); std::string Details = fmt::format("Downloaded {}/{} ({}, {}bits/s). Verified {}/{} ({}, {}B/s)", - DownloadedAttachmentCount.load(), + DownloadedAttachmentCount, AttachmentsToVerifyCount, - NiceBytes(DownloadedByteCount.load()), + NiceBytes(DownloadedByteCount), NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8), - VerifiedAttachmentCount.load(), + ValidateStats.VerifiedAttachmentCount.load(), AttachmentsToVerifyCount, - NiceBytes(VerifiedByteCount.load()), + NiceBytes(ValidateStats.VerifiedByteCount.load()), NiceNum(FilteredVerifiedBytesPerSecond.GetCurrent())); ProgressBar.UpdateState( @@ -1593,11 +1680,12 @@ namespace { .Details = Details, .TotalCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2), .RemainingCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2 - - (DownloadedAttachmentCount.load() + VerifiedAttachmentCount.load()))}, + (DownloadedAttachmentCount + ValidateStats.VerifiedAttachmentCount.load()))}, false); }); ProgressBar.Finish(); + ValidateStats.ElapsedWallTimeUS = Timer.GetElapsedTimeUs(); } void ArrangeChunksIntoBlocks(const ChunkedFolderContent& Content, @@ -1698,7 +1786,9 @@ namespace { const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, uint32_t ChunkIndex, - const std::filesystem::path& TempFolderPath) + const std::filesystem::path& TempFolderPath, + std::atomic<uint64_t>& ReadRawBytes, + LooseChunksStatistics& LooseChunksStats) { ZEN_TRACE_CPU("CompressChunk"); 
ZEN_ASSERT(!TempFolderPath.empty()); @@ -1731,7 +1821,12 @@ namespace { bool CouldCompress = CompressedBuffer::CompressToStream( CompositeBuffer(SharedBuffer(RawSource)), - [&](uint64_t Offset, const CompositeBuffer& RangeBuffer) { CompressedFile.Write(RangeBuffer, Offset); }); + [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { + ZEN_UNUSED(SourceOffset); + ReadRawBytes += SourceSize; + CompressedFile.Write(RangeBuffer, Offset); + LooseChunksStats.CompressedChunkBytes += RangeBuffer.GetSize(); + }); if (CouldCompress) { uint64_t CompressedSize = CompressedFile.FileSize(); @@ -1749,6 +1844,9 @@ namespace { ZEN_ASSERT(Compressed); ZEN_ASSERT(RawHash == ChunkHash); ZEN_ASSERT(RawSize == ChunkSize); + + LooseChunksStats.CompressedChunkCount++; + return Compressed.GetCompressed(); } CompressedFile.Close(); @@ -1988,7 +2086,6 @@ namespace { const std::uint64_t LargeAttachmentSize, DiskStatistics& DiskStats, UploadStatistics& UploadStats, - GenerateBlocksStatistics& GenerateBlocksStats, LooseChunksStatistics& LooseChunksStats) { ZEN_TRACE_CPU("UploadPartBlobs"); @@ -2238,8 +2335,6 @@ namespace { Payload = std::move(CompressedBlock).GetCompressed(); } - GenerateBlocksStats.GeneratedBlockByteCount += NewBlocks.BlockSizes[BlockIndex]; - GenerateBlocksStats.GeneratedBlockCount++; GeneratedBlockByteCount += NewBlocks.BlockSizes[BlockIndex]; GeneratedBlockCount++; if (GeneratedBlockCount == GenerateBlockIndexes.size()) @@ -2260,9 +2355,7 @@ namespace { } } - std::atomic<uint64_t> CompressedLooseChunkCount = 0; - std::atomic<uint64_t> CompressedLooseChunkByteCount = 0; - std::atomic<uint64_t> RawLooseChunkByteCount = 0; + std::atomic<uint64_t> RawLooseChunkByteCount = 0; // Start compression of any non-precompressed loose chunks and schedule upload for (const uint32_t CompressLooseChunkOrderIndex : CompressLooseChunkOrderIndexes) @@ -2276,19 +2369,20 @@ namespace { ZEN_TRACE_CPU("UploadPartBlobs_CompressChunk"); 
FilteredCompressedBytesPerSecond.Start(); - CompositeBuffer Payload = CompressChunk(Path, Content, Lookup, ChunkIndex, Path / ZenTempChunkFolderName); + CompositeBuffer Payload = CompressChunk(Path, + Content, + Lookup, + ChunkIndex, + Path / ZenTempChunkFolderName, + RawLooseChunkByteCount, + LooseChunksStats); ZEN_CONSOLE_VERBOSE("Compressed chunk {} ({} -> {})", Content.ChunkedContent.ChunkHashes[ChunkIndex], NiceBytes(Content.ChunkedContent.ChunkRawSizes[ChunkIndex]), NiceBytes(Payload.GetSize())); const uint64_t ChunkRawSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; UploadStats.ReadFromDiskBytes += ChunkRawSize; - LooseChunksStats.CompressedChunkBytes += Payload.GetSize(); - LooseChunksStats.CompressedChunkCount++; - CompressedLooseChunkByteCount += Payload.GetSize(); - CompressedLooseChunkCount++; - RawLooseChunkByteCount += ChunkRawSize; - if (CompressedLooseChunkCount == CompressLooseChunkOrderIndexes.size()) + if (LooseChunksStats.CompressedChunkCount == CompressLooseChunkOrderIndexes.size()) { FilteredCompressedBytesPerSecond.Stop(); } @@ -2303,20 +2397,21 @@ namespace { Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) { ZEN_UNUSED(IsAborted, PendingWork); - FilteredCompressedBytesPerSecond.Update(CompressedLooseChunkByteCount.load()); + FilteredCompressedBytesPerSecond.Update(LooseChunksStats.CompressedChunkBytes.load()); FilteredGenerateBlockBytesPerSecond.Update(GeneratedBlockByteCount.load()); FilteredUploadedBytesPerSecond.Update(UploadedCompressedChunkSize.load() + UploadedBlockSize.load()); uint64_t UploadedRawSize = UploadedRawChunkSize.load() + UploadedBlockSize.load(); uint64_t UploadedCompressedSize = UploadedCompressedChunkSize.load() + UploadedBlockSize.load(); std::string Details = fmt::format( - "Compressed {}/{} ({}/{}) chunks. " + "Compressed {}/{} ({}/{} {}B/s) chunks. 
" "Uploaded {}/{} ({}/{}) blobs " "({} {}bits/s)", - CompressedLooseChunkCount.load(), + LooseChunksStats.CompressedChunkCount.load(), CompressLooseChunkOrderIndexes.size(), NiceBytes(RawLooseChunkByteCount), NiceBytes(TotalLooseChunksSize), + NiceNum(FilteredCompressedBytesPerSecond.GetCurrent()), UploadedBlockCount.load() + UploadedChunkCount.load(), UploadBlockCount + UploadChunkCount, @@ -2336,9 +2431,8 @@ namespace { ZEN_ASSERT(AbortFlag || QueuedPendingInMemoryBlocksForUpload.load() == 0); ProgressBar.Finish(); - UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS(); - GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGenerateBlockBytesPerSecond.GetElapsedTimeUS(); - LooseChunksStats.CompressChunksElapsedWallTimeUS = FilteredCompressedBytesPerSecond.GetElapsedTimeUS(); + UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS(); + LooseChunksStats.CompressChunksElapsedWallTimeUS = FilteredCompressedBytesPerSecond.GetElapsedTimeUS(); } } @@ -2531,6 +2625,8 @@ namespace { CreateDirectories(Path / ZenTempBlockFolderName); CreateDirectories(Path / ZenTempChunkFolderName); + std::uint64_t TotalRawSize = 0; + CbObject ChunkerParameters; struct PrepareBuildResult @@ -2751,7 +2847,7 @@ namespace { ChunkerParameters = ChunkParametersWriter.Save(); } - std::uint64_t TotalRawSize = std::accumulate(Content.RawSizes.begin(), Content.RawSizes.end(), std::uint64_t(0)); + TotalRawSize = std::accumulate(Content.RawSizes.begin(), Content.RawSizes.end(), std::uint64_t(0)); { ProgressBar ProgressBar(UsePlainProgress); @@ -3120,21 +3216,16 @@ namespace { { ZEN_CONSOLE_VERBOSE("Uploading attachments: {}", FormatArray<IoHash>(RawHashes, "\n "sv)); - UploadStatistics TempUploadStats; - GenerateBlocksStatistics TempGenerateBlocksStats; - LooseChunksStatistics TempLooseChunksStats; + UploadStatistics TempUploadStats; + LooseChunksStatistics TempLooseChunksStats; Stopwatch TempUploadTimer; auto __ = MakeGuard([&]() { 
uint64_t TempChunkUploadTimeUs = TempUploadTimer.GetElapsedTimeUs(); ZEN_CONSOLE( - "Generated {} ({} {}B/s) and uploaded {} ({}) blocks. " + "Uploaded {} ({}) blocks. " "Compressed {} ({} {}B/s) and uploaded {} ({}) chunks. " "Transferred {} ({}bits/s) in {}", - TempGenerateBlocksStats.GeneratedBlockCount.load(), - NiceBytes(TempGenerateBlocksStats.GeneratedBlockByteCount.load()), - NiceNum(GetBytesPerSecond(TempGenerateBlocksStats.GenerateBlocksElapsedWallTimeUS, - TempGenerateBlocksStats.GeneratedBlockByteCount)), TempUploadStats.BlockCount.load(), NiceBytes(TempUploadStats.BlocksBytes), @@ -3161,11 +3252,9 @@ namespace { LargeAttachmentSize, DiskStats, TempUploadStats, - TempGenerateBlocksStats, TempLooseChunksStats); UploadStats += TempUploadStats; LooseChunksStats += TempLooseChunksStats; - GenerateBlocksStats += TempGenerateBlocksStats; } }; if (IgnoreExistingBlocks) @@ -3247,18 +3336,25 @@ namespace { } } + ValidateStatistics ValidateStats; + DownloadStatistics ValidateDownloadStats; if (PostUploadVerify && !AbortFlag) { - ValidateBuildPart(Storage, BuildId, BuildPartId, BuildPartName); + ValidateBuildPart(Storage, BuildId, BuildPartId, BuildPartName, ValidateStats, ValidateDownloadStats); } - const double DeltaByteCountPercent = - ChunkingStats.BytesHashed > 0 - ? (100.0 * (FindBlocksStats.NewBlocksChunkByteCount + LooseChunksStats.CompressedChunkBytes)) / (ChunkingStats.BytesHashed) - : 0.0; - - const std::string LargeAttachmentStats = - (LargeAttachmentSize != (uint64_t)-1) ? 
fmt::format(" ({} as multipart)", UploadStats.MultipartAttachmentCount.load()) : ""; + struct ValidateStatistics + { + uint64_t BuildBlobSize = 0; + uint64_t BuildPartSize = 0; + uint64_t ChunkAttachmentCount = 0; + uint64_t BlockAttachmentCount = 0; + std::atomic<uint64_t> DownloadedAttachmentCount = 0; + std::atomic<uint64_t> VerifiedAttachmentCount = 0; + std::atomic<uint64_t> DownloadedByteCount = 0; + std::atomic<uint64_t> VerifiedByteCount = 0; + uint64_t ElapsedWallTimeUS = 0; + }; ZEN_CONSOLE_VERBOSE( "Folder scanning stats:" @@ -3382,42 +3478,122 @@ namespace { UploadStats.MultipartAttachmentCount.load(), NiceLatencyNs(UploadStats.ElapsedWallTimeUS * 1000)); + if (PostUploadVerify) + { + ZEN_CONSOLE_VERBOSE( + "Validate stats:" + "\n BuildBlobSize: {}" + "\n BuildPartSize: {}" + "\n ChunkAttachmentCount: {}" + "\n BlockAttachmentCount: {}" + "\n VerifiedAttachmentCount: {}" + "\n VerifiedByteCount: {}" + "\n ElapsedWallTimeUS: {}", + NiceBytes(ValidateStats.BuildBlobSize), + NiceBytes(ValidateStats.BuildPartSize), + ValidateStats.ChunkAttachmentCount, + ValidateStats.BlockAttachmentCount, + ValidateStats.VerifiedAttachmentCount.load(), + NiceBytes(ValidateStats.VerifiedByteCount.load()), + NiceLatencyNs(ValidateStats.ElapsedWallTimeUS * 1000)); + + ZEN_CONSOLE_VERBOSE( + "Validate download stats:" + "\n RequestsCompleteCount: {}" + "\n DownloadedChunkCount: {}" + "\n DownloadedChunkByteCount: {}" + "\n MultipartAttachmentCount: {}" + "\n DownloadedBlockCount: {}" + "\n DownloadedBlockByteCount: {}" + "\n DownloadedPartialBlockCount: {}" + "\n DownloadedPartialBlockByteCount: {}", + ValidateDownloadStats.RequestsCompleteCount.load(), + ValidateDownloadStats.DownloadedChunkCount.load(), + NiceBytes(ValidateDownloadStats.DownloadedChunkByteCount.load()), + ValidateDownloadStats.MultipartAttachmentCount.load(), + ValidateDownloadStats.DownloadedBlockCount.load(), + NiceBytes(ValidateDownloadStats.DownloadedBlockByteCount.load()), + 
ValidateDownloadStats.DownloadedPartialBlockCount.load(), + NiceBytes(ValidateDownloadStats.DownloadedPartialBlockByteCount.load())); + } + + const double DeltaByteCountPercent = + ChunkingStats.BytesHashed > 0 + ? (100.0 * (FindBlocksStats.NewBlocksChunkByteCount + LooseChunksStats.CompressedChunkBytes)) / (ChunkingStats.BytesHashed) + : 0.0; + + const std::string MultipartAttachmentStats = + (LargeAttachmentSize != (uint64_t)-1) ? fmt::format(" ({} as multipart)", UploadStats.MultipartAttachmentCount.load()) : ""; + + std::string ValidateInfo; + if (PostUploadVerify) + { + const uint64_t DownloadedCount = ValidateDownloadStats.DownloadedChunkCount + ValidateDownloadStats.DownloadedBlockCount; + const uint64_t DownloadedByteCount = + ValidateDownloadStats.DownloadedChunkByteCount + ValidateDownloadStats.DownloadedBlockByteCount; + ValidateInfo = fmt::format("\n Verified: {} ({}) {}B/sec in {}", + DownloadedCount, + NiceBytes(DownloadedByteCount), + NiceNum(GetBytesPerSecond(ValidateStats.ElapsedWallTimeUS, DownloadedByteCount)), + NiceTimeSpanMs(ValidateStats.ElapsedWallTimeUS / 1000)); + } + ZEN_CONSOLE( - "Uploaded {}\n" - " Delta: {}/{} ({:.1f}%)\n" - " Blocks: {} ({})\n" - " Chunks: {} ({}){}\n" - " Rate: {}bits/sec", - NiceBytes(UploadStats.BlocksBytes + UploadStats.ChunksBytes), + "Uploaded part {} ('{}') to build {} in {} \n" + " Scanned files: {} ({}) {}B/sec in {}\n" + " New data: {} ({:.1f}%)\n" + " New blocks: {} ({}) {}B/sec\n" + " New chunks: {} ({} -> {}) {}B/sec\n" + " Uploaded: {} ({}) {}bits/sec\n" + " Blocks: {} ({})\n" + " Chunks: {} ({}){}" + "{}", + BuildPartId, + BuildPartName, + BuildId, + NiceTimeSpanMs(ProcessTimer.GetElapsedTimeMs()), + + LocalFolderScanStats.FoundFileCount.load(), + NiceBytes(LocalFolderScanStats.FoundFileByteCount.load()), + NiceNum(GetBytesPerSecond(ChunkingStats.ElapsedWallTimeUS, ChunkingStats.BytesHashed)), + NiceTimeSpanMs(ChunkingStats.ElapsedWallTimeUS / 1000), NiceBytes(FindBlocksStats.NewBlocksChunkByteCount + 
LooseChunksStats.CompressedChunkBytes), - NiceBytes(ChunkingStats.BytesHashed), DeltaByteCountPercent, + GenerateBlocksStats.GeneratedBlockCount.load(), + NiceBytes(GenerateBlocksStats.GeneratedBlockByteCount.load()), + NiceNum(GetBytesPerSecond(GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS, GenerateBlocksStats.GeneratedBlockByteCount)), + + LooseChunksStats.CompressedChunkCount.load(), + NiceBytes(LooseChunksStats.ChunkByteCount), + NiceBytes(LooseChunksStats.CompressedChunkBytes.load()), + NiceNum(GetBytesPerSecond(LooseChunksStats.CompressChunksElapsedWallTimeUS, LooseChunksStats.ChunkByteCount)), + + NiceBytes(UploadStats.BlockCount.load() + UploadStats.ChunkCount.load()), + NiceBytes(UploadStats.BlocksBytes + UploadStats.ChunksBytes), + NiceNum(GetBytesPerSecond(UploadStats.ElapsedWallTimeUS, (UploadStats.ChunksBytes + UploadStats.BlocksBytes * 8))), + UploadStats.BlockCount.load(), - NiceBytes(UploadStats.BlocksBytes), - UploadStats.ChunkCount.load(), - NiceBytes(UploadStats.ChunksBytes), - LargeAttachmentStats, + NiceBytes(UploadStats.BlocksBytes.load()), - NiceNum(GetBytesPerSecond(UploadStats.ElapsedWallTimeUS, (UploadStats.ChunksBytes + UploadStats.BlocksBytes * 8)))); + UploadStats.ChunkCount.load(), + NiceBytes(UploadStats.ChunksBytes.load()), + MultipartAttachmentStats, - ZEN_CONSOLE("Uploaded ({}) build {} part {} ({}) in {}", - NiceBytes(FindBlocksStats.NewBlocksChunkByteCount + LooseChunksStats.CompressedChunkBytes), - BuildId, - BuildPartName, - BuildPartId, - NiceTimeSpanMs(ProcessTimer.GetElapsedTimeMs())); + ValidateInfo); } - void VerifyFolder(const ChunkedFolderContent& Content, const std::filesystem::path& Path, bool VerifyFileHash) + void VerifyFolder(const ChunkedFolderContent& Content, + const std::filesystem::path& Path, + bool VerifyFileHash, + VerifyFolderStatistics& VerifyFolderStats) { ZEN_TRACE_CPU("VerifyFolder"); - ProgressBar ProgressBar(UsePlainProgress); - std::atomic<uint64_t> FilesVerified(0); - std::atomic<uint64_t> 
FilesFailed(0); - std::atomic<uint64_t> ReadBytes(0); + Stopwatch Timer; + + ProgressBar ProgressBar(UsePlainProgress); WorkerThreadPool& VerifyPool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // @@ -3473,7 +3649,7 @@ namespace { ErrorLock.WithExclusiveLock([&]() { Errors.push_back(fmt::format("File {} with expected size {} does not exist", TargetPath, ExpectedSize)); }); - FilesFailed++; + VerifyFolderStats.FilesFailed++; } else { @@ -3485,7 +3661,7 @@ namespace { Errors.push_back( fmt::format("Failed to get size of file {}: {} ({})", TargetPath, Ec.message(), Ec.value())); }); - FilesFailed++; + VerifyFolderStats.FilesFailed++; } else if (SizeOnDisk < ExpectedSize) { @@ -3495,7 +3671,7 @@ namespace { ExpectedSize, SizeOnDisk)); }); - FilesFailed++; + VerifyFolderStats.FilesFailed++; } else if (SizeOnDisk > ExpectedSize) { @@ -3505,7 +3681,7 @@ namespace { ExpectedSize, SizeOnDisk)); }); - FilesFailed++; + VerifyFolderStats.FilesFailed++; } else if (SizeOnDisk > 0 && VerifyFileHash) { @@ -3540,13 +3716,13 @@ namespace { } FileOffset += ChunkSize; } - FilesFailed++; + VerifyFolderStats.FilesFailed++; } - ReadBytes += SizeOnDisk; + VerifyFolderStats.ReadBytes += SizeOnDisk; } } } - FilesVerified++; + VerifyFolderStats.FilesVerified++; } }, [&, PathIndex](const std::exception& Ex, std::atomic<bool>&) { @@ -3555,23 +3731,25 @@ namespace { (Path / Content.Paths[PathIndex]).make_preferred(), Ex.what())); }); - FilesFailed++; + VerifyFolderStats.FilesFailed++; }); } Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) { ZEN_UNUSED(IsAborted, PendingWork); std::string Details = fmt::format("Verified {}/{} ({}). 
Failed files: {}", - FilesVerified.load(), + VerifyFolderStats.FilesVerified.load(), PathCount, - NiceBytes(ReadBytes.load()), - FilesFailed.load()); + NiceBytes(VerifyFolderStats.ReadBytes.load()), + VerifyFolderStats.FilesFailed.load()); ProgressBar.UpdateState({.Task = "Verifying files ", .Details = Details, .TotalCount = gsl::narrow<uint64_t>(PathCount), - .RemainingCount = gsl::narrow<uint64_t>(PathCount - FilesVerified.load())}, + .RemainingCount = gsl::narrow<uint64_t>(PathCount - VerifyFolderStats.FilesVerified.load())}, false); }); + VerifyFolderStats.VerifyElapsedWallTimeUs = Timer.GetElapsedTimeUs(); + ProgressBar.Finish(); for (const std::string& Error : Errors) { @@ -3586,7 +3764,7 @@ namespace { class WriteFileCache { public: - WriteFileCache() {} + WriteFileCache(DiskStatistics& DiskStats) : m_DiskStats(DiskStats) {} ~WriteFileCache() { Flush(); } template<typename TBufferType> @@ -3602,6 +3780,8 @@ namespace { ZEN_TRACE_CPU("WriteFileCache_WriteToFile_CacheWrite"); ZEN_ASSERT(OpenFileWriter); OpenFileWriter->Write(Buffer, FileOffset); + m_DiskStats.WriteCount++; + m_DiskStats.WriteByteCount += Buffer.GetSize(); } else { @@ -3624,6 +3804,8 @@ namespace { } return --Tries > 0; }); + m_DiskStats.OpenWriteCount++; + m_DiskStats.CurrentOpenFileCount++; } const bool CacheWriter = TargetFinalSize > Buffer.GetSize(); @@ -3635,12 +3817,18 @@ namespace { OutputFile = std::move(NewOutputFile); OpenFileWriter = std::make_unique<BasicFileWriter>(*OutputFile, Min(TargetFinalSize, 256u * 1024u)); OpenFileWriter->Write(Buffer, FileOffset); + m_DiskStats.WriteCount++; + m_DiskStats.WriteByteCount += Buffer.GetSize(); SeenTargetIndexes.push_back(TargetIndex); } else { ZEN_TRACE_CPU("WriteFileCache_WriteToFile_Write"); NewOutputFile->Write(Buffer, FileOffset); + m_DiskStats.WriteCount++; + m_DiskStats.WriteByteCount += Buffer.GetSize(); + NewOutputFile = {}; + m_DiskStats.CurrentOpenFileCount--; } } } @@ -3648,9 +3836,15 @@ namespace { void Flush() { 
ZEN_TRACE_CPU("WriteFileCache_Flush"); + if (OutputFile) + { + m_DiskStats.CurrentOpenFileCount--; + } + OpenFileWriter = {}; OutputFile = {}; } + DiskStatistics& m_DiskStats; std::vector<uint32_t> SeenTargetIndexes; std::unique_ptr<BasicFile> OutputFile; std::unique_ptr<BasicFileWriter> OpenFileWriter; @@ -3693,12 +3887,12 @@ namespace { const ChunkedContentLookup& Lookup, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, const BlockWriteOps& Ops, - std::atomic<uint32_t>& OutChunksComplete, - std::atomic<uint64_t>& OutBytesWritten) + DiskStatistics& DiskStats, + WriteChunkStatistics& WriteChunkStats) { ZEN_TRACE_CPU("WriteBlockChunkOps"); { - WriteFileCache OpenFileCache; + WriteFileCache OpenFileCache(DiskStats); for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps) { if (AbortFlag) @@ -3724,8 +3918,13 @@ namespace { Chunk, FileOffset, RemoteContent.RawSizes[PathIndex]); - OutBytesWritten += ChunkSize; } + WriteChunkStats.ChunkCountWritten += gsl::narrow<uint32_t>(Ops.ChunkBuffers.size()); + WriteChunkStats.ChunkBytesWritten += + std::accumulate(Ops.ChunkBuffers.begin(), + Ops.ChunkBuffers.end(), + uint64_t(0), + [](uint64_t Current, const CompositeBuffer& Buffer) -> uint64_t { return Current + Buffer.GetSize(); }); } if (!AbortFlag) { @@ -3753,7 +3952,6 @@ namespace { GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)); } } - OutChunksComplete += gsl::narrow<uint32_t>(Ops.ChunkBuffers.size()); } } @@ -3865,8 +4063,8 @@ namespace { CompositeBuffer&& BlockBuffer, const ChunkedContentLookup& Lookup, std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, - std::atomic<uint32_t>& OutChunksComplete, - std::atomic<uint64_t>& OutBytesWritten) + DiskStatistics& DiskStats, + WriteChunkStatistics& WriteChunkStats) { ZEN_TRACE_CPU("WriteBlockToDisk"); @@ -3899,8 +4097,8 @@ namespace { Lookup, SequenceIndexChunksLeftToWriteCounters, Ops, - OutChunksComplete, - OutBytesWritten); + DiskStats, + WriteChunkStats); 
return true; } return false; @@ -3925,8 +4123,8 @@ namespace { Lookup, SequenceIndexChunksLeftToWriteCounters, Ops, - OutChunksComplete, - OutBytesWritten); + DiskStats, + WriteChunkStats); return true; } return false; @@ -3941,8 +4139,8 @@ namespace { uint32_t LastIncludedBlockChunkIndex, const ChunkedContentLookup& Lookup, std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, - std::atomic<uint32_t>& OutChunksComplete, - std::atomic<uint64_t>& OutBytesWritten) + DiskStatistics& DiskStats, + WriteChunkStatistics& WriteChunkStats) { ZEN_TRACE_CPU("WritePartialBlockToDisk"); BlockWriteOps Ops; @@ -3963,8 +4161,8 @@ namespace { Lookup, SequenceIndexChunksLeftToWriteCounters, Ops, - OutChunksComplete, - OutBytesWritten); + DiskStats, + WriteChunkStats); return true; } else @@ -4012,8 +4210,7 @@ namespace { const ChunkedContentLookup& Lookup, std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> ChunkTargets, CompositeBuffer&& ChunkData, - WriteFileCache& OpenFileCache, - std::atomic<uint64_t>& OutBytesWritten) + WriteFileCache& OpenFileCache) { ZEN_TRACE_CPU("WriteChunkToDisk"); @@ -4032,7 +4229,6 @@ namespace { ChunkData, FileOffset, Content.RawSizes[PathIndex]); - OutBytesWritten += ChunkData.GetSize(); } } @@ -4053,7 +4249,8 @@ namespace { void StreamDecompress(const std::filesystem::path& CacheFolderPath, const IoHash& SequenceRawHash, CompositeBuffer&& CompressedPart, - std::atomic<uint64_t>& WriteToDiskBytes) + DiskStatistics& DiskStats, + WriteChunkStatistics& WriteChunkStats) { ZEN_TRACE_CPU("StreamDecompress"); const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash); @@ -4076,21 +4273,29 @@ namespace { { throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, SequenceRawHash)); } + IoHashStream Hash; - bool CouldDecompress = Compressed.DecompressToStream(0, (uint64_t)-1, [&](uint64_t Offset, const CompositeBuffer& 
RangeBuffer) { - ZEN_TRACE_CPU("StreamDecompress_Write"); - if (!AbortFlag) - { - DecompressedTemp.Write(RangeBuffer, Offset); - for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) + bool CouldDecompress = Compressed.DecompressToStream( + 0, + (uint64_t)-1, + [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { + ZEN_UNUSED(SourceOffset); + ZEN_TRACE_CPU("StreamDecompress_Write"); + DiskStats.ReadByteCount += SourceSize; + if (!AbortFlag) { - Hash.Append(Segment.GetView()); + WriteChunkStats.ChunkBytesWritten += RangeBuffer.GetSize(); + DecompressedTemp.Write(RangeBuffer, Offset); + for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) + { + Hash.Append(Segment.GetView()); + } + DiskStats.WriteByteCount += RangeBuffer.GetSize(); + DiskStats.WriteCount++; + return true; } - WriteToDiskBytes += RangeBuffer.GetSize(); - return true; - } - return false; - }); + return false; + }); if (AbortFlag) { @@ -4113,6 +4318,7 @@ namespace { throw std::runtime_error( fmt::format("Failed moving temporary file for decompressing large blob {}. 
Reason: {}", SequenceRawHash, Ec.message())); } + WriteChunkStats.ChunkCountWritten++; } bool WriteCompressedChunk(const std::filesystem::path& TargetFolder, @@ -4121,32 +4327,34 @@ namespace { const IoHash& ChunkHash, const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs, IoBuffer&& CompressedPart, - std::atomic<uint64_t>& WriteToDiskBytes) + DiskStatistics& DiskStats, + WriteChunkStatistics& WriteChunkStats) { auto ChunkHashToChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); ZEN_ASSERT(ChunkHashToChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end()); + const uint32_t ChunkIndex = ChunkHashToChunkIndexIt->second; + const uint64_t ChunkRawSize = RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; if (CanDecompressDirectToSequence(RemoteContent, ChunkTargetPtrs)) { - const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[ChunkTargetPtrs.front()->SequenceIndex]; - StreamDecompress(TargetFolder, SequenceRawHash, CompositeBuffer(std::move(CompressedPart)), WriteToDiskBytes); + const std::uint32_t SequenceIndex = ChunkTargetPtrs.front()->SequenceIndex; + const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]; + StreamDecompress(TargetFolder, SequenceRawHash, CompositeBuffer(std::move(CompressedPart)), DiskStats, WriteChunkStats); } else { - const uint32_t ChunkIndex = ChunkHashToChunkIndexIt->second; - SharedBuffer Chunk = - Decompress(CompositeBuffer(std::move(CompressedPart)), ChunkHash, RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]); + SharedBuffer Chunk = Decompress(CompositeBuffer(std::move(CompressedPart)), ChunkHash, ChunkRawSize); if (!AbortFlag) { - WriteFileCache OpenFileCache; - + WriteFileCache OpenFileCache(DiskStats); WriteChunkToDisk(TargetFolder, RemoteContent, RemoteLookup, ChunkTargetPtrs, CompositeBuffer(std::move(Chunk)), - OpenFileCache, - WriteToDiskBytes); + OpenFileCache); + WriteChunkStats.ChunkCountWritten++; + 
WriteChunkStats.ChunkBytesWritten += ChunkRawSize; return true; } } @@ -4198,19 +4406,17 @@ namespace { WorkerThreadPool& WritePool, IoBuffer&& Payload, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, - std::atomic<uint64_t>& WriteToDiskBytes, - std::atomic<uint32_t>& ChunkCountWritten, std::atomic<uint64_t>& WritePartsComplete, - std::atomic<uint64_t>& TotalPartWriteCount, - std::atomic<uint64_t>& LooseChunksBytes, - FilteredRate& FilteredWrittenBytesPerSecond) + const uint64_t TotalPartWriteCount, + FilteredRate& FilteredWrittenBytesPerSecond, + DiskStatistics& DiskStats, + WriteChunkStatistics& WriteChunkStats) { ZEN_TRACE_CPU("AsyncWriteDownloadedChunk"); const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; - uint64_t Size = Payload.GetSize(); - LooseChunksBytes += Size; + const uint64_t Size = Payload.GetSize(); std::filesystem::path CompressedChunkPath; @@ -4256,6 +4462,7 @@ namespace { SequenceIndexChunksLeftToWriteCounters, CompressedChunkPath, RemoteChunkIndex, + TotalPartWriteCount, ChunkTargetPtrs = std::move(ChunkTargetPtrs), CompressedPart = std::move(Payload)](std::atomic<bool>&) mutable { ZEN_TRACE_CPU("UpdateFolder_WriteChunk"); @@ -4288,11 +4495,11 @@ namespace { ChunkHash, ChunkTargetPtrs, std::move(CompressedPart), - WriteToDiskBytes); + DiskStats, + WriteChunkStats); if (!AbortFlag) { - ChunkCountWritten++; WritePartsComplete++; if (WritePartsComplete == TotalPartWriteCount) { @@ -4324,19 +4531,16 @@ namespace { const std::vector<IoHash>& LooseChunkHashes, bool AllowPartialBlockRequests, bool WipeTargetFolder, - FolderContent& OutLocalFolderState) + FolderContent& OutLocalFolderState, + DiskStatistics& DiskStats, + CacheMappingStatistics& CacheMappingStats, + DownloadStatistics& DownloadStats, + WriteChunkStatistics& WriteChunkStats, + RebuildFolderStateStatistics& RebuildFolderStateStats) { ZEN_TRACE_CPU("UpdateFolder"); ZEN_UNUSED(WipeTargetFolder); - std::atomic<uint64_t> 
DownloadedBlocks = 0; - std::atomic<uint64_t> BlockBytes = 0; - std::atomic<uint64_t> DownloadedChunks = 0; - std::atomic<uint64_t> LooseChunksBytes = 0; - std::atomic<uint64_t> WriteToDiskBytes = 0; - std::atomic<uint64_t> MultipartAttachmentCount = 0; - - DiskStatistics DiskStats; Stopwatch IndexTimer; @@ -4351,15 +4555,11 @@ namespace { Stopwatch CacheMappingTimer; std::vector<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters(RemoteContent.ChunkedContent.SequenceRawHashes.size()); - // std::vector<bool> RemoteSequenceIndexIsCachedFlags(RemoteContent.ChunkedContent.SequenceRawHashes.size(), false); - std::vector<bool> RemoteChunkIndexNeedsCopyFromLocalFileFlags(RemoteContent.ChunkedContent.ChunkHashes.size()); - // Guard if he same chunks is in multiple blocks (can happen due to block reuse, cache reuse blocks writes directly) - std::vector<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags(RemoteContent.ChunkedContent.ChunkHashes.size()); + std::vector<bool> RemoteChunkIndexNeedsCopyFromLocalFileFlags(RemoteContent.ChunkedContent.ChunkHashes.size()); + std::vector<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags(RemoteContent.ChunkedContent.ChunkHashes.size()); tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedChunkHashesFound; tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedSequenceHashesFound; - uint64_t CachedChunkHashesByteCountFound = 0; - uint64_t CachedSequenceHashesByteCountFound = 0; { ZEN_TRACE_CPU("UpdateFolder_CheckChunkCache"); @@ -4380,7 +4580,8 @@ namespace { if (ChunkSize == CacheDirContent.FileSizes[Index]) { CachedChunkHashesFound.insert({FileHash, ChunkIndex}); - CachedChunkHashesByteCountFound += ChunkSize; + CacheMappingStats.CacheChunkCount++; + CacheMappingStats.CacheChunkByteCount += ChunkSize; continue; } } @@ -4393,7 +4594,8 @@ namespace { if (SequenceSize == CacheDirContent.FileSizes[Index]) { CachedSequenceHashesFound.insert({FileHash, SequenceIndex}); - CachedSequenceHashesByteCountFound += 
SequenceSize; + CacheMappingStats.CacheSequenceHashesCount += SequenceSize; + CacheMappingStats.CacheSequenceHashesByteCount++; continue; } } @@ -4403,7 +4605,6 @@ namespace { } tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedBlocksFound; - uint64_t CachedBlocksByteCountFound = 0; { ZEN_TRACE_CPU("UpdateFolder_CheckBlockCache"); @@ -4438,7 +4639,8 @@ namespace { if (BlockSize == BlockDirContent.FileSizes[Index]) { CachedBlocksFound.insert({FileHash, BlockIndex}); - CachedBlocksByteCountFound += BlockSize; + CacheMappingStats.CacheBlockCount++; + CacheMappingStats.CacheBlocksByteCount += BlockSize; continue; } } @@ -4448,7 +4650,6 @@ namespace { } std::vector<uint32_t> LocalPathIndexesMatchingSequenceIndexes; - uint64_t LocalPathIndexesByteCountMatchingSequenceIndexes = 0; // Pick up all whole files we can use from current local state { ZEN_TRACE_CPU("UpdateFolder_CheckLocalChunks"); @@ -4477,7 +4678,8 @@ namespace { const uint32_t LocalPathIndex = GetFirstPathIndexForSeqeuenceIndex(LocalLookup, LocalSequenceIndex); uint64_t RawSize = LocalContent.RawSizes[LocalPathIndex]; LocalPathIndexesMatchingSequenceIndexes.push_back(LocalPathIndex); - LocalPathIndexesByteCountMatchingSequenceIndexes += RawSize; + CacheMappingStats.LocalPathsMatchingSequencesCount++; + CacheMappingStats.LocalPathsMatchingSequencesByteCount += RawSize; } else { @@ -4495,7 +4697,7 @@ namespace { struct ChunkTarget { uint32_t TargetChunkLocationCount = (uint32_t)-1; - uint64_t ChunkRawSize = (uint64_t)-1; + uint32_t RemoteChunkIndex = (uint32_t)-1; uint64_t CacheFileOffset = (uint64_t)-1; }; std::vector<ChunkTarget> ChunkTargets; @@ -4503,8 +4705,6 @@ namespace { tsl::robin_map<IoHash, size_t, IoHash::Hasher> RawHashToCacheCopyDataIndex; std::vector<CacheCopyData> CacheCopyDatas; - uint64_t LocalChunkHashesMatchingRemoteCount = 0; - uint64_t LocalChunkHashesMatchingRemoteByteCount = 0; { ZEN_TRACE_CPU("UpdateFolder_GetLocalChunks"); @@ -4537,7 +4737,7 @@ namespace { { 
CacheCopyData::ChunkTarget Target = { .TargetChunkLocationCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()), - .ChunkRawSize = LocalChunkRawSize, + .RemoteChunkIndex = RemoteChunkIndex, .CacheFileOffset = SourceOffset}; if (auto CopySourceIt = RawHashToCacheCopyDataIndex.find(LocalSequenceRawHash); CopySourceIt != RawHashToCacheCopyDataIndex.end()) @@ -4567,8 +4767,8 @@ namespace { .TargetChunkLocationPtrs = ChunkTargetPtrs, .ChunkTargets = std::vector<CacheCopyData::ChunkTarget>{Target}}); } - LocalChunkHashesMatchingRemoteByteCount += LocalChunkRawSize; - LocalChunkHashesMatchingRemoteCount++; + CacheMappingStats.LocalChunkMatchingRemoteCount++; + CacheMappingStats.LocalChunkMatchingRemoteByteCount += LocalChunkRawSize; RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex] = true; } } @@ -4580,20 +4780,20 @@ namespace { } if (!CachedSequenceHashesFound.empty() || !CachedChunkHashesFound.empty() || !CachedBlocksFound.empty() || - !LocalPathIndexesMatchingSequenceIndexes.empty() || LocalChunkHashesMatchingRemoteCount > 0) + !LocalPathIndexesMatchingSequenceIndexes.empty() || CacheMappingStats.LocalChunkMatchingRemoteCount > 0) { ZEN_CONSOLE( "Cache: {} ({}) chunk sequences, {} ({}) chunks, {} ({}) blocks. 
Local state: {} ({}) chunk sequences, {} ({}) chunks", CachedSequenceHashesFound.size(), - NiceBytes(CachedSequenceHashesByteCountFound), + NiceBytes(CacheMappingStats.CacheSequenceHashesByteCount), CachedChunkHashesFound.size(), - NiceBytes(CachedChunkHashesByteCountFound), + NiceBytes(CacheMappingStats.CacheChunkByteCount), CachedBlocksFound.size(), - NiceBytes(CachedBlocksByteCountFound), + NiceBytes(CacheMappingStats.CacheBlocksByteCount), LocalPathIndexesMatchingSequenceIndexes.size(), - NiceBytes(LocalPathIndexesByteCountMatchingSequenceIndexes), - LocalChunkHashesMatchingRemoteCount, - NiceBytes(LocalChunkHashesMatchingRemoteByteCount)); + NiceBytes(CacheMappingStats.LocalPathsMatchingSequencesByteCount), + CacheMappingStats.LocalChunkMatchingRemoteCount, + NiceBytes(CacheMappingStats.LocalChunkMatchingRemoteByteCount)); } uint32_t ChunkCountToWrite = 0; @@ -4616,9 +4816,7 @@ namespace { } uint64_t TotalRequestCount = 0; - std::atomic<uint64_t> RequestsComplete = 0; - std::atomic<uint32_t> ChunkCountWritten = 0; - std::atomic<uint64_t> TotalPartWriteCount = 0; + uint64_t TotalPartWriteCount = 0; std::atomic<uint64_t> WritePartsComplete = 0; { @@ -4635,8 +4833,6 @@ namespace { ProgressBar WriteProgressBar(UsePlainProgress); ParallellWork Work(AbortFlag); - std::atomic<uint64_t> BytesDownloaded = 0; - struct LooseChunkHashWorkData { std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs; @@ -4718,12 +4914,6 @@ namespace { std::vector<uint32_t> FullBlockWorks; - size_t BlocksNeededCount = 0; - uint64_t AllBlocksSize = 0; - uint64_t AllBlocksFetch = 0; - uint64_t AllBlocksSlack = 0; - uint64_t AllBlockRequests = 0; - uint64_t AllBlockChunksSize = 0; for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; BlockIndex++) { const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; @@ -4779,7 +4969,6 @@ namespace { } else if (ChunkBlockIndex == BlockChunkIndexNeeded[NeedBlockChunkIndexOffset]) { - AllBlockChunksSize += 
ChunkCompressedLength; if (NextRange.RangeLength == 0) { NextRange.RangeStart = CurrentOffset; @@ -4796,7 +4985,6 @@ namespace { ZEN_ASSERT(false); } } - AllBlocksSize += CurrentOffset; if (NextRange.RangeLength > 0) { BlockRanges.push_back(NextRange); @@ -4806,7 +4994,6 @@ namespace { std::vector<BlockRangeDescriptor> CollapsedBlockRanges; auto It = BlockRanges.begin(); CollapsedBlockRanges.push_back(*It++); - uint64_t TotalSlack = 0; while (It != BlockRanges.end()) { BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back(); @@ -4817,7 +5004,6 @@ namespace { LastRange.ChunkBlockIndexCount = (It->ChunkBlockIndexStart + It->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart; LastRange.RangeLength = (It->RangeStart + It->RangeLength) - LastRange.RangeStart; - TotalSlack += Slack; } else { @@ -4826,17 +5012,6 @@ namespace { ++It; } - uint64_t TotalFetch = 0; - for (const BlockRangeDescriptor& Range : CollapsedBlockRanges) - { - TotalFetch += Range.RangeLength; - } - - AllBlocksFetch += TotalFetch; - AllBlocksSlack += TotalSlack; - BlocksNeededCount++; - AllBlockRequests += CollapsedBlockRanges.size(); - TotalRequestCount += CollapsedBlockRanges.size(); TotalPartWriteCount += CollapsedBlockRanges.size(); @@ -4844,7 +5019,6 @@ namespace { } else { - BlocksNeededCount++; TotalRequestCount++; TotalPartWriteCount++; @@ -4888,7 +5062,7 @@ namespace { { const ChunkedContentLookup::ChunkSequenceLocation* Target = nullptr; uint64_t CacheFileOffset = (uint64_t)-1; - uint64_t ChunkSize = (uint64_t)-1; + uint32_t ChunkIndex = (uint32_t)-1; }; std::vector<WriteOp> WriteOps; @@ -4905,7 +5079,7 @@ namespace { { WriteOps.push_back(WriteOp{.Target = Target, .CacheFileOffset = ChunkTarget.CacheFileOffset, - .ChunkSize = ChunkTarget.ChunkRawSize}); + .ChunkIndex = ChunkTarget.RemoteChunkIndex}); } TargetStart += ChunkTarget.TargetChunkLocationCount; } @@ -4931,8 +5105,10 @@ namespace { { ZEN_TRACE_CPU("Write"); - BufferedOpenFile SourceFile(LocalFilePath); - WriteFileCache 
OpenFileCache; + tsl::robin_set<uint32_t> ChunkIndexesWritten; + + BufferedOpenFile SourceFile(LocalFilePath, DiskStats); + WriteFileCache OpenFileCache(DiskStats); for (const WriteOp& Op : WriteOps) { if (AbortFlag) @@ -4944,7 +5120,7 @@ namespace { RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]); ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() > 0); const uint32_t RemotePathIndex = RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]; - const uint64_t ChunkSize = Op.ChunkSize; + const uint64_t ChunkSize = RemoteContent.ChunkedContent.ChunkRawSizes[Op.ChunkIndex]; CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ChunkSize); ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]); @@ -4959,7 +5135,13 @@ namespace { ChunkSource, Op.Target->Offset, RemoteContent.RawSizes[RemotePathIndex]); - WriteToDiskBytes += ChunkSize; + + if (ChunkIndexesWritten.insert(Op.ChunkIndex).second) + { + WriteChunkStats.ChunkCountWritten++; + WriteChunkStats.ChunkBytesWritten += ChunkSize; + } + CacheLocalFileBytesRead += ChunkSize; // TODO: This should be the sum of unique chunk sizes? 
} } @@ -4992,8 +5174,6 @@ namespace { GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)); } } - - ChunkCountWritten += gsl::narrow<uint32_t>(CopyData.ChunkTargets.size()); ZEN_DEBUG("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), LocalContent.Paths[LocalPathIndex]); } WritePartsComplete++; @@ -5038,9 +5218,8 @@ namespace { uint64_t RawSize; if (CompressedBuffer::ValidateCompressedHeader(ExistingCompressedPart, RawHash, RawSize)) { - LooseChunksBytes += ExistingCompressedPart.GetSize(); - RequestsComplete++; - if (RequestsComplete == TotalRequestCount) + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) { FilteredDownloadedBytesPerSecond.Stop(); } @@ -5063,8 +5242,8 @@ namespace { &RemoteLookup, &CacheFolderPath, &SequenceIndexChunksLeftToWriteCounters, - &WriteToDiskBytes, - &ChunkCountWritten, + &DiskStats, + &WriteChunkStats, &WritePartsComplete, &TotalPartWriteCount, &FilteredWrittenBytesPerSecond, @@ -5094,12 +5273,15 @@ namespace { ChunkHash, ChunkTargetPtrs, std::move(CompressedPart), - WriteToDiskBytes); + DiskStats, + WriteChunkStats); + WriteChunkStats.ChunkCountWritten++; + WriteChunkStats.ChunkBytesWritten += + RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex]; + WritePartsComplete++; if (!AbortFlag) { - ChunkCountWritten++; - WritePartsComplete++; if (WritePartsComplete == TotalPartWriteCount) { FilteredWrittenBytesPerSecond.Stop(); @@ -5132,11 +5314,10 @@ namespace { PreferredMultipartChunkSize, Work, NetworkPool, - BytesDownloaded, - MultipartAttachmentCount, + DownloadStats, [&, RemoteChunkIndex, ChunkTargetPtrs](IoBuffer&& Payload) mutable { - RequestsComplete++; - if (RequestsComplete == TotalRequestCount) + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) { FilteredDownloadedBytesPerSecond.Stop(); } @@ -5149,12 +5330,11 @@ namespace { WritePool, std::move(Payload), SequenceIndexChunksLeftToWriteCounters, - 
WriteToDiskBytes, - ChunkCountWritten, WritePartsComplete, TotalPartWriteCount, - LooseChunksBytes, - FilteredWrittenBytesPerSecond); + FilteredWrittenBytesPerSecond, + DiskStats, + WriteChunkStats); }); } else @@ -5167,10 +5347,10 @@ namespace { throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash)); } uint64_t BlobSize = BuildBlob.GetSize(); - BytesDownloaded += BlobSize; - - RequestsComplete++; - if (RequestsComplete == TotalRequestCount) + DownloadStats.DownloadedChunkCount++; + DownloadStats.DownloadedChunkByteCount += BlobSize; + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) { FilteredDownloadedBytesPerSecond.Stop(); } @@ -5183,12 +5363,11 @@ namespace { WritePool, std::move(BuildBlob), SequenceIndexChunksLeftToWriteCounters, - WriteToDiskBytes, - ChunkCountWritten, WritePartsComplete, TotalPartWriteCount, - LooseChunksBytes, - FilteredWrittenBytesPerSecond); + FilteredWrittenBytesPerSecond, + DiskStats, + WriteChunkStats); } } } @@ -5228,8 +5407,8 @@ namespace { CompositeBuffer(std::move(BlockBuffer)), RemoteLookup, RemoteChunkIndexNeedsCopyFromSourceFlags, - ChunkCountWritten, - WriteToDiskBytes)) + DiskStats, + WriteChunkStats)) { std::error_code DummyEc; std::filesystem::remove(BlockChunkPath, DummyEc); @@ -5272,11 +5451,10 @@ namespace { throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); } uint64_t BlockSize = BlockBuffer.GetSize(); - BytesDownloaded += BlockSize; - BlockBytes += BlockSize; - DownloadedBlocks++; - RequestsComplete++; - if (RequestsComplete == TotalRequestCount) + DownloadStats.DownloadedBlockCount++; + DownloadStats.DownloadedBlockByteCount += BlockSize; + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) { FilteredDownloadedBytesPerSecond.Stop(); } @@ -5367,21 +5545,21 @@ namespace { BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1, RemoteLookup, 
RemoteChunkIndexNeedsCopyFromSourceFlags, - ChunkCountWritten, - WriteToDiskBytes)) + DiskStats, + WriteChunkStats)) { std::error_code DummyEc; std::filesystem::remove(BlockChunkPath, DummyEc); throw std::runtime_error( fmt::format("Partial block {} is malformed", BlockDescription.BlockHash)); } - WritePartsComplete++; if (!BlockChunkPath.empty()) { std::filesystem::remove(BlockChunkPath); } + WritePartsComplete++; if (WritePartsComplete == TotalPartWriteCount) { FilteredWrittenBytesPerSecond.Stop(); @@ -5417,11 +5595,10 @@ namespace { throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); } uint64_t BlockSize = BlockBuffer.GetSize(); - BytesDownloaded += BlockSize; - BlockBytes += BlockSize; - DownloadedBlocks++; - RequestsComplete++; - if (RequestsComplete == TotalRequestCount) + DownloadStats.DownloadedBlockCount++; + DownloadStats.DownloadedBlockByteCount += BlockSize; + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) { FilteredDownloadedBytesPerSecond.Stop(); } @@ -5476,8 +5653,8 @@ namespace { &SequenceIndexChunksLeftToWriteCounters, BlockIndex, &BlockDescriptions, - &ChunkCountWritten, - &WriteToDiskBytes, + &WriteChunkStats, + &DiskStats, &WritePartsComplete, &TotalPartWriteCount, &FilteredWrittenBytesPerSecond, @@ -5513,20 +5690,21 @@ namespace { CompositeBuffer(std::move(BlockBuffer)), RemoteLookup, RemoteChunkIndexNeedsCopyFromSourceFlags, - ChunkCountWritten, - WriteToDiskBytes)) + DiskStats, + WriteChunkStats)) { std::error_code DummyEc; std::filesystem::remove(BlockChunkPath, DummyEc); throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash)); } - WritePartsComplete++; if (!BlockChunkPath.empty()) { std::filesystem::remove(BlockChunkPath); } + WritePartsComplete++; + if (WritePartsComplete == TotalPartWriteCount) { FilteredWrittenBytesPerSecond.Stop(); @@ -5540,35 +5718,32 @@ namespace { Work.DefaultErrorFunction()); } - 
ZEN_DEBUG("Fetching {} with {} slack (ideal {}) out of {} using {} requests for {} blocks", - NiceBytes(AllBlocksFetch), - NiceBytes(AllBlocksSlack), - NiceBytes(AllBlockChunksSize), - NiceBytes(AllBlocksSize), - AllBlockRequests, - BlocksNeededCount); { ZEN_TRACE_CPU("WriteChunks_Wait"); Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) { ZEN_UNUSED(IsAborted, PendingWork); - ZEN_ASSERT(ChunkCountToWrite >= ChunkCountWritten.load()); - FilteredWrittenBytesPerSecond.Update(WriteToDiskBytes.load()); - FilteredDownloadedBytesPerSecond.Update(BytesDownloaded.load()); + ZEN_ASSERT(ChunkCountToWrite >= WriteChunkStats.ChunkCountWritten.load()); + uint64_t DownloadedBytes = DownloadStats.DownloadedChunkByteCount.load() + + DownloadStats.DownloadedBlockByteCount.load() + + +DownloadStats.DownloadedPartialBlockByteCount.load(); + FilteredWrittenBytesPerSecond.Update(DiskStats.WriteByteCount.load()); + FilteredDownloadedBytesPerSecond.Update(DownloadedBytes); std::string Details = fmt::format("{}/{} ({} {}bits/s) downloaded. 
{}/{} ({} {}B/s) written.", - RequestsComplete.load(), + DownloadStats.RequestsCompleteCount.load(), TotalRequestCount, - NiceBytes(BytesDownloaded.load()), + NiceBytes(DownloadedBytes), NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8), - ChunkCountWritten.load(), + WriteChunkStats.ChunkCountWritten.load(), ChunkCountToWrite, - NiceBytes(WriteToDiskBytes.load()), + NiceBytes(DiskStats.WriteByteCount.load()), NiceNum(FilteredWrittenBytesPerSecond.GetCurrent())); - WriteProgressBar.UpdateState({.Task = "Writing chunks ", - .Details = Details, - .TotalCount = gsl::narrow<uint64_t>(ChunkCountToWrite), - .RemainingCount = gsl::narrow<uint64_t>(ChunkCountToWrite - ChunkCountWritten.load())}, - false); + WriteProgressBar.UpdateState( + {.Task = "Writing chunks ", + .Details = Details, + .TotalCount = gsl::narrow<uint64_t>(ChunkCountToWrite), + .RemainingCount = gsl::narrow<uint64_t>(ChunkCountToWrite - WriteChunkStats.ChunkCountWritten.load())}, + false); }); } @@ -5598,25 +5773,21 @@ namespace { } ZEN_ASSERT(RawSequencesMissingWriteCount == 0); + const uint64_t DownloadedBytes = DownloadStats.DownloadedChunkByteCount.load() + DownloadStats.DownloadedBlockByteCount.load() + + +DownloadStats.DownloadedPartialBlockByteCount.load(); ZEN_CONSOLE("Downloaded {} ({}bits/s) in {}. Wrote {} ({}B/s) in {}. 
Completed in {}", - NiceBytes(BytesDownloaded.load()), - NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), BytesDownloaded * 8)), + NiceBytes(DownloadedBytes), + NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), DownloadedBytes * 8)), NiceTimeSpanMs(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS() / 1000), - NiceBytes(WriteToDiskBytes.load()), - NiceNum(GetBytesPerSecond(FilteredWrittenBytesPerSecond.GetElapsedTimeUS(), WriteToDiskBytes.load())), + NiceBytes(DiskStats.WriteByteCount.load()), + NiceNum(GetBytesPerSecond(FilteredWrittenBytesPerSecond.GetElapsedTimeUS(), DiskStats.WriteByteCount.load())), NiceTimeSpanMs(FilteredWrittenBytesPerSecond.GetElapsedTimeUS() / 1000), NiceTimeSpanMs(WriteTimer.GetElapsedTimeMs())); - } - std::vector<std::pair<IoHash, uint32_t>> Targets; - Targets.reserve(RemoteContent.Paths.size()); - for (uint32_t RemotePathIndex = 0; RemotePathIndex < RemoteContent.Paths.size(); RemotePathIndex++) - { - Targets.push_back(std::make_pair(RemoteContent.RawHashes[RemotePathIndex], RemotePathIndex)); + WriteChunkStats.WriteChunksElapsedWallTimeUs = WriteTimer.GetElapsedTimeUs(); + WriteChunkStats.DownloadTimeUs = FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(); + WriteChunkStats.WriteTimeUs = FilteredWrittenBytesPerSecond.GetElapsedTimeUS(); } - std::sort(Targets.begin(), Targets.end(), [](const std::pair<IoHash, uint32_t>& Lhs, const std::pair<IoHash, uint32_t>& Rhs) { - return Lhs.first < Rhs.first; - }); // Move all files we will reuse to cache folder // TODO: If WipeTargetFolder is false we could check which files are already correct and leave them in place @@ -5643,6 +5814,7 @@ namespace { if (WipeTargetFolder) { ZEN_TRACE_CPU("UpdateFolder_WipeTarget"); + Stopwatch Timer; // Clean target folder ZEN_CONSOLE("Wiping {}", Path); @@ -5650,10 +5822,12 @@ namespace { { ZEN_WARN("Some files in {} could not be removed", Path); } + RebuildFolderStateStats.CleanFolderElapsedWallTimeUs 
= Timer.GetElapsedTimeUs(); } else { ZEN_TRACE_CPU("UpdateFolder_RemoveUnused"); + Stopwatch Timer; // Remove unused tracked files tsl::robin_map<std::string, uint32_t> RemotePathToRemoteIndex; @@ -5683,10 +5857,12 @@ namespace { std::filesystem::remove(LocalFilePath); } } + RebuildFolderStateStats.CleanFolderElapsedWallTimeUs = Timer.GetElapsedTimeUs(); } { ZEN_TRACE_CPU("UpdateFolder_FinalizeTree"); + Stopwatch Timer; WorkerThreadPool& WritePool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // @@ -5700,6 +5876,16 @@ namespace { std::atomic<uint64_t> TargetsComplete = 0; + std::vector<std::pair<IoHash, uint32_t>> Targets; + Targets.reserve(RemoteContent.Paths.size()); + for (uint32_t RemotePathIndex = 0; RemotePathIndex < RemoteContent.Paths.size(); RemotePathIndex++) + { + Targets.push_back(std::make_pair(RemoteContent.RawHashes[RemotePathIndex], RemotePathIndex)); + } + std::sort(Targets.begin(), Targets.end(), [](const std::pair<IoHash, uint32_t>& Lhs, const std::pair<IoHash, uint32_t>& Rhs) { + return Lhs.first < Rhs.first; + }); + size_t TargetOffset = 0; while (TargetOffset < Targets.size()) { @@ -5753,6 +5939,7 @@ namespace { SetFileReadOnly(FirstTargetFilePath, false); } std::filesystem::rename(CacheFilePath, FirstTargetFilePath); + RebuildFolderStateStats.FinalizeTreeFilesMovedCount++; } OutLocalFolderState.Attributes[FirstTargetPathIndex] = @@ -5781,6 +5968,7 @@ namespace { SetFileReadOnly(ExtraTargetFilePath, false); } CopyFile(FirstTargetFilePath, ExtraTargetFilePath, {.EnableClone = false}); + RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++; OutLocalFolderState.Attributes[ExtraTargetPathIndex] = RemoteContent.Attributes.empty() @@ -5815,6 +6003,8 @@ namespace { }); } + RebuildFolderStateStats.FinalizeTreeElapsedWallTimeUs = Timer.GetElapsedTimeUs(); + if (AbortFlag) { return; @@ -6474,13 +6664,21 @@ namespace { } else { - ExtendableStringBuilder<128> SB; + ExtendableStringBuilder<128> BuildPartString; for (const 
std::pair<Oid, std::string>& BuildPart : AllBuildParts) { - SB.Append(fmt::format(" {} ({})", BuildPart.second, BuildPart.first)); + BuildPartString.Append(fmt::format(" {} ({})", BuildPart.second, BuildPart.first)); } - ZEN_CONSOLE("Downloading build {}, parts:{}", BuildId, SB.ToView()); + ZEN_CONSOLE("Downloading build {}, parts:{}", BuildId, BuildPartString.ToView()); FolderContent LocalFolderState; + + DiskStatistics DiskStats; + CacheMappingStatistics CacheMappingStats; + DownloadStatistics DownloadStats; + WriteChunkStatistics WriteChunkStats; + RebuildFolderStateStatistics RebuildFolderStateStats; + VerifyFolderStatistics VerifyFolderStats; + UpdateFolder(Storage, BuildId, Path, @@ -6492,11 +6690,16 @@ namespace { LooseChunkHashes, AllowPartialBlockRequests, WipeTargetFolder, - LocalFolderState); + LocalFolderState, + DiskStats, + CacheMappingStats, + DownloadStats, + WriteChunkStats, + RebuildFolderStateStats); if (!AbortFlag) { - VerifyFolder(RemoteContent, Path, PostDownloadVerify); + VerifyFolder(RemoteContent, Path, PostDownloadVerify, VerifyFolderStats); Stopwatch WriteStateTimer; CbObject StateObject = CreateStateObject(BuildId, AllBuildParts, PartContents, LocalFolderState); @@ -6510,8 +6713,37 @@ namespace { CompactBinaryToJson(StateObject, SB); WriteFile(Path / ZenStateFileJsonPath, IoBuffer(IoBuffer::Wrap, SB.Data(), SB.Size())); #endif // 0 + const uint64_t DownloadCount = DownloadStats.DownloadedChunkCount.load() + DownloadStats.DownloadedBlockCount.load() + + DownloadStats.DownloadedPartialBlockCount.load(); + const uint64_t DownloadByteCount = DownloadStats.DownloadedChunkByteCount.load() + + DownloadStats.DownloadedBlockByteCount.load() + + DownloadStats.DownloadedPartialBlockByteCount.load(); + const uint64_t DownloadTimeMs = DownloadTimer.GetElapsedTimeMs(); + + ZEN_CONSOLE( + "Downloaded build {}, parts:{} in {}\n" + " Download: {} ({}) {}bits/s\n" + " Write: {} ({}) {}B/s\n" + " Clean: {}\n" + " Finalize: {}\n" + " Verify: {}", + BuildId, 
+ BuildPartString.ToView(), + NiceTimeSpanMs(DownloadTimeMs), + + DownloadCount, + NiceBytes(DownloadByteCount), + NiceNum(GetBytesPerSecond(WriteChunkStats.DownloadTimeUs, DownloadByteCount * 8)), - ZEN_CONSOLE("Downloaded build in {}.", NiceTimeSpanMs(DownloadTimer.GetElapsedTimeMs())); + DiskStats.WriteCount.load(), + NiceBytes(DiskStats.WriteByteCount.load()), + NiceNum(GetBytesPerSecond(WriteChunkStats.WriteTimeUs, DiskStats.WriteByteCount.load())), + + NiceTimeSpanMs(RebuildFolderStateStats.CleanFolderElapsedWallTimeUs / 1000), + + NiceTimeSpanMs(RebuildFolderStateStats.FinalizeTreeElapsedWallTimeUs / 1000), + + NiceTimeSpanMs(VerifyFolderStats.VerifyElapsedWallTimeUs / 1000)); } } if (CleanDirectory(ZenTempFolder, {})) @@ -7504,6 +7736,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); } + Stopwatch Timer; for (const std::string& BuildIdString : m_BuildIds) { Oid BuildId = Oid::FromHexString(BuildIdString); @@ -7527,6 +7760,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } ZEN_CONSOLE("\n"); } + ZEN_CONSOLE("Completed in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs())); return 0; } @@ -7699,27 +7933,29 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) case 0: { uint64_t SourceSize = *FileSizeIt; - if (SourceSize > 0) + if (SourceSize > 256) { Work.ScheduleWork( GetMediumWorkerPool(EWorkloadType::Burst), [SourceSize, FilePath](std::atomic<bool>&) { if (!AbortFlag) { - IoBuffer Scrambled(SourceSize); + bool IsReadOnly = SetFileReadOnly(FilePath, false); { - IoBuffer Source = IoBufferBuilder::MakeFromFile(FilePath); - Scrambled.GetMutableView().CopyFrom( - Source.GetView().Mid(SourceSize / 3, SourceSize / 3)); - Scrambled.GetMutableView() - .Mid(SourceSize / 3) - .CopyFrom(Source.GetView().Mid(0, SourceSize / 3)); - Scrambled.GetMutableView() - .Mid((SourceSize / 3) 
* 2) - .CopyFrom(Source.GetView().Mid(SourceSize / 2, SourceSize / 3)); + BasicFile Source(FilePath, BasicFile::Mode::kWrite); + uint64_t RangeSize = Min(SourceSize / 3, 512u * 1024u); + IoBuffer TempBuffer1(RangeSize); + IoBuffer TempBuffer2(RangeSize); + IoBuffer TempBuffer3(RangeSize); + Source.Read(TempBuffer1.GetMutableView().GetData(), RangeSize, 0); + Source.Read(TempBuffer2.GetMutableView().GetData(), RangeSize, SourceSize / 2); + Source.Read(TempBuffer3.GetMutableView().GetData(), + RangeSize, + SourceSize - RangeSize); + Source.Write(TempBuffer1, SourceSize / 2); + Source.Write(TempBuffer2, SourceSize - RangeSize); + Source.Write(TempBuffer3, SourceSize - 0); } - bool IsReadOnly = SetFileReadOnly(FilePath, false); - WriteFile(FilePath, Scrambled); if (IsReadOnly) { SetFileReadOnly(FilePath, true); @@ -7957,7 +8193,9 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } Oid BuildPartId = Oid::TryFromHexString(m_BuildPartId); - ValidateBuildPart(*Storage, BuildId, BuildPartId, m_BuildPartName); + ValidateStatistics ValidateStats; + DownloadStatistics DownloadStats; + ValidateBuildPart(*Storage, BuildId, BuildPartId, m_BuildPartName, ValidateStats, DownloadStats); return AbortFlag ? 
13 : 0; } diff --git a/src/zencore/compress.cpp b/src/zencore/compress.cpp index 88c3bb5b9..ad6b6103c 100644 --- a/src/zencore/compress.cpp +++ b/src/zencore/compress.cpp @@ -158,9 +158,10 @@ class BaseEncoder { public: [[nodiscard]] virtual CompositeBuffer Compress(const CompositeBuffer& RawData, uint64_t BlockSize = DefaultBlockSize) const = 0; - [[nodiscard]] virtual bool CompressToStream(const CompositeBuffer& RawData, - std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback, - uint64_t BlockSize = DefaultBlockSize) const = 0; + [[nodiscard]] virtual bool CompressToStream( + const CompositeBuffer& RawData, + std::function<void(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback, + uint64_t BlockSize = DefaultBlockSize) const = 0; }; class BaseDecoder @@ -189,11 +190,13 @@ public: uint64_t RawOffset, uint64_t RawSize) const = 0; - virtual bool DecompressToStream(const BufferHeader& Header, - const CompositeBuffer& CompressedData, - uint64_t RawOffset, - uint64_t RawSize, - std::function<bool(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const = 0; + virtual bool DecompressToStream( + const BufferHeader& Header, + const CompositeBuffer& CompressedData, + uint64_t RawOffset, + uint64_t RawSize, + std::function<bool(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback) + const = 0; }; /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -207,13 +210,14 @@ public: return CompositeBuffer(HeaderData.MoveToShared(), RawData.MakeOwned()); } - [[nodiscard]] virtual bool CompressToStream(const CompositeBuffer& RawData, - std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback, - uint64_t /* BlockSize */) const final + [[nodiscard]] virtual bool CompressToStream( + const CompositeBuffer& RawData, + std::function<void(uint64_t SourceOffset, 
uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback, + uint64_t /* BlockSize */) const final { UniqueBuffer HeaderData = CompressedBuffer::CreateHeaderForNoneEncoder(RawData.GetSize(), BLAKE3::HashBuffer(RawData)); - Callback(0, CompositeBuffer(IoBuffer(IoBuffer::Wrap, HeaderData.GetData(), HeaderData.GetSize()))); - Callback(HeaderData.GetSize(), RawData); + Callback(0, 0, 0, CompositeBuffer(IoBuffer(IoBuffer::Wrap, HeaderData.GetData(), HeaderData.GetSize()))); + Callback(0, RawData.GetSize(), HeaderData.GetSize(), RawData); return true; } }; @@ -283,17 +287,19 @@ public: [[nodiscard]] uint64_t GetHeaderSize(const BufferHeader&) const final { return sizeof(BufferHeader); } - virtual bool DecompressToStream(const BufferHeader& Header, - const CompositeBuffer& CompressedData, - uint64_t RawOffset, - uint64_t RawSize, - std::function<bool(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const final + virtual bool DecompressToStream( + const BufferHeader& Header, + const CompositeBuffer& CompressedData, + uint64_t RawOffset, + uint64_t RawSize, + std::function<bool(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback) + const final { if (Header.Method == CompressionMethod::None && Header.TotalCompressedSize == CompressedData.GetSize() && Header.TotalCompressedSize == Header.TotalRawSize + sizeof(BufferHeader) && RawOffset < Header.TotalRawSize && (RawOffset + RawSize) <= Header.TotalRawSize) { - if (!Callback(0, CompressedData.Mid(sizeof(BufferHeader) + RawOffset, RawSize))) + if (!Callback(sizeof(BufferHeader) + RawOffset, RawSize, 0, CompressedData.Mid(sizeof(BufferHeader) + RawOffset, RawSize))) { return false; } @@ -309,9 +315,10 @@ class BlockEncoder : public BaseEncoder { public: virtual CompositeBuffer Compress(const CompositeBuffer& RawData, uint64_t BlockSize) const final; - virtual bool CompressToStream(const CompositeBuffer& RawData, - std::function<void(uint64_t 
Offset, const CompositeBuffer& Range)>&& Callback, - uint64_t BlockSize) const final; + virtual bool CompressToStream( + const CompositeBuffer& RawData, + std::function<void(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback, + uint64_t BlockSize) const final; protected: virtual CompressionMethod GetMethod() const = 0; @@ -460,9 +467,10 @@ BlockEncoder::Compress(const CompositeBuffer& RawData, const uint64_t BlockSize) } bool -BlockEncoder::CompressToStream(const CompositeBuffer& RawData, - std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback, - uint64_t BlockSize = DefaultBlockSize) const +BlockEncoder::CompressToStream( + const CompositeBuffer& RawData, + std::function<void(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback, + uint64_t BlockSize = DefaultBlockSize) const { ZEN_ASSERT(IsPow2(BlockSize) && (BlockSize <= (1u << 31))); @@ -504,13 +512,17 @@ BlockEncoder::CompressToStream(const CompositeBuffer& RawData, uint64_t CompressedBlockSize = CompressedBlock.GetSize(); if (RawBlockSize <= CompressedBlockSize) { - Callback(FullHeaderSize + CompressedSize, + Callback(FileRef.FileChunkOffset + RawOffset, + RawBlockSize, + FullHeaderSize + CompressedSize, CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawBlockCopy.GetView().GetData(), RawBlockSize))); CompressedBlockSize = RawBlockSize; } else { - Callback(FullHeaderSize + CompressedSize, + Callback(FileRef.FileChunkOffset + RawOffset, + RawBlockSize, + FullHeaderSize + CompressedSize, CompositeBuffer(IoBuffer(IoBuffer::Wrap, CompressedBlock.GetData(), CompressedBlockSize))); } @@ -540,12 +552,17 @@ BlockEncoder::CompressToStream(const CompositeBuffer& RawData, uint64_t CompressedBlockSize = CompressedBlock.GetSize(); if (RawBlockSize <= CompressedBlockSize) { - Callback(FullHeaderSize + CompressedSize, CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawBlock.GetData(), RawBlockSize))); + 
Callback(RawOffset, + RawBlockSize, + FullHeaderSize + CompressedSize, + CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawBlock.GetData(), RawBlockSize))); CompressedBlockSize = RawBlockSize; } else { - Callback(FullHeaderSize + CompressedSize, + Callback(RawOffset, + RawBlockSize, + FullHeaderSize + CompressedSize, CompositeBuffer(IoBuffer(IoBuffer::Wrap, CompressedBlock.GetData(), CompressedBlockSize))); } @@ -582,7 +599,7 @@ BlockEncoder::CompressToStream(const CompositeBuffer& RawData, HeaderBuffer.GetMutableView().Mid(sizeof(BufferHeader), MetaSize).CopyFrom(MakeMemoryView(CompressedBlockSizes)); Header.Write(HeaderBuffer.GetMutableView()); - Callback(0, CompositeBuffer(IoBuffer(IoBuffer::Wrap, HeaderBuffer.GetData(), HeaderBuffer.GetSize()))); + Callback(0, 0, 0, CompositeBuffer(IoBuffer(IoBuffer::Wrap, HeaderBuffer.GetData(), HeaderBuffer.GetSize()))); return true; } @@ -615,11 +632,13 @@ public: MutableMemoryView RawView, uint64_t RawOffset) const final; - virtual bool DecompressToStream(const BufferHeader& Header, - const CompositeBuffer& CompressedData, - uint64_t RawOffset, - uint64_t RawSize, - std::function<bool(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const final; + virtual bool DecompressToStream( + const BufferHeader& Header, + const CompositeBuffer& CompressedData, + uint64_t RawOffset, + uint64_t RawSize, + std::function<bool(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback) + const final; protected: virtual bool DecompressBlock(MutableMemoryView RawData, MemoryView CompressedData) const = 0; @@ -743,11 +762,12 @@ BlockDecoder::DecompressToComposite(const BufferHeader& Header, const CompositeB } bool -BlockDecoder::DecompressToStream(const BufferHeader& Header, - const CompositeBuffer& CompressedData, - uint64_t RawOffset, - uint64_t RawSize, - std::function<bool(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const +BlockDecoder::DecompressToStream( + const 
BufferHeader& Header, + const CompositeBuffer& CompressedData, + uint64_t RawOffset, + uint64_t RawSize, + std::function<bool(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const { if (Header.TotalCompressedSize != CompressedData.GetSize()) { @@ -817,7 +837,9 @@ BlockDecoder::DecompressToStream(const BufferHeader& Header, Source.Detach(); return false; } - if (!Callback(BlockIndex * BlockSize + OffsetInFirstBlock, + if (!Callback(FileRef.FileChunkOffset + CompressedOffset, + CompressedBlockSize, + BlockIndex * BlockSize + OffsetInFirstBlock, CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawDataBuffer.GetData(), BytesToUncompress)))) { Source.Detach(); @@ -827,6 +849,8 @@ BlockDecoder::DecompressToStream(const BufferHeader& Header, else { if (!Callback( + FileRef.FileChunkOffset + CompressedOffset, + BytesToUncompress, BlockIndex * BlockSize + OffsetInFirstBlock, CompositeBuffer( IoBuffer(IoBuffer::Wrap, CompressedBlockCopy.GetView().Mid(OffsetInFirstBlock).GetData(), BytesToUncompress)))) @@ -870,7 +894,9 @@ BlockDecoder::DecompressToStream(const BufferHeader& Header, { return false; } - if (!Callback(BlockIndex * BlockSize + OffsetInFirstBlock, + if (!Callback(CompressedOffset, + UncompressedBlockSize, + BlockIndex * BlockSize + OffsetInFirstBlock, CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawDataBuffer.GetData(), BytesToUncompress)))) { return false; @@ -879,6 +905,8 @@ BlockDecoder::DecompressToStream(const BufferHeader& Header, else { if (!Callback( + CompressedOffset, + BytesToUncompress, BlockIndex * BlockSize + OffsetInFirstBlock, CompositeBuffer( IoBuffer(IoBuffer::Wrap, CompressedBlockCopy.GetView().Mid(OffsetInFirstBlock).GetData(), BytesToUncompress)))) @@ -1778,11 +1806,12 @@ CompressedBuffer::Compress(const SharedBuffer& RawData, } bool -CompressedBuffer::CompressToStream(const CompositeBuffer& RawData, - std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback, - OodleCompressor 
Compressor, - OodleCompressionLevel CompressionLevel, - uint64_t BlockSize) +CompressedBuffer::CompressToStream( + const CompositeBuffer& RawData, + std::function<void(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback, + OodleCompressor Compressor, + OodleCompressionLevel CompressionLevel, + uint64_t BlockSize) { using namespace detail; @@ -1995,9 +2024,10 @@ CompressedBuffer::DecompressToComposite() const } bool -CompressedBuffer::DecompressToStream(uint64_t RawOffset, - uint64_t RawSize, - std::function<bool(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const +CompressedBuffer::DecompressToStream( + uint64_t RawOffset, + uint64_t RawSize, + std::function<bool(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const { using namespace detail; if (CompressedData) diff --git a/src/zencore/include/zencore/compress.h b/src/zencore/include/zencore/compress.h index 74fd5f767..09fa6249d 100644 --- a/src/zencore/include/zencore/compress.h +++ b/src/zencore/include/zencore/compress.h @@ -74,11 +74,12 @@ public: OodleCompressor Compressor = OodleCompressor::Mermaid, OodleCompressionLevel CompressionLevel = OodleCompressionLevel::VeryFast, uint64_t BlockSize = 0); - [[nodiscard]] ZENCORE_API static bool CompressToStream(const CompositeBuffer& RawData, - std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback, - OodleCompressor Compressor = OodleCompressor::Mermaid, - OodleCompressionLevel CompressionLevel = OodleCompressionLevel::VeryFast, - uint64_t BlockSize = 0); + [[nodiscard]] ZENCORE_API static bool CompressToStream( + const CompositeBuffer& RawData, + std::function<void(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback, + OodleCompressor Compressor = OodleCompressor::Mermaid, + OodleCompressionLevel CompressionLevel = OodleCompressionLevel::VeryFast, + uint64_t BlockSize = 0); 
/** * Construct from a compressed buffer previously created by Compress(). @@ -207,9 +208,10 @@ public: * * @return True if the buffer is valid and can be decompressed. */ - [[nodiscard]] ZENCORE_API bool DecompressToStream(uint64_t RawOffset, - uint64_t RawSize, - std::function<bool(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const; + [[nodiscard]] ZENCORE_API bool DecompressToStream( + uint64_t RawOffset, + uint64_t RawSize, + std::function<bool(uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const; /** A null compressed buffer. */ static const CompressedBuffer Null; |