diff options
Diffstat (limited to 'src/zenutil/chunkblock.cpp')
| -rw-r--r-- | src/zenutil/chunkblock.cpp | 257 |
1 files changed, 257 insertions, 0 deletions
diff --git a/src/zenutil/chunkblock.cpp b/src/zenutil/chunkblock.cpp new file mode 100644 index 000000000..f3c14edc4 --- /dev/null +++ b/src/zenutil/chunkblock.cpp @@ -0,0 +1,257 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenutil/chunkblock.h> + +#include <zencore/compactbinarybuilder.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> + +#include <vector> + +namespace zen { + +using namespace std::literals; + +ChunkBlockDescription +ParseChunkBlockDescription(const CbObjectView& BlockObject) +{ + ChunkBlockDescription Result; + Result.BlockHash = BlockObject["rawHash"sv].AsHash(); + if (Result.BlockHash != IoHash::Zero) + { + Result.HeaderSize = BlockObject["headerSize"sv].AsUInt64(); + CbArrayView ChunksArray = BlockObject["rawHashes"sv].AsArrayView(); + Result.ChunkRawHashes.reserve(ChunksArray.Num()); + for (CbFieldView ChunkView : ChunksArray) + { + Result.ChunkRawHashes.push_back(ChunkView.AsHash()); + } + + CbArrayView ChunkRawLengthsArray = BlockObject["chunkRawLengths"sv].AsArrayView(); + Result.ChunkRawLengths.reserve(ChunkRawLengthsArray.Num()); + for (CbFieldView ChunkView : ChunkRawLengthsArray) + { + Result.ChunkRawLengths.push_back(ChunkView.AsUInt32()); + } + + CbArrayView ChunkCompressedLengthsArray = BlockObject["chunkCompressedLengths"sv].AsArrayView(); + Result.ChunkCompressedLengths.reserve(ChunkCompressedLengthsArray.Num()); + for (CbFieldView ChunkView : ChunkCompressedLengthsArray) + { + Result.ChunkCompressedLengths.push_back(ChunkView.AsUInt32()); + } + } + return Result; +} + +std::vector<ChunkBlockDescription> +ParseChunkBlockDescriptionList(const CbObjectView& BlocksObject) +{ + if (!BlocksObject) + { + return {}; + } + std::vector<ChunkBlockDescription> Result; + CbArrayView Blocks = BlocksObject["blocks"].AsArrayView(); + Result.reserve(Blocks.Num()); + for (CbFieldView BlockView : Blocks) + { + CbObjectView BlockObject = BlockView.AsObjectView(); + Result.emplace_back(ParseChunkBlockDescription(BlockObject)); + } + return Result; +} + +CbObject +BuildChunkBlockDescription(const ChunkBlockDescription& Block, CbObjectView MetaData) +{ + ZEN_ASSERT(Block.BlockHash != IoHash::Zero); + ZEN_ASSERT(Block.HeaderSize > 0); + ZEN_ASSERT(Block.ChunkRawLengths.size() == Block.ChunkRawHashes.size()); + ZEN_ASSERT(Block.ChunkCompressedLengths.size() == Block.ChunkRawHashes.size()); + + CbObjectWriter Writer; + Writer.AddHash("rawHash"sv, Block.BlockHash); + Writer.AddInteger("headerSize"sv, Block.HeaderSize); + Writer.BeginArray("rawHashes"sv); + { + for (const IoHash& ChunkHash : Block.ChunkRawHashes) + { + Writer.AddHash(ChunkHash); + } + } + Writer.EndArray(); + + Writer.BeginArray("chunkRawLengths"); + { + for (uint32_t ChunkSize : Block.ChunkRawLengths) + { + Writer.AddInteger(ChunkSize); + } + } + Writer.EndArray(); + + Writer.BeginArray("chunkCompressedLengths"); + { + for (uint32_t ChunkSize : Block.ChunkCompressedLengths) + { + Writer.AddInteger(ChunkSize); + } + } + Writer.EndArray(); + + Writer.AddObject("metadata", MetaData); + + return Writer.Save(); +} + +ChunkBlockDescription +GetChunkBlockDescription(const SharedBuffer& BlockPayload, const IoHash& RawHash) +{ + ChunkBlockDescription BlockDescription = {{.BlockHash = IoHash::HashBuffer(BlockPayload)}}; + if (BlockDescription.BlockHash != RawHash) + { + throw std::runtime_error(fmt::format("Block {} content hash {} does not match block hash", RawHash, BlockDescription.BlockHash)); + } + if (IterateChunkBlock( + BlockPayload, + [&BlockDescription, RawHash](CompressedBuffer&& Chunk, const IoHash& AttachmentHash) { + if (CompositeBuffer Decompressed = Chunk.DecompressToComposite(); Decompressed) + { + IoHash ChunkHash = IoHash::HashBuffer(Decompressed.Flatten()); + if (ChunkHash != AttachmentHash) + { + throw std::runtime_error( + fmt::format("Chunk {} in block {} content hash {} does not match chunk", AttachmentHash, RawHash, ChunkHash)); + } + BlockDescription.ChunkRawHashes.push_back(AttachmentHash); + BlockDescription.ChunkRawLengths.push_back(gsl::narrow<uint32_t>(Decompressed.GetSize())); + BlockDescription.ChunkCompressedLengths.push_back(gsl::narrow<uint32_t>(Chunk.GetCompressedSize())); + } + else + { + throw std::runtime_error(fmt::format("Chunk {} in block {} is not a compressed buffer", AttachmentHash, RawHash)); + } + }, + BlockDescription.HeaderSize)) + { + return BlockDescription; + } + else + { + throw std::runtime_error(fmt::format("Block {} is malformed", RawHash)); + } +} + +CompressedBuffer +GenerateChunkBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks, ChunkBlockDescription& OutBlock) +{ + const size_t ChunkCount = FetchChunks.size(); + + std::vector<SharedBuffer> ChunkSegments; + ChunkSegments.resize(1); + ChunkSegments.reserve(1 + ChunkCount); + OutBlock.ChunkRawHashes.reserve(ChunkCount); + OutBlock.ChunkRawLengths.reserve(ChunkCount); + OutBlock.ChunkCompressedLengths.reserve(ChunkCount); + { + IoBuffer TempBuffer(ChunkCount * 9); + MutableMemoryView View = TempBuffer.GetMutableView(); + uint8_t* BufferStartPtr = reinterpret_cast<uint8_t*>(View.GetData()); + uint8_t* BufferEndPtr = BufferStartPtr; + BufferEndPtr += WriteVarUInt(gsl::narrow<uint64_t>(ChunkCount), BufferEndPtr); + for (const auto& It : FetchChunks) + { + std::pair<uint64_t, CompressedBuffer> Chunk = It.second(It.first); + uint64_t ChunkSize = 0; + std::span<const SharedBuffer> Segments = Chunk.second.GetCompressed().GetSegments(); + for (const SharedBuffer& Segment : Segments) + { + ZEN_ASSERT(Segment.IsOwned()); + ChunkSize += Segment.GetSize(); + ChunkSegments.push_back(Segment); + } + BufferEndPtr += WriteVarUInt(ChunkSize, BufferEndPtr); + OutBlock.ChunkRawHashes.push_back(It.first); + OutBlock.ChunkRawLengths.push_back(gsl::narrow<uint32_t>(Chunk.first)); + OutBlock.ChunkCompressedLengths.push_back(gsl::narrow<uint32_t>(ChunkSize)); + } + ZEN_ASSERT(BufferEndPtr <= View.GetDataEnd()); + ptrdiff_t TempBufferLength = std::distance(BufferStartPtr, BufferEndPtr); + ChunkSegments[0] = SharedBuffer(IoBuffer(TempBuffer, 0, gsl::narrow<size_t>(TempBufferLength))); + OutBlock.HeaderSize = TempBufferLength; + } + CompressedBuffer CompressedBlock = + CompressedBuffer::Compress(CompositeBuffer(std::move(ChunkSegments)), OodleCompressor::Mermaid, OodleCompressionLevel::None); + OutBlock.BlockHash = CompressedBlock.DecodeRawHash(); + return CompressedBlock; +} + +std::vector<uint32_t> +ReadChunkBlockHeader(const MemoryView BlockView, uint64_t& OutHeaderSize) +{ + const uint8_t* ReadPtr = reinterpret_cast<const uint8_t*>(BlockView.GetData()); + uint32_t NumberSize; + uint64_t ChunkCount = ReadVarUInt(ReadPtr, NumberSize); + ReadPtr += NumberSize; + std::vector<uint32_t> ChunkSizes; + ChunkSizes.reserve(ChunkCount); + while (ChunkCount--) + { + if (ReadPtr >= BlockView.GetDataEnd()) + { + throw std::runtime_error("Invalid block header, block data ended unexpectedly"); + } + uint64_t ChunkSize = ReadVarUInt(ReadPtr, NumberSize); + if (ChunkSize > std::numeric_limits<uint32_t>::max()) + { + throw std::runtime_error("Invalid block header, header data is corrupt"); + } + if (ChunkSize < 1) + { + throw std::runtime_error("Invalid block header, header data is corrupt"); + } + ChunkSizes.push_back(gsl::narrow<uint32_t>(ChunkSize)); + ReadPtr += NumberSize; + } + uint64_t Offset = std::distance((const uint8_t*)BlockView.GetData(), ReadPtr); + OutHeaderSize = Offset; + return ChunkSizes; +} + +bool +IterateChunkBlock(const SharedBuffer& BlockPayload, + std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor, + uint64_t& OutHeaderSize) +{ + ZEN_ASSERT(BlockPayload); + if (BlockPayload.GetSize() < 1) + { + return false; + } + + MemoryView BlockView = BlockPayload.GetView(); + + std::vector<uint32_t> ChunkSizes = ReadChunkBlockHeader(BlockView, OutHeaderSize); + uint64_t Offset = OutHeaderSize; + OutHeaderSize = Offset; + for (uint64_t ChunkSize : ChunkSizes) + { + IoBuffer Chunk(BlockPayload.AsIoBuffer(), Offset, ChunkSize); + IoHash AttachmentRawHash; + uint64_t AttachmentRawSize; + CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), AttachmentRawHash, AttachmentRawSize); + ZEN_ASSERT_SLOW(IoHash::HashBuffer(CompressedChunk.DecompressToComposite()) == AttachmentRawHash); + if (!CompressedChunk) + { + ZEN_ERROR("Invalid chunk in block"); + return false; + } + Visitor(std::move(CompressedChunk), AttachmentRawHash); + Offset += ChunkSize; + ZEN_ASSERT(Offset <= BlockView.GetSize()); + } + return true; +}; + +} // namespace zen |