diff options
| author | Dan Engelbrecht <[email protected]> | 2026-02-10 11:03:44 +0100 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2026-02-10 11:03:44 +0100 |
| commit | e4c699ee36a070c9721fde2a41e5fc6cef730870 (patch) | |
| tree | 8f8462e185608d3afabf30493ff274d75a25f198 /src | |
| parent | zen view command to present event recordings such as builds download in a UI (diff) | |
| download | zen-e4c699ee36a070c9721fde2a41e5fc6cef730870.tar.xz zen-e4c699ee36a070c9721fde2a41e5fc6cef730870.zip | |
refactor/cleanup
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 44 | ||||
| -rw-r--r-- | src/zen/cmds/view_cmd.cpp | 48 | ||||
| -rw-r--r-- | src/zenremotestore/builds/buildstorageoperations.cpp | 341 | ||||
| -rw-r--r-- | src/zenremotestore/builds/updatefoldereventdiskstream.cpp (renamed from src/zenremotestore/builds/updatefolderstream.cpp) | 206 | ||||
| -rw-r--r-- | src/zenremotestore/builds/updatefoldereventstream.cpp | 174 | ||||
| -rw-r--r-- | src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h | 42 | ||||
| -rw-r--r-- | src/zenremotestore/include/zenremotestore/builds/updatefoldereventdiskstream.h (renamed from src/zenremotestore/include/zenremotestore/builds/updatefolderstream.h) | 68 | ||||
| -rw-r--r-- | src/zenremotestore/include/zenremotestore/builds/updatefoldereventstream.h | 61 |
8 files changed, 495 insertions, 489 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index 2e621608e..1f2166d25 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -32,7 +32,7 @@ #include <zenremotestore/builds/buildstorageutil.h> #include <zenremotestore/builds/filebuildstorage.h> #include <zenremotestore/builds/jupiterbuildstorage.h> -#include <zenremotestore/builds/updatefolderstream.h> +#include <zenremotestore/builds/updatefoldereventstream.h> #include <zenremotestore/chunking/chunkblock.h> #include <zenremotestore/chunking/chunkedcontent.h> #include <zenremotestore/chunking/chunkedfile.h> @@ -1474,6 +1474,8 @@ namespace { ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::Download, TaskSteps::StepCount); + std::unique_ptr<UpdateFolderEventStream> EventStream = CreateDiskUpdateFolderEventStream("D:\\Temp\\download_log.zrb"); + BuildsOperationUpdateFolder Updater( Output, Storage, @@ -1490,35 +1492,35 @@ namespace { BlockDescriptions, LooseChunkHashes, BuildsOperationUpdateFolder::Options{ - .IsQuiet = IsQuiet, - .IsVerbose = IsVerbose, - .AllowFileClone = Options.AllowFileClone, - .UseSparseFiles = UseSparseFiles, - .SystemRootDir = Options.SystemRootDir, - .ZenFolderPath = Options.ZenFolderPath, - .LargeAttachmentSize = LargeAttachmentSize, - .PreferredMultipartChunkSize = PreferredMultipartChunkSize, - .PartialBlockRequestMode = Options.PartialBlockRequestMode, - .WipeTargetFolder = Options.CleanTargetFolder, - .PrimeCacheOnly = Options.PrimeCacheOnly, - .EnableOtherDownloadsScavenging = Options.EnableOtherDownloadsScavenging, - .EnableTargetFolderScavenging = Options.EnableTargetFolderScavenging || Options.AppendNewContent, - .ValidateCompletedSequences = Options.PostDownloadVerify, - .ExcludeFolders = Options.ExcludeFolders, - .MaximumInMemoryPayloadSize = Options.MaximumInMemoryPayloadSize, - .PopulateCache = Options.PopulateCache}); + .IsQuiet = IsQuiet, + .IsVerbose = IsVerbose, + .AllowFileClone = Options.AllowFileClone, + .UseSparseFiles = UseSparseFiles, + .SystemRootDir = Options.SystemRootDir, + .ZenFolderPath = Options.ZenFolderPath, + .LargeAttachmentSize = LargeAttachmentSize, + .PreferredMultipartChunkSize = PreferredMultipartChunkSize, + .PartialBlockRequestMode = Options.PartialBlockRequestMode, + .WipeTargetFolder = Options.CleanTargetFolder, + .PrimeCacheOnly = Options.PrimeCacheOnly, + .EnableOtherDownloadsScavenging = Options.EnableOtherDownloadsScavenging, + .EnableTargetFolderScavenging = Options.EnableTargetFolderScavenging || Options.AppendNewContent, + .ValidateCompletedSequences = Options.PostDownloadVerify, + .ExcludeFolders = Options.ExcludeFolders, + .MaximumInMemoryPayloadSize = Options.MaximumInMemoryPayloadSize, + .PopulateCache = Options.PopulateCache, + .OptionalUpdateFolderEventStream = EventStream.get()}); { ProgressBar::PushLogOperation(ProgressMode, "Download"); auto _ = MakeGuard([]() { ProgressBar::PopLogOperation(ProgressMode); }); - std::unique_ptr<UpdateFolderStream> Feedback = CreateDiskUpdateFolderStream("D:\\Temp\\download_log.zrb"); - FolderContent UpdatedLocalFolderState; - Updater.Execute(UpdatedLocalFolderState, Feedback.get()); + Updater.Execute(UpdatedLocalFolderState); LocalState.State.ChunkedContent = RemoteContent; LocalState.FolderState = std::move(UpdatedLocalFolderState); } + EventStream.reset(); VerifyFolderStatistics VerifyFolderStats; if (!AbortFlag) diff --git a/src/zen/cmds/view_cmd.cpp b/src/zen/cmds/view_cmd.cpp index c8bece8c3..8d81a6680 100644 --- a/src/zen/cmds/view_cmd.cpp +++ b/src/zen/cmds/view_cmd.cpp @@ -7,7 +7,7 @@ #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/scopeguard.h> -#include <zenremotestore/builds/updatefolderstream.h> +#include <zenremotestore/builds/updatefoldereventdiskstream.h> #include "../imgui_ui.h" #include "imgui.h" @@ -30,7 +30,7 @@ namespace { virtual void SetCurrentTimeUs(uint64_t CurrentUs) = 0; }; - class UpdateFolderDiskStreamUI : public ViewUI + class UpdateFolderEventDiskStreamUI : public ViewUI { public: virtual std::string Title() override { return "Update Folder"; } @@ -41,20 +41,20 @@ namespace { m_InputReady = false; m_InputPath = InputPath; } - UpdateFolderDiskStream::Read(InputPath, - m_RemoteContent, - m_BlockDescriptions, - m_DownloadChunks, - m_DownloadBlockRanges, - m_DownloadBlocks, - m_SequencesToWrite, - m_Events); + UpdateFolderEventDiskStream::Read(InputPath, + m_RemoteContent, + m_BlockDescriptions, + m_DownloadChunks, + m_DownloadBlockRanges, + m_DownloadBlocks, + m_SequencesToWrite, + m_Events); if (!m_Events.empty()) { std::sort(m_Events.begin(), m_Events.end(), - [](const UpdateFolderDiskStream::Event& Lhs, const UpdateFolderDiskStream::Event& Rhs) { + [](const UpdateFolderEventDiskStream::Event& Lhs, const UpdateFolderEventDiskStream::Event& Rhs) { const uint64_t TimeStampLshUs = Lhs.Header.TimestampUs(); const uint64_t TimeStampRshUs = Rhs.Header.TimestampUs(); if (TimeStampLshUs < TimeStampRshUs) @@ -96,7 +96,7 @@ namespace { const uint64_t Size = RawHashToRawSize.at(m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); m_RenderSequenceStatues.push_back(ChunkSequenceStatus{.Size = Size, .Completed = true}); } - for (const UpdateFolderStream::Sequence& SequenceToWrite : m_SequencesToWrite) + for (const UpdateFolderEventStream::Sequence& SequenceToWrite : m_SequencesToWrite) { m_RenderSequenceStatues[SequenceToWrite.RemoteSequenceIndex].Completed = false; } @@ -196,11 +196,11 @@ namespace { } } - void UpdateStateWithEvent(const UpdateFolderDiskStream::Event& Event) + void UpdateStateWithEvent(const UpdateFolderEventDiskStream::Event& Event) { switch (Event.Header.EventType) { - case UpdateFolderDiskStream::Event::EventType::WroteSequence: + case UpdateFolderEventDiskStream::Event::EventType::WroteSequence: { ChunkSequenceStatus& Sequence = m_RenderSequenceStatues[Event.Payload.WroteSequence.RemoteSequenceIndex]; ZEN_ASSERT(!Sequence.Completed); @@ -208,7 +208,7 @@ namespace { ChunkSequenceStatus::Range{Event.Payload.WroteSequence.Offset, Event.Payload.WroteSequence.Length}); } break; - case UpdateFolderDiskStream::Event::EventType::CompletedSequence: + case UpdateFolderEventDiskStream::Event::EventType::CompletedSequence: { ChunkSequenceStatus& Sequence = m_RenderSequenceStatues[Event.Payload.CompletedSequence.RemoteSequenceIndex]; ZEN_ASSERT(!Sequence.Completed); @@ -238,7 +238,7 @@ namespace { Sequence.Completed = true; } break; - case UpdateFolderDiskStream::Event::EventType::WriteComplete: + case UpdateFolderEventDiskStream::Event::EventType::WriteComplete: break; default: // TODO @@ -255,13 +255,13 @@ namespace { Stopwatch m_RealtimePlaybackStartUs; - ChunkedFolderContent m_RemoteContent; - std::vector<ChunkBlockDescription> m_BlockDescriptions; - std::vector<UpdateFolderStream::Chunk> m_DownloadChunks; - std::vector<UpdateFolderStream::BlockRange> m_DownloadBlockRanges; - std::vector<UpdateFolderStream::Block> m_DownloadBlocks; - std::vector<UpdateFolderStream::Sequence> m_SequencesToWrite; - std::vector<UpdateFolderDiskStream::Event> m_Events; + ChunkedFolderContent m_RemoteContent; + std::vector<ChunkBlockDescription> m_BlockDescriptions; + std::vector<UpdateFolderEventStream::Chunk> m_DownloadChunks; + std::vector<UpdateFolderEventStream::BlockRange> m_DownloadBlockRanges; + std::vector<UpdateFolderEventStream::Block> m_DownloadBlocks; + std::vector<UpdateFolderEventStream::Sequence> m_SequencesToWrite; + std::vector<UpdateFolderEventDiskStream::Event> m_Events; std::vector<ChunkSequenceStatus> m_RenderSequenceStatues; }; @@ -306,7 +306,7 @@ ViewCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) std::filesystem::path AbsInputStreamPath = MakeSafeAbsolutePath(m_InputStream); if (ToLower(AbsInputStreamPath.extension().string()) == ".zrb") { - UI = std::make_unique<UpdateFolderDiskStreamUI>(); + UI = std::make_unique<UpdateFolderEventDiskStreamUI>(); } if (!UI) diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp index 52062f6ff..3a9f2f92f 100644 --- a/src/zenremotestore/builds/buildstorageoperations.cpp +++ b/src/zenremotestore/builds/buildstorageoperations.cpp @@ -8,7 +8,7 @@ #include <zenremotestore/builds/buildstorage.h> #include <zenremotestore/builds/buildstoragecache.h> #include <zenremotestore/builds/buildstorageutil.h> -#include <zenremotestore/builds/updatefolderstream.h> +#include <zenremotestore/builds/updatefoldereventstream.h> #include <zenremotestore/chunking/chunkblock.h> #include <zenremotestore/chunking/chunkingcache.h> #include <zenremotestore/chunking/chunkingcontroller.h> @@ -553,10 +553,8 @@ BuildsOperationUpdateFolder::BuildsOperationUpdateFolder(OperationLogOutput& } void -BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateFolderStream* OptionalUpdateFolderStream) +BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) { - ZEN_UNUSED(OptionalUpdateFolderStream); - ZEN_TRACE_CPU("BuildsOperationUpdateFolder::Execute"); try { @@ -1247,53 +1245,53 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF } } - if (OptionalUpdateFolderStream) + if (m_Options.OptionalUpdateFolderEventStream) { - std::vector<UpdateFolderStream::Chunk> DownloadChunks; + std::vector<UpdateFolderEventStream::Chunk> DownloadChunks; DownloadChunks.reserve(LooseChunkHashWorks.size()); for (const LooseChunkHashWorkData& LooseChunkHashWork : LooseChunkHashWorks) { - DownloadChunks.push_back(UpdateFolderStream::Chunk{.RemoteChunkIndex = LooseChunkHashWork.RemoteChunkIndex}); + DownloadChunks.push_back(UpdateFolderEventStream::Chunk{.RemoteChunkIndex = LooseChunkHashWork.RemoteChunkIndex}); } - std::vector<UpdateFolderStream::BlockRange> DownloadBlockRanges; + std::vector<UpdateFolderEventStream::BlockRange> DownloadBlockRanges; DownloadBlockRanges.reserve(BlockRangeWorks.size()); for (const BlockRangeDescriptor& BlockRange : BlockRangeWorks) { DownloadBlockRanges.push_back( - UpdateFolderStream::BlockRange{.BlockIndex = BlockRange.BlockIndex, - .RangeStart = gsl::narrow<uint32_t>(BlockRange.RangeStart), - .RangeLength = gsl::narrow<uint32_t>(BlockRange.RangeLength)}); + UpdateFolderEventStream::BlockRange{.BlockIndex = BlockRange.BlockIndex, + .RangeStart = gsl::narrow<uint32_t>(BlockRange.RangeStart), + .RangeLength = gsl::narrow<uint32_t>(BlockRange.RangeLength)}); } - std::vector<UpdateFolderStream::Block> DownloadBlocks; + std::vector<UpdateFolderEventStream::Block> DownloadBlocks; for (uint32_t BlockIndex : FullBlockWorks) { - DownloadBlocks.push_back(UpdateFolderStream::Block{.BlockIndex = BlockIndex}); + DownloadBlocks.push_back(UpdateFolderEventStream::Block{.BlockIndex = BlockIndex}); } - std::vector<UpdateFolderStream::Sequence> SequencesToWrite; + std::vector<UpdateFolderEventStream::Sequence> SequencesToWrite; for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < SequenceIndexChunksLeftToWriteCounters.size(); RemoteSequenceIndex++) { if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() > 0) { - SequencesToWrite.push_back(UpdateFolderStream::Sequence{.RemoteSequenceIndex = RemoteSequenceIndex}); + SequencesToWrite.push_back(UpdateFolderEventStream::Sequence{.RemoteSequenceIndex = RemoteSequenceIndex}); } } for (const ScavengedSequenceCopyOperation& ScaveneCopyOp : ScavengedSequenceCopyOperations) { - SequencesToWrite.push_back(UpdateFolderStream::Sequence{ScaveneCopyOp.RemoteSequenceIndex}); + SequencesToWrite.push_back(UpdateFolderEventStream::Sequence{ScaveneCopyOp.RemoteSequenceIndex}); } std::sort(SequencesToWrite.begin(), SequencesToWrite.end(), - [](const UpdateFolderStream::Sequence& Lhs, const UpdateFolderStream::Sequence& Rhs) { + [](const UpdateFolderEventStream::Sequence& Lhs, const UpdateFolderEventStream::Sequence& Rhs) { return Lhs.RemoteSequenceIndex < Rhs.RemoteSequenceIndex; }); - OptionalUpdateFolderStream->Initialize(m_RemoteContent, - m_BlockDescriptions, - DownloadChunks, - DownloadBlockRanges, - DownloadBlocks, - SequencesToWrite); + m_Options.OptionalUpdateFolderEventStream->Initialize(m_RemoteContent, + m_BlockDescriptions, + DownloadChunks, + DownloadBlockRanges, + DownloadBlocks, + SequencesToWrite); } BufferedWriteFileCache WriteCache; @@ -1315,8 +1313,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF &FilteredWrittenBytesPerSecond, ScavengeOpIndex, &WritePartsComplete, - TotalPartWriteCount, - OptionalUpdateFolderStream](std::atomic<bool>&) mutable { + TotalPartWriteCount](std::atomic<bool>&) mutable { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_WriteScavenged"); @@ -1329,10 +1326,12 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF WriteScavengedSequenceToCache(ScavengeRootPath, ScavengedContent, ScavengeOp); - if (OptionalUpdateFolderStream) + if (m_Options.OptionalUpdateFolderEventStream) { - OptionalUpdateFolderStream->WroteSequence(ScavengeOp.RemoteSequenceIndex, 0, ScavengeOp.RawSize); - OptionalUpdateFolderStream->CompletedSequence(ScavengeOp.RemoteSequenceIndex); + m_Options.OptionalUpdateFolderEventStream->WroteSequence(ScavengeOp.RemoteSequenceIndex, + 0, + ScavengeOp.RawSize); + m_Options.OptionalUpdateFolderEventStream->CompletedSequence(ScavengeOp.RemoteSequenceIndex); } WritePartsComplete++; @@ -1375,8 +1374,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF TotalPartWriteCount, &WriteCache, &FilteredDownloadedBytesPerSecond, - &FilteredWrittenBytesPerSecond, - OptionalUpdateFolderStream](std::atomic<bool>&) mutable { + &FilteredWrittenBytesPerSecond](std::atomic<bool>&) mutable { ZEN_TRACE_CPU("Async_ReadPreDownloadedChunk"); if (!m_AbortFlag) { @@ -1392,8 +1390,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF TotalRequestCount, TotalPartWriteCount, FilteredDownloadedBytesPerSecond, - FilteredWrittenBytesPerSecond, - OptionalUpdateFolderStream); + FilteredWrittenBytesPerSecond); } }, WorkerThreadPool::EMode::EnableBacklog); @@ -1413,59 +1410,56 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF break; } - Work.ScheduleWork( - m_IOWorkerPool, - [this, - &CloneQuery, - &SequenceIndexChunksLeftToWriteCounters, - &WriteCache, - &Work, - &FilteredWrittenBytesPerSecond, - &CopyChunkDatas, - &ScavengedContents, - &ScavengedLookups, - &ScavengedPaths, - &WritePartsComplete, - TotalPartWriteCount, - CopyDataIndex, - OptionalUpdateFolderStream](std::atomic<bool>&) { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Async_CopyLocal"); - - FilteredWrittenBytesPerSecond.Start(); - const CopyChunkData& CopyData = CopyChunkDatas[CopyDataIndex]; - - std::vector<uint32_t> WrittenSequenceIndexes = WriteLocalChunkToCache(CloneQuery.get(), - CopyData, - ScavengedContents, - ScavengedLookups, - ScavengedPaths, - WriteCache, - OptionalUpdateFolderStream); - - WritePartsComplete++; - if (!m_AbortFlag) - { - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); - } + Work.ScheduleWork(m_IOWorkerPool, + [this, + &CloneQuery, + &SequenceIndexChunksLeftToWriteCounters, + &WriteCache, + &Work, + &FilteredWrittenBytesPerSecond, + &CopyChunkDatas, + &ScavengedContents, + &ScavengedLookups, + &ScavengedPaths, + &WritePartsComplete, + TotalPartWriteCount, + CopyDataIndex](std::atomic<bool>&) { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_CopyLocal"); + + FilteredWrittenBytesPerSecond.Start(); + const CopyChunkData& CopyData = CopyChunkDatas[CopyDataIndex]; + + std::vector<uint32_t> WrittenSequenceIndexes = WriteLocalChunkToCache(CloneQuery.get(), + CopyData, + ScavengedContents, + ScavengedLookups, + ScavengedPaths, + WriteCache); + + WritePartsComplete++; + if (!m_AbortFlag) + { + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } - // Write tracking, updating this must be done without any files open - std::vector<uint32_t> CompletedChunkSequences; - for (uint32_t RemoteSequenceIndex : WrittenSequenceIndexes) - { - if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters)) - { - CompletedChunkSequences.push_back(RemoteSequenceIndex); - } - } - WriteCache.Close(CompletedChunkSequences); - VerifyAndCompleteChunkSequencesAsync(CompletedChunkSequences, Work, OptionalUpdateFolderStream); - } - } - }); + // Write tracking, updating this must be done without any files open + std::vector<uint32_t> CompletedChunkSequences; + for (uint32_t RemoteSequenceIndex : WrittenSequenceIndexes) + { + if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters)) + { + CompletedChunkSequences.push_back(RemoteSequenceIndex); + } + } + WriteCache.Close(CompletedChunkSequences); + VerifyAndCompleteChunkSequencesAsync(CompletedChunkSequences, Work); + } + } + }); } for (uint32_t BlockIndex : CachedChunkBlockIndexes) @@ -1486,8 +1480,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF &FilteredWrittenBytesPerSecond, &WritePartsComplete, TotalPartWriteCount, - BlockIndex, - OptionalUpdateFolderStream](std::atomic<bool>&) mutable { + BlockIndex](std::atomic<bool>&) mutable { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_WriteCachedBlock"); @@ -1510,16 +1503,15 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF Work, CompositeBuffer(std::move(BlockBuffer)), RemoteChunkIndexNeedsCopyFromSourceFlags, - WriteCache, - OptionalUpdateFolderStream)) + WriteCache)) { std::error_code DummyEc; RemoveFile(BlockChunkPath, DummyEc); throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash)); } - if (OptionalUpdateFolderStream) + if (m_Options.OptionalUpdateFolderEventStream) { - OptionalUpdateFolderStream->CompletedBlock(BlockIndex); + m_Options.OptionalUpdateFolderEventStream->CompletedBlock(BlockIndex); } std::error_code Ec = TryRemoveFile(BlockChunkPath); @@ -1564,8 +1556,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF &FilteredWrittenBytesPerSecond, &Work, &BlockRangeWorks, - BlockRangeIndex, - OptionalUpdateFolderStream](std::atomic<bool>&) { + BlockRangeIndex](std::atomic<bool>&) { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_GetPartialBlock"); @@ -1587,19 +1578,19 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF TotalPartWriteCount, &FilteredDownloadedBytesPerSecond, &FilteredWrittenBytesPerSecond, - &BlockRange, - OptionalUpdateFolderStream](IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath) { + &BlockRange](IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath) { if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) { FilteredDownloadedBytesPerSecond.Stop(); } - if (OptionalUpdateFolderStream) + if (m_Options.OptionalUpdateFolderEventStream) { const uint32_t BlockIndex = BlockRange.BlockIndex; - OptionalUpdateFolderStream->DownloadedBlockRange(BlockIndex, - gsl::narrow<uint32_t>(BlockRange.RangeStart), - gsl::narrow<uint32_t>(BlockRange.RangeLength)); + m_Options.OptionalUpdateFolderEventStream->DownloadedBlockRange( + BlockIndex, + gsl::narrow<uint32_t>(BlockRange.RangeStart), + gsl::narrow<uint32_t>(BlockRange.RangeLength)); } if (!m_AbortFlag) @@ -1616,8 +1607,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF &FilteredWrittenBytesPerSecond, &BlockRange, BlockChunkPath = std::filesystem::path(OnDiskPath), - BlockPartialBuffer = std::move(InMemoryBuffer), - OptionalUpdateFolderStream](std::atomic<bool>&) mutable { + BlockPartialBuffer = std::move(InMemoryBuffer)](std::atomic<bool>&) mutable { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_WritePartialBlock"); @@ -1645,7 +1635,6 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF FilteredWrittenBytesPerSecond.Start(); - // TODO: Add OptionalUpdateFolderStream calls for chunks / chunk sequences if (!WritePartialBlockChunksToCache( BlockDescription, SequenceIndexChunksLeftToWriteCounters, @@ -1654,8 +1643,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF BlockRange.ChunkBlockIndexStart, BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1, RemoteChunkIndexNeedsCopyFromSourceFlags, - WriteCache, - OptionalUpdateFolderStream)) + WriteCache)) { std::error_code DummyEc; RemoveFile(BlockChunkPath, DummyEc); @@ -1673,9 +1661,9 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF Ec.message()); } - if (OptionalUpdateFolderStream) + if (m_Options.OptionalUpdateFolderEventStream) { - OptionalUpdateFolderStream->CompletedBlockRange( + m_Options.OptionalUpdateFolderEventStream->CompletedBlockRange( BlockIndex, gsl::narrow<uint32_t>(BlockRange.RangeStart), gsl::narrow<uint32_t>(BlockRange.RangeLength)); @@ -1722,8 +1710,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF &SequenceIndexChunksLeftToWriteCounters, &FilteredDownloadedBytesPerSecond, TotalRequestCount, - BlockIndex, - OptionalUpdateFolderStream](std::atomic<bool>&) { + BlockIndex](std::atomic<bool>&) { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_GetFullBlock"); @@ -1755,9 +1742,9 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); } - if (OptionalUpdateFolderStream) + if (m_Options.OptionalUpdateFolderEventStream) { - OptionalUpdateFolderStream->DownloadedBlock(BlockIndex); + m_Options.OptionalUpdateFolderEventStream->DownloadedBlock(BlockIndex); } if (!m_AbortFlag) @@ -1826,8 +1813,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF TotalPartWriteCount, &FilteredWrittenBytesPerSecond, BlockChunkPath, - BlockBuffer = std::move(BlockBuffer), - OptionalUpdateFolderStream](std::atomic<bool>&) mutable { + BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_WriteFullBlock"); @@ -1858,8 +1844,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF Work, CompositeBuffer(std::move(BlockBuffer)), RemoteChunkIndexNeedsCopyFromSourceFlags, - WriteCache, - OptionalUpdateFolderStream)) + WriteCache)) { std::error_code DummyEc; RemoveFile(BlockChunkPath, DummyEc); @@ -1880,9 +1865,9 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF } } - if (OptionalUpdateFolderStream) + if (m_Options.OptionalUpdateFolderEventStream) { - OptionalUpdateFolderStream->CompletedBlock(BlockIndex); + m_Options.OptionalUpdateFolderEventStream->CompletedBlock(BlockIndex); } WritePartsComplete++; @@ -1968,9 +1953,9 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF WriteProgressBar.Finish(); - if (OptionalUpdateFolderStream) + if (m_Options.OptionalUpdateFolderEventStream) { - OptionalUpdateFolderStream->WriteComplete(); + m_Options.OptionalUpdateFolderEventStream->WriteComplete(); } if (m_AbortFlag) @@ -3177,9 +3162,8 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd ParallelWork& Work, uint64_t TotalRequestCount, uint64_t TotalPartWriteCount, - FilteredRate& FilteredDownloadedBytesPerSecond, - FilteredRate& FilteredWrittenBytesPerSecond, - UpdateFolderStream* OptionalUpdateFolderStream) + FilteredRate& FilteredDownloadedBytesPerSecond, + FilteredRate& FilteredWrittenBytesPerSecond) { std::filesystem::path ExistingCompressedChunkPath; if (!m_Options.PrimeCacheOnly) @@ -3209,7 +3193,6 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd TotalPartWriteCount, &FilteredWrittenBytesPerSecond, RemoteChunkIndex, - OptionalUpdateFolderStream, ChunkTargetPtrs = std::move(ChunkTargetPtrs), CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>& AbortFlag) mutable { if (!AbortFlag) @@ -3227,11 +3210,8 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd fmt::format("Could not open dowloaded compressed chunk {} from {}", ChunkHash, CompressedChunkPath)); } - bool NeedHashVerify = WriteCompressedChunkToCache(RemoteChunkIndex, - ChunkTargetPtrs, - WriteCache, - std::move(CompressedPart), - OptionalUpdateFolderStream); + bool NeedHashVerify = + WriteCompressedChunkToCache(RemoteChunkIndex, ChunkTargetPtrs, WriteCache, std::move(CompressedPart)); WritePartsComplete++; if (!AbortFlag) @@ -3256,11 +3236,11 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd WriteCache.Close(CompletedSequences); if (NeedHashVerify) { - VerifyAndCompleteChunkSequencesAsync(CompletedSequences, Work, OptionalUpdateFolderStream); + VerifyAndCompleteChunkSequencesAsync(CompletedSequences, Work); } else { - FinalizeChunkSequences(CompletedSequences, OptionalUpdateFolderStream); + FinalizeChunkSequences(CompletedSequences); } } } @@ -3277,7 +3257,6 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd &WritePartsComplete, TotalPartWriteCount, TotalRequestCount, - OptionalUpdateFolderStream, &FilteredDownloadedBytesPerSecond, &FilteredWrittenBytesPerSecond, RemoteChunkIndex, @@ -3302,7 +3281,6 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd RemoteChunkIndex, &FilteredDownloadedBytesPerSecond, &FilteredWrittenBytesPerSecond, - OptionalUpdateFolderStream, ChunkTargetPtrs = std::move(ChunkTargetPtrs)](IoBuffer&& Payload) mutable { if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) { @@ -3320,8 +3298,7 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd WritePartsComplete, TotalPartWriteCount, FilteredWrittenBytesPerSecond, - EnableBacklog, - OptionalUpdateFolderStream); + EnableBacklog); }); } }); @@ -3760,8 +3737,7 @@ BuildsOperationUpdateFolder::WriteLocalChunkToCache(CloneQueryInterface* C const std::vector<ChunkedFolderContent>& ScavengedContents, const std::vector<ChunkedContentLookup>& ScavengedLookups, const std::vector<std::filesystem::path>& ScavengedPaths, - BufferedWriteFileCache& WriteCache, - UpdateFolderStream* OptionalUpdateFolderStream) + BufferedWriteFileCache& WriteCache) { ZEN_TRACE_CPU("WriteLocalChunkToCache"); @@ -3933,9 +3909,11 @@ BuildsOperationUpdateFolder::WriteLocalChunkToCache(CloneQueryInterface* C m_WrittenChunkByteCount += ClonableBytes; - if (OptionalUpdateFolderStream) + if (m_Options.OptionalUpdateFolderEventStream) { - OptionalUpdateFolderStream->WroteSequence(RemoteSequenceIndex, Op.Target->Offset + PreBytes, ClonableBytes); + m_Options.OptionalUpdateFolderEventStream->WroteSequence(RemoteSequenceIndex, + Op.Target->Offset + PreBytes, + ClonableBytes); } if (PreBytes > 0) @@ -3945,9 +3923,11 @@ BuildsOperationUpdateFolder::WriteLocalChunkToCache(CloneQueryInterface* C WriteSequenceChunkToCache(LocalWriter, ChunkSource, RemoteSequenceIndex, FileOffset, RemotePathIndex); - if (OptionalUpdateFolderStream) + if (m_Options.OptionalUpdateFolderEventStream) { - OptionalUpdateFolderStream->WroteSequence(RemoteSequenceIndex, FileOffset, ChunkSource.GetSize()); + m_Options.OptionalUpdateFolderEventStream->WroteSequence(RemoteSequenceIndex, + FileOffset, + ChunkSource.GetSize()); } } if (PostBytes > 0) @@ -3957,9 +3937,11 @@ BuildsOperationUpdateFolder::WriteLocalChunkToCache(CloneQueryInterface* C WriteSequenceChunkToCache(LocalWriter, ChunkSource, RemoteSequenceIndex, FileOffset, RemotePathIndex); - if (OptionalUpdateFolderStream) + if (m_Options.OptionalUpdateFolderEventStream) { - OptionalUpdateFolderStream->WroteSequence(RemoteSequenceIndex, FileOffset, ChunkSource.GetSize()); + m_Options.OptionalUpdateFolderEventStream->WroteSequence(RemoteSequenceIndex, + FileOffset, + ChunkSource.GetSize()); } } } @@ -3974,9 +3956,9 @@ BuildsOperationUpdateFolder::WriteLocalChunkToCache(CloneQueryInterface* C WriteSequenceChunkToCache(LocalWriter, ChunkSource, RemoteSequenceIndex, FileOffset, RemotePathIndex); - if (OptionalUpdateFolderStream) + if (m_Options.OptionalUpdateFolderEventStream) { - OptionalUpdateFolderStream->WroteSequence(RemoteSequenceIndex, FileOffset, ChunkSource.GetSize()); + m_Options.OptionalUpdateFolderEventStream->WroteSequence(RemoteSequenceIndex, FileOffset, ChunkSource.GetSize()); } } } @@ -4007,8 +3989,7 @@ BuildsOperationUpdateFolder::WriteCompressedChunkToCache( const uint32_t RemoteChunkIndex, const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs, BufferedWriteFileCache& WriteCache, - IoBuffer&& CompressedPart, - UpdateFolderStream* OptionalUpdateFolderStream) + IoBuffer&& CompressedPart) { ZEN_TRACE_CPU("WriteCompressedChunkToCache"); @@ -4017,7 +3998,7 @@ BuildsOperationUpdateFolder::WriteCompressedChunkToCache( if (IsSingleFileChunk(m_RemoteContent, ChunkTargetPtrs)) { const std::uint32_t SequenceIndex = ChunkTargetPtrs.front()->SequenceIndex; - StreamDecompress(SequenceIndex, CompositeBuffer(std::move(CompressedPart)), OptionalUpdateFolderStream); + StreamDecompress(SequenceIndex, CompositeBuffer(std::move(CompressedPart))); return false; } else @@ -4055,9 +4036,9 @@ BuildsOperationUpdateFolder::WriteCompressedChunkToCache( WriteSequenceChunkToCache(LocalWriter, RangeBuffer, SequenceIndex, FileOffset, PathIndex); - if (OptionalUpdateFolderStream) + if (m_Options.OptionalUpdateFolderEventStream) { - OptionalUpdateFolderStream->WroteSequence(SequenceIndex, FileOffset, RangeBuffer.GetSize()); + m_Options.OptionalUpdateFolderEventStream->WroteSequence(SequenceIndex, FileOffset, RangeBuffer.GetSize()); } } @@ -4081,9 +4062,7 @@ BuildsOperationUpdateFolder::WriteCompressedChunkToCache( } void -BuildsOperationUpdateFolder::StreamDecompress(uint32_t RemoteSequenceIndex, - CompositeBuffer&& CompressedPart, - UpdateFolderStream* OptionalUpdateFolderStream) +BuildsOperationUpdateFolder::StreamDecompress(uint32_t RemoteSequenceIndex, CompositeBuffer&& CompressedPart) { ZEN_TRACE_CPU("StreamDecompress"); const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; @@ -4131,9 +4110,9 @@ BuildsOperationUpdateFolder::StreamDecompress(uint32_t RemoteSequenceIndex, m_ValidatedChunkByteCount += Segment.GetSize(); } DecompressedTemp.Write(Segment, Offset); - if (OptionalUpdateFolderStream) + if (m_Options.OptionalUpdateFolderEventStream) { - OptionalUpdateFolderStream->WroteSequence(RemoteSequenceIndex, Offset, Segment.GetSize()); + m_Options.OptionalUpdateFolderEventStream->WroteSequence(RemoteSequenceIndex, Offset, Segment.GetSize()); } Offset += Segment.GetSize(); m_DiskStats.WriteByteCount += Segment.GetSize(); @@ -4330,8 +4309,7 @@ void BuildsOperationUpdateFolder::WriteBlockChunkOpsToCache(std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, const BlockWriteOps& Ops, BufferedWriteFileCache& WriteCache, - ParallelWork& Work, - UpdateFolderStream* OptionalUpdateFolderStream) + ParallelWork& Work) { ZEN_TRACE_CPU("WriteBlockChunkOpsToCache"); @@ -4353,9 +4331,9 @@ BuildsOperationUpdateFolder::WriteBlockChunkOpsToCache(std::span<std::atomic<uin WriteSequenceChunkToCache(LocalWriter, Chunk, SequenceIndex, FileOffset, PathIndex); - if (OptionalUpdateFolderStream) + if (m_Options.OptionalUpdateFolderEventStream) { - OptionalUpdateFolderStream->WroteSequence(SequenceIndex, FileOffset, Chunk.GetSize()); + m_Options.OptionalUpdateFolderEventStream->WroteSequence(SequenceIndex, FileOffset, Chunk.GetSize()); } } } @@ -4372,7 +4350,7 @@ BuildsOperationUpdateFolder::WriteBlockChunkOpsToCache(std::span<std::atomic<uin } } WriteCache.Close(CompletedChunkSequences); - VerifyAndCompleteChunkSequencesAsync(CompletedChunkSequences, Work, OptionalUpdateFolderStream); + VerifyAndCompleteChunkSequencesAsync(CompletedChunkSequences, Work); } } @@ -4382,8 +4360,7 @@ BuildsOperationUpdateFolder::WriteChunksBlockToCache(const ChunkBlockDescription ParallelWork& Work, CompositeBuffer&& BlockBuffer, std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, - BufferedWriteFileCache& WriteCache, - UpdateFolderStream* OptionalUpdateFolderStream) + BufferedWriteFileCache& WriteCache) { ZEN_TRACE_CPU("WriteChunksBlockToCache"); @@ -4408,7 +4385,7 @@ BuildsOperationUpdateFolder::WriteChunksBlockToCache(const ChunkBlockDescription gsl::narrow<uint32_t>(BlockDescription.ChunkRawHashes.size() - 1), Ops)) { - WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work, OptionalUpdateFolderStream); + WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work); return true; } return false; @@ -4423,7 +4400,7 @@ BuildsOperationUpdateFolder::WriteChunksBlockToCache(const ChunkBlockDescription gsl::narrow<uint32_t>(BlockDescription.ChunkRawHashes.size() - 1), Ops)) { - WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work, OptionalUpdateFolderStream); + WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work); return true; } return false; @@ -4437,8 +4414,7 @@ BuildsOperationUpdateFolder::WritePartialBlockChunksToCache(const ChunkBlockDesc uint32_t FirstIncludedBlockChunkIndex, uint32_t LastIncludedBlockChunkIndex, std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, - BufferedWriteFileCache& WriteCache, - UpdateFolderStream* OptionalUpdateFolderStream) + BufferedWriteFileCache& WriteCache) { ZEN_TRACE_CPU("WritePartialBlockChunksToCache"); @@ -4455,7 +4431,7 @@ BuildsOperationUpdateFolder::WritePartialBlockChunksToCache(const ChunkBlockDesc LastIncludedBlockChunkIndex, Ops)) { - WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work, OptionalUpdateFolderStream); + WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work); return true; } else @@ -4475,8 +4451,7 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa std::atomic<uint64_t>& WritePartsComplete, const uint64_t TotalPartWriteCount, FilteredRate& FilteredWrittenBytesPerSecond, - bool EnableBacklog, - UpdateFolderStream* OptionalUpdateFolderStream) + bool EnableBacklog) { ZEN_TRACE_CPU("AsyncWriteDownloadedChunk"); @@ -4534,7 +4509,6 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa &WriteCache, &WritePartsComplete, &FilteredWrittenBytesPerSecond, - OptionalUpdateFolderStream, ChunkTargetPtrs = std::move(ChunkTargetPtrs), CompressedPart = IoBuffer(std::move(Payload))](std::atomic<bool>&) mutable { if (!m_AbortFlag) @@ -4559,11 +4533,7 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa } } - bool NeedHashVerify = WriteCompressedChunkToCache(RemoteChunkIndex, - ChunkTargetPtrs, - WriteCache, - std::move(CompressedPart), - OptionalUpdateFolderStream); + bool NeedHashVerify = WriteCompressedChunkToCache(RemoteChunkIndex, ChunkTargetPtrs, WriteCache, std::move(CompressedPart)); if (!m_AbortFlag) { WritePartsComplete++; @@ -4590,11 +4560,11 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa WriteCache.Close(CompletedSequences); if (NeedHashVerify) { - VerifyAndCompleteChunkSequencesAsync(CompletedSequences, Work, OptionalUpdateFolderStream); + VerifyAndCompleteChunkSequencesAsync(CompletedSequences, Work); } else { - FinalizeChunkSequences(CompletedSequences, OptionalUpdateFolderStream); + FinalizeChunkSequences(CompletedSequences); } } } @@ -4603,9 +4573,7 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa } void -BuildsOperationUpdateFolder::VerifyAndCompleteChunkSequencesAsync(std::span<const uint32_t> RemoteSequenceIndexes, - ParallelWork& Work, - UpdateFolderStream* OptionalUpdateFolderStream) +BuildsOperationUpdateFolder::VerifyAndCompleteChunkSequencesAsync(std::span<const uint32_t> RemoteSequenceIndexes, ParallelWork& Work) { if (RemoteSequenceIndexes.empty()) { @@ -4617,7 +4585,7 @@ BuildsOperationUpdateFolder::VerifyAndCompleteChunkSequencesAsync(std::span<cons for (uint32_t RemoteSequenceIndexOffset = 1; RemoteSequenceIndexOffset < RemoteSequenceIndexes.size(); RemoteSequenceIndexOffset++) { const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset]; - Work.ScheduleWork(m_IOWorkerPool, [this, RemoteSequenceIndex, OptionalUpdateFolderStream](std::atomic<bool>&) { + Work.ScheduleWork(m_IOWorkerPool, [this, RemoteSequenceIndex](std::atomic<bool>&) { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_VerifyAndFinalizeSequence"); @@ -4625,7 +4593,7 @@ BuildsOperationUpdateFolder::VerifyAndCompleteChunkSequencesAsync(std::span<cons VerifySequence(RemoteSequenceIndex); if (!m_AbortFlag) { - FinalizeChunkSequence(RemoteSequenceIndex, OptionalUpdateFolderStream); + FinalizeChunkSequence(RemoteSequenceIndex); } } }); @@ -4633,14 +4601,14 @@ BuildsOperationUpdateFolder::VerifyAndCompleteChunkSequencesAsync(std::span<cons const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[0]; VerifySequence(RemoteSequenceIndex); - FinalizeChunkSequence(RemoteSequenceIndex, OptionalUpdateFolderStream); + FinalizeChunkSequence(RemoteSequenceIndex); } else { for (uint32_t RemoteSequenceIndexOffset = 0; RemoteSequenceIndexOffset < RemoteSequenceIndexes.size(); RemoteSequenceIndexOffset++) { const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset]; - FinalizeChunkSequence(RemoteSequenceIndex, OptionalUpdateFolderStream); + FinalizeChunkSequence(RemoteSequenceIndex); } } } @@ -4674,7 +4642,7 @@ BuildsOperationUpdateFolder::CompleteChunkTargets(const std::vector<const Chunke } void -BuildsOperationUpdateFolder::FinalizeChunkSequence(uint32_t RemoteSequenceIndex, UpdateFolderStream* OptionalUpdateFolderStream) +BuildsOperationUpdateFolder::FinalizeChunkSequence(uint32_t RemoteSequenceIndex) { ZEN_TRACE_CPU("FinalizeChunkSequence"); @@ -4689,21 +4657,20 @@ BuildsOperationUpdateFolder::FinalizeChunkSequence(uint32_t RemoteSequenceIndex, { throw std::system_error(Ec); } - if (OptionalUpdateFolderStream) + if (m_Options.OptionalUpdateFolderEventStream) { - OptionalUpdateFolderStream->CompletedSequence(RemoteSequenceIndex); + m_Options.OptionalUpdateFolderEventStream->CompletedSequence(RemoteSequenceIndex); } } void -BuildsOperationUpdateFolder::FinalizeChunkSequences(std::span<const uint32_t> RemoteSequenceIndexes, - UpdateFolderStream* OptionalUpdateFolderStream) +BuildsOperationUpdateFolder::FinalizeChunkSequences(std::span<const uint32_t> RemoteSequenceIndexes) { ZEN_TRACE_CPU("FinalizeChunkSequences"); for (uint32_t SequenceIndex : RemoteSequenceIndexes) { - FinalizeChunkSequence(SequenceIndex, OptionalUpdateFolderStream); + FinalizeChunkSequence(SequenceIndex); } } diff --git a/src/zenremotestore/builds/updatefolderstream.cpp b/src/zenremotestore/builds/updatefoldereventdiskstream.cpp index 5a1809923..ad36189a3 100644 --- a/src/zenremotestore/builds/updatefolderstream.cpp +++ b/src/zenremotestore/builds/updatefoldereventdiskstream.cpp @@ -1,6 +1,6 @@ // Copyright Epic Games, Inc. All Rights Reserved. -#include <zenremotestore/builds/updatefolderstream.h> +#include <zenremotestore/builds/updatefoldereventdiskstream.h> #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinaryutil.h> @@ -23,9 +23,9 @@ namespace { #pragma pack(1) struct StreamHeader { - static constexpr uint32_t ExpectedMagic = 0x7a726273; // 'zrbs'; - static constexpr uint32_t CurrentVersion = 1; - static constexpr uint32_t UpdateFolderStreamType = HashStringDjb2("UpdateFolderDiskStreamFeedback"sv); + static constexpr uint32_t ExpectedMagic = 0x7a726273; // 'zrbs'; + static constexpr uint32_t CurrentVersion = 1; + static constexpr uint32_t UpdateFolderEventDiskStreamType = HashStringDjb2("UpdateFolderEventDiskStream"sv); uint32_t Magic = ExpectedMagic; uint32_t Version = CurrentVersion; @@ -43,89 +43,10 @@ namespace { #pragma pack(pop) static_assert(sizeof(StreamHeader) == 32); - class UpdateFolderDiskStreamFeedback : public UpdateFolderStream - { - using Event = UpdateFolderDiskStream::Event; - - public: - explicit UpdateFolderDiskStreamFeedback(const std::filesystem::path& OutputPath) : m_OutputStream(OutputPath) {} - virtual ~UpdateFolderDiskStreamFeedback() {} - - virtual void Initialize(const ChunkedFolderContent& RemoteContent, - std::span<const ChunkBlockDescription> BlockDescriptions, - std::span<const Chunk> DownloadChunks, - std::span<const BlockRange> DownloadBlockRanges, - std::span<const Block> DownloadBlocks, - std::span<const Sequence> SequencesToWrite) override - { - m_OutputStream - .PrepareWrite(RemoteContent, BlockDescriptions, DownloadChunks, DownloadBlockRanges, DownloadBlocks, SequencesToWrite); - } - - virtual void DownloadedChunk(uint32_t RemoteChunkIndex) override - { - m_OutputStream.WriteEvent( - m_OutputStream.MakeEvent(Event::EventType::DownloadedChunk, - Event::EventPayload{.DownloadedChunk = {.RemoteChunkIndex = RemoteChunkIndex}})); - } - - virtual void DownloadedBlockRange(uint32_t BlockIndex, uint32_t RangeStart, uint32_t RangeLength) override - { - m_OutputStream.WriteEvent(m_OutputStream.MakeEvent( - Event::EventType::DownloadedBlockRange, - Event::EventPayload{ - .DownloadedBlockRange = {.BlockIndex = BlockIndex, .RangeStart = RangeStart, .RangeLength = RangeLength}})); - } - - virtual void DownloadedBlock(uint32_t BlockIndex) override - { - m_OutputStream.WriteEvent(m_OutputStream.MakeEvent(Event::EventType::DownloadedBlock, - Event::EventPayload{.DownloadedBlock = {.BlockIndex = BlockIndex}})); - } - - virtual void WroteSequence(uint32_t RemoteSequenceIndex, uint64_t Offset, uint64_t Length) override - { - m_OutputStream.WriteEvent(m_OutputStream.MakeEvent( - Event::EventType::WroteSequence, - Event::EventPayload{.WroteSequence = {.RemoteSequenceIndex = RemoteSequenceIndex, .Offset = Offset, .Length = Length}})); - } - - virtual void CompletedBlockRange(uint32_t BlockIndex, uint32_t RangeStart, uint32_t RangeLength) override - { - m_OutputStream.WriteEvent(m_OutputStream.MakeEvent( - Event::EventType::CompletedBlockRange, - Event::EventPayload{ - .CompletedBlockRange = {.BlockIndex = BlockIndex, .RangeStart = RangeStart, .RangeLength = RangeLength}})); - } - - virtual void CompletedBlock(uint32_t BlockIndex) override - { - m_OutputStream.WriteEvent(m_OutputStream.MakeEvent(Event::EventType::CompletedBlock, - Event::EventPayload{.CompletedBlock = {.BlockIndex = BlockIndex}})); - } - - virtual void CompletedSequence(uint32_t RemoteSequenceIndex) override - { - m_OutputStream.WriteEvent( - m_OutputStream.MakeEvent(Event::EventType::CompletedSequence, - Event::EventPayload{.CompletedSequence = {.RemoteSequenceIndex = RemoteSequenceIndex}})); - } - - virtual void WriteComplete() override { m_OutputStream.EndWrite(); } - - private: - UpdateFolderDiskStream m_OutputStream; - }; } // namespace -std::unique_ptr<UpdateFolderStream> -CreateDiskUpdateFolderStream(const std::filesystem::path& OutputPath) -{ - return std::make_unique<UpdateFolderDiskStreamFeedback>(OutputPath); -} - -UpdateFolderDiskStream::Event::EventHeader -UpdateFolderDiskStream::Event::MakeHeader(Event::EventType Type, uint64_t TimeStampUs, uint16_t ThreadIndex) +UpdateFolderEventDiskStream::Event::EventHeader +UpdateFolderEventDiskStream::Event::MakeHeader(Event::EventType Type, uint64_t TimeStampUs, uint16_t ThreadIndex) { return EventHeader{.EventType = Type, .TimestampUsHigh = gsl::narrow<uint8_t>(TimeStampUs >> 32), @@ -133,14 +54,14 @@ UpdateFolderDiskStream::Event::MakeHeader(Event::EventType Type, uint64_t TimeSt .ThreadIndex = ThreadIndex}; } -UpdateFolderDiskStream::UpdateFolderDiskStream(const std::filesystem::path& OutputPath) +UpdateFolderEventDiskStream::UpdateFolderEventDiskStream(const std::filesystem::path& OutputPath) : m_OutputPath(OutputPath) , m_Output(OutputPath, BasicFile::Mode::kTruncate) , m_WriteOffset(0) { } -UpdateFolderDiskStream::~UpdateFolderDiskStream() +UpdateFolderEventDiskStream::~UpdateFolderEventDiskStream() { try { @@ -152,14 +73,14 @@ UpdateFolderDiskStream::~UpdateFolderDiskStream() } } -UpdateFolderDiskStream::Event -UpdateFolderDiskStream::MakeEvent(Event::EventType Type, Event::EventPayload Payload) +UpdateFolderEventDiskStream::Event +UpdateFolderEventDiskStream::MakeEvent(Event::EventType Type, Event::EventPayload Payload) { return Event{.Header = Event::MakeHeader(Type, m_Stopwatch.GetElapsedTimeUs(), GetCurrentThreadIndex()), .Payload = Payload}; } void -UpdateFolderDiskStream::WriteEvent(const Event& InEvent) +UpdateFolderEventDiskStream::WriteEvent(const Event& InEvent) { uint64_t WriteOffset = m_WriteOffset.fetch_add(sizeof(Event)); m_Output.Write(&InEvent, sizeof(Event), WriteOffset); @@ -167,12 +88,12 @@ UpdateFolderDiskStream::WriteEvent(const Event& InEvent) } void -UpdateFolderDiskStream::PrepareWrite(const ChunkedFolderContent& RemoteContent, - std::span<const ChunkBlockDescription> BlockDescriptions, - std::span<const Chunk> DownloadChunks, - std::span<const BlockRange> DownloadBlockRanges, - std::span<const Block> DownloadBlocks, - std::span<const Sequence> SequencesToWrite) +UpdateFolderEventDiskStream::PrepareWrite(const ChunkedFolderContent& RemoteContent, + std::span<const ChunkBlockDescription> BlockDescriptions, + std::span<const Chunk> DownloadChunks, + std::span<const BlockRange> DownloadBlockRanges, + std::span<const Block> DownloadBlocks, + std::span<const Sequence> SequencesToWrite) { CbObjectWriter Writer; Writer.BeginObject("remoteContent"sv); @@ -261,7 +182,7 @@ UpdateFolderDiskStream::PrepareWrite(const ChunkedFolderContent& RemoteContent uint32_t PrepPayloadSize = gsl::narrow<uint32_t>(PayloadBuffer.GetSize()); - StreamHeader Header = {.StreamType = StreamHeader::UpdateFolderStreamType, .PrepPayloadSize = PrepPayloadSize}; + StreamHeader Header = {.StreamType = StreamHeader::UpdateFolderEventDiskStreamType, .PrepPayloadSize = PrepPayloadSize}; Header.Checksum = StreamHeader::ComputeChecksum(Header); @@ -279,21 +200,21 @@ UpdateFolderDiskStream::PrepareWrite(const ChunkedFolderContent& RemoteContent } void -UpdateFolderDiskStream::EndWrite() +UpdateFolderEventDiskStream::EndWrite() { RwLock::ExclusiveLockScope _(m_UpdateLock); m_Output.Close(); } void -UpdateFolderDiskStream::Read(const std::filesystem::path& InputPath, - ChunkedFolderContent& OutRemoteContent, - std::vector<ChunkBlockDescription>& OutBlockDescriptions, - std::vector<Chunk>& OutDownloadChunks, - std::vector<BlockRange>& OutDownloadBlockRanges, - std::vector<Block>& OutDownloadBlocks, - std::vector<Sequence>& OutSequencesToWrite, - std::vector<Event>& OutEvents) +UpdateFolderEventDiskStream::Read(const std::filesystem::path& InputPath, + ChunkedFolderContent& OutRemoteContent, + std::vector<ChunkBlockDescription>& OutBlockDescriptions, + std::vector<Chunk>& OutDownloadChunks, + std::vector<BlockRange>& OutDownloadBlockRanges, + std::vector<Block>& OutDownloadBlocks, + std::vector<Sequence>& OutSequencesToWrite, + std::vector<Event>& OutEvents) { BasicFile InputFile(InputPath, BasicFile::Mode::kRead); @@ -317,10 +238,11 @@ UpdateFolderDiskStream::Read(const std::filesystem::path& InputPath, { throw std::runtime_error(fmt::format("Expected version {}, file has version {}", StreamHeader::CurrentVersion, Header.Version)); } - if (Header.StreamType != StreamHeader::UpdateFolderStreamType) + if (Header.StreamType != StreamHeader::UpdateFolderEventDiskStreamType) { - throw std::runtime_error( - fmt::format("Expected stream type {}, file has stream type {}", StreamHeader::UpdateFolderStreamType, Header.StreamType)); + throw std::runtime_error(fmt::format("Expected stream type {}, file has stream type {}", + StreamHeader::UpdateFolderEventDiskStreamType, + Header.StreamType)); } if (Header.Checksum != StreamHeader::ComputeChecksum(Header)) { @@ -434,70 +356,8 @@ UpdateFolderDiskStream::Read(const std::filesystem::path& InputPath, InputFile.Read(OutEvents.data(), EntryCount * sizeof(Event), Offset); } -void -UpdateFolderDiskStream::Replay(const std::filesystem::path& InputPath, UpdateFolderStream& UpdateFolderStream) -{ - ZEN_UNUSED(UpdateFolderStream); - - ChunkedFolderContent RemoteContent; - std::vector<ChunkBlockDescription> BlockDescriptions; - std::vector<Chunk> DownloadChunks; - std::vector<BlockRange> DownloadBlockRanges; - std::vector<Block> DownloadBlocks; - std::vector<Sequence> SequencesToWrite; - std::vector<Event> Events; - - Read(InputPath, RemoteContent, BlockDescriptions, DownloadChunks, DownloadBlockRanges, DownloadBlocks, SequencesToWrite, Events); - - UpdateFolderStream.Initialize(RemoteContent, BlockDescriptions, DownloadChunks, DownloadBlockRanges, DownloadBlocks, SequencesToWrite); - - for (const Event& E : Events) - { - switch (E.Header.EventType) - { - case Event::EventType::DownloadedChunk: - UpdateFolderStream.DownloadedChunk(E.Payload.DownloadedChunk.RemoteChunkIndex); - break; - case Event::EventType::DownloadedBlockRange: - UpdateFolderStream.DownloadedBlockRange(E.Payload.DownloadedBlockRange.BlockIndex, - E.Payload.DownloadedBlockRange.RangeStart, - E.Payload.DownloadedBlockRange.RangeLength); - break; - case Event::EventType::DownloadedBlock: - UpdateFolderStream.DownloadedBlock(E.Payload.DownloadedBlock.BlockIndex); - break; - - case Event::EventType::WroteSequence: - UpdateFolderStream.WroteSequence(E.Payload.WroteSequence.RemoteSequenceIndex, - E.Payload.WroteSequence.Offset, - E.Payload.WroteSequence.Length); - break; - - case Event::EventType::CompletedBlockRange: - UpdateFolderStream.CompletedBlockRange(E.Payload.CompletedBlockRange.BlockIndex, - E.Payload.CompletedBlockRange.RangeStart, - E.Payload.CompletedBlockRange.RangeLength); - break; - case Event::EventType::CompletedBlock: - UpdateFolderStream.CompletedBlock(E.Payload.CompletedBlock.BlockIndex); - break; - - case Event::EventType::CompletedSequence: - UpdateFolderStream.CompletedSequence(E.Payload.CompletedSequence.RemoteSequenceIndex); - break; - - case Event::EventType::WriteComplete: - UpdateFolderStream.WriteComplete(); - break; - - default: - throw std::runtime_error(std::format("Unknown event type: {}", uint8_t(E.Header.EventType))); - } - } -} - uint16_t -UpdateFolderDiskStream::GetThreadIndex(int ThreadId) +UpdateFolderEventDiskStream::GetThreadIndex(int ThreadId) { { RwLock::SharedLockScope _(m_UpdateLock); @@ -517,7 +377,7 @@ UpdateFolderDiskStream::GetThreadIndex(int ThreadId) } uint16_t -UpdateFolderDiskStream::GetCurrentThreadIndex() +UpdateFolderEventDiskStream::GetCurrentThreadIndex() { return GetThreadIndex(GetCurrentThreadId()); } diff --git a/src/zenremotestore/builds/updatefoldereventstream.cpp b/src/zenremotestore/builds/updatefoldereventstream.cpp new file mode 100644 index 000000000..fe635e245 --- /dev/null +++ b/src/zenremotestore/builds/updatefoldereventstream.cpp @@ -0,0 +1,174 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenremotestore/builds/updatefoldereventstream.h> + +#include <zenremotestore/builds/updatefoldereventdiskstream.h> + +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/stream.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <xxhash.h> +ZEN_THIRD_PARTY_INCLUDES_END + +#include <numeric> + +namespace zen { + +using namespace std::literals; + +namespace { + class DiskUpdateFolderEventStream : public UpdateFolderEventStream + { + using Event = UpdateFolderEventDiskStream::Event; + + public: + explicit DiskUpdateFolderEventStream(const std::filesystem::path& OutputPath) : m_OutputStream(OutputPath) {} + virtual ~DiskUpdateFolderEventStream() {} + + virtual void Initialize(const ChunkedFolderContent& RemoteContent, + std::span<const ChunkBlockDescription> BlockDescriptions, + std::span<const Chunk> DownloadChunks, + std::span<const BlockRange> DownloadBlockRanges, + std::span<const Block> DownloadBlocks, + std::span<const Sequence> SequencesToWrite) override + { + m_OutputStream + .PrepareWrite(RemoteContent, BlockDescriptions, DownloadChunks, DownloadBlockRanges, DownloadBlocks, SequencesToWrite); + } + + virtual void DownloadedChunk(uint32_t RemoteChunkIndex) override + { + m_OutputStream.WriteEvent( + m_OutputStream.MakeEvent(Event::EventType::DownloadedChunk, + Event::EventPayload{.DownloadedChunk = {.RemoteChunkIndex = RemoteChunkIndex}})); + } + + virtual void DownloadedBlockRange(uint32_t BlockIndex, uint32_t RangeStart, uint32_t RangeLength) override + { + m_OutputStream.WriteEvent(m_OutputStream.MakeEvent( + Event::EventType::DownloadedBlockRange, + Event::EventPayload{ + .DownloadedBlockRange = {.BlockIndex = BlockIndex, .RangeStart = RangeStart, .RangeLength = RangeLength}})); + } + + virtual void DownloadedBlock(uint32_t BlockIndex) override + { + m_OutputStream.WriteEvent(m_OutputStream.MakeEvent(Event::EventType::DownloadedBlock, + Event::EventPayload{.DownloadedBlock = {.BlockIndex = BlockIndex}})); + } + + virtual void WroteSequence(uint32_t RemoteSequenceIndex, uint64_t Offset, uint64_t Length) override + { + m_OutputStream.WriteEvent(m_OutputStream.MakeEvent( + Event::EventType::WroteSequence, + Event::EventPayload{.WroteSequence = {.RemoteSequenceIndex = RemoteSequenceIndex, .Offset = Offset, .Length = Length}})); + } + + virtual void CompletedBlockRange(uint32_t BlockIndex, uint32_t RangeStart, uint32_t RangeLength) override + { + m_OutputStream.WriteEvent(m_OutputStream.MakeEvent( + Event::EventType::CompletedBlockRange, + Event::EventPayload{ + .CompletedBlockRange = {.BlockIndex = BlockIndex, .RangeStart = RangeStart, .RangeLength = RangeLength}})); + } + + virtual void CompletedBlock(uint32_t BlockIndex) override + { + m_OutputStream.WriteEvent(m_OutputStream.MakeEvent(Event::EventType::CompletedBlock, + Event::EventPayload{.CompletedBlock = {.BlockIndex = BlockIndex}})); + } + + virtual void CompletedSequence(uint32_t RemoteSequenceIndex) override + { + m_OutputStream.WriteEvent( + m_OutputStream.MakeEvent(Event::EventType::CompletedSequence, + Event::EventPayload{.CompletedSequence = {.RemoteSequenceIndex = RemoteSequenceIndex}})); + } + + virtual void WriteComplete() override { m_OutputStream.EndWrite(); } + + private: + UpdateFolderEventDiskStream m_OutputStream; + }; +} // namespace + +std::unique_ptr<UpdateFolderEventStream> +CreateDiskUpdateFolderEventStream(const std::filesystem::path& OutputPath) +{ + return std::make_unique<DiskUpdateFolderEventStream>(OutputPath); +} + +void +ReplayUpdateFolderEventDiskStream(const std::filesystem::path& InputPath, UpdateFolderEventStream& EventStream) +{ + using Chunk = UpdateFolderEventStream::Chunk; + using BlockRange = UpdateFolderEventStream::BlockRange; + using Block = UpdateFolderEventStream::Block; + using Sequence = UpdateFolderEventStream::Sequence; + using Event = UpdateFolderEventDiskStream::Event; + + ChunkedFolderContent RemoteContent; + std::vector<ChunkBlockDescription> BlockDescriptions; + std::vector<Chunk> DownloadChunks; + std::vector<BlockRange> DownloadBlockRanges; + std::vector<Block> DownloadBlocks; + std::vector<Sequence> SequencesToWrite; + std::vector<Event> Events; + + UpdateFolderEventDiskStream::Read(InputPath, + RemoteContent, + BlockDescriptions, + DownloadChunks, + DownloadBlockRanges, + DownloadBlocks, + SequencesToWrite, + Events); + + for (const Event& E : Events) + { + switch (E.Header.EventType) + { + case Event::EventType::DownloadedChunk: + EventStream.DownloadedChunk(E.Payload.DownloadedChunk.RemoteChunkIndex); + break; + case Event::EventType::DownloadedBlockRange: + EventStream.DownloadedBlockRange(E.Payload.DownloadedBlockRange.BlockIndex, + E.Payload.DownloadedBlockRange.RangeStart, + E.Payload.DownloadedBlockRange.RangeLength); + break; + case Event::EventType::DownloadedBlock: + EventStream.DownloadedBlock(E.Payload.DownloadedBlock.BlockIndex); + break; + + case Event::EventType::WroteSequence: + EventStream.WroteSequence(E.Payload.WroteSequence.RemoteSequenceIndex, + E.Payload.WroteSequence.Offset, + E.Payload.WroteSequence.Length); + break; + + case Event::EventType::CompletedBlockRange: + EventStream.CompletedBlockRange(E.Payload.CompletedBlockRange.BlockIndex, + E.Payload.CompletedBlockRange.RangeStart, + E.Payload.CompletedBlockRange.RangeLength); + break; + case Event::EventType::CompletedBlock: + EventStream.CompletedBlock(E.Payload.CompletedBlock.BlockIndex); + break; + + case Event::EventType::CompletedSequence: + EventStream.CompletedSequence(E.Payload.CompletedSequence.RemoteSequenceIndex); + break; + + case Event::EventType::WriteComplete: + EventStream.WriteComplete(); + break; + + default: + throw std::runtime_error(std::format("Unknown event type: {}", uint8_t(E.Header.EventType))); + } + } +} + +} // namespace zen diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h index fe69a624c..07a9198df 100644 --- a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h +++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h @@ -35,7 +35,7 @@ class BufferedWriteFileCache; struct ChunkBlockDescription; struct ChunkedFolderContent; -class UpdateFolderStream; +class UpdateFolderEventStream; struct DiskStatistics { @@ -144,8 +144,9 @@ public: bool EnableTargetFolderScavenging = true; bool ValidateCompletedSequences = true; std::vector<std::string> ExcludeFolders; - uint64_t MaximumInMemoryPayloadSize = 512u * 1024u; - bool PopulateCache = true; + uint64_t MaximumInMemoryPayloadSize = 512u * 1024u; + bool PopulateCache = true; + UpdateFolderEventStream* OptionalUpdateFolderEventStream = nullptr; }; BuildsOperationUpdateFolder(OperationLogOutput& OperationLogOutput, @@ -164,7 +165,7 @@ public: const std::vector<IoHash>& LooseChunkHashes, const Options& Options); - void Execute(FolderContent& OutLocalFolderState, UpdateFolderStream* OptionalUpdateFolderStream = nullptr); + void Execute(FolderContent& OutLocalFolderState); DiskStatistics m_DiskStats; CacheMappingStatistics m_CacheMappingStats; @@ -294,8 +295,7 @@ private: uint64_t TotalRequestCount, uint64_t TotalPartWriteCount, FilteredRate& FilteredDownloadedBytesPerSecond, - FilteredRate& FilteredWrittenBytesPerSecond, - UpdateFolderStream* OptionalUpdateFolderStream); + FilteredRate& FilteredWrittenBytesPerSecond); void DownloadBuildBlob(uint32_t RemoteChunkIndex, const BlobsExistsResult& ExistsResult, @@ -327,16 +327,14 @@ private: const std::vector<ChunkedFolderContent>& ScavengedContents, const std::vector<ChunkedContentLookup>& ScavengedLookups, const std::vector<std::filesystem::path>& ScavengedPaths, - BufferedWriteFileCache& WriteCache, - UpdateFolderStream* OptionalUpdateFolderStream); + BufferedWriteFileCache& WriteCache); bool WriteCompressedChunkToCache(const uint32_t RemoteChunkIndex, const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs, BufferedWriteFileCache& WriteCache, - IoBuffer&& CompressedPart, - UpdateFolderStream* OptionalUpdateFolderStream); + IoBuffer&& CompressedPart); - void StreamDecompress(uint32_t RemoteSequenceIndex, CompositeBuffer&& CompressedPart, UpdateFolderStream* OptionalUpdateFolderStream); + void StreamDecompress(uint32_t RemoteSequenceIndex, CompositeBuffer&& CompressedPart); void WriteSequenceChunkToCache(BufferedWriteFileCache::Local& LocalWriter, const CompositeBuffer& Chunk, @@ -356,16 +354,14 @@ private: void WriteBlockChunkOpsToCache(std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, const BlockWriteOps& Ops, BufferedWriteFileCache& WriteCache, - ParallelWork& Work, - UpdateFolderStream* OptionalUpdateFolderStream); + ParallelWork& Work); bool WriteChunksBlockToCache(const ChunkBlockDescription& BlockDescription, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, ParallelWork& Work, CompositeBuffer&& BlockBuffer, std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, - BufferedWriteFileCache& WriteCache, - UpdateFolderStream* OptionalUpdateFolderStream); + BufferedWriteFileCache& WriteCache); bool WritePartialBlockChunksToCache(const ChunkBlockDescription& BlockDescription, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, @@ -374,8 +370,7 @@ private: uint32_t FirstIncludedBlockChunkIndex, uint32_t LastIncludedBlockChunkIndex, std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, - BufferedWriteFileCache& WriteCache, - UpdateFolderStream* OptionalUpdateFolderStream); + BufferedWriteFileCache& WriteCache); void AsyncWriteDownloadedChunk(const std::filesystem::path& ZenFolderPath, uint32_t RemoteChunkIndex, @@ -387,18 +382,15 @@ private: std::atomic<uint64_t>& WritePartsComplete, const uint64_t TotalPartWriteCount, FilteredRate& FilteredWrittenBytesPerSecond, - bool EnableBacklog, - UpdateFolderStream* OptionalUpdateFolderStream); + bool EnableBacklog); - void VerifyAndCompleteChunkSequencesAsync(std::span<const uint32_t> RemoteSequenceIndexes, - ParallelWork& Work, - UpdateFolderStream* OptionalUpdateFolderStream); + void VerifyAndCompleteChunkSequencesAsync(std::span<const uint32_t> RemoteSequenceIndexes, ParallelWork& Work); bool CompleteSequenceChunk(uint32_t RemoteSequenceIndex, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters); std::vector<uint32_t> CompleteChunkTargets(const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters); - void FinalizeChunkSequence(uint32_t RemoteSequenceIndex, UpdateFolderStream* OptionalUpdateFolderStream); - void FinalizeChunkSequences(std::span<const uint32_t> RemoteSequenceIndexes, UpdateFolderStream* OptionalUpdateFolderStream); - void VerifySequence(uint32_t RemoteSequenceIndex); + void FinalizeChunkSequence(uint32_t RemoteSequenceIndex); + void FinalizeChunkSequences(std::span<const uint32_t> RemoteSequenceIndexes); + void VerifySequence(uint32_t RemoteSequenceIndex); OperationLogOutput& m_LogOutput; StorageInstance& m_Storage; diff --git a/src/zenremotestore/include/zenremotestore/builds/updatefolderstream.h b/src/zenremotestore/include/zenremotestore/builds/updatefoldereventdiskstream.h index 6fe2188cd..6db03fa80 100644 --- a/src/zenremotestore/include/zenremotestore/builds/updatefolderstream.h +++ b/src/zenremotestore/include/zenremotestore/builds/updatefoldereventdiskstream.h @@ -4,59 +4,10 @@ #include <zencore/basicfile.h> #include <zencore/timer.h> - -#include <zenremotestore/chunking/chunkblock.h> -#include <zenremotestore/chunking/chunkedcontent.h> +#include <zenremotestore/builds/updatefoldereventstream.h> namespace zen { -class UpdateFolderStream -{ -public: - virtual ~UpdateFolderStream() {} - - struct Sequence - { - uint32_t RemoteSequenceIndex; - }; - struct Chunk - { - uint32_t RemoteChunkIndex; - }; - struct Block - { - uint32_t BlockIndex; - }; - struct BlockRange - { - uint32_t BlockIndex; - uint32_t RangeStart; - uint32_t RangeLength; - }; - - virtual void Initialize(const ChunkedFolderContent& RemoteContent, - std::span<const ChunkBlockDescription> BlockDescriptions, - std::span<const Chunk> DownloadChunks, - std::span<const BlockRange> DownloadBlockRanges, - std::span<const Block> DownloadBlocks, - std::span<const Sequence> SequencesToWrite) = 0; - - virtual void DownloadedChunk(uint32_t RemoteChunkIndex) = 0; - virtual void DownloadedBlockRange(uint32_t BlockIndex, uint32_t RangeStart, uint32_t RangeLength) = 0; - virtual void DownloadedBlock(uint32_t BlockIndex) = 0; - - virtual void WroteSequence(uint32_t RemoteSequenceIndex, uint64_t Offset, uint64_t Length) = 0; - - virtual void CompletedBlockRange(uint32_t BlockIndex, uint32_t RangeStart, uint32_t RangeLength) = 0; - virtual void CompletedBlock(uint32_t BlockIndex) = 0; - - virtual void CompletedSequence(uint32_t RemoteSequenceIndex) = 0; - - virtual void WriteComplete() = 0; -}; - -std::unique_ptr<UpdateFolderStream> CreateDiskUpdateFolderStream(const std::filesystem::path& OutputPath); - struct ChunkSequenceStatus { struct Range @@ -76,12 +27,12 @@ struct ChunkSequenceStatus void RemoveCompletedRange(const Range& InRange); }; -class UpdateFolderDiskStream +class UpdateFolderEventDiskStream { - using Sequence = UpdateFolderStream::Sequence; - using Chunk = UpdateFolderStream::Chunk; - using Block = UpdateFolderStream::Block; - using BlockRange = UpdateFolderStream::BlockRange; + using Sequence = UpdateFolderEventStream::Sequence; + using Chunk = UpdateFolderEventStream::Chunk; + using Block = UpdateFolderEventStream::Block; + using BlockRange = UpdateFolderEventStream::BlockRange; public: #pragma pack(push) @@ -179,9 +130,10 @@ public: #pragma pack(pop) static_assert(sizeof(Event) == 28u); - UpdateFolderDiskStream(const std::filesystem::path& OutputPath); + UpdateFolderEventDiskStream() {} + explicit UpdateFolderEventDiskStream(const std::filesystem::path& OutputPath); - virtual ~UpdateFolderDiskStream(); + virtual ~UpdateFolderEventDiskStream(); Event MakeEvent(Event::EventType Type, Event::EventPayload Payload); @@ -205,8 +157,6 @@ public: std::vector<Sequence>& OutSequencesToWrite, std::vector<Event>& OutEvents); - static void Replay(const std::filesystem::path& InputPath, UpdateFolderStream& UpdateFolderStream); - private: uint16_t GetThreadIndex(int ThreadId); uint16_t GetCurrentThreadIndex(); diff --git a/src/zenremotestore/include/zenremotestore/builds/updatefoldereventstream.h b/src/zenremotestore/include/zenremotestore/builds/updatefoldereventstream.h new file mode 100644 index 000000000..7a3f152f8 --- /dev/null +++ b/src/zenremotestore/include/zenremotestore/builds/updatefoldereventstream.h @@ -0,0 +1,61 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/basicfile.h> +#include <zencore/timer.h> + +#include <zenremotestore/chunking/chunkblock.h> +#include <zenremotestore/chunking/chunkedcontent.h> + +namespace zen { + +class UpdateFolderEventStream +{ +public: + virtual ~UpdateFolderEventStream() {} + + struct Sequence + { + uint32_t RemoteSequenceIndex; + }; + struct Chunk + { + uint32_t RemoteChunkIndex; + }; + struct Block + { + uint32_t BlockIndex; + }; + struct BlockRange + { + uint32_t BlockIndex; + uint32_t RangeStart; + uint32_t RangeLength; + }; + + virtual void Initialize(const ChunkedFolderContent& RemoteContent, + std::span<const ChunkBlockDescription> BlockDescriptions, + std::span<const Chunk> DownloadChunks, + std::span<const BlockRange> DownloadBlockRanges, + std::span<const Block> DownloadBlocks, + std::span<const Sequence> SequencesToWrite) = 0; + + virtual void DownloadedChunk(uint32_t RemoteChunkIndex) = 0; + virtual void DownloadedBlockRange(uint32_t BlockIndex, uint32_t RangeStart, uint32_t RangeLength) = 0; + virtual void DownloadedBlock(uint32_t BlockIndex) = 0; + + virtual void WroteSequence(uint32_t RemoteSequenceIndex, uint64_t Offset, uint64_t Length) = 0; + + virtual void CompletedBlockRange(uint32_t BlockIndex, uint32_t RangeStart, uint32_t RangeLength) = 0; + virtual void CompletedBlock(uint32_t BlockIndex) = 0; + + virtual void CompletedSequence(uint32_t RemoteSequenceIndex) = 0; + + virtual void WriteComplete() = 0; +}; + +std::unique_ptr<UpdateFolderEventStream> CreateDiskUpdateFolderEventStream(const std::filesystem::path& OutputPath); +void ReplayUpdateFolderEventDiskStream(const std::filesystem::path& InputPath, UpdateFolderEventStream& EventStream); + +} // namespace zen |