aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-02-12 09:02:35 +0100
committerGitHub Enterprise <[email protected]>2025-02-12 09:02:35 +0100
commitda9179d330a37132488f6deb8d8068783b087256 (patch)
tree3309dfe685495bab7f18068f7c0d1dbd76a4b536 /src
parentimproved builds api interface in jupiter (#281) (diff)
downloadzen-da9179d330a37132488f6deb8d8068783b087256.tar.xz
zen-da9179d330a37132488f6deb8d8068783b087256.zip
moving and small refactor of chunk blocks to prepare for builds api (#282)
Diffstat (limited to 'src')
-rw-r--r--src/zenserver/projectstore/buildsremoteprojectstore.cpp77
-rw-r--r--src/zenserver/projectstore/fileremoteprojectstore.cpp6
-rw-r--r--src/zenserver/projectstore/jupiterremoteprojectstore.cpp4
-rw-r--r--src/zenserver/projectstore/projectstore.cpp16
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp226
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.h31
-rw-r--r--src/zenserver/projectstore/zenremoteprojectstore.cpp2
-rw-r--r--src/zenutil/chunkblock.cpp166
-rw-r--r--src/zenutil/chunkedfile.cpp (renamed from src/zenstore/chunkedfile.cpp)9
-rw-r--r--src/zenutil/chunking.cpp (renamed from src/zenstore/chunking.cpp)0
-rw-r--r--src/zenutil/chunking.h (renamed from src/zenstore/chunking.h)0
-rw-r--r--src/zenutil/include/zenutil/chunkblock.h32
-rw-r--r--src/zenutil/include/zenutil/chunkedfile.h (renamed from src/zenstore/include/zenstore/chunkedfile.h)6
-rw-r--r--src/zenutil/zenutil.cpp2
14 files changed, 334 insertions, 243 deletions
diff --git a/src/zenserver/projectstore/buildsremoteprojectstore.cpp b/src/zenserver/projectstore/buildsremoteprojectstore.cpp
index 302b81729..412769174 100644
--- a/src/zenserver/projectstore/buildsremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/buildsremoteprojectstore.cpp
@@ -3,6 +3,7 @@
#include "buildsremoteprojectstore.h"
#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinaryvalidation.h>
#include <zencore/compress.h>
#include <zencore/fmtutils.h>
@@ -114,7 +115,9 @@ public:
return Result;
}
- virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, Block&& Block) override
+ virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload,
+ const IoHash& RawHash,
+ ChunkBlockDescription&& Block) override
{
ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero);
JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client());
@@ -139,44 +142,10 @@ public:
if (Block.BlockHash == RawHash)
{
- ZEN_ASSERT(Block.ChunkLengths.size() == Block.ChunkHashes.size());
- CbObjectWriter Writer;
- Writer.AddHash("rawHash"sv, RawHash);
- Writer.BeginArray("rawHashes"sv);
- {
- for (const IoHash& ChunkHash : Block.ChunkHashes)
- {
- Writer.AddHash(ChunkHash);
- }
- }
- Writer.EndArray();
- Writer.BeginArray("chunkLengths");
- {
- for (uint32_t ChunkSize : Block.ChunkLengths)
- {
- Writer.AddInteger(ChunkSize);
- }
- }
- Writer.EndArray();
- Writer.BeginArray("chunkOffsets");
- {
- ZEN_ASSERT(Block.FirstChunkOffset != (uint32_t)-1);
- uint32_t Offset = Block.FirstChunkOffset;
- for (uint32_t ChunkSize : Block.ChunkLengths)
- {
- Writer.AddInteger(Offset);
- Offset += ChunkSize;
- }
- }
- Writer.EndArray();
+ CbObjectWriter BlockMetaData;
+ BlockMetaData.AddString("createdBy", GetRunningExecutablePath().stem().string());
- Writer.BeginObject("metadata"sv);
- {
- Writer.AddString("createdBy", "zenserver");
- }
- Writer.EndObject();
-
- IoBuffer MetaPayload = Writer.Save().GetBuffer().AsIoBuffer();
+ IoBuffer MetaPayload = BuildChunkBlockDescription(Block, BlockMetaData.Save()).GetBuffer().AsIoBuffer();
MetaPayload.SetContentType(ZenContentType::kCbObject);
JupiterResult PutMetaResult =
Session.PutBlockMetadata(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash, MetaPayload);
@@ -357,8 +326,7 @@ public:
Result.Reason);
return Result;
}
- CbObject BlocksObject = LoadCompactBinaryObject(FindResult.Response);
- if (!BlocksObject)
+ if (ValidateCompactBinary(FindResult.Response.GetView(), CbValidateMode::Default) != CbValidateError::None)
{
Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
Result.Reason = fmt::format("The block list {}/{}/{}/{} is not formatted as a compact binary object"sv,
@@ -369,25 +337,20 @@ public:
m_OplogBuildPartId);
return Result;
}
-
- CbArrayView Blocks = BlocksObject["blocks"].AsArrayView();
- Result.Blocks.reserve(Blocks.Num());
- for (CbFieldView BlockView : Blocks)
+ std::optional<std::vector<ChunkBlockDescription>> Blocks =
+ ParseChunkBlockDescriptionList(LoadCompactBinaryObject(FindResult.Response));
+ if (!Blocks)
{
- CbObjectView BlockObject = BlockView.AsObjectView();
- IoHash BlockHash = BlockObject["rawHash"sv].AsHash();
- if (BlockHash != IoHash::Zero)
- {
- CbArrayView ChunksArray = BlockObject["rawHashes"sv].AsArrayView();
- std::vector<IoHash> ChunkHashes;
- ChunkHashes.reserve(ChunksArray.Num());
- for (CbFieldView ChunkView : ChunksArray)
- {
- ChunkHashes.push_back(ChunkView.AsHash());
- }
- Result.Blocks.emplace_back(Block{.BlockHash = BlockHash, .ChunkHashes = ChunkHashes});
- }
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.Reason = fmt::format("The block list {}/{}/{}/{} is not formatted as a list of blocks"sv,
+ m_JupiterClient->ServiceUrl(),
+ m_Namespace,
+ m_Bucket,
+ m_BuildId,
+ m_OplogBuildPartId);
+ return Result;
}
+ Result.Blocks = std::move(Blocks.value());
return Result;
}
diff --git a/src/zenserver/projectstore/fileremoteprojectstore.cpp b/src/zenserver/projectstore/fileremoteprojectstore.cpp
index 0fe739a12..5a21a7540 100644
--- a/src/zenserver/projectstore/fileremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/fileremoteprojectstore.cpp
@@ -106,7 +106,7 @@ public:
return Result;
}
- virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, Block&&) override
+ virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&&) override
{
Stopwatch Timer;
SaveAttachmentResult Result;
@@ -192,8 +192,8 @@ public:
return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent),
.ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000}};
}
- std::vector<RemoteProjectStore::Block> KnownBlocks = GetBlocksFromOplog(LoadResult.ContainerObject, ExistingBlockHashes);
- GetKnownBlocksResult Result{{.ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000}};
+ std::vector<ChunkBlockDescription> KnownBlocks = GetBlocksFromOplog(LoadResult.ContainerObject, ExistingBlockHashes);
+ GetKnownBlocksResult Result{{.ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000}};
Result.Blocks = std::move(KnownBlocks);
return Result;
}
diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
index e906127ff..2b6a437d1 100644
--- a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp
@@ -92,7 +92,7 @@ public:
return Result;
}
- virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, Block&&) override
+ virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&&) override
{
JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client());
JupiterResult PutResult = Session.PutCompressedBlob(m_Namespace, RawHash, Payload);
@@ -193,7 +193,7 @@ public:
return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent),
.ElapsedSeconds = LoadResult.ElapsedSeconds + ExistsResult.ElapsedSeconds}};
}
- std::vector<RemoteProjectStore::Block> KnownBlocks = GetBlocksFromOplog(LoadResult.ContainerObject, ExistingBlockHashes);
+ std::vector<ChunkBlockDescription> KnownBlocks = GetBlocksFromOplog(LoadResult.ContainerObject, ExistingBlockHashes);
GetKnownBlocksResult Result{
{.ElapsedSeconds = LoadResult.ElapsedSeconds + ExistsResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000.0}};
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 46a236af9..f6f7eba99 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -5347,7 +5347,7 @@ ProjectStore::ReadOplog(const std::string_view ProjectId,
/* BuildBlocks */ false,
/* IgnoreMissingAttachments */ false,
/* AllowChunking*/ false,
- [](CompressedBuffer&&, RemoteProjectStore::Block&&) {},
+ [](CompressedBuffer&&, ChunkBlockDescription&&) {},
[](const IoHash&, TGetAttachmentBufferFunc&&) {},
[](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) {},
/* EmbedLooseFiles*/ false);
@@ -8621,14 +8621,14 @@ TEST_CASE("project.store.block")
Chunks.reserve(AttachmentSizes.size());
for (const auto& It : AttachmentsWithId)
{
- Chunks.push_back(std::make_pair(It.second.DecodeRawHash(),
- [Buffer = It.second.GetCompressed().Flatten().AsIoBuffer()](const IoHash&) -> CompositeBuffer {
- return CompositeBuffer(SharedBuffer(Buffer));
- }));
+ Chunks.push_back(
+ std::make_pair(It.second.DecodeRawHash(), [Buffer = It.second](const IoHash&) -> std::pair<uint64_t, CompressedBuffer> {
+ return {Buffer.DecodeRawSize(), Buffer};
+ }));
}
- RemoteProjectStore::Block Block;
- CompressedBuffer BlockBuffer = GenerateBlock(std::move(Chunks), Block);
- CHECK(IterateBlock(BlockBuffer.Decompress(), [](CompressedBuffer&&, const IoHash&) {}));
+ ChunkBlockDescription Block;
+ CompressedBuffer BlockBuffer = GenerateChunkBlock(std::move(Chunks), Block);
+ CHECK(IterateChunkBlock(BlockBuffer.Decompress(), [](CompressedBuffer&&, const IoHash&) {}));
}
TEST_CASE("project.store.iterateoplog")
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index 0589fdc5f..5b75a840e 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -12,8 +12,8 @@
#include <zencore/stream.h>
#include <zencore/timer.h>
#include <zencore/workthreadpool.h>
-#include <zenstore/chunkedfile.h>
#include <zenstore/cidstore.h>
+#include <zenutil/chunkedfile.h>
#include <zenutil/workerpools.h>
#include <unordered_map>
@@ -143,7 +143,7 @@ namespace remotestore_impl {
NiceBytes(Stats.m_PeakReceivedBytes));
}
- size_t AddBlock(RwLock& BlocksLock, std::vector<RemoteProjectStore::Block>& Blocks)
+ size_t AddBlock(RwLock& BlocksLock, std::vector<ChunkBlockDescription>& Blocks)
{
size_t BlockIndex;
{
@@ -573,7 +573,7 @@ namespace remotestore_impl {
return;
}
- bool StoreChunksOK = IterateBlock(
+ bool StoreChunksOK = IterateChunkBlock(
BlockPayload,
[&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info](CompressedBuffer&& Chunk,
const IoHash& AttachmentRawHash) {
@@ -738,14 +738,14 @@ namespace remotestore_impl {
});
};
- void CreateBlock(WorkerThreadPool& WorkerPool,
- Latch& OpSectionsLatch,
- std::vector<std::pair<IoHash, FetchChunkFunc>>&& ChunksInBlock,
- RwLock& SectionsLock,
- std::vector<RemoteProjectStore::Block>& Blocks,
- size_t BlockIndex,
- const std::function<void(CompressedBuffer&&, RemoteProjectStore::Block&&)>& AsyncOnBlock,
- AsyncRemoteResult& RemoteResult)
+ void CreateBlock(WorkerThreadPool& WorkerPool,
+ Latch& OpSectionsLatch,
+ std::vector<std::pair<IoHash, FetchChunkFunc>>&& ChunksInBlock,
+ RwLock& SectionsLock,
+ std::vector<ChunkBlockDescription>& Blocks,
+ size_t BlockIndex,
+ const std::function<void(CompressedBuffer&&, ChunkBlockDescription&&)>& AsyncOnBlock,
+ AsyncRemoteResult& RemoteResult)
{
OpSectionsLatch.AddCount(1);
WorkerPool.ScheduleWork([&Blocks,
@@ -764,10 +764,10 @@ namespace remotestore_impl {
try
{
ZEN_ASSERT(ChunkCount > 0);
- Stopwatch Timer;
- RemoteProjectStore::Block Block;
- CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks), Block);
- IoHash BlockHash = CompressedBlock.DecodeRawHash();
+ Stopwatch Timer;
+ ChunkBlockDescription Block;
+ CompressedBuffer CompressedBlock = GenerateChunkBlock(std::move(Chunks), Block);
+ IoHash BlockHash = CompressedBlock.DecodeRawHash();
{
// We can share the lock as we are not resizing the vector and only touch BlockHash at our own index
RwLock::SharedLockScope __(SectionsLock);
@@ -800,8 +800,8 @@ namespace remotestore_impl {
struct CreatedBlock
{
- IoBuffer Payload;
- RemoteProjectStore::Block Block;
+ IoBuffer Payload;
+ ChunkBlockDescription Block;
};
void UploadAttachments(WorkerThreadPool& WorkerPool,
@@ -931,8 +931,8 @@ namespace remotestore_impl {
}
try
{
- IoBuffer Payload;
- RemoteProjectStore::Block Block;
+ IoBuffer Payload;
+ ChunkBlockDescription Block;
if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end())
{
Payload = BlockIt->second.Payload;
@@ -1058,7 +1058,7 @@ namespace remotestore_impl {
{
auto It = BulkBlockAttachmentsToUpload.find(Chunk);
ZEN_ASSERT(It != BulkBlockAttachmentsToUpload.end());
- CompositeBuffer ChunkPayload = It->second(It->first);
+ CompressedBuffer ChunkPayload = It->second(It->first).second;
if (!ChunkPayload)
{
RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound),
@@ -1067,8 +1067,8 @@ namespace remotestore_impl {
ChunkBuffers.clear();
break;
}
- ChunksSize += ChunkPayload.GetSize();
- ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload).Flatten().AsIoBuffer()));
+ ChunksSize += ChunkPayload.GetCompressedSize();
+ ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload).GetCompressed().Flatten().AsIoBuffer()));
}
RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers);
if (Result.ErrorCode)
@@ -1139,54 +1139,13 @@ namespace remotestore_impl {
}
} // namespace remotestore_impl
-bool
-IterateBlock(const SharedBuffer& BlockPayload, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor)
-{
- ZEN_ASSERT(BlockPayload);
- if (BlockPayload.GetSize() < 1)
- {
- return false;
- }
-
- MemoryView BlockView = BlockPayload.GetView();
- const uint8_t* ReadPtr = reinterpret_cast<const uint8_t*>(BlockView.GetData());
- uint32_t NumberSize;
- uint64_t ChunkCount = ReadVarUInt(ReadPtr, NumberSize);
- ReadPtr += NumberSize;
- std::vector<uint64_t> ChunkSizes;
- ChunkSizes.reserve(ChunkCount);
- while (ChunkCount--)
- {
- ChunkSizes.push_back(ReadVarUInt(ReadPtr, NumberSize));
- ReadPtr += NumberSize;
- }
- ptrdiff_t TempBufferLength = std::distance(reinterpret_cast<const uint8_t*>(BlockView.GetData()), ReadPtr);
- ZEN_ASSERT(TempBufferLength > 0);
- for (uint64_t ChunkSize : ChunkSizes)
- {
- IoBuffer Chunk(IoBuffer::Wrap, ReadPtr, ChunkSize);
- IoHash AttachmentRawHash;
- uint64_t AttachmentRawSize;
- CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), AttachmentRawHash, AttachmentRawSize);
-
- if (!CompressedChunk)
- {
- ZEN_ERROR("Invalid chunk in block");
- return false;
- }
- Visitor(std::move(CompressedChunk), AttachmentRawHash);
- ReadPtr += ChunkSize;
- ZEN_ASSERT(ReadPtr <= BlockView.GetDataEnd());
- }
- return true;
-};
std::vector<IoHash>
GetBlockHashesFromOplog(CbObjectView ContainerObject)
{
using namespace std::literals;
- std::vector<RemoteProjectStore::Block> Result;
- CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView();
+ std::vector<ChunkBlockDescription> Result;
+ CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView();
std::vector<IoHash> BlockHashes;
BlockHashes.reserve(BlocksArray.Num());
@@ -1199,11 +1158,11 @@ GetBlockHashesFromOplog(CbObjectView ContainerObject)
return BlockHashes;
}
-std::vector<RemoteProjectStore::Block>
+std::vector<ChunkBlockDescription>
GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> IncludeBlockHashes)
{
using namespace std::literals;
- std::vector<RemoteProjectStore::Block> Result;
+ std::vector<ChunkBlockDescription> Result;
CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView();
tsl::robin_set<IoHash, IoHash::Hasher> IncludeSet;
IncludeSet.insert(IncludeBlockHashes.begin(), IncludeBlockHashes.end());
@@ -1232,47 +1191,6 @@ GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> Include
return Result;
}
-CompressedBuffer
-GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks, RemoteProjectStore::Block& OutBlock)
-{
- const size_t ChunkCount = FetchChunks.size();
-
- std::vector<SharedBuffer> ChunkSegments;
- ChunkSegments.resize(1);
- ChunkSegments.reserve(1 + ChunkCount);
- OutBlock.ChunkHashes.reserve(ChunkCount);
- OutBlock.ChunkLengths.reserve(ChunkCount);
- {
- IoBuffer TempBuffer(ChunkCount * 9);
- MutableMemoryView View = TempBuffer.GetMutableView();
- uint8_t* BufferStartPtr = reinterpret_cast<uint8_t*>(View.GetData());
- uint8_t* BufferEndPtr = BufferStartPtr;
- BufferEndPtr += WriteVarUInt(gsl::narrow<uint64_t>(ChunkCount), BufferEndPtr);
- for (const auto& It : FetchChunks)
- {
- CompositeBuffer Chunk = It.second(It.first);
- uint64_t ChunkSize = 0;
- std::span<const SharedBuffer> Segments = Chunk.GetSegments();
- for (const SharedBuffer& Segment : Segments)
- {
- ChunkSize += Segment.GetSize();
- ChunkSegments.push_back(Segment);
- }
- BufferEndPtr += WriteVarUInt(ChunkSize, BufferEndPtr);
- OutBlock.ChunkHashes.push_back(It.first);
- OutBlock.ChunkLengths.push_back(gsl::narrow<uint32_t>(ChunkSize));
- }
- ZEN_ASSERT(BufferEndPtr <= View.GetDataEnd());
- ptrdiff_t TempBufferLength = std::distance(BufferStartPtr, BufferEndPtr);
- ChunkSegments[0] = SharedBuffer(IoBuffer(TempBuffer, 0, gsl::narrow<size_t>(TempBufferLength)));
- }
- CompressedBuffer CompressedBlock =
- CompressedBuffer::Compress(CompositeBuffer(std::move(ChunkSegments)), OodleCompressor::Mermaid, OodleCompressionLevel::None);
- OutBlock.BlockHash = CompressedBlock.DecodeRawHash();
- OutBlock.FirstChunkOffset = gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + ChunkSegments[0].GetSize());
- return CompressedBlock;
-}
-
CbObject
BuildContainer(CidStore& ChunkStore,
ProjectStore::Project& Project,
@@ -1283,9 +1201,9 @@ BuildContainer(CidStore& ChunkStore,
bool BuildBlocks,
bool IgnoreMissingAttachments,
bool AllowChunking,
- const std::vector<RemoteProjectStore::Block>& KnownBlocks,
+ const std::vector<ChunkBlockDescription>& KnownBlocks,
WorkerThreadPool& WorkerPool,
- const std::function<void(CompressedBuffer&&, RemoteProjectStore::Block&&)>& AsyncOnBlock,
+ const std::function<void(CompressedBuffer&&, ChunkBlockDescription&&)>& AsyncOnBlock,
const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment,
const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks,
bool EmbedLooseFiles,
@@ -1307,9 +1225,9 @@ BuildContainer(CidStore& ChunkStore,
std::unordered_map<IoHash, FoundAttachment, IoHash::Hasher> UploadAttachments;
- RwLock BlocksLock;
- std::vector<RemoteProjectStore::Block> Blocks;
- CompressedBuffer OpsBuffer;
+ RwLock BlocksLock;
+ std::vector<ChunkBlockDescription> Blocks;
+ CompressedBuffer OpsBuffer;
std::filesystem::path AttachmentTempPath = Oplog.TempPath();
AttachmentTempPath.append(".pending");
@@ -1525,7 +1443,7 @@ BuildContainer(CidStore& ChunkStore,
return {};
}
- auto FindReuseBlocks = [](const std::vector<RemoteProjectStore::Block>& KnownBlocks,
+ auto FindReuseBlocks = [](const std::vector<ChunkBlockDescription>& KnownBlocks,
const std::unordered_set<IoHash, IoHash::Hasher>& Attachments,
JobContext* OptionalContext) -> std::vector<size_t> {
std::vector<size_t> ReuseBlockIndexes;
@@ -1538,8 +1456,8 @@ BuildContainer(CidStore& ChunkStore,
for (size_t KnownBlockIndex = 0; KnownBlockIndex < KnownBlocks.size(); KnownBlockIndex++)
{
- const RemoteProjectStore::Block& KnownBlock = KnownBlocks[KnownBlockIndex];
- size_t BlockAttachmentCount = KnownBlock.ChunkHashes.size();
+ const ChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex];
+ size_t BlockAttachmentCount = KnownBlock.ChunkHashes.size();
if (BlockAttachmentCount == 0)
{
continue;
@@ -1586,7 +1504,7 @@ BuildContainer(CidStore& ChunkStore,
std::vector<size_t> ReusedBlockIndexes = FindReuseBlocks(KnownBlocks, FoundHashes, OptionalContext);
for (size_t KnownBlockIndex : ReusedBlockIndexes)
{
- const RemoteProjectStore::Block& KnownBlock = KnownBlocks[KnownBlockIndex];
+ const ChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex];
for (const IoHash& KnownHash : KnownBlock.ChunkHashes)
{
if (UploadAttachments.erase(KnownHash) == 1)
@@ -1632,12 +1550,12 @@ BuildContainer(CidStore& ChunkStore,
return Chunked;
};
- RwLock ResolveLock;
- std::unordered_set<IoHash, IoHash::Hasher> ChunkedHashes;
- std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes;
- std::unordered_map<IoHash, size_t, IoHash::Hasher> ChunkedUploadAttachments;
- std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> LooseUploadAttachments;
- std::unordered_set<IoHash, IoHash::Hasher> MissingHashes;
+ RwLock ResolveLock;
+ std::unordered_set<IoHash, IoHash::Hasher> ChunkedHashes;
+ std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes;
+ std::unordered_map<IoHash, size_t, IoHash::Hasher> ChunkedUploadAttachments;
+ std::unordered_map<IoHash, std::pair<uint64_t, IoBuffer>, IoHash::Hasher> LooseUploadAttachments;
+ std::unordered_set<IoHash, IoHash::Hasher> MissingHashes;
remotestore_impl::ReportMessage(OptionalContext,
fmt::format("Resolving {} attachments from {} ops", UploadAttachments.size(), TotalOpCount));
@@ -1730,7 +1648,7 @@ BuildContainer(CidStore& ChunkStore,
}
else
{
- size_t RawSize = RawData.GetSize();
+ uint64_t RawSize = RawData.GetSize();
CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(RawData),
OodleCompressor::Mermaid,
OodleCompressionLevel::VeryFast);
@@ -1753,8 +1671,8 @@ BuildContainer(CidStore& ChunkStore,
{
UploadAttachment->Size = Compressed.GetCompressedSize();
ResolveLock.WithExclusiveLock(
- [RawHash, &LooseUploadAttachments, Data = std::move(TempAttachmentBuffer)]() {
- LooseUploadAttachments.insert_or_assign(RawHash, std::move(Data));
+ [RawHash, RawSize, &LooseUploadAttachments, Data = std::move(TempAttachmentBuffer)]() {
+ LooseUploadAttachments.insert_or_assign(RawHash, std::make_pair(RawSize, std::move(Data)));
});
}
}
@@ -1927,7 +1845,7 @@ BuildContainer(CidStore& ChunkStore,
std::vector<size_t> ReusedBlockFromChunking = FindReuseBlocks(KnownBlocks, ChunkedHashes, OptionalContext);
for (size_t KnownBlockIndex : ReusedBlockIndexes)
{
- const RemoteProjectStore::Block& KnownBlock = KnownBlocks[KnownBlockIndex];
+ const ChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex];
for (const IoHash& KnownHash : KnownBlock.ChunkHashes)
{
if (ChunkedHashes.erase(KnownHash) == 1)
@@ -2109,16 +2027,25 @@ BuildContainer(CidStore& ChunkStore,
{
if (auto It = LooseUploadAttachments.find(RawHash); It != LooseUploadAttachments.end())
{
- ChunksInBlock.emplace_back(std::make_pair(RawHash, [IoBuffer = SharedBuffer(It->second)](const IoHash&) {
- return CompositeBuffer(IoBuffer);
- }));
+ ChunksInBlock.emplace_back(std::make_pair(
+ RawHash,
+ [RawSize = It->second.first,
+ IoBuffer = SharedBuffer(It->second.second)](const IoHash&) -> std::pair<uint64_t, CompressedBuffer> {
+ return std::make_pair(RawSize, CompressedBuffer::FromCompressedNoValidate(IoBuffer.AsIoBuffer()));
+ }));
LooseUploadAttachments.erase(It);
}
else
{
- ChunksInBlock.emplace_back(std::make_pair(RawHash, [&ChunkStore](const IoHash& RawHash) {
- return CompositeBuffer(SharedBuffer(ChunkStore.FindChunkByCid(RawHash)));
- }));
+ ChunksInBlock.emplace_back(
+ std::make_pair(RawHash, [&ChunkStore](const IoHash& RawHash) -> std::pair<uint64_t, CompressedBuffer> {
+ IoBuffer Chunk = ChunkStore.FindChunkByCid(RawHash);
+ IoHash _;
+ uint64_t RawSize = 0;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), _, RawSize);
+ ZEN_ASSERT(Compressed);
+ return {RawSize, Compressed};
+ }));
}
BlockSize += PayloadSize;
@@ -2169,14 +2096,15 @@ BuildContainer(CidStore& ChunkStore,
if (BlockAttachmentHashes.insert(ChunkHash).second)
{
const ChunkSource& Source = Chunked.ChunkSources[ChunkIndex];
- ChunksInBlock.emplace_back(std::make_pair(
- ChunkHash,
- [Source = ChunkedFile.Source, Offset = Source.Offset, Size = Source.Size](const IoHash&) {
- return CompressedBuffer::Compress(SharedBuffer(IoBuffer(Source, Offset, Size)),
- OodleCompressor::Mermaid,
- OodleCompressionLevel::None)
- .GetCompressed();
- }));
+ ChunksInBlock.emplace_back(
+ std::make_pair(ChunkHash,
+ [Source = ChunkedFile.Source, Offset = Source.Offset, Size = Source.Size](
+ const IoHash&) -> std::pair<uint64_t, CompressedBuffer> {
+ return {Size,
+ CompressedBuffer::Compress(SharedBuffer(IoBuffer(Source, Offset, Size)),
+ OodleCompressor::Mermaid,
+ OodleCompressionLevel::None)};
+ }));
BlockSize += CompressedBuffer::GetHeaderSizeForNoneEncoder() + Source.Size;
if (BuildBlocks)
{
@@ -2298,7 +2226,7 @@ BuildContainer(CidStore& ChunkStore,
OplogContinerWriter.AddBinary("ops"sv, CompressedOpsSection.GetCompressed().Flatten().AsIoBuffer());
OplogContinerWriter.BeginArray("blocks"sv);
{
- for (const RemoteProjectStore::Block& B : Blocks)
+ for (const ChunkBlockDescription& B : Blocks)
{
ZEN_ASSERT(!B.ChunkHashes.empty());
if (BuildBlocks)
@@ -2392,7 +2320,7 @@ BuildContainer(CidStore& ChunkStore,
bool BuildBlocks,
bool IgnoreMissingAttachments,
bool AllowChunking,
- const std::function<void(CompressedBuffer&&, RemoteProjectStore::Block&&)>& AsyncOnBlock,
+ const std::function<void(CompressedBuffer&&, ChunkBlockDescription&&)>& AsyncOnBlock,
const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment,
const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks,
bool EmbedLooseFiles)
@@ -2458,8 +2386,8 @@ SaveOplog(CidStore& ChunkStore,
std::unordered_map<IoHash, remotestore_impl::CreatedBlock, IoHash::Hasher> CreatedBlocks;
tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher> LooseLargeFiles;
- auto MakeTempBlock = [AttachmentTempPath, &RemoteResult, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock,
- RemoteProjectStore::Block&& Block) {
+ auto MakeTempBlock = [AttachmentTempPath, &RemoteResult, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock,
+ ChunkBlockDescription&& Block) {
std::filesystem::path BlockPath = AttachmentTempPath;
BlockPath.append(Block.BlockHash.ToHexString());
try
@@ -2478,8 +2406,8 @@ SaveOplog(CidStore& ChunkStore,
}
};
- auto UploadBlock = [&RemoteStore, &RemoteResult, &Info, OptionalContext](CompressedBuffer&& CompressedBlock,
- RemoteProjectStore::Block&& Block) {
+ auto UploadBlock = [&RemoteStore, &RemoteResult, &Info, OptionalContext](CompressedBuffer&& CompressedBlock,
+ ChunkBlockDescription&& Block) {
IoHash BlockHash = Block.BlockHash;
RemoteProjectStore::SaveAttachmentResult Result =
RemoteStore.SaveAttachment(CompressedBlock.GetCompressed(), BlockHash, std::move(Block));
@@ -2512,7 +2440,7 @@ SaveOplog(CidStore& ChunkStore,
ZEN_DEBUG("Found attachment {}", AttachmentHash);
};
- std::function<void(CompressedBuffer&&, RemoteProjectStore::Block &&)> OnBlock;
+ std::function<void(CompressedBuffer&&, ChunkBlockDescription &&)> OnBlock;
if (RemoteStoreInfo.UseTempBlockFiles)
{
OnBlock = MakeTempBlock;
@@ -2522,7 +2450,7 @@ SaveOplog(CidStore& ChunkStore,
OnBlock = UploadBlock;
}
- std::vector<RemoteProjectStore::Block> KnownBlocks;
+ std::vector<ChunkBlockDescription> KnownBlocks;
uint64_t TransferWallTimeMS = 0;
diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h
index e05cb9923..1ef0416b7 100644
--- a/src/zenserver/projectstore/remoteprojectstore.h
+++ b/src/zenserver/projectstore/remoteprojectstore.h
@@ -5,6 +5,8 @@
#include <zencore/jobqueue.h>
#include "projectstore.h"
+#include <zenutil/chunkblock.h>
+
#include <unordered_set>
namespace zen {
@@ -16,14 +18,6 @@ struct ChunkedInfo;
class RemoteProjectStore
{
public:
- struct Block
- {
- IoHash BlockHash;
- std::vector<IoHash> ChunkHashes;
- std::vector<uint32_t> ChunkLengths;
- uint32_t FirstChunkOffset = (uint32_t)-1;
- };
-
struct Result
{
int32_t ErrorCode{};
@@ -72,7 +66,7 @@ public:
struct GetKnownBlocksResult : public Result
{
- std::vector<Block> Blocks;
+ std::vector<ChunkBlockDescription> Blocks;
};
struct RemoteStoreInfo
@@ -101,11 +95,11 @@ public:
virtual RemoteStoreInfo GetInfo() const = 0;
virtual Stats GetStats() const = 0;
- virtual CreateContainerResult CreateContainer() = 0;
- virtual SaveResult SaveContainer(const IoBuffer& Payload) = 0;
- virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, Block&& Block) = 0;
- virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) = 0;
- virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Payloads) = 0;
+ virtual CreateContainerResult CreateContainer() = 0;
+ virtual SaveResult SaveContainer(const IoBuffer& Payload) = 0;
+ virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&& Block) = 0;
+ virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) = 0;
+ virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Payloads) = 0;
virtual LoadContainerResult LoadContainer() = 0;
virtual GetKnownBlocksResult GetKnownBlocks() = 0;
@@ -125,7 +119,6 @@ struct RemoteStoreOptions
};
typedef std::function<IoBuffer(const IoHash& AttachmentHash)> TGetAttachmentBufferFunc;
-typedef std::function<CompositeBuffer(const IoHash& RawHash)> FetchChunkFunc;
RemoteProjectStore::LoadContainerResult BuildContainer(
CidStore& ChunkStore,
@@ -137,7 +130,7 @@ RemoteProjectStore::LoadContainerResult BuildContainer(
bool BuildBlocks,
bool IgnoreMissingAttachments,
bool AllowChunking,
- const std::function<void(CompressedBuffer&&, RemoteProjectStore::Block&&)>& AsyncOnBlock,
+ const std::function<void(CompressedBuffer&&, ChunkBlockDescription&&)>& AsyncOnBlock,
const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment,
const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks,
bool EmbedLooseFiles);
@@ -173,9 +166,7 @@ RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore,
bool CleanOplog,
JobContext* OptionalContext);
-CompressedBuffer GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks, RemoteProjectStore::Block& OutBlock);
-bool IterateBlock(const SharedBuffer& BlockPayload, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor);
-std::vector<IoHash> GetBlockHashesFromOplog(CbObjectView ContainerObject);
-std::vector<RemoteProjectStore::Block> GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> IncludeBlockHashes);
+std::vector<IoHash> GetBlockHashesFromOplog(CbObjectView ContainerObject);
+std::vector<ChunkBlockDescription> GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> IncludeBlockHashes);
} // namespace zen
diff --git a/src/zenserver/projectstore/zenremoteprojectstore.cpp b/src/zenserver/projectstore/zenremoteprojectstore.cpp
index 42519b108..2ebf58a5d 100644
--- a/src/zenserver/projectstore/zenremoteprojectstore.cpp
+++ b/src/zenserver/projectstore/zenremoteprojectstore.cpp
@@ -93,7 +93,7 @@ public:
return Result;
}
- virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, Block&&) override
+ virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&&) override
{
std::string SaveRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash);
HttpClient::Response Response = m_Client.Post(SaveRequest, Payload, ZenContentType::kCompressedBinary);
diff --git a/src/zenutil/chunkblock.cpp b/src/zenutil/chunkblock.cpp
new file mode 100644
index 000000000..6dae5af11
--- /dev/null
+++ b/src/zenutil/chunkblock.cpp
@@ -0,0 +1,166 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenutil/chunkblock.h>
+
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/logging.h>
+
+#include <vector>
+
+namespace zen {
+
+using namespace std::literals;
+
+ChunkBlockDescription
+ParseChunkBlockDescription(const CbObjectView& BlockObject)
+{
+ ChunkBlockDescription Result;
+ Result.BlockHash = BlockObject["rawHash"sv].AsHash();
+ if (Result.BlockHash != IoHash::Zero)
+ {
+ CbArrayView ChunksArray = BlockObject["rawHashes"sv].AsArrayView();
+ Result.ChunkHashes.reserve(ChunksArray.Num());
+ for (CbFieldView ChunkView : ChunksArray)
+ {
+ Result.ChunkHashes.push_back(ChunkView.AsHash());
+ }
+
+ CbArrayView ChunkRawLengthsArray = BlockObject["chunkRawLengths"sv].AsArrayView();
+ std::vector<uint32_t> ChunkLengths;
+ Result.ChunkRawLengths.reserve(ChunkRawLengthsArray.Num());
+ for (CbFieldView ChunkView : ChunkRawLengthsArray)
+ {
+ Result.ChunkRawLengths.push_back(ChunkView.AsUInt32());
+ }
+ }
+ return Result;
+}
+
+std::vector<ChunkBlockDescription>
+ParseChunkBlockDescriptionList(const CbObjectView& BlocksObject)
+{
+ if (!BlocksObject)
+ {
+ return {};
+ }
+ std::vector<ChunkBlockDescription> Result;
+ CbArrayView Blocks = BlocksObject["blocks"].AsArrayView();
+ Result.reserve(Blocks.Num());
+ for (CbFieldView BlockView : Blocks)
+ {
+ CbObjectView BlockObject = BlockView.AsObjectView();
+ Result.emplace_back(ParseChunkBlockDescription(BlockObject));
+ }
+ return Result;
+}
+
+CbObject
+BuildChunkBlockDescription(const ChunkBlockDescription& Block, CbObjectView MetaData)
+{
+ ZEN_ASSERT(Block.ChunkRawLengths.size() == Block.ChunkHashes.size());
+
+ CbObjectWriter Writer;
+ Writer.AddHash("rawHash"sv, Block.BlockHash);
+ Writer.BeginArray("rawHashes"sv);
+ {
+ for (const IoHash& ChunkHash : Block.ChunkHashes)
+ {
+ Writer.AddHash(ChunkHash);
+ }
+ }
+ Writer.EndArray();
+ Writer.BeginArray("chunkRawLengths");
+ {
+ for (uint32_t ChunkSize : Block.ChunkRawLengths)
+ {
+ Writer.AddInteger(ChunkSize);
+ }
+ }
+ Writer.EndArray();
+
+ Writer.AddObject("metadata", MetaData);
+
+ return Writer.Save();
+}
+
+CompressedBuffer
+GenerateChunkBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks, ChunkBlockDescription& OutBlock)
+{
+ const size_t ChunkCount = FetchChunks.size();
+
+ std::vector<SharedBuffer> ChunkSegments;
+ ChunkSegments.resize(1);
+ ChunkSegments.reserve(1 + ChunkCount);
+ OutBlock.ChunkHashes.reserve(ChunkCount);
+ OutBlock.ChunkRawLengths.reserve(ChunkCount);
+ {
+ IoBuffer TempBuffer(ChunkCount * 9);
+ MutableMemoryView View = TempBuffer.GetMutableView();
+ uint8_t* BufferStartPtr = reinterpret_cast<uint8_t*>(View.GetData());
+ uint8_t* BufferEndPtr = BufferStartPtr;
+ BufferEndPtr += WriteVarUInt(gsl::narrow<uint64_t>(ChunkCount), BufferEndPtr);
+ for (const auto& It : FetchChunks)
+ {
+ std::pair<uint64_t, CompressedBuffer> Chunk = It.second(It.first);
+ uint64_t ChunkSize = 0;
+ std::span<const SharedBuffer> Segments = Chunk.second.GetCompressed().GetSegments();
+ for (const SharedBuffer& Segment : Segments)
+ {
+ ChunkSize += Segment.GetSize();
+ ChunkSegments.push_back(Segment);
+ }
+ BufferEndPtr += WriteVarUInt(ChunkSize, BufferEndPtr);
+ OutBlock.ChunkHashes.push_back(It.first);
+ OutBlock.ChunkRawLengths.push_back(gsl::narrow<uint32_t>(Chunk.first));
+ }
+ ZEN_ASSERT(BufferEndPtr <= View.GetDataEnd());
+ ptrdiff_t TempBufferLength = std::distance(BufferStartPtr, BufferEndPtr);
+ ChunkSegments[0] = SharedBuffer(IoBuffer(TempBuffer, 0, gsl::narrow<size_t>(TempBufferLength)));
+ }
+ CompressedBuffer CompressedBlock =
+ CompressedBuffer::Compress(CompositeBuffer(std::move(ChunkSegments)), OodleCompressor::Mermaid, OodleCompressionLevel::None);
+ OutBlock.BlockHash = CompressedBlock.DecodeRawHash();
+ return CompressedBlock;
+}
+
+bool
+IterateChunkBlock(const SharedBuffer& BlockPayload, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor)
+{
+ ZEN_ASSERT(BlockPayload);
+ if (BlockPayload.GetSize() < 1)
+ {
+ return false;
+ }
+
+ MemoryView BlockView = BlockPayload.GetView();
+ const uint8_t* ReadPtr = reinterpret_cast<const uint8_t*>(BlockView.GetData());
+ uint32_t NumberSize;
+ uint64_t ChunkCount = ReadVarUInt(ReadPtr, NumberSize);
+ ReadPtr += NumberSize;
+ std::vector<uint64_t> ChunkSizes;
+ ChunkSizes.reserve(ChunkCount);
+ while (ChunkCount--)
+ {
+ ChunkSizes.push_back(ReadVarUInt(ReadPtr, NumberSize));
+ ReadPtr += NumberSize;
+ }
+ for (uint64_t ChunkSize : ChunkSizes)
+ {
+ IoBuffer Chunk(IoBuffer::Wrap, ReadPtr, ChunkSize);
+ IoHash AttachmentRawHash;
+ uint64_t AttachmentRawSize;
+ CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), AttachmentRawHash, AttachmentRawSize);
+
+ if (!CompressedChunk)
+ {
+ ZEN_ERROR("Invalid chunk in block");
+ return false;
+ }
+ Visitor(std::move(CompressedChunk), AttachmentRawHash);
+ ReadPtr += ChunkSize;
+ ZEN_ASSERT(ReadPtr <= BlockView.GetDataEnd());
+ }
+ return true;
+};
+
+} // namespace zen
diff --git a/src/zenstore/chunkedfile.cpp b/src/zenutil/chunkedfile.cpp
index f200bc1ec..c08492eb0 100644
--- a/src/zenstore/chunkedfile.cpp
+++ b/src/zenutil/chunkedfile.cpp
@@ -1,7 +1,8 @@
// Copyright Epic Games, Inc. All Rights Reserved.
+#include <zenutil/chunkedfile.h>
+
#include <zencore/basicfile.h>
-#include <zenstore/chunkedfile.h>
#include "chunking.h"
@@ -111,7 +112,7 @@ Reconstruct(const ChunkedInfo& Info, const std::filesystem::path& TargetPath, st
}
ChunkedInfoWithSource
-ChunkData(BasicFile& RawData, uint64_t Offset, uint64_t Size, ChunkedParams Params)
+ChunkData(BasicFile& RawData, uint64_t Offset, uint64_t Size, ChunkedParams Params, std::atomic<uint64_t>* BytesProcessed)
{
ChunkedInfoWithSource Result;
tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> FoundChunks;
@@ -163,6 +164,10 @@ ChunkData(BasicFile& RawData, uint64_t Offset, uint64_t Size, ChunkedParams Para
SliceSize = SliceView.GetSize();
Offset += ChunkLength;
+ if (BytesProcessed != nullptr)
+ {
+ BytesProcessed->fetch_add(ChunkLength);
+ }
}
Result.Info.RawSize = Size;
Result.Info.RawHash = RawHashStream.GetHash();
diff --git a/src/zenstore/chunking.cpp b/src/zenutil/chunking.cpp
index 30edd322a..30edd322a 100644
--- a/src/zenstore/chunking.cpp
+++ b/src/zenutil/chunking.cpp
diff --git a/src/zenstore/chunking.h b/src/zenutil/chunking.h
index 09c56454f..09c56454f 100644
--- a/src/zenstore/chunking.h
+++ b/src/zenutil/chunking.h
diff --git a/src/zenutil/include/zenutil/chunkblock.h b/src/zenutil/include/zenutil/chunkblock.h
new file mode 100644
index 000000000..9b7414629
--- /dev/null
+++ b/src/zenutil/include/zenutil/chunkblock.h
@@ -0,0 +1,32 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/iohash.h>
+
+#include <zencore/compactbinary.h>
+#include <zencore/compress.h>
+
+#include <optional>
+#include <vector>
+
+namespace zen {
+
+struct ChunkBlockDescription
+{
+ IoHash BlockHash;
+ std::vector<IoHash> ChunkHashes;
+ std::vector<uint32_t> ChunkRawLengths;
+};
+
+std::vector<ChunkBlockDescription> ParseChunkBlockDescriptionList(const CbObjectView& BlocksObject);
+ChunkBlockDescription ParseChunkBlockDescription(const CbObjectView& BlockObject);
+CbObject BuildChunkBlockDescription(const ChunkBlockDescription& Block, CbObjectView MetaData);
+
+typedef std::function<std::pair<uint64_t, CompressedBuffer>(const IoHash& RawHash)> FetchChunkFunc;
+
+CompressedBuffer GenerateChunkBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks, ChunkBlockDescription& OutBlock);
+bool IterateChunkBlock(const SharedBuffer& BlockPayload,
+ std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor);
+
+} // namespace zen
diff --git a/src/zenstore/include/zenstore/chunkedfile.h b/src/zenutil/include/zenutil/chunkedfile.h
index c6330bdbd..7110ad317 100644
--- a/src/zenstore/include/zenstore/chunkedfile.h
+++ b/src/zenutil/include/zenutil/chunkedfile.h
@@ -43,7 +43,11 @@ struct ChunkedParams
static const ChunkedParams UShaderByteCodeParams = {.UseThreshold = true, .MinSize = 17280, .MaxSize = 139264, .AvgSize = 36340};
-ChunkedInfoWithSource ChunkData(BasicFile& RawData, uint64_t Offset, uint64_t Size, ChunkedParams Params = {});
+ChunkedInfoWithSource ChunkData(BasicFile& RawData,
+ uint64_t Offset,
+ uint64_t Size,
+ ChunkedParams Params = {},
+ std::atomic<uint64_t>* BytesProcessed = nullptr);
void Reconstruct(const ChunkedInfo& Info,
const std::filesystem::path& TargetPath,
std::function<IoBuffer(const IoHash& ChunkHash)> GetChunk);
diff --git a/src/zenutil/zenutil.cpp b/src/zenutil/zenutil.cpp
index c54144549..19eb63ce9 100644
--- a/src/zenutil/zenutil.cpp
+++ b/src/zenutil/zenutil.cpp
@@ -6,6 +6,7 @@
# include <zenutil/cache/cacherequests.h>
# include <zenutil/cache/rpcrecording.h>
+# include <zenutil/chunkedfile.h>
namespace zen {
@@ -15,6 +16,7 @@ zenutil_forcelinktests()
cachepolicy_forcelink();
cache::rpcrecord_forcelink();
cacherequests_forcelink();
+ chunkedfile_forcelink();
}
} // namespace zen