diff options
| field | value | date |
|---|---|---|
| author | Dan Engelbrecht <[email protected]> | 2025-03-05 11:27:54 +0100 |
| committer | GitHub Enterprise <[email protected]> | 2025-03-05 11:27:54 +0100 |
| commit | 4b981aed0281ac995c326e3d03dc482353f66405 (patch) | |
| tree | 3441f33899708c3b2a7a8cf42f393ecfe17bb22b /src | |
| parent | do direct update of stats numbers (#294) (diff) | |
| download | zen-4b981aed0281ac995c326e3d03dc482353f66405.tar.xz zen-4b981aed0281ac995c326e3d03dc482353f66405.zip | |
streaming compress (#295)
- Improvement: Validate hash of decompressed data inline with streaming decompression
- Improvement: Do streaming compression of large blobs to improve memory and I/O performance
Diffstat (limited to 'src')
| mode | file | changed lines |
|---|---|---|
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 150 |
| -rw-r--r-- | src/zencore/compress.cpp | 171 |
| -rw-r--r-- | src/zencore/include/zencore/compress.h | 5 |
3 files changed, 289 insertions, 37 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index 132f5db86..fb9da021d 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -1088,13 +1088,21 @@ namespace { throw std::runtime_error( fmt::format("Blob {} ({} bytes) compressed header has a mismatching raw hash {}", BlobHash, Payload.GetSize(), RawHash)); } - SharedBuffer Decompressed = Compressed.Decompress(); - if (!Decompressed) + + IoHashStream Hash; + bool CouldDecompress = Compressed.DecompressToStream(0, RawSize, [&Hash](uint64_t, const CompositeBuffer& RangeBuffer) { + for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) + { + Hash.Append(Segment.GetView()); + } + }); + + if (!CouldDecompress) { throw std::runtime_error( fmt::format("Blob {} ({} bytes) failed to decompress - header information mismatch", BlobHash, Payload.GetSize())); } - IoHash ValidateRawHash = IoHash::HashBuffer(Decompressed); + IoHash ValidateRawHash = Hash.GetHash(); if (ValidateRawHash != BlobHash) { throw std::runtime_error(fmt::format("Blob {} ({} bytes) decompressed hash {} does not match header information", @@ -1102,14 +1110,26 @@ namespace { Payload.GetSize(), ValidateRawHash)); } - CompositeBuffer DecompressedComposite = Compressed.DecompressToComposite(); - if (!DecompressedComposite) + OodleCompressor Compressor; + OodleCompressionLevel CompressionLevel; + uint64_t BlockSize; + if (!Compressed.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) { - throw std::runtime_error(fmt::format("Blob {} ({} bytes) failed to decompress to composite", BlobHash, Payload.GetSize())); + throw std::runtime_error(fmt::format("Blob {} ({} bytes) failed to get compression details", BlobHash, Payload.GetSize())); } OutCompressedSize = Payload.GetSize(); OutDecompressedSize = RawSize; - return DecompressedComposite; + if (CompressionLevel == OodleCompressionLevel::None) + { + // Only decompress to composite if we need it for block verification + CompositeBuffer 
DecompressedComposite = Compressed.DecompressToComposite(); + if (!DecompressedComposite) + { + throw std::runtime_error(fmt::format("Blob {} ({} bytes) failed to decompress to composite", BlobHash, Payload.GetSize())); + } + return DecompressedComposite; + } + return CompositeBuffer{}; } CompositeBuffer ValidateBlob(BuildStorage& Storage, @@ -1132,6 +1152,10 @@ namespace { uint64_t& OutDecompressedSize) { CompositeBuffer BlockBuffer = ValidateBlob(std::move(Payload), BlobHash, OutCompressedSize, OutDecompressedSize); + if (!BlockBuffer) + { + throw std::runtime_error(fmt::format("Chunk block blob {} is not compressed using 'None' compression level", BlobHash)); + } return GetChunkBlockDescription(BlockBuffer.Flatten(), BlobHash); } @@ -1461,6 +1485,7 @@ namespace { uint32_t ChunkIndex, const std::filesystem::path& TempFolderPath) { + ZEN_ASSERT(!TempFolderPath.empty()); const IoHash& ChunkHash = Content.ChunkedContent.ChunkHashes[ChunkIndex]; const uint64_t ChunkSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; @@ -1476,21 +1501,53 @@ namespace { throw std::runtime_error(fmt::format("Fetched chunk {} has invalid size", ChunkHash)); } ZEN_ASSERT_SLOW(IoHash::HashBuffer(RawSource) == ChunkHash); + { + std::filesystem::path TempFilePath = (TempFolderPath / ChunkHash.ToHexString()).make_preferred(); + BasicFile CompressedFile; + std::error_code Ec; + CompressedFile.Open(TempFilePath, BasicFile::Mode::kTruncate, Ec); + if (Ec) + { + throw std::runtime_error( + fmt::format("Failed creating temporary file for compressing blob {}. 
Reason: {}", ChunkHash, Ec.message())); + } + + bool CouldCompress = CompressedBuffer::CompressToStream( + CompositeBuffer(SharedBuffer(RawSource)), + [&](uint64_t Offset, const CompositeBuffer& RangeBuffer) { CompressedFile.Write(RangeBuffer, Offset); }); + if (CouldCompress) + { + uint64_t CompressedSize = CompressedFile.FileSize(); + void* FileHandle = CompressedFile.Detach(); + IoBuffer TempPayload = IoBuffer(IoBuffer::File, + FileHandle, + 0, + CompressedSize, + /*IsWholeFile*/ true); + ZEN_ASSERT(TempPayload); + TempPayload.SetDeleteOnClose(true); + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(TempPayload), RawHash, RawSize); + ZEN_ASSERT(Compressed); + ZEN_ASSERT(RawHash == ChunkHash); + ZEN_ASSERT(RawSize == ChunkSize); + return Compressed.GetCompressed(); + } + CompressedFile.Close(); + std::filesystem::remove(TempFilePath, Ec); + ZEN_UNUSED(Ec); + } + + // Try regular compress - decompress may fail if compressed data is larger than non-compressed CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(RawSource))); if (!CompressedBlob) { - throw std::runtime_error(fmt::format("Failed decompressing chunk {}", ChunkHash)); - } - if (TempFolderPath.empty()) - { - return CompressedBlob.GetCompressed().MakeOwned(); - } - else - { - CompositeBuffer TempPayload = WriteToTempFileIfNeeded(CompressedBlob.GetCompressed(), TempFolderPath, ChunkHash); - return CompressedBuffer::FromCompressedNoValidate(std::move(TempPayload)).GetCompressed(); + throw std::runtime_error(fmt::format("Failed to compress large blob {}", ChunkHash)); } + CompositeBuffer TempPayload = WriteToTempFileIfNeeded(CompressedBlob.GetCompressed(), TempFolderPath, ChunkHash); + return CompressedBuffer::FromCompressedNoValidate(std::move(TempPayload)).GetCompressed(); } struct GeneratedBlocks @@ -1934,7 +1991,7 @@ namespace { { const uint32_t ChunkIndex = LooseChunkIndexes[CompressLooseChunkOrderIndex]; 
Work.ScheduleWork( - ReadChunkPool, + ReadChunkPool, // GetSyncWorkerPool(),// ReadChunkPool, [&, ChunkIndex](std::atomic<bool>&) { if (!AbortFlag) { @@ -3490,7 +3547,7 @@ namespace { const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash); TemporaryFile DecompressedTemp; std::error_code Ec; - DecompressedTemp.CreateTemporary(CacheFolderPath, Ec); + DecompressedTemp.CreateTemporary(TempChunkSequenceFileName.parent_path(), Ec); if (Ec) { throw std::runtime_error( @@ -3507,14 +3564,25 @@ namespace { { throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, SequenceRawHash)); } + IoHashStream Hash; bool CouldDecompress = Compressed.DecompressToStream(0, (uint64_t)-1, [&](uint64_t Offset, const CompositeBuffer& RangeBuffer) { DecompressedTemp.Write(RangeBuffer, Offset); + for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) + { + Hash.Append(Segment.GetView()); + } WriteToDiskBytes += RangeBuffer.GetSize(); }); if (!CouldDecompress) { throw std::runtime_error(fmt::format("Failed to decompress large blob {}", SequenceRawHash)); } + const IoHash VerifyHash = Hash.GetHash(); + if (VerifyHash != SequenceRawHash) + { + throw std::runtime_error( + fmt::format("Decompressed blob payload hash {} does not match expected hash {}", VerifyHash, SequenceRawHash)); + } DecompressedTemp.MoveTemporaryIntoPlace(TempChunkSequenceFileName, Ec); if (Ec) { @@ -3616,6 +3684,7 @@ namespace { auto ChunkHashToChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); ZEN_ASSERT(ChunkHashToChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end()); + bool NeedHashVerify = true; if (CanStreamDecompress(RemoteContent, ChunkTargetPtrs)) { const IoHash& SequenceRawHash = @@ -3624,6 +3693,8 @@ namespace { SequenceRawHash, CompositeBuffer(std::move(CompressedPart)), WriteToDiskBytes); + NeedHashVerify = false; + ChunksComplete++; } else { @@ -3660,14 +3731,17 @@ namespace { { 
const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; - const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile( - GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash))); - if (VerifyChunkHash != SequenceRawHash) + if (NeedHashVerify) { - throw std::runtime_error( - fmt::format("Written chunk sequence {} hash does not match expected hash {}", - VerifyChunkHash, - SequenceRawHash)); + const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile( + GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash))); + if (VerifyChunkHash != ChunkHash) + { + throw std::runtime_error( + fmt::format("Written chunk sequence {} hash does not match expected hash {}", + VerifyChunkHash, + ChunkHash)); + } } std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash), GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)); @@ -3949,6 +4023,7 @@ namespace { { FilteredWrittenBytesPerSecond.Start(); + bool NeedHashVerify = true; if (CanStreamDecompress(RemoteContent, ChunkTargetPtrs)) { const IoHash& SequenceRawHash = @@ -3959,6 +4034,7 @@ namespace { CompositeBuffer(CompressedPart), WriteToDiskBytes); ChunkCountWritten++; + NeedHashVerify = false; } else { @@ -3992,16 +4068,20 @@ namespace { { const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; - const IoHash VerifyChunkHash = - IoHash::HashBuffer(IoBufferBuilder::MakeFromFile( - GetTempChunkedSequenceFileName(CacheFolderPath, - SequenceRawHash))); - if (VerifyChunkHash != SequenceRawHash) + if (NeedHashVerify) { - throw std::runtime_error(fmt::format( - "Written hunk sequence {} hash does not match expected hash {}", - VerifyChunkHash, - SequenceRawHash)); + const IoHash VerifyChunkHash = + IoHash::HashBuffer(IoBufferBuilder::MakeFromFile( + GetTempChunkedSequenceFileName(CacheFolderPath, + SequenceRawHash))); + if (VerifyChunkHash != 
SequenceRawHash) + { + throw std::runtime_error( + fmt::format("Written hunk sequence {} hash does not match " + "expected hash {}", + VerifyChunkHash, + SequenceRawHash)); + } } std::filesystem::rename( GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash), diff --git a/src/zencore/compress.cpp b/src/zencore/compress.cpp index f13f8b9ca..1844f6a63 100644 --- a/src/zencore/compress.cpp +++ b/src/zencore/compress.cpp @@ -158,6 +158,9 @@ class BaseEncoder { public: [[nodiscard]] virtual CompositeBuffer Compress(const CompositeBuffer& RawData, uint64_t BlockSize = DefaultBlockSize) const = 0; + [[nodiscard]] virtual bool CompressToStream(const CompositeBuffer& RawData, + std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback, + uint64_t BlockSize = DefaultBlockSize) const = 0; }; class BaseDecoder @@ -198,11 +201,21 @@ public: class NoneEncoder final : public BaseEncoder { public: - [[nodiscard]] CompositeBuffer Compress(const CompositeBuffer& RawData, uint64_t /* BlockSize */) const final + [[nodiscard]] virtual CompositeBuffer Compress(const CompositeBuffer& RawData, uint64_t /* BlockSize */) const final { UniqueBuffer HeaderData = CompressedBuffer::CreateHeaderForNoneEncoder(RawData.GetSize(), BLAKE3::HashBuffer(RawData)); return CompositeBuffer(HeaderData.MoveToShared(), RawData.MakeOwned()); } + + [[nodiscard]] virtual bool CompressToStream(const CompositeBuffer& RawData, + std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback, + uint64_t /* BlockSize */) const final + { + UniqueBuffer HeaderData = CompressedBuffer::CreateHeaderForNoneEncoder(RawData.GetSize(), BLAKE3::HashBuffer(RawData)); + Callback(0, CompositeBuffer(IoBuffer(IoBuffer::Wrap, HeaderData.GetData(), HeaderData.GetSize()))); + Callback(HeaderData.GetSize(), RawData); + return true; + } }; class NoneDecoder final : public BaseDecoder @@ -292,7 +305,10 @@ public: class BlockEncoder : public BaseEncoder { public: - CompositeBuffer 
Compress(const CompositeBuffer& RawData, uint64_t BlockSize = DefaultBlockSize) const final; + virtual CompositeBuffer Compress(const CompositeBuffer& RawData, uint64_t BlockSize) const final; + virtual bool CompressToStream(const CompositeBuffer& RawData, + std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback, + uint64_t BlockSize) const final; protected: virtual CompressionMethod GetMethod() const = 0; @@ -440,6 +456,133 @@ BlockEncoder::Compress(const CompositeBuffer& RawData, const uint64_t BlockSize) return CompositeBuffer(SharedBuffer::MakeView(CompositeView, CompressedData.MoveToShared())); } +bool +BlockEncoder::CompressToStream(const CompositeBuffer& RawData, + std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback, + uint64_t BlockSize = DefaultBlockSize) const +{ + ZEN_ASSERT(IsPow2(BlockSize) && (BlockSize <= (1u << 31))); + + const uint64_t RawSize = RawData.GetSize(); + BLAKE3Stream RawHash; + + const uint64_t BlockCount = RoundUp(RawSize, BlockSize) / BlockSize; + ZEN_ASSERT(BlockCount <= ~uint32_t(0)); + + const uint64_t MetaSize = BlockCount * sizeof(uint32_t); + const uint64_t FullHeaderSize = sizeof(BufferHeader) + MetaSize; + + std::vector<uint32_t> CompressedBlockSizes; + CompressedBlockSizes.reserve(BlockCount); + uint64_t CompressedSize = 0; + { + UniqueBuffer CompressedBlockBuffer = UniqueBuffer::Alloc(GetCompressedBlocksBound(1, BlockSize, Min(RawSize, BlockSize))); + + IoBufferFileReference FileRef = {nullptr, 0, 0}; + if ((RawData.GetSegments().size() == 1) && RawData.GetSegments()[0].AsIoBuffer().GetFileReference(FileRef)) + { + ZEN_ASSERT(FileRef.FileHandle != nullptr); + UniqueBuffer RawBlockCopy = UniqueBuffer::Alloc(BlockSize); + BasicFile Source; + Source.Attach(FileRef.FileHandle); + for (uint64_t RawOffset = 0; RawOffset < RawSize;) + { + const uint64_t RawBlockSize = zen::Min(RawSize - RawOffset, BlockSize); + Source.Read(RawBlockCopy.GetData(), RawBlockSize, FileRef.FileChunkOffset 
+ RawOffset); + const MemoryView RawBlock = RawBlockCopy.GetView().Left(RawBlockSize); + RawHash.Append(RawBlock); + MutableMemoryView CompressedBlock = CompressedBlockBuffer.GetMutableView(); + if (!CompressBlock(CompressedBlock, RawBlock)) + { + Source.Detach(); + return false; + } + + uint64_t CompressedBlockSize = CompressedBlock.GetSize(); + if (RawBlockSize <= CompressedBlockSize) + { + Callback(FullHeaderSize + CompressedSize, + CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawBlockCopy.GetView().GetData(), RawBlockSize))); + CompressedBlockSize = RawBlockSize; + } + else + { + Callback(FullHeaderSize + CompressedSize, + CompositeBuffer(IoBuffer(IoBuffer::Wrap, CompressedBlock.GetData(), CompressedBlockSize))); + } + + CompressedBlockSizes.push_back(static_cast<uint32_t>(CompressedBlockSize)); + CompressedSize += CompressedBlockSize; + RawOffset += RawBlockSize; + } + Source.Detach(); + } + else + { + UniqueBuffer RawBlockCopy; + CompositeBuffer::Iterator It = RawData.GetIterator(0); + + for (uint64_t RawOffset = 0; RawOffset < RawSize;) + { + const uint64_t RawBlockSize = zen::Min(RawSize - RawOffset, BlockSize); + const MemoryView RawBlock = RawData.ViewOrCopyRange(It, RawBlockSize, RawBlockCopy); + RawHash.Append(RawBlock); + + MutableMemoryView CompressedBlock = CompressedBlockBuffer.GetMutableView(); + if (!CompressBlock(CompressedBlock, RawBlock)) + { + return false; + } + + uint64_t CompressedBlockSize = CompressedBlock.GetSize(); + if (RawBlockSize <= CompressedBlockSize) + { + Callback(FullHeaderSize + CompressedSize, CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawBlock.GetData(), RawBlockSize))); + CompressedBlockSize = RawBlockSize; + } + else + { + Callback(FullHeaderSize + CompressedSize, + CompositeBuffer(IoBuffer(IoBuffer::Wrap, CompressedBlock.GetData(), CompressedBlockSize))); + } + + CompressedBlockSizes.push_back(static_cast<uint32_t>(CompressedBlockSize)); + CompressedSize += CompressedBlockSize; + RawOffset += RawBlockSize; + } + } + } + + // 
Return failure if the compressed data is larger than the raw data. + if (RawSize <= MetaSize + CompressedSize) + { + return false; + } + + // Write the header and calculate the CRC-32. + for (uint32_t& Size : CompressedBlockSizes) + { + Size = ByteSwap(Size); + } + UniqueBuffer HeaderBuffer = UniqueBuffer::Alloc(sizeof(BufferHeader) + MetaSize); + + BufferHeader Header; + Header.Method = GetMethod(); + Header.Compressor = GetCompressor(); + Header.CompressionLevel = GetCompressionLevel(); + Header.BlockSizeExponent = static_cast<uint8_t>(zen::FloorLog2_64(BlockSize)); + Header.BlockCount = static_cast<uint32_t>(BlockCount); + Header.TotalRawSize = RawSize; + Header.TotalCompressedSize = sizeof(BufferHeader) + MetaSize + CompressedSize; + Header.RawHash = RawHash.GetHash(); + + HeaderBuffer.GetMutableView().Mid(sizeof(BufferHeader), MetaSize).CopyFrom(MakeMemoryView(CompressedBlockSizes)); + Header.Write(HeaderBuffer.GetMutableView()); + + Callback(0, CompositeBuffer(IoBuffer(IoBuffer::Wrap, HeaderBuffer.GetData(), HeaderBuffer.GetSize()))); + return true; +} + class BlockDecoder : public BaseDecoder { public: @@ -1615,6 +1758,30 @@ CompressedBuffer::Compress(const SharedBuffer& RawData, return Compress(CompositeBuffer(RawData), Compressor, CompressionLevel, BlockSize); } +bool +CompressedBuffer::CompressToStream(const CompositeBuffer& RawData, + std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback, + OodleCompressor Compressor, + OodleCompressionLevel CompressionLevel, + uint64_t BlockSize) +{ + using namespace detail; + + if (BlockSize == 0) + { + BlockSize = DefaultBlockSize; + } + + if (CompressionLevel == OodleCompressionLevel::None) + { + return NoneEncoder().CompressToStream(RawData, std::move(Callback), BlockSize); + } + else + { + return OodleEncoder(Compressor, CompressionLevel).CompressToStream(RawData, std::move(Callback), BlockSize); + } +} + CompressedBuffer CompressedBuffer::FromCompressed(const CompositeBuffer& 
InCompressedData, IoHash& OutRawHash, uint64_t& OutRawSize) { diff --git a/src/zencore/include/zencore/compress.h b/src/zencore/include/zencore/compress.h index c056d7561..3969e9dbd 100644 --- a/src/zencore/include/zencore/compress.h +++ b/src/zencore/include/zencore/compress.h @@ -74,6 +74,11 @@ public: OodleCompressor Compressor = OodleCompressor::Mermaid, OodleCompressionLevel CompressionLevel = OodleCompressionLevel::VeryFast, uint64_t BlockSize = 0); + [[nodiscard]] ZENCORE_API static bool CompressToStream(const CompositeBuffer& RawData, + std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback, + OodleCompressor Compressor = OodleCompressor::Mermaid, + OodleCompressionLevel CompressionLevel = OodleCompressionLevel::VeryFast, + uint64_t BlockSize = 0); /** * Construct from a compressed buffer previously created by Compress(). |