diff options
| author | Dan Engelbrecht <[email protected]> | 2025-10-03 11:49:14 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-03 11:49:14 +0200 |
| commit | faf0b7c9b6a08b095f8dc895904f4f7d3f30dcde (patch) | |
| tree | 2bcd09fe17af6f25108fd05578e7eda6a827d8ec /src/zenutil/chunkblock.cpp | |
| parent | cache RPC replay fixes (minor) (#544) (diff) | |
| download | zen-faf0b7c9b6a08b095f8dc895904f4f7d3f30dcde.tar.xz zen-faf0b7c9b6a08b095f8dc895904f4f7d3f30dcde.zip | |
move chunking code to zenremotestore lib (#545)
Diffstat (limited to 'src/zenutil/chunkblock.cpp')
| -rw-r--r-- | src/zenutil/chunkblock.cpp | 257 |
1 files changed, 0 insertions, 257 deletions
diff --git a/src/zenutil/chunkblock.cpp b/src/zenutil/chunkblock.cpp deleted file mode 100644 index abfc0fb63..000000000 --- a/src/zenutil/chunkblock.cpp +++ /dev/null @@ -1,257 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include <zenutil/chunkblock.h> - -#include <zencore/compactbinarybuilder.h> -#include <zencore/fmtutils.h> -#include <zencore/logging.h> - -#include <vector> - -namespace zen { - -using namespace std::literals; - -ChunkBlockDescription -ParseChunkBlockDescription(const CbObjectView& BlockObject) -{ - ChunkBlockDescription Result; - Result.BlockHash = BlockObject["rawHash"sv].AsHash(); - if (Result.BlockHash != IoHash::Zero) - { - Result.HeaderSize = BlockObject["headerSize"sv].AsUInt64(); - CbArrayView ChunksArray = BlockObject["rawHashes"sv].AsArrayView(); - Result.ChunkRawHashes.reserve(ChunksArray.Num()); - for (CbFieldView ChunkView : ChunksArray) - { - Result.ChunkRawHashes.push_back(ChunkView.AsHash()); - } - - CbArrayView ChunkRawLengthsArray = BlockObject["chunkRawLengths"sv].AsArrayView(); - Result.ChunkRawLengths.reserve(ChunkRawLengthsArray.Num()); - for (CbFieldView ChunkView : ChunkRawLengthsArray) - { - Result.ChunkRawLengths.push_back(ChunkView.AsUInt32()); - } - - CbArrayView ChunkCompressedLengthsArray = BlockObject["chunkCompressedLengths"sv].AsArrayView(); - Result.ChunkCompressedLengths.reserve(ChunkCompressedLengthsArray.Num()); - for (CbFieldView ChunkView : ChunkCompressedLengthsArray) - { - Result.ChunkCompressedLengths.push_back(ChunkView.AsUInt32()); - } - } - return Result; -} - -std::vector<ChunkBlockDescription> -ParseChunkBlockDescriptionList(const CbObjectView& BlocksObject) -{ - if (!BlocksObject) - { - return {}; - } - std::vector<ChunkBlockDescription> Result; - CbArrayView Blocks = BlocksObject["blocks"sv].AsArrayView(); - Result.reserve(Blocks.Num()); - for (CbFieldView BlockView : Blocks) - { - CbObjectView BlockObject = BlockView.AsObjectView(); - Result.emplace_back(ParseChunkBlockDescription(BlockObject)); - } - return Result; -} - -CbObject -BuildChunkBlockDescription(const ChunkBlockDescription& Block, CbObjectView MetaData) -{ - ZEN_ASSERT(Block.BlockHash != IoHash::Zero); - ZEN_ASSERT(Block.HeaderSize > 0); - ZEN_ASSERT(Block.ChunkRawLengths.size() == Block.ChunkRawHashes.size()); - ZEN_ASSERT(Block.ChunkCompressedLengths.size() == Block.ChunkRawHashes.size()); - - CbObjectWriter Writer; - Writer.AddHash("rawHash"sv, Block.BlockHash); - Writer.AddInteger("headerSize"sv, Block.HeaderSize); - Writer.BeginArray("rawHashes"sv); - { - for (const IoHash& ChunkHash : Block.ChunkRawHashes) - { - Writer.AddHash(ChunkHash); - } - } - Writer.EndArray(); - - Writer.BeginArray("chunkRawLengths"); - { - for (uint32_t ChunkSize : Block.ChunkRawLengths) - { - Writer.AddInteger(ChunkSize); - } - } - Writer.EndArray(); - - Writer.BeginArray("chunkCompressedLengths"); - { - for (uint32_t ChunkSize : Block.ChunkCompressedLengths) - { - Writer.AddInteger(ChunkSize); - } - } - Writer.EndArray(); - - Writer.AddObject("metadata", MetaData); - - return Writer.Save(); -} - -ChunkBlockDescription -GetChunkBlockDescription(const SharedBuffer& BlockPayload, const IoHash& RawHash) -{ - ChunkBlockDescription BlockDescription = {{.BlockHash = IoHash::HashBuffer(BlockPayload)}}; - if (BlockDescription.BlockHash != RawHash) - { - throw std::runtime_error(fmt::format("Block {} content hash {} does not match block hash", RawHash, BlockDescription.BlockHash)); - } - if (IterateChunkBlock( - BlockPayload, - [&BlockDescription, RawHash](CompressedBuffer&& Chunk, const IoHash& AttachmentHash) { - if (CompositeBuffer Decompressed = Chunk.DecompressToComposite(); Decompressed) - { - IoHash ChunkHash = IoHash::HashBuffer(Decompressed.Flatten()); - if (ChunkHash != AttachmentHash) - { - throw std::runtime_error( - fmt::format("Chunk {} in block {} content hash {} does not match chunk", AttachmentHash, RawHash, ChunkHash)); - } - BlockDescription.ChunkRawHashes.push_back(AttachmentHash); - BlockDescription.ChunkRawLengths.push_back(gsl::narrow<uint32_t>(Decompressed.GetSize())); - BlockDescription.ChunkCompressedLengths.push_back(gsl::narrow<uint32_t>(Chunk.GetCompressedSize())); - } - else - { - throw std::runtime_error(fmt::format("Chunk {} in block {} is not a compressed buffer", AttachmentHash, RawHash)); - } - }, - BlockDescription.HeaderSize)) - { - return BlockDescription; - } - else - { - throw std::runtime_error(fmt::format("Block {} is malformed", RawHash)); - } -} - -CompressedBuffer -GenerateChunkBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks, ChunkBlockDescription& OutBlock) -{ - const size_t ChunkCount = FetchChunks.size(); - - std::vector<SharedBuffer> ChunkSegments; - ChunkSegments.resize(1); - ChunkSegments.reserve(1 + ChunkCount); - OutBlock.ChunkRawHashes.reserve(ChunkCount); - OutBlock.ChunkRawLengths.reserve(ChunkCount); - OutBlock.ChunkCompressedLengths.reserve(ChunkCount); - { - IoBuffer TempBuffer(ChunkCount * 9); - MutableMemoryView View = TempBuffer.GetMutableView(); - uint8_t* BufferStartPtr = reinterpret_cast<uint8_t*>(View.GetData()); - uint8_t* BufferEndPtr = BufferStartPtr; - BufferEndPtr += WriteVarUInt(gsl::narrow<uint64_t>(ChunkCount), BufferEndPtr); - for (const auto& It : FetchChunks) - { - std::pair<uint64_t, CompressedBuffer> Chunk = It.second(It.first); - uint64_t ChunkSize = 0; - std::span<const SharedBuffer> Segments = Chunk.second.GetCompressed().GetSegments(); - for (const SharedBuffer& Segment : Segments) - { - ZEN_ASSERT(Segment.IsOwned()); - ChunkSize += Segment.GetSize(); - ChunkSegments.push_back(Segment); - } - BufferEndPtr += WriteVarUInt(ChunkSize, BufferEndPtr); - OutBlock.ChunkRawHashes.push_back(It.first); - OutBlock.ChunkRawLengths.push_back(gsl::narrow<uint32_t>(Chunk.first)); - OutBlock.ChunkCompressedLengths.push_back(gsl::narrow<uint32_t>(ChunkSize)); - } - ZEN_ASSERT(BufferEndPtr <= View.GetDataEnd()); - ptrdiff_t TempBufferLength = std::distance(BufferStartPtr, BufferEndPtr); - ChunkSegments[0] = SharedBuffer(IoBuffer(TempBuffer, 0, gsl::narrow<size_t>(TempBufferLength))); - OutBlock.HeaderSize = TempBufferLength; - } - CompressedBuffer CompressedBlock = - CompressedBuffer::Compress(CompositeBuffer(std::move(ChunkSegments)), OodleCompressor::Mermaid, OodleCompressionLevel::None); - OutBlock.BlockHash = CompressedBlock.DecodeRawHash(); - return CompressedBlock; -} - -std::vector<uint32_t> -ReadChunkBlockHeader(const MemoryView BlockView, uint64_t& OutHeaderSize) -{ - const uint8_t* ReadPtr = reinterpret_cast<const uint8_t*>(BlockView.GetData()); - uint32_t NumberSize; - uint64_t ChunkCount = ReadVarUInt(ReadPtr, NumberSize); - ReadPtr += NumberSize; - std::vector<uint32_t> ChunkSizes; - ChunkSizes.reserve(ChunkCount); - while (ChunkCount--) - { - if (ReadPtr >= BlockView.GetDataEnd()) - { - throw std::runtime_error("Invalid block header, block data ended unexpectedly"); - } - uint64_t ChunkSize = ReadVarUInt(ReadPtr, NumberSize); - if (ChunkSize > std::numeric_limits<uint32_t>::max()) - { - throw std::runtime_error("Invalid block header, header data is corrupt"); - } - if (ChunkSize < 1) - { - throw std::runtime_error("Invalid block header, header data is corrupt"); - } - ChunkSizes.push_back(gsl::narrow<uint32_t>(ChunkSize)); - ReadPtr += NumberSize; - } - uint64_t Offset = std::distance((const uint8_t*)BlockView.GetData(), ReadPtr); - OutHeaderSize = Offset; - return ChunkSizes; -} - -bool -IterateChunkBlock(const SharedBuffer& BlockPayload, - std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor, - uint64_t& OutHeaderSize) -{ - ZEN_ASSERT(BlockPayload); - if (BlockPayload.GetSize() < 1) - { - return false; - } - - MemoryView BlockView = BlockPayload.GetView(); - - std::vector<uint32_t> ChunkSizes = ReadChunkBlockHeader(BlockView, OutHeaderSize); - uint64_t Offset = OutHeaderSize; - OutHeaderSize = Offset; - for (uint64_t ChunkSize : ChunkSizes) - { - IoBuffer Chunk(BlockPayload.AsIoBuffer(), Offset, ChunkSize); - IoHash AttachmentRawHash; - uint64_t AttachmentRawSize; - CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), AttachmentRawHash, AttachmentRawSize); - ZEN_ASSERT_SLOW(IoHash::HashBuffer(CompressedChunk.DecompressToComposite()) == AttachmentRawHash); - if (!CompressedChunk) - { - ZEN_ERROR("Invalid chunk in block"); - return false; - } - Visitor(std::move(CompressedChunk), AttachmentRawHash); - Offset += ChunkSize; - ZEN_ASSERT(Offset <= BlockView.GetSize()); - } - return true; -}; - -} // namespace zen |