diff options
| author | Dan Engelbrecht <[email protected]> | 2025-03-04 10:22:53 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-03-04 10:22:53 +0100 |
| commit | 7cb1de5a966aa23e0c098d2bbf9009d0f82a6477 (patch) | |
| tree | 5c340a3947e27b2710968442cee7f009bc6ab439 /src | |
| parent | limit and validate responses before logging the text (#292) (diff) | |
| download | zen-7cb1de5a966aa23e0c098d2bbf9009d0f82a6477.tar.xz zen-7cb1de5a966aa23e0c098d2bbf9009d0f82a6477.zip | |
stream decompress (#293)
* clean up latency parameters and slow down rate updates
* add DecompressToStream
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 198 | ||||
| -rw-r--r-- | src/zencore/compress.cpp | 186 | ||||
| -rw-r--r-- | src/zencore/include/zencore/compress.h | 10 |
3 files changed, 337 insertions, 57 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index 219d01240..a9852151f 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -78,15 +78,19 @@ namespace { const ChunksBlockParameters DefaultChunksBlockParams{.MaxBlockSize = 32u * 1024u * 1024u, .MaxChunkEmbedSize = DefaultChunkedParams.MaxSize}; - const std::string ZenFolderName = ".zen"; - const std::string ZenStateFilePath = fmt::format("{}/current_state.cbo", ZenFolderName); - const std::string ZenStateFileJsonPath = fmt::format("{}/current_state.json", ZenFolderName); - const std::string ZenTempFolderName = fmt::format("{}/tmp", ZenFolderName); - const std::string ZenTempCacheFolderName = fmt::format("{}/cache", ZenTempFolderName); - const std::string ZenTempStorageFolderName = fmt::format("{}/storage", ZenTempFolderName); - const std::string ZenTempBlockFolderName = fmt::format("{}/blocks", ZenTempFolderName); - const std::string ZenTempChunkFolderName = fmt::format("{}/chunks", ZenTempFolderName); - const std::string ZenExcludeManifestName = ".zen_exclude_manifest.txt"; + + const double DefaultLatency = 0; // .0010; + const double DefaultDelayPerKBSec = 0; // 0.00005; + + const std::string ZenFolderName = ".zen"; + const std::string ZenStateFilePath = fmt::format("{}/current_state.cbo", ZenFolderName); + const std::string ZenStateFileJsonPath = fmt::format("{}/current_state.json", ZenFolderName); + const std::string ZenTempFolderName = fmt::format("{}/tmp", ZenFolderName); + const std::string ZenTempCacheFolderName = fmt::format("{}/cache", ZenTempFolderName); + const std::string ZenTempStorageFolderName = fmt::format("{}/storage", ZenTempFolderName); + const std::string ZenTempBlockFolderName = fmt::format("{}/blocks", ZenTempFolderName); + const std::string ZenTempChunkFolderName = fmt::format("{}/chunks", ZenTempFolderName); + const std::string ZenExcludeManifestName = ".zen_exclude_manifest.txt"; const std::string UnsyncFolderName = ".unsync"; @@ -271,7 +275,7 @@ namespace { } uint64_t TimeUS = Timer.GetElapsedTimeUs(); uint64_t TimeDeltaUS = TimeUS - LastTimeUS; - if (TimeDeltaUS >= 1000000) + if (TimeDeltaUS >= 2000000) { uint64_t Delta = Count - LastCount; uint64_t PerSecond = (Delta * 1000000) / TimeDeltaUS; @@ -3464,6 +3468,61 @@ namespace { } } + bool CanStreamDecompress(const ChunkedFolderContent& RemoteContent, + const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> Locations) + { + if (Locations.size() == 1) + { + const uint32_t FirstSequenceIndex = Locations[0]->SequenceIndex; + if (Locations[0]->Offset == 0 && RemoteContent.ChunkedContent.ChunkCounts[FirstSequenceIndex] == 1) + { + return true; + } + } + return false; + } + + void StreamDecompress(const std::filesystem::path& CacheFolderPath, + const IoHash& SequenceRawHash, + CompositeBuffer&& CompressedPart, + std::atomic<uint64_t>& WriteToDiskBytes) + { + const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash); + TemporaryFile DecompressedTemp; + std::error_code Ec; + DecompressedTemp.CreateTemporary(CacheFolderPath, Ec); + if (Ec) + { + throw std::runtime_error( + fmt::format("Failed creating temporary file for decompressing large blob {}. Reason: {}", SequenceRawHash, Ec.message())); + } + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompressedPart, RawHash, RawSize); + if (!Compressed) + { + throw std::runtime_error(fmt::format("Failed to parse header of compressed large blob {}", SequenceRawHash)); + } + if (RawHash != SequenceRawHash) + { + throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, SequenceRawHash)); + } + bool CouldDecompress = Compressed.DecompressToStream(0, (uint64_t)-1, [&](uint64_t Offset, const CompositeBuffer& RangeBuffer) { + DecompressedTemp.Write(RangeBuffer, Offset); + WriteToDiskBytes += RangeBuffer.GetSize(); + }); + if (!CouldDecompress) + { + throw std::runtime_error(fmt::format("Failed to decompress large blob {}", SequenceRawHash)); + } + DecompressedTemp.MoveTemporaryIntoPlace(TempChunkSequenceFileName, Ec); + if (Ec) + { + throw std::runtime_error( + fmt::format("Failed moving temporary file for decompressing large blob {}. Reason: {}", SequenceRawHash, Ec.message())); + } + } + void DownloadLargeBlob(BuildStorage& Storage, const std::filesystem::path& TempFolderPath, const std::filesystem::path& CacheFolderPath, @@ -3527,7 +3586,7 @@ namespace { DownloadedChunks++; Work.ScheduleWork( - WritePool, + WritePool, // GetSyncWorkerPool(),// [&CacheFolderPath, &RemoteContent, &RemoteLookup, @@ -3555,33 +3614,44 @@ namespace { } CompressedPart.SetDeleteOnClose(true); - uint64_t TotalBytesWritten = 0; - auto ChunkHashToChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); ZEN_ASSERT(ChunkHashToChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end()); - uint32_t ChunkIndex = ChunkHashToChunkIndexIt->second; - - SharedBuffer Chunk = Decompress(CompositeBuffer(std::move(CompressedPart)), - ChunkHash, - RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]); + if (CanStreamDecompress(RemoteContent, ChunkTargetPtrs)) + { + const IoHash& SequenceRawHash = + RemoteContent.ChunkedContent.SequenceRawHashes[ChunkTargetPtrs.front()->SequenceIndex]; + StreamDecompress(CacheFolderPath, + SequenceRawHash, + CompositeBuffer(std::move(CompressedPart)), + WriteToDiskBytes); + } + else + { + const uint32_t ChunkIndex = ChunkHashToChunkIndexIt->second; + SharedBuffer Chunk = Decompress(CompositeBuffer(std::move(CompressedPart)), + ChunkHash, + RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]); - // ZEN_ASSERT_SLOW(ChunkHash == - // IoHash::HashBuffer(Chunk.AsIoBuffer())); + // ZEN_ASSERT_SLOW(ChunkHash == + // IoHash::HashBuffer(Chunk.AsIoBuffer())); - if (!AbortFlag) - { - WriteFileCache OpenFileCache; - - WriteChunkToDisk(CacheFolderPath, - RemoteContent, - RemoteLookup, - ChunkTargetPtrs, - CompositeBuffer(Chunk), - OpenFileCache, - TotalBytesWritten); - ChunksComplete++; - WriteToDiskBytes += TotalBytesWritten; + if (!AbortFlag) + { + WriteFileCache OpenFileCache; + + uint64_t TotalBytesWritten = 0; + WriteChunkToDisk(CacheFolderPath, + RemoteContent, + RemoteLookup, + ChunkTargetPtrs, + CompositeBuffer(Chunk), + OpenFileCache, + TotalBytesWritten); + ChunksComplete++; + WriteToDiskBytes += TotalBytesWritten; + } } + if (!AbortFlag) { // Write tracking, updating this must be done without any files open (WriteFileCache) @@ -3619,7 +3689,7 @@ namespace { for (auto& WorkItem : WorkItems) { Work.ScheduleWork( - NetworkPool, + NetworkPool, // GetSyncWorkerPool(),// [WorkItem = std::move(WorkItem)](std::atomic<bool>&) { if (!AbortFlag) { @@ -3830,7 +3900,7 @@ namespace { else { Work.ScheduleWork( - NetworkPool, + NetworkPool, // GetSyncWorkerPool(),// [&, ChunkHash, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) { if (!AbortFlag) { @@ -3880,21 +3950,38 @@ namespace { if (!AbortFlag) { FilteredWrittenBytesPerSecond.Start(); - uint64_t TotalBytesWritten = 0; - SharedBuffer Chunk = - Decompress(CompressedPart, - ChunkHash, - RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex]); + if (CanStreamDecompress(RemoteContent, ChunkTargetPtrs)) { - WriteFileCache OpenFileCache; - WriteChunkToDisk(CacheFolderPath, - RemoteContent, - RemoteLookup, - ChunkTargetPtrs, - CompositeBuffer(Chunk), - OpenFileCache, - TotalBytesWritten); + const IoHash& SequenceRawHash = + RemoteContent.ChunkedContent + .SequenceRawHashes[ChunkTargetPtrs.front()->SequenceIndex]; + StreamDecompress(CacheFolderPath, + SequenceRawHash, + CompositeBuffer(CompressedPart), + WriteToDiskBytes); + ChunkCountWritten++; + } + else + { + uint64_t TotalBytesWritten = 0; + SharedBuffer Chunk = + Decompress(CompressedPart, + ChunkHash, + RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex]); + + { + WriteFileCache OpenFileCache; + WriteChunkToDisk(CacheFolderPath, + RemoteContent, + RemoteLookup, + ChunkTargetPtrs, + CompositeBuffer(Chunk), + OpenFileCache, + TotalBytesWritten); + ChunkCountWritten++; + WriteToDiskBytes += TotalBytesWritten; + } } if (!AbortFlag) { @@ -3925,9 +4012,6 @@ namespace { GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)); } } - - ChunkCountWritten++; - WriteToDiskBytes += TotalBytesWritten; } } }, @@ -5707,7 +5791,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) else if (!m_StoragePath.empty()) { ZEN_CONSOLE("Querying builds in folder '{}'.", m_StoragePath); - Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false); // , .0015, 0.00004 + Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); } else { @@ -5819,7 +5903,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) m_StoragePath, GeneratedBuildId ? "Generated " : "", BuildId); - Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, m_WriteMetadataAsJson); // , .0015, 0.00004 + Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, m_WriteMetadataAsJson, DefaultLatency, DefaultDelayPerKBSec); StorageName = fmt::format("Disk {}", m_StoragePath.stem()); } else @@ -5952,7 +6036,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) else if (!m_StoragePath.empty()) { ZEN_CONSOLE("Downloading '{}' to '{}' from folder {}. BuildId '{}'", BuildId, m_Path, m_StoragePath, BuildId); - Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false); // , .0015, 0.00004 + Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); StorageName = fmt::format("Disk {}", m_StoragePath.stem()); } else @@ -6056,7 +6140,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) m_Path, m_StoragePath, BuildId); - Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false); // , .0015, 0.00004 + Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); StorageName = fmt::format("Disk {}", m_StoragePath.stem()); } else @@ -6326,7 +6410,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) else if (!m_StoragePath.empty()) { ZEN_CONSOLE("Using folder {}. BuildId '{}'", m_StoragePath, BuildId); - Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false); // , .0015, 0.00004 + Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); StorageName = fmt::format("Disk {}", m_StoragePath.stem()); } else @@ -6394,7 +6478,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) else if (!m_StoragePath.empty()) { ZEN_CONSOLE("Using folder {}. BuildId '{}'", m_StoragePath, BuildId); - Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false); // , .0015, 0.00004 + Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); StorageName = fmt::format("Disk {}", m_StoragePath.stem()); } else diff --git a/src/zencore/compress.cpp b/src/zencore/compress.cpp index 0e2ce2b54..f13f8b9ca 100644 --- a/src/zencore/compress.cpp +++ b/src/zencore/compress.cpp @@ -185,6 +185,12 @@ public: const MemoryView HeaderView, uint64_t RawOffset, uint64_t RawSize) const = 0; + + virtual bool DecompressToStream(const BufferHeader& Header, + const CompositeBuffer& CompressedData, + uint64_t RawOffset, + uint64_t RawSize, + std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const = 0; }; /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -263,6 +269,22 @@ public: } [[nodiscard]] uint64_t GetHeaderSize(const BufferHeader&) const final { return sizeof(BufferHeader); } + + virtual bool DecompressToStream(const BufferHeader& Header, + const CompositeBuffer& CompressedData, + uint64_t RawOffset, + uint64_t RawSize, + std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const final + { + if (Header.Method == CompressionMethod::None && Header.TotalCompressedSize == CompressedData.GetSize() && + Header.TotalCompressedSize == Header.TotalRawSize + sizeof(BufferHeader) && RawOffset < Header.TotalRawSize && + (RawOffset + RawSize) <= Header.TotalRawSize) + { + Callback(0, CompressedData.Mid(sizeof(BufferHeader) + RawOffset, RawSize)); + return true; + } + return false; + } }; ////////////////////////////////////////////////////////////////////////// @@ -447,6 +469,12 @@ public: MutableMemoryView RawView, uint64_t RawOffset) const final; + virtual bool DecompressToStream(const BufferHeader& Header, + const CompositeBuffer& CompressedData, + uint64_t RawOffset, + uint64_t RawSize, + std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const final; + protected: virtual bool DecompressBlock(MutableMemoryView RawData, MemoryView CompressedData) const = 0; }; @@ -569,6 +597,143 @@ BlockDecoder::DecompressToComposite(const BufferHeader& Header, const CompositeB } bool +BlockDecoder::DecompressToStream(const BufferHeader& Header, + const CompositeBuffer& CompressedData, + uint64_t RawOffset, + uint64_t RawSize, + std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const +{ + if (Header.TotalCompressedSize != CompressedData.GetSize()) + { + return false; + } + + const uint64_t BlockSize = uint64_t(1) << Header.BlockSizeExponent; + + UniqueBuffer BlockSizeBuffer; + MemoryView BlockSizeView = CompressedData.ViewOrCopyRange(sizeof(BufferHeader), Header.BlockCount * sizeof(uint32_t), BlockSizeBuffer); + std::span<uint32_t const> CompressedBlockSizes(reinterpret_cast<const uint32_t*>(BlockSizeView.GetData()), Header.BlockCount); + + UniqueBuffer CompressedBlockCopy; + + const size_t FirstBlockIndex = uint64_t(RawOffset / BlockSize); + const size_t LastBlockIndex = uint64_t((RawOffset + RawSize - 1) / BlockSize); + const uint64_t LastBlockSize = BlockSize - ((Header.BlockCount * BlockSize) - Header.TotalRawSize); + uint64_t OffsetInFirstBlock = RawOffset % BlockSize; + uint64_t CompressedOffset = sizeof(BufferHeader) + uint64_t(Header.BlockCount) * sizeof(uint32_t); + uint64_t RemainingRawSize = RawSize; + + for (size_t BlockIndex = 0; BlockIndex < FirstBlockIndex; BlockIndex++) + { + const uint32_t CompressedBlockSize = ByteSwap(CompressedBlockSizes[BlockIndex]); + CompressedOffset += CompressedBlockSize; + } + + UniqueBuffer RawDataBuffer; + + IoBufferFileReference FileRef = {nullptr, 0, 0}; + if ((CompressedData.GetSegments().size() == 1) && CompressedData.GetSegments()[0].AsIoBuffer().GetFileReference(FileRef)) + { + ZEN_ASSERT(FileRef.FileHandle != nullptr); + BasicFile Source; + Source.Attach(FileRef.FileHandle); + + for (size_t BlockIndex = FirstBlockIndex; BlockIndex <= LastBlockIndex; BlockIndex++) + { + const uint64_t UncompressedBlockSize = BlockIndex == Header.BlockCount - 1 ? LastBlockSize : BlockSize; + const uint32_t CompressedBlockSize = ByteSwap(CompressedBlockSizes[BlockIndex]); + const bool IsCompressed = CompressedBlockSize < UncompressedBlockSize; + + const uint64_t BytesToUncompress = OffsetInFirstBlock > 0 ? zen::Min(RawSize, UncompressedBlockSize - OffsetInFirstBlock) + : zen::Min(RemainingRawSize, BlockSize); + + if (CompressedBlockCopy.GetSize() < CompressedBlockSize) + { + CompressedBlockCopy = UniqueBuffer::Alloc(CompressedBlockSize); + } + Source.Read(CompressedBlockCopy.GetData(), CompressedBlockSize, FileRef.FileChunkOffset + CompressedOffset); + + MemoryView CompressedBlock = CompressedBlockCopy.GetView().Left(CompressedBlockSize); + + if (IsCompressed) + { + if (RawDataBuffer.IsNull()) + { + RawDataBuffer = UniqueBuffer::Alloc(zen::Min(RawSize, UncompressedBlockSize)); + } + else + { + ZEN_ASSERT(RawDataBuffer.GetSize() >= UncompressedBlockSize); + } + MutableMemoryView UncompressedBlock = RawDataBuffer.GetMutableView().Left(UncompressedBlockSize); + if (!DecompressBlock(UncompressedBlock, CompressedBlock)) + { + Source.Detach(); + return false; + } + Callback(BlockIndex * BlockSize + OffsetInFirstBlock, + CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawDataBuffer.GetData(), BytesToUncompress))); + } + else + { + Callback(BlockIndex * BlockSize + OffsetInFirstBlock, + CompositeBuffer( + IoBuffer(IoBuffer::Wrap, CompressedBlockCopy.GetView().Mid(OffsetInFirstBlock).GetData(), BytesToUncompress))); + } + + OffsetInFirstBlock = 0; + RemainingRawSize -= BytesToUncompress; + CompressedOffset += CompressedBlockSize; + } + Source.Detach(); + } + else + { + for (size_t BlockIndex = FirstBlockIndex; BlockIndex <= LastBlockIndex; BlockIndex++) + { + const uint64_t UncompressedBlockSize = BlockIndex == Header.BlockCount - 1 ? LastBlockSize : BlockSize; + const uint32_t CompressedBlockSize = ByteSwap(CompressedBlockSizes[BlockIndex]); + const bool IsCompressed = CompressedBlockSize < UncompressedBlockSize; + + const uint64_t BytesToUncompress = OffsetInFirstBlock > 0 ? zen::Min(RawSize, UncompressedBlockSize - OffsetInFirstBlock) + : zen::Min(RemainingRawSize, BlockSize); + + MemoryView CompressedBlock = CompressedData.ViewOrCopyRange(CompressedOffset, CompressedBlockSize, CompressedBlockCopy); + + if (IsCompressed) + { + if (RawDataBuffer.IsNull()) + { + RawDataBuffer = UniqueBuffer::Alloc(zen::Min(RawSize, UncompressedBlockSize)); + } + else + { + ZEN_ASSERT(RawDataBuffer.GetSize() >= UncompressedBlockSize); + } + MutableMemoryView UncompressedBlock = RawDataBuffer.GetMutableView().Left(UncompressedBlockSize); + if (!DecompressBlock(UncompressedBlock, CompressedBlock)) + { + return false; + } + Callback(BlockIndex * BlockSize + OffsetInFirstBlock, + CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawDataBuffer.GetData(), BytesToUncompress))); + } + else + { + Callback(BlockIndex * BlockSize + OffsetInFirstBlock, + CompositeBuffer( + IoBuffer(IoBuffer::Wrap, CompressedBlockCopy.GetView().Mid(OffsetInFirstBlock).GetData(), BytesToUncompress))); + } + + OffsetInFirstBlock = 0; + RemainingRawSize -= BytesToUncompress; + CompressedOffset += CompressedBlockSize; + } + } + return true; +} + +bool BlockDecoder::TryDecompressTo(const BufferHeader& Header, const CompositeBuffer& CompressedData, MutableMemoryView RawView, @@ -1644,6 +1809,27 @@ CompressedBuffer::DecompressToComposite() const } bool +CompressedBuffer::DecompressToStream(uint64_t RawOffset, + uint64_t RawSize, + std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const +{ + using namespace detail; + if (CompressedData) + { + const BufferHeader Header = BufferHeader::Read(CompressedData); + if (Header.Magic == BufferHeader::ExpectedMagic) + { + if (const BaseDecoder* const Decoder = GetDecoder(Header.Method)) + { + const uint64_t TotalRawSize = RawSize < ~uint64_t(0) ? RawSize : Header.TotalRawSize - RawOffset; + return Decoder->DecompressToStream(Header, CompressedData, RawOffset, TotalRawSize, std::move(Callback)); + } + } + } + return false; +} + +bool CompressedBuffer::TryGetCompressParameters(OodleCompressor& OutCompressor, OodleCompressionLevel& OutCompressionLevel, uint64_t& OutBlockSize) const diff --git a/src/zencore/include/zencore/compress.h b/src/zencore/include/zencore/compress.h index 5e761ceef..c056d7561 100644 --- a/src/zencore/include/zencore/compress.h +++ b/src/zencore/include/zencore/compress.h @@ -196,6 +196,16 @@ public: */ [[nodiscard]] ZENCORE_API CompositeBuffer DecompressToComposite() const; + /** + * Decompress into and call callback for ranges of decompressed data. + * The buffer in the callback will be overwritten when the callback returns. + * + * @return True if the buffer is valid and can be decompressed. + */ + [[nodiscard]] ZENCORE_API bool DecompressToStream(uint64_t RawOffset, + uint64_t RawSize, + std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const; + /** A null compressed buffer. */ static const CompressedBuffer Null; |