diff options
| author | Dan Engelbrecht <[email protected]> | 2026-02-04 11:39:28 +0100 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2026-02-10 10:12:22 +0100 |
| commit | 401bb08cc57593b8994962940b10e2e552c69a78 (patch) | |
| tree | f4d84278e6c34fece90cf9f7393159b3d9930541 /src | |
| parent | add imgui dependency (diff) | |
| download | zen-401bb08cc57593b8994962940b10e2e552c69a78.tar.xz zen-401bb08cc57593b8994962940b10e2e552c69a78.zip | |
callback interface for tracking progress of builds download
Diffstat (limited to 'src')
5 files changed, 1190 insertions, 142 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index f4edb65ab..2e621608e 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -6,6 +6,7 @@ #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinaryfile.h> #include <zencore/compactbinaryfmt.h> +#include <zencore/compactbinaryutil.h> #include <zencore/compactbinaryvalue.h> #include <zencore/compress.h> #include <zencore/except.h> @@ -31,6 +32,7 @@ #include <zenremotestore/builds/buildstorageutil.h> #include <zenremotestore/builds/filebuildstorage.h> #include <zenremotestore/builds/jupiterbuildstorage.h> +#include <zenremotestore/builds/updatefolderstream.h> #include <zenremotestore/chunking/chunkblock.h> #include <zenremotestore/chunking/chunkedcontent.h> #include <zenremotestore/chunking/chunkedfile.h> @@ -1507,9 +1509,12 @@ namespace { .PopulateCache = Options.PopulateCache}); { ProgressBar::PushLogOperation(ProgressMode, "Download"); - auto _ = MakeGuard([]() { ProgressBar::PopLogOperation(ProgressMode); }); + auto _ = MakeGuard([]() { ProgressBar::PopLogOperation(ProgressMode); }); + + std::unique_ptr<UpdateFolderStream> Feedback = CreateDiskUpdateFolderStream("D:\\Temp\\download_log.zrb"); + FolderContent UpdatedLocalFolderState; - Updater.Execute(UpdatedLocalFolderState); + Updater.Execute(UpdatedLocalFolderState, Feedback.get()); LocalState.State.ChunkedContent = RemoteContent; LocalState.FolderState = std::move(UpdatedLocalFolderState); diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp index 2319ad66d..52062f6ff 100644 --- a/src/zenremotestore/builds/buildstorageoperations.cpp +++ b/src/zenremotestore/builds/buildstorageoperations.cpp @@ -8,6 +8,7 @@ #include <zenremotestore/builds/buildstorage.h> #include <zenremotestore/builds/buildstoragecache.h> #include <zenremotestore/builds/buildstorageutil.h> +#include <zenremotestore/builds/updatefolderstream.h> #include <zenremotestore/chunking/chunkblock.h> #include <zenremotestore/chunking/chunkingcache.h> #include <zenremotestore/chunking/chunkingcontroller.h> @@ -552,8 +553,10 @@ BuildsOperationUpdateFolder::BuildsOperationUpdateFolder(OperationLogOutput& } void -BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) +BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateFolderStream* OptionalUpdateFolderStream) { + ZEN_UNUSED(OptionalUpdateFolderStream); + ZEN_TRACE_CPU("BuildsOperationUpdateFolder::Execute"); try { @@ -1244,6 +1247,55 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) } } + if (OptionalUpdateFolderStream) + { + std::vector<UpdateFolderStream::Chunk> DownloadChunks; + DownloadChunks.reserve(LooseChunkHashWorks.size()); + for (const LooseChunkHashWorkData& LooseChunkHashWork : LooseChunkHashWorks) + { + DownloadChunks.push_back(UpdateFolderStream::Chunk{.RemoteChunkIndex = LooseChunkHashWork.RemoteChunkIndex}); + } + std::vector<UpdateFolderStream::BlockRange> DownloadBlockRanges; + DownloadBlockRanges.reserve(BlockRangeWorks.size()); + for (const BlockRangeDescriptor& BlockRange : BlockRangeWorks) + { + DownloadBlockRanges.push_back( + UpdateFolderStream::BlockRange{.BlockIndex = BlockRange.BlockIndex, + .RangeStart = gsl::narrow<uint32_t>(BlockRange.RangeStart), + .RangeLength = gsl::narrow<uint32_t>(BlockRange.RangeLength)}); + } + + std::vector<UpdateFolderStream::Block> DownloadBlocks; + for (uint32_t BlockIndex : FullBlockWorks) + { + DownloadBlocks.push_back(UpdateFolderStream::Block{.BlockIndex = BlockIndex}); + } + std::vector<UpdateFolderStream::Sequence> SequencesToWrite; + for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < SequenceIndexChunksLeftToWriteCounters.size(); + RemoteSequenceIndex++) + { + if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() > 0) + { + SequencesToWrite.push_back(UpdateFolderStream::Sequence{.RemoteSequenceIndex = RemoteSequenceIndex}); + } + } + for (const ScavengedSequenceCopyOperation& ScaveneCopyOp : ScavengedSequenceCopyOperations) + { + SequencesToWrite.push_back(UpdateFolderStream::Sequence{ScaveneCopyOp.RemoteSequenceIndex}); + } + std::sort(SequencesToWrite.begin(), + SequencesToWrite.end(), + [](const UpdateFolderStream::Sequence& Lhs, const UpdateFolderStream::Sequence& Rhs) { + return Lhs.RemoteSequenceIndex < Rhs.RemoteSequenceIndex; + }); + OptionalUpdateFolderStream->Initialize(m_RemoteContent, + m_BlockDescriptions, + DownloadChunks, + DownloadBlockRanges, + DownloadBlocks, + SequencesToWrite); + } + BufferedWriteFileCache WriteCache; for (uint32_t ScavengeOpIndex = 0; ScavengeOpIndex < ScavengedSequenceCopyOperations.size(); ScavengeOpIndex++) @@ -1263,7 +1315,8 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) &FilteredWrittenBytesPerSecond, ScavengeOpIndex, &WritePartsComplete, - TotalPartWriteCount](std::atomic<bool>&) mutable { + TotalPartWriteCount, + OptionalUpdateFolderStream](std::atomic<bool>&) mutable { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_WriteScavenged"); @@ -1276,6 +1329,12 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) WriteScavengedSequenceToCache(ScavengeRootPath, ScavengedContent, ScavengeOp); + if (OptionalUpdateFolderStream) + { + OptionalUpdateFolderStream->WroteSequence(ScavengeOp.RemoteSequenceIndex, 0, ScavengeOp.RawSize); + OptionalUpdateFolderStream->CompletedSequence(ScavengeOp.RemoteSequenceIndex); + } + WritePartsComplete++; if (WritePartsComplete == TotalPartWriteCount) { @@ -1316,7 +1375,8 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) TotalPartWriteCount, &WriteCache, &FilteredDownloadedBytesPerSecond, - &FilteredWrittenBytesPerSecond](std::atomic<bool>&) mutable { + &FilteredWrittenBytesPerSecond, + OptionalUpdateFolderStream](std::atomic<bool>&) mutable { ZEN_TRACE_CPU("Async_ReadPreDownloadedChunk"); if (!m_AbortFlag) { @@ -1332,7 +1392,8 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) TotalRequestCount, TotalPartWriteCount, FilteredDownloadedBytesPerSecond, - FilteredWrittenBytesPerSecond); + FilteredWrittenBytesPerSecond, + OptionalUpdateFolderStream); } }, WorkerThreadPool::EMode::EnableBacklog); @@ -1352,55 +1413,59 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) break; } - Work.ScheduleWork(m_IOWorkerPool, - [this, - &CloneQuery, - &SequenceIndexChunksLeftToWriteCounters, - &WriteCache, - &Work, - &FilteredWrittenBytesPerSecond, - &CopyChunkDatas, - &ScavengedContents, - &ScavengedLookups, - &ScavengedPaths, - &WritePartsComplete, - TotalPartWriteCount, - CopyDataIndex](std::atomic<bool>&) { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Async_CopyLocal"); - - FilteredWrittenBytesPerSecond.Start(); - const CopyChunkData& CopyData = CopyChunkDatas[CopyDataIndex]; - - std::vector<uint32_t> WrittenSequenceIndexes = WriteLocalChunkToCache(CloneQuery.get(), - CopyData, - ScavengedContents, - ScavengedLookups, - ScavengedPaths, - WriteCache); - WritePartsComplete++; - if (!m_AbortFlag) - { - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); - } + Work.ScheduleWork( + m_IOWorkerPool, + [this, + &CloneQuery, + &SequenceIndexChunksLeftToWriteCounters, + &WriteCache, + &Work, + &FilteredWrittenBytesPerSecond, + &CopyChunkDatas, + &ScavengedContents, + &ScavengedLookups, + &ScavengedPaths, + &WritePartsComplete, + TotalPartWriteCount, + CopyDataIndex, + OptionalUpdateFolderStream](std::atomic<bool>&) { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_CopyLocal"); - // Write tracking, updating this must be done without any files open - std::vector<uint32_t> CompletedChunkSequences; - for (uint32_t RemoteSequenceIndex : WrittenSequenceIndexes) - { - if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters)) - { - CompletedChunkSequences.push_back(RemoteSequenceIndex); - } - } - WriteCache.Close(CompletedChunkSequences); - VerifyAndCompleteChunkSequencesAsync(CompletedChunkSequences, Work); - } - } - }); + FilteredWrittenBytesPerSecond.Start(); + const CopyChunkData& CopyData = CopyChunkDatas[CopyDataIndex]; + + std::vector<uint32_t> WrittenSequenceIndexes = WriteLocalChunkToCache(CloneQuery.get(), + CopyData, + ScavengedContents, + ScavengedLookups, + ScavengedPaths, + WriteCache, + OptionalUpdateFolderStream); + + WritePartsComplete++; + if (!m_AbortFlag) + { + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } + + // Write tracking, updating this must be done without any files open + std::vector<uint32_t> CompletedChunkSequences; + for (uint32_t RemoteSequenceIndex : WrittenSequenceIndexes) + { + if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters)) + { + CompletedChunkSequences.push_back(RemoteSequenceIndex); + } + } + WriteCache.Close(CompletedChunkSequences); + VerifyAndCompleteChunkSequencesAsync(CompletedChunkSequences, Work, OptionalUpdateFolderStream); + } + } + }); } for (uint32_t BlockIndex : CachedChunkBlockIndexes) @@ -1421,7 +1486,8 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) &FilteredWrittenBytesPerSecond, &WritePartsComplete, TotalPartWriteCount, - BlockIndex](std::atomic<bool>&) mutable { + BlockIndex, + OptionalUpdateFolderStream](std::atomic<bool>&) mutable { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_WriteCachedBlock"); @@ -1444,12 +1510,17 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) Work, CompositeBuffer(std::move(BlockBuffer)), RemoteChunkIndexNeedsCopyFromSourceFlags, - WriteCache)) + WriteCache, + OptionalUpdateFolderStream)) { std::error_code DummyEc; RemoveFile(BlockChunkPath, DummyEc); throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash)); } + if (OptionalUpdateFolderStream) + { + OptionalUpdateFolderStream->CompletedBlock(BlockIndex); + } std::error_code Ec = TryRemoveFile(BlockChunkPath); if (Ec) @@ -1493,7 +1564,8 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) &FilteredWrittenBytesPerSecond, &Work, &BlockRangeWorks, - BlockRangeIndex](std::atomic<bool>&) { + BlockRangeIndex, + OptionalUpdateFolderStream](std::atomic<bool>&) { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_GetPartialBlock"); @@ -1515,12 +1587,21 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) TotalPartWriteCount, &FilteredDownloadedBytesPerSecond, &FilteredWrittenBytesPerSecond, - &BlockRange](IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath) { + &BlockRange, + OptionalUpdateFolderStream](IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath) { if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) { FilteredDownloadedBytesPerSecond.Stop(); } + if (OptionalUpdateFolderStream) + { + const uint32_t BlockIndex = BlockRange.BlockIndex; + OptionalUpdateFolderStream->DownloadedBlockRange(BlockIndex, + gsl::narrow<uint32_t>(BlockRange.RangeStart), + gsl::narrow<uint32_t>(BlockRange.RangeLength)); + } + if (!m_AbortFlag) { Work.ScheduleWork( @@ -1535,7 +1616,8 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) &FilteredWrittenBytesPerSecond, &BlockRange, BlockChunkPath = std::filesystem::path(OnDiskPath), - BlockPartialBuffer = std::move(InMemoryBuffer)](std::atomic<bool>&) mutable { + BlockPartialBuffer = std::move(InMemoryBuffer), + OptionalUpdateFolderStream](std::atomic<bool>&) mutable { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_WritePartialBlock"); @@ -1563,6 +1645,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) FilteredWrittenBytesPerSecond.Start(); + // TODO: Add OptionalUpdateFolderStream calls for chunks / chunk sequences if (!WritePartialBlockChunksToCache( BlockDescription, SequenceIndexChunksLeftToWriteCounters, @@ -1571,7 +1654,8 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) BlockRange.ChunkBlockIndexStart, BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1, RemoteChunkIndexNeedsCopyFromSourceFlags, - WriteCache)) + WriteCache, + OptionalUpdateFolderStream)) { std::error_code DummyEc; RemoveFile(BlockChunkPath, DummyEc); @@ -1589,6 +1673,14 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) Ec.message()); } + if (OptionalUpdateFolderStream) + { + OptionalUpdateFolderStream->CompletedBlockRange( + BlockIndex, + gsl::narrow<uint32_t>(BlockRange.RangeStart), + gsl::narrow<uint32_t>(BlockRange.RangeLength)); + } + WritePartsComplete++; if (WritePartsComplete == TotalPartWriteCount) { @@ -1630,7 +1722,8 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) &SequenceIndexChunksLeftToWriteCounters, &FilteredDownloadedBytesPerSecond, TotalRequestCount, - BlockIndex](std::atomic<bool>&) { + BlockIndex, + OptionalUpdateFolderStream](std::atomic<bool>&) { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_GetFullBlock"); @@ -1661,6 +1754,12 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) { throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); } + + if (OptionalUpdateFolderStream) + { + OptionalUpdateFolderStream->DownloadedBlock(BlockIndex); + } + if (!m_AbortFlag) { uint64_t BlockSize = BlockBuffer.GetSize(); @@ -1727,7 +1826,8 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) TotalPartWriteCount, &FilteredWrittenBytesPerSecond, BlockChunkPath, - BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable { + BlockBuffer = std::move(BlockBuffer), + OptionalUpdateFolderStream](std::atomic<bool>&) mutable { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_WriteFullBlock"); @@ -1752,12 +1852,14 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) } FilteredWrittenBytesPerSecond.Start(); + if (!WriteChunksBlockToCache(BlockDescription, SequenceIndexChunksLeftToWriteCounters, Work, CompositeBuffer(std::move(BlockBuffer)), RemoteChunkIndexNeedsCopyFromSourceFlags, - WriteCache)) + WriteCache, + OptionalUpdateFolderStream)) { std::error_code DummyEc; RemoveFile(BlockChunkPath, DummyEc); @@ -1778,6 +1880,11 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) } } + if (OptionalUpdateFolderStream) + { + OptionalUpdateFolderStream->CompletedBlock(BlockIndex); + } + WritePartsComplete++; if (WritePartsComplete == TotalPartWriteCount) @@ -1860,6 +1967,12 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) FilteredDownloadedBytesPerSecond.Stop(); WriteProgressBar.Finish(); + + if (OptionalUpdateFolderStream) + { + OptionalUpdateFolderStream->WriteComplete(); + } + if (m_AbortFlag) { return; @@ -3064,8 +3177,9 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd ParallelWork& Work, uint64_t TotalRequestCount, uint64_t TotalPartWriteCount, - FilteredRate& FilteredDownloadedBytesPerSecond, - FilteredRate& FilteredWrittenBytesPerSecond) + FilteredRate& FilteredDownloadedBytesPerSecond, + FilteredRate& FilteredWrittenBytesPerSecond, + UpdateFolderStream* OptionalUpdateFolderStream) { std::filesystem::path ExistingCompressedChunkPath; if (!m_Options.PrimeCacheOnly) @@ -3095,6 +3209,7 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd TotalPartWriteCount, &FilteredWrittenBytesPerSecond, RemoteChunkIndex, + OptionalUpdateFolderStream, ChunkTargetPtrs = std::move(ChunkTargetPtrs), CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>& AbortFlag) mutable { if (!AbortFlag) @@ -3112,8 +3227,11 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd fmt::format("Could not open dowloaded compressed chunk {} from {}", ChunkHash, CompressedChunkPath)); } - bool NeedHashVerify = - WriteCompressedChunkToCache(ChunkHash, ChunkTargetPtrs, WriteCache, std::move(CompressedPart)); + bool NeedHashVerify = WriteCompressedChunkToCache(RemoteChunkIndex, + ChunkTargetPtrs, + WriteCache, + std::move(CompressedPart), + OptionalUpdateFolderStream); WritePartsComplete++; if (!AbortFlag) @@ -3138,11 +3256,11 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd WriteCache.Close(CompletedSequences); if (NeedHashVerify) { - VerifyAndCompleteChunkSequencesAsync(CompletedSequences, Work); + VerifyAndCompleteChunkSequencesAsync(CompletedSequences, Work, OptionalUpdateFolderStream); } else { - FinalizeChunkSequences(CompletedSequences); + FinalizeChunkSequences(CompletedSequences, OptionalUpdateFolderStream); } } } @@ -3159,6 +3277,7 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd &WritePartsComplete, TotalPartWriteCount, TotalRequestCount, + OptionalUpdateFolderStream, &FilteredDownloadedBytesPerSecond, &FilteredWrittenBytesPerSecond, RemoteChunkIndex, @@ -3183,6 +3302,7 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd RemoteChunkIndex, &FilteredDownloadedBytesPerSecond, &FilteredWrittenBytesPerSecond, + OptionalUpdateFolderStream, ChunkTargetPtrs = std::move(ChunkTargetPtrs)](IoBuffer&& Payload) mutable { if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) { @@ -3200,7 +3320,8 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd WritePartsComplete, TotalPartWriteCount, FilteredWrittenBytesPerSecond, - EnableBacklog); + EnableBacklog, + OptionalUpdateFolderStream); }); } }); @@ -3639,7 +3760,8 @@ BuildsOperationUpdateFolder::WriteLocalChunkToCache(CloneQueryInterface* C const std::vector<ChunkedFolderContent>& ScavengedContents, const std::vector<ChunkedContentLookup>& ScavengedLookups, const std::vector<std::filesystem::path>& ScavengedPaths, - BufferedWriteFileCache& WriteCache) + BufferedWriteFileCache& WriteCache, + UpdateFolderStream* OptionalUpdateFolderStream) { ZEN_TRACE_CPU("WriteLocalChunkToCache"); @@ -3811,12 +3933,22 @@ BuildsOperationUpdateFolder::WriteLocalChunkToCache(CloneQueryInterface* C m_WrittenChunkByteCount += ClonableBytes; + if (OptionalUpdateFolderStream) + { + OptionalUpdateFolderStream->WroteSequence(RemoteSequenceIndex, Op.Target->Offset + PreBytes, ClonableBytes); + } + if (PreBytes > 0) { CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, PreBytes); const uint64_t FileOffset = Op.Target->Offset; WriteSequenceChunkToCache(LocalWriter, ChunkSource, RemoteSequenceIndex, FileOffset, RemotePathIndex); + + if (OptionalUpdateFolderStream) + { + OptionalUpdateFolderStream->WroteSequence(RemoteSequenceIndex, FileOffset, ChunkSource.GetSize()); + } } if (PostBytes > 0) { @@ -3824,6 +3956,11 @@ BuildsOperationUpdateFolder::WriteLocalChunkToCache(CloneQueryInterface* C const uint64_t FileOffset = Op.Target->Offset + ReadLength - PostBytes; WriteSequenceChunkToCache(LocalWriter, ChunkSource, RemoteSequenceIndex, FileOffset, RemotePathIndex); + + if (OptionalUpdateFolderStream) + { + OptionalUpdateFolderStream->WroteSequence(RemoteSequenceIndex, FileOffset, ChunkSource.GetSize()); + } } } } @@ -3836,6 +3973,11 @@ BuildsOperationUpdateFolder::WriteLocalChunkToCache(CloneQueryInterface* C const uint64_t FileOffset = Op.Target->Offset; WriteSequenceChunkToCache(LocalWriter, ChunkSource, RemoteSequenceIndex, FileOffset, RemotePathIndex); + + if (OptionalUpdateFolderStream) + { + OptionalUpdateFolderStream->WroteSequence(RemoteSequenceIndex, FileOffset, ChunkSource.GetSize()); + } } } @@ -3862,20 +4004,20 @@ BuildsOperationUpdateFolder::WriteLocalChunkToCache(CloneQueryInterface* C bool BuildsOperationUpdateFolder::WriteCompressedChunkToCache( - const IoHash& ChunkHash, + const uint32_t RemoteChunkIndex, const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs, BufferedWriteFileCache& WriteCache, - IoBuffer&& CompressedPart) + IoBuffer&& CompressedPart, + UpdateFolderStream* OptionalUpdateFolderStream) { ZEN_TRACE_CPU("WriteCompressedChunkToCache"); - auto ChunkHashToChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); - ZEN_ASSERT(ChunkHashToChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end()); + const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + if (IsSingleFileChunk(m_RemoteContent, ChunkTargetPtrs)) { - const std::uint32_t SequenceIndex = ChunkTargetPtrs.front()->SequenceIndex; - const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]; - StreamDecompress(SequenceRawHash, CompositeBuffer(std::move(CompressedPart))); + const std::uint32_t SequenceIndex = ChunkTargetPtrs.front()->SequenceIndex; + StreamDecompress(SequenceIndex, CompositeBuffer(std::move(CompressedPart)), OptionalUpdateFolderStream); return false; } else @@ -3912,6 +4054,11 @@ BuildsOperationUpdateFolder::WriteCompressedChunkToCache( const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; WriteSequenceChunkToCache(LocalWriter, RangeBuffer, SequenceIndex, FileOffset, PathIndex); + + if (OptionalUpdateFolderStream) + { + OptionalUpdateFolderStream->WroteSequence(SequenceIndex, FileOffset, RangeBuffer.GetSize()); + } } return true; @@ -3934,9 +4081,13 @@ BuildsOperationUpdateFolder::WriteCompressedChunkToCache( } void -BuildsOperationUpdateFolder::StreamDecompress(const IoHash& SequenceRawHash, CompositeBuffer&& CompressedPart) +BuildsOperationUpdateFolder::StreamDecompress(uint32_t RemoteSequenceIndex, + CompositeBuffer&& CompressedPart, + UpdateFolderStream* OptionalUpdateFolderStream) { ZEN_TRACE_CPU("StreamDecompress"); + const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash); TemporaryFile DecompressedTemp; std::error_code Ec; @@ -3962,33 +4113,37 @@ BuildsOperationUpdateFolder::StreamDecompress(const IoHash& SequenceRawHash, Com PrepareFileForScatteredWrite(DecompressedTemp.Handle(), RawSize); IoHashStream Hash; - bool CouldDecompress = - Compressed.DecompressToStream(0, - (uint64_t)-1, - [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { - ZEN_UNUSED(SourceOffset); - ZEN_TRACE_CPU("StreamDecompress_Write"); - m_DiskStats.ReadCount++; - m_DiskStats.ReadByteCount += SourceSize; - if (!m_AbortFlag) - { - for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) - { - if (m_Options.ValidateCompletedSequences) - { - Hash.Append(Segment.GetView()); - m_ValidatedChunkByteCount += Segment.GetSize(); - } - DecompressedTemp.Write(Segment, Offset); - Offset += Segment.GetSize(); - m_DiskStats.WriteByteCount += Segment.GetSize(); - m_DiskStats.WriteCount++; - m_WrittenChunkByteCount += Segment.GetSize(); - } - return true; - } - return false; - }); + bool CouldDecompress = Compressed.DecompressToStream( + 0, + (uint64_t)-1, + [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { + ZEN_UNUSED(SourceOffset); + ZEN_TRACE_CPU("StreamDecompress_Write"); + m_DiskStats.ReadCount++; + m_DiskStats.ReadByteCount += SourceSize; + if (!m_AbortFlag) + { + for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) + { + if (m_Options.ValidateCompletedSequences) + { + Hash.Append(Segment.GetView()); + m_ValidatedChunkByteCount += Segment.GetSize(); + } + DecompressedTemp.Write(Segment, Offset); + if (OptionalUpdateFolderStream) + { + OptionalUpdateFolderStream->WroteSequence(RemoteSequenceIndex, Offset, Segment.GetSize()); + } + Offset += Segment.GetSize(); + m_DiskStats.WriteByteCount += Segment.GetSize(); + m_DiskStats.WriteCount++; + m_WrittenChunkByteCount += Segment.GetSize(); + } + return true; + } + return false; + }); if (m_AbortFlag) { @@ -4175,7 +4330,8 @@ void BuildsOperationUpdateFolder::WriteBlockChunkOpsToCache(std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, const BlockWriteOps& Ops, BufferedWriteFileCache& WriteCache, - ParallelWork& Work) + ParallelWork& Work, + UpdateFolderStream* OptionalUpdateFolderStream) { ZEN_TRACE_CPU("WriteBlockChunkOpsToCache"); @@ -4196,6 +4352,11 @@ BuildsOperationUpdateFolder::WriteBlockChunkOpsToCache(std::span<std::atomic<uin const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; WriteSequenceChunkToCache(LocalWriter, Chunk, SequenceIndex, FileOffset, PathIndex); + + if (OptionalUpdateFolderStream) + { + OptionalUpdateFolderStream->WroteSequence(SequenceIndex, FileOffset, Chunk.GetSize()); + } } } if (!Work.IsAborted()) @@ -4211,7 +4372,7 @@ BuildsOperationUpdateFolder::WriteBlockChunkOpsToCache(std::span<std::atomic<uin } } WriteCache.Close(CompletedChunkSequences); - VerifyAndCompleteChunkSequencesAsync(CompletedChunkSequences, Work); + VerifyAndCompleteChunkSequencesAsync(CompletedChunkSequences, Work, OptionalUpdateFolderStream); } } @@ -4221,7 +4382,8 @@ BuildsOperationUpdateFolder::WriteChunksBlockToCache(const ChunkBlockDescription ParallelWork& Work, CompositeBuffer&& BlockBuffer, std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, - BufferedWriteFileCache& WriteCache) + BufferedWriteFileCache& WriteCache, + UpdateFolderStream* OptionalUpdateFolderStream) { ZEN_TRACE_CPU("WriteChunksBlockToCache"); @@ -4246,7 +4408,7 @@ BuildsOperationUpdateFolder::WriteChunksBlockToCache(const ChunkBlockDescription gsl::narrow<uint32_t>(BlockDescription.ChunkRawHashes.size() - 1), Ops)) { - WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work); + WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work, OptionalUpdateFolderStream); return true; } return false; @@ -4261,7 +4423,7 @@ BuildsOperationUpdateFolder::WriteChunksBlockToCache(const ChunkBlockDescription gsl::narrow<uint32_t>(BlockDescription.ChunkRawHashes.size() - 1), Ops)) { - WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work); + WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work, OptionalUpdateFolderStream); return true; } return false; @@ -4275,7 +4437,8 @@ BuildsOperationUpdateFolder::WritePartialBlockChunksToCache(const ChunkBlockDesc uint32_t FirstIncludedBlockChunkIndex, uint32_t LastIncludedBlockChunkIndex, std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, - BufferedWriteFileCache& WriteCache) + BufferedWriteFileCache& WriteCache, + UpdateFolderStream* OptionalUpdateFolderStream) { ZEN_TRACE_CPU("WritePartialBlockChunksToCache"); @@ -4292,7 +4455,7 @@ BuildsOperationUpdateFolder::WritePartialBlockChunksToCache(const ChunkBlockDesc LastIncludedBlockChunkIndex, Ops)) { - WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work); + WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work, OptionalUpdateFolderStream); return true; } else @@ -4312,7 +4475,8 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa std::atomic<uint64_t>& WritePartsComplete, const uint64_t TotalPartWriteCount, FilteredRate& FilteredWrittenBytesPerSecond, - bool EnableBacklog) + bool EnableBacklog, + UpdateFolderStream* OptionalUpdateFolderStream) { ZEN_TRACE_CPU("AsyncWriteDownloadedChunk"); @@ -4370,6 +4534,7 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa &WriteCache, &WritePartsComplete, &FilteredWrittenBytesPerSecond, + OptionalUpdateFolderStream, ChunkTargetPtrs = std::move(ChunkTargetPtrs), CompressedPart = IoBuffer(std::move(Payload))](std::atomic<bool>&) mutable { if (!m_AbortFlag) @@ -4394,7 +4559,11 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa } } - bool NeedHashVerify = WriteCompressedChunkToCache(ChunkHash, ChunkTargetPtrs, WriteCache, std::move(CompressedPart)); + bool NeedHashVerify = WriteCompressedChunkToCache(RemoteChunkIndex, + ChunkTargetPtrs, + WriteCache, + std::move(CompressedPart), + OptionalUpdateFolderStream); if (!m_AbortFlag) { WritePartsComplete++; @@ -4421,11 +4590,11 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa WriteCache.Close(CompletedSequences); if (NeedHashVerify) { - VerifyAndCompleteChunkSequencesAsync(CompletedSequences, Work); + VerifyAndCompleteChunkSequencesAsync(CompletedSequences, Work, OptionalUpdateFolderStream); } else { - FinalizeChunkSequences(CompletedSequences); + FinalizeChunkSequences(CompletedSequences, OptionalUpdateFolderStream); } } } @@ -4434,7 +4603,9 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa } void -BuildsOperationUpdateFolder::VerifyAndCompleteChunkSequencesAsync(std::span<const uint32_t> RemoteSequenceIndexes, ParallelWork& Work) +BuildsOperationUpdateFolder::VerifyAndCompleteChunkSequencesAsync(std::span<const uint32_t> RemoteSequenceIndexes, + ParallelWork& Work, + UpdateFolderStream* OptionalUpdateFolderStream) { if (RemoteSequenceIndexes.empty()) { @@ -4446,7 +4617,7 @@ BuildsOperationUpdateFolder::VerifyAndCompleteChunkSequencesAsync(std::span<cons for (uint32_t RemoteSequenceIndexOffset = 1; RemoteSequenceIndexOffset < RemoteSequenceIndexes.size(); RemoteSequenceIndexOffset++) { const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset]; - Work.ScheduleWork(m_IOWorkerPool, [this, RemoteSequenceIndex](std::atomic<bool>&) { + Work.ScheduleWork(m_IOWorkerPool, [this, RemoteSequenceIndex, OptionalUpdateFolderStream](std::atomic<bool>&) { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_VerifyAndFinalizeSequence"); @@ -4454,8 +4625,7 @@ BuildsOperationUpdateFolder::VerifyAndCompleteChunkSequencesAsync(std::span<cons VerifySequence(RemoteSequenceIndex); if (!m_AbortFlag) { - const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; - FinalizeChunkSequence(SequenceRawHash); + FinalizeChunkSequence(RemoteSequenceIndex, OptionalUpdateFolderStream); } } }); @@ -4463,16 +4633,14 @@ BuildsOperationUpdateFolder::VerifyAndCompleteChunkSequencesAsync(std::span<cons const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[0]; VerifySequence(RemoteSequenceIndex); - const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; - FinalizeChunkSequence(SequenceRawHash); + FinalizeChunkSequence(RemoteSequenceIndex, OptionalUpdateFolderStream); } else { for (uint32_t RemoteSequenceIndexOffset = 0; RemoteSequenceIndexOffset < RemoteSequenceIndexes.size(); RemoteSequenceIndexOffset++) { const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset]; - const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; - FinalizeChunkSequence(SequenceRawHash); + FinalizeChunkSequence(RemoteSequenceIndex, OptionalUpdateFolderStream); } } } @@ -4506,10 +4674,12 @@ BuildsOperationUpdateFolder::CompleteChunkTargets(const std::vector<const Chunke } void -BuildsOperationUpdateFolder::FinalizeChunkSequence(const IoHash& SequenceRawHash) +BuildsOperationUpdateFolder::FinalizeChunkSequence(uint32_t RemoteSequenceIndex, UpdateFolderStream* OptionalUpdateFolderStream) { ZEN_TRACE_CPU("FinalizeChunkSequence"); + const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + ZEN_ASSERT_SLOW(!IsFile(GetFinalChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash))); std::error_code Ec; RenameFile(GetTempChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash), @@ -4519,16 +4689,21 @@ BuildsOperationUpdateFolder::FinalizeChunkSequence(const IoHash& SequenceRawHash { throw std::system_error(Ec); } + if (OptionalUpdateFolderStream) + { + OptionalUpdateFolderStream->CompletedSequence(RemoteSequenceIndex); + } } void -BuildsOperationUpdateFolder::FinalizeChunkSequences(std::span<const uint32_t> RemoteSequenceIndexes) +BuildsOperationUpdateFolder::FinalizeChunkSequences(std::span<const uint32_t> RemoteSequenceIndexes, + UpdateFolderStream* OptionalUpdateFolderStream) { ZEN_TRACE_CPU("FinalizeChunkSequences"); for (uint32_t SequenceIndex : RemoteSequenceIndexes) { - FinalizeChunkSequence(m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); + FinalizeChunkSequence(SequenceIndex, OptionalUpdateFolderStream); } } diff --git a/src/zenremotestore/builds/updatefolderstream.cpp b/src/zenremotestore/builds/updatefolderstream.cpp new file mode 100644 index 000000000..5a1809923 --- /dev/null +++ b/src/zenremotestore/builds/updatefolderstream.cpp @@ -0,0 +1,631 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenremotestore/builds/updatefolderstream.h> + +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryutil.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/stream.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <xxhash.h> +ZEN_THIRD_PARTY_INCLUDES_END + +#include <numeric> + +namespace zen { + +using namespace std::literals; + +namespace { +#pragma pack(push) +#pragma pack(1) + struct StreamHeader + { + static constexpr uint32_t ExpectedMagic = 0x7a726273; // 'zrbs'; + static constexpr uint32_t CurrentVersion = 1; + static constexpr uint32_t UpdateFolderStreamType = HashStringDjb2("UpdateFolderDiskStreamFeedback"sv); + + uint32_t Magic = ExpectedMagic; + uint32_t Version = CurrentVersion; + uint32_t StreamType = 0; + uint32_t PrepPayloadSize = 0; + uint64_t Reserved0 = 0; + uint32_t Reserved1 = 0; + uint32_t Checksum = 0; + + static uint32_t ComputeChecksum(const StreamHeader& Header) + { + return XXH32(&Header.Magic, sizeof(Header) - sizeof(uint32_t), 0xC0C0'BABA); + } + }; +#pragma pack(pop) + static_assert(sizeof(StreamHeader) == 32); + + class UpdateFolderDiskStreamFeedback : public UpdateFolderStream + { + using Event = UpdateFolderDiskStream::Event; + + public: + explicit UpdateFolderDiskStreamFeedback(const std::filesystem::path& OutputPath) : m_OutputStream(OutputPath) {} + virtual ~UpdateFolderDiskStreamFeedback() {} + + virtual void Initialize(const ChunkedFolderContent& RemoteContent, + std::span<const ChunkBlockDescription> BlockDescriptions, + std::span<const Chunk> DownloadChunks, + std::span<const BlockRange> DownloadBlockRanges, + std::span<const Block> DownloadBlocks, + std::span<const Sequence> SequencesToWrite) override + { + m_OutputStream + .PrepareWrite(RemoteContent, BlockDescriptions, DownloadChunks, DownloadBlockRanges, DownloadBlocks, SequencesToWrite); + } + + virtual void DownloadedChunk(uint32_t RemoteChunkIndex) override + { + m_OutputStream.WriteEvent( + m_OutputStream.MakeEvent(Event::EventType::DownloadedChunk, + Event::EventPayload{.DownloadedChunk = {.RemoteChunkIndex = RemoteChunkIndex}})); + } + + virtual void DownloadedBlockRange(uint32_t BlockIndex, uint32_t RangeStart, uint32_t RangeLength) override + { + m_OutputStream.WriteEvent(m_OutputStream.MakeEvent( + Event::EventType::DownloadedBlockRange, + Event::EventPayload{ + .DownloadedBlockRange = {.BlockIndex = BlockIndex, .RangeStart = RangeStart, .RangeLength = RangeLength}})); + } + + virtual void DownloadedBlock(uint32_t BlockIndex) override + { + m_OutputStream.WriteEvent(m_OutputStream.MakeEvent(Event::EventType::DownloadedBlock, + Event::EventPayload{.DownloadedBlock = {.BlockIndex = BlockIndex}})); + } + + virtual void WroteSequence(uint32_t RemoteSequenceIndex, uint64_t Offset, uint64_t Length) override + { + m_OutputStream.WriteEvent(m_OutputStream.MakeEvent( + Event::EventType::WroteSequence, + Event::EventPayload{.WroteSequence = {.RemoteSequenceIndex = RemoteSequenceIndex, .Offset = Offset, .Length = Length}})); + } + + virtual void CompletedBlockRange(uint32_t BlockIndex, uint32_t RangeStart, uint32_t RangeLength) override + { + m_OutputStream.WriteEvent(m_OutputStream.MakeEvent( + Event::EventType::CompletedBlockRange, + Event::EventPayload{ + .CompletedBlockRange = {.BlockIndex = BlockIndex, .RangeStart = RangeStart, .RangeLength = RangeLength}})); + } + + virtual void CompletedBlock(uint32_t BlockIndex) override + { + m_OutputStream.WriteEvent(m_OutputStream.MakeEvent(Event::EventType::CompletedBlock, + Event::EventPayload{.CompletedBlock = {.BlockIndex = BlockIndex}})); + } + + virtual void CompletedSequence(uint32_t RemoteSequenceIndex) override + { + m_OutputStream.WriteEvent( + m_OutputStream.MakeEvent(Event::EventType::CompletedSequence, + Event::EventPayload{.CompletedSequence = {.RemoteSequenceIndex = RemoteSequenceIndex}})); + } + + virtual void WriteComplete() override { m_OutputStream.EndWrite(); } + + private: + UpdateFolderDiskStream m_OutputStream; + }; +} // namespace + +std::unique_ptr<UpdateFolderStream> +CreateDiskUpdateFolderStream(const std::filesystem::path& OutputPath) +{ + return std::make_unique<UpdateFolderDiskStreamFeedback>(OutputPath); +} + +UpdateFolderDiskStream::Event::EventHeader +UpdateFolderDiskStream::Event::MakeHeader(Event::EventType Type, uint64_t TimeStampUs, uint16_t ThreadIndex) +{ + return EventHeader{.EventType = Type, + .TimestampUsHigh = gsl::narrow<uint8_t>(TimeStampUs >> 32), + .TimestampUsLow = static_cast<uint32_t>(TimeStampUs & 0xffffffffu), + .ThreadIndex = ThreadIndex}; +} + +UpdateFolderDiskStream::UpdateFolderDiskStream(const std::filesystem::path& OutputPath) +: m_OutputPath(OutputPath) +, m_Output(OutputPath, BasicFile::Mode::kTruncate) +, m_WriteOffset(0) +{ +} + +UpdateFolderDiskStream::~UpdateFolderDiskStream() +{ + try + { + m_Output.Close(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed closing streaming output at '{}'. Reason: '{}'", m_OutputPath, Ex.what()); + } +} + +UpdateFolderDiskStream::Event +UpdateFolderDiskStream::MakeEvent(Event::EventType Type, Event::EventPayload Payload) +{ + return Event{.Header = Event::MakeHeader(Type, m_Stopwatch.GetElapsedTimeUs(), GetCurrentThreadIndex()), .Payload = Payload}; +} + +void +UpdateFolderDiskStream::WriteEvent(const Event& InEvent) +{ + uint64_t WriteOffset = m_WriteOffset.fetch_add(sizeof(Event)); + m_Output.Write(&InEvent, sizeof(Event), WriteOffset); + m_EventCount++; +} + +void +UpdateFolderDiskStream::PrepareWrite(const ChunkedFolderContent& RemoteContent, + std::span<const ChunkBlockDescription> BlockDescriptions, + std::span<const Chunk> DownloadChunks, + std::span<const BlockRange> DownloadBlockRanges, + std::span<const Block> DownloadBlocks, + std::span<const Sequence> SequencesToWrite) +{ + CbObjectWriter Writer; + Writer.BeginObject("remoteContent"sv); + { + compactbinary_helpers::WriteArray(RemoteContent.Paths, "paths"sv, Writer); + compactbinary_helpers::WriteArray(RemoteContent.RawSizes, "rawSizes"sv, Writer); + compactbinary_helpers::WriteArray(RemoteContent.Attributes, "attributes"sv, Writer); + compactbinary_helpers::WriteArray(RemoteContent.RawHashes, "rawHashes"sv, Writer); + Writer.BeginObject("chunkedContent"sv); + { + compactbinary_helpers::WriteArray(RemoteContent.ChunkedContent.SequenceRawHashes, "sequenceRawHashes"sv, Writer); + compactbinary_helpers::WriteArray(RemoteContent.ChunkedContent.ChunkCounts, "chunkCounts"sv, Writer); + compactbinary_helpers::WriteArray(RemoteContent.ChunkedContent.ChunkOrders, "chunkOrders"sv, Writer); + compactbinary_helpers::WriteArray(RemoteContent.ChunkedContent.ChunkHashes, "chunkHashes"sv, Writer); + compactbinary_helpers::WriteArray(RemoteContent.ChunkedContent.ChunkRawSizes, "chunkRawSizes"sv, Writer); + } + Writer.EndObject(); // chunkedContent + } + Writer.EndObject(); // remoteContent + + Writer.BeginArray("blockDescriptions"sv); + for (const ChunkBlockDescription& Block : BlockDescriptions) + { + Writer.BeginObject(); + { + Writer.AddHash("blockHash"sv, Block.BlockHash); + compactbinary_helpers::WriteArray(Block.ChunkRawHashes, "chunkRawHashes"sv, Writer); + Writer.AddInteger("headerSize"sv, Block.HeaderSize); + compactbinary_helpers::WriteArray(Block.ChunkRawLengths, "chunkRawLengths"sv, Writer); + compactbinary_helpers::WriteArray(Block.ChunkCompressedLengths, "chunkCompressedLengths"sv, Writer); + } + Writer.EndObject(); + } + Writer.EndArray(); // blockDescriptions + + Writer.BeginArray("downloadChunks"sv); + for (const Chunk& DownloadChunk : DownloadChunks) + { + Writer.BeginObject(); + { + Writer.AddInteger("remoteChunkIndex"sv, DownloadChunk.RemoteChunkIndex); + } + Writer.EndObject(); + } + Writer.EndArray(); // downloadChunks + + Writer.BeginArray("downloadBlockRanges"sv); + for (const BlockRange& DownloadBlockRange : DownloadBlockRanges) + { + Writer.BeginObject(); + { + Writer.AddInteger("blockIndex"sv, DownloadBlockRange.BlockIndex); + Writer.AddInteger("rangeStart"sv, DownloadBlockRange.RangeStart); + Writer.AddInteger("rangeLength"sv, DownloadBlockRange.RangeLength); + } + Writer.EndObject(); + } + Writer.EndArray(); // downloadBlockRanges + + Writer.BeginArray("downloadBlocks"sv); + for (const Block& DownloadBlock : DownloadBlocks) + { + Writer.BeginObject(); + { + Writer.AddInteger("blockIndex"sv, DownloadBlock.BlockIndex); + } + Writer.EndObject(); + } + Writer.EndArray(); // downloadBlocks + + Writer.BeginArray("sequencesToWrite"sv); + for (const Sequence& SequenceToWrite : SequencesToWrite) + { + Writer.BeginObject(); + { + Writer.AddInteger("remoteSequenceIndex"sv, SequenceToWrite.RemoteSequenceIndex); + } + Writer.EndObject(); + } + Writer.EndArray(); // sequencesToWrite + + CbObject PrepPayload = Writer.Save(); + + BinaryWriter PayloadBuffer; + PrepPayload.CopyTo(PayloadBuffer); + + uint32_t PrepPayloadSize = gsl::narrow<uint32_t>(PayloadBuffer.GetSize()); + + StreamHeader Header = {.StreamType = StreamHeader::UpdateFolderStreamType, .PrepPayloadSize = PrepPayloadSize}; + + Header.Checksum = StreamHeader::ComputeChecksum(Header); + + m_Output.Write(&Header, sizeof(StreamHeader), 0); + + m_WriteOffset = sizeof(StreamHeader); + + m_Output.Write(PayloadBuffer.GetView(), m_WriteOffset.load()); + + m_WriteOffset += RoundUp(PrepPayloadSize, 8); + + m_Output.Flush(); + + m_Stopwatch.Reset(); +} + +void +UpdateFolderDiskStream::EndWrite() +{ + RwLock::ExclusiveLockScope _(m_UpdateLock); + m_Output.Close(); +} + +void +UpdateFolderDiskStream::Read(const std::filesystem::path& InputPath, + ChunkedFolderContent& OutRemoteContent, + std::vector<ChunkBlockDescription>& OutBlockDescriptions, + std::vector<Chunk>& OutDownloadChunks, + std::vector<BlockRange>& OutDownloadBlockRanges, + std::vector<Block>& OutDownloadBlocks, + std::vector<Sequence>& OutSequencesToWrite, + std::vector<Event>& OutEvents) +{ + BasicFile InputFile(InputPath, BasicFile::Mode::kRead); + + uint64_t Size = InputFile.FileSize(); + if (Size < sizeof(StreamHeader)) + { + throw std::runtime_error(fmt::format("Expected size >= {}, file has size {}", sizeof(StreamHeader), Size)); + } + + uint64_t Offset = 0; + StreamHeader Header; + InputFile.Read(&Header, sizeof(StreamHeader), Offset); + Offset += sizeof(Header); + + if (Header.Magic != StreamHeader::ExpectedMagic) + { + throw std::runtime_error( + fmt::format("Expected magic 0x{:04x}, file has magic 0x{:04x}", StreamHeader::ExpectedMagic, Header.Magic)); + } + if (Header.Version != StreamHeader::CurrentVersion) + { + throw std::runtime_error(fmt::format("Expected version {}, file has version {}", StreamHeader::CurrentVersion, Header.Version)); + } + if (Header.StreamType != StreamHeader::UpdateFolderStreamType) + { + throw std::runtime_error( + fmt::format("Expected stream type {}, file has stream type {}", StreamHeader::UpdateFolderStreamType, Header.StreamType)); + } + if (Header.Checksum != StreamHeader::ComputeChecksum(Header)) + { + throw std::runtime_error( + fmt::format("Expected checksum 0x{:04x}, file has checksum 0x{:04x}", Header.Checksum, StreamHeader::ComputeChecksum(Header))); + } + + uint64_t ExpectedMinSize = sizeof(StreamHeader) + RoundUp(Header.PrepPayloadSize, 8); + + if (Size < ExpectedMinSize) + { + throw std::runtime_error(fmt::format("Expected size >= {}, file has size {}", ExpectedMinSize, Size)); + } + + IoBuffer PrepPayloadHeader(Header.PrepPayloadSize); + InputFile.Read(PrepPayloadHeader.MutableData(), Header.PrepPayloadSize, Offset); + CbValidateError ValidateError = CbValidateError::None; + CbObject PrepPayload = ValidateAndReadCompactBinaryObject(std::move(PrepPayloadHeader), ValidateError); + if (ValidateError != CbValidateError::None) + { + throw std::runtime_error(fmt::format("Initial setup payload is malformed. Reason: '{}'", ToString(ValidateError))); + } + + if (CbObjectView RemoteContentView = PrepPayload["remoteContent"sv].AsObjectView(); RemoteContentView) + { + OutRemoteContent.Paths = compactbinary_helpers::ReadArray<std::filesystem::path>("paths"sv, RemoteContentView); + OutRemoteContent.RawSizes = compactbinary_helpers::ReadArray<uint64_t>("rawSizes"sv, RemoteContentView); + OutRemoteContent.Attributes = compactbinary_helpers::ReadArray<uint32_t>("attributes"sv, RemoteContentView); + OutRemoteContent.RawHashes = compactbinary_helpers::ReadArray<IoHash>("rawHashes"sv, RemoteContentView); + if (CbObjectView ChunkedContentView = RemoteContentView["chunkedContent"sv].AsObjectView(); ChunkedContentView) + { + OutRemoteContent.ChunkedContent.SequenceRawHashes = + compactbinary_helpers::ReadArray<IoHash>("sequenceRawHashes"sv, ChunkedContentView); + OutRemoteContent.ChunkedContent.ChunkCounts = compactbinary_helpers::ReadArray<uint32_t>("chunkCounts"sv, ChunkedContentView); + OutRemoteContent.ChunkedContent.ChunkOrders = compactbinary_helpers::ReadArray<uint32_t>("chunkOrders"sv, ChunkedContentView); + OutRemoteContent.ChunkedContent.ChunkHashes = compactbinary_helpers::ReadArray<IoHash>("chunkHashes"sv, ChunkedContentView); + OutRemoteContent.ChunkedContent.ChunkRawSizes = + compactbinary_helpers::ReadArray<uint64_t>("chunkRawSizes"sv, ChunkedContentView); + } + } + + if (CbArrayView BlockDescriptionsView = PrepPayload["blockDescriptions"sv].AsArrayView(); BlockDescriptionsView) + { + OutBlockDescriptions.reserve(BlockDescriptionsView.Num()); + for (CbFieldView BlockDescriptionFieldView : BlockDescriptionsView) + { + CbObjectView BlockDescriptionView = BlockDescriptionFieldView.AsObjectView(); + OutBlockDescriptions.push_back(ChunkBlockDescription{ + ThinChunkBlockDescription{ + .BlockHash = BlockDescriptionView["blockHash"sv].AsHash(), + .ChunkRawHashes = compactbinary_helpers::ReadArray<IoHash>("chunkRawHashes"sv, BlockDescriptionView)}, + BlockDescriptionView["headerSize"sv].AsUInt64(), + compactbinary_helpers::ReadArray<uint32_t>("chunkRawLengths"sv, BlockDescriptionView), + compactbinary_helpers::ReadArray<uint32_t>("chunkCompressedLengths"sv, BlockDescriptionView)}); + } + } + + if (CbArrayView DownloadChunksView = PrepPayload["downloadChunks"sv].AsArrayView(); DownloadChunksView) + { + OutDownloadChunks.reserve(DownloadChunksView.Num()); + for (CbFieldView DownloadChunkFieldView : DownloadChunksView) + { + CbObjectView DownloadChunkView = DownloadChunkFieldView.AsObjectView(); + OutDownloadChunks.push_back(Chunk{.RemoteChunkIndex = DownloadChunkView["headerSize"sv].AsUInt32()}); + } + } + + if (CbArrayView DownloadBlockRangesView = PrepPayload["downloadBlockRanges"sv].AsArrayView(); DownloadBlockRangesView) + { + OutDownloadBlockRanges.reserve(DownloadBlockRangesView.Num()); + for (CbFieldView DownloadBlockRangeFieldView : DownloadBlockRangesView) + { + CbObjectView DownloadBlockRangeView = DownloadBlockRangeFieldView.AsObjectView(); + OutDownloadBlockRanges.push_back(BlockRange{.BlockIndex = DownloadBlockRangeView["blockIndex"sv].AsUInt32(), + .RangeStart = DownloadBlockRangeView["rangeStart"sv].AsUInt32(), + .RangeLength = DownloadBlockRangeView["rangeLength"sv].AsUInt32()}); + } + } + + if (CbArrayView DownloadBlocksView = PrepPayload["downloadBlocks"sv].AsArrayView(); DownloadBlocksView) + { + OutDownloadBlocks.reserve(DownloadBlocksView.Num()); + for (CbFieldView DownloadBlockFieldView : DownloadBlocksView) + { + CbObjectView DownloadBlockView = DownloadBlockFieldView.AsObjectView(); + OutDownloadBlocks.push_back(Block{.BlockIndex = DownloadBlockView["blockIndex"sv].AsUInt32()}); + } + } + + if (CbArrayView SequencesToWriteView = PrepPayload["sequencesToWrite"sv].AsArrayView(); SequencesToWriteView) + { + OutSequencesToWrite.reserve(SequencesToWriteView.Num()); + for (CbFieldView SequenceToWriteFieldView : SequencesToWriteView) + { + CbObjectView SequenceToWriteView = SequenceToWriteFieldView.AsObjectView(); + OutSequencesToWrite.push_back(Sequence{.RemoteSequenceIndex = SequenceToWriteView["remoteSequenceIndex"sv].AsUInt32()}); + } + } + + Offset += RoundUp(Header.PrepPayloadSize, 8); + + uint64_t EntryCount = (Size - Offset) / sizeof(Event); + uint64_t ExpectedSize = Offset + EntryCount * sizeof(Event); + if (ExpectedSize != Size) + { + throw std::runtime_error(fmt::format("Expected size {}, file has size {}", ExpectedSize, Size)); + } + + OutEvents.resize(EntryCount); + + InputFile.Read(OutEvents.data(), EntryCount * sizeof(Event), Offset); +} + +void +UpdateFolderDiskStream::Replay(const std::filesystem::path& InputPath, UpdateFolderStream& UpdateFolderStream) +{ + ZEN_UNUSED(UpdateFolderStream); + + ChunkedFolderContent RemoteContent; + std::vector<ChunkBlockDescription> BlockDescriptions; + std::vector<Chunk> DownloadChunks; + std::vector<BlockRange> DownloadBlockRanges; + std::vector<Block> DownloadBlocks; + std::vector<Sequence> SequencesToWrite; + std::vector<Event> Events; + + Read(InputPath, RemoteContent, BlockDescriptions, DownloadChunks, DownloadBlockRanges, DownloadBlocks, SequencesToWrite, Events); + + UpdateFolderStream.Initialize(RemoteContent, BlockDescriptions, DownloadChunks, DownloadBlockRanges, DownloadBlocks, SequencesToWrite); + + for (const Event& E : Events) + { + switch (E.Header.EventType) + { + case Event::EventType::DownloadedChunk: + UpdateFolderStream.DownloadedChunk(E.Payload.DownloadedChunk.RemoteChunkIndex); + break; + case Event::EventType::DownloadedBlockRange: + UpdateFolderStream.DownloadedBlockRange(E.Payload.DownloadedBlockRange.BlockIndex, + E.Payload.DownloadedBlockRange.RangeStart, + E.Payload.DownloadedBlockRange.RangeLength); + break; + case Event::EventType::DownloadedBlock: + UpdateFolderStream.DownloadedBlock(E.Payload.DownloadedBlock.BlockIndex); + break; + + case Event::EventType::WroteSequence: + UpdateFolderStream.WroteSequence(E.Payload.WroteSequence.RemoteSequenceIndex, + E.Payload.WroteSequence.Offset, + E.Payload.WroteSequence.Length); + break; + + case Event::EventType::CompletedBlockRange: + UpdateFolderStream.CompletedBlockRange(E.Payload.CompletedBlockRange.BlockIndex, + E.Payload.CompletedBlockRange.RangeStart, + E.Payload.CompletedBlockRange.RangeLength); + break; + case Event::EventType::CompletedBlock: + UpdateFolderStream.CompletedBlock(E.Payload.CompletedBlock.BlockIndex); + break; + + case Event::EventType::CompletedSequence: + UpdateFolderStream.CompletedSequence(E.Payload.CompletedSequence.RemoteSequenceIndex); + break; + + case Event::EventType::WriteComplete: + UpdateFolderStream.WriteComplete(); + break; + + default: + throw std::runtime_error(std::format("Unknown event type: {}", uint8_t(E.Header.EventType))); + } + } +} + +uint16_t +UpdateFolderDiskStream::GetThreadIndex(int ThreadId) +{ + { + RwLock::SharedLockScope _(m_UpdateLock); + if (auto It = m_ThreadLookup.find(ThreadId); It != m_ThreadLookup.end()) + { + return It->second; + } + } + RwLock::ExclusiveLockScope _(m_UpdateLock); + if (auto It = m_ThreadLookup.find(ThreadId); It != m_ThreadLookup.end()) + { + return It->second; + } + uint16_t ThreadIndex = gsl::narrow<uint16_t>(m_ThreadLookup.size()); + m_ThreadLookup.insert({ThreadId, ThreadIndex}); + return ThreadIndex; +} + +uint16_t +UpdateFolderDiskStream::GetCurrentThreadIndex() +{ + return GetThreadIndex(GetCurrentThreadId()); +} + +uint64_t +ChunkSequenceStatus::CompletedSize() const +{ + return std::accumulate(CompletedRanges.begin(), CompletedRanges.end(), uint64_t(0), [](uint64_t Current, const Range& Val) { + return Current + Val.Length; + }); +} + +void +ChunkSequenceStatus::AddCompletedRange(const Range& InRange) +{ + ZEN_ASSERT(InRange.Offset + InRange.Length <= Size); +#ifndef NDEBUG + uint64_t PreviousCompletedSize = CompletedSize(); + for (const Range& R : CompletedRanges) + { + ZEN_ASSERT(InRange.Offset + InRange.Length <= R.Offset || InRange.Offset >= R.Offset + R.Length); + } +#endif // NDEBUG + if (CompletedRanges.empty()) + { + CompletedRanges.push_back(InRange); + } + else + { + auto InsertIt = + std::upper_bound(CompletedRanges.begin(), CompletedRanges.end(), InRange.Offset, [](uint64_t InRangeOffset, const Range& Val) { + return InRangeOffset < Val.Offset; + }); + if (InsertIt != CompletedRanges.begin()) + { + // Extend previous? + Range& PreviousRange = *(InsertIt - 1); + ZEN_ASSERT(PreviousRange.Offset + PreviousRange.Length <= InRange.Offset); + if (PreviousRange.Offset + PreviousRange.Length == InRange.Offset) + { + PreviousRange.Length += InRange.Length; + if (InsertIt != CompletedRanges.end()) + { + // Extend next? + Range& NextRange = *InsertIt; + if (NextRange.Offset == PreviousRange.Offset + PreviousRange.Length) + { + PreviousRange.Length += NextRange.Length; + CompletedRanges.erase(InsertIt); + } + } + } + else if (InsertIt != CompletedRanges.end()) + { + // Extend next? + Range& NextRange = *InsertIt; + if (NextRange.Offset == InRange.Offset + InRange.Length) + { + NextRange.Offset = InRange.Offset; + NextRange.Length += InRange.Length; + } + else + { + CompletedRanges.insert(InsertIt, InRange); + } + } + else + { + CompletedRanges.push_back(InRange); + } + } + else if (InsertIt != CompletedRanges.end()) + { + // Extend next? + Range& NextRange = *InsertIt; + ZEN_ASSERT(InRange.Offset + InRange.Length <= NextRange.Offset); + if (NextRange.Offset == InRange.Offset + InRange.Length) + { + NextRange.Offset = InRange.Offset; + NextRange.Length += InRange.Length; + } + else + { + CompletedRanges.insert(InsertIt, InRange); + } + } + else + { + CompletedRanges.push_back(InRange); + } + } +#ifndef NDEBUG + uint64_t NewCompletedSize = CompletedSize(); + ZEN_ASSERT(PreviousCompletedSize + InRange.Length == NewCompletedSize); + ZEN_ASSERT(CompletedRanges.back().Offset + CompletedRanges.back().Length <= Size); + for (size_t Index = 1; Index < CompletedRanges.size(); Index++) + { + ZEN_ASSERT(CompletedRanges[Index - 1].Offset + CompletedRanges[Index - 1].Length < CompletedRanges[Index].Offset); + } +#endif // NDEBUG +} + +void +ChunkSequenceStatus::RemoveCompletedRange(const Range& InRange) +{ + // TODO + ZEN_UNUSED(InRange); + ZEN_ASSERT(!CompletedRanges.empty()); +} + +} // namespace zen diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h index 6304159ae..fe69a624c 100644 --- a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h +++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h @@ -35,6 +35,8 @@ class BufferedWriteFileCache; struct ChunkBlockDescription; struct ChunkedFolderContent; +class UpdateFolderStream; + struct DiskStatistics { std::atomic<uint64_t> OpenReadCount = 0; @@ -162,7 +164,7 @@ public: const std::vector<IoHash>& LooseChunkHashes, const Options& Options); - void Execute(FolderContent& OutLocalFolderState); + void Execute(FolderContent& OutLocalFolderState, UpdateFolderStream* OptionalUpdateFolderStream = nullptr); DiskStatistics m_DiskStats; CacheMappingStatistics m_CacheMappingStats; @@ -292,7 +294,8 @@ private: uint64_t TotalRequestCount, uint64_t TotalPartWriteCount, FilteredRate& FilteredDownloadedBytesPerSecond, - FilteredRate& FilteredWrittenBytesPerSecond); + FilteredRate& FilteredWrittenBytesPerSecond, + UpdateFolderStream* OptionalUpdateFolderStream); void DownloadBuildBlob(uint32_t RemoteChunkIndex, const BlobsExistsResult& ExistsResult, @@ -324,14 +327,16 @@ private: const std::vector<ChunkedFolderContent>& ScavengedContents, const std::vector<ChunkedContentLookup>& ScavengedLookups, const std::vector<std::filesystem::path>& ScavengedPaths, - BufferedWriteFileCache& WriteCache); + BufferedWriteFileCache& WriteCache, + UpdateFolderStream* OptionalUpdateFolderStream); - bool WriteCompressedChunkToCache(const IoHash& ChunkHash, + bool WriteCompressedChunkToCache(const uint32_t RemoteChunkIndex, const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs, BufferedWriteFileCache& WriteCache, - IoBuffer&& CompressedPart); + IoBuffer&& CompressedPart, + UpdateFolderStream* OptionalUpdateFolderStream); - void StreamDecompress(const IoHash& SequenceRawHash, CompositeBuffer&& CompressedPart); + void StreamDecompress(uint32_t RemoteSequenceIndex, CompositeBuffer&& CompressedPart, UpdateFolderStream* OptionalUpdateFolderStream); void WriteSequenceChunkToCache(BufferedWriteFileCache::Local& LocalWriter, const CompositeBuffer& Chunk, @@ -351,14 +356,16 @@ private: void WriteBlockChunkOpsToCache(std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, const BlockWriteOps& Ops, BufferedWriteFileCache& WriteCache, - ParallelWork& Work); + ParallelWork& Work, + UpdateFolderStream* OptionalUpdateFolderStream); bool WriteChunksBlockToCache(const ChunkBlockDescription& BlockDescription, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, ParallelWork& Work, CompositeBuffer&& BlockBuffer, std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, - BufferedWriteFileCache& WriteCache); + BufferedWriteFileCache& WriteCache, + UpdateFolderStream* OptionalUpdateFolderStream); bool WritePartialBlockChunksToCache(const ChunkBlockDescription& BlockDescription, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, @@ -367,7 +374,8 @@ private: uint32_t FirstIncludedBlockChunkIndex, uint32_t LastIncludedBlockChunkIndex, std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, - BufferedWriteFileCache& WriteCache); + BufferedWriteFileCache& WriteCache, + UpdateFolderStream* OptionalUpdateFolderStream); void AsyncWriteDownloadedChunk(const std::filesystem::path& ZenFolderPath, uint32_t RemoteChunkIndex, @@ -379,15 +387,18 @@ private: std::atomic<uint64_t>& WritePartsComplete, const uint64_t TotalPartWriteCount, FilteredRate& FilteredWrittenBytesPerSecond, - bool EnableBacklog); + bool EnableBacklog, + UpdateFolderStream* OptionalUpdateFolderStream); - void VerifyAndCompleteChunkSequencesAsync(std::span<const uint32_t> RemoteSequenceIndexes, ParallelWork& Work); + void VerifyAndCompleteChunkSequencesAsync(std::span<const uint32_t> RemoteSequenceIndexes, + ParallelWork& Work, + UpdateFolderStream* OptionalUpdateFolderStream); bool CompleteSequenceChunk(uint32_t RemoteSequenceIndex, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters); std::vector<uint32_t> CompleteChunkTargets(const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters); - void FinalizeChunkSequence(const IoHash& SequenceRawHash); - void FinalizeChunkSequences(std::span<const uint32_t> RemoteSequenceIndexes); - void VerifySequence(uint32_t RemoteSequenceIndex); + void FinalizeChunkSequence(uint32_t RemoteSequenceIndex, UpdateFolderStream* OptionalUpdateFolderStream); + void FinalizeChunkSequences(std::span<const uint32_t> RemoteSequenceIndexes, UpdateFolderStream* OptionalUpdateFolderStream); + void VerifySequence(uint32_t RemoteSequenceIndex); OperationLogOutput& m_LogOutput; StorageInstance& m_Storage; diff --git a/src/zenremotestore/include/zenremotestore/builds/updatefolderstream.h b/src/zenremotestore/include/zenremotestore/builds/updatefolderstream.h new file mode 100644 index 000000000..6fe2188cd --- /dev/null +++ b/src/zenremotestore/include/zenremotestore/builds/updatefolderstream.h @@ -0,0 +1,226 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/basicfile.h> +#include <zencore/timer.h> + +#include <zenremotestore/chunking/chunkblock.h> +#include <zenremotestore/chunking/chunkedcontent.h> + +namespace zen { + +class UpdateFolderStream +{ +public: + virtual ~UpdateFolderStream() {} + + struct Sequence + { + uint32_t RemoteSequenceIndex; + }; + struct Chunk + { + uint32_t RemoteChunkIndex; + }; + struct Block + { + uint32_t BlockIndex; + }; + struct BlockRange + { + uint32_t BlockIndex; + uint32_t RangeStart; + uint32_t RangeLength; + }; + + virtual void Initialize(const ChunkedFolderContent& RemoteContent, + std::span<const ChunkBlockDescription> BlockDescriptions, + std::span<const Chunk> DownloadChunks, + std::span<const BlockRange> DownloadBlockRanges, + std::span<const Block> DownloadBlocks, + std::span<const Sequence> SequencesToWrite) = 0; + + virtual void DownloadedChunk(uint32_t RemoteChunkIndex) = 0; + virtual void DownloadedBlockRange(uint32_t BlockIndex, uint32_t RangeStart, uint32_t RangeLength) = 0; + virtual void DownloadedBlock(uint32_t BlockIndex) = 0; + + virtual void WroteSequence(uint32_t RemoteSequenceIndex, uint64_t Offset, uint64_t Length) = 0; + + virtual void CompletedBlockRange(uint32_t BlockIndex, uint32_t RangeStart, uint32_t RangeLength) = 0; + virtual void CompletedBlock(uint32_t BlockIndex) = 0; + + virtual void CompletedSequence(uint32_t RemoteSequenceIndex) = 0; + + virtual void WriteComplete() = 0; +}; + +std::unique_ptr<UpdateFolderStream> CreateDiskUpdateFolderStream(const std::filesystem::path& OutputPath); + +struct ChunkSequenceStatus +{ + struct Range + { + uint64_t Offset = 0; + uint64_t Length = 0; + }; + std::vector<Range> CompletedRanges; + uint64_t Size = 0; + uint32_t NormalizedSize = 0; + bool Completed = false; + + uint64_t CompletedSize() const; + + void AddCompletedRange(const Range& InRange); + + void RemoveCompletedRange(const Range& InRange); +}; + +class UpdateFolderDiskStream +{ + using Sequence = UpdateFolderStream::Sequence; + using Chunk = UpdateFolderStream::Chunk; + using Block = UpdateFolderStream::Block; + using BlockRange = UpdateFolderStream::BlockRange; + +public: +#pragma pack(push) +#pragma pack(4) + struct Event + { + enum class EventType : uint8_t + { + DownloadedChunk, + DownloadedBlockRange, + DownloadedBlock, + + WroteSequence, + + CompletedBlockRange, + CompletedBlock, + + CompletedSequence, + + WriteComplete + }; + + struct DownloadedChunkEvent + { + uint32_t RemoteChunkIndex; + }; + + struct DownloadedBlockRangeEvent + { + uint32_t BlockIndex; + uint32_t RangeStart; + uint32_t RangeLength; + }; + + struct DownloadedBlockEvent + { + uint32_t BlockIndex; + }; + + struct WroteSequenceEvent + { + uint32_t RemoteSequenceIndex; + uint64_t Offset; + uint64_t Length; + }; + + struct CompletedBlockRangeEvent + { + uint32_t BlockIndex; + uint32_t RangeStart; + uint32_t RangeLength; + }; + + struct CompletedBlockEvent + { + uint32_t BlockIndex; + }; + + struct CompletedSequenceEvent + { + uint32_t RemoteSequenceIndex; + }; + +#pragma pack(push) +#pragma pack(1) + struct EventHeader + { + EventType EventType; + uint8_t TimestampUsHigh; + uint32_t TimestampUsLow; + uint16_t ThreadIndex; + + inline uint64_t TimestampUs() const { return (((uint64_t)TimestampUsHigh) << 32) | TimestampUsLow; } + }; +#pragma pack(pop) + + EventHeader Header; + + static EventHeader MakeHeader(Event::EventType Type, uint64_t TimeStampUs, uint16_t ThreadIndex); + + union EventPayload + { + DownloadedChunkEvent DownloadedChunk; + DownloadedBlockRangeEvent DownloadedBlockRange; + DownloadedBlockEvent DownloadedBlock; + + WroteSequenceEvent WroteSequence; + + CompletedBlockRangeEvent CompletedBlockRange; + CompletedBlockEvent CompletedBlock; + CompletedSequenceEvent CompletedSequence; + }; + EventPayload Payload; + }; +#pragma pack(pop) + static_assert(sizeof(Event) == 28u); + + UpdateFolderDiskStream(const std::filesystem::path& OutputPath); + + virtual ~UpdateFolderDiskStream(); + + Event MakeEvent(Event::EventType Type, Event::EventPayload Payload); + + void WriteEvent(const Event& InEvent); + + void PrepareWrite(const ChunkedFolderContent& RemoteContent, + std::span<const ChunkBlockDescription> BlockDescriptions, + std::span<const Chunk> DownloadChunks, + std::span<const BlockRange> DownloadBlockRanges, + std::span<const Block> DownloadBlocks, + std::span<const Sequence> SequencesToWrite); + + void EndWrite(); + + static void Read(const std::filesystem::path& InputPath, + ChunkedFolderContent& OutRemoteContent, + std::vector<ChunkBlockDescription>& OutBlockDescriptions, + std::vector<Chunk>& OutDownloadChunks, + std::vector<BlockRange>& OutDownloadBlockRanges, + std::vector<Block>& OutDownloadBlocks, + std::vector<Sequence>& OutSequencesToWrite, + std::vector<Event>& OutEvents); + + static void Replay(const std::filesystem::path& InputPath, UpdateFolderStream& UpdateFolderStream); + +private: + uint16_t GetThreadIndex(int ThreadId); + uint16_t GetCurrentThreadIndex(); + + tsl::robin_map<int, uint16_t> m_ThreadLookup; + + RwLock m_UpdateLock; + + const std::filesystem::path m_OutputPath; + BasicFile m_Output; + std::atomic<uint64_t> m_WriteOffset; + std::atomic<uint64_t> m_EventCount; + + Stopwatch m_Stopwatch; +}; + +} // namespace zen |