aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/chunkblock.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenutil/chunkblock.cpp')
-rw-r--r--src/zenutil/chunkblock.cpp94
1 files changed, 81 insertions, 13 deletions
diff --git a/src/zenutil/chunkblock.cpp b/src/zenutil/chunkblock.cpp
index 6dae5af11..a19cf5c1b 100644
--- a/src/zenutil/chunkblock.cpp
+++ b/src/zenutil/chunkblock.cpp
@@ -3,6 +3,7 @@
#include <zenutil/chunkblock.h>
#include <zencore/compactbinarybuilder.h>
+#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <vector>
@@ -18,20 +19,27 @@ ParseChunkBlockDescription(const CbObjectView& BlockObject)
Result.BlockHash = BlockObject["rawHash"sv].AsHash();
if (Result.BlockHash != IoHash::Zero)
{
+ Result.HeaderSize = BlockObject["headerSize"sv].AsUInt64();
CbArrayView ChunksArray = BlockObject["rawHashes"sv].AsArrayView();
- Result.ChunkHashes.reserve(ChunksArray.Num());
+ Result.ChunkRawHashes.reserve(ChunksArray.Num());
for (CbFieldView ChunkView : ChunksArray)
{
- Result.ChunkHashes.push_back(ChunkView.AsHash());
+ Result.ChunkRawHashes.push_back(ChunkView.AsHash());
}
- CbArrayView ChunkRawLengthsArray = BlockObject["chunkRawLengths"sv].AsArrayView();
- std::vector<uint32_t> ChunkLengths;
+ CbArrayView ChunkRawLengthsArray = BlockObject["chunkRawLengths"sv].AsArrayView();
Result.ChunkRawLengths.reserve(ChunkRawLengthsArray.Num());
for (CbFieldView ChunkView : ChunkRawLengthsArray)
{
Result.ChunkRawLengths.push_back(ChunkView.AsUInt32());
}
+
+ CbArrayView ChunkCompressedLengthsArray = BlockObject["chunkCompressedLengths"sv].AsArrayView();
+ Result.ChunkCompressedLengths.reserve(ChunkCompressedLengthsArray.Num());
+ for (CbFieldView ChunkView : ChunkCompressedLengthsArray)
+ {
+ Result.ChunkCompressedLengths.push_back(ChunkView.AsUInt32());
+ }
}
return Result;
}
@@ -57,18 +65,23 @@ ParseChunkBlockDescriptionList(const CbObjectView& BlocksObject)
CbObject
BuildChunkBlockDescription(const ChunkBlockDescription& Block, CbObjectView MetaData)
{
- ZEN_ASSERT(Block.ChunkRawLengths.size() == Block.ChunkHashes.size());
+ ZEN_ASSERT(Block.BlockHash != IoHash::Zero);
+ ZEN_ASSERT(Block.HeaderSize > 0);
+ ZEN_ASSERT(Block.ChunkRawLengths.size() == Block.ChunkRawHashes.size());
+ ZEN_ASSERT(Block.ChunkCompressedLengths.size() == Block.ChunkRawHashes.size());
CbObjectWriter Writer;
Writer.AddHash("rawHash"sv, Block.BlockHash);
+ Writer.AddInteger("headerSize"sv, Block.HeaderSize);
Writer.BeginArray("rawHashes"sv);
{
- for (const IoHash& ChunkHash : Block.ChunkHashes)
+ for (const IoHash& ChunkHash : Block.ChunkRawHashes)
{
Writer.AddHash(ChunkHash);
}
}
Writer.EndArray();
+
Writer.BeginArray("chunkRawLengths");
{
for (uint32_t ChunkSize : Block.ChunkRawLengths)
@@ -78,11 +91,58 @@ BuildChunkBlockDescription(const ChunkBlockDescription& Block, CbObjectView Meta
}
Writer.EndArray();
+ Writer.BeginArray("chunkCompressedLengths");
+ {
+ for (uint32_t ChunkSize : Block.ChunkCompressedLengths)
+ {
+ Writer.AddInteger(ChunkSize);
+ }
+ }
+ Writer.EndArray();
+
Writer.AddObject("metadata", MetaData);
return Writer.Save();
}
+ChunkBlockDescription
+GetChunkBlockDescription(const SharedBuffer& BlockPayload, const IoHash& RawHash)
+{
+ ChunkBlockDescription BlockDescription = {{.BlockHash = IoHash::HashBuffer(BlockPayload)}};
+ if (BlockDescription.BlockHash != RawHash)
+ {
+ throw std::runtime_error(fmt::format("Block {} content hash {} does not match block hash", RawHash, BlockDescription.BlockHash));
+ }
+ if (IterateChunkBlock(
+ BlockPayload,
+ [&BlockDescription, RawHash](CompressedBuffer&& Chunk, const IoHash& AttachmentHash) {
+ if (CompositeBuffer Decompressed = Chunk.DecompressToComposite(); Decompressed)
+ {
+ IoHash ChunkHash = IoHash::HashBuffer(Decompressed.Flatten());
+ if (ChunkHash != AttachmentHash)
+ {
+ throw std::runtime_error(
+ fmt::format("Chunk {} in block {} content hash {} does not match chunk", AttachmentHash, RawHash, ChunkHash));
+ }
+ BlockDescription.ChunkRawHashes.push_back(AttachmentHash);
+ BlockDescription.ChunkRawLengths.push_back(gsl::narrow<uint32_t>(Decompressed.GetSize()));
+ BlockDescription.ChunkCompressedLengths.push_back(gsl::narrow<uint32_t>(Chunk.GetCompressedSize()));
+ }
+ else
+ {
+ throw std::runtime_error(fmt::format("Chunk {} in block {} is not a compressed buffer", AttachmentHash, RawHash));
+ }
+ },
+ BlockDescription.HeaderSize))
+ {
+ return BlockDescription;
+ }
+ else
+ {
+ throw std::runtime_error(fmt::format("Block {} is malformed", RawHash));
+ }
+}
+
CompressedBuffer
GenerateChunkBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks, ChunkBlockDescription& OutBlock)
{
@@ -91,8 +151,9 @@ GenerateChunkBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks,
std::vector<SharedBuffer> ChunkSegments;
ChunkSegments.resize(1);
ChunkSegments.reserve(1 + ChunkCount);
- OutBlock.ChunkHashes.reserve(ChunkCount);
+ OutBlock.ChunkRawHashes.reserve(ChunkCount);
OutBlock.ChunkRawLengths.reserve(ChunkCount);
+ OutBlock.ChunkCompressedLengths.reserve(ChunkCount);
{
IoBuffer TempBuffer(ChunkCount * 9);
MutableMemoryView View = TempBuffer.GetMutableView();
@@ -106,16 +167,19 @@ GenerateChunkBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks,
std::span<const SharedBuffer> Segments = Chunk.second.GetCompressed().GetSegments();
for (const SharedBuffer& Segment : Segments)
{
+ ZEN_ASSERT(Segment.IsOwned());
ChunkSize += Segment.GetSize();
ChunkSegments.push_back(Segment);
}
BufferEndPtr += WriteVarUInt(ChunkSize, BufferEndPtr);
- OutBlock.ChunkHashes.push_back(It.first);
+ OutBlock.ChunkRawHashes.push_back(It.first);
OutBlock.ChunkRawLengths.push_back(gsl::narrow<uint32_t>(Chunk.first));
+ OutBlock.ChunkCompressedLengths.push_back(gsl::narrow<uint32_t>(ChunkSize));
}
ZEN_ASSERT(BufferEndPtr <= View.GetDataEnd());
ptrdiff_t TempBufferLength = std::distance(BufferStartPtr, BufferEndPtr);
ChunkSegments[0] = SharedBuffer(IoBuffer(TempBuffer, 0, gsl::narrow<size_t>(TempBufferLength)));
+ OutBlock.HeaderSize = TempBufferLength;
}
CompressedBuffer CompressedBlock =
CompressedBuffer::Compress(CompositeBuffer(std::move(ChunkSegments)), OodleCompressor::Mermaid, OodleCompressionLevel::None);
@@ -124,7 +188,9 @@ GenerateChunkBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks,
}
bool
-IterateChunkBlock(const SharedBuffer& BlockPayload, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor)
+IterateChunkBlock(const SharedBuffer& BlockPayload,
+ std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor,
+ uint64_t& OutHeaderSize)
{
ZEN_ASSERT(BlockPayload);
if (BlockPayload.GetSize() < 1)
@@ -144,21 +210,23 @@ IterateChunkBlock(const SharedBuffer& BlockPayload, std::function<void(Compresse
ChunkSizes.push_back(ReadVarUInt(ReadPtr, NumberSize));
ReadPtr += NumberSize;
}
+ uint64_t Offset = std::distance((const uint8_t*)BlockView.GetData(), ReadPtr);
+ OutHeaderSize = Offset;
for (uint64_t ChunkSize : ChunkSizes)
{
- IoBuffer Chunk(IoBuffer::Wrap, ReadPtr, ChunkSize);
+ IoBuffer Chunk(BlockPayload.AsIoBuffer(), Offset, ChunkSize);
IoHash AttachmentRawHash;
uint64_t AttachmentRawSize;
CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), AttachmentRawHash, AttachmentRawSize);
-
+ ZEN_ASSERT_SLOW(IoHash::HashBuffer(CompressedChunk.DecompressToComposite()) == AttachmentRawHash);
if (!CompressedChunk)
{
ZEN_ERROR("Invalid chunk in block");
return false;
}
Visitor(std::move(CompressedChunk), AttachmentRawHash);
- ReadPtr += ChunkSize;
- ZEN_ASSERT(ReadPtr <= BlockView.GetDataEnd());
+ Offset += ChunkSize;
+ ZEN_ASSERT(Offset <= BlockView.GetSize());
}
return true;
};