aboutsummaryrefslogtreecommitdiff
path: root/src/zencore/compress.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-03-05 11:27:54 +0100
committerGitHub Enterprise <[email protected]>2025-03-05 11:27:54 +0100
commit4b981aed0281ac995c326e3d03dc482353f66405 (patch)
tree3441f33899708c3b2a7a8cf42f393ecfe17bb22b /src/zencore/compress.cpp
parentdo direct update of stats numbers (#294) (diff)
downloadzen-4b981aed0281ac995c326e3d03dc482353f66405.tar.xz
zen-4b981aed0281ac995c326e3d03dc482353f66405.zip
streaming compress (#295)
- Improvement: Validate hash of decompressed data inline with streaming decompression
- Improvement: Do streaming compression of large blobs to improve memory and I/O performance
Diffstat (limited to 'src/zencore/compress.cpp')
-rw-r--r--src/zencore/compress.cpp171
1 files changed, 169 insertions, 2 deletions
diff --git a/src/zencore/compress.cpp b/src/zencore/compress.cpp
index f13f8b9ca..1844f6a63 100644
--- a/src/zencore/compress.cpp
+++ b/src/zencore/compress.cpp
@@ -158,6 +158,9 @@ class BaseEncoder
{
public:
[[nodiscard]] virtual CompositeBuffer Compress(const CompositeBuffer& RawData, uint64_t BlockSize = DefaultBlockSize) const = 0;
+ [[nodiscard]] virtual bool CompressToStream(const CompositeBuffer& RawData,
+ std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
+ uint64_t BlockSize = DefaultBlockSize) const = 0;
};
class BaseDecoder
@@ -198,11 +201,21 @@ public:
class NoneEncoder final : public BaseEncoder
{
public:
- [[nodiscard]] CompositeBuffer Compress(const CompositeBuffer& RawData, uint64_t /* BlockSize */) const final
+ [[nodiscard]] virtual CompositeBuffer Compress(const CompositeBuffer& RawData, uint64_t /* BlockSize */) const final
{
UniqueBuffer HeaderData = CompressedBuffer::CreateHeaderForNoneEncoder(RawData.GetSize(), BLAKE3::HashBuffer(RawData));
return CompositeBuffer(HeaderData.MoveToShared(), RawData.MakeOwned());
}
+
+    // Streams RawData uncompressed: first the "none" header at offset 0, then the
+    // untouched payload immediately after it. BlockSize is ignored. Always returns true.
+    // The header range wraps a local buffer, so it is only usable during the callback.
+    [[nodiscard]] virtual bool CompressToStream(const CompositeBuffer& RawData,
+                                                std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
+                                                uint64_t /* BlockSize */) const final
+    {
+        UniqueBuffer   Header     = CompressedBuffer::CreateHeaderForNoneEncoder(RawData.GetSize(), BLAKE3::HashBuffer(RawData));
+        const uint64_t HeaderSize = Header.GetSize();
+
+        const CompositeBuffer HeaderRange(IoBuffer(IoBuffer::Wrap, Header.GetData(), HeaderSize));
+        Callback(0, HeaderRange);
+
+        // The payload follows the header directly in the output stream.
+        Callback(HeaderSize, RawData);
+        return true;
+    }
};
class NoneDecoder final : public BaseDecoder
@@ -292,7 +305,10 @@ public:
class BlockEncoder : public BaseEncoder
{
public:
- CompositeBuffer Compress(const CompositeBuffer& RawData, uint64_t BlockSize = DefaultBlockSize) const final;
+ virtual CompositeBuffer Compress(const CompositeBuffer& RawData, uint64_t BlockSize) const final;
+ virtual bool CompressToStream(const CompositeBuffer& RawData,
+ std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
+ uint64_t BlockSize) const final;
protected:
virtual CompressionMethod GetMethod() const = 0;
@@ -440,6 +456,133 @@ BlockEncoder::Compress(const CompositeBuffer& RawData, const uint64_t BlockSize)
return CompositeBuffer(SharedBuffer::MakeView(CompositeView, CompressedData.MoveToShared()));
}
+// Compresses RawData one block at a time and hands each finished piece to Callback
+// together with the byte offset it occupies in the final compressed stream.
+//
+// - Data blocks are emitted first, in order, starting at offset
+//   sizeof(BufferHeader) + BlockCount * sizeof(uint32_t).
+// - A block that does not get smaller when compressed is emitted as raw bytes.
+// - The header (BufferHeader followed by the per-block size table) is emitted last,
+//   at offset 0, because the raw hash and block sizes are only known at the end.
+//
+// The ranges passed to Callback wrap scratch buffers that are overwritten by the next
+// block, so the callback must consume or copy the data before it returns.
+//
+// Returns false if a block fails to compress, or if the compressed output would not be
+// smaller than the raw input. In both cases Callback may already have been invoked for
+// some or all data blocks (but never for the header); the caller must discard that output.
+bool
+BlockEncoder::CompressToStream(const CompositeBuffer& RawData,
+                               std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
+                               uint64_t BlockSize = DefaultBlockSize) const
+{
+    ZEN_ASSERT(IsPow2(BlockSize) && (BlockSize <= (1u << 31)));
+
+    const uint64_t RawSize = RawData.GetSize();
+    BLAKE3Stream   RawHash;
+
+    const uint64_t BlockCount = RoundUp(RawSize, BlockSize) / BlockSize;
+    ZEN_ASSERT(BlockCount <= ~uint32_t(0));
+
+    const uint64_t MetaSize       = BlockCount * sizeof(uint32_t);
+    const uint64_t FullHeaderSize = sizeof(BufferHeader) + MetaSize;
+
+    std::vector<uint32_t> CompressedBlockSizes;
+    CompressedBlockSizes.reserve(BlockCount);
+    uint64_t CompressedSize = 0;
+    {
+        UniqueBuffer CompressedBlockBuffer = UniqueBuffer::Alloc(GetCompressedBlocksBound(1, BlockSize, Min(RawSize, BlockSize)));
+
+        // Hashes, compresses and emits a single raw block; shared by both input paths below.
+        // Returns false if the block could not be compressed.
+        auto EmitBlock = [&](const MemoryView RawBlock, const uint64_t RawBlockSize) -> bool {
+            RawHash.Append(RawBlock);
+
+            MutableMemoryView CompressedBlock = CompressedBlockBuffer.GetMutableView();
+            if (!CompressBlock(CompressedBlock, RawBlock))
+            {
+                return false;
+            }
+
+            uint64_t CompressedBlockSize = CompressedBlock.GetSize();
+            if (RawBlockSize <= CompressedBlockSize)
+            {
+                // Compression did not help: store the block raw.
+                Callback(FullHeaderSize + CompressedSize, CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawBlock.GetData(), RawBlockSize)));
+                CompressedBlockSize = RawBlockSize;
+            }
+            else
+            {
+                Callback(FullHeaderSize + CompressedSize,
+                         CompositeBuffer(IoBuffer(IoBuffer::Wrap, CompressedBlock.GetData(), CompressedBlockSize)));
+            }
+
+            CompressedBlockSizes.push_back(static_cast<uint32_t>(CompressedBlockSize));
+            CompressedSize += CompressedBlockSize;
+            return true;
+        };
+
+        IoBufferFileReference FileRef = {nullptr, 0, 0};
+        if ((RawData.GetSegments().size() == 1) && RawData.GetSegments()[0].AsIoBuffer().GetFileReference(FileRef))
+        {
+            // File-backed input: read one block at a time instead of materializing the whole file.
+            ZEN_ASSERT(FileRef.FileHandle != nullptr);
+            UniqueBuffer RawBlockCopy = UniqueBuffer::Alloc(BlockSize);
+            BasicFile    Source;
+            Source.Attach(FileRef.FileHandle);
+
+            // The handle is only borrowed from RawData. Detach on every exit path, including
+            // exceptions thrown by Read or Callback.
+            // NOTE(review): presumably BasicFile would otherwise close the handle on destruction - confirm.
+            struct DetachOnExit
+            {
+                explicit DetachOnExit(BasicFile& InFile) : File(InFile) {}
+                ~DetachOnExit() { File.Detach(); }
+                DetachOnExit(const DetachOnExit&) = delete;
+                DetachOnExit& operator=(const DetachOnExit&) = delete;
+                BasicFile& File;
+            };
+            const DetachOnExit DetachGuard(Source);
+
+            for (uint64_t RawOffset = 0; RawOffset < RawSize;)
+            {
+                const uint64_t RawBlockSize = zen::Min(RawSize - RawOffset, BlockSize);
+                Source.Read(RawBlockCopy.GetData(), RawBlockSize, FileRef.FileChunkOffset + RawOffset);
+                const MemoryView RawBlock = RawBlockCopy.GetView().Left(RawBlockSize);
+                if (!EmitBlock(RawBlock, RawBlockSize))
+                {
+                    return false;
+                }
+                RawOffset += RawBlockSize;
+            }
+        }
+        else
+        {
+            // In-memory (possibly segmented) input: view each block in place when possible,
+            // otherwise copy it into RawBlockCopy.
+            UniqueBuffer              RawBlockCopy;
+            CompositeBuffer::Iterator It = RawData.GetIterator(0);
+
+            for (uint64_t RawOffset = 0; RawOffset < RawSize;)
+            {
+                const uint64_t   RawBlockSize = zen::Min(RawSize - RawOffset, BlockSize);
+                const MemoryView RawBlock     = RawData.ViewOrCopyRange(It, RawBlockSize, RawBlockCopy);
+                if (!EmitBlock(RawBlock, RawBlockSize))
+                {
+                    return false;
+                }
+                RawOffset += RawBlockSize;
+            }
+        }
+    }
+
+    // Return failure if the compressed data (plus its size table) is not smaller than the raw data.
+    // Note that the data blocks have already been handed to Callback at this point.
+    if (RawSize <= MetaSize + CompressedSize)
+    {
+        return false;
+    }
+
+    // The size table is stored byte-swapped.
+    for (uint32_t& Size : CompressedBlockSizes)
+    {
+        Size = ByteSwap(Size);
+    }
+    UniqueBuffer HeaderBuffer = UniqueBuffer::Alloc(FullHeaderSize);
+
+    BufferHeader Header;
+    Header.Method              = GetMethod();
+    Header.Compressor          = GetCompressor();
+    Header.CompressionLevel    = GetCompressionLevel();
+    Header.BlockSizeExponent   = static_cast<uint8_t>(zen::FloorLog2_64(BlockSize));
+    Header.BlockCount          = static_cast<uint32_t>(BlockCount);
+    Header.TotalRawSize        = RawSize;
+    Header.TotalCompressedSize = FullHeaderSize + CompressedSize;
+    Header.RawHash             = RawHash.GetHash();
+
+    // Copy the size table in place first, then write the header over the same buffer.
+    HeaderBuffer.GetMutableView().Mid(sizeof(BufferHeader), MetaSize).CopyFrom(MakeMemoryView(CompressedBlockSizes));
+    Header.Write(HeaderBuffer.GetMutableView());
+
+    // The header goes out last but belongs at the very start of the stream.
+    Callback(0, CompositeBuffer(IoBuffer(IoBuffer::Wrap, HeaderBuffer.GetData(), HeaderBuffer.GetSize())));
+    return true;
+}
+
class BlockDecoder : public BaseDecoder
{
public:
@@ -1615,6 +1758,30 @@ CompressedBuffer::Compress(const SharedBuffer& RawData,
return Compress(CompositeBuffer(RawData), Compressor, CompressionLevel, BlockSize);
}
+// Streaming entry point: picks the encoder from CompressionLevel and forwards the call.
+// A BlockSize of 0 selects DefaultBlockSize. Returns whatever the chosen encoder returns.
+bool
+CompressedBuffer::CompressToStream(const CompositeBuffer& RawData,
+                                   std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
+                                   OodleCompressor Compressor,
+                                   OodleCompressionLevel CompressionLevel,
+                                   uint64_t BlockSize)
+{
+    using namespace detail;
+
+    const uint64_t EffectiveBlockSize = (BlockSize == 0) ? DefaultBlockSize : BlockSize;
+
+    // "None" means store the data as-is behind a header; everything else goes through Oodle.
+    if (CompressionLevel == OodleCompressionLevel::None)
+    {
+        return NoneEncoder().CompressToStream(RawData, std::move(Callback), EffectiveBlockSize);
+    }
+
+    return OodleEncoder(Compressor, CompressionLevel).CompressToStream(RawData, std::move(Callback), EffectiveBlockSize);
+}
+
CompressedBuffer
CompressedBuffer::FromCompressed(const CompositeBuffer& InCompressedData, IoHash& OutRawHash, uint64_t& OutRawSize)
{