aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-03-04 10:22:53 +0100
committerGitHub Enterprise <[email protected]>2025-03-04 10:22:53 +0100
commit7cb1de5a966aa23e0c098d2bbf9009d0f82a6477 (patch)
tree5c340a3947e27b2710968442cee7f009bc6ab439 /src
parentlimit and validate responses before logging the text (#292) (diff)
downloadzen-7cb1de5a966aa23e0c098d2bbf9009d0f82a6477.tar.xz
zen-7cb1de5a966aa23e0c098d2bbf9009d0f82a6477.zip
stream decompress (#293)
* clean up latency parameters and slow down rate updates * add DecompressToStream
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/builds_cmd.cpp198
-rw-r--r--src/zencore/compress.cpp186
-rw-r--r--src/zencore/include/zencore/compress.h10
3 files changed, 337 insertions, 57 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index 219d01240..a9852151f 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -78,15 +78,19 @@ namespace {
const ChunksBlockParameters DefaultChunksBlockParams{.MaxBlockSize = 32u * 1024u * 1024u,
.MaxChunkEmbedSize = DefaultChunkedParams.MaxSize};
- const std::string ZenFolderName = ".zen";
- const std::string ZenStateFilePath = fmt::format("{}/current_state.cbo", ZenFolderName);
- const std::string ZenStateFileJsonPath = fmt::format("{}/current_state.json", ZenFolderName);
- const std::string ZenTempFolderName = fmt::format("{}/tmp", ZenFolderName);
- const std::string ZenTempCacheFolderName = fmt::format("{}/cache", ZenTempFolderName);
- const std::string ZenTempStorageFolderName = fmt::format("{}/storage", ZenTempFolderName);
- const std::string ZenTempBlockFolderName = fmt::format("{}/blocks", ZenTempFolderName);
- const std::string ZenTempChunkFolderName = fmt::format("{}/chunks", ZenTempFolderName);
- const std::string ZenExcludeManifestName = ".zen_exclude_manifest.txt";
+
+ const double DefaultLatency = 0; // .0010;
+ const double DefaultDelayPerKBSec = 0; // 0.00005;
+
+ const std::string ZenFolderName = ".zen";
+ const std::string ZenStateFilePath = fmt::format("{}/current_state.cbo", ZenFolderName);
+ const std::string ZenStateFileJsonPath = fmt::format("{}/current_state.json", ZenFolderName);
+ const std::string ZenTempFolderName = fmt::format("{}/tmp", ZenFolderName);
+ const std::string ZenTempCacheFolderName = fmt::format("{}/cache", ZenTempFolderName);
+ const std::string ZenTempStorageFolderName = fmt::format("{}/storage", ZenTempFolderName);
+ const std::string ZenTempBlockFolderName = fmt::format("{}/blocks", ZenTempFolderName);
+ const std::string ZenTempChunkFolderName = fmt::format("{}/chunks", ZenTempFolderName);
+ const std::string ZenExcludeManifestName = ".zen_exclude_manifest.txt";
const std::string UnsyncFolderName = ".unsync";
@@ -271,7 +275,7 @@ namespace {
}
uint64_t TimeUS = Timer.GetElapsedTimeUs();
uint64_t TimeDeltaUS = TimeUS - LastTimeUS;
- if (TimeDeltaUS >= 1000000)
+ if (TimeDeltaUS >= 2000000)
{
uint64_t Delta = Count - LastCount;
uint64_t PerSecond = (Delta * 1000000) / TimeDeltaUS;
@@ -3464,6 +3468,61 @@ namespace {
}
}
+ bool CanStreamDecompress(const ChunkedFolderContent& RemoteContent,
+ const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> Locations)
+ {
+ if (Locations.size() == 1)
+ {
+ const uint32_t FirstSequenceIndex = Locations[0]->SequenceIndex;
+ if (Locations[0]->Offset == 0 && RemoteContent.ChunkedContent.ChunkCounts[FirstSequenceIndex] == 1)
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ void StreamDecompress(const std::filesystem::path& CacheFolderPath,
+ const IoHash& SequenceRawHash,
+ CompositeBuffer&& CompressedPart,
+ std::atomic<uint64_t>& WriteToDiskBytes)
+ {
+ const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash);
+ TemporaryFile DecompressedTemp;
+ std::error_code Ec;
+ DecompressedTemp.CreateTemporary(CacheFolderPath, Ec);
+ if (Ec)
+ {
+ throw std::runtime_error(
+ fmt::format("Failed creating temporary file for decompressing large blob {}. Reason: {}", SequenceRawHash, Ec.message()));
+ }
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompressedPart, RawHash, RawSize);
+ if (!Compressed)
+ {
+ throw std::runtime_error(fmt::format("Failed to parse header of compressed large blob {}", SequenceRawHash));
+ }
+ if (RawHash != SequenceRawHash)
+ {
+ throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, SequenceRawHash));
+ }
+ bool CouldDecompress = Compressed.DecompressToStream(0, (uint64_t)-1, [&](uint64_t Offset, const CompositeBuffer& RangeBuffer) {
+ DecompressedTemp.Write(RangeBuffer, Offset);
+ WriteToDiskBytes += RangeBuffer.GetSize();
+ });
+ if (!CouldDecompress)
+ {
+ throw std::runtime_error(fmt::format("Failed to decompress large blob {}", SequenceRawHash));
+ }
+ DecompressedTemp.MoveTemporaryIntoPlace(TempChunkSequenceFileName, Ec);
+ if (Ec)
+ {
+ throw std::runtime_error(
+ fmt::format("Failed moving temporary file for decompressing large blob {}. Reason: {}", SequenceRawHash, Ec.message()));
+ }
+ }
+
void DownloadLargeBlob(BuildStorage& Storage,
const std::filesystem::path& TempFolderPath,
const std::filesystem::path& CacheFolderPath,
@@ -3527,7 +3586,7 @@ namespace {
DownloadedChunks++;
Work.ScheduleWork(
- WritePool,
+ WritePool, // GetSyncWorkerPool(),//
[&CacheFolderPath,
&RemoteContent,
&RemoteLookup,
@@ -3555,33 +3614,44 @@ namespace {
}
CompressedPart.SetDeleteOnClose(true);
- uint64_t TotalBytesWritten = 0;
-
auto ChunkHashToChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash);
ZEN_ASSERT(ChunkHashToChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end());
- uint32_t ChunkIndex = ChunkHashToChunkIndexIt->second;
-
- SharedBuffer Chunk = Decompress(CompositeBuffer(std::move(CompressedPart)),
- ChunkHash,
- RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]);
+ if (CanStreamDecompress(RemoteContent, ChunkTargetPtrs))
+ {
+ const IoHash& SequenceRawHash =
+ RemoteContent.ChunkedContent.SequenceRawHashes[ChunkTargetPtrs.front()->SequenceIndex];
+ StreamDecompress(CacheFolderPath,
+ SequenceRawHash,
+ CompositeBuffer(std::move(CompressedPart)),
+ WriteToDiskBytes);
+ }
+ else
+ {
+ const uint32_t ChunkIndex = ChunkHashToChunkIndexIt->second;
+ SharedBuffer Chunk = Decompress(CompositeBuffer(std::move(CompressedPart)),
+ ChunkHash,
+ RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]);
- // ZEN_ASSERT_SLOW(ChunkHash ==
- // IoHash::HashBuffer(Chunk.AsIoBuffer()));
+ // ZEN_ASSERT_SLOW(ChunkHash ==
+ // IoHash::HashBuffer(Chunk.AsIoBuffer()));
- if (!AbortFlag)
- {
- WriteFileCache OpenFileCache;
-
- WriteChunkToDisk(CacheFolderPath,
- RemoteContent,
- RemoteLookup,
- ChunkTargetPtrs,
- CompositeBuffer(Chunk),
- OpenFileCache,
- TotalBytesWritten);
- ChunksComplete++;
- WriteToDiskBytes += TotalBytesWritten;
+ if (!AbortFlag)
+ {
+ WriteFileCache OpenFileCache;
+
+ uint64_t TotalBytesWritten = 0;
+ WriteChunkToDisk(CacheFolderPath,
+ RemoteContent,
+ RemoteLookup,
+ ChunkTargetPtrs,
+ CompositeBuffer(Chunk),
+ OpenFileCache,
+ TotalBytesWritten);
+ ChunksComplete++;
+ WriteToDiskBytes += TotalBytesWritten;
+ }
}
+
if (!AbortFlag)
{
// Write tracking, updating this must be done without any files open (WriteFileCache)
@@ -3619,7 +3689,7 @@ namespace {
for (auto& WorkItem : WorkItems)
{
Work.ScheduleWork(
- NetworkPool,
+ NetworkPool, // GetSyncWorkerPool(),//
[WorkItem = std::move(WorkItem)](std::atomic<bool>&) {
if (!AbortFlag)
{
@@ -3830,7 +3900,7 @@ namespace {
else
{
Work.ScheduleWork(
- NetworkPool,
+ NetworkPool, // GetSyncWorkerPool(),//
[&, ChunkHash, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) {
if (!AbortFlag)
{
@@ -3880,21 +3950,38 @@ namespace {
if (!AbortFlag)
{
FilteredWrittenBytesPerSecond.Start();
- uint64_t TotalBytesWritten = 0;
- SharedBuffer Chunk =
- Decompress(CompressedPart,
- ChunkHash,
- RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex]);
+ if (CanStreamDecompress(RemoteContent, ChunkTargetPtrs))
{
- WriteFileCache OpenFileCache;
- WriteChunkToDisk(CacheFolderPath,
- RemoteContent,
- RemoteLookup,
- ChunkTargetPtrs,
- CompositeBuffer(Chunk),
- OpenFileCache,
- TotalBytesWritten);
+ const IoHash& SequenceRawHash =
+ RemoteContent.ChunkedContent
+ .SequenceRawHashes[ChunkTargetPtrs.front()->SequenceIndex];
+ StreamDecompress(CacheFolderPath,
+ SequenceRawHash,
+ CompositeBuffer(CompressedPart),
+ WriteToDiskBytes);
+ ChunkCountWritten++;
+ }
+ else
+ {
+ uint64_t TotalBytesWritten = 0;
+ SharedBuffer Chunk =
+ Decompress(CompressedPart,
+ ChunkHash,
+ RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex]);
+
+ {
+ WriteFileCache OpenFileCache;
+ WriteChunkToDisk(CacheFolderPath,
+ RemoteContent,
+ RemoteLookup,
+ ChunkTargetPtrs,
+ CompositeBuffer(Chunk),
+ OpenFileCache,
+ TotalBytesWritten);
+ ChunkCountWritten++;
+ WriteToDiskBytes += TotalBytesWritten;
+ }
}
if (!AbortFlag)
{
@@ -3925,9 +4012,6 @@ namespace {
GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash));
}
}
-
- ChunkCountWritten++;
- WriteToDiskBytes += TotalBytesWritten;
}
}
},
@@ -5707,7 +5791,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
else if (!m_StoragePath.empty())
{
ZEN_CONSOLE("Querying builds in folder '{}'.", m_StoragePath);
- Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false); // , .0015, 0.00004
+ Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
}
else
{
@@ -5819,7 +5903,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
m_StoragePath,
GeneratedBuildId ? "Generated " : "",
BuildId);
- Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, m_WriteMetadataAsJson); // , .0015, 0.00004
+ Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, m_WriteMetadataAsJson, DefaultLatency, DefaultDelayPerKBSec);
StorageName = fmt::format("Disk {}", m_StoragePath.stem());
}
else
@@ -5952,7 +6036,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
else if (!m_StoragePath.empty())
{
ZEN_CONSOLE("Downloading '{}' to '{}' from folder {}. BuildId '{}'", BuildId, m_Path, m_StoragePath, BuildId);
- Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false); // , .0015, 0.00004
+ Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
StorageName = fmt::format("Disk {}", m_StoragePath.stem());
}
else
@@ -6056,7 +6140,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
m_Path,
m_StoragePath,
BuildId);
- Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false); // , .0015, 0.00004
+ Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
StorageName = fmt::format("Disk {}", m_StoragePath.stem());
}
else
@@ -6326,7 +6410,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
else if (!m_StoragePath.empty())
{
ZEN_CONSOLE("Using folder {}. BuildId '{}'", m_StoragePath, BuildId);
- Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false); // , .0015, 0.00004
+ Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
StorageName = fmt::format("Disk {}", m_StoragePath.stem());
}
else
@@ -6394,7 +6478,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
else if (!m_StoragePath.empty())
{
ZEN_CONSOLE("Using folder {}. BuildId '{}'", m_StoragePath, BuildId);
- Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false); // , .0015, 0.00004
+ Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
StorageName = fmt::format("Disk {}", m_StoragePath.stem());
}
else
diff --git a/src/zencore/compress.cpp b/src/zencore/compress.cpp
index 0e2ce2b54..f13f8b9ca 100644
--- a/src/zencore/compress.cpp
+++ b/src/zencore/compress.cpp
@@ -185,6 +185,12 @@ public:
const MemoryView HeaderView,
uint64_t RawOffset,
uint64_t RawSize) const = 0;
+
+ virtual bool DecompressToStream(const BufferHeader& Header,
+ const CompositeBuffer& CompressedData,
+ uint64_t RawOffset,
+ uint64_t RawSize,
+ std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const = 0;
};
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -263,6 +269,22 @@ public:
}
[[nodiscard]] uint64_t GetHeaderSize(const BufferHeader&) const final { return sizeof(BufferHeader); }
+
+ virtual bool DecompressToStream(const BufferHeader& Header,
+ const CompositeBuffer& CompressedData,
+ uint64_t RawOffset,
+ uint64_t RawSize,
+ std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const final
+ {
+ if (Header.Method == CompressionMethod::None && Header.TotalCompressedSize == CompressedData.GetSize() &&
+ Header.TotalCompressedSize == Header.TotalRawSize + sizeof(BufferHeader) && RawOffset < Header.TotalRawSize &&
+ (RawOffset + RawSize) <= Header.TotalRawSize)
+ {
+ Callback(0, CompressedData.Mid(sizeof(BufferHeader) + RawOffset, RawSize));
+ return true;
+ }
+ return false;
+ }
};
//////////////////////////////////////////////////////////////////////////
@@ -447,6 +469,12 @@ public:
MutableMemoryView RawView,
uint64_t RawOffset) const final;
+ virtual bool DecompressToStream(const BufferHeader& Header,
+ const CompositeBuffer& CompressedData,
+ uint64_t RawOffset,
+ uint64_t RawSize,
+ std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const final;
+
protected:
virtual bool DecompressBlock(MutableMemoryView RawData, MemoryView CompressedData) const = 0;
};
@@ -569,6 +597,143 @@ BlockDecoder::DecompressToComposite(const BufferHeader& Header, const CompositeB
}
bool
+BlockDecoder::DecompressToStream(const BufferHeader& Header,
+ const CompositeBuffer& CompressedData,
+ uint64_t RawOffset,
+ uint64_t RawSize,
+ std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const
+{
+ if (Header.TotalCompressedSize != CompressedData.GetSize())
+ {
+ return false;
+ }
+
+ const uint64_t BlockSize = uint64_t(1) << Header.BlockSizeExponent;
+
+ UniqueBuffer BlockSizeBuffer;
+ MemoryView BlockSizeView = CompressedData.ViewOrCopyRange(sizeof(BufferHeader), Header.BlockCount * sizeof(uint32_t), BlockSizeBuffer);
+ std::span<uint32_t const> CompressedBlockSizes(reinterpret_cast<const uint32_t*>(BlockSizeView.GetData()), Header.BlockCount);
+
+ UniqueBuffer CompressedBlockCopy;
+
+ const size_t FirstBlockIndex = uint64_t(RawOffset / BlockSize);
+ const size_t LastBlockIndex = uint64_t((RawOffset + RawSize - 1) / BlockSize);
+ const uint64_t LastBlockSize = BlockSize - ((Header.BlockCount * BlockSize) - Header.TotalRawSize);
+ uint64_t OffsetInFirstBlock = RawOffset % BlockSize;
+ uint64_t CompressedOffset = sizeof(BufferHeader) + uint64_t(Header.BlockCount) * sizeof(uint32_t);
+ uint64_t RemainingRawSize = RawSize;
+
+ for (size_t BlockIndex = 0; BlockIndex < FirstBlockIndex; BlockIndex++)
+ {
+ const uint32_t CompressedBlockSize = ByteSwap(CompressedBlockSizes[BlockIndex]);
+ CompressedOffset += CompressedBlockSize;
+ }
+
+ UniqueBuffer RawDataBuffer;
+
+ IoBufferFileReference FileRef = {nullptr, 0, 0};
+ if ((CompressedData.GetSegments().size() == 1) && CompressedData.GetSegments()[0].AsIoBuffer().GetFileReference(FileRef))
+ {
+ ZEN_ASSERT(FileRef.FileHandle != nullptr);
+ BasicFile Source;
+ Source.Attach(FileRef.FileHandle);
+
+ for (size_t BlockIndex = FirstBlockIndex; BlockIndex <= LastBlockIndex; BlockIndex++)
+ {
+ const uint64_t UncompressedBlockSize = BlockIndex == Header.BlockCount - 1 ? LastBlockSize : BlockSize;
+ const uint32_t CompressedBlockSize = ByteSwap(CompressedBlockSizes[BlockIndex]);
+ const bool IsCompressed = CompressedBlockSize < UncompressedBlockSize;
+
+ const uint64_t BytesToUncompress = OffsetInFirstBlock > 0 ? zen::Min(RawSize, UncompressedBlockSize - OffsetInFirstBlock)
+ : zen::Min(RemainingRawSize, BlockSize);
+
+ if (CompressedBlockCopy.GetSize() < CompressedBlockSize)
+ {
+ CompressedBlockCopy = UniqueBuffer::Alloc(CompressedBlockSize);
+ }
+ Source.Read(CompressedBlockCopy.GetData(), CompressedBlockSize, FileRef.FileChunkOffset + CompressedOffset);
+
+ MemoryView CompressedBlock = CompressedBlockCopy.GetView().Left(CompressedBlockSize);
+
+ if (IsCompressed)
+ {
+ if (RawDataBuffer.IsNull())
+ {
+ RawDataBuffer = UniqueBuffer::Alloc(zen::Min(RawSize, UncompressedBlockSize));
+ }
+ else
+ {
+ ZEN_ASSERT(RawDataBuffer.GetSize() >= UncompressedBlockSize);
+ }
+ MutableMemoryView UncompressedBlock = RawDataBuffer.GetMutableView().Left(UncompressedBlockSize);
+ if (!DecompressBlock(UncompressedBlock, CompressedBlock))
+ {
+ Source.Detach();
+ return false;
+ }
+ Callback(BlockIndex * BlockSize + OffsetInFirstBlock,
+ CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawDataBuffer.GetData(), BytesToUncompress)));
+ }
+ else
+ {
+ Callback(BlockIndex * BlockSize + OffsetInFirstBlock,
+ CompositeBuffer(
+ IoBuffer(IoBuffer::Wrap, CompressedBlockCopy.GetView().Mid(OffsetInFirstBlock).GetData(), BytesToUncompress)));
+ }
+
+ OffsetInFirstBlock = 0;
+ RemainingRawSize -= BytesToUncompress;
+ CompressedOffset += CompressedBlockSize;
+ }
+ Source.Detach();
+ }
+ else
+ {
+ for (size_t BlockIndex = FirstBlockIndex; BlockIndex <= LastBlockIndex; BlockIndex++)
+ {
+ const uint64_t UncompressedBlockSize = BlockIndex == Header.BlockCount - 1 ? LastBlockSize : BlockSize;
+ const uint32_t CompressedBlockSize = ByteSwap(CompressedBlockSizes[BlockIndex]);
+ const bool IsCompressed = CompressedBlockSize < UncompressedBlockSize;
+
+ const uint64_t BytesToUncompress = OffsetInFirstBlock > 0 ? zen::Min(RawSize, UncompressedBlockSize - OffsetInFirstBlock)
+ : zen::Min(RemainingRawSize, BlockSize);
+
+ MemoryView CompressedBlock = CompressedData.ViewOrCopyRange(CompressedOffset, CompressedBlockSize, CompressedBlockCopy);
+
+ if (IsCompressed)
+ {
+ if (RawDataBuffer.IsNull())
+ {
+ RawDataBuffer = UniqueBuffer::Alloc(zen::Min(RawSize, UncompressedBlockSize));
+ }
+ else
+ {
+ ZEN_ASSERT(RawDataBuffer.GetSize() >= UncompressedBlockSize);
+ }
+ MutableMemoryView UncompressedBlock = RawDataBuffer.GetMutableView().Left(UncompressedBlockSize);
+ if (!DecompressBlock(UncompressedBlock, CompressedBlock))
+ {
+ return false;
+ }
+ Callback(BlockIndex * BlockSize + OffsetInFirstBlock,
+ CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawDataBuffer.GetData(), BytesToUncompress)));
+ }
+ else
+ {
+ Callback(BlockIndex * BlockSize + OffsetInFirstBlock,
+ CompositeBuffer(
+ IoBuffer(IoBuffer::Wrap, CompressedBlockCopy.GetView().Mid(OffsetInFirstBlock).GetData(), BytesToUncompress)));
+ }
+
+ OffsetInFirstBlock = 0;
+ RemainingRawSize -= BytesToUncompress;
+ CompressedOffset += CompressedBlockSize;
+ }
+ }
+ return true;
+}
+
+bool
BlockDecoder::TryDecompressTo(const BufferHeader& Header,
const CompositeBuffer& CompressedData,
MutableMemoryView RawView,
@@ -1644,6 +1809,27 @@ CompressedBuffer::DecompressToComposite() const
}
bool
+CompressedBuffer::DecompressToStream(uint64_t RawOffset,
+ uint64_t RawSize,
+ std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const
+{
+ using namespace detail;
+ if (CompressedData)
+ {
+ const BufferHeader Header = BufferHeader::Read(CompressedData);
+ if (Header.Magic == BufferHeader::ExpectedMagic)
+ {
+ if (const BaseDecoder* const Decoder = GetDecoder(Header.Method))
+ {
+ const uint64_t TotalRawSize = RawSize < ~uint64_t(0) ? RawSize : Header.TotalRawSize - RawOffset;
+ return Decoder->DecompressToStream(Header, CompressedData, RawOffset, TotalRawSize, std::move(Callback));
+ }
+ }
+ }
+ return false;
+}
+
+bool
CompressedBuffer::TryGetCompressParameters(OodleCompressor& OutCompressor,
OodleCompressionLevel& OutCompressionLevel,
uint64_t& OutBlockSize) const
diff --git a/src/zencore/include/zencore/compress.h b/src/zencore/include/zencore/compress.h
index 5e761ceef..c056d7561 100644
--- a/src/zencore/include/zencore/compress.h
+++ b/src/zencore/include/zencore/compress.h
@@ -196,6 +196,16 @@ public:
*/
[[nodiscard]] ZENCORE_API CompositeBuffer DecompressToComposite() const;
+ /**
+ * Decompress into and call callback for ranges of decompressed data.
+ * The buffer in the callback will be overwritten when the callback returns.
+ *
+ * @return True if the buffer is valid and can be decompressed.
+ */
+ [[nodiscard]] ZENCORE_API bool DecompressToStream(uint64_t RawOffset,
+ uint64_t RawSize,
+ std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const;
+
/** A null compressed buffer. */
static const CompressedBuffer Null;