aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-03-05 11:27:54 +0100
committerGitHub Enterprise <[email protected]>2025-03-05 11:27:54 +0100
commit4b981aed0281ac995c326e3d03dc482353f66405 (patch)
tree3441f33899708c3b2a7a8cf42f393ecfe17bb22b /src
parentdo direct update of stats numbers (#294) (diff)
downloadzen-4b981aed0281ac995c326e3d03dc482353f66405.tar.xz
zen-4b981aed0281ac995c326e3d03dc482353f66405.zip
streaming compress (#295)
- Improvement: Validate hash of decompressed data inline with streaming decompression - Improvement: Do streaming compression of large blobs to improve memory and I/O performance
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/builds_cmd.cpp150
-rw-r--r--src/zencore/compress.cpp171
-rw-r--r--src/zencore/include/zencore/compress.h5
3 files changed, 289 insertions, 37 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index 132f5db86..fb9da021d 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -1088,13 +1088,21 @@ namespace {
throw std::runtime_error(
fmt::format("Blob {} ({} bytes) compressed header has a mismatching raw hash {}", BlobHash, Payload.GetSize(), RawHash));
}
- SharedBuffer Decompressed = Compressed.Decompress();
- if (!Decompressed)
+
+ IoHashStream Hash;
+ bool CouldDecompress = Compressed.DecompressToStream(0, RawSize, [&Hash](uint64_t, const CompositeBuffer& RangeBuffer) {
+ for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
+ {
+ Hash.Append(Segment.GetView());
+ }
+ });
+
+ if (!CouldDecompress)
{
throw std::runtime_error(
fmt::format("Blob {} ({} bytes) failed to decompress - header information mismatch", BlobHash, Payload.GetSize()));
}
- IoHash ValidateRawHash = IoHash::HashBuffer(Decompressed);
+ IoHash ValidateRawHash = Hash.GetHash();
if (ValidateRawHash != BlobHash)
{
throw std::runtime_error(fmt::format("Blob {} ({} bytes) decompressed hash {} does not match header information",
@@ -1102,14 +1110,26 @@ namespace {
Payload.GetSize(),
ValidateRawHash));
}
- CompositeBuffer DecompressedComposite = Compressed.DecompressToComposite();
- if (!DecompressedComposite)
+ OodleCompressor Compressor;
+ OodleCompressionLevel CompressionLevel;
+ uint64_t BlockSize;
+ if (!Compressed.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize))
{
- throw std::runtime_error(fmt::format("Blob {} ({} bytes) failed to decompress to composite", BlobHash, Payload.GetSize()));
+ throw std::runtime_error(fmt::format("Blob {} ({} bytes) failed to get compression details", BlobHash, Payload.GetSize()));
}
OutCompressedSize = Payload.GetSize();
OutDecompressedSize = RawSize;
- return DecompressedComposite;
+ if (CompressionLevel == OodleCompressionLevel::None)
+ {
+ // Only decompress to composite if we need it for block verification
+ CompositeBuffer DecompressedComposite = Compressed.DecompressToComposite();
+ if (!DecompressedComposite)
+ {
+ throw std::runtime_error(fmt::format("Blob {} ({} bytes) failed to decompress to composite", BlobHash, Payload.GetSize()));
+ }
+ return DecompressedComposite;
+ }
+ return CompositeBuffer{};
}
CompositeBuffer ValidateBlob(BuildStorage& Storage,
@@ -1132,6 +1152,10 @@ namespace {
uint64_t& OutDecompressedSize)
{
CompositeBuffer BlockBuffer = ValidateBlob(std::move(Payload), BlobHash, OutCompressedSize, OutDecompressedSize);
+ if (!BlockBuffer)
+ {
+ throw std::runtime_error(fmt::format("Chunk block blob {} is not compressed using 'None' compression level", BlobHash));
+ }
return GetChunkBlockDescription(BlockBuffer.Flatten(), BlobHash);
}
@@ -1461,6 +1485,7 @@ namespace {
uint32_t ChunkIndex,
const std::filesystem::path& TempFolderPath)
{
+ ZEN_ASSERT(!TempFolderPath.empty());
const IoHash& ChunkHash = Content.ChunkedContent.ChunkHashes[ChunkIndex];
const uint64_t ChunkSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
@@ -1476,21 +1501,53 @@ namespace {
throw std::runtime_error(fmt::format("Fetched chunk {} has invalid size", ChunkHash));
}
ZEN_ASSERT_SLOW(IoHash::HashBuffer(RawSource) == ChunkHash);
+ {
+ std::filesystem::path TempFilePath = (TempFolderPath / ChunkHash.ToHexString()).make_preferred();
+ BasicFile CompressedFile;
+ std::error_code Ec;
+ CompressedFile.Open(TempFilePath, BasicFile::Mode::kTruncate, Ec);
+ if (Ec)
+ {
+ throw std::runtime_error(
+ fmt::format("Failed creating temporary file for compressing blob {}. Reason: {}", ChunkHash, Ec.message()));
+ }
+
+ bool CouldCompress = CompressedBuffer::CompressToStream(
+ CompositeBuffer(SharedBuffer(RawSource)),
+ [&](uint64_t Offset, const CompositeBuffer& RangeBuffer) { CompressedFile.Write(RangeBuffer, Offset); });
+ if (CouldCompress)
+ {
+ uint64_t CompressedSize = CompressedFile.FileSize();
+ void* FileHandle = CompressedFile.Detach();
+ IoBuffer TempPayload = IoBuffer(IoBuffer::File,
+ FileHandle,
+ 0,
+ CompressedSize,
+ /*IsWholeFile*/ true);
+ ZEN_ASSERT(TempPayload);
+ TempPayload.SetDeleteOnClose(true);
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(TempPayload), RawHash, RawSize);
+ ZEN_ASSERT(Compressed);
+ ZEN_ASSERT(RawHash == ChunkHash);
+ ZEN_ASSERT(RawSize == ChunkSize);
+ return Compressed.GetCompressed();
+ }
+ CompressedFile.Close();
+ std::filesystem::remove(TempFilePath, Ec);
+ ZEN_UNUSED(Ec);
+ }
+
+ // Try regular compress - decompress may fail if compressed data is larger than non-compressed
CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(RawSource)));
if (!CompressedBlob)
{
- throw std::runtime_error(fmt::format("Failed decompressing chunk {}", ChunkHash));
- }
- if (TempFolderPath.empty())
- {
- return CompressedBlob.GetCompressed().MakeOwned();
- }
- else
- {
- CompositeBuffer TempPayload = WriteToTempFileIfNeeded(CompressedBlob.GetCompressed(), TempFolderPath, ChunkHash);
- return CompressedBuffer::FromCompressedNoValidate(std::move(TempPayload)).GetCompressed();
+ throw std::runtime_error(fmt::format("Failed to compress large blob {}", ChunkHash));
}
+ CompositeBuffer TempPayload = WriteToTempFileIfNeeded(CompressedBlob.GetCompressed(), TempFolderPath, ChunkHash);
+ return CompressedBuffer::FromCompressedNoValidate(std::move(TempPayload)).GetCompressed();
}
struct GeneratedBlocks
@@ -1934,7 +1991,7 @@ namespace {
{
const uint32_t ChunkIndex = LooseChunkIndexes[CompressLooseChunkOrderIndex];
Work.ScheduleWork(
- ReadChunkPool,
+ ReadChunkPool, // GetSyncWorkerPool(),// ReadChunkPool,
[&, ChunkIndex](std::atomic<bool>&) {
if (!AbortFlag)
{
@@ -3490,7 +3547,7 @@ namespace {
const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash);
TemporaryFile DecompressedTemp;
std::error_code Ec;
- DecompressedTemp.CreateTemporary(CacheFolderPath, Ec);
+ DecompressedTemp.CreateTemporary(TempChunkSequenceFileName.parent_path(), Ec);
if (Ec)
{
throw std::runtime_error(
@@ -3507,14 +3564,25 @@ namespace {
{
throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, SequenceRawHash));
}
+ IoHashStream Hash;
bool CouldDecompress = Compressed.DecompressToStream(0, (uint64_t)-1, [&](uint64_t Offset, const CompositeBuffer& RangeBuffer) {
DecompressedTemp.Write(RangeBuffer, Offset);
+ for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
+ {
+ Hash.Append(Segment.GetView());
+ }
WriteToDiskBytes += RangeBuffer.GetSize();
});
if (!CouldDecompress)
{
throw std::runtime_error(fmt::format("Failed to decompress large blob {}", SequenceRawHash));
}
+ const IoHash VerifyHash = Hash.GetHash();
+ if (VerifyHash != SequenceRawHash)
+ {
+ throw std::runtime_error(
+ fmt::format("Decompressed blob payload hash {} does not match expected hash {}", VerifyHash, SequenceRawHash));
+ }
DecompressedTemp.MoveTemporaryIntoPlace(TempChunkSequenceFileName, Ec);
if (Ec)
{
@@ -3616,6 +3684,7 @@ namespace {
auto ChunkHashToChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash);
ZEN_ASSERT(ChunkHashToChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end());
+ bool NeedHashVerify = true;
if (CanStreamDecompress(RemoteContent, ChunkTargetPtrs))
{
const IoHash& SequenceRawHash =
@@ -3624,6 +3693,8 @@ namespace {
SequenceRawHash,
CompositeBuffer(std::move(CompressedPart)),
WriteToDiskBytes);
+ NeedHashVerify = false;
+ ChunksComplete++;
}
else
{
@@ -3660,14 +3731,17 @@ namespace {
{
const IoHash& SequenceRawHash =
RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(
- GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
- if (VerifyChunkHash != SequenceRawHash)
+ if (NeedHashVerify)
{
- throw std::runtime_error(
- fmt::format("Written chunk sequence {} hash does not match expected hash {}",
- VerifyChunkHash,
- SequenceRawHash));
+ const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(
+ GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
+ if (VerifyChunkHash != ChunkHash)
+ {
+ throw std::runtime_error(
+ fmt::format("Written chunk sequence {} hash does not match expected hash {}",
+ VerifyChunkHash,
+ ChunkHash));
+ }
}
std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash),
GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash));
@@ -3949,6 +4023,7 @@ namespace {
{
FilteredWrittenBytesPerSecond.Start();
+ bool NeedHashVerify = true;
if (CanStreamDecompress(RemoteContent, ChunkTargetPtrs))
{
const IoHash& SequenceRawHash =
@@ -3959,6 +4034,7 @@ namespace {
CompositeBuffer(CompressedPart),
WriteToDiskBytes);
ChunkCountWritten++;
+ NeedHashVerify = false;
}
else
{
@@ -3992,16 +4068,20 @@ namespace {
{
const IoHash& SequenceRawHash =
RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- const IoHash VerifyChunkHash =
- IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(
- GetTempChunkedSequenceFileName(CacheFolderPath,
- SequenceRawHash)));
- if (VerifyChunkHash != SequenceRawHash)
+ if (NeedHashVerify)
{
- throw std::runtime_error(fmt::format(
- "Written hunk sequence {} hash does not match expected hash {}",
- VerifyChunkHash,
- SequenceRawHash));
+ const IoHash VerifyChunkHash =
+ IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(
+ GetTempChunkedSequenceFileName(CacheFolderPath,
+ SequenceRawHash)));
+ if (VerifyChunkHash != SequenceRawHash)
+ {
+ throw std::runtime_error(
+ fmt::format("Written hunk sequence {} hash does not match "
+ "expected hash {}",
+ VerifyChunkHash,
+ SequenceRawHash));
+ }
}
std::filesystem::rename(
GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash),
diff --git a/src/zencore/compress.cpp b/src/zencore/compress.cpp
index f13f8b9ca..1844f6a63 100644
--- a/src/zencore/compress.cpp
+++ b/src/zencore/compress.cpp
@@ -158,6 +158,9 @@ class BaseEncoder
{
public:
[[nodiscard]] virtual CompositeBuffer Compress(const CompositeBuffer& RawData, uint64_t BlockSize = DefaultBlockSize) const = 0;
+ [[nodiscard]] virtual bool CompressToStream(const CompositeBuffer& RawData,
+ std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
+ uint64_t BlockSize = DefaultBlockSize) const = 0;
};
class BaseDecoder
@@ -198,11 +201,21 @@ public:
class NoneEncoder final : public BaseEncoder
{
public:
- [[nodiscard]] CompositeBuffer Compress(const CompositeBuffer& RawData, uint64_t /* BlockSize */) const final
+ [[nodiscard]] virtual CompositeBuffer Compress(const CompositeBuffer& RawData, uint64_t /* BlockSize */) const final
{
UniqueBuffer HeaderData = CompressedBuffer::CreateHeaderForNoneEncoder(RawData.GetSize(), BLAKE3::HashBuffer(RawData));
return CompositeBuffer(HeaderData.MoveToShared(), RawData.MakeOwned());
}
+
+ [[nodiscard]] virtual bool CompressToStream(const CompositeBuffer& RawData,
+ std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
+ uint64_t /* BlockSize */) const final
+ {
+ UniqueBuffer HeaderData = CompressedBuffer::CreateHeaderForNoneEncoder(RawData.GetSize(), BLAKE3::HashBuffer(RawData));
+ Callback(0, CompositeBuffer(IoBuffer(IoBuffer::Wrap, HeaderData.GetData(), HeaderData.GetSize())));
+ Callback(HeaderData.GetSize(), RawData);
+ return true;
+ }
};
class NoneDecoder final : public BaseDecoder
@@ -292,7 +305,10 @@ public:
class BlockEncoder : public BaseEncoder
{
public:
- CompositeBuffer Compress(const CompositeBuffer& RawData, uint64_t BlockSize = DefaultBlockSize) const final;
+ virtual CompositeBuffer Compress(const CompositeBuffer& RawData, uint64_t BlockSize) const final;
+ virtual bool CompressToStream(const CompositeBuffer& RawData,
+ std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
+ uint64_t BlockSize) const final;
protected:
virtual CompressionMethod GetMethod() const = 0;
@@ -440,6 +456,133 @@ BlockEncoder::Compress(const CompositeBuffer& RawData, const uint64_t BlockSize)
return CompositeBuffer(SharedBuffer::MakeView(CompositeView, CompressedData.MoveToShared()));
}
+bool
+BlockEncoder::CompressToStream(const CompositeBuffer& RawData,
+ std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
+ uint64_t BlockSize = DefaultBlockSize) const
+{
+ ZEN_ASSERT(IsPow2(BlockSize) && (BlockSize <= (1u << 31)));
+
+ const uint64_t RawSize = RawData.GetSize();
+ BLAKE3Stream RawHash;
+
+ const uint64_t BlockCount = RoundUp(RawSize, BlockSize) / BlockSize;
+ ZEN_ASSERT(BlockCount <= ~uint32_t(0));
+
+ const uint64_t MetaSize = BlockCount * sizeof(uint32_t);
+ const uint64_t FullHeaderSize = sizeof(BufferHeader) + MetaSize;
+
+ std::vector<uint32_t> CompressedBlockSizes;
+ CompressedBlockSizes.reserve(BlockCount);
+ uint64_t CompressedSize = 0;
+ {
+ UniqueBuffer CompressedBlockBuffer = UniqueBuffer::Alloc(GetCompressedBlocksBound(1, BlockSize, Min(RawSize, BlockSize)));
+
+ IoBufferFileReference FileRef = {nullptr, 0, 0};
+ if ((RawData.GetSegments().size() == 1) && RawData.GetSegments()[0].AsIoBuffer().GetFileReference(FileRef))
+ {
+ ZEN_ASSERT(FileRef.FileHandle != nullptr);
+ UniqueBuffer RawBlockCopy = UniqueBuffer::Alloc(BlockSize);
+ BasicFile Source;
+ Source.Attach(FileRef.FileHandle);
+ for (uint64_t RawOffset = 0; RawOffset < RawSize;)
+ {
+ const uint64_t RawBlockSize = zen::Min(RawSize - RawOffset, BlockSize);
+ Source.Read(RawBlockCopy.GetData(), RawBlockSize, FileRef.FileChunkOffset + RawOffset);
+ const MemoryView RawBlock = RawBlockCopy.GetView().Left(RawBlockSize);
+ RawHash.Append(RawBlock);
+ MutableMemoryView CompressedBlock = CompressedBlockBuffer.GetMutableView();
+ if (!CompressBlock(CompressedBlock, RawBlock))
+ {
+ Source.Detach();
+ return false;
+ }
+
+ uint64_t CompressedBlockSize = CompressedBlock.GetSize();
+ if (RawBlockSize <= CompressedBlockSize)
+ {
+ Callback(FullHeaderSize + CompressedSize,
+ CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawBlockCopy.GetView().GetData(), RawBlockSize)));
+ CompressedBlockSize = RawBlockSize;
+ }
+ else
+ {
+ Callback(FullHeaderSize + CompressedSize,
+ CompositeBuffer(IoBuffer(IoBuffer::Wrap, CompressedBlock.GetData(), CompressedBlockSize)));
+ }
+
+ CompressedBlockSizes.push_back(static_cast<uint32_t>(CompressedBlockSize));
+ CompressedSize += CompressedBlockSize;
+ RawOffset += RawBlockSize;
+ }
+ Source.Detach();
+ }
+ else
+ {
+ UniqueBuffer RawBlockCopy;
+ CompositeBuffer::Iterator It = RawData.GetIterator(0);
+
+ for (uint64_t RawOffset = 0; RawOffset < RawSize;)
+ {
+ const uint64_t RawBlockSize = zen::Min(RawSize - RawOffset, BlockSize);
+ const MemoryView RawBlock = RawData.ViewOrCopyRange(It, RawBlockSize, RawBlockCopy);
+ RawHash.Append(RawBlock);
+
+ MutableMemoryView CompressedBlock = CompressedBlockBuffer.GetMutableView();
+ if (!CompressBlock(CompressedBlock, RawBlock))
+ {
+ return false;
+ }
+
+ uint64_t CompressedBlockSize = CompressedBlock.GetSize();
+ if (RawBlockSize <= CompressedBlockSize)
+ {
+ Callback(FullHeaderSize + CompressedSize, CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawBlock.GetData(), RawBlockSize)));
+ CompressedBlockSize = RawBlockSize;
+ }
+ else
+ {
+ Callback(FullHeaderSize + CompressedSize,
+ CompositeBuffer(IoBuffer(IoBuffer::Wrap, CompressedBlock.GetData(), CompressedBlockSize)));
+ }
+
+ CompressedBlockSizes.push_back(static_cast<uint32_t>(CompressedBlockSize));
+ CompressedSize += CompressedBlockSize;
+ RawOffset += RawBlockSize;
+ }
+ }
+ }
+
+ // Return failure if the compressed data is larger than the raw data.
+ if (RawSize <= MetaSize + CompressedSize)
+ {
+ return false;
+ }
+
+ // Write the header and calculate the CRC-32.
+ for (uint32_t& Size : CompressedBlockSizes)
+ {
+ Size = ByteSwap(Size);
+ }
+ UniqueBuffer HeaderBuffer = UniqueBuffer::Alloc(sizeof(BufferHeader) + MetaSize);
+
+ BufferHeader Header;
+ Header.Method = GetMethod();
+ Header.Compressor = GetCompressor();
+ Header.CompressionLevel = GetCompressionLevel();
+ Header.BlockSizeExponent = static_cast<uint8_t>(zen::FloorLog2_64(BlockSize));
+ Header.BlockCount = static_cast<uint32_t>(BlockCount);
+ Header.TotalRawSize = RawSize;
+ Header.TotalCompressedSize = sizeof(BufferHeader) + MetaSize + CompressedSize;
+ Header.RawHash = RawHash.GetHash();
+
+ HeaderBuffer.GetMutableView().Mid(sizeof(BufferHeader), MetaSize).CopyFrom(MakeMemoryView(CompressedBlockSizes));
+ Header.Write(HeaderBuffer.GetMutableView());
+
+ Callback(0, CompositeBuffer(IoBuffer(IoBuffer::Wrap, HeaderBuffer.GetData(), HeaderBuffer.GetSize())));
+ return true;
+}
+
class BlockDecoder : public BaseDecoder
{
public:
@@ -1615,6 +1758,30 @@ CompressedBuffer::Compress(const SharedBuffer& RawData,
return Compress(CompositeBuffer(RawData), Compressor, CompressionLevel, BlockSize);
}
+bool
+CompressedBuffer::CompressToStream(const CompositeBuffer& RawData,
+ std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
+ OodleCompressor Compressor,
+ OodleCompressionLevel CompressionLevel,
+ uint64_t BlockSize)
+{
+ using namespace detail;
+
+ if (BlockSize == 0)
+ {
+ BlockSize = DefaultBlockSize;
+ }
+
+ if (CompressionLevel == OodleCompressionLevel::None)
+ {
+ return NoneEncoder().CompressToStream(RawData, std::move(Callback), BlockSize);
+ }
+ else
+ {
+ return OodleEncoder(Compressor, CompressionLevel).CompressToStream(RawData, std::move(Callback), BlockSize);
+ }
+}
+
CompressedBuffer
CompressedBuffer::FromCompressed(const CompositeBuffer& InCompressedData, IoHash& OutRawHash, uint64_t& OutRawSize)
{
diff --git a/src/zencore/include/zencore/compress.h b/src/zencore/include/zencore/compress.h
index c056d7561..3969e9dbd 100644
--- a/src/zencore/include/zencore/compress.h
+++ b/src/zencore/include/zencore/compress.h
@@ -74,6 +74,11 @@ public:
OodleCompressor Compressor = OodleCompressor::Mermaid,
OodleCompressionLevel CompressionLevel = OodleCompressionLevel::VeryFast,
uint64_t BlockSize = 0);
+ [[nodiscard]] ZENCORE_API static bool CompressToStream(const CompositeBuffer& RawData,
+ std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback,
+ OodleCompressor Compressor = OodleCompressor::Mermaid,
+ OodleCompressionLevel CompressionLevel = OodleCompressionLevel::VeryFast,
+ uint64_t BlockSize = 0);
/**
* Construct from a compressed buffer previously created by Compress().