aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-02-04 11:39:28 +0100
committerDan Engelbrecht <[email protected]>2026-02-10 10:12:22 +0100
commit401bb08cc57593b8994962940b10e2e552c69a78 (patch)
treef4d84278e6c34fece90cf9f7393159b3d9930541 /src
parentadd imgui dependency (diff)
downloadzen-401bb08cc57593b8994962940b10e2e552c69a78.tar.xz
zen-401bb08cc57593b8994962940b10e2e552c69a78.zip
callback interface for tracking progress of builds download
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/builds_cmd.cpp9
-rw-r--r--src/zenremotestore/builds/buildstorageoperations.cpp427
-rw-r--r--src/zenremotestore/builds/updatefolderstream.cpp631
-rw-r--r--src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h39
-rw-r--r--src/zenremotestore/include/zenremotestore/builds/updatefolderstream.h226
5 files changed, 1190 insertions, 142 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index f4edb65ab..2e621608e 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -6,6 +6,7 @@
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinaryfile.h>
#include <zencore/compactbinaryfmt.h>
+#include <zencore/compactbinaryutil.h>
#include <zencore/compactbinaryvalue.h>
#include <zencore/compress.h>
#include <zencore/except.h>
@@ -31,6 +32,7 @@
#include <zenremotestore/builds/buildstorageutil.h>
#include <zenremotestore/builds/filebuildstorage.h>
#include <zenremotestore/builds/jupiterbuildstorage.h>
+#include <zenremotestore/builds/updatefolderstream.h>
#include <zenremotestore/chunking/chunkblock.h>
#include <zenremotestore/chunking/chunkedcontent.h>
#include <zenremotestore/chunking/chunkedfile.h>
@@ -1507,9 +1509,12 @@ namespace {
.PopulateCache = Options.PopulateCache});
{
ProgressBar::PushLogOperation(ProgressMode, "Download");
- auto _ = MakeGuard([]() { ProgressBar::PopLogOperation(ProgressMode); });
+ auto _ = MakeGuard([]() { ProgressBar::PopLogOperation(ProgressMode); });
+
+ std::unique_ptr<UpdateFolderStream> Feedback = CreateDiskUpdateFolderStream("D:\\Temp\\download_log.zrb");
+
FolderContent UpdatedLocalFolderState;
- Updater.Execute(UpdatedLocalFolderState);
+ Updater.Execute(UpdatedLocalFolderState, Feedback.get());
LocalState.State.ChunkedContent = RemoteContent;
LocalState.FolderState = std::move(UpdatedLocalFolderState);
diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp
index 2319ad66d..52062f6ff 100644
--- a/src/zenremotestore/builds/buildstorageoperations.cpp
+++ b/src/zenremotestore/builds/buildstorageoperations.cpp
@@ -8,6 +8,7 @@
#include <zenremotestore/builds/buildstorage.h>
#include <zenremotestore/builds/buildstoragecache.h>
#include <zenremotestore/builds/buildstorageutil.h>
+#include <zenremotestore/builds/updatefolderstream.h>
#include <zenremotestore/chunking/chunkblock.h>
#include <zenremotestore/chunking/chunkingcache.h>
#include <zenremotestore/chunking/chunkingcontroller.h>
@@ -552,8 +553,10 @@ BuildsOperationUpdateFolder::BuildsOperationUpdateFolder(OperationLogOutput&
}
void
-BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
+BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateFolderStream* OptionalUpdateFolderStream)
{
+ ZEN_UNUSED(OptionalUpdateFolderStream);
+
ZEN_TRACE_CPU("BuildsOperationUpdateFolder::Execute");
try
{
@@ -1244,6 +1247,55 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
}
}
+ if (OptionalUpdateFolderStream)
+ {
+ std::vector<UpdateFolderStream::Chunk> DownloadChunks;
+ DownloadChunks.reserve(LooseChunkHashWorks.size());
+ for (const LooseChunkHashWorkData& LooseChunkHashWork : LooseChunkHashWorks)
+ {
+ DownloadChunks.push_back(UpdateFolderStream::Chunk{.RemoteChunkIndex = LooseChunkHashWork.RemoteChunkIndex});
+ }
+ std::vector<UpdateFolderStream::BlockRange> DownloadBlockRanges;
+ DownloadBlockRanges.reserve(BlockRangeWorks.size());
+ for (const BlockRangeDescriptor& BlockRange : BlockRangeWorks)
+ {
+ DownloadBlockRanges.push_back(
+ UpdateFolderStream::BlockRange{.BlockIndex = BlockRange.BlockIndex,
+ .RangeStart = gsl::narrow<uint32_t>(BlockRange.RangeStart),
+ .RangeLength = gsl::narrow<uint32_t>(BlockRange.RangeLength)});
+ }
+
+ std::vector<UpdateFolderStream::Block> DownloadBlocks;
+ for (uint32_t BlockIndex : FullBlockWorks)
+ {
+ DownloadBlocks.push_back(UpdateFolderStream::Block{.BlockIndex = BlockIndex});
+ }
+ std::vector<UpdateFolderStream::Sequence> SequencesToWrite;
+ for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < SequenceIndexChunksLeftToWriteCounters.size();
+ RemoteSequenceIndex++)
+ {
+ if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() > 0)
+ {
+ SequencesToWrite.push_back(UpdateFolderStream::Sequence{.RemoteSequenceIndex = RemoteSequenceIndex});
+ }
+ }
+ for (const ScavengedSequenceCopyOperation& ScaveneCopyOp : ScavengedSequenceCopyOperations)
+ {
+ SequencesToWrite.push_back(UpdateFolderStream::Sequence{ScaveneCopyOp.RemoteSequenceIndex});
+ }
+ std::sort(SequencesToWrite.begin(),
+ SequencesToWrite.end(),
+ [](const UpdateFolderStream::Sequence& Lhs, const UpdateFolderStream::Sequence& Rhs) {
+ return Lhs.RemoteSequenceIndex < Rhs.RemoteSequenceIndex;
+ });
+ OptionalUpdateFolderStream->Initialize(m_RemoteContent,
+ m_BlockDescriptions,
+ DownloadChunks,
+ DownloadBlockRanges,
+ DownloadBlocks,
+ SequencesToWrite);
+ }
+
BufferedWriteFileCache WriteCache;
for (uint32_t ScavengeOpIndex = 0; ScavengeOpIndex < ScavengedSequenceCopyOperations.size(); ScavengeOpIndex++)
@@ -1263,7 +1315,8 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
&FilteredWrittenBytesPerSecond,
ScavengeOpIndex,
&WritePartsComplete,
- TotalPartWriteCount](std::atomic<bool>&) mutable {
+ TotalPartWriteCount,
+ OptionalUpdateFolderStream](std::atomic<bool>&) mutable {
if (!m_AbortFlag)
{
ZEN_TRACE_CPU("Async_WriteScavenged");
@@ -1276,6 +1329,12 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
WriteScavengedSequenceToCache(ScavengeRootPath, ScavengedContent, ScavengeOp);
+ if (OptionalUpdateFolderStream)
+ {
+ OptionalUpdateFolderStream->WroteSequence(ScavengeOp.RemoteSequenceIndex, 0, ScavengeOp.RawSize);
+ OptionalUpdateFolderStream->CompletedSequence(ScavengeOp.RemoteSequenceIndex);
+ }
+
WritePartsComplete++;
if (WritePartsComplete == TotalPartWriteCount)
{
@@ -1316,7 +1375,8 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
TotalPartWriteCount,
&WriteCache,
&FilteredDownloadedBytesPerSecond,
- &FilteredWrittenBytesPerSecond](std::atomic<bool>&) mutable {
+ &FilteredWrittenBytesPerSecond,
+ OptionalUpdateFolderStream](std::atomic<bool>&) mutable {
ZEN_TRACE_CPU("Async_ReadPreDownloadedChunk");
if (!m_AbortFlag)
{
@@ -1332,7 +1392,8 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
TotalRequestCount,
TotalPartWriteCount,
FilteredDownloadedBytesPerSecond,
- FilteredWrittenBytesPerSecond);
+ FilteredWrittenBytesPerSecond,
+ OptionalUpdateFolderStream);
}
},
WorkerThreadPool::EMode::EnableBacklog);
@@ -1352,55 +1413,59 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
break;
}
- Work.ScheduleWork(m_IOWorkerPool,
- [this,
- &CloneQuery,
- &SequenceIndexChunksLeftToWriteCounters,
- &WriteCache,
- &Work,
- &FilteredWrittenBytesPerSecond,
- &CopyChunkDatas,
- &ScavengedContents,
- &ScavengedLookups,
- &ScavengedPaths,
- &WritePartsComplete,
- TotalPartWriteCount,
- CopyDataIndex](std::atomic<bool>&) {
- if (!m_AbortFlag)
- {
- ZEN_TRACE_CPU("Async_CopyLocal");
-
- FilteredWrittenBytesPerSecond.Start();
- const CopyChunkData& CopyData = CopyChunkDatas[CopyDataIndex];
-
- std::vector<uint32_t> WrittenSequenceIndexes = WriteLocalChunkToCache(CloneQuery.get(),
- CopyData,
- ScavengedContents,
- ScavengedLookups,
- ScavengedPaths,
- WriteCache);
- WritePartsComplete++;
- if (!m_AbortFlag)
- {
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
- }
+ Work.ScheduleWork(
+ m_IOWorkerPool,
+ [this,
+ &CloneQuery,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &WriteCache,
+ &Work,
+ &FilteredWrittenBytesPerSecond,
+ &CopyChunkDatas,
+ &ScavengedContents,
+ &ScavengedLookups,
+ &ScavengedPaths,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ CopyDataIndex,
+ OptionalUpdateFolderStream](std::atomic<bool>&) {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_CopyLocal");
- // Write tracking, updating this must be done without any files open
- std::vector<uint32_t> CompletedChunkSequences;
- for (uint32_t RemoteSequenceIndex : WrittenSequenceIndexes)
- {
- if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters))
- {
- CompletedChunkSequences.push_back(RemoteSequenceIndex);
- }
- }
- WriteCache.Close(CompletedChunkSequences);
- VerifyAndCompleteChunkSequencesAsync(CompletedChunkSequences, Work);
- }
- }
- });
+ FilteredWrittenBytesPerSecond.Start();
+ const CopyChunkData& CopyData = CopyChunkDatas[CopyDataIndex];
+
+ std::vector<uint32_t> WrittenSequenceIndexes = WriteLocalChunkToCache(CloneQuery.get(),
+ CopyData,
+ ScavengedContents,
+ ScavengedLookups,
+ ScavengedPaths,
+ WriteCache,
+ OptionalUpdateFolderStream);
+
+ WritePartsComplete++;
+ if (!m_AbortFlag)
+ {
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
+
+ // Write tracking, updating this must be done without any files open
+ std::vector<uint32_t> CompletedChunkSequences;
+ for (uint32_t RemoteSequenceIndex : WrittenSequenceIndexes)
+ {
+ if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters))
+ {
+ CompletedChunkSequences.push_back(RemoteSequenceIndex);
+ }
+ }
+ WriteCache.Close(CompletedChunkSequences);
+ VerifyAndCompleteChunkSequencesAsync(CompletedChunkSequences, Work, OptionalUpdateFolderStream);
+ }
+ }
+ });
}
for (uint32_t BlockIndex : CachedChunkBlockIndexes)
@@ -1421,7 +1486,8 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
&FilteredWrittenBytesPerSecond,
&WritePartsComplete,
TotalPartWriteCount,
- BlockIndex](std::atomic<bool>&) mutable {
+ BlockIndex,
+ OptionalUpdateFolderStream](std::atomic<bool>&) mutable {
if (!m_AbortFlag)
{
ZEN_TRACE_CPU("Async_WriteCachedBlock");
@@ -1444,12 +1510,17 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
Work,
CompositeBuffer(std::move(BlockBuffer)),
RemoteChunkIndexNeedsCopyFromSourceFlags,
- WriteCache))
+ WriteCache,
+ OptionalUpdateFolderStream))
{
std::error_code DummyEc;
RemoveFile(BlockChunkPath, DummyEc);
throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash));
}
+ if (OptionalUpdateFolderStream)
+ {
+ OptionalUpdateFolderStream->CompletedBlock(BlockIndex);
+ }
std::error_code Ec = TryRemoveFile(BlockChunkPath);
if (Ec)
@@ -1493,7 +1564,8 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
&FilteredWrittenBytesPerSecond,
&Work,
&BlockRangeWorks,
- BlockRangeIndex](std::atomic<bool>&) {
+ BlockRangeIndex,
+ OptionalUpdateFolderStream](std::atomic<bool>&) {
if (!m_AbortFlag)
{
ZEN_TRACE_CPU("Async_GetPartialBlock");
@@ -1515,12 +1587,21 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
TotalPartWriteCount,
&FilteredDownloadedBytesPerSecond,
&FilteredWrittenBytesPerSecond,
- &BlockRange](IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath) {
+ &BlockRange,
+ OptionalUpdateFolderStream](IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath) {
if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
{
FilteredDownloadedBytesPerSecond.Stop();
}
+ if (OptionalUpdateFolderStream)
+ {
+ const uint32_t BlockIndex = BlockRange.BlockIndex;
+ OptionalUpdateFolderStream->DownloadedBlockRange(BlockIndex,
+ gsl::narrow<uint32_t>(BlockRange.RangeStart),
+ gsl::narrow<uint32_t>(BlockRange.RangeLength));
+ }
+
if (!m_AbortFlag)
{
Work.ScheduleWork(
@@ -1535,7 +1616,8 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
&FilteredWrittenBytesPerSecond,
&BlockRange,
BlockChunkPath = std::filesystem::path(OnDiskPath),
- BlockPartialBuffer = std::move(InMemoryBuffer)](std::atomic<bool>&) mutable {
+ BlockPartialBuffer = std::move(InMemoryBuffer),
+ OptionalUpdateFolderStream](std::atomic<bool>&) mutable {
if (!m_AbortFlag)
{
ZEN_TRACE_CPU("Async_WritePartialBlock");
@@ -1563,6 +1645,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
FilteredWrittenBytesPerSecond.Start();
+ // TODO: Add OptionalUpdateFolderStream calls for chunks / chunk sequences
if (!WritePartialBlockChunksToCache(
BlockDescription,
SequenceIndexChunksLeftToWriteCounters,
@@ -1571,7 +1654,8 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
BlockRange.ChunkBlockIndexStart,
BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1,
RemoteChunkIndexNeedsCopyFromSourceFlags,
- WriteCache))
+ WriteCache,
+ OptionalUpdateFolderStream))
{
std::error_code DummyEc;
RemoveFile(BlockChunkPath, DummyEc);
@@ -1589,6 +1673,14 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
Ec.message());
}
+ if (OptionalUpdateFolderStream)
+ {
+ OptionalUpdateFolderStream->CompletedBlockRange(
+ BlockIndex,
+ gsl::narrow<uint32_t>(BlockRange.RangeStart),
+ gsl::narrow<uint32_t>(BlockRange.RangeLength));
+ }
+
WritePartsComplete++;
if (WritePartsComplete == TotalPartWriteCount)
{
@@ -1630,7 +1722,8 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
&SequenceIndexChunksLeftToWriteCounters,
&FilteredDownloadedBytesPerSecond,
TotalRequestCount,
- BlockIndex](std::atomic<bool>&) {
+ BlockIndex,
+ OptionalUpdateFolderStream](std::atomic<bool>&) {
if (!m_AbortFlag)
{
ZEN_TRACE_CPU("Async_GetFullBlock");
@@ -1661,6 +1754,12 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
{
throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
}
+
+ if (OptionalUpdateFolderStream)
+ {
+ OptionalUpdateFolderStream->DownloadedBlock(BlockIndex);
+ }
+
if (!m_AbortFlag)
{
uint64_t BlockSize = BlockBuffer.GetSize();
@@ -1727,7 +1826,8 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
TotalPartWriteCount,
&FilteredWrittenBytesPerSecond,
BlockChunkPath,
- BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable {
+ BlockBuffer = std::move(BlockBuffer),
+ OptionalUpdateFolderStream](std::atomic<bool>&) mutable {
if (!m_AbortFlag)
{
ZEN_TRACE_CPU("Async_WriteFullBlock");
@@ -1752,12 +1852,14 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
}
FilteredWrittenBytesPerSecond.Start();
+
if (!WriteChunksBlockToCache(BlockDescription,
SequenceIndexChunksLeftToWriteCounters,
Work,
CompositeBuffer(std::move(BlockBuffer)),
RemoteChunkIndexNeedsCopyFromSourceFlags,
- WriteCache))
+ WriteCache,
+ OptionalUpdateFolderStream))
{
std::error_code DummyEc;
RemoveFile(BlockChunkPath, DummyEc);
@@ -1778,6 +1880,11 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
}
}
+ if (OptionalUpdateFolderStream)
+ {
+ OptionalUpdateFolderStream->CompletedBlock(BlockIndex);
+ }
+
WritePartsComplete++;
if (WritePartsComplete == TotalPartWriteCount)
@@ -1860,6 +1967,12 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
FilteredDownloadedBytesPerSecond.Stop();
WriteProgressBar.Finish();
+
+ if (OptionalUpdateFolderStream)
+ {
+ OptionalUpdateFolderStream->WriteComplete();
+ }
+
if (m_AbortFlag)
{
return;
@@ -3064,8 +3177,9 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd
ParallelWork& Work,
uint64_t TotalRequestCount,
uint64_t TotalPartWriteCount,
- FilteredRate& FilteredDownloadedBytesPerSecond,
- FilteredRate& FilteredWrittenBytesPerSecond)
+ FilteredRate& FilteredDownloadedBytesPerSecond,
+ FilteredRate& FilteredWrittenBytesPerSecond,
+ UpdateFolderStream* OptionalUpdateFolderStream)
{
std::filesystem::path ExistingCompressedChunkPath;
if (!m_Options.PrimeCacheOnly)
@@ -3095,6 +3209,7 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd
TotalPartWriteCount,
&FilteredWrittenBytesPerSecond,
RemoteChunkIndex,
+ OptionalUpdateFolderStream,
ChunkTargetPtrs = std::move(ChunkTargetPtrs),
CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>& AbortFlag) mutable {
if (!AbortFlag)
@@ -3112,8 +3227,11 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd
fmt::format("Could not open dowloaded compressed chunk {} from {}", ChunkHash, CompressedChunkPath));
}
- bool NeedHashVerify =
- WriteCompressedChunkToCache(ChunkHash, ChunkTargetPtrs, WriteCache, std::move(CompressedPart));
+ bool NeedHashVerify = WriteCompressedChunkToCache(RemoteChunkIndex,
+ ChunkTargetPtrs,
+ WriteCache,
+ std::move(CompressedPart),
+ OptionalUpdateFolderStream);
WritePartsComplete++;
if (!AbortFlag)
@@ -3138,11 +3256,11 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd
WriteCache.Close(CompletedSequences);
if (NeedHashVerify)
{
- VerifyAndCompleteChunkSequencesAsync(CompletedSequences, Work);
+ VerifyAndCompleteChunkSequencesAsync(CompletedSequences, Work, OptionalUpdateFolderStream);
}
else
{
- FinalizeChunkSequences(CompletedSequences);
+ FinalizeChunkSequences(CompletedSequences, OptionalUpdateFolderStream);
}
}
}
@@ -3159,6 +3277,7 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd
&WritePartsComplete,
TotalPartWriteCount,
TotalRequestCount,
+ OptionalUpdateFolderStream,
&FilteredDownloadedBytesPerSecond,
&FilteredWrittenBytesPerSecond,
RemoteChunkIndex,
@@ -3183,6 +3302,7 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd
RemoteChunkIndex,
&FilteredDownloadedBytesPerSecond,
&FilteredWrittenBytesPerSecond,
+ OptionalUpdateFolderStream,
ChunkTargetPtrs = std::move(ChunkTargetPtrs)](IoBuffer&& Payload) mutable {
if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
{
@@ -3200,7 +3320,8 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd
WritePartsComplete,
TotalPartWriteCount,
FilteredWrittenBytesPerSecond,
- EnableBacklog);
+ EnableBacklog,
+ OptionalUpdateFolderStream);
});
}
});
@@ -3639,7 +3760,8 @@ BuildsOperationUpdateFolder::WriteLocalChunkToCache(CloneQueryInterface* C
const std::vector<ChunkedFolderContent>& ScavengedContents,
const std::vector<ChunkedContentLookup>& ScavengedLookups,
const std::vector<std::filesystem::path>& ScavengedPaths,
- BufferedWriteFileCache& WriteCache)
+ BufferedWriteFileCache& WriteCache,
+ UpdateFolderStream* OptionalUpdateFolderStream)
{
ZEN_TRACE_CPU("WriteLocalChunkToCache");
@@ -3811,12 +3933,22 @@ BuildsOperationUpdateFolder::WriteLocalChunkToCache(CloneQueryInterface* C
m_WrittenChunkByteCount += ClonableBytes;
+ if (OptionalUpdateFolderStream)
+ {
+ OptionalUpdateFolderStream->WroteSequence(RemoteSequenceIndex, Op.Target->Offset + PreBytes, ClonableBytes);
+ }
+
if (PreBytes > 0)
{
CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, PreBytes);
const uint64_t FileOffset = Op.Target->Offset;
WriteSequenceChunkToCache(LocalWriter, ChunkSource, RemoteSequenceIndex, FileOffset, RemotePathIndex);
+
+ if (OptionalUpdateFolderStream)
+ {
+ OptionalUpdateFolderStream->WroteSequence(RemoteSequenceIndex, FileOffset, ChunkSource.GetSize());
+ }
}
if (PostBytes > 0)
{
@@ -3824,6 +3956,11 @@ BuildsOperationUpdateFolder::WriteLocalChunkToCache(CloneQueryInterface* C
const uint64_t FileOffset = Op.Target->Offset + ReadLength - PostBytes;
WriteSequenceChunkToCache(LocalWriter, ChunkSource, RemoteSequenceIndex, FileOffset, RemotePathIndex);
+
+ if (OptionalUpdateFolderStream)
+ {
+ OptionalUpdateFolderStream->WroteSequence(RemoteSequenceIndex, FileOffset, ChunkSource.GetSize());
+ }
}
}
}
@@ -3836,6 +3973,11 @@ BuildsOperationUpdateFolder::WriteLocalChunkToCache(CloneQueryInterface* C
const uint64_t FileOffset = Op.Target->Offset;
WriteSequenceChunkToCache(LocalWriter, ChunkSource, RemoteSequenceIndex, FileOffset, RemotePathIndex);
+
+ if (OptionalUpdateFolderStream)
+ {
+ OptionalUpdateFolderStream->WroteSequence(RemoteSequenceIndex, FileOffset, ChunkSource.GetSize());
+ }
}
}
@@ -3862,20 +4004,20 @@ BuildsOperationUpdateFolder::WriteLocalChunkToCache(CloneQueryInterface* C
bool
BuildsOperationUpdateFolder::WriteCompressedChunkToCache(
- const IoHash& ChunkHash,
+ const uint32_t RemoteChunkIndex,
const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs,
BufferedWriteFileCache& WriteCache,
- IoBuffer&& CompressedPart)
+ IoBuffer&& CompressedPart,
+ UpdateFolderStream* OptionalUpdateFolderStream)
{
ZEN_TRACE_CPU("WriteCompressedChunkToCache");
- auto ChunkHashToChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash);
- ZEN_ASSERT(ChunkHashToChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end());
+ const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+
if (IsSingleFileChunk(m_RemoteContent, ChunkTargetPtrs))
{
- const std::uint32_t SequenceIndex = ChunkTargetPtrs.front()->SequenceIndex;
- const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex];
- StreamDecompress(SequenceRawHash, CompositeBuffer(std::move(CompressedPart)));
+ const std::uint32_t SequenceIndex = ChunkTargetPtrs.front()->SequenceIndex;
+ StreamDecompress(SequenceIndex, CompositeBuffer(std::move(CompressedPart)), OptionalUpdateFolderStream);
return false;
}
else
@@ -3912,6 +4054,11 @@ BuildsOperationUpdateFolder::WriteCompressedChunkToCache(
const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex];
WriteSequenceChunkToCache(LocalWriter, RangeBuffer, SequenceIndex, FileOffset, PathIndex);
+
+ if (OptionalUpdateFolderStream)
+ {
+ OptionalUpdateFolderStream->WroteSequence(SequenceIndex, FileOffset, RangeBuffer.GetSize());
+ }
}
return true;
@@ -3934,9 +4081,13 @@ BuildsOperationUpdateFolder::WriteCompressedChunkToCache(
}
void
-BuildsOperationUpdateFolder::StreamDecompress(const IoHash& SequenceRawHash, CompositeBuffer&& CompressedPart)
+BuildsOperationUpdateFolder::StreamDecompress(uint32_t RemoteSequenceIndex,
+ CompositeBuffer&& CompressedPart,
+ UpdateFolderStream* OptionalUpdateFolderStream)
{
ZEN_TRACE_CPU("StreamDecompress");
+ const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+
const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash);
TemporaryFile DecompressedTemp;
std::error_code Ec;
@@ -3962,33 +4113,37 @@ BuildsOperationUpdateFolder::StreamDecompress(const IoHash& SequenceRawHash, Com
PrepareFileForScatteredWrite(DecompressedTemp.Handle(), RawSize);
IoHashStream Hash;
- bool CouldDecompress =
- Compressed.DecompressToStream(0,
- (uint64_t)-1,
- [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
- ZEN_UNUSED(SourceOffset);
- ZEN_TRACE_CPU("StreamDecompress_Write");
- m_DiskStats.ReadCount++;
- m_DiskStats.ReadByteCount += SourceSize;
- if (!m_AbortFlag)
- {
- for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
- {
- if (m_Options.ValidateCompletedSequences)
- {
- Hash.Append(Segment.GetView());
- m_ValidatedChunkByteCount += Segment.GetSize();
- }
- DecompressedTemp.Write(Segment, Offset);
- Offset += Segment.GetSize();
- m_DiskStats.WriteByteCount += Segment.GetSize();
- m_DiskStats.WriteCount++;
- m_WrittenChunkByteCount += Segment.GetSize();
- }
- return true;
- }
- return false;
- });
+ bool CouldDecompress = Compressed.DecompressToStream(
+ 0,
+ (uint64_t)-1,
+ [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
+ ZEN_UNUSED(SourceOffset);
+ ZEN_TRACE_CPU("StreamDecompress_Write");
+ m_DiskStats.ReadCount++;
+ m_DiskStats.ReadByteCount += SourceSize;
+ if (!m_AbortFlag)
+ {
+ for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
+ {
+ if (m_Options.ValidateCompletedSequences)
+ {
+ Hash.Append(Segment.GetView());
+ m_ValidatedChunkByteCount += Segment.GetSize();
+ }
+ DecompressedTemp.Write(Segment, Offset);
+ if (OptionalUpdateFolderStream)
+ {
+ OptionalUpdateFolderStream->WroteSequence(RemoteSequenceIndex, Offset, Segment.GetSize());
+ }
+ Offset += Segment.GetSize();
+ m_DiskStats.WriteByteCount += Segment.GetSize();
+ m_DiskStats.WriteCount++;
+ m_WrittenChunkByteCount += Segment.GetSize();
+ }
+ return true;
+ }
+ return false;
+ });
if (m_AbortFlag)
{
@@ -4175,7 +4330,8 @@ void
BuildsOperationUpdateFolder::WriteBlockChunkOpsToCache(std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
const BlockWriteOps& Ops,
BufferedWriteFileCache& WriteCache,
- ParallelWork& Work)
+ ParallelWork& Work,
+ UpdateFolderStream* OptionalUpdateFolderStream)
{
ZEN_TRACE_CPU("WriteBlockChunkOpsToCache");
@@ -4196,6 +4352,11 @@ BuildsOperationUpdateFolder::WriteBlockChunkOpsToCache(std::span<std::atomic<uin
const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex];
WriteSequenceChunkToCache(LocalWriter, Chunk, SequenceIndex, FileOffset, PathIndex);
+
+ if (OptionalUpdateFolderStream)
+ {
+ OptionalUpdateFolderStream->WroteSequence(SequenceIndex, FileOffset, Chunk.GetSize());
+ }
}
}
if (!Work.IsAborted())
@@ -4211,7 +4372,7 @@ BuildsOperationUpdateFolder::WriteBlockChunkOpsToCache(std::span<std::atomic<uin
}
}
WriteCache.Close(CompletedChunkSequences);
- VerifyAndCompleteChunkSequencesAsync(CompletedChunkSequences, Work);
+ VerifyAndCompleteChunkSequencesAsync(CompletedChunkSequences, Work, OptionalUpdateFolderStream);
}
}
@@ -4221,7 +4382,8 @@ BuildsOperationUpdateFolder::WriteChunksBlockToCache(const ChunkBlockDescription
ParallelWork& Work,
CompositeBuffer&& BlockBuffer,
std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
- BufferedWriteFileCache& WriteCache)
+ BufferedWriteFileCache& WriteCache,
+ UpdateFolderStream* OptionalUpdateFolderStream)
{
ZEN_TRACE_CPU("WriteChunksBlockToCache");
@@ -4246,7 +4408,7 @@ BuildsOperationUpdateFolder::WriteChunksBlockToCache(const ChunkBlockDescription
gsl::narrow<uint32_t>(BlockDescription.ChunkRawHashes.size() - 1),
Ops))
{
- WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work);
+ WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work, OptionalUpdateFolderStream);
return true;
}
return false;
@@ -4261,7 +4423,7 @@ BuildsOperationUpdateFolder::WriteChunksBlockToCache(const ChunkBlockDescription
gsl::narrow<uint32_t>(BlockDescription.ChunkRawHashes.size() - 1),
Ops))
{
- WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work);
+ WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work, OptionalUpdateFolderStream);
return true;
}
return false;
@@ -4275,7 +4437,8 @@ BuildsOperationUpdateFolder::WritePartialBlockChunksToCache(const ChunkBlockDesc
uint32_t FirstIncludedBlockChunkIndex,
uint32_t LastIncludedBlockChunkIndex,
std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
- BufferedWriteFileCache& WriteCache)
+ BufferedWriteFileCache& WriteCache,
+ UpdateFolderStream* OptionalUpdateFolderStream)
{
ZEN_TRACE_CPU("WritePartialBlockChunksToCache");
@@ -4292,7 +4455,7 @@ BuildsOperationUpdateFolder::WritePartialBlockChunksToCache(const ChunkBlockDesc
LastIncludedBlockChunkIndex,
Ops))
{
- WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work);
+ WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work, OptionalUpdateFolderStream);
return true;
}
else
@@ -4312,7 +4475,8 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa
std::atomic<uint64_t>& WritePartsComplete,
const uint64_t TotalPartWriteCount,
FilteredRate& FilteredWrittenBytesPerSecond,
- bool EnableBacklog)
+ bool EnableBacklog,
+ UpdateFolderStream* OptionalUpdateFolderStream)
{
ZEN_TRACE_CPU("AsyncWriteDownloadedChunk");
@@ -4370,6 +4534,7 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa
&WriteCache,
&WritePartsComplete,
&FilteredWrittenBytesPerSecond,
+ OptionalUpdateFolderStream,
ChunkTargetPtrs = std::move(ChunkTargetPtrs),
CompressedPart = IoBuffer(std::move(Payload))](std::atomic<bool>&) mutable {
if (!m_AbortFlag)
@@ -4394,7 +4559,11 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa
}
}
- bool NeedHashVerify = WriteCompressedChunkToCache(ChunkHash, ChunkTargetPtrs, WriteCache, std::move(CompressedPart));
+ bool NeedHashVerify = WriteCompressedChunkToCache(RemoteChunkIndex,
+ ChunkTargetPtrs,
+ WriteCache,
+ std::move(CompressedPart),
+ OptionalUpdateFolderStream);
if (!m_AbortFlag)
{
WritePartsComplete++;
@@ -4421,11 +4590,11 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa
WriteCache.Close(CompletedSequences);
if (NeedHashVerify)
{
- VerifyAndCompleteChunkSequencesAsync(CompletedSequences, Work);
+ VerifyAndCompleteChunkSequencesAsync(CompletedSequences, Work, OptionalUpdateFolderStream);
}
else
{
- FinalizeChunkSequences(CompletedSequences);
+ FinalizeChunkSequences(CompletedSequences, OptionalUpdateFolderStream);
}
}
}
@@ -4434,7 +4603,9 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa
}
void
-BuildsOperationUpdateFolder::VerifyAndCompleteChunkSequencesAsync(std::span<const uint32_t> RemoteSequenceIndexes, ParallelWork& Work)
+BuildsOperationUpdateFolder::VerifyAndCompleteChunkSequencesAsync(std::span<const uint32_t> RemoteSequenceIndexes,
+ ParallelWork& Work,
+ UpdateFolderStream* OptionalUpdateFolderStream)
{
if (RemoteSequenceIndexes.empty())
{
@@ -4446,7 +4617,7 @@ BuildsOperationUpdateFolder::VerifyAndCompleteChunkSequencesAsync(std::span<cons
for (uint32_t RemoteSequenceIndexOffset = 1; RemoteSequenceIndexOffset < RemoteSequenceIndexes.size(); RemoteSequenceIndexOffset++)
{
const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset];
- Work.ScheduleWork(m_IOWorkerPool, [this, RemoteSequenceIndex](std::atomic<bool>&) {
+ Work.ScheduleWork(m_IOWorkerPool, [this, RemoteSequenceIndex, OptionalUpdateFolderStream](std::atomic<bool>&) {
if (!m_AbortFlag)
{
ZEN_TRACE_CPU("Async_VerifyAndFinalizeSequence");
@@ -4454,8 +4625,7 @@ BuildsOperationUpdateFolder::VerifyAndCompleteChunkSequencesAsync(std::span<cons
VerifySequence(RemoteSequenceIndex);
if (!m_AbortFlag)
{
- const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- FinalizeChunkSequence(SequenceRawHash);
+ FinalizeChunkSequence(RemoteSequenceIndex, OptionalUpdateFolderStream);
}
}
});
@@ -4463,16 +4633,14 @@ BuildsOperationUpdateFolder::VerifyAndCompleteChunkSequencesAsync(std::span<cons
const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[0];
VerifySequence(RemoteSequenceIndex);
- const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- FinalizeChunkSequence(SequenceRawHash);
+ FinalizeChunkSequence(RemoteSequenceIndex, OptionalUpdateFolderStream);
}
else
{
for (uint32_t RemoteSequenceIndexOffset = 0; RemoteSequenceIndexOffset < RemoteSequenceIndexes.size(); RemoteSequenceIndexOffset++)
{
const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset];
- const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- FinalizeChunkSequence(SequenceRawHash);
+ FinalizeChunkSequence(RemoteSequenceIndex, OptionalUpdateFolderStream);
}
}
}
@@ -4506,10 +4674,12 @@ BuildsOperationUpdateFolder::CompleteChunkTargets(const std::vector<const Chunke
}
void
-BuildsOperationUpdateFolder::FinalizeChunkSequence(const IoHash& SequenceRawHash)
+BuildsOperationUpdateFolder::FinalizeChunkSequence(uint32_t RemoteSequenceIndex, UpdateFolderStream* OptionalUpdateFolderStream)
{
ZEN_TRACE_CPU("FinalizeChunkSequence");
+ const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+
ZEN_ASSERT_SLOW(!IsFile(GetFinalChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash)));
std::error_code Ec;
RenameFile(GetTempChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash),
@@ -4519,16 +4689,21 @@ BuildsOperationUpdateFolder::FinalizeChunkSequence(const IoHash& SequenceRawHash
{
throw std::system_error(Ec);
}
+ if (OptionalUpdateFolderStream)
+ {
+ OptionalUpdateFolderStream->CompletedSequence(RemoteSequenceIndex);
+ }
}
void
-BuildsOperationUpdateFolder::FinalizeChunkSequences(std::span<const uint32_t> RemoteSequenceIndexes)
+BuildsOperationUpdateFolder::FinalizeChunkSequences(std::span<const uint32_t> RemoteSequenceIndexes,
+ UpdateFolderStream* OptionalUpdateFolderStream)
{
ZEN_TRACE_CPU("FinalizeChunkSequences");
for (uint32_t SequenceIndex : RemoteSequenceIndexes)
{
- FinalizeChunkSequence(m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
+ FinalizeChunkSequence(SequenceIndex, OptionalUpdateFolderStream);
}
}
diff --git a/src/zenremotestore/builds/updatefolderstream.cpp b/src/zenremotestore/builds/updatefolderstream.cpp
new file mode 100644
index 000000000..5a1809923
--- /dev/null
+++ b/src/zenremotestore/builds/updatefolderstream.cpp
@@ -0,0 +1,631 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenremotestore/builds/updatefolderstream.h>
+
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinaryutil.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/stream.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <xxhash.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+#include <numeric>
+
+namespace zen {
+
+using namespace std::literals;
+
+namespace {
+#pragma pack(push)
+#pragma pack(1)
+ struct StreamHeader
+ {
+ static constexpr uint32_t ExpectedMagic = 0x7a726273; // 'zrbs';
+ static constexpr uint32_t CurrentVersion = 1;
+ static constexpr uint32_t UpdateFolderStreamType = HashStringDjb2("UpdateFolderDiskStreamFeedback"sv);
+
+ uint32_t Magic = ExpectedMagic;
+ uint32_t Version = CurrentVersion;
+ uint32_t StreamType = 0;
+ uint32_t PrepPayloadSize = 0;
+ uint64_t Reserved0 = 0;
+ uint32_t Reserved1 = 0;
+ uint32_t Checksum = 0;
+
+ static uint32_t ComputeChecksum(const StreamHeader& Header)
+ {
+ return XXH32(&Header.Magic, sizeof(Header) - sizeof(uint32_t), 0xC0C0'BABA);
+ }
+ };
+#pragma pack(pop)
+ static_assert(sizeof(StreamHeader) == 32);
+
+ class UpdateFolderDiskStreamFeedback : public UpdateFolderStream
+ {
+ using Event = UpdateFolderDiskStream::Event;
+
+ public:
+ explicit UpdateFolderDiskStreamFeedback(const std::filesystem::path& OutputPath) : m_OutputStream(OutputPath) {}
+ virtual ~UpdateFolderDiskStreamFeedback() {}
+
+ virtual void Initialize(const ChunkedFolderContent& RemoteContent,
+ std::span<const ChunkBlockDescription> BlockDescriptions,
+ std::span<const Chunk> DownloadChunks,
+ std::span<const BlockRange> DownloadBlockRanges,
+ std::span<const Block> DownloadBlocks,
+ std::span<const Sequence> SequencesToWrite) override
+ {
+ m_OutputStream
+ .PrepareWrite(RemoteContent, BlockDescriptions, DownloadChunks, DownloadBlockRanges, DownloadBlocks, SequencesToWrite);
+ }
+
+ virtual void DownloadedChunk(uint32_t RemoteChunkIndex) override
+ {
+ m_OutputStream.WriteEvent(
+ m_OutputStream.MakeEvent(Event::EventType::DownloadedChunk,
+ Event::EventPayload{.DownloadedChunk = {.RemoteChunkIndex = RemoteChunkIndex}}));
+ }
+
+ virtual void DownloadedBlockRange(uint32_t BlockIndex, uint32_t RangeStart, uint32_t RangeLength) override
+ {
+ m_OutputStream.WriteEvent(m_OutputStream.MakeEvent(
+ Event::EventType::DownloadedBlockRange,
+ Event::EventPayload{
+ .DownloadedBlockRange = {.BlockIndex = BlockIndex, .RangeStart = RangeStart, .RangeLength = RangeLength}}));
+ }
+
+ virtual void DownloadedBlock(uint32_t BlockIndex) override
+ {
+ m_OutputStream.WriteEvent(m_OutputStream.MakeEvent(Event::EventType::DownloadedBlock,
+ Event::EventPayload{.DownloadedBlock = {.BlockIndex = BlockIndex}}));
+ }
+
+ virtual void WroteSequence(uint32_t RemoteSequenceIndex, uint64_t Offset, uint64_t Length) override
+ {
+ m_OutputStream.WriteEvent(m_OutputStream.MakeEvent(
+ Event::EventType::WroteSequence,
+ Event::EventPayload{.WroteSequence = {.RemoteSequenceIndex = RemoteSequenceIndex, .Offset = Offset, .Length = Length}}));
+ }
+
+ virtual void CompletedBlockRange(uint32_t BlockIndex, uint32_t RangeStart, uint32_t RangeLength) override
+ {
+ m_OutputStream.WriteEvent(m_OutputStream.MakeEvent(
+ Event::EventType::CompletedBlockRange,
+ Event::EventPayload{
+ .CompletedBlockRange = {.BlockIndex = BlockIndex, .RangeStart = RangeStart, .RangeLength = RangeLength}}));
+ }
+
+ virtual void CompletedBlock(uint32_t BlockIndex) override
+ {
+ m_OutputStream.WriteEvent(m_OutputStream.MakeEvent(Event::EventType::CompletedBlock,
+ Event::EventPayload{.CompletedBlock = {.BlockIndex = BlockIndex}}));
+ }
+
+ virtual void CompletedSequence(uint32_t RemoteSequenceIndex) override
+ {
+ m_OutputStream.WriteEvent(
+ m_OutputStream.MakeEvent(Event::EventType::CompletedSequence,
+ Event::EventPayload{.CompletedSequence = {.RemoteSequenceIndex = RemoteSequenceIndex}}));
+ }
+
+ virtual void WriteComplete() override { m_OutputStream.EndWrite(); }
+
+ private:
+ UpdateFolderDiskStream m_OutputStream;
+ };
+} // namespace
+
+std::unique_ptr<UpdateFolderStream>
+CreateDiskUpdateFolderStream(const std::filesystem::path& OutputPath)
+{
+ return std::make_unique<UpdateFolderDiskStreamFeedback>(OutputPath);
+}
+
+UpdateFolderDiskStream::Event::EventHeader
+UpdateFolderDiskStream::Event::MakeHeader(Event::EventType Type, uint64_t TimeStampUs, uint16_t ThreadIndex)
+{
+ return EventHeader{.EventType = Type,
+ .TimestampUsHigh = gsl::narrow<uint8_t>(TimeStampUs >> 32),
+ .TimestampUsLow = static_cast<uint32_t>(TimeStampUs & 0xffffffffu),
+ .ThreadIndex = ThreadIndex};
+}
+
+UpdateFolderDiskStream::UpdateFolderDiskStream(const std::filesystem::path& OutputPath)
+: m_OutputPath(OutputPath)
+, m_Output(OutputPath, BasicFile::Mode::kTruncate)
+, m_WriteOffset(0)
+{
+}
+
+UpdateFolderDiskStream::~UpdateFolderDiskStream()
+{
+ try
+ {
+ m_Output.Close();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed closing streaming output at '{}'. Reason: '{}'", m_OutputPath, Ex.what());
+ }
+}
+
+UpdateFolderDiskStream::Event
+UpdateFolderDiskStream::MakeEvent(Event::EventType Type, Event::EventPayload Payload)
+{
+ return Event{.Header = Event::MakeHeader(Type, m_Stopwatch.GetElapsedTimeUs(), GetCurrentThreadIndex()), .Payload = Payload};
+}
+
+void
+UpdateFolderDiskStream::WriteEvent(const Event& InEvent)
+{
+ uint64_t WriteOffset = m_WriteOffset.fetch_add(sizeof(Event));
+ m_Output.Write(&InEvent, sizeof(Event), WriteOffset);
+ m_EventCount++;
+}
+
+void
+UpdateFolderDiskStream::PrepareWrite(const ChunkedFolderContent& RemoteContent,
+ std::span<const ChunkBlockDescription> BlockDescriptions,
+ std::span<const Chunk> DownloadChunks,
+ std::span<const BlockRange> DownloadBlockRanges,
+ std::span<const Block> DownloadBlocks,
+ std::span<const Sequence> SequencesToWrite)
+{
+ CbObjectWriter Writer;
+ Writer.BeginObject("remoteContent"sv);
+ {
+ compactbinary_helpers::WriteArray(RemoteContent.Paths, "paths"sv, Writer);
+ compactbinary_helpers::WriteArray(RemoteContent.RawSizes, "rawSizes"sv, Writer);
+ compactbinary_helpers::WriteArray(RemoteContent.Attributes, "attributes"sv, Writer);
+ compactbinary_helpers::WriteArray(RemoteContent.RawHashes, "rawHashes"sv, Writer);
+ Writer.BeginObject("chunkedContent"sv);
+ {
+ compactbinary_helpers::WriteArray(RemoteContent.ChunkedContent.SequenceRawHashes, "sequenceRawHashes"sv, Writer);
+ compactbinary_helpers::WriteArray(RemoteContent.ChunkedContent.ChunkCounts, "chunkCounts"sv, Writer);
+ compactbinary_helpers::WriteArray(RemoteContent.ChunkedContent.ChunkOrders, "chunkOrders"sv, Writer);
+ compactbinary_helpers::WriteArray(RemoteContent.ChunkedContent.ChunkHashes, "chunkHashes"sv, Writer);
+ compactbinary_helpers::WriteArray(RemoteContent.ChunkedContent.ChunkRawSizes, "chunkRawSizes"sv, Writer);
+ }
+ Writer.EndObject(); // chunkedContent
+ }
+ Writer.EndObject(); // remoteContent
+
+ Writer.BeginArray("blockDescriptions"sv);
+ for (const ChunkBlockDescription& Block : BlockDescriptions)
+ {
+ Writer.BeginObject();
+ {
+ Writer.AddHash("blockHash"sv, Block.BlockHash);
+ compactbinary_helpers::WriteArray(Block.ChunkRawHashes, "chunkRawHashes"sv, Writer);
+ Writer.AddInteger("headerSize"sv, Block.HeaderSize);
+ compactbinary_helpers::WriteArray(Block.ChunkRawLengths, "chunkRawLengths"sv, Writer);
+ compactbinary_helpers::WriteArray(Block.ChunkCompressedLengths, "chunkCompressedLengths"sv, Writer);
+ }
+ Writer.EndObject();
+ }
+ Writer.EndArray(); // blockDescriptions
+
+ Writer.BeginArray("downloadChunks"sv);
+ for (const Chunk& DownloadChunk : DownloadChunks)
+ {
+ Writer.BeginObject();
+ {
+ Writer.AddInteger("remoteChunkIndex"sv, DownloadChunk.RemoteChunkIndex);
+ }
+ Writer.EndObject();
+ }
+ Writer.EndArray(); // downloadChunks
+
+ Writer.BeginArray("downloadBlockRanges"sv);
+ for (const BlockRange& DownloadBlockRange : DownloadBlockRanges)
+ {
+ Writer.BeginObject();
+ {
+ Writer.AddInteger("blockIndex"sv, DownloadBlockRange.BlockIndex);
+ Writer.AddInteger("rangeStart"sv, DownloadBlockRange.RangeStart);
+ Writer.AddInteger("rangeLength"sv, DownloadBlockRange.RangeLength);
+ }
+ Writer.EndObject();
+ }
+ Writer.EndArray(); // downloadBlockRanges
+
+ Writer.BeginArray("downloadBlocks"sv);
+ for (const Block& DownloadBlock : DownloadBlocks)
+ {
+ Writer.BeginObject();
+ {
+ Writer.AddInteger("blockIndex"sv, DownloadBlock.BlockIndex);
+ }
+ Writer.EndObject();
+ }
+ Writer.EndArray(); // downloadBlocks
+
+ Writer.BeginArray("sequencesToWrite"sv);
+ for (const Sequence& SequenceToWrite : SequencesToWrite)
+ {
+ Writer.BeginObject();
+ {
+ Writer.AddInteger("remoteSequenceIndex"sv, SequenceToWrite.RemoteSequenceIndex);
+ }
+ Writer.EndObject();
+ }
+ Writer.EndArray(); // sequencesToWrite
+
+ CbObject PrepPayload = Writer.Save();
+
+ BinaryWriter PayloadBuffer;
+ PrepPayload.CopyTo(PayloadBuffer);
+
+ uint32_t PrepPayloadSize = gsl::narrow<uint32_t>(PayloadBuffer.GetSize());
+
+ StreamHeader Header = {.StreamType = StreamHeader::UpdateFolderStreamType, .PrepPayloadSize = PrepPayloadSize};
+
+ Header.Checksum = StreamHeader::ComputeChecksum(Header);
+
+ m_Output.Write(&Header, sizeof(StreamHeader), 0);
+
+ m_WriteOffset = sizeof(StreamHeader);
+
+ m_Output.Write(PayloadBuffer.GetView(), m_WriteOffset.load());
+
+ m_WriteOffset += RoundUp(PrepPayloadSize, 8);
+
+ m_Output.Flush();
+
+ m_Stopwatch.Reset();
+}
+
+void
+UpdateFolderDiskStream::EndWrite()
+{
+ RwLock::ExclusiveLockScope _(m_UpdateLock);
+ m_Output.Close();
+}
+
+void
+UpdateFolderDiskStream::Read(const std::filesystem::path& InputPath,
+ ChunkedFolderContent& OutRemoteContent,
+ std::vector<ChunkBlockDescription>& OutBlockDescriptions,
+ std::vector<Chunk>& OutDownloadChunks,
+ std::vector<BlockRange>& OutDownloadBlockRanges,
+ std::vector<Block>& OutDownloadBlocks,
+ std::vector<Sequence>& OutSequencesToWrite,
+ std::vector<Event>& OutEvents)
+{
+ BasicFile InputFile(InputPath, BasicFile::Mode::kRead);
+
+ uint64_t Size = InputFile.FileSize();
+ if (Size < sizeof(StreamHeader))
+ {
+ throw std::runtime_error(fmt::format("Expected size >= {}, file has size {}", sizeof(StreamHeader), Size));
+ }
+
+ uint64_t Offset = 0;
+ StreamHeader Header;
+ InputFile.Read(&Header, sizeof(StreamHeader), Offset);
+ Offset += sizeof(Header);
+
+ if (Header.Magic != StreamHeader::ExpectedMagic)
+ {
+ throw std::runtime_error(
+ fmt::format("Expected magic 0x{:04x}, file has magic 0x{:04x}", StreamHeader::ExpectedMagic, Header.Magic));
+ }
+ if (Header.Version != StreamHeader::CurrentVersion)
+ {
+ throw std::runtime_error(fmt::format("Expected version {}, file has version {}", StreamHeader::CurrentVersion, Header.Version));
+ }
+ if (Header.StreamType != StreamHeader::UpdateFolderStreamType)
+ {
+ throw std::runtime_error(
+ fmt::format("Expected stream type {}, file has stream type {}", StreamHeader::UpdateFolderStreamType, Header.StreamType));
+ }
+ if (Header.Checksum != StreamHeader::ComputeChecksum(Header))
+ {
+ throw std::runtime_error(
+ fmt::format("Expected checksum 0x{:04x}, file has checksum 0x{:04x}", Header.Checksum, StreamHeader::ComputeChecksum(Header)));
+ }
+
+ uint64_t ExpectedMinSize = sizeof(StreamHeader) + RoundUp(Header.PrepPayloadSize, 8);
+
+ if (Size < ExpectedMinSize)
+ {
+ throw std::runtime_error(fmt::format("Expected size >= {}, file has size {}", ExpectedMinSize, Size));
+ }
+
+ IoBuffer PrepPayloadHeader(Header.PrepPayloadSize);
+ InputFile.Read(PrepPayloadHeader.MutableData(), Header.PrepPayloadSize, Offset);
+ CbValidateError ValidateError = CbValidateError::None;
+ CbObject PrepPayload = ValidateAndReadCompactBinaryObject(std::move(PrepPayloadHeader), ValidateError);
+ if (ValidateError != CbValidateError::None)
+ {
+ throw std::runtime_error(fmt::format("Initial setup payload is malformed. Reason: '{}'", ToString(ValidateError)));
+ }
+
+ if (CbObjectView RemoteContentView = PrepPayload["remoteContent"sv].AsObjectView(); RemoteContentView)
+ {
+ OutRemoteContent.Paths = compactbinary_helpers::ReadArray<std::filesystem::path>("paths"sv, RemoteContentView);
+ OutRemoteContent.RawSizes = compactbinary_helpers::ReadArray<uint64_t>("rawSizes"sv, RemoteContentView);
+ OutRemoteContent.Attributes = compactbinary_helpers::ReadArray<uint32_t>("attributes"sv, RemoteContentView);
+ OutRemoteContent.RawHashes = compactbinary_helpers::ReadArray<IoHash>("rawHashes"sv, RemoteContentView);
+ if (CbObjectView ChunkedContentView = RemoteContentView["chunkedContent"sv].AsObjectView(); ChunkedContentView)
+ {
+ OutRemoteContent.ChunkedContent.SequenceRawHashes =
+ compactbinary_helpers::ReadArray<IoHash>("sequenceRawHashes"sv, ChunkedContentView);
+ OutRemoteContent.ChunkedContent.ChunkCounts = compactbinary_helpers::ReadArray<uint32_t>("chunkCounts"sv, ChunkedContentView);
+ OutRemoteContent.ChunkedContent.ChunkOrders = compactbinary_helpers::ReadArray<uint32_t>("chunkOrders"sv, ChunkedContentView);
+ OutRemoteContent.ChunkedContent.ChunkHashes = compactbinary_helpers::ReadArray<IoHash>("chunkHashes"sv, ChunkedContentView);
+ OutRemoteContent.ChunkedContent.ChunkRawSizes =
+ compactbinary_helpers::ReadArray<uint64_t>("chunkRawSizes"sv, ChunkedContentView);
+ }
+ }
+
+ if (CbArrayView BlockDescriptionsView = PrepPayload["blockDescriptions"sv].AsArrayView(); BlockDescriptionsView)
+ {
+ OutBlockDescriptions.reserve(BlockDescriptionsView.Num());
+ for (CbFieldView BlockDescriptionFieldView : BlockDescriptionsView)
+ {
+ CbObjectView BlockDescriptionView = BlockDescriptionFieldView.AsObjectView();
+ OutBlockDescriptions.push_back(ChunkBlockDescription{
+ ThinChunkBlockDescription{
+ .BlockHash = BlockDescriptionView["blockHash"sv].AsHash(),
+ .ChunkRawHashes = compactbinary_helpers::ReadArray<IoHash>("chunkRawHashes"sv, BlockDescriptionView)},
+ BlockDescriptionView["headerSize"sv].AsUInt64(),
+ compactbinary_helpers::ReadArray<uint32_t>("chunkRawLengths"sv, BlockDescriptionView),
+ compactbinary_helpers::ReadArray<uint32_t>("chunkCompressedLengths"sv, BlockDescriptionView)});
+ }
+ }
+
+ if (CbArrayView DownloadChunksView = PrepPayload["downloadChunks"sv].AsArrayView(); DownloadChunksView)
+ {
+ OutDownloadChunks.reserve(DownloadChunksView.Num());
+ for (CbFieldView DownloadChunkFieldView : DownloadChunksView)
+ {
+ CbObjectView DownloadChunkView = DownloadChunkFieldView.AsObjectView();
+ OutDownloadChunks.push_back(Chunk{.RemoteChunkIndex = DownloadChunkView["headerSize"sv].AsUInt32()});
+ }
+ }
+
+ if (CbArrayView DownloadBlockRangesView = PrepPayload["downloadBlockRanges"sv].AsArrayView(); DownloadBlockRangesView)
+ {
+ OutDownloadBlockRanges.reserve(DownloadBlockRangesView.Num());
+ for (CbFieldView DownloadBlockRangeFieldView : DownloadBlockRangesView)
+ {
+ CbObjectView DownloadBlockRangeView = DownloadBlockRangeFieldView.AsObjectView();
+ OutDownloadBlockRanges.push_back(BlockRange{.BlockIndex = DownloadBlockRangeView["blockIndex"sv].AsUInt32(),
+ .RangeStart = DownloadBlockRangeView["rangeStart"sv].AsUInt32(),
+ .RangeLength = DownloadBlockRangeView["rangeLength"sv].AsUInt32()});
+ }
+ }
+
+ if (CbArrayView DownloadBlocksView = PrepPayload["downloadBlocks"sv].AsArrayView(); DownloadBlocksView)
+ {
+ OutDownloadBlocks.reserve(DownloadBlocksView.Num());
+ for (CbFieldView DownloadBlockFieldView : DownloadBlocksView)
+ {
+ CbObjectView DownloadBlockView = DownloadBlockFieldView.AsObjectView();
+ OutDownloadBlocks.push_back(Block{.BlockIndex = DownloadBlockView["blockIndex"sv].AsUInt32()});
+ }
+ }
+
+ if (CbArrayView SequencesToWriteView = PrepPayload["sequencesToWrite"sv].AsArrayView(); SequencesToWriteView)
+ {
+ OutSequencesToWrite.reserve(SequencesToWriteView.Num());
+ for (CbFieldView SequenceToWriteFieldView : SequencesToWriteView)
+ {
+ CbObjectView SequenceToWriteView = SequenceToWriteFieldView.AsObjectView();
+ OutSequencesToWrite.push_back(Sequence{.RemoteSequenceIndex = SequenceToWriteView["remoteSequenceIndex"sv].AsUInt32()});
+ }
+ }
+
+ Offset += RoundUp(Header.PrepPayloadSize, 8);
+
+ uint64_t EntryCount = (Size - Offset) / sizeof(Event);
+ uint64_t ExpectedSize = Offset + EntryCount * sizeof(Event);
+ if (ExpectedSize != Size)
+ {
+ throw std::runtime_error(fmt::format("Expected size {}, file has size {}", ExpectedSize, Size));
+ }
+
+ OutEvents.resize(EntryCount);
+
+ InputFile.Read(OutEvents.data(), EntryCount * sizeof(Event), Offset);
+}
+
+void
+UpdateFolderDiskStream::Replay(const std::filesystem::path& InputPath, UpdateFolderStream& UpdateFolderStream)
+{
+ ZEN_UNUSED(UpdateFolderStream);
+
+ ChunkedFolderContent RemoteContent;
+ std::vector<ChunkBlockDescription> BlockDescriptions;
+ std::vector<Chunk> DownloadChunks;
+ std::vector<BlockRange> DownloadBlockRanges;
+ std::vector<Block> DownloadBlocks;
+ std::vector<Sequence> SequencesToWrite;
+ std::vector<Event> Events;
+
+ Read(InputPath, RemoteContent, BlockDescriptions, DownloadChunks, DownloadBlockRanges, DownloadBlocks, SequencesToWrite, Events);
+
+ UpdateFolderStream.Initialize(RemoteContent, BlockDescriptions, DownloadChunks, DownloadBlockRanges, DownloadBlocks, SequencesToWrite);
+
+ for (const Event& E : Events)
+ {
+ switch (E.Header.EventType)
+ {
+ case Event::EventType::DownloadedChunk:
+ UpdateFolderStream.DownloadedChunk(E.Payload.DownloadedChunk.RemoteChunkIndex);
+ break;
+ case Event::EventType::DownloadedBlockRange:
+ UpdateFolderStream.DownloadedBlockRange(E.Payload.DownloadedBlockRange.BlockIndex,
+ E.Payload.DownloadedBlockRange.RangeStart,
+ E.Payload.DownloadedBlockRange.RangeLength);
+ break;
+ case Event::EventType::DownloadedBlock:
+ UpdateFolderStream.DownloadedBlock(E.Payload.DownloadedBlock.BlockIndex);
+ break;
+
+ case Event::EventType::WroteSequence:
+ UpdateFolderStream.WroteSequence(E.Payload.WroteSequence.RemoteSequenceIndex,
+ E.Payload.WroteSequence.Offset,
+ E.Payload.WroteSequence.Length);
+ break;
+
+ case Event::EventType::CompletedBlockRange:
+ UpdateFolderStream.CompletedBlockRange(E.Payload.CompletedBlockRange.BlockIndex,
+ E.Payload.CompletedBlockRange.RangeStart,
+ E.Payload.CompletedBlockRange.RangeLength);
+ break;
+ case Event::EventType::CompletedBlock:
+ UpdateFolderStream.CompletedBlock(E.Payload.CompletedBlock.BlockIndex);
+ break;
+
+ case Event::EventType::CompletedSequence:
+ UpdateFolderStream.CompletedSequence(E.Payload.CompletedSequence.RemoteSequenceIndex);
+ break;
+
+ case Event::EventType::WriteComplete:
+ UpdateFolderStream.WriteComplete();
+ break;
+
+ default:
+ throw std::runtime_error(std::format("Unknown event type: {}", uint8_t(E.Header.EventType)));
+ }
+ }
+}
+
+uint16_t
+UpdateFolderDiskStream::GetThreadIndex(int ThreadId)
+{
+ {
+ RwLock::SharedLockScope _(m_UpdateLock);
+ if (auto It = m_ThreadLookup.find(ThreadId); It != m_ThreadLookup.end())
+ {
+ return It->second;
+ }
+ }
+ RwLock::ExclusiveLockScope _(m_UpdateLock);
+ if (auto It = m_ThreadLookup.find(ThreadId); It != m_ThreadLookup.end())
+ {
+ return It->second;
+ }
+ uint16_t ThreadIndex = gsl::narrow<uint16_t>(m_ThreadLookup.size());
+ m_ThreadLookup.insert({ThreadId, ThreadIndex});
+ return ThreadIndex;
+}
+
+uint16_t
+UpdateFolderDiskStream::GetCurrentThreadIndex()
+{
+ return GetThreadIndex(GetCurrentThreadId());
+}
+
+uint64_t
+ChunkSequenceStatus::CompletedSize() const
+{
+ return std::accumulate(CompletedRanges.begin(), CompletedRanges.end(), uint64_t(0), [](uint64_t Current, const Range& Val) {
+ return Current + Val.Length;
+ });
+}
+
+void
+ChunkSequenceStatus::AddCompletedRange(const Range& InRange)
+{
+ ZEN_ASSERT(InRange.Offset + InRange.Length <= Size);
+#ifndef NDEBUG
+ uint64_t PreviousCompletedSize = CompletedSize();
+ for (const Range& R : CompletedRanges)
+ {
+ ZEN_ASSERT(InRange.Offset + InRange.Length <= R.Offset || InRange.Offset >= R.Offset + R.Length);
+ }
+#endif // NDEBUG
+ if (CompletedRanges.empty())
+ {
+ CompletedRanges.push_back(InRange);
+ }
+ else
+ {
+ auto InsertIt =
+ std::upper_bound(CompletedRanges.begin(), CompletedRanges.end(), InRange.Offset, [](uint64_t InRangeOffset, const Range& Val) {
+ return InRangeOffset < Val.Offset;
+ });
+ if (InsertIt != CompletedRanges.begin())
+ {
+ // Extend previous?
+ Range& PreviousRange = *(InsertIt - 1);
+ ZEN_ASSERT(PreviousRange.Offset + PreviousRange.Length <= InRange.Offset);
+ if (PreviousRange.Offset + PreviousRange.Length == InRange.Offset)
+ {
+ PreviousRange.Length += InRange.Length;
+ if (InsertIt != CompletedRanges.end())
+ {
+ // Extend next?
+ Range& NextRange = *InsertIt;
+ if (NextRange.Offset == PreviousRange.Offset + PreviousRange.Length)
+ {
+ PreviousRange.Length += NextRange.Length;
+ CompletedRanges.erase(InsertIt);
+ }
+ }
+ }
+ else if (InsertIt != CompletedRanges.end())
+ {
+ // Extend next?
+ Range& NextRange = *InsertIt;
+ if (NextRange.Offset == InRange.Offset + InRange.Length)
+ {
+ NextRange.Offset = InRange.Offset;
+ NextRange.Length += InRange.Length;
+ }
+ else
+ {
+ CompletedRanges.insert(InsertIt, InRange);
+ }
+ }
+ else
+ {
+ CompletedRanges.push_back(InRange);
+ }
+ }
+ else if (InsertIt != CompletedRanges.end())
+ {
+ // Extend next?
+ Range& NextRange = *InsertIt;
+ ZEN_ASSERT(InRange.Offset + InRange.Length <= NextRange.Offset);
+ if (NextRange.Offset == InRange.Offset + InRange.Length)
+ {
+ NextRange.Offset = InRange.Offset;
+ NextRange.Length += InRange.Length;
+ }
+ else
+ {
+ CompletedRanges.insert(InsertIt, InRange);
+ }
+ }
+ else
+ {
+ CompletedRanges.push_back(InRange);
+ }
+ }
+#ifndef NDEBUG
+ uint64_t NewCompletedSize = CompletedSize();
+ ZEN_ASSERT(PreviousCompletedSize + InRange.Length == NewCompletedSize);
+ ZEN_ASSERT(CompletedRanges.back().Offset + CompletedRanges.back().Length <= Size);
+ for (size_t Index = 1; Index < CompletedRanges.size(); Index++)
+ {
+ ZEN_ASSERT(CompletedRanges[Index - 1].Offset + CompletedRanges[Index - 1].Length < CompletedRanges[Index].Offset);
+ }
+#endif // NDEBUG
+}
+
+void
+ChunkSequenceStatus::RemoveCompletedRange(const Range& InRange)
+{
+ // TODO
+ ZEN_UNUSED(InRange);
+ ZEN_ASSERT(!CompletedRanges.empty());
+}
+
+} // namespace zen
diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
index 6304159ae..fe69a624c 100644
--- a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
+++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
@@ -35,6 +35,8 @@ class BufferedWriteFileCache;
struct ChunkBlockDescription;
struct ChunkedFolderContent;
+class UpdateFolderStream;
+
struct DiskStatistics
{
std::atomic<uint64_t> OpenReadCount = 0;
@@ -162,7 +164,7 @@ public:
const std::vector<IoHash>& LooseChunkHashes,
const Options& Options);
- void Execute(FolderContent& OutLocalFolderState);
+ void Execute(FolderContent& OutLocalFolderState, UpdateFolderStream* OptionalUpdateFolderStream = nullptr);
DiskStatistics m_DiskStats;
CacheMappingStatistics m_CacheMappingStats;
@@ -292,7 +294,8 @@ private:
uint64_t TotalRequestCount,
uint64_t TotalPartWriteCount,
FilteredRate& FilteredDownloadedBytesPerSecond,
- FilteredRate& FilteredWrittenBytesPerSecond);
+ FilteredRate& FilteredWrittenBytesPerSecond,
+ UpdateFolderStream* OptionalUpdateFolderStream);
void DownloadBuildBlob(uint32_t RemoteChunkIndex,
const BlobsExistsResult& ExistsResult,
@@ -324,14 +327,16 @@ private:
const std::vector<ChunkedFolderContent>& ScavengedContents,
const std::vector<ChunkedContentLookup>& ScavengedLookups,
const std::vector<std::filesystem::path>& ScavengedPaths,
- BufferedWriteFileCache& WriteCache);
+ BufferedWriteFileCache& WriteCache,
+ UpdateFolderStream* OptionalUpdateFolderStream);
- bool WriteCompressedChunkToCache(const IoHash& ChunkHash,
+ bool WriteCompressedChunkToCache(const uint32_t RemoteChunkIndex,
const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs,
BufferedWriteFileCache& WriteCache,
- IoBuffer&& CompressedPart);
+ IoBuffer&& CompressedPart,
+ UpdateFolderStream* OptionalUpdateFolderStream);
- void StreamDecompress(const IoHash& SequenceRawHash, CompositeBuffer&& CompressedPart);
+ void StreamDecompress(uint32_t RemoteSequenceIndex, CompositeBuffer&& CompressedPart, UpdateFolderStream* OptionalUpdateFolderStream);
void WriteSequenceChunkToCache(BufferedWriteFileCache::Local& LocalWriter,
const CompositeBuffer& Chunk,
@@ -351,14 +356,16 @@ private:
void WriteBlockChunkOpsToCache(std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
const BlockWriteOps& Ops,
BufferedWriteFileCache& WriteCache,
- ParallelWork& Work);
+ ParallelWork& Work,
+ UpdateFolderStream* OptionalUpdateFolderStream);
bool WriteChunksBlockToCache(const ChunkBlockDescription& BlockDescription,
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
ParallelWork& Work,
CompositeBuffer&& BlockBuffer,
std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
- BufferedWriteFileCache& WriteCache);
+ BufferedWriteFileCache& WriteCache,
+ UpdateFolderStream* OptionalUpdateFolderStream);
bool WritePartialBlockChunksToCache(const ChunkBlockDescription& BlockDescription,
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
@@ -367,7 +374,8 @@ private:
uint32_t FirstIncludedBlockChunkIndex,
uint32_t LastIncludedBlockChunkIndex,
std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
- BufferedWriteFileCache& WriteCache);
+ BufferedWriteFileCache& WriteCache,
+ UpdateFolderStream* OptionalUpdateFolderStream);
void AsyncWriteDownloadedChunk(const std::filesystem::path& ZenFolderPath,
uint32_t RemoteChunkIndex,
@@ -379,15 +387,18 @@ private:
std::atomic<uint64_t>& WritePartsComplete,
const uint64_t TotalPartWriteCount,
FilteredRate& FilteredWrittenBytesPerSecond,
- bool EnableBacklog);
+ bool EnableBacklog,
+ UpdateFolderStream* OptionalUpdateFolderStream);
- void VerifyAndCompleteChunkSequencesAsync(std::span<const uint32_t> RemoteSequenceIndexes, ParallelWork& Work);
+ void VerifyAndCompleteChunkSequencesAsync(std::span<const uint32_t> RemoteSequenceIndexes,
+ ParallelWork& Work,
+ UpdateFolderStream* OptionalUpdateFolderStream);
bool CompleteSequenceChunk(uint32_t RemoteSequenceIndex, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters);
std::vector<uint32_t> CompleteChunkTargets(const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs,
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters);
- void FinalizeChunkSequence(const IoHash& SequenceRawHash);
- void FinalizeChunkSequences(std::span<const uint32_t> RemoteSequenceIndexes);
- void VerifySequence(uint32_t RemoteSequenceIndex);
+ void FinalizeChunkSequence(uint32_t RemoteSequenceIndex, UpdateFolderStream* OptionalUpdateFolderStream);
+ void FinalizeChunkSequences(std::span<const uint32_t> RemoteSequenceIndexes, UpdateFolderStream* OptionalUpdateFolderStream);
+ void VerifySequence(uint32_t RemoteSequenceIndex);
OperationLogOutput& m_LogOutput;
StorageInstance& m_Storage;
diff --git a/src/zenremotestore/include/zenremotestore/builds/updatefolderstream.h b/src/zenremotestore/include/zenremotestore/builds/updatefolderstream.h
new file mode 100644
index 000000000..6fe2188cd
--- /dev/null
+++ b/src/zenremotestore/include/zenremotestore/builds/updatefolderstream.h
@@ -0,0 +1,226 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/basicfile.h>
+#include <zencore/timer.h>
+
+#include <zenremotestore/chunking/chunkblock.h>
+#include <zenremotestore/chunking/chunkedcontent.h>
+
+namespace zen {
+
+class UpdateFolderStream
+{
+public:
+ virtual ~UpdateFolderStream() {}
+
+ struct Sequence
+ {
+ uint32_t RemoteSequenceIndex;
+ };
+ struct Chunk
+ {
+ uint32_t RemoteChunkIndex;
+ };
+ struct Block
+ {
+ uint32_t BlockIndex;
+ };
+ struct BlockRange
+ {
+ uint32_t BlockIndex;
+ uint32_t RangeStart;
+ uint32_t RangeLength;
+ };
+
+ virtual void Initialize(const ChunkedFolderContent& RemoteContent,
+ std::span<const ChunkBlockDescription> BlockDescriptions,
+ std::span<const Chunk> DownloadChunks,
+ std::span<const BlockRange> DownloadBlockRanges,
+ std::span<const Block> DownloadBlocks,
+ std::span<const Sequence> SequencesToWrite) = 0;
+
+ virtual void DownloadedChunk(uint32_t RemoteChunkIndex) = 0;
+ virtual void DownloadedBlockRange(uint32_t BlockIndex, uint32_t RangeStart, uint32_t RangeLength) = 0;
+ virtual void DownloadedBlock(uint32_t BlockIndex) = 0;
+
+ virtual void WroteSequence(uint32_t RemoteSequenceIndex, uint64_t Offset, uint64_t Length) = 0;
+
+ virtual void CompletedBlockRange(uint32_t BlockIndex, uint32_t RangeStart, uint32_t RangeLength) = 0;
+ virtual void CompletedBlock(uint32_t BlockIndex) = 0;
+
+ virtual void CompletedSequence(uint32_t RemoteSequenceIndex) = 0;
+
+ virtual void WriteComplete() = 0;
+};
+
+std::unique_ptr<UpdateFolderStream> CreateDiskUpdateFolderStream(const std::filesystem::path& OutputPath);
+
+struct ChunkSequenceStatus
+{
+ struct Range
+ {
+ uint64_t Offset = 0;
+ uint64_t Length = 0;
+ };
+ std::vector<Range> CompletedRanges;
+ uint64_t Size = 0;
+ uint32_t NormalizedSize = 0;
+ bool Completed = false;
+
+ uint64_t CompletedSize() const;
+
+ void AddCompletedRange(const Range& InRange);
+
+ void RemoveCompletedRange(const Range& InRange);
+};
+
+class UpdateFolderDiskStream
+{
+ using Sequence = UpdateFolderStream::Sequence;
+ using Chunk = UpdateFolderStream::Chunk;
+ using Block = UpdateFolderStream::Block;
+ using BlockRange = UpdateFolderStream::BlockRange;
+
+public:
+#pragma pack(push)
+#pragma pack(4)
+ struct Event
+ {
+ enum class EventType : uint8_t
+ {
+ DownloadedChunk,
+ DownloadedBlockRange,
+ DownloadedBlock,
+
+ WroteSequence,
+
+ CompletedBlockRange,
+ CompletedBlock,
+
+ CompletedSequence,
+
+ WriteComplete
+ };
+
+ struct DownloadedChunkEvent
+ {
+ uint32_t RemoteChunkIndex;
+ };
+
+ struct DownloadedBlockRangeEvent
+ {
+ uint32_t BlockIndex;
+ uint32_t RangeStart;
+ uint32_t RangeLength;
+ };
+
+ struct DownloadedBlockEvent
+ {
+ uint32_t BlockIndex;
+ };
+
+ struct WroteSequenceEvent
+ {
+ uint32_t RemoteSequenceIndex;
+ uint64_t Offset;
+ uint64_t Length;
+ };
+
+ struct CompletedBlockRangeEvent
+ {
+ uint32_t BlockIndex;
+ uint32_t RangeStart;
+ uint32_t RangeLength;
+ };
+
+ struct CompletedBlockEvent
+ {
+ uint32_t BlockIndex;
+ };
+
+ struct CompletedSequenceEvent
+ {
+ uint32_t RemoteSequenceIndex;
+ };
+
+#pragma pack(push)
+#pragma pack(1)
+ struct EventHeader
+ {
+ EventType EventType;
+ uint8_t TimestampUsHigh;
+ uint32_t TimestampUsLow;
+ uint16_t ThreadIndex;
+
+ inline uint64_t TimestampUs() const { return (((uint64_t)TimestampUsHigh) << 32) | TimestampUsLow; }
+ };
+#pragma pack(pop)
+
+ EventHeader Header;
+
+ static EventHeader MakeHeader(Event::EventType Type, uint64_t TimeStampUs, uint16_t ThreadIndex);
+
+ union EventPayload
+ {
+ DownloadedChunkEvent DownloadedChunk;
+ DownloadedBlockRangeEvent DownloadedBlockRange;
+ DownloadedBlockEvent DownloadedBlock;
+
+ WroteSequenceEvent WroteSequence;
+
+ CompletedBlockRangeEvent CompletedBlockRange;
+ CompletedBlockEvent CompletedBlock;
+ CompletedSequenceEvent CompletedSequence;
+ };
+ EventPayload Payload;
+ };
+#pragma pack(pop)
+ static_assert(sizeof(Event) == 28u);
+
+ UpdateFolderDiskStream(const std::filesystem::path& OutputPath);
+
+ virtual ~UpdateFolderDiskStream();
+
+ Event MakeEvent(Event::EventType Type, Event::EventPayload Payload);
+
+ void WriteEvent(const Event& InEvent);
+
+ void PrepareWrite(const ChunkedFolderContent& RemoteContent,
+ std::span<const ChunkBlockDescription> BlockDescriptions,
+ std::span<const Chunk> DownloadChunks,
+ std::span<const BlockRange> DownloadBlockRanges,
+ std::span<const Block> DownloadBlocks,
+ std::span<const Sequence> SequencesToWrite);
+
+ void EndWrite();
+
+ static void Read(const std::filesystem::path& InputPath,
+ ChunkedFolderContent& OutRemoteContent,
+ std::vector<ChunkBlockDescription>& OutBlockDescriptions,
+ std::vector<Chunk>& OutDownloadChunks,
+ std::vector<BlockRange>& OutDownloadBlockRanges,
+ std::vector<Block>& OutDownloadBlocks,
+ std::vector<Sequence>& OutSequencesToWrite,
+ std::vector<Event>& OutEvents);
+
+ static void Replay(const std::filesystem::path& InputPath, UpdateFolderStream& UpdateFolderStream);
+
+private:
+ uint16_t GetThreadIndex(int ThreadId);
+ uint16_t GetCurrentThreadIndex();
+
+ tsl::robin_map<int, uint16_t> m_ThreadLookup;
+
+ RwLock m_UpdateLock;
+
+ const std::filesystem::path m_OutputPath;
+ BasicFile m_Output;
+ std::atomic<uint64_t> m_WriteOffset;
+ std::atomic<uint64_t> m_EventCount;
+
+ Stopwatch m_Stopwatch;
+};
+
+} // namespace zen