diff options
| author | Stefan Boberg <[email protected]> | 2025-03-05 11:25:48 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-03-05 11:25:48 +0100 |
| commit | 88a83be8f5a372e6904065a029e17ce75032c8ed (patch) | |
| tree | e3e41cbac2cd21a7aedecdcdfedfa2c7b4abe843 /src | |
| parent | stub out build store/cache service (diff) | |
| parent | do direct update of stats numbers (#294) (diff) | |
| download | zen-88a83be8f5a372e6904065a029e17ce75032c8ed.tar.xz zen-88a83be8f5a372e6904065a029e17ce75032c8ed.zip | |
Merge branch 'main' into sb/build-cache
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 223 | ||||
| -rw-r--r-- | src/zencore/compress.cpp | 186 | ||||
| -rw-r--r-- | src/zencore/include/zencore/compress.h | 10 |
3 files changed, 344 insertions, 75 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index 0036192e4..0aae7814c 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -79,15 +79,19 @@ namespace { const ChunksBlockParameters DefaultChunksBlockParams{.MaxBlockSize = 32u * 1024u * 1024u, .MaxChunkEmbedSize = DefaultChunkedParams.MaxSize}; - const std::string ZenFolderName = ".zen"; - const std::string ZenStateFilePath = fmt::format("{}/current_state.cbo", ZenFolderName); - const std::string ZenStateFileJsonPath = fmt::format("{}/current_state.json", ZenFolderName); - const std::string ZenTempFolderName = fmt::format("{}/tmp", ZenFolderName); - const std::string ZenTempCacheFolderName = fmt::format("{}/cache", ZenTempFolderName); - const std::string ZenTempStorageFolderName = fmt::format("{}/storage", ZenTempFolderName); - const std::string ZenTempBlockFolderName = fmt::format("{}/blocks", ZenTempFolderName); - const std::string ZenTempChunkFolderName = fmt::format("{}/chunks", ZenTempFolderName); - const std::string ZenExcludeManifestName = ".zen_exclude_manifest.txt"; + + const double DefaultLatency = 0; // .0010; + const double DefaultDelayPerKBSec = 0; // 0.00005; + + const std::string ZenFolderName = ".zen"; + const std::string ZenStateFilePath = fmt::format("{}/current_state.cbo", ZenFolderName); + const std::string ZenStateFileJsonPath = fmt::format("{}/current_state.json", ZenFolderName); + const std::string ZenTempFolderName = fmt::format("{}/tmp", ZenFolderName); + const std::string ZenTempCacheFolderName = fmt::format("{}/cache", ZenTempFolderName); + const std::string ZenTempStorageFolderName = fmt::format("{}/storage", ZenTempFolderName); + const std::string ZenTempBlockFolderName = fmt::format("{}/blocks", ZenTempFolderName); + const std::string ZenTempChunkFolderName = fmt::format("{}/chunks", ZenTempFolderName); + const std::string ZenExcludeManifestName = ".zen_exclude_manifest.txt"; const std::string UnsyncFolderName = ".unsync"; @@ -274,7 +278,7 @@ namespace { } uint64_t TimeUS = Timer.GetElapsedTimeUs(); uint64_t TimeDeltaUS = TimeUS - LastTimeUS; - if (TimeDeltaUS >= 1000000) + if (TimeDeltaUS >= 2000000) { uint64_t Delta = Count - LastCount; uint64_t PerSecond = (Delta * 1000000) / TimeDeltaUS; @@ -3303,8 +3307,8 @@ namespace { const CompositeBuffer& DecompressedBlockBuffer, const ChunkedContentLookup& Lookup, std::atomic<bool>* RemoteChunkIndexNeedsCopyFromSourceFlags, - uint32_t& OutChunksComplete, - uint64_t& OutBytesWritten) + std::atomic<uint32_t>& OutChunksComplete, + std::atomic<uint64_t>& OutBytesWritten) { ZEN_TRACE_CPU("WriteBlockToDisk"); @@ -3468,7 +3472,7 @@ namespace { std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> ChunkTargets, const CompositeBuffer& ChunkData, WriteFileCache& OpenFileCache, - uint64_t& OutBytesWritten) + std::atomic<uint64_t>& OutBytesWritten) { ZEN_TRACE_CPU("WriteChunkToDisk"); @@ -3491,6 +3495,61 @@ namespace { } } + bool CanStreamDecompress(const ChunkedFolderContent& RemoteContent, + const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> Locations) + { + if (Locations.size() == 1) + { + const uint32_t FirstSequenceIndex = Locations[0]->SequenceIndex; + if (Locations[0]->Offset == 0 && RemoteContent.ChunkedContent.ChunkCounts[FirstSequenceIndex] == 1) + { + return true; + } + } + return false; + } + + void StreamDecompress(const std::filesystem::path& CacheFolderPath, + const IoHash& SequenceRawHash, + CompositeBuffer&& CompressedPart, + std::atomic<uint64_t>& WriteToDiskBytes) + { + const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash); + TemporaryFile DecompressedTemp; + std::error_code Ec; + DecompressedTemp.CreateTemporary(CacheFolderPath, Ec); + if (Ec) + { + throw std::runtime_error( + fmt::format("Failed creating temporary file for decompressing large blob {}. Reason: {}", SequenceRawHash, Ec.message())); + } + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompressedPart, RawHash, RawSize); + if (!Compressed) + { + throw std::runtime_error(fmt::format("Failed to parse header of compressed large blob {}", SequenceRawHash)); + } + if (RawHash != SequenceRawHash) + { + throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, SequenceRawHash)); + } + bool CouldDecompress = Compressed.DecompressToStream(0, (uint64_t)-1, [&](uint64_t Offset, const CompositeBuffer& RangeBuffer) { + DecompressedTemp.Write(RangeBuffer, Offset); + WriteToDiskBytes += RangeBuffer.GetSize(); + }); + if (!CouldDecompress) + { + throw std::runtime_error(fmt::format("Failed to decompress large blob {}", SequenceRawHash)); + } + DecompressedTemp.MoveTemporaryIntoPlace(TempChunkSequenceFileName, Ec); + if (Ec) + { + throw std::runtime_error( + fmt::format("Failed moving temporary file for decompressing large blob {}. Reason: {}", SequenceRawHash, Ec.message())); + } + } + void DownloadLargeBlob(BuildStorage& Storage, const std::filesystem::path& TempFolderPath, const std::filesystem::path& CacheFolderPath, @@ -3556,7 +3615,7 @@ namespace { DownloadedChunks++; Work.ScheduleWork( - WritePool, + WritePool, // GetSyncWorkerPool(),// [&CacheFolderPath, &RemoteContent, &RemoteLookup, @@ -3586,33 +3645,42 @@ namespace { } CompressedPart.SetDeleteOnClose(true); - uint64_t TotalBytesWritten = 0; - auto ChunkHashToChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); ZEN_ASSERT(ChunkHashToChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end()); - uint32_t ChunkIndex = ChunkHashToChunkIndexIt->second; - - SharedBuffer Chunk = Decompress(CompositeBuffer(std::move(CompressedPart)), - ChunkHash, - RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]); + if (CanStreamDecompress(RemoteContent, ChunkTargetPtrs)) + { + const IoHash& SequenceRawHash = + RemoteContent.ChunkedContent.SequenceRawHashes[ChunkTargetPtrs.front()->SequenceIndex]; + StreamDecompress(CacheFolderPath, + SequenceRawHash, + CompositeBuffer(std::move(CompressedPart)), + WriteToDiskBytes); + } + else + { + const uint32_t ChunkIndex = ChunkHashToChunkIndexIt->second; + SharedBuffer Chunk = Decompress(CompositeBuffer(std::move(CompressedPart)), + ChunkHash, + RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]); - // ZEN_ASSERT_SLOW(ChunkHash == - // IoHash::HashBuffer(Chunk.AsIoBuffer())); + // ZEN_ASSERT_SLOW(ChunkHash == + // IoHash::HashBuffer(Chunk.AsIoBuffer())); - if (!AbortFlag) - { - WriteFileCache OpenFileCache; - - WriteChunkToDisk(CacheFolderPath, - RemoteContent, - RemoteLookup, - ChunkTargetPtrs, - CompositeBuffer(Chunk), - OpenFileCache, - TotalBytesWritten); - ChunksComplete++; - WriteToDiskBytes += TotalBytesWritten; + if (!AbortFlag) + { + WriteFileCache OpenFileCache; + + WriteChunkToDisk(CacheFolderPath, + RemoteContent, + RemoteLookup, + ChunkTargetPtrs, + CompositeBuffer(Chunk), + OpenFileCache, + WriteToDiskBytes); + ChunksComplete++; + } } + if (!AbortFlag) { // Write tracking, updating this must be done without any files open (WriteFileCache) @@ -3654,7 +3722,7 @@ namespace { for (auto& WorkItem : WorkItems) { Work.ScheduleWork( - NetworkPool, + NetworkPool, // GetSyncWorkerPool(),// [WorkItem = std::move(WorkItem)](std::atomic<bool>&) { if (!AbortFlag) { @@ -3869,7 +3937,7 @@ namespace { else { Work.ScheduleWork( - NetworkPool, + NetworkPool, // GetSyncWorkerPool(),// [&, ChunkHash, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) { ZEN_TRACE_CPU("UpdateFolder_LooseChunk"); @@ -3923,21 +3991,36 @@ namespace { if (!AbortFlag) { FilteredWrittenBytesPerSecond.Start(); - uint64_t TotalBytesWritten = 0; - SharedBuffer Chunk = - Decompress(CompressedPart, - ChunkHash, - RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex]); + if (CanStreamDecompress(RemoteContent, ChunkTargetPtrs)) { - WriteFileCache OpenFileCache; - WriteChunkToDisk(CacheFolderPath, - RemoteContent, - RemoteLookup, - ChunkTargetPtrs, - CompositeBuffer(Chunk), - OpenFileCache, - TotalBytesWritten); + const IoHash& SequenceRawHash = + RemoteContent.ChunkedContent + .SequenceRawHashes[ChunkTargetPtrs.front()->SequenceIndex]; + StreamDecompress(CacheFolderPath, + SequenceRawHash, + CompositeBuffer(CompressedPart), + WriteToDiskBytes); + ChunkCountWritten++; + } + else + { + SharedBuffer Chunk = + Decompress(CompressedPart, + ChunkHash, + RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex]); + + { + WriteFileCache OpenFileCache; + WriteChunkToDisk(CacheFolderPath, + RemoteContent, + RemoteLookup, + ChunkTargetPtrs, + CompositeBuffer(Chunk), + OpenFileCache, + WriteToDiskBytes); + ChunkCountWritten++; + } } if (!AbortFlag) { @@ -3973,9 +4056,6 @@ namespace { GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)); } } - - ChunkCountWritten++; - WriteToDiskBytes += TotalBytesWritten; } } }, @@ -4214,21 +4294,14 @@ namespace { ZEN_ASSERT_SLOW(BlockDescriptions[BlockIndex].BlockHash == IoHash::HashBuffer(DecompressedBlockBuffer)); - uint64_t BytesWrittenToDisk = 0; - uint32_t ChunksReadFromBlock = 0; - if (WriteBlockToDisk(CacheFolderPath, - RemoteContent, - SequenceIndexChunksLeftToWriteCounters, - DecompressedBlockBuffer, - RemoteLookup, - RemoteChunkIndexNeedsCopyFromSourceFlags.data(), - ChunksReadFromBlock, - BytesWrittenToDisk)) - { - WriteToDiskBytes += BytesWrittenToDisk; - ChunkCountWritten += ChunksReadFromBlock; - } - else + if (!WriteBlockToDisk(CacheFolderPath, + RemoteContent, + SequenceIndexChunksLeftToWriteCounters, + DecompressedBlockBuffer, + RemoteLookup, + RemoteChunkIndexNeedsCopyFromSourceFlags.data(), + ChunkCountWritten, + WriteToDiskBytes)) { throw std::runtime_error( fmt::format("Block {} is malformed", BlockDescriptions[BlockIndex].BlockHash)); @@ -5785,7 +5858,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) else if (!m_StoragePath.empty()) { ZEN_CONSOLE("Querying builds in folder '{}'.", m_StoragePath); - Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false); // , .0015, 0.00004 + Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); } else { @@ -5897,7 +5970,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) m_StoragePath, GeneratedBuildId ? "Generated " : "", BuildId); - Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, m_WriteMetadataAsJson); // , .0015, 0.00004 + Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, m_WriteMetadataAsJson, DefaultLatency, DefaultDelayPerKBSec); StorageName = fmt::format("Disk {}", m_StoragePath.stem()); } else @@ -6030,7 +6103,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) else if (!m_StoragePath.empty()) { ZEN_CONSOLE("Downloading '{}' to '{}' from folder {}. BuildId '{}'", BuildId, m_Path, m_StoragePath, BuildId); - Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false); // , .0015, 0.00004 + Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); StorageName = fmt::format("Disk {}", m_StoragePath.stem()); } else @@ -6134,7 +6207,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) m_Path, m_StoragePath, BuildId); - Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false); // , .0015, 0.00004 + Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); StorageName = fmt::format("Disk {}", m_StoragePath.stem()); } else @@ -6404,7 +6477,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) else if (!m_StoragePath.empty()) { ZEN_CONSOLE("Using folder {}. BuildId '{}'", m_StoragePath, BuildId); - Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false); // , .0015, 0.00004 + Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); StorageName = fmt::format("Disk {}", m_StoragePath.stem()); } else @@ -6472,7 +6545,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) else if (!m_StoragePath.empty()) { ZEN_CONSOLE("Using folder {}. BuildId '{}'", m_StoragePath, BuildId); - Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false); // , .0015, 0.00004 + Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); StorageName = fmt::format("Disk {}", m_StoragePath.stem()); } else diff --git a/src/zencore/compress.cpp b/src/zencore/compress.cpp index 0e2ce2b54..f13f8b9ca 100644 --- a/src/zencore/compress.cpp +++ b/src/zencore/compress.cpp @@ -185,6 +185,12 @@ public: const MemoryView HeaderView, uint64_t RawOffset, uint64_t RawSize) const = 0; + + virtual bool DecompressToStream(const BufferHeader& Header, + const CompositeBuffer& CompressedData, + uint64_t RawOffset, + uint64_t RawSize, + std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const = 0; }; /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -263,6 +269,22 @@ public: } [[nodiscard]] uint64_t GetHeaderSize(const BufferHeader&) const final { return sizeof(BufferHeader); } + + virtual bool DecompressToStream(const BufferHeader& Header, + const CompositeBuffer& CompressedData, + uint64_t RawOffset, + uint64_t RawSize, + std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const final + { + if (Header.Method == CompressionMethod::None && Header.TotalCompressedSize == CompressedData.GetSize() && + Header.TotalCompressedSize == Header.TotalRawSize + sizeof(BufferHeader) && RawOffset < Header.TotalRawSize && + (RawOffset + RawSize) <= Header.TotalRawSize) + { + Callback(0, CompressedData.Mid(sizeof(BufferHeader) + RawOffset, RawSize)); + return true; + } + return false; + } }; ////////////////////////////////////////////////////////////////////////// @@ -447,6 +469,12 @@ public: MutableMemoryView RawView, uint64_t RawOffset) const final; + virtual bool DecompressToStream(const BufferHeader& Header, + const CompositeBuffer& CompressedData, + uint64_t RawOffset, + uint64_t RawSize, + std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const final; + protected: virtual bool DecompressBlock(MutableMemoryView RawData, MemoryView CompressedData) const = 0; }; @@ -569,6 +597,143 @@ BlockDecoder::DecompressToComposite(const BufferHeader& Header, const CompositeB } bool +BlockDecoder::DecompressToStream(const BufferHeader& Header, + const CompositeBuffer& CompressedData, + uint64_t RawOffset, + uint64_t RawSize, + std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const +{ + if (Header.TotalCompressedSize != CompressedData.GetSize()) + { + return false; + } + + const uint64_t BlockSize = uint64_t(1) << Header.BlockSizeExponent; + + UniqueBuffer BlockSizeBuffer; + MemoryView BlockSizeView = CompressedData.ViewOrCopyRange(sizeof(BufferHeader), Header.BlockCount * sizeof(uint32_t), BlockSizeBuffer); + std::span<uint32_t const> CompressedBlockSizes(reinterpret_cast<const uint32_t*>(BlockSizeView.GetData()), Header.BlockCount); + + UniqueBuffer CompressedBlockCopy; + + const size_t FirstBlockIndex = uint64_t(RawOffset / BlockSize); + const size_t LastBlockIndex = uint64_t((RawOffset + RawSize - 1) / BlockSize); + const uint64_t LastBlockSize = BlockSize - ((Header.BlockCount * BlockSize) - Header.TotalRawSize); + uint64_t OffsetInFirstBlock = RawOffset % BlockSize; + uint64_t CompressedOffset = sizeof(BufferHeader) + uint64_t(Header.BlockCount) * sizeof(uint32_t); + uint64_t RemainingRawSize = RawSize; + + for (size_t BlockIndex = 0; BlockIndex < FirstBlockIndex; BlockIndex++) + { + const uint32_t CompressedBlockSize = ByteSwap(CompressedBlockSizes[BlockIndex]); + CompressedOffset += CompressedBlockSize; + } + + UniqueBuffer RawDataBuffer; + + IoBufferFileReference FileRef = {nullptr, 0, 0}; + if ((CompressedData.GetSegments().size() == 1) && CompressedData.GetSegments()[0].AsIoBuffer().GetFileReference(FileRef)) + { + ZEN_ASSERT(FileRef.FileHandle != nullptr); + BasicFile Source; + Source.Attach(FileRef.FileHandle); + + for (size_t BlockIndex = FirstBlockIndex; BlockIndex <= LastBlockIndex; BlockIndex++) + { + const uint64_t UncompressedBlockSize = BlockIndex == Header.BlockCount - 1 ? LastBlockSize : BlockSize; + const uint32_t CompressedBlockSize = ByteSwap(CompressedBlockSizes[BlockIndex]); + const bool IsCompressed = CompressedBlockSize < UncompressedBlockSize; + + const uint64_t BytesToUncompress = OffsetInFirstBlock > 0 ? zen::Min(RawSize, UncompressedBlockSize - OffsetInFirstBlock) + : zen::Min(RemainingRawSize, BlockSize); + + if (CompressedBlockCopy.GetSize() < CompressedBlockSize) + { + CompressedBlockCopy = UniqueBuffer::Alloc(CompressedBlockSize); + } + Source.Read(CompressedBlockCopy.GetData(), CompressedBlockSize, FileRef.FileChunkOffset + CompressedOffset); + + MemoryView CompressedBlock = CompressedBlockCopy.GetView().Left(CompressedBlockSize); + + if (IsCompressed) + { + if (RawDataBuffer.IsNull()) + { + RawDataBuffer = UniqueBuffer::Alloc(zen::Min(RawSize, UncompressedBlockSize)); + } + else + { + ZEN_ASSERT(RawDataBuffer.GetSize() >= UncompressedBlockSize); + } + MutableMemoryView UncompressedBlock = RawDataBuffer.GetMutableView().Left(UncompressedBlockSize); + if (!DecompressBlock(UncompressedBlock, CompressedBlock)) + { + Source.Detach(); + return false; + } + Callback(BlockIndex * BlockSize + OffsetInFirstBlock, + CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawDataBuffer.GetData(), BytesToUncompress))); + } + else + { + Callback(BlockIndex * BlockSize + OffsetInFirstBlock, + CompositeBuffer( + IoBuffer(IoBuffer::Wrap, CompressedBlockCopy.GetView().Mid(OffsetInFirstBlock).GetData(), BytesToUncompress))); + } + + OffsetInFirstBlock = 0; + RemainingRawSize -= BytesToUncompress; + CompressedOffset += CompressedBlockSize; + } + Source.Detach(); + } + else + { + for (size_t BlockIndex = FirstBlockIndex; BlockIndex <= LastBlockIndex; BlockIndex++) + { + const uint64_t UncompressedBlockSize = BlockIndex == Header.BlockCount - 1 ? LastBlockSize : BlockSize; + const uint32_t CompressedBlockSize = ByteSwap(CompressedBlockSizes[BlockIndex]); + const bool IsCompressed = CompressedBlockSize < UncompressedBlockSize; + + const uint64_t BytesToUncompress = OffsetInFirstBlock > 0 ? zen::Min(RawSize, UncompressedBlockSize - OffsetInFirstBlock) + : zen::Min(RemainingRawSize, BlockSize); + + MemoryView CompressedBlock = CompressedData.ViewOrCopyRange(CompressedOffset, CompressedBlockSize, CompressedBlockCopy); + + if (IsCompressed) + { + if (RawDataBuffer.IsNull()) + { + RawDataBuffer = UniqueBuffer::Alloc(zen::Min(RawSize, UncompressedBlockSize)); + } + else + { + ZEN_ASSERT(RawDataBuffer.GetSize() >= UncompressedBlockSize); + } + MutableMemoryView UncompressedBlock = RawDataBuffer.GetMutableView().Left(UncompressedBlockSize); + if (!DecompressBlock(UncompressedBlock, CompressedBlock)) + { + return false; + } + Callback(BlockIndex * BlockSize + OffsetInFirstBlock, + CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawDataBuffer.GetData(), BytesToUncompress))); + } + else + { + Callback(BlockIndex * BlockSize + OffsetInFirstBlock, + CompositeBuffer( + IoBuffer(IoBuffer::Wrap, CompressedBlockCopy.GetView().Mid(OffsetInFirstBlock).GetData(), BytesToUncompress))); + } + + OffsetInFirstBlock = 0; + RemainingRawSize -= BytesToUncompress; + CompressedOffset += CompressedBlockSize; + } + } + return true; +} + +bool BlockDecoder::TryDecompressTo(const BufferHeader& Header, const CompositeBuffer& CompressedData, MutableMemoryView RawView, @@ -1644,6 +1809,27 @@ CompressedBuffer::DecompressToComposite() const } bool +CompressedBuffer::DecompressToStream(uint64_t RawOffset, + uint64_t RawSize, + std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const +{ + using namespace detail; + if (CompressedData) + { + const BufferHeader Header = BufferHeader::Read(CompressedData); + if (Header.Magic == BufferHeader::ExpectedMagic) + { + if (const BaseDecoder* const Decoder = GetDecoder(Header.Method)) + { + const uint64_t TotalRawSize = RawSize < ~uint64_t(0) ? RawSize : Header.TotalRawSize - RawOffset; + return Decoder->DecompressToStream(Header, CompressedData, RawOffset, TotalRawSize, std::move(Callback)); + } + } + } + return false; +} + +bool CompressedBuffer::TryGetCompressParameters(OodleCompressor& OutCompressor, OodleCompressionLevel& OutCompressionLevel, uint64_t& OutBlockSize) const diff --git a/src/zencore/include/zencore/compress.h b/src/zencore/include/zencore/compress.h index 5e761ceef..c056d7561 100644 --- a/src/zencore/include/zencore/compress.h +++ b/src/zencore/include/zencore/compress.h @@ -196,6 +196,16 @@ public: */ [[nodiscard]] ZENCORE_API CompositeBuffer DecompressToComposite() const; + /** + * Decompress into and call callback for ranges of decompressed data. + * The buffer in the callback will be overwritten when the callback returns. + * + * @return True if the buffer is valid and can be decompressed. + */ + [[nodiscard]] ZENCORE_API bool DecompressToStream(uint64_t RawOffset, + uint64_t RawSize, + std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const; + /** A null compressed buffer. */ static const CompressedBuffer Null; |