diff options
| author | Dan Engelbrecht <[email protected]> | 2025-02-12 09:02:35 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-02-12 09:02:35 +0100 |
| commit | da9179d330a37132488f6deb8d8068783b087256 (patch) | |
| tree | 3309dfe685495bab7f18068f7c0d1dbd76a4b536 /src | |
| parent | improved builds api interface in jupiter (#281) (diff) | |
| download | zen-da9179d330a37132488f6deb8d8068783b087256.tar.xz zen-da9179d330a37132488f6deb8d8068783b087256.zip | |
moving and small refactor of chunk blocks to prepare for builds api (#282)
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenserver/projectstore/buildsremoteprojectstore.cpp | 77 | ||||
| -rw-r--r-- | src/zenserver/projectstore/fileremoteprojectstore.cpp | 6 | ||||
| -rw-r--r-- | src/zenserver/projectstore/jupiterremoteprojectstore.cpp | 4 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 16 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 226 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.h | 31 | ||||
| -rw-r--r-- | src/zenserver/projectstore/zenremoteprojectstore.cpp | 2 | ||||
| -rw-r--r-- | src/zenutil/chunkblock.cpp | 166 | ||||
| -rw-r--r-- | src/zenutil/chunkedfile.cpp (renamed from src/zenstore/chunkedfile.cpp) | 9 | ||||
| -rw-r--r-- | src/zenutil/chunking.cpp (renamed from src/zenstore/chunking.cpp) | 0 | ||||
| -rw-r--r-- | src/zenutil/chunking.h (renamed from src/zenstore/chunking.h) | 0 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/chunkblock.h | 32 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/chunkedfile.h (renamed from src/zenstore/include/zenstore/chunkedfile.h) | 6 | ||||
| -rw-r--r-- | src/zenutil/zenutil.cpp | 2 |
14 files changed, 334 insertions, 243 deletions
diff --git a/src/zenserver/projectstore/buildsremoteprojectstore.cpp b/src/zenserver/projectstore/buildsremoteprojectstore.cpp index 302b81729..412769174 100644 --- a/src/zenserver/projectstore/buildsremoteprojectstore.cpp +++ b/src/zenserver/projectstore/buildsremoteprojectstore.cpp @@ -3,6 +3,7 @@ #include "buildsremoteprojectstore.h" #include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryvalidation.h> #include <zencore/compress.h> #include <zencore/fmtutils.h> @@ -114,7 +115,9 @@ public: return Result; } - virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, Block&& Block) override + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, + const IoHash& RawHash, + ChunkBlockDescription&& Block) override { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); @@ -139,44 +142,10 @@ public: if (Block.BlockHash == RawHash) { - ZEN_ASSERT(Block.ChunkLengths.size() == Block.ChunkHashes.size()); - CbObjectWriter Writer; - Writer.AddHash("rawHash"sv, RawHash); - Writer.BeginArray("rawHashes"sv); - { - for (const IoHash& ChunkHash : Block.ChunkHashes) - { - Writer.AddHash(ChunkHash); - } - } - Writer.EndArray(); - Writer.BeginArray("chunkLengths"); - { - for (uint32_t ChunkSize : Block.ChunkLengths) - { - Writer.AddInteger(ChunkSize); - } - } - Writer.EndArray(); - Writer.BeginArray("chunkOffsets"); - { - ZEN_ASSERT(Block.FirstChunkOffset != (uint32_t)-1); - uint32_t Offset = Block.FirstChunkOffset; - for (uint32_t ChunkSize : Block.ChunkLengths) - { - Writer.AddInteger(Offset); - Offset += ChunkSize; - } - } - Writer.EndArray(); + CbObjectWriter BlockMetaData; + BlockMetaData.AddString("createdBy", GetRunningExecutablePath().stem().string()); - Writer.BeginObject("metadata"sv); - { - Writer.AddString("createdBy", "zenserver"); - } - Writer.EndObject(); - - IoBuffer MetaPayload = Writer.Save().GetBuffer().AsIoBuffer(); + IoBuffer MetaPayload = BuildChunkBlockDescription(Block, BlockMetaData.Save()).GetBuffer().AsIoBuffer(); MetaPayload.SetContentType(ZenContentType::kCbObject); JupiterResult PutMetaResult = Session.PutBlockMetadata(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash, MetaPayload); @@ -357,8 +326,7 @@ public: Result.Reason); return Result; } - CbObject BlocksObject = LoadCompactBinaryObject(FindResult.Response); - if (!BlocksObject) + if (ValidateCompactBinary(FindResult.Response.GetView(), CbValidateMode::Default) != CbValidateError::None) { Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("The block list {}/{}/{}/{} is not formatted as a compact binary object"sv, @@ -369,25 +337,20 @@ public: m_OplogBuildPartId); return Result; } - - CbArrayView Blocks = BlocksObject["blocks"].AsArrayView(); - Result.Blocks.reserve(Blocks.Num()); - for (CbFieldView BlockView : Blocks) + std::optional<std::vector<ChunkBlockDescription>> Blocks = + ParseChunkBlockDescriptionList(LoadCompactBinaryObject(FindResult.Response)); + if (!Blocks) { - CbObjectView BlockObject = BlockView.AsObjectView(); - IoHash BlockHash = BlockObject["rawHash"sv].AsHash(); - if (BlockHash != IoHash::Zero) - { - CbArrayView ChunksArray = BlockObject["rawHashes"sv].AsArrayView(); - std::vector<IoHash> ChunkHashes; - ChunkHashes.reserve(ChunksArray.Num()); - for (CbFieldView ChunkView : ChunksArray) - { - ChunkHashes.push_back(ChunkView.AsHash()); - } - Result.Blocks.emplace_back(Block{.BlockHash = BlockHash, .ChunkHashes = ChunkHashes}); - } + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = fmt::format("The block list {}/{}/{}/{} is not formatted as a list of blocks"sv, + m_JupiterClient->ServiceUrl(), + m_Namespace, + m_Bucket, + m_BuildId, + m_OplogBuildPartId); + return Result; } + Result.Blocks = std::move(Blocks.value()); return Result; } diff --git a/src/zenserver/projectstore/fileremoteprojectstore.cpp b/src/zenserver/projectstore/fileremoteprojectstore.cpp index 0fe739a12..5a21a7540 100644 --- a/src/zenserver/projectstore/fileremoteprojectstore.cpp +++ b/src/zenserver/projectstore/fileremoteprojectstore.cpp @@ -106,7 +106,7 @@ public: return Result; } - virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, Block&&) override + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&&) override { Stopwatch Timer; SaveAttachmentResult Result; @@ -192,8 +192,8 @@ public: return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent), .ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000}}; } - std::vector<RemoteProjectStore::Block> KnownBlocks = GetBlocksFromOplog(LoadResult.ContainerObject, ExistingBlockHashes); - GetKnownBlocksResult Result{{.ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000}}; + std::vector<ChunkBlockDescription> KnownBlocks = GetBlocksFromOplog(LoadResult.ContainerObject, ExistingBlockHashes); + GetKnownBlocksResult Result{{.ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000}}; Result.Blocks = std::move(KnownBlocks); return Result; } diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp index e906127ff..2b6a437d1 100644 --- a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp +++ b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp @@ -92,7 +92,7 @@ public: return Result; } - virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, Block&&) override + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&&) override { JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); JupiterResult PutResult = Session.PutCompressedBlob(m_Namespace, RawHash, Payload); @@ -193,7 +193,7 @@ public: return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent), .ElapsedSeconds = LoadResult.ElapsedSeconds + ExistsResult.ElapsedSeconds}}; } - std::vector<RemoteProjectStore::Block> KnownBlocks = GetBlocksFromOplog(LoadResult.ContainerObject, ExistingBlockHashes); + std::vector<ChunkBlockDescription> KnownBlocks = GetBlocksFromOplog(LoadResult.ContainerObject, ExistingBlockHashes); GetKnownBlocksResult Result{ {.ElapsedSeconds = LoadResult.ElapsedSeconds + ExistsResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000.0}}; diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 46a236af9..f6f7eba99 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -5347,7 +5347,7 @@ ProjectStore::ReadOplog(const std::string_view ProjectId, /* BuildBlocks */ false, /* IgnoreMissingAttachments */ false, /* AllowChunking*/ false, - [](CompressedBuffer&&, RemoteProjectStore::Block&&) {}, + [](CompressedBuffer&&, ChunkBlockDescription&&) {}, [](const IoHash&, TGetAttachmentBufferFunc&&) {}, [](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) {}, /* EmbedLooseFiles*/ false); @@ -8621,14 +8621,14 @@ TEST_CASE("project.store.block") Chunks.reserve(AttachmentSizes.size()); for (const auto& It : AttachmentsWithId) { - Chunks.push_back(std::make_pair(It.second.DecodeRawHash(), - [Buffer = It.second.GetCompressed().Flatten().AsIoBuffer()](const IoHash&) -> CompositeBuffer { - return CompositeBuffer(SharedBuffer(Buffer)); - })); + Chunks.push_back( + std::make_pair(It.second.DecodeRawHash(), [Buffer = It.second](const IoHash&) -> std::pair<uint64_t, CompressedBuffer> { + return {Buffer.DecodeRawSize(), Buffer}; + })); } - RemoteProjectStore::Block Block; - CompressedBuffer BlockBuffer = GenerateBlock(std::move(Chunks), Block); - CHECK(IterateBlock(BlockBuffer.Decompress(), [](CompressedBuffer&&, const IoHash&) {})); + ChunkBlockDescription Block; + CompressedBuffer BlockBuffer = GenerateChunkBlock(std::move(Chunks), Block); + CHECK(IterateChunkBlock(BlockBuffer.Decompress(), [](CompressedBuffer&&, const IoHash&) {})); } TEST_CASE("project.store.iterateoplog") diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index 0589fdc5f..5b75a840e 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -12,8 +12,8 @@ #include <zencore/stream.h> #include <zencore/timer.h> #include <zencore/workthreadpool.h> -#include <zenstore/chunkedfile.h> #include <zenstore/cidstore.h> +#include <zenutil/chunkedfile.h> #include <zenutil/workerpools.h> #include <unordered_map> @@ -143,7 +143,7 @@ namespace remotestore_impl { NiceBytes(Stats.m_PeakReceivedBytes)); } - size_t AddBlock(RwLock& BlocksLock, std::vector<RemoteProjectStore::Block>& Blocks) + size_t AddBlock(RwLock& BlocksLock, std::vector<ChunkBlockDescription>& Blocks) { size_t BlockIndex; { @@ -573,7 +573,7 @@ namespace remotestore_impl { return; } - bool StoreChunksOK = IterateBlock( + bool StoreChunksOK = IterateChunkBlock( BlockPayload, [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) { @@ -738,14 +738,14 @@ namespace remotestore_impl { }); }; - void CreateBlock(WorkerThreadPool& WorkerPool, - Latch& OpSectionsLatch, - std::vector<std::pair<IoHash, FetchChunkFunc>>&& ChunksInBlock, - RwLock& SectionsLock, - std::vector<RemoteProjectStore::Block>& Blocks, - size_t BlockIndex, - const std::function<void(CompressedBuffer&&, RemoteProjectStore::Block&&)>& AsyncOnBlock, - AsyncRemoteResult& RemoteResult) + void CreateBlock(WorkerThreadPool& WorkerPool, + Latch& OpSectionsLatch, + std::vector<std::pair<IoHash, FetchChunkFunc>>&& ChunksInBlock, + RwLock& SectionsLock, + std::vector<ChunkBlockDescription>& Blocks, + size_t BlockIndex, + const std::function<void(CompressedBuffer&&, ChunkBlockDescription&&)>& AsyncOnBlock, + AsyncRemoteResult& RemoteResult) { OpSectionsLatch.AddCount(1); WorkerPool.ScheduleWork([&Blocks, @@ -764,10 +764,10 @@ namespace remotestore_impl { try { ZEN_ASSERT(ChunkCount > 0); - Stopwatch Timer; - RemoteProjectStore::Block Block; - CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks), Block); - IoHash BlockHash = CompressedBlock.DecodeRawHash(); + Stopwatch Timer; + ChunkBlockDescription Block; + CompressedBuffer CompressedBlock = GenerateChunkBlock(std::move(Chunks), Block); + IoHash BlockHash = CompressedBlock.DecodeRawHash(); { // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index RwLock::SharedLockScope __(SectionsLock); @@ -800,8 +800,8 @@ namespace remotestore_impl { struct CreatedBlock { - IoBuffer Payload; - RemoteProjectStore::Block Block; + IoBuffer Payload; + ChunkBlockDescription Block; }; void UploadAttachments(WorkerThreadPool& WorkerPool, @@ -931,8 +931,8 @@ namespace remotestore_impl { } try { - IoBuffer Payload; - RemoteProjectStore::Block Block; + IoBuffer Payload; + ChunkBlockDescription Block; if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end()) { Payload = BlockIt->second.Payload; @@ -1058,7 +1058,7 @@ namespace remotestore_impl { { auto It = BulkBlockAttachmentsToUpload.find(Chunk); ZEN_ASSERT(It != BulkBlockAttachmentsToUpload.end()); - CompositeBuffer ChunkPayload = It->second(It->first); + CompressedBuffer ChunkPayload = It->second(It->first).second; if (!ChunkPayload) { RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound), @@ -1067,8 +1067,8 @@ namespace remotestore_impl { ChunkBuffers.clear(); break; } - ChunksSize += ChunkPayload.GetSize(); - ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload).Flatten().AsIoBuffer())); + ChunksSize += ChunkPayload.GetCompressedSize(); + ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload).GetCompressed().Flatten().AsIoBuffer())); } RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers); if (Result.ErrorCode) @@ -1139,54 +1139,13 @@ namespace remotestore_impl { } } // namespace remotestore_impl -bool -IterateBlock(const SharedBuffer& BlockPayload, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor) -{ - ZEN_ASSERT(BlockPayload); - if (BlockPayload.GetSize() < 1) - { - return false; - } - - MemoryView BlockView = BlockPayload.GetView(); - const uint8_t* ReadPtr = reinterpret_cast<const uint8_t*>(BlockView.GetData()); - uint32_t NumberSize; - uint64_t ChunkCount = ReadVarUInt(ReadPtr, NumberSize); - ReadPtr += NumberSize; - std::vector<uint64_t> ChunkSizes; - ChunkSizes.reserve(ChunkCount); - while (ChunkCount--) - { - ChunkSizes.push_back(ReadVarUInt(ReadPtr, NumberSize)); - ReadPtr += NumberSize; - } - ptrdiff_t TempBufferLength = std::distance(reinterpret_cast<const uint8_t*>(BlockView.GetData()), ReadPtr); - ZEN_ASSERT(TempBufferLength > 0); - for (uint64_t ChunkSize : ChunkSizes) - { - IoBuffer Chunk(IoBuffer::Wrap, ReadPtr, ChunkSize); - IoHash AttachmentRawHash; - uint64_t AttachmentRawSize; - CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), AttachmentRawHash, AttachmentRawSize); - - if (!CompressedChunk) - { - ZEN_ERROR("Invalid chunk in block"); - return false; - } - Visitor(std::move(CompressedChunk), AttachmentRawHash); - ReadPtr += ChunkSize; - ZEN_ASSERT(ReadPtr <= BlockView.GetDataEnd()); - } - return true; -}; std::vector<IoHash> GetBlockHashesFromOplog(CbObjectView ContainerObject) { using namespace std::literals; - std::vector<RemoteProjectStore::Block> Result; - CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); + std::vector<ChunkBlockDescription> Result; + CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); std::vector<IoHash> BlockHashes; BlockHashes.reserve(BlocksArray.Num()); @@ -1199,11 +1158,11 @@ GetBlockHashesFromOplog(CbObjectView ContainerObject) return BlockHashes; } -std::vector<RemoteProjectStore::Block> +std::vector<ChunkBlockDescription> GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> IncludeBlockHashes) { using namespace std::literals; - std::vector<RemoteProjectStore::Block> Result; + std::vector<ChunkBlockDescription> Result; CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); tsl::robin_set<IoHash, IoHash::Hasher> IncludeSet; IncludeSet.insert(IncludeBlockHashes.begin(), IncludeBlockHashes.end()); @@ -1232,47 +1191,6 @@ GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> Include return Result; } -CompressedBuffer -GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks, RemoteProjectStore::Block& OutBlock) -{ - const size_t ChunkCount = FetchChunks.size(); - - std::vector<SharedBuffer> ChunkSegments; - ChunkSegments.resize(1); - ChunkSegments.reserve(1 + ChunkCount); - OutBlock.ChunkHashes.reserve(ChunkCount); - OutBlock.ChunkLengths.reserve(ChunkCount); - { - IoBuffer TempBuffer(ChunkCount * 9); - MutableMemoryView View = TempBuffer.GetMutableView(); - uint8_t* BufferStartPtr = reinterpret_cast<uint8_t*>(View.GetData()); - uint8_t* BufferEndPtr = BufferStartPtr; - BufferEndPtr += WriteVarUInt(gsl::narrow<uint64_t>(ChunkCount), BufferEndPtr); - for (const auto& It : FetchChunks) - { - CompositeBuffer Chunk = It.second(It.first); - uint64_t ChunkSize = 0; - std::span<const SharedBuffer> Segments = Chunk.GetSegments(); - for (const SharedBuffer& Segment : Segments) - { - ChunkSize += Segment.GetSize(); - ChunkSegments.push_back(Segment); - } - BufferEndPtr += WriteVarUInt(ChunkSize, BufferEndPtr); - OutBlock.ChunkHashes.push_back(It.first); - OutBlock.ChunkLengths.push_back(gsl::narrow<uint32_t>(ChunkSize)); - } - ZEN_ASSERT(BufferEndPtr <= View.GetDataEnd()); - ptrdiff_t TempBufferLength = std::distance(BufferStartPtr, BufferEndPtr); - ChunkSegments[0] = SharedBuffer(IoBuffer(TempBuffer, 0, gsl::narrow<size_t>(TempBufferLength))); - } - CompressedBuffer CompressedBlock = - CompressedBuffer::Compress(CompositeBuffer(std::move(ChunkSegments)), OodleCompressor::Mermaid, OodleCompressionLevel::None); - OutBlock.BlockHash = CompressedBlock.DecodeRawHash(); - OutBlock.FirstChunkOffset = gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + ChunkSegments[0].GetSize()); - return CompressedBlock; -} - CbObject BuildContainer(CidStore& ChunkStore, ProjectStore::Project& Project, @@ -1283,9 +1201,9 @@ BuildContainer(CidStore& ChunkStore, bool BuildBlocks, bool IgnoreMissingAttachments, bool AllowChunking, - const std::vector<RemoteProjectStore::Block>& KnownBlocks, + const std::vector<ChunkBlockDescription>& KnownBlocks, WorkerThreadPool& WorkerPool, - const std::function<void(CompressedBuffer&&, RemoteProjectStore::Block&&)>& AsyncOnBlock, + const std::function<void(CompressedBuffer&&, ChunkBlockDescription&&)>& AsyncOnBlock, const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment, const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks, bool EmbedLooseFiles, @@ -1307,9 +1225,9 @@ BuildContainer(CidStore& ChunkStore, std::unordered_map<IoHash, FoundAttachment, IoHash::Hasher> UploadAttachments; - RwLock BlocksLock; - std::vector<RemoteProjectStore::Block> Blocks; - CompressedBuffer OpsBuffer; + RwLock BlocksLock; + std::vector<ChunkBlockDescription> Blocks; + CompressedBuffer OpsBuffer; std::filesystem::path AttachmentTempPath = Oplog.TempPath(); AttachmentTempPath.append(".pending"); @@ -1525,7 +1443,7 @@ BuildContainer(CidStore& ChunkStore, return {}; } - auto FindReuseBlocks = [](const std::vector<RemoteProjectStore::Block>& KnownBlocks, + auto FindReuseBlocks = [](const std::vector<ChunkBlockDescription>& KnownBlocks, const std::unordered_set<IoHash, IoHash::Hasher>& Attachments, JobContext* OptionalContext) -> std::vector<size_t> { std::vector<size_t> ReuseBlockIndexes; @@ -1538,8 +1456,8 @@ BuildContainer(CidStore& ChunkStore, for (size_t KnownBlockIndex = 0; KnownBlockIndex < KnownBlocks.size(); KnownBlockIndex++) { - const RemoteProjectStore::Block& KnownBlock = KnownBlocks[KnownBlockIndex]; - size_t BlockAttachmentCount = KnownBlock.ChunkHashes.size(); + const ChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex]; + size_t BlockAttachmentCount = KnownBlock.ChunkHashes.size(); if (BlockAttachmentCount == 0) { continue; @@ -1586,7 +1504,7 @@ BuildContainer(CidStore& ChunkStore, std::vector<size_t> ReusedBlockIndexes = FindReuseBlocks(KnownBlocks, FoundHashes, OptionalContext); for (size_t KnownBlockIndex : ReusedBlockIndexes) { - const RemoteProjectStore::Block& KnownBlock = KnownBlocks[KnownBlockIndex]; + const ChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex]; for (const IoHash& KnownHash : KnownBlock.ChunkHashes) { if (UploadAttachments.erase(KnownHash) == 1) @@ -1632,12 +1550,12 @@ BuildContainer(CidStore& ChunkStore, return Chunked; }; - RwLock ResolveLock; - std::unordered_set<IoHash, IoHash::Hasher> ChunkedHashes; - std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes; - std::unordered_map<IoHash, size_t, IoHash::Hasher> ChunkedUploadAttachments; - std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> LooseUploadAttachments; - std::unordered_set<IoHash, IoHash::Hasher> MissingHashes; + RwLock ResolveLock; + std::unordered_set<IoHash, IoHash::Hasher> ChunkedHashes; + std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes; + std::unordered_map<IoHash, size_t, IoHash::Hasher> ChunkedUploadAttachments; + std::unordered_map<IoHash, std::pair<uint64_t, IoBuffer>, IoHash::Hasher> LooseUploadAttachments; + std::unordered_set<IoHash, IoHash::Hasher> MissingHashes; remotestore_impl::ReportMessage(OptionalContext, fmt::format("Resolving {} attachments from {} ops", UploadAttachments.size(), TotalOpCount)); @@ -1730,7 +1648,7 @@ BuildContainer(CidStore& ChunkStore, } else { - size_t RawSize = RawData.GetSize(); + uint64_t RawSize = RawData.GetSize(); CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(RawData), OodleCompressor::Mermaid, OodleCompressionLevel::VeryFast); @@ -1753,8 +1671,8 @@ BuildContainer(CidStore& ChunkStore, { UploadAttachment->Size = Compressed.GetCompressedSize(); ResolveLock.WithExclusiveLock( - [RawHash, &LooseUploadAttachments, Data = std::move(TempAttachmentBuffer)]() { - LooseUploadAttachments.insert_or_assign(RawHash, std::move(Data)); + [RawHash, RawSize, &LooseUploadAttachments, Data = std::move(TempAttachmentBuffer)]() { + LooseUploadAttachments.insert_or_assign(RawHash, std::make_pair(RawSize, std::move(Data))); }); } } @@ -1927,7 +1845,7 @@ BuildContainer(CidStore& ChunkStore, std::vector<size_t> ReusedBlockFromChunking = FindReuseBlocks(KnownBlocks, ChunkedHashes, OptionalContext); for (size_t KnownBlockIndex : ReusedBlockIndexes) { - const RemoteProjectStore::Block& KnownBlock = KnownBlocks[KnownBlockIndex]; + const ChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex]; for (const IoHash& KnownHash : KnownBlock.ChunkHashes) { if (ChunkedHashes.erase(KnownHash) == 1) @@ -2109,16 +2027,25 @@ BuildContainer(CidStore& ChunkStore, { if (auto It = LooseUploadAttachments.find(RawHash); It != LooseUploadAttachments.end()) { - ChunksInBlock.emplace_back(std::make_pair(RawHash, [IoBuffer = SharedBuffer(It->second)](const IoHash&) { - return CompositeBuffer(IoBuffer); - })); + ChunksInBlock.emplace_back(std::make_pair( + RawHash, + [RawSize = It->second.first, + IoBuffer = SharedBuffer(It->second.second)](const IoHash&) -> std::pair<uint64_t, CompressedBuffer> { + return std::make_pair(RawSize, CompressedBuffer::FromCompressedNoValidate(IoBuffer.AsIoBuffer())); + })); LooseUploadAttachments.erase(It); } else { - ChunksInBlock.emplace_back(std::make_pair(RawHash, [&ChunkStore](const IoHash& RawHash) { - return CompositeBuffer(SharedBuffer(ChunkStore.FindChunkByCid(RawHash))); - })); + ChunksInBlock.emplace_back( + std::make_pair(RawHash, [&ChunkStore](const IoHash& RawHash) -> std::pair<uint64_t, CompressedBuffer> { + IoBuffer Chunk = ChunkStore.FindChunkByCid(RawHash); + IoHash _; + uint64_t RawSize = 0; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), _, RawSize); + ZEN_ASSERT(Compressed); + return {RawSize, Compressed}; + })); } BlockSize += PayloadSize; @@ -2169,14 +2096,15 @@ BuildContainer(CidStore& ChunkStore, if (BlockAttachmentHashes.insert(ChunkHash).second) { const ChunkSource& Source = Chunked.ChunkSources[ChunkIndex]; - ChunksInBlock.emplace_back(std::make_pair( - ChunkHash, - [Source = ChunkedFile.Source, Offset = Source.Offset, Size = Source.Size](const IoHash&) { - return CompressedBuffer::Compress(SharedBuffer(IoBuffer(Source, Offset, Size)), - OodleCompressor::Mermaid, - OodleCompressionLevel::None) - .GetCompressed(); - })); + ChunksInBlock.emplace_back( + std::make_pair(ChunkHash, + [Source = ChunkedFile.Source, Offset = Source.Offset, Size = Source.Size]( + const IoHash&) -> std::pair<uint64_t, CompressedBuffer> { + return {Size, + CompressedBuffer::Compress(SharedBuffer(IoBuffer(Source, Offset, Size)), + OodleCompressor::Mermaid, + OodleCompressionLevel::None)}; + })); BlockSize += CompressedBuffer::GetHeaderSizeForNoneEncoder() + Source.Size; if (BuildBlocks) { @@ -2298,7 +2226,7 @@ BuildContainer(CidStore& ChunkStore, OplogContinerWriter.AddBinary("ops"sv, CompressedOpsSection.GetCompressed().Flatten().AsIoBuffer()); OplogContinerWriter.BeginArray("blocks"sv); { - for (const RemoteProjectStore::Block& B : Blocks) + for (const ChunkBlockDescription& B : Blocks) { ZEN_ASSERT(!B.ChunkHashes.empty()); if (BuildBlocks) @@ -2392,7 +2320,7 @@ BuildContainer(CidStore& ChunkStore, bool BuildBlocks, bool IgnoreMissingAttachments, bool AllowChunking, - const std::function<void(CompressedBuffer&&, RemoteProjectStore::Block&&)>& AsyncOnBlock, + const std::function<void(CompressedBuffer&&, ChunkBlockDescription&&)>& AsyncOnBlock, const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment, const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks, bool EmbedLooseFiles) @@ -2458,8 +2386,8 @@ SaveOplog(CidStore& ChunkStore, std::unordered_map<IoHash, remotestore_impl::CreatedBlock, IoHash::Hasher> CreatedBlocks; tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher> LooseLargeFiles; - auto MakeTempBlock = [AttachmentTempPath, &RemoteResult, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock, - RemoteProjectStore::Block&& Block) { + auto MakeTempBlock = [AttachmentTempPath, &RemoteResult, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock, + ChunkBlockDescription&& Block) { std::filesystem::path BlockPath = AttachmentTempPath; BlockPath.append(Block.BlockHash.ToHexString()); try @@ -2478,8 +2406,8 @@ SaveOplog(CidStore& ChunkStore, } }; - auto UploadBlock = [&RemoteStore, &RemoteResult, &Info, OptionalContext](CompressedBuffer&& CompressedBlock, - RemoteProjectStore::Block&& Block) { + auto UploadBlock = [&RemoteStore, &RemoteResult, &Info, OptionalContext](CompressedBuffer&& CompressedBlock, + ChunkBlockDescription&& Block) { IoHash BlockHash = Block.BlockHash; RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompressedBlock.GetCompressed(), BlockHash, std::move(Block)); @@ -2512,7 +2440,7 @@ SaveOplog(CidStore& ChunkStore, ZEN_DEBUG("Found attachment {}", AttachmentHash); }; - std::function<void(CompressedBuffer&&, RemoteProjectStore::Block &&)> OnBlock; + std::function<void(CompressedBuffer&&, ChunkBlockDescription &&)> OnBlock; if (RemoteStoreInfo.UseTempBlockFiles) { OnBlock = MakeTempBlock; @@ -2522,7 +2450,7 @@ SaveOplog(CidStore& ChunkStore, OnBlock = UploadBlock; } - std::vector<RemoteProjectStore::Block> KnownBlocks; + std::vector<ChunkBlockDescription> KnownBlocks; uint64_t TransferWallTimeMS = 0; diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h index e05cb9923..1ef0416b7 100644 --- a/src/zenserver/projectstore/remoteprojectstore.h +++ b/src/zenserver/projectstore/remoteprojectstore.h @@ -5,6 +5,8 @@ #include <zencore/jobqueue.h> #include "projectstore.h" +#include <zenutil/chunkblock.h> + #include <unordered_set> namespace zen { @@ -16,14 +18,6 @@ struct ChunkedInfo; class RemoteProjectStore { public: - struct Block - { - IoHash BlockHash; - std::vector<IoHash> ChunkHashes; - std::vector<uint32_t> ChunkLengths; - uint32_t FirstChunkOffset = (uint32_t)-1; - }; - struct Result { int32_t ErrorCode{}; @@ -72,7 +66,7 @@ public: struct GetKnownBlocksResult : public Result { - std::vector<Block> Blocks; + std::vector<ChunkBlockDescription> Blocks; }; struct RemoteStoreInfo @@ -101,11 +95,11 @@ public: virtual RemoteStoreInfo GetInfo() const = 0; virtual Stats GetStats() const = 0; - virtual CreateContainerResult CreateContainer() = 0; - virtual SaveResult SaveContainer(const IoBuffer& Payload) = 0; - virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, Block&& Block) = 0; - virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) = 0; - virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Payloads) = 0; + virtual CreateContainerResult CreateContainer() = 0; + virtual SaveResult SaveContainer(const IoBuffer& Payload) = 0; + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&& Block) = 0; + virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) = 0; + virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Payloads) = 0; virtual LoadContainerResult LoadContainer() = 0; virtual GetKnownBlocksResult GetKnownBlocks() = 0; @@ -125,7 +119,6 @@ struct RemoteStoreOptions }; typedef std::function<IoBuffer(const IoHash& AttachmentHash)> TGetAttachmentBufferFunc; -typedef std::function<CompositeBuffer(const IoHash& RawHash)> FetchChunkFunc; RemoteProjectStore::LoadContainerResult BuildContainer( CidStore& ChunkStore, @@ -137,7 +130,7 @@ RemoteProjectStore::LoadContainerResult BuildContainer( bool BuildBlocks, bool IgnoreMissingAttachments, bool AllowChunking, - const std::function<void(CompressedBuffer&&, RemoteProjectStore::Block&&)>& AsyncOnBlock, + const std::function<void(CompressedBuffer&&, ChunkBlockDescription&&)>& AsyncOnBlock, const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment, const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks, bool EmbedLooseFiles); @@ -173,9 +166,7 @@ RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, bool CleanOplog, JobContext* OptionalContext); -CompressedBuffer GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks, RemoteProjectStore::Block& OutBlock); -bool IterateBlock(const SharedBuffer& BlockPayload, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor); -std::vector<IoHash> GetBlockHashesFromOplog(CbObjectView ContainerObject); -std::vector<RemoteProjectStore::Block> GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> IncludeBlockHashes); +std::vector<IoHash> GetBlockHashesFromOplog(CbObjectView ContainerObject); +std::vector<ChunkBlockDescription> GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> IncludeBlockHashes); } // namespace zen diff --git a/src/zenserver/projectstore/zenremoteprojectstore.cpp b/src/zenserver/projectstore/zenremoteprojectstore.cpp index 42519b108..2ebf58a5d 100644 --- a/src/zenserver/projectstore/zenremoteprojectstore.cpp +++ b/src/zenserver/projectstore/zenremoteprojectstore.cpp @@ -93,7 +93,7 @@ public: return Result; } - virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, Block&&) override + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&&) override { std::string SaveRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash); HttpClient::Response Response = m_Client.Post(SaveRequest, Payload, ZenContentType::kCompressedBinary); diff --git a/src/zenutil/chunkblock.cpp b/src/zenutil/chunkblock.cpp new file mode 100644 index 000000000..6dae5af11 --- /dev/null +++ b/src/zenutil/chunkblock.cpp @@ -0,0 +1,166 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenutil/chunkblock.h> + +#include <zencore/compactbinarybuilder.h> +#include <zencore/logging.h> + +#include <vector> + +namespace zen { + +using namespace std::literals; + +ChunkBlockDescription +ParseChunkBlockDescription(const CbObjectView& BlockObject) +{ + ChunkBlockDescription Result; + Result.BlockHash = BlockObject["rawHash"sv].AsHash(); + if (Result.BlockHash != IoHash::Zero) + { + CbArrayView ChunksArray = BlockObject["rawHashes"sv].AsArrayView(); + Result.ChunkHashes.reserve(ChunksArray.Num()); + for (CbFieldView ChunkView : ChunksArray) + { + Result.ChunkHashes.push_back(ChunkView.AsHash()); + } + + CbArrayView ChunkRawLengthsArray = BlockObject["chunkRawLengths"sv].AsArrayView(); + std::vector<uint32_t> ChunkLengths; + Result.ChunkRawLengths.reserve(ChunkRawLengthsArray.Num()); + for (CbFieldView ChunkView : ChunkRawLengthsArray) + { + Result.ChunkRawLengths.push_back(ChunkView.AsUInt32()); + } + } + return Result; +} + +std::vector<ChunkBlockDescription> +ParseChunkBlockDescriptionList(const CbObjectView& BlocksObject) +{ + if (!BlocksObject) + { + return {}; + } + std::vector<ChunkBlockDescription> Result; + CbArrayView Blocks = BlocksObject["blocks"].AsArrayView(); + Result.reserve(Blocks.Num()); + for (CbFieldView BlockView : Blocks) + { + CbObjectView BlockObject = BlockView.AsObjectView(); + Result.emplace_back(ParseChunkBlockDescription(BlockObject)); + } + return Result; +} + +CbObject +BuildChunkBlockDescription(const ChunkBlockDescription& Block, CbObjectView MetaData) +{ + ZEN_ASSERT(Block.ChunkRawLengths.size() == Block.ChunkHashes.size()); + + CbObjectWriter Writer; + Writer.AddHash("rawHash"sv, Block.BlockHash); + Writer.BeginArray("rawHashes"sv); + { + for (const IoHash& ChunkHash : Block.ChunkHashes) + { + Writer.AddHash(ChunkHash); + } + } + Writer.EndArray(); + Writer.BeginArray("chunkRawLengths"); + { + for (uint32_t ChunkSize : Block.ChunkRawLengths) + { + Writer.AddInteger(ChunkSize); + } + } + Writer.EndArray(); + + Writer.AddObject("metadata", MetaData); + + return Writer.Save(); +} + +CompressedBuffer +GenerateChunkBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks, ChunkBlockDescription& OutBlock) +{ + const size_t ChunkCount = FetchChunks.size(); + + std::vector<SharedBuffer> ChunkSegments; + ChunkSegments.resize(1); + ChunkSegments.reserve(1 + ChunkCount); + OutBlock.ChunkHashes.reserve(ChunkCount); + OutBlock.ChunkRawLengths.reserve(ChunkCount); + { + IoBuffer TempBuffer(ChunkCount * 9); + MutableMemoryView View = TempBuffer.GetMutableView(); + uint8_t* BufferStartPtr = reinterpret_cast<uint8_t*>(View.GetData()); + uint8_t* BufferEndPtr = BufferStartPtr; + BufferEndPtr += WriteVarUInt(gsl::narrow<uint64_t>(ChunkCount), BufferEndPtr); + for (const auto& It : FetchChunks) + { + std::pair<uint64_t, CompressedBuffer> Chunk = It.second(It.first); + uint64_t ChunkSize = 0; + std::span<const SharedBuffer> Segments = Chunk.second.GetCompressed().GetSegments(); + for (const SharedBuffer& Segment : Segments) + { + ChunkSize += Segment.GetSize(); + ChunkSegments.push_back(Segment); + } + BufferEndPtr += WriteVarUInt(ChunkSize, BufferEndPtr); + OutBlock.ChunkHashes.push_back(It.first); + OutBlock.ChunkRawLengths.push_back(gsl::narrow<uint32_t>(Chunk.first)); + } + ZEN_ASSERT(BufferEndPtr <= View.GetDataEnd()); + ptrdiff_t TempBufferLength = std::distance(BufferStartPtr, BufferEndPtr); + ChunkSegments[0] = SharedBuffer(IoBuffer(TempBuffer, 0, gsl::narrow<size_t>(TempBufferLength))); + } + CompressedBuffer CompressedBlock = + CompressedBuffer::Compress(CompositeBuffer(std::move(ChunkSegments)), OodleCompressor::Mermaid, OodleCompressionLevel::None); + OutBlock.BlockHash = CompressedBlock.DecodeRawHash(); + return CompressedBlock; +} + +bool +IterateChunkBlock(const SharedBuffer& BlockPayload, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor) +{ + ZEN_ASSERT(BlockPayload); + if (BlockPayload.GetSize() < 1) + { + return false; + } + + MemoryView BlockView = BlockPayload.GetView(); + const uint8_t* ReadPtr = reinterpret_cast<const uint8_t*>(BlockView.GetData()); + uint32_t NumberSize; + uint64_t ChunkCount = ReadVarUInt(ReadPtr, NumberSize); + ReadPtr += NumberSize; + std::vector<uint64_t> ChunkSizes; + ChunkSizes.reserve(ChunkCount); + while (ChunkCount--) + { + ChunkSizes.push_back(ReadVarUInt(ReadPtr, NumberSize)); + ReadPtr += NumberSize; + } + for (uint64_t ChunkSize : ChunkSizes) + { + IoBuffer Chunk(IoBuffer::Wrap, ReadPtr, ChunkSize); + IoHash AttachmentRawHash; + uint64_t AttachmentRawSize; + CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), AttachmentRawHash, AttachmentRawSize); + + if (!CompressedChunk) + { + ZEN_ERROR("Invalid chunk in block"); + return false; + } + Visitor(std::move(CompressedChunk), AttachmentRawHash); + ReadPtr += ChunkSize; + ZEN_ASSERT(ReadPtr <= BlockView.GetDataEnd()); + } + return true; +}; + +} // namespace zen diff --git a/src/zenstore/chunkedfile.cpp b/src/zenutil/chunkedfile.cpp index f200bc1ec..c08492eb0 100644 --- a/src/zenstore/chunkedfile.cpp +++ b/src/zenutil/chunkedfile.cpp @@ -1,7 +1,8 @@ // Copyright Epic Games, Inc. All Rights Reserved. +#include <zenutil/chunkedfile.h> + #include <zencore/basicfile.h> -#include <zenstore/chunkedfile.h> #include "chunking.h" @@ -111,7 +112,7 @@ Reconstruct(const ChunkedInfo& Info, const std::filesystem::path& TargetPath, st } ChunkedInfoWithSource -ChunkData(BasicFile& RawData, uint64_t Offset, uint64_t Size, ChunkedParams Params) +ChunkData(BasicFile& RawData, uint64_t Offset, uint64_t Size, ChunkedParams Params, std::atomic<uint64_t>* BytesProcessed) { ChunkedInfoWithSource Result; tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> FoundChunks; @@ -163,6 +164,10 @@ ChunkData(BasicFile& RawData, uint64_t Offset, uint64_t Size, ChunkedParams Para SliceSize = SliceView.GetSize(); Offset += ChunkLength; + if (BytesProcessed != nullptr) + { + BytesProcessed->fetch_add(ChunkLength); + } } Result.Info.RawSize = Size; Result.Info.RawHash = RawHashStream.GetHash(); diff --git a/src/zenstore/chunking.cpp b/src/zenutil/chunking.cpp index 30edd322a..30edd322a 100644 --- a/src/zenstore/chunking.cpp +++ b/src/zenutil/chunking.cpp diff --git a/src/zenstore/chunking.h b/src/zenutil/chunking.h index 09c56454f..09c56454f 100644 --- a/src/zenstore/chunking.h +++ b/src/zenutil/chunking.h diff --git a/src/zenutil/include/zenutil/chunkblock.h b/src/zenutil/include/zenutil/chunkblock.h new file mode 100644 index 000000000..9b7414629 --- /dev/null +++ b/src/zenutil/include/zenutil/chunkblock.h @@ -0,0 +1,32 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/iohash.h> + +#include <zencore/compactbinary.h> +#include <zencore/compress.h> + +#include <optional> +#include <vector> + +namespace zen { + +struct ChunkBlockDescription +{ + IoHash BlockHash; + std::vector<IoHash> ChunkHashes; + std::vector<uint32_t> ChunkRawLengths; +}; + +std::vector<ChunkBlockDescription> ParseChunkBlockDescriptionList(const CbObjectView& BlocksObject); +ChunkBlockDescription ParseChunkBlockDescription(const CbObjectView& BlockObject); +CbObject BuildChunkBlockDescription(const ChunkBlockDescription& Block, CbObjectView MetaData); + +typedef std::function<std::pair<uint64_t, CompressedBuffer>(const IoHash& RawHash)> FetchChunkFunc; + +CompressedBuffer GenerateChunkBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks, ChunkBlockDescription& OutBlock); +bool IterateChunkBlock(const SharedBuffer& BlockPayload, + std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor); + +} // namespace zen diff --git a/src/zenstore/include/zenstore/chunkedfile.h b/src/zenutil/include/zenutil/chunkedfile.h index c6330bdbd..7110ad317 100644 --- a/src/zenstore/include/zenstore/chunkedfile.h +++ b/src/zenutil/include/zenutil/chunkedfile.h @@ -43,7 +43,11 @@ struct ChunkedParams static const ChunkedParams UShaderByteCodeParams = {.UseThreshold = true, .MinSize = 17280, .MaxSize = 139264, .AvgSize = 36340}; -ChunkedInfoWithSource ChunkData(BasicFile& RawData, uint64_t Offset, uint64_t Size, ChunkedParams Params = {}); +ChunkedInfoWithSource ChunkData(BasicFile& RawData, + uint64_t Offset, + uint64_t Size, + ChunkedParams Params = {}, + std::atomic<uint64_t>* BytesProcessed = nullptr); void Reconstruct(const ChunkedInfo& Info, const std::filesystem::path& TargetPath, std::function<IoBuffer(const IoHash& ChunkHash)> GetChunk); diff --git a/src/zenutil/zenutil.cpp b/src/zenutil/zenutil.cpp index c54144549..19eb63ce9 100644 --- a/src/zenutil/zenutil.cpp +++ b/src/zenutil/zenutil.cpp @@ -6,6 +6,7 @@ # include <zenutil/cache/cacherequests.h> # include <zenutil/cache/rpcrecording.h> +# include <zenutil/chunkedfile.h> namespace zen { @@ -15,6 +16,7 @@ zenutil_forcelinktests() cachepolicy_forcelink(); cache::rpcrecord_forcelink(); cacherequests_forcelink(); + chunkedfile_forcelink(); } } // namespace zen |