From fa4ef162b1dd53cbad135850a8f9cf8fb532f395 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Tue, 18 Mar 2025 08:56:40 +0100 Subject: improved post upload/download summary (#308) * added ValidateStatistics and improved post upload summary * improved download statistics * smoother stats update when compressing * better feedback during stream compresss/decompress * don't capture TotalPartWriteCount by reference * disk stats cleanup * multi-test-download overall timer --- src/zencore/compress.cpp | 130 +++++++++++++++++++++++++++++------------------ 1 file changed, 80 insertions(+), 50 deletions(-) (limited to 'src/zencore/compress.cpp') diff --git a/src/zencore/compress.cpp b/src/zencore/compress.cpp index 88c3bb5b9..ad6b6103c 100644 --- a/src/zencore/compress.cpp +++ b/src/zencore/compress.cpp @@ -158,9 +158,10 @@ class BaseEncoder { public: [[nodiscard]] virtual CompositeBuffer Compress(const CompositeBuffer& RawData, uint64_t BlockSize = DefaultBlockSize) const = 0; - [[nodiscard]] virtual bool CompressToStream(const CompositeBuffer& RawData, - std::function&& Callback, - uint64_t BlockSize = DefaultBlockSize) const = 0; + [[nodiscard]] virtual bool CompressToStream( + const CompositeBuffer& RawData, + std::function&& Callback, + uint64_t BlockSize = DefaultBlockSize) const = 0; }; class BaseDecoder @@ -189,11 +190,13 @@ public: uint64_t RawOffset, uint64_t RawSize) const = 0; - virtual bool DecompressToStream(const BufferHeader& Header, - const CompositeBuffer& CompressedData, - uint64_t RawOffset, - uint64_t RawSize, - std::function&& Callback) const = 0; + virtual bool DecompressToStream( + const BufferHeader& Header, + const CompositeBuffer& CompressedData, + uint64_t RawOffset, + uint64_t RawSize, + std::function&& Callback) + const = 0; }; /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -207,13 +210,14 @@ public: return CompositeBuffer(HeaderData.MoveToShared(), 
RawData.MakeOwned()); } - [[nodiscard]] virtual bool CompressToStream(const CompositeBuffer& RawData, - std::function&& Callback, - uint64_t /* BlockSize */) const final + [[nodiscard]] virtual bool CompressToStream( + const CompositeBuffer& RawData, + std::function&& Callback, + uint64_t /* BlockSize */) const final { UniqueBuffer HeaderData = CompressedBuffer::CreateHeaderForNoneEncoder(RawData.GetSize(), BLAKE3::HashBuffer(RawData)); - Callback(0, CompositeBuffer(IoBuffer(IoBuffer::Wrap, HeaderData.GetData(), HeaderData.GetSize()))); - Callback(HeaderData.GetSize(), RawData); + Callback(0, 0, 0, CompositeBuffer(IoBuffer(IoBuffer::Wrap, HeaderData.GetData(), HeaderData.GetSize()))); + Callback(0, RawData.GetSize(), HeaderData.GetSize(), RawData); return true; } }; @@ -283,17 +287,19 @@ public: [[nodiscard]] uint64_t GetHeaderSize(const BufferHeader&) const final { return sizeof(BufferHeader); } - virtual bool DecompressToStream(const BufferHeader& Header, - const CompositeBuffer& CompressedData, - uint64_t RawOffset, - uint64_t RawSize, - std::function&& Callback) const final + virtual bool DecompressToStream( + const BufferHeader& Header, + const CompositeBuffer& CompressedData, + uint64_t RawOffset, + uint64_t RawSize, + std::function&& Callback) + const final { if (Header.Method == CompressionMethod::None && Header.TotalCompressedSize == CompressedData.GetSize() && Header.TotalCompressedSize == Header.TotalRawSize + sizeof(BufferHeader) && RawOffset < Header.TotalRawSize && (RawOffset + RawSize) <= Header.TotalRawSize) { - if (!Callback(0, CompressedData.Mid(sizeof(BufferHeader) + RawOffset, RawSize))) + if (!Callback(sizeof(BufferHeader) + RawOffset, RawSize, 0, CompressedData.Mid(sizeof(BufferHeader) + RawOffset, RawSize))) { return false; } @@ -309,9 +315,10 @@ class BlockEncoder : public BaseEncoder { public: virtual CompositeBuffer Compress(const CompositeBuffer& RawData, uint64_t BlockSize) const final; - virtual bool CompressToStream(const 
CompositeBuffer& RawData, - std::function&& Callback, - uint64_t BlockSize) const final; + virtual bool CompressToStream( + const CompositeBuffer& RawData, + std::function&& Callback, + uint64_t BlockSize) const final; protected: virtual CompressionMethod GetMethod() const = 0; @@ -460,9 +467,10 @@ BlockEncoder::Compress(const CompositeBuffer& RawData, const uint64_t BlockSize) } bool -BlockEncoder::CompressToStream(const CompositeBuffer& RawData, - std::function&& Callback, - uint64_t BlockSize = DefaultBlockSize) const +BlockEncoder::CompressToStream( + const CompositeBuffer& RawData, + std::function&& Callback, + uint64_t BlockSize = DefaultBlockSize) const { ZEN_ASSERT(IsPow2(BlockSize) && (BlockSize <= (1u << 31))); @@ -504,13 +512,17 @@ BlockEncoder::CompressToStream(const CompositeBuffer& RawData, uint64_t CompressedBlockSize = CompressedBlock.GetSize(); if (RawBlockSize <= CompressedBlockSize) { - Callback(FullHeaderSize + CompressedSize, + Callback(FileRef.FileChunkOffset + RawOffset, + RawBlockSize, + FullHeaderSize + CompressedSize, CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawBlockCopy.GetView().GetData(), RawBlockSize))); CompressedBlockSize = RawBlockSize; } else { - Callback(FullHeaderSize + CompressedSize, + Callback(FileRef.FileChunkOffset + RawOffset, + RawBlockSize, + FullHeaderSize + CompressedSize, CompositeBuffer(IoBuffer(IoBuffer::Wrap, CompressedBlock.GetData(), CompressedBlockSize))); } @@ -540,12 +552,17 @@ BlockEncoder::CompressToStream(const CompositeBuffer& RawData, uint64_t CompressedBlockSize = CompressedBlock.GetSize(); if (RawBlockSize <= CompressedBlockSize) { - Callback(FullHeaderSize + CompressedSize, CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawBlock.GetData(), RawBlockSize))); + Callback(RawOffset, + RawBlockSize, + FullHeaderSize + CompressedSize, + CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawBlock.GetData(), RawBlockSize))); CompressedBlockSize = RawBlockSize; } else { - Callback(FullHeaderSize + CompressedSize, + 
Callback(RawOffset, + RawBlockSize, + FullHeaderSize + CompressedSize, CompositeBuffer(IoBuffer(IoBuffer::Wrap, CompressedBlock.GetData(), CompressedBlockSize))); } @@ -582,7 +599,7 @@ BlockEncoder::CompressToStream(const CompositeBuffer& RawData, HeaderBuffer.GetMutableView().Mid(sizeof(BufferHeader), MetaSize).CopyFrom(MakeMemoryView(CompressedBlockSizes)); Header.Write(HeaderBuffer.GetMutableView()); - Callback(0, CompositeBuffer(IoBuffer(IoBuffer::Wrap, HeaderBuffer.GetData(), HeaderBuffer.GetSize()))); + Callback(0, 0, 0, CompositeBuffer(IoBuffer(IoBuffer::Wrap, HeaderBuffer.GetData(), HeaderBuffer.GetSize()))); return true; } @@ -615,11 +632,13 @@ public: MutableMemoryView RawView, uint64_t RawOffset) const final; - virtual bool DecompressToStream(const BufferHeader& Header, - const CompositeBuffer& CompressedData, - uint64_t RawOffset, - uint64_t RawSize, - std::function&& Callback) const final; + virtual bool DecompressToStream( + const BufferHeader& Header, + const CompositeBuffer& CompressedData, + uint64_t RawOffset, + uint64_t RawSize, + std::function&& Callback) + const final; protected: virtual bool DecompressBlock(MutableMemoryView RawData, MemoryView CompressedData) const = 0; @@ -743,11 +762,12 @@ BlockDecoder::DecompressToComposite(const BufferHeader& Header, const CompositeB } bool -BlockDecoder::DecompressToStream(const BufferHeader& Header, - const CompositeBuffer& CompressedData, - uint64_t RawOffset, - uint64_t RawSize, - std::function&& Callback) const +BlockDecoder::DecompressToStream( + const BufferHeader& Header, + const CompositeBuffer& CompressedData, + uint64_t RawOffset, + uint64_t RawSize, + std::function&& Callback) const { if (Header.TotalCompressedSize != CompressedData.GetSize()) { @@ -817,7 +837,9 @@ BlockDecoder::DecompressToStream(const BufferHeader& Header, Source.Detach(); return false; } - if (!Callback(BlockIndex * BlockSize + OffsetInFirstBlock, + if (!Callback(FileRef.FileChunkOffset + CompressedOffset, + 
CompressedBlockSize, + BlockIndex * BlockSize + OffsetInFirstBlock, CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawDataBuffer.GetData(), BytesToUncompress)))) { Source.Detach(); @@ -827,6 +849,8 @@ BlockDecoder::DecompressToStream(const BufferHeader& Header, else { if (!Callback( + FileRef.FileChunkOffset + CompressedOffset, + BytesToUncompress, BlockIndex * BlockSize + OffsetInFirstBlock, CompositeBuffer( IoBuffer(IoBuffer::Wrap, CompressedBlockCopy.GetView().Mid(OffsetInFirstBlock).GetData(), BytesToUncompress)))) @@ -870,7 +894,9 @@ BlockDecoder::DecompressToStream(const BufferHeader& Header, { return false; } - if (!Callback(BlockIndex * BlockSize + OffsetInFirstBlock, + if (!Callback(CompressedOffset, + UncompressedBlockSize, + BlockIndex * BlockSize + OffsetInFirstBlock, CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawDataBuffer.GetData(), BytesToUncompress)))) { return false; @@ -879,6 +905,8 @@ BlockDecoder::DecompressToStream(const BufferHeader& Header, else { if (!Callback( + CompressedOffset, + BytesToUncompress, BlockIndex * BlockSize + OffsetInFirstBlock, CompositeBuffer( IoBuffer(IoBuffer::Wrap, CompressedBlockCopy.GetView().Mid(OffsetInFirstBlock).GetData(), BytesToUncompress)))) @@ -1778,11 +1806,12 @@ CompressedBuffer::Compress(const SharedBuffer& RawData, } bool -CompressedBuffer::CompressToStream(const CompositeBuffer& RawData, - std::function&& Callback, - OodleCompressor Compressor, - OodleCompressionLevel CompressionLevel, - uint64_t BlockSize) +CompressedBuffer::CompressToStream( + const CompositeBuffer& RawData, + std::function&& Callback, + OodleCompressor Compressor, + OodleCompressionLevel CompressionLevel, + uint64_t BlockSize) { using namespace detail; @@ -1995,9 +2024,10 @@ CompressedBuffer::DecompressToComposite() const } bool -CompressedBuffer::DecompressToStream(uint64_t RawOffset, - uint64_t RawSize, - std::function&& Callback) const +CompressedBuffer::DecompressToStream( + uint64_t RawOffset, + uint64_t RawSize, + std::function&& 
Callback) const { using namespace detail; if (CompressedData) -- cgit v1.2.3 From db6a98ba01b7e3b0dd79c725dbaadd5f465c6799 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 2 Jun 2025 14:55:35 +0200 Subject: streaming none compressor (#414) * add proper streaming to none compressor type --- src/zencore/compress.cpp | 39 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 36 insertions(+), 3 deletions(-) (limited to 'src/zencore/compress.cpp') diff --git a/src/zencore/compress.cpp b/src/zencore/compress.cpp index ad6b6103c..62b64bc9d 100644 --- a/src/zencore/compress.cpp +++ b/src/zencore/compress.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -217,6 +218,20 @@ public: { UniqueBuffer HeaderData = CompressedBuffer::CreateHeaderForNoneEncoder(RawData.GetSize(), BLAKE3::HashBuffer(RawData)); Callback(0, 0, 0, CompositeBuffer(IoBuffer(IoBuffer::Wrap, HeaderData.GetData(), HeaderData.GetSize()))); + + IoBufferFileReference FileRef = {nullptr, 0, 0}; + if ((RawData.GetSegments().size() == 1) && RawData.GetSegments()[0].AsIoBuffer().GetFileReference(FileRef)) + { + ZEN_ASSERT(FileRef.FileHandle != nullptr); + uint64_t CallbackOffset = 0; + ScanFile(FileRef.FileHandle, 0, RawData.GetSize(), 512u * 1024u, [&](const void* Data, size_t Size) { + CompositeBuffer Tmp(SharedBuffer(IoBuffer(IoBuffer::Wrap, Data, Size))); + Callback(CallbackOffset, Size, HeaderData.GetSize() + CallbackOffset, Tmp); + CallbackOffset += Size; + }); + return true; + } + Callback(0, RawData.GetSize(), HeaderData.GetSize(), RawData); return true; } @@ -299,11 +314,29 @@ public: Header.TotalCompressedSize == Header.TotalRawSize + sizeof(BufferHeader) && RawOffset < Header.TotalRawSize && (RawOffset + RawSize) <= Header.TotalRawSize) { - if (!Callback(sizeof(BufferHeader) + RawOffset, RawSize, 0, CompressedData.Mid(sizeof(BufferHeader) + RawOffset, RawSize))) + bool Result = true; + IoBufferFileReference FileRef = {nullptr, 0, 0}; + if 
((CompressedData.GetSegments().size() == 1) && CompressedData.GetSegments()[0].AsIoBuffer().GetFileReference(FileRef)) { - return false; + ZEN_ASSERT(FileRef.FileHandle != nullptr); + uint64_t CallbackOffset = 0; + ScanFile(FileRef.FileHandle, sizeof(BufferHeader) + RawOffset, RawSize, 512u * 1024u, [&](const void* Data, size_t Size) { + if (Result) + { + CompositeBuffer Tmp(SharedBuffer(IoBuffer(IoBuffer::Wrap, Data, Size))); + Result = Callback(sizeof(BufferHeader) + RawOffset + CallbackOffset, Size, CallbackOffset, Tmp); + } + CallbackOffset += Size; + }); + return Result; + } + else + { + return Callback(sizeof(BufferHeader) + RawOffset, + RawSize, + 0, + CompressedData.Mid(sizeof(BufferHeader) + RawOffset, RawSize)); } - return true; } return false; } -- cgit v1.2.3 From a0b10b046095d57ffbdb46c83084601a832f4562 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Tue, 3 Jun 2025 16:21:01 +0200 Subject: fixed size chunking for encrypted files (#410) - Improvement: Use fixed size block chunking for known encrypted/compressed file types - Improvement: Skip trying to compress chunks that are sourced from files that are known to be encrypted/compressed - Improvement: Add global open file cache for written files increasing throughput during download by reducing overhead of open/close of file by 80% --- src/zencore/compress.cpp | 50 ++++++++++++++++++++++++++++++++++-------------- 1 file changed, 36 insertions(+), 14 deletions(-) (limited to 'src/zencore/compress.cpp') diff --git a/src/zencore/compress.cpp b/src/zencore/compress.cpp index 62b64bc9d..d9f381811 100644 --- a/src/zencore/compress.cpp +++ b/src/zencore/compress.cpp @@ -216,23 +216,45 @@ public: std::function&& Callback, uint64_t /* BlockSize */) const final { - UniqueBuffer HeaderData = CompressedBuffer::CreateHeaderForNoneEncoder(RawData.GetSize(), BLAKE3::HashBuffer(RawData)); - Callback(0, 0, 0, CompositeBuffer(IoBuffer(IoBuffer::Wrap, HeaderData.GetData(), HeaderData.GetSize()))); + const uint64_t 
HeaderSize = CompressedBuffer::GetHeaderSizeForNoneEncoder(); - IoBufferFileReference FileRef = {nullptr, 0, 0}; - if ((RawData.GetSegments().size() == 1) && RawData.GetSegments()[0].AsIoBuffer().GetFileReference(FileRef)) + uint64_t RawOffset = 0; + BLAKE3Stream HashStream; + + for (const SharedBuffer& Segment : RawData.GetSegments()) { - ZEN_ASSERT(FileRef.FileHandle != nullptr); - uint64_t CallbackOffset = 0; - ScanFile(FileRef.FileHandle, 0, RawData.GetSize(), 512u * 1024u, [&](const void* Data, size_t Size) { - CompositeBuffer Tmp(SharedBuffer(IoBuffer(IoBuffer::Wrap, Data, Size))); - Callback(CallbackOffset, Size, HeaderData.GetSize() + CallbackOffset, Tmp); - CallbackOffset += Size; - }); - return true; + IoBufferFileReference FileRef = {nullptr, 0, 0}; + IoBuffer SegmentBuffer = Segment.AsIoBuffer(); + if (SegmentBuffer.GetFileReference(FileRef)) + { + ZEN_ASSERT(FileRef.FileHandle != nullptr); + + ScanFile(FileRef.FileHandle, + FileRef.FileChunkOffset, + FileRef.FileChunkSize, + 512u * 1024u, + [&](const void* Data, size_t Size) { + HashStream.Append(Data, Size); + CompositeBuffer Tmp(SharedBuffer::MakeView(Data, Size)); + Callback(RawOffset, Size, HeaderSize + RawOffset, Tmp); + RawOffset += Size; + }); + } + else + { + const uint64_t Size = SegmentBuffer.GetSize(); + HashStream.Append(SegmentBuffer); + Callback(RawOffset, Size, HeaderSize + RawOffset, CompositeBuffer(Segment)); + RawOffset += Size; + } } - Callback(0, RawData.GetSize(), HeaderData.GetSize(), RawData); + ZEN_ASSERT(RawOffset == RawData.GetSize()); + + UniqueBuffer HeaderData = CompressedBuffer::CreateHeaderForNoneEncoder(RawData.GetSize(), HashStream.GetHash()); + ZEN_ASSERT(HeaderData.GetSize() == HeaderSize); + Callback(0, 0, 0, CompositeBuffer(HeaderData.MoveToShared())); + return true; } }; @@ -323,7 +345,7 @@ public: ScanFile(FileRef.FileHandle, sizeof(BufferHeader) + RawOffset, RawSize, 512u * 1024u, [&](const void* Data, size_t Size) { if (Result) { - CompositeBuffer 
Tmp(SharedBuffer(IoBuffer(IoBuffer::Wrap, Data, Size))); + CompositeBuffer Tmp(SharedBuffer::MakeView(Data, Size)); Result = Callback(sizeof(BufferHeader) + RawOffset + CallbackOffset, Size, CallbackOffset, Tmp); } CallbackOffset += Size; -- cgit v1.2.3