aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-02-10 11:03:44 +0100
committerDan Engelbrecht <[email protected]>2026-02-10 11:03:44 +0100
commite4c699ee36a070c9721fde2a41e5fc6cef730870 (patch)
tree8f8462e185608d3afabf30493ff274d75a25f198 /src
parentzen view command to present event recordings such as builds download in a UI (diff)
downloadzen-e4c699ee36a070c9721fde2a41e5fc6cef730870.tar.xz
zen-e4c699ee36a070c9721fde2a41e5fc6cef730870.zip
refactor/cleanup
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/builds_cmd.cpp44
-rw-r--r--src/zen/cmds/view_cmd.cpp48
-rw-r--r--src/zenremotestore/builds/buildstorageoperations.cpp341
-rw-r--r--src/zenremotestore/builds/updatefoldereventdiskstream.cpp (renamed from src/zenremotestore/builds/updatefolderstream.cpp)206
-rw-r--r--src/zenremotestore/builds/updatefoldereventstream.cpp174
-rw-r--r--src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h42
-rw-r--r--src/zenremotestore/include/zenremotestore/builds/updatefoldereventdiskstream.h (renamed from src/zenremotestore/include/zenremotestore/builds/updatefolderstream.h)68
-rw-r--r--src/zenremotestore/include/zenremotestore/builds/updatefoldereventstream.h61
8 files changed, 495 insertions, 489 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index 2e621608e..1f2166d25 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -32,7 +32,7 @@
#include <zenremotestore/builds/buildstorageutil.h>
#include <zenremotestore/builds/filebuildstorage.h>
#include <zenremotestore/builds/jupiterbuildstorage.h>
-#include <zenremotestore/builds/updatefolderstream.h>
+#include <zenremotestore/builds/updatefoldereventstream.h>
#include <zenremotestore/chunking/chunkblock.h>
#include <zenremotestore/chunking/chunkedcontent.h>
#include <zenremotestore/chunking/chunkedfile.h>
@@ -1474,6 +1474,8 @@ namespace {
ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::Download, TaskSteps::StepCount);
+ std::unique_ptr<UpdateFolderEventStream> EventStream = CreateDiskUpdateFolderEventStream("D:\\Temp\\download_log.zrb");
+
BuildsOperationUpdateFolder Updater(
Output,
Storage,
@@ -1490,35 +1492,35 @@ namespace {
BlockDescriptions,
LooseChunkHashes,
BuildsOperationUpdateFolder::Options{
- .IsQuiet = IsQuiet,
- .IsVerbose = IsVerbose,
- .AllowFileClone = Options.AllowFileClone,
- .UseSparseFiles = UseSparseFiles,
- .SystemRootDir = Options.SystemRootDir,
- .ZenFolderPath = Options.ZenFolderPath,
- .LargeAttachmentSize = LargeAttachmentSize,
- .PreferredMultipartChunkSize = PreferredMultipartChunkSize,
- .PartialBlockRequestMode = Options.PartialBlockRequestMode,
- .WipeTargetFolder = Options.CleanTargetFolder,
- .PrimeCacheOnly = Options.PrimeCacheOnly,
- .EnableOtherDownloadsScavenging = Options.EnableOtherDownloadsScavenging,
- .EnableTargetFolderScavenging = Options.EnableTargetFolderScavenging || Options.AppendNewContent,
- .ValidateCompletedSequences = Options.PostDownloadVerify,
- .ExcludeFolders = Options.ExcludeFolders,
- .MaximumInMemoryPayloadSize = Options.MaximumInMemoryPayloadSize,
- .PopulateCache = Options.PopulateCache});
+ .IsQuiet = IsQuiet,
+ .IsVerbose = IsVerbose,
+ .AllowFileClone = Options.AllowFileClone,
+ .UseSparseFiles = UseSparseFiles,
+ .SystemRootDir = Options.SystemRootDir,
+ .ZenFolderPath = Options.ZenFolderPath,
+ .LargeAttachmentSize = LargeAttachmentSize,
+ .PreferredMultipartChunkSize = PreferredMultipartChunkSize,
+ .PartialBlockRequestMode = Options.PartialBlockRequestMode,
+ .WipeTargetFolder = Options.CleanTargetFolder,
+ .PrimeCacheOnly = Options.PrimeCacheOnly,
+ .EnableOtherDownloadsScavenging = Options.EnableOtherDownloadsScavenging,
+ .EnableTargetFolderScavenging = Options.EnableTargetFolderScavenging || Options.AppendNewContent,
+ .ValidateCompletedSequences = Options.PostDownloadVerify,
+ .ExcludeFolders = Options.ExcludeFolders,
+ .MaximumInMemoryPayloadSize = Options.MaximumInMemoryPayloadSize,
+ .PopulateCache = Options.PopulateCache,
+ .OptionalUpdateFolderEventStream = EventStream.get()});
{
ProgressBar::PushLogOperation(ProgressMode, "Download");
auto _ = MakeGuard([]() { ProgressBar::PopLogOperation(ProgressMode); });
- std::unique_ptr<UpdateFolderStream> Feedback = CreateDiskUpdateFolderStream("D:\\Temp\\download_log.zrb");
-
FolderContent UpdatedLocalFolderState;
- Updater.Execute(UpdatedLocalFolderState, Feedback.get());
+ Updater.Execute(UpdatedLocalFolderState);
LocalState.State.ChunkedContent = RemoteContent;
LocalState.FolderState = std::move(UpdatedLocalFolderState);
}
+ EventStream.reset();
VerifyFolderStatistics VerifyFolderStats;
if (!AbortFlag)
diff --git a/src/zen/cmds/view_cmd.cpp b/src/zen/cmds/view_cmd.cpp
index c8bece8c3..8d81a6680 100644
--- a/src/zen/cmds/view_cmd.cpp
+++ b/src/zen/cmds/view_cmd.cpp
@@ -7,7 +7,7 @@
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/scopeguard.h>
-#include <zenremotestore/builds/updatefolderstream.h>
+#include <zenremotestore/builds/updatefoldereventdiskstream.h>
#include "../imgui_ui.h"
#include "imgui.h"
@@ -30,7 +30,7 @@ namespace {
virtual void SetCurrentTimeUs(uint64_t CurrentUs) = 0;
};
- class UpdateFolderDiskStreamUI : public ViewUI
+ class UpdateFolderEventDiskStreamUI : public ViewUI
{
public:
virtual std::string Title() override { return "Update Folder"; }
@@ -41,20 +41,20 @@ namespace {
m_InputReady = false;
m_InputPath = InputPath;
}
- UpdateFolderDiskStream::Read(InputPath,
- m_RemoteContent,
- m_BlockDescriptions,
- m_DownloadChunks,
- m_DownloadBlockRanges,
- m_DownloadBlocks,
- m_SequencesToWrite,
- m_Events);
+ UpdateFolderEventDiskStream::Read(InputPath,
+ m_RemoteContent,
+ m_BlockDescriptions,
+ m_DownloadChunks,
+ m_DownloadBlockRanges,
+ m_DownloadBlocks,
+ m_SequencesToWrite,
+ m_Events);
if (!m_Events.empty())
{
std::sort(m_Events.begin(),
m_Events.end(),
- [](const UpdateFolderDiskStream::Event& Lhs, const UpdateFolderDiskStream::Event& Rhs) {
+ [](const UpdateFolderEventDiskStream::Event& Lhs, const UpdateFolderEventDiskStream::Event& Rhs) {
const uint64_t TimeStampLshUs = Lhs.Header.TimestampUs();
const uint64_t TimeStampRshUs = Rhs.Header.TimestampUs();
if (TimeStampLshUs < TimeStampRshUs)
@@ -96,7 +96,7 @@ namespace {
const uint64_t Size = RawHashToRawSize.at(m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
m_RenderSequenceStatues.push_back(ChunkSequenceStatus{.Size = Size, .Completed = true});
}
- for (const UpdateFolderStream::Sequence& SequenceToWrite : m_SequencesToWrite)
+ for (const UpdateFolderEventStream::Sequence& SequenceToWrite : m_SequencesToWrite)
{
m_RenderSequenceStatues[SequenceToWrite.RemoteSequenceIndex].Completed = false;
}
@@ -196,11 +196,11 @@ namespace {
}
}
- void UpdateStateWithEvent(const UpdateFolderDiskStream::Event& Event)
+ void UpdateStateWithEvent(const UpdateFolderEventDiskStream::Event& Event)
{
switch (Event.Header.EventType)
{
- case UpdateFolderDiskStream::Event::EventType::WroteSequence:
+ case UpdateFolderEventDiskStream::Event::EventType::WroteSequence:
{
ChunkSequenceStatus& Sequence = m_RenderSequenceStatues[Event.Payload.WroteSequence.RemoteSequenceIndex];
ZEN_ASSERT(!Sequence.Completed);
@@ -208,7 +208,7 @@ namespace {
ChunkSequenceStatus::Range{Event.Payload.WroteSequence.Offset, Event.Payload.WroteSequence.Length});
}
break;
- case UpdateFolderDiskStream::Event::EventType::CompletedSequence:
+ case UpdateFolderEventDiskStream::Event::EventType::CompletedSequence:
{
ChunkSequenceStatus& Sequence = m_RenderSequenceStatues[Event.Payload.CompletedSequence.RemoteSequenceIndex];
ZEN_ASSERT(!Sequence.Completed);
@@ -238,7 +238,7 @@ namespace {
Sequence.Completed = true;
}
break;
- case UpdateFolderDiskStream::Event::EventType::WriteComplete:
+ case UpdateFolderEventDiskStream::Event::EventType::WriteComplete:
break;
default:
// TODO
@@ -255,13 +255,13 @@ namespace {
Stopwatch m_RealtimePlaybackStartUs;
- ChunkedFolderContent m_RemoteContent;
- std::vector<ChunkBlockDescription> m_BlockDescriptions;
- std::vector<UpdateFolderStream::Chunk> m_DownloadChunks;
- std::vector<UpdateFolderStream::BlockRange> m_DownloadBlockRanges;
- std::vector<UpdateFolderStream::Block> m_DownloadBlocks;
- std::vector<UpdateFolderStream::Sequence> m_SequencesToWrite;
- std::vector<UpdateFolderDiskStream::Event> m_Events;
+ ChunkedFolderContent m_RemoteContent;
+ std::vector<ChunkBlockDescription> m_BlockDescriptions;
+ std::vector<UpdateFolderEventStream::Chunk> m_DownloadChunks;
+ std::vector<UpdateFolderEventStream::BlockRange> m_DownloadBlockRanges;
+ std::vector<UpdateFolderEventStream::Block> m_DownloadBlocks;
+ std::vector<UpdateFolderEventStream::Sequence> m_SequencesToWrite;
+ std::vector<UpdateFolderEventDiskStream::Event> m_Events;
std::vector<ChunkSequenceStatus> m_RenderSequenceStatues;
};
@@ -306,7 +306,7 @@ ViewCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
std::filesystem::path AbsInputStreamPath = MakeSafeAbsolutePath(m_InputStream);
if (ToLower(AbsInputStreamPath.extension().string()) == ".zrb")
{
- UI = std::make_unique<UpdateFolderDiskStreamUI>();
+ UI = std::make_unique<UpdateFolderEventDiskStreamUI>();
}
if (!UI)
diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp
index 52062f6ff..3a9f2f92f 100644
--- a/src/zenremotestore/builds/buildstorageoperations.cpp
+++ b/src/zenremotestore/builds/buildstorageoperations.cpp
@@ -8,7 +8,7 @@
#include <zenremotestore/builds/buildstorage.h>
#include <zenremotestore/builds/buildstoragecache.h>
#include <zenremotestore/builds/buildstorageutil.h>
-#include <zenremotestore/builds/updatefolderstream.h>
+#include <zenremotestore/builds/updatefoldereventstream.h>
#include <zenremotestore/chunking/chunkblock.h>
#include <zenremotestore/chunking/chunkingcache.h>
#include <zenremotestore/chunking/chunkingcontroller.h>
@@ -553,10 +553,8 @@ BuildsOperationUpdateFolder::BuildsOperationUpdateFolder(OperationLogOutput&
}
void
-BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateFolderStream* OptionalUpdateFolderStream)
+BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
{
- ZEN_UNUSED(OptionalUpdateFolderStream);
-
ZEN_TRACE_CPU("BuildsOperationUpdateFolder::Execute");
try
{
@@ -1247,53 +1245,53 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF
}
}
- if (OptionalUpdateFolderStream)
+ if (m_Options.OptionalUpdateFolderEventStream)
{
- std::vector<UpdateFolderStream::Chunk> DownloadChunks;
+ std::vector<UpdateFolderEventStream::Chunk> DownloadChunks;
DownloadChunks.reserve(LooseChunkHashWorks.size());
for (const LooseChunkHashWorkData& LooseChunkHashWork : LooseChunkHashWorks)
{
- DownloadChunks.push_back(UpdateFolderStream::Chunk{.RemoteChunkIndex = LooseChunkHashWork.RemoteChunkIndex});
+ DownloadChunks.push_back(UpdateFolderEventStream::Chunk{.RemoteChunkIndex = LooseChunkHashWork.RemoteChunkIndex});
}
- std::vector<UpdateFolderStream::BlockRange> DownloadBlockRanges;
+ std::vector<UpdateFolderEventStream::BlockRange> DownloadBlockRanges;
DownloadBlockRanges.reserve(BlockRangeWorks.size());
for (const BlockRangeDescriptor& BlockRange : BlockRangeWorks)
{
DownloadBlockRanges.push_back(
- UpdateFolderStream::BlockRange{.BlockIndex = BlockRange.BlockIndex,
- .RangeStart = gsl::narrow<uint32_t>(BlockRange.RangeStart),
- .RangeLength = gsl::narrow<uint32_t>(BlockRange.RangeLength)});
+ UpdateFolderEventStream::BlockRange{.BlockIndex = BlockRange.BlockIndex,
+ .RangeStart = gsl::narrow<uint32_t>(BlockRange.RangeStart),
+ .RangeLength = gsl::narrow<uint32_t>(BlockRange.RangeLength)});
}
- std::vector<UpdateFolderStream::Block> DownloadBlocks;
+ std::vector<UpdateFolderEventStream::Block> DownloadBlocks;
for (uint32_t BlockIndex : FullBlockWorks)
{
- DownloadBlocks.push_back(UpdateFolderStream::Block{.BlockIndex = BlockIndex});
+ DownloadBlocks.push_back(UpdateFolderEventStream::Block{.BlockIndex = BlockIndex});
}
- std::vector<UpdateFolderStream::Sequence> SequencesToWrite;
+ std::vector<UpdateFolderEventStream::Sequence> SequencesToWrite;
for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < SequenceIndexChunksLeftToWriteCounters.size();
RemoteSequenceIndex++)
{
if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() > 0)
{
- SequencesToWrite.push_back(UpdateFolderStream::Sequence{.RemoteSequenceIndex = RemoteSequenceIndex});
+ SequencesToWrite.push_back(UpdateFolderEventStream::Sequence{.RemoteSequenceIndex = RemoteSequenceIndex});
}
}
for (const ScavengedSequenceCopyOperation& ScaveneCopyOp : ScavengedSequenceCopyOperations)
{
- SequencesToWrite.push_back(UpdateFolderStream::Sequence{ScaveneCopyOp.RemoteSequenceIndex});
+ SequencesToWrite.push_back(UpdateFolderEventStream::Sequence{ScaveneCopyOp.RemoteSequenceIndex});
}
std::sort(SequencesToWrite.begin(),
SequencesToWrite.end(),
- [](const UpdateFolderStream::Sequence& Lhs, const UpdateFolderStream::Sequence& Rhs) {
+ [](const UpdateFolderEventStream::Sequence& Lhs, const UpdateFolderEventStream::Sequence& Rhs) {
return Lhs.RemoteSequenceIndex < Rhs.RemoteSequenceIndex;
});
- OptionalUpdateFolderStream->Initialize(m_RemoteContent,
- m_BlockDescriptions,
- DownloadChunks,
- DownloadBlockRanges,
- DownloadBlocks,
- SequencesToWrite);
+ m_Options.OptionalUpdateFolderEventStream->Initialize(m_RemoteContent,
+ m_BlockDescriptions,
+ DownloadChunks,
+ DownloadBlockRanges,
+ DownloadBlocks,
+ SequencesToWrite);
}
BufferedWriteFileCache WriteCache;
@@ -1315,8 +1313,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF
&FilteredWrittenBytesPerSecond,
ScavengeOpIndex,
&WritePartsComplete,
- TotalPartWriteCount,
- OptionalUpdateFolderStream](std::atomic<bool>&) mutable {
+ TotalPartWriteCount](std::atomic<bool>&) mutable {
if (!m_AbortFlag)
{
ZEN_TRACE_CPU("Async_WriteScavenged");
@@ -1329,10 +1326,12 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF
WriteScavengedSequenceToCache(ScavengeRootPath, ScavengedContent, ScavengeOp);
- if (OptionalUpdateFolderStream)
+ if (m_Options.OptionalUpdateFolderEventStream)
{
- OptionalUpdateFolderStream->WroteSequence(ScavengeOp.RemoteSequenceIndex, 0, ScavengeOp.RawSize);
- OptionalUpdateFolderStream->CompletedSequence(ScavengeOp.RemoteSequenceIndex);
+ m_Options.OptionalUpdateFolderEventStream->WroteSequence(ScavengeOp.RemoteSequenceIndex,
+ 0,
+ ScavengeOp.RawSize);
+ m_Options.OptionalUpdateFolderEventStream->CompletedSequence(ScavengeOp.RemoteSequenceIndex);
}
WritePartsComplete++;
@@ -1375,8 +1374,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF
TotalPartWriteCount,
&WriteCache,
&FilteredDownloadedBytesPerSecond,
- &FilteredWrittenBytesPerSecond,
- OptionalUpdateFolderStream](std::atomic<bool>&) mutable {
+ &FilteredWrittenBytesPerSecond](std::atomic<bool>&) mutable {
ZEN_TRACE_CPU("Async_ReadPreDownloadedChunk");
if (!m_AbortFlag)
{
@@ -1392,8 +1390,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF
TotalRequestCount,
TotalPartWriteCount,
FilteredDownloadedBytesPerSecond,
- FilteredWrittenBytesPerSecond,
- OptionalUpdateFolderStream);
+ FilteredWrittenBytesPerSecond);
}
},
WorkerThreadPool::EMode::EnableBacklog);
@@ -1413,59 +1410,56 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF
break;
}
- Work.ScheduleWork(
- m_IOWorkerPool,
- [this,
- &CloneQuery,
- &SequenceIndexChunksLeftToWriteCounters,
- &WriteCache,
- &Work,
- &FilteredWrittenBytesPerSecond,
- &CopyChunkDatas,
- &ScavengedContents,
- &ScavengedLookups,
- &ScavengedPaths,
- &WritePartsComplete,
- TotalPartWriteCount,
- CopyDataIndex,
- OptionalUpdateFolderStream](std::atomic<bool>&) {
- if (!m_AbortFlag)
- {
- ZEN_TRACE_CPU("Async_CopyLocal");
-
- FilteredWrittenBytesPerSecond.Start();
- const CopyChunkData& CopyData = CopyChunkDatas[CopyDataIndex];
-
- std::vector<uint32_t> WrittenSequenceIndexes = WriteLocalChunkToCache(CloneQuery.get(),
- CopyData,
- ScavengedContents,
- ScavengedLookups,
- ScavengedPaths,
- WriteCache,
- OptionalUpdateFolderStream);
-
- WritePartsComplete++;
- if (!m_AbortFlag)
- {
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
- }
+ Work.ScheduleWork(m_IOWorkerPool,
+ [this,
+ &CloneQuery,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &WriteCache,
+ &Work,
+ &FilteredWrittenBytesPerSecond,
+ &CopyChunkDatas,
+ &ScavengedContents,
+ &ScavengedLookups,
+ &ScavengedPaths,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ CopyDataIndex](std::atomic<bool>&) {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_CopyLocal");
+
+ FilteredWrittenBytesPerSecond.Start();
+ const CopyChunkData& CopyData = CopyChunkDatas[CopyDataIndex];
+
+ std::vector<uint32_t> WrittenSequenceIndexes = WriteLocalChunkToCache(CloneQuery.get(),
+ CopyData,
+ ScavengedContents,
+ ScavengedLookups,
+ ScavengedPaths,
+ WriteCache);
+
+ WritePartsComplete++;
+ if (!m_AbortFlag)
+ {
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
- // Write tracking, updating this must be done without any files open
- std::vector<uint32_t> CompletedChunkSequences;
- for (uint32_t RemoteSequenceIndex : WrittenSequenceIndexes)
- {
- if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters))
- {
- CompletedChunkSequences.push_back(RemoteSequenceIndex);
- }
- }
- WriteCache.Close(CompletedChunkSequences);
- VerifyAndCompleteChunkSequencesAsync(CompletedChunkSequences, Work, OptionalUpdateFolderStream);
- }
- }
- });
+ // Write tracking, updating this must be done without any files open
+ std::vector<uint32_t> CompletedChunkSequences;
+ for (uint32_t RemoteSequenceIndex : WrittenSequenceIndexes)
+ {
+ if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters))
+ {
+ CompletedChunkSequences.push_back(RemoteSequenceIndex);
+ }
+ }
+ WriteCache.Close(CompletedChunkSequences);
+ VerifyAndCompleteChunkSequencesAsync(CompletedChunkSequences, Work);
+ }
+ }
+ });
}
for (uint32_t BlockIndex : CachedChunkBlockIndexes)
@@ -1486,8 +1480,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF
&FilteredWrittenBytesPerSecond,
&WritePartsComplete,
TotalPartWriteCount,
- BlockIndex,
- OptionalUpdateFolderStream](std::atomic<bool>&) mutable {
+ BlockIndex](std::atomic<bool>&) mutable {
if (!m_AbortFlag)
{
ZEN_TRACE_CPU("Async_WriteCachedBlock");
@@ -1510,16 +1503,15 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF
Work,
CompositeBuffer(std::move(BlockBuffer)),
RemoteChunkIndexNeedsCopyFromSourceFlags,
- WriteCache,
- OptionalUpdateFolderStream))
+ WriteCache))
{
std::error_code DummyEc;
RemoveFile(BlockChunkPath, DummyEc);
throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash));
}
- if (OptionalUpdateFolderStream)
+ if (m_Options.OptionalUpdateFolderEventStream)
{
- OptionalUpdateFolderStream->CompletedBlock(BlockIndex);
+ m_Options.OptionalUpdateFolderEventStream->CompletedBlock(BlockIndex);
}
std::error_code Ec = TryRemoveFile(BlockChunkPath);
@@ -1564,8 +1556,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF
&FilteredWrittenBytesPerSecond,
&Work,
&BlockRangeWorks,
- BlockRangeIndex,
- OptionalUpdateFolderStream](std::atomic<bool>&) {
+ BlockRangeIndex](std::atomic<bool>&) {
if (!m_AbortFlag)
{
ZEN_TRACE_CPU("Async_GetPartialBlock");
@@ -1587,19 +1578,19 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF
TotalPartWriteCount,
&FilteredDownloadedBytesPerSecond,
&FilteredWrittenBytesPerSecond,
- &BlockRange,
- OptionalUpdateFolderStream](IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath) {
+ &BlockRange](IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath) {
if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
{
FilteredDownloadedBytesPerSecond.Stop();
}
- if (OptionalUpdateFolderStream)
+ if (m_Options.OptionalUpdateFolderEventStream)
{
const uint32_t BlockIndex = BlockRange.BlockIndex;
- OptionalUpdateFolderStream->DownloadedBlockRange(BlockIndex,
- gsl::narrow<uint32_t>(BlockRange.RangeStart),
- gsl::narrow<uint32_t>(BlockRange.RangeLength));
+ m_Options.OptionalUpdateFolderEventStream->DownloadedBlockRange(
+ BlockIndex,
+ gsl::narrow<uint32_t>(BlockRange.RangeStart),
+ gsl::narrow<uint32_t>(BlockRange.RangeLength));
}
if (!m_AbortFlag)
@@ -1616,8 +1607,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF
&FilteredWrittenBytesPerSecond,
&BlockRange,
BlockChunkPath = std::filesystem::path(OnDiskPath),
- BlockPartialBuffer = std::move(InMemoryBuffer),
- OptionalUpdateFolderStream](std::atomic<bool>&) mutable {
+ BlockPartialBuffer = std::move(InMemoryBuffer)](std::atomic<bool>&) mutable {
if (!m_AbortFlag)
{
ZEN_TRACE_CPU("Async_WritePartialBlock");
@@ -1645,7 +1635,6 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF
FilteredWrittenBytesPerSecond.Start();
- // TODO: Add OptionalUpdateFolderStream calls for chunks / chunk sequences
if (!WritePartialBlockChunksToCache(
BlockDescription,
SequenceIndexChunksLeftToWriteCounters,
@@ -1654,8 +1643,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF
BlockRange.ChunkBlockIndexStart,
BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1,
RemoteChunkIndexNeedsCopyFromSourceFlags,
- WriteCache,
- OptionalUpdateFolderStream))
+ WriteCache))
{
std::error_code DummyEc;
RemoveFile(BlockChunkPath, DummyEc);
@@ -1673,9 +1661,9 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF
Ec.message());
}
- if (OptionalUpdateFolderStream)
+ if (m_Options.OptionalUpdateFolderEventStream)
{
- OptionalUpdateFolderStream->CompletedBlockRange(
+ m_Options.OptionalUpdateFolderEventStream->CompletedBlockRange(
BlockIndex,
gsl::narrow<uint32_t>(BlockRange.RangeStart),
gsl::narrow<uint32_t>(BlockRange.RangeLength));
@@ -1722,8 +1710,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF
&SequenceIndexChunksLeftToWriteCounters,
&FilteredDownloadedBytesPerSecond,
TotalRequestCount,
- BlockIndex,
- OptionalUpdateFolderStream](std::atomic<bool>&) {
+ BlockIndex](std::atomic<bool>&) {
if (!m_AbortFlag)
{
ZEN_TRACE_CPU("Async_GetFullBlock");
@@ -1755,9 +1742,9 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF
throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
}
- if (OptionalUpdateFolderStream)
+ if (m_Options.OptionalUpdateFolderEventStream)
{
- OptionalUpdateFolderStream->DownloadedBlock(BlockIndex);
+ m_Options.OptionalUpdateFolderEventStream->DownloadedBlock(BlockIndex);
}
if (!m_AbortFlag)
@@ -1826,8 +1813,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF
TotalPartWriteCount,
&FilteredWrittenBytesPerSecond,
BlockChunkPath,
- BlockBuffer = std::move(BlockBuffer),
- OptionalUpdateFolderStream](std::atomic<bool>&) mutable {
+ BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable {
if (!m_AbortFlag)
{
ZEN_TRACE_CPU("Async_WriteFullBlock");
@@ -1858,8 +1844,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF
Work,
CompositeBuffer(std::move(BlockBuffer)),
RemoteChunkIndexNeedsCopyFromSourceFlags,
- WriteCache,
- OptionalUpdateFolderStream))
+ WriteCache))
{
std::error_code DummyEc;
RemoveFile(BlockChunkPath, DummyEc);
@@ -1880,9 +1865,9 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF
}
}
- if (OptionalUpdateFolderStream)
+ if (m_Options.OptionalUpdateFolderEventStream)
{
- OptionalUpdateFolderStream->CompletedBlock(BlockIndex);
+ m_Options.OptionalUpdateFolderEventStream->CompletedBlock(BlockIndex);
}
WritePartsComplete++;
@@ -1968,9 +1953,9 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState, UpdateF
WriteProgressBar.Finish();
- if (OptionalUpdateFolderStream)
+ if (m_Options.OptionalUpdateFolderEventStream)
{
- OptionalUpdateFolderStream->WriteComplete();
+ m_Options.OptionalUpdateFolderEventStream->WriteComplete();
}
if (m_AbortFlag)
@@ -3177,9 +3162,8 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd
ParallelWork& Work,
uint64_t TotalRequestCount,
uint64_t TotalPartWriteCount,
- FilteredRate& FilteredDownloadedBytesPerSecond,
- FilteredRate& FilteredWrittenBytesPerSecond,
- UpdateFolderStream* OptionalUpdateFolderStream)
+ FilteredRate& FilteredDownloadedBytesPerSecond,
+ FilteredRate& FilteredWrittenBytesPerSecond)
{
std::filesystem::path ExistingCompressedChunkPath;
if (!m_Options.PrimeCacheOnly)
@@ -3209,7 +3193,6 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd
TotalPartWriteCount,
&FilteredWrittenBytesPerSecond,
RemoteChunkIndex,
- OptionalUpdateFolderStream,
ChunkTargetPtrs = std::move(ChunkTargetPtrs),
CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>& AbortFlag) mutable {
if (!AbortFlag)
@@ -3227,11 +3210,8 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd
fmt::format("Could not open dowloaded compressed chunk {} from {}", ChunkHash, CompressedChunkPath));
}
- bool NeedHashVerify = WriteCompressedChunkToCache(RemoteChunkIndex,
- ChunkTargetPtrs,
- WriteCache,
- std::move(CompressedPart),
- OptionalUpdateFolderStream);
+ bool NeedHashVerify =
+ WriteCompressedChunkToCache(RemoteChunkIndex, ChunkTargetPtrs, WriteCache, std::move(CompressedPart));
WritePartsComplete++;
if (!AbortFlag)
@@ -3256,11 +3236,11 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd
WriteCache.Close(CompletedSequences);
if (NeedHashVerify)
{
- VerifyAndCompleteChunkSequencesAsync(CompletedSequences, Work, OptionalUpdateFolderStream);
+ VerifyAndCompleteChunkSequencesAsync(CompletedSequences, Work);
}
else
{
- FinalizeChunkSequences(CompletedSequences, OptionalUpdateFolderStream);
+ FinalizeChunkSequences(CompletedSequences);
}
}
}
@@ -3277,7 +3257,6 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd
&WritePartsComplete,
TotalPartWriteCount,
TotalRequestCount,
- OptionalUpdateFolderStream,
&FilteredDownloadedBytesPerSecond,
&FilteredWrittenBytesPerSecond,
RemoteChunkIndex,
@@ -3302,7 +3281,6 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd
RemoteChunkIndex,
&FilteredDownloadedBytesPerSecond,
&FilteredWrittenBytesPerSecond,
- OptionalUpdateFolderStream,
ChunkTargetPtrs = std::move(ChunkTargetPtrs)](IoBuffer&& Payload) mutable {
if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
{
@@ -3320,8 +3298,7 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd
WritePartsComplete,
TotalPartWriteCount,
FilteredWrittenBytesPerSecond,
- EnableBacklog,
- OptionalUpdateFolderStream);
+ EnableBacklog);
});
}
});
@@ -3760,8 +3737,7 @@ BuildsOperationUpdateFolder::WriteLocalChunkToCache(CloneQueryInterface* C
const std::vector<ChunkedFolderContent>& ScavengedContents,
const std::vector<ChunkedContentLookup>& ScavengedLookups,
const std::vector<std::filesystem::path>& ScavengedPaths,
- BufferedWriteFileCache& WriteCache,
- UpdateFolderStream* OptionalUpdateFolderStream)
+ BufferedWriteFileCache& WriteCache)
{
ZEN_TRACE_CPU("WriteLocalChunkToCache");
@@ -3933,9 +3909,11 @@ BuildsOperationUpdateFolder::WriteLocalChunkToCache(CloneQueryInterface* C
m_WrittenChunkByteCount += ClonableBytes;
- if (OptionalUpdateFolderStream)
+ if (m_Options.OptionalUpdateFolderEventStream)
{
- OptionalUpdateFolderStream->WroteSequence(RemoteSequenceIndex, Op.Target->Offset + PreBytes, ClonableBytes);
+ m_Options.OptionalUpdateFolderEventStream->WroteSequence(RemoteSequenceIndex,
+ Op.Target->Offset + PreBytes,
+ ClonableBytes);
}
if (PreBytes > 0)
@@ -3945,9 +3923,11 @@ BuildsOperationUpdateFolder::WriteLocalChunkToCache(CloneQueryInterface* C
WriteSequenceChunkToCache(LocalWriter, ChunkSource, RemoteSequenceIndex, FileOffset, RemotePathIndex);
- if (OptionalUpdateFolderStream)
+ if (m_Options.OptionalUpdateFolderEventStream)
{
- OptionalUpdateFolderStream->WroteSequence(RemoteSequenceIndex, FileOffset, ChunkSource.GetSize());
+ m_Options.OptionalUpdateFolderEventStream->WroteSequence(RemoteSequenceIndex,
+ FileOffset,
+ ChunkSource.GetSize());
}
}
if (PostBytes > 0)
@@ -3957,9 +3937,11 @@ BuildsOperationUpdateFolder::WriteLocalChunkToCache(CloneQueryInterface* C
WriteSequenceChunkToCache(LocalWriter, ChunkSource, RemoteSequenceIndex, FileOffset, RemotePathIndex);
- if (OptionalUpdateFolderStream)
+ if (m_Options.OptionalUpdateFolderEventStream)
{
- OptionalUpdateFolderStream->WroteSequence(RemoteSequenceIndex, FileOffset, ChunkSource.GetSize());
+ m_Options.OptionalUpdateFolderEventStream->WroteSequence(RemoteSequenceIndex,
+ FileOffset,
+ ChunkSource.GetSize());
}
}
}
@@ -3974,9 +3956,9 @@ BuildsOperationUpdateFolder::WriteLocalChunkToCache(CloneQueryInterface* C
WriteSequenceChunkToCache(LocalWriter, ChunkSource, RemoteSequenceIndex, FileOffset, RemotePathIndex);
- if (OptionalUpdateFolderStream)
+ if (m_Options.OptionalUpdateFolderEventStream)
{
- OptionalUpdateFolderStream->WroteSequence(RemoteSequenceIndex, FileOffset, ChunkSource.GetSize());
+ m_Options.OptionalUpdateFolderEventStream->WroteSequence(RemoteSequenceIndex, FileOffset, ChunkSource.GetSize());
}
}
}
@@ -4007,8 +3989,7 @@ BuildsOperationUpdateFolder::WriteCompressedChunkToCache(
const uint32_t RemoteChunkIndex,
const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs,
BufferedWriteFileCache& WriteCache,
- IoBuffer&& CompressedPart,
- UpdateFolderStream* OptionalUpdateFolderStream)
+ IoBuffer&& CompressedPart)
{
ZEN_TRACE_CPU("WriteCompressedChunkToCache");
@@ -4017,7 +3998,7 @@ BuildsOperationUpdateFolder::WriteCompressedChunkToCache(
if (IsSingleFileChunk(m_RemoteContent, ChunkTargetPtrs))
{
const std::uint32_t SequenceIndex = ChunkTargetPtrs.front()->SequenceIndex;
- StreamDecompress(SequenceIndex, CompositeBuffer(std::move(CompressedPart)), OptionalUpdateFolderStream);
+ StreamDecompress(SequenceIndex, CompositeBuffer(std::move(CompressedPart)));
return false;
}
else
@@ -4055,9 +4036,9 @@ BuildsOperationUpdateFolder::WriteCompressedChunkToCache(
WriteSequenceChunkToCache(LocalWriter, RangeBuffer, SequenceIndex, FileOffset, PathIndex);
- if (OptionalUpdateFolderStream)
+ if (m_Options.OptionalUpdateFolderEventStream)
{
- OptionalUpdateFolderStream->WroteSequence(SequenceIndex, FileOffset, RangeBuffer.GetSize());
+ m_Options.OptionalUpdateFolderEventStream->WroteSequence(SequenceIndex, FileOffset, RangeBuffer.GetSize());
}
}
@@ -4081,9 +4062,7 @@ BuildsOperationUpdateFolder::WriteCompressedChunkToCache(
}
void
-BuildsOperationUpdateFolder::StreamDecompress(uint32_t RemoteSequenceIndex,
- CompositeBuffer&& CompressedPart,
- UpdateFolderStream* OptionalUpdateFolderStream)
+BuildsOperationUpdateFolder::StreamDecompress(uint32_t RemoteSequenceIndex, CompositeBuffer&& CompressedPart)
{
ZEN_TRACE_CPU("StreamDecompress");
const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
@@ -4131,9 +4110,9 @@ BuildsOperationUpdateFolder::StreamDecompress(uint32_t RemoteSequenceIndex,
m_ValidatedChunkByteCount += Segment.GetSize();
}
DecompressedTemp.Write(Segment, Offset);
- if (OptionalUpdateFolderStream)
+ if (m_Options.OptionalUpdateFolderEventStream)
{
- OptionalUpdateFolderStream->WroteSequence(RemoteSequenceIndex, Offset, Segment.GetSize());
+ m_Options.OptionalUpdateFolderEventStream->WroteSequence(RemoteSequenceIndex, Offset, Segment.GetSize());
}
Offset += Segment.GetSize();
m_DiskStats.WriteByteCount += Segment.GetSize();
@@ -4330,8 +4309,7 @@ void
BuildsOperationUpdateFolder::WriteBlockChunkOpsToCache(std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
const BlockWriteOps& Ops,
BufferedWriteFileCache& WriteCache,
- ParallelWork& Work,
- UpdateFolderStream* OptionalUpdateFolderStream)
+ ParallelWork& Work)
{
ZEN_TRACE_CPU("WriteBlockChunkOpsToCache");
@@ -4353,9 +4331,9 @@ BuildsOperationUpdateFolder::WriteBlockChunkOpsToCache(std::span<std::atomic<uin
WriteSequenceChunkToCache(LocalWriter, Chunk, SequenceIndex, FileOffset, PathIndex);
- if (OptionalUpdateFolderStream)
+ if (m_Options.OptionalUpdateFolderEventStream)
{
- OptionalUpdateFolderStream->WroteSequence(SequenceIndex, FileOffset, Chunk.GetSize());
+ m_Options.OptionalUpdateFolderEventStream->WroteSequence(SequenceIndex, FileOffset, Chunk.GetSize());
}
}
}
@@ -4372,7 +4350,7 @@ BuildsOperationUpdateFolder::WriteBlockChunkOpsToCache(std::span<std::atomic<uin
}
}
WriteCache.Close(CompletedChunkSequences);
- VerifyAndCompleteChunkSequencesAsync(CompletedChunkSequences, Work, OptionalUpdateFolderStream);
+ VerifyAndCompleteChunkSequencesAsync(CompletedChunkSequences, Work);
}
}
@@ -4382,8 +4360,7 @@ BuildsOperationUpdateFolder::WriteChunksBlockToCache(const ChunkBlockDescription
ParallelWork& Work,
CompositeBuffer&& BlockBuffer,
std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
- BufferedWriteFileCache& WriteCache,
- UpdateFolderStream* OptionalUpdateFolderStream)
+ BufferedWriteFileCache& WriteCache)
{
ZEN_TRACE_CPU("WriteChunksBlockToCache");
@@ -4408,7 +4385,7 @@ BuildsOperationUpdateFolder::WriteChunksBlockToCache(const ChunkBlockDescription
gsl::narrow<uint32_t>(BlockDescription.ChunkRawHashes.size() - 1),
Ops))
{
- WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work, OptionalUpdateFolderStream);
+ WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work);
return true;
}
return false;
@@ -4423,7 +4400,7 @@ BuildsOperationUpdateFolder::WriteChunksBlockToCache(const ChunkBlockDescription
gsl::narrow<uint32_t>(BlockDescription.ChunkRawHashes.size() - 1),
Ops))
{
- WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work, OptionalUpdateFolderStream);
+ WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work);
return true;
}
return false;
@@ -4437,8 +4414,7 @@ BuildsOperationUpdateFolder::WritePartialBlockChunksToCache(const ChunkBlockDesc
uint32_t FirstIncludedBlockChunkIndex,
uint32_t LastIncludedBlockChunkIndex,
std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
- BufferedWriteFileCache& WriteCache,
- UpdateFolderStream* OptionalUpdateFolderStream)
+ BufferedWriteFileCache& WriteCache)
{
ZEN_TRACE_CPU("WritePartialBlockChunksToCache");
@@ -4455,7 +4431,7 @@ BuildsOperationUpdateFolder::WritePartialBlockChunksToCache(const ChunkBlockDesc
LastIncludedBlockChunkIndex,
Ops))
{
- WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work, OptionalUpdateFolderStream);
+ WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work);
return true;
}
else
@@ -4475,8 +4451,7 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa
std::atomic<uint64_t>& WritePartsComplete,
const uint64_t TotalPartWriteCount,
FilteredRate& FilteredWrittenBytesPerSecond,
- bool EnableBacklog,
- UpdateFolderStream* OptionalUpdateFolderStream)
+ bool EnableBacklog)
{
ZEN_TRACE_CPU("AsyncWriteDownloadedChunk");
@@ -4534,7 +4509,6 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa
&WriteCache,
&WritePartsComplete,
&FilteredWrittenBytesPerSecond,
- OptionalUpdateFolderStream,
ChunkTargetPtrs = std::move(ChunkTargetPtrs),
CompressedPart = IoBuffer(std::move(Payload))](std::atomic<bool>&) mutable {
if (!m_AbortFlag)
@@ -4559,11 +4533,7 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa
}
}
- bool NeedHashVerify = WriteCompressedChunkToCache(RemoteChunkIndex,
- ChunkTargetPtrs,
- WriteCache,
- std::move(CompressedPart),
- OptionalUpdateFolderStream);
+ bool NeedHashVerify = WriteCompressedChunkToCache(RemoteChunkIndex, ChunkTargetPtrs, WriteCache, std::move(CompressedPart));
if (!m_AbortFlag)
{
WritePartsComplete++;
@@ -4590,11 +4560,11 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa
WriteCache.Close(CompletedSequences);
if (NeedHashVerify)
{
- VerifyAndCompleteChunkSequencesAsync(CompletedSequences, Work, OptionalUpdateFolderStream);
+ VerifyAndCompleteChunkSequencesAsync(CompletedSequences, Work);
}
else
{
- FinalizeChunkSequences(CompletedSequences, OptionalUpdateFolderStream);
+ FinalizeChunkSequences(CompletedSequences);
}
}
}
@@ -4603,9 +4573,7 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa
}
void
-BuildsOperationUpdateFolder::VerifyAndCompleteChunkSequencesAsync(std::span<const uint32_t> RemoteSequenceIndexes,
- ParallelWork& Work,
- UpdateFolderStream* OptionalUpdateFolderStream)
+BuildsOperationUpdateFolder::VerifyAndCompleteChunkSequencesAsync(std::span<const uint32_t> RemoteSequenceIndexes, ParallelWork& Work)
{
if (RemoteSequenceIndexes.empty())
{
@@ -4617,7 +4585,7 @@ BuildsOperationUpdateFolder::VerifyAndCompleteChunkSequencesAsync(std::span<cons
for (uint32_t RemoteSequenceIndexOffset = 1; RemoteSequenceIndexOffset < RemoteSequenceIndexes.size(); RemoteSequenceIndexOffset++)
{
const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset];
- Work.ScheduleWork(m_IOWorkerPool, [this, RemoteSequenceIndex, OptionalUpdateFolderStream](std::atomic<bool>&) {
+ Work.ScheduleWork(m_IOWorkerPool, [this, RemoteSequenceIndex](std::atomic<bool>&) {
if (!m_AbortFlag)
{
ZEN_TRACE_CPU("Async_VerifyAndFinalizeSequence");
@@ -4625,7 +4593,7 @@ BuildsOperationUpdateFolder::VerifyAndCompleteChunkSequencesAsync(std::span<cons
VerifySequence(RemoteSequenceIndex);
if (!m_AbortFlag)
{
- FinalizeChunkSequence(RemoteSequenceIndex, OptionalUpdateFolderStream);
+ FinalizeChunkSequence(RemoteSequenceIndex);
}
}
});
@@ -4633,14 +4601,14 @@ BuildsOperationUpdateFolder::VerifyAndCompleteChunkSequencesAsync(std::span<cons
const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[0];
VerifySequence(RemoteSequenceIndex);
- FinalizeChunkSequence(RemoteSequenceIndex, OptionalUpdateFolderStream);
+ FinalizeChunkSequence(RemoteSequenceIndex);
}
else
{
for (uint32_t RemoteSequenceIndexOffset = 0; RemoteSequenceIndexOffset < RemoteSequenceIndexes.size(); RemoteSequenceIndexOffset++)
{
const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset];
- FinalizeChunkSequence(RemoteSequenceIndex, OptionalUpdateFolderStream);
+ FinalizeChunkSequence(RemoteSequenceIndex);
}
}
}
@@ -4674,7 +4642,7 @@ BuildsOperationUpdateFolder::CompleteChunkTargets(const std::vector<const Chunke
}
void
-BuildsOperationUpdateFolder::FinalizeChunkSequence(uint32_t RemoteSequenceIndex, UpdateFolderStream* OptionalUpdateFolderStream)
+BuildsOperationUpdateFolder::FinalizeChunkSequence(uint32_t RemoteSequenceIndex)
{
ZEN_TRACE_CPU("FinalizeChunkSequence");
@@ -4689,21 +4657,20 @@ BuildsOperationUpdateFolder::FinalizeChunkSequence(uint32_t RemoteSequenceIndex,
{
throw std::system_error(Ec);
}
- if (OptionalUpdateFolderStream)
+ if (m_Options.OptionalUpdateFolderEventStream)
{
- OptionalUpdateFolderStream->CompletedSequence(RemoteSequenceIndex);
+ m_Options.OptionalUpdateFolderEventStream->CompletedSequence(RemoteSequenceIndex);
}
}
void
-BuildsOperationUpdateFolder::FinalizeChunkSequences(std::span<const uint32_t> RemoteSequenceIndexes,
- UpdateFolderStream* OptionalUpdateFolderStream)
+BuildsOperationUpdateFolder::FinalizeChunkSequences(std::span<const uint32_t> RemoteSequenceIndexes)
{
ZEN_TRACE_CPU("FinalizeChunkSequences");
for (uint32_t SequenceIndex : RemoteSequenceIndexes)
{
- FinalizeChunkSequence(SequenceIndex, OptionalUpdateFolderStream);
+ FinalizeChunkSequence(SequenceIndex);
}
}
diff --git a/src/zenremotestore/builds/updatefolderstream.cpp b/src/zenremotestore/builds/updatefoldereventdiskstream.cpp
index 5a1809923..ad36189a3 100644
--- a/src/zenremotestore/builds/updatefolderstream.cpp
+++ b/src/zenremotestore/builds/updatefoldereventdiskstream.cpp
@@ -1,6 +1,6 @@
// Copyright Epic Games, Inc. All Rights Reserved.
-#include <zenremotestore/builds/updatefolderstream.h>
+#include <zenremotestore/builds/updatefoldereventdiskstream.h>
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinaryutil.h>
@@ -23,9 +23,9 @@ namespace {
#pragma pack(1)
struct StreamHeader
{
- static constexpr uint32_t ExpectedMagic = 0x7a726273; // 'zrbs';
- static constexpr uint32_t CurrentVersion = 1;
- static constexpr uint32_t UpdateFolderStreamType = HashStringDjb2("UpdateFolderDiskStreamFeedback"sv);
+ static constexpr uint32_t ExpectedMagic = 0x7a726273; // 'zrbs';
+ static constexpr uint32_t CurrentVersion = 1;
+ static constexpr uint32_t UpdateFolderEventDiskStreamType = HashStringDjb2("UpdateFolderEventDiskStream"sv);
uint32_t Magic = ExpectedMagic;
uint32_t Version = CurrentVersion;
@@ -43,89 +43,10 @@ namespace {
#pragma pack(pop)
static_assert(sizeof(StreamHeader) == 32);
- class UpdateFolderDiskStreamFeedback : public UpdateFolderStream
- {
- using Event = UpdateFolderDiskStream::Event;
-
- public:
- explicit UpdateFolderDiskStreamFeedback(const std::filesystem::path& OutputPath) : m_OutputStream(OutputPath) {}
- virtual ~UpdateFolderDiskStreamFeedback() {}
-
- virtual void Initialize(const ChunkedFolderContent& RemoteContent,
- std::span<const ChunkBlockDescription> BlockDescriptions,
- std::span<const Chunk> DownloadChunks,
- std::span<const BlockRange> DownloadBlockRanges,
- std::span<const Block> DownloadBlocks,
- std::span<const Sequence> SequencesToWrite) override
- {
- m_OutputStream
- .PrepareWrite(RemoteContent, BlockDescriptions, DownloadChunks, DownloadBlockRanges, DownloadBlocks, SequencesToWrite);
- }
-
- virtual void DownloadedChunk(uint32_t RemoteChunkIndex) override
- {
- m_OutputStream.WriteEvent(
- m_OutputStream.MakeEvent(Event::EventType::DownloadedChunk,
- Event::EventPayload{.DownloadedChunk = {.RemoteChunkIndex = RemoteChunkIndex}}));
- }
-
- virtual void DownloadedBlockRange(uint32_t BlockIndex, uint32_t RangeStart, uint32_t RangeLength) override
- {
- m_OutputStream.WriteEvent(m_OutputStream.MakeEvent(
- Event::EventType::DownloadedBlockRange,
- Event::EventPayload{
- .DownloadedBlockRange = {.BlockIndex = BlockIndex, .RangeStart = RangeStart, .RangeLength = RangeLength}}));
- }
-
- virtual void DownloadedBlock(uint32_t BlockIndex) override
- {
- m_OutputStream.WriteEvent(m_OutputStream.MakeEvent(Event::EventType::DownloadedBlock,
- Event::EventPayload{.DownloadedBlock = {.BlockIndex = BlockIndex}}));
- }
-
- virtual void WroteSequence(uint32_t RemoteSequenceIndex, uint64_t Offset, uint64_t Length) override
- {
- m_OutputStream.WriteEvent(m_OutputStream.MakeEvent(
- Event::EventType::WroteSequence,
- Event::EventPayload{.WroteSequence = {.RemoteSequenceIndex = RemoteSequenceIndex, .Offset = Offset, .Length = Length}}));
- }
-
- virtual void CompletedBlockRange(uint32_t BlockIndex, uint32_t RangeStart, uint32_t RangeLength) override
- {
- m_OutputStream.WriteEvent(m_OutputStream.MakeEvent(
- Event::EventType::CompletedBlockRange,
- Event::EventPayload{
- .CompletedBlockRange = {.BlockIndex = BlockIndex, .RangeStart = RangeStart, .RangeLength = RangeLength}}));
- }
-
- virtual void CompletedBlock(uint32_t BlockIndex) override
- {
- m_OutputStream.WriteEvent(m_OutputStream.MakeEvent(Event::EventType::CompletedBlock,
- Event::EventPayload{.CompletedBlock = {.BlockIndex = BlockIndex}}));
- }
-
- virtual void CompletedSequence(uint32_t RemoteSequenceIndex) override
- {
- m_OutputStream.WriteEvent(
- m_OutputStream.MakeEvent(Event::EventType::CompletedSequence,
- Event::EventPayload{.CompletedSequence = {.RemoteSequenceIndex = RemoteSequenceIndex}}));
- }
-
- virtual void WriteComplete() override { m_OutputStream.EndWrite(); }
-
- private:
- UpdateFolderDiskStream m_OutputStream;
- };
} // namespace
-std::unique_ptr<UpdateFolderStream>
-CreateDiskUpdateFolderStream(const std::filesystem::path& OutputPath)
-{
- return std::make_unique<UpdateFolderDiskStreamFeedback>(OutputPath);
-}
-
-UpdateFolderDiskStream::Event::EventHeader
-UpdateFolderDiskStream::Event::MakeHeader(Event::EventType Type, uint64_t TimeStampUs, uint16_t ThreadIndex)
+UpdateFolderEventDiskStream::Event::EventHeader
+UpdateFolderEventDiskStream::Event::MakeHeader(Event::EventType Type, uint64_t TimeStampUs, uint16_t ThreadIndex)
{
return EventHeader{.EventType = Type,
.TimestampUsHigh = gsl::narrow<uint8_t>(TimeStampUs >> 32),
@@ -133,14 +54,14 @@ UpdateFolderDiskStream::Event::MakeHeader(Event::EventType Type, uint64_t TimeSt
.ThreadIndex = ThreadIndex};
}
-UpdateFolderDiskStream::UpdateFolderDiskStream(const std::filesystem::path& OutputPath)
+UpdateFolderEventDiskStream::UpdateFolderEventDiskStream(const std::filesystem::path& OutputPath)
: m_OutputPath(OutputPath)
, m_Output(OutputPath, BasicFile::Mode::kTruncate)
, m_WriteOffset(0)
{
}
-UpdateFolderDiskStream::~UpdateFolderDiskStream()
+UpdateFolderEventDiskStream::~UpdateFolderEventDiskStream()
{
try
{
@@ -152,14 +73,14 @@ UpdateFolderDiskStream::~UpdateFolderDiskStream()
}
}
-UpdateFolderDiskStream::Event
-UpdateFolderDiskStream::MakeEvent(Event::EventType Type, Event::EventPayload Payload)
+UpdateFolderEventDiskStream::Event
+UpdateFolderEventDiskStream::MakeEvent(Event::EventType Type, Event::EventPayload Payload)
{
return Event{.Header = Event::MakeHeader(Type, m_Stopwatch.GetElapsedTimeUs(), GetCurrentThreadIndex()), .Payload = Payload};
}
void
-UpdateFolderDiskStream::WriteEvent(const Event& InEvent)
+UpdateFolderEventDiskStream::WriteEvent(const Event& InEvent)
{
uint64_t WriteOffset = m_WriteOffset.fetch_add(sizeof(Event));
m_Output.Write(&InEvent, sizeof(Event), WriteOffset);
@@ -167,12 +88,12 @@ UpdateFolderDiskStream::WriteEvent(const Event& InEvent)
}
void
-UpdateFolderDiskStream::PrepareWrite(const ChunkedFolderContent& RemoteContent,
- std::span<const ChunkBlockDescription> BlockDescriptions,
- std::span<const Chunk> DownloadChunks,
- std::span<const BlockRange> DownloadBlockRanges,
- std::span<const Block> DownloadBlocks,
- std::span<const Sequence> SequencesToWrite)
+UpdateFolderEventDiskStream::PrepareWrite(const ChunkedFolderContent& RemoteContent,
+ std::span<const ChunkBlockDescription> BlockDescriptions,
+ std::span<const Chunk> DownloadChunks,
+ std::span<const BlockRange> DownloadBlockRanges,
+ std::span<const Block> DownloadBlocks,
+ std::span<const Sequence> SequencesToWrite)
{
CbObjectWriter Writer;
Writer.BeginObject("remoteContent"sv);
@@ -261,7 +182,7 @@ UpdateFolderDiskStream::PrepareWrite(const ChunkedFolderContent& RemoteContent
uint32_t PrepPayloadSize = gsl::narrow<uint32_t>(PayloadBuffer.GetSize());
- StreamHeader Header = {.StreamType = StreamHeader::UpdateFolderStreamType, .PrepPayloadSize = PrepPayloadSize};
+ StreamHeader Header = {.StreamType = StreamHeader::UpdateFolderEventDiskStreamType, .PrepPayloadSize = PrepPayloadSize};
Header.Checksum = StreamHeader::ComputeChecksum(Header);
@@ -279,21 +200,21 @@ UpdateFolderDiskStream::PrepareWrite(const ChunkedFolderContent& RemoteContent
}
void
-UpdateFolderDiskStream::EndWrite()
+UpdateFolderEventDiskStream::EndWrite()
{
RwLock::ExclusiveLockScope _(m_UpdateLock);
m_Output.Close();
}
void
-UpdateFolderDiskStream::Read(const std::filesystem::path& InputPath,
- ChunkedFolderContent& OutRemoteContent,
- std::vector<ChunkBlockDescription>& OutBlockDescriptions,
- std::vector<Chunk>& OutDownloadChunks,
- std::vector<BlockRange>& OutDownloadBlockRanges,
- std::vector<Block>& OutDownloadBlocks,
- std::vector<Sequence>& OutSequencesToWrite,
- std::vector<Event>& OutEvents)
+UpdateFolderEventDiskStream::Read(const std::filesystem::path& InputPath,
+ ChunkedFolderContent& OutRemoteContent,
+ std::vector<ChunkBlockDescription>& OutBlockDescriptions,
+ std::vector<Chunk>& OutDownloadChunks,
+ std::vector<BlockRange>& OutDownloadBlockRanges,
+ std::vector<Block>& OutDownloadBlocks,
+ std::vector<Sequence>& OutSequencesToWrite,
+ std::vector<Event>& OutEvents)
{
BasicFile InputFile(InputPath, BasicFile::Mode::kRead);
@@ -317,10 +238,11 @@ UpdateFolderDiskStream::Read(const std::filesystem::path& InputPath,
{
throw std::runtime_error(fmt::format("Expected version {}, file has version {}", StreamHeader::CurrentVersion, Header.Version));
}
- if (Header.StreamType != StreamHeader::UpdateFolderStreamType)
+ if (Header.StreamType != StreamHeader::UpdateFolderEventDiskStreamType)
{
- throw std::runtime_error(
- fmt::format("Expected stream type {}, file has stream type {}", StreamHeader::UpdateFolderStreamType, Header.StreamType));
+ throw std::runtime_error(fmt::format("Expected stream type {}, file has stream type {}",
+ StreamHeader::UpdateFolderEventDiskStreamType,
+ Header.StreamType));
}
if (Header.Checksum != StreamHeader::ComputeChecksum(Header))
{
@@ -434,70 +356,8 @@ UpdateFolderDiskStream::Read(const std::filesystem::path& InputPath,
InputFile.Read(OutEvents.data(), EntryCount * sizeof(Event), Offset);
}
-void
-UpdateFolderDiskStream::Replay(const std::filesystem::path& InputPath, UpdateFolderStream& UpdateFolderStream)
-{
- ZEN_UNUSED(UpdateFolderStream);
-
- ChunkedFolderContent RemoteContent;
- std::vector<ChunkBlockDescription> BlockDescriptions;
- std::vector<Chunk> DownloadChunks;
- std::vector<BlockRange> DownloadBlockRanges;
- std::vector<Block> DownloadBlocks;
- std::vector<Sequence> SequencesToWrite;
- std::vector<Event> Events;
-
- Read(InputPath, RemoteContent, BlockDescriptions, DownloadChunks, DownloadBlockRanges, DownloadBlocks, SequencesToWrite, Events);
-
- UpdateFolderStream.Initialize(RemoteContent, BlockDescriptions, DownloadChunks, DownloadBlockRanges, DownloadBlocks, SequencesToWrite);
-
- for (const Event& E : Events)
- {
- switch (E.Header.EventType)
- {
- case Event::EventType::DownloadedChunk:
- UpdateFolderStream.DownloadedChunk(E.Payload.DownloadedChunk.RemoteChunkIndex);
- break;
- case Event::EventType::DownloadedBlockRange:
- UpdateFolderStream.DownloadedBlockRange(E.Payload.DownloadedBlockRange.BlockIndex,
- E.Payload.DownloadedBlockRange.RangeStart,
- E.Payload.DownloadedBlockRange.RangeLength);
- break;
- case Event::EventType::DownloadedBlock:
- UpdateFolderStream.DownloadedBlock(E.Payload.DownloadedBlock.BlockIndex);
- break;
-
- case Event::EventType::WroteSequence:
- UpdateFolderStream.WroteSequence(E.Payload.WroteSequence.RemoteSequenceIndex,
- E.Payload.WroteSequence.Offset,
- E.Payload.WroteSequence.Length);
- break;
-
- case Event::EventType::CompletedBlockRange:
- UpdateFolderStream.CompletedBlockRange(E.Payload.CompletedBlockRange.BlockIndex,
- E.Payload.CompletedBlockRange.RangeStart,
- E.Payload.CompletedBlockRange.RangeLength);
- break;
- case Event::EventType::CompletedBlock:
- UpdateFolderStream.CompletedBlock(E.Payload.CompletedBlock.BlockIndex);
- break;
-
- case Event::EventType::CompletedSequence:
- UpdateFolderStream.CompletedSequence(E.Payload.CompletedSequence.RemoteSequenceIndex);
- break;
-
- case Event::EventType::WriteComplete:
- UpdateFolderStream.WriteComplete();
- break;
-
- default:
- throw std::runtime_error(std::format("Unknown event type: {}", uint8_t(E.Header.EventType)));
- }
- }
-}
-
uint16_t
-UpdateFolderDiskStream::GetThreadIndex(int ThreadId)
+UpdateFolderEventDiskStream::GetThreadIndex(int ThreadId)
{
{
RwLock::SharedLockScope _(m_UpdateLock);
@@ -517,7 +377,7 @@ UpdateFolderDiskStream::GetThreadIndex(int ThreadId)
}
uint16_t
-UpdateFolderDiskStream::GetCurrentThreadIndex()
+UpdateFolderEventDiskStream::GetCurrentThreadIndex()
{
return GetThreadIndex(GetCurrentThreadId());
}
diff --git a/src/zenremotestore/builds/updatefoldereventstream.cpp b/src/zenremotestore/builds/updatefoldereventstream.cpp
new file mode 100644
index 000000000..fe635e245
--- /dev/null
+++ b/src/zenremotestore/builds/updatefoldereventstream.cpp
@@ -0,0 +1,174 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenremotestore/builds/updatefoldereventstream.h>
+
+#include <zenremotestore/builds/updatefoldereventdiskstream.h>
+
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/stream.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <xxhash.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+#include <numeric>
+
+namespace zen {
+
+using namespace std::literals;
+
+namespace {
+ class DiskUpdateFolderEventStream : public UpdateFolderEventStream
+ {
+ using Event = UpdateFolderEventDiskStream::Event;
+
+ public:
+ explicit DiskUpdateFolderEventStream(const std::filesystem::path& OutputPath) : m_OutputStream(OutputPath) {}
+ virtual ~DiskUpdateFolderEventStream() {}
+
+ virtual void Initialize(const ChunkedFolderContent& RemoteContent,
+ std::span<const ChunkBlockDescription> BlockDescriptions,
+ std::span<const Chunk> DownloadChunks,
+ std::span<const BlockRange> DownloadBlockRanges,
+ std::span<const Block> DownloadBlocks,
+ std::span<const Sequence> SequencesToWrite) override
+ {
+ m_OutputStream
+ .PrepareWrite(RemoteContent, BlockDescriptions, DownloadChunks, DownloadBlockRanges, DownloadBlocks, SequencesToWrite);
+ }
+
+ virtual void DownloadedChunk(uint32_t RemoteChunkIndex) override
+ {
+ m_OutputStream.WriteEvent(
+ m_OutputStream.MakeEvent(Event::EventType::DownloadedChunk,
+ Event::EventPayload{.DownloadedChunk = {.RemoteChunkIndex = RemoteChunkIndex}}));
+ }
+
+ virtual void DownloadedBlockRange(uint32_t BlockIndex, uint32_t RangeStart, uint32_t RangeLength) override
+ {
+ m_OutputStream.WriteEvent(m_OutputStream.MakeEvent(
+ Event::EventType::DownloadedBlockRange,
+ Event::EventPayload{
+ .DownloadedBlockRange = {.BlockIndex = BlockIndex, .RangeStart = RangeStart, .RangeLength = RangeLength}}));
+ }
+
+ virtual void DownloadedBlock(uint32_t BlockIndex) override
+ {
+ m_OutputStream.WriteEvent(m_OutputStream.MakeEvent(Event::EventType::DownloadedBlock,
+ Event::EventPayload{.DownloadedBlock = {.BlockIndex = BlockIndex}}));
+ }
+
+ virtual void WroteSequence(uint32_t RemoteSequenceIndex, uint64_t Offset, uint64_t Length) override
+ {
+ m_OutputStream.WriteEvent(m_OutputStream.MakeEvent(
+ Event::EventType::WroteSequence,
+ Event::EventPayload{.WroteSequence = {.RemoteSequenceIndex = RemoteSequenceIndex, .Offset = Offset, .Length = Length}}));
+ }
+
+ virtual void CompletedBlockRange(uint32_t BlockIndex, uint32_t RangeStart, uint32_t RangeLength) override
+ {
+ m_OutputStream.WriteEvent(m_OutputStream.MakeEvent(
+ Event::EventType::CompletedBlockRange,
+ Event::EventPayload{
+ .CompletedBlockRange = {.BlockIndex = BlockIndex, .RangeStart = RangeStart, .RangeLength = RangeLength}}));
+ }
+
+ virtual void CompletedBlock(uint32_t BlockIndex) override
+ {
+ m_OutputStream.WriteEvent(m_OutputStream.MakeEvent(Event::EventType::CompletedBlock,
+ Event::EventPayload{.CompletedBlock = {.BlockIndex = BlockIndex}}));
+ }
+
+ virtual void CompletedSequence(uint32_t RemoteSequenceIndex) override
+ {
+ m_OutputStream.WriteEvent(
+ m_OutputStream.MakeEvent(Event::EventType::CompletedSequence,
+ Event::EventPayload{.CompletedSequence = {.RemoteSequenceIndex = RemoteSequenceIndex}}));
+ }
+
+ virtual void WriteComplete() override { m_OutputStream.EndWrite(); }
+
+ private:
+ UpdateFolderEventDiskStream m_OutputStream;
+ };
+} // namespace
+
+std::unique_ptr<UpdateFolderEventStream>
+CreateDiskUpdateFolderEventStream(const std::filesystem::path& OutputPath)
+{
+ return std::make_unique<DiskUpdateFolderEventStream>(OutputPath);
+}
+
+void
+ReplayUpdateFolderEventDiskStream(const std::filesystem::path& InputPath, UpdateFolderEventStream& EventStream)
+{
+ using Chunk = UpdateFolderEventStream::Chunk;
+ using BlockRange = UpdateFolderEventStream::BlockRange;
+ using Block = UpdateFolderEventStream::Block;
+ using Sequence = UpdateFolderEventStream::Sequence;
+ using Event = UpdateFolderEventDiskStream::Event;
+
+ ChunkedFolderContent RemoteContent;
+ std::vector<ChunkBlockDescription> BlockDescriptions;
+ std::vector<Chunk> DownloadChunks;
+ std::vector<BlockRange> DownloadBlockRanges;
+ std::vector<Block> DownloadBlocks;
+ std::vector<Sequence> SequencesToWrite;
+ std::vector<Event> Events;
+
+ UpdateFolderEventDiskStream::Read(InputPath,
+ RemoteContent,
+ BlockDescriptions,
+ DownloadChunks,
+ DownloadBlockRanges,
+ DownloadBlocks,
+ SequencesToWrite,
+ Events);
+
+ for (const Event& E : Events)
+ {
+ switch (E.Header.EventType)
+ {
+ case Event::EventType::DownloadedChunk:
+ EventStream.DownloadedChunk(E.Payload.DownloadedChunk.RemoteChunkIndex);
+ break;
+ case Event::EventType::DownloadedBlockRange:
+ EventStream.DownloadedBlockRange(E.Payload.DownloadedBlockRange.BlockIndex,
+ E.Payload.DownloadedBlockRange.RangeStart,
+ E.Payload.DownloadedBlockRange.RangeLength);
+ break;
+ case Event::EventType::DownloadedBlock:
+ EventStream.DownloadedBlock(E.Payload.DownloadedBlock.BlockIndex);
+ break;
+
+ case Event::EventType::WroteSequence:
+ EventStream.WroteSequence(E.Payload.WroteSequence.RemoteSequenceIndex,
+ E.Payload.WroteSequence.Offset,
+ E.Payload.WroteSequence.Length);
+ break;
+
+ case Event::EventType::CompletedBlockRange:
+ EventStream.CompletedBlockRange(E.Payload.CompletedBlockRange.BlockIndex,
+ E.Payload.CompletedBlockRange.RangeStart,
+ E.Payload.CompletedBlockRange.RangeLength);
+ break;
+ case Event::EventType::CompletedBlock:
+ EventStream.CompletedBlock(E.Payload.CompletedBlock.BlockIndex);
+ break;
+
+ case Event::EventType::CompletedSequence:
+ EventStream.CompletedSequence(E.Payload.CompletedSequence.RemoteSequenceIndex);
+ break;
+
+ case Event::EventType::WriteComplete:
+ EventStream.WriteComplete();
+ break;
+
+ default:
+ throw std::runtime_error(std::format("Unknown event type: {}", uint8_t(E.Header.EventType)));
+ }
+ }
+}
+
+} // namespace zen
diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
index fe69a624c..07a9198df 100644
--- a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
+++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
@@ -35,7 +35,7 @@ class BufferedWriteFileCache;
struct ChunkBlockDescription;
struct ChunkedFolderContent;
-class UpdateFolderStream;
+class UpdateFolderEventStream;
struct DiskStatistics
{
@@ -144,8 +144,9 @@ public:
bool EnableTargetFolderScavenging = true;
bool ValidateCompletedSequences = true;
std::vector<std::string> ExcludeFolders;
- uint64_t MaximumInMemoryPayloadSize = 512u * 1024u;
- bool PopulateCache = true;
+ uint64_t MaximumInMemoryPayloadSize = 512u * 1024u;
+ bool PopulateCache = true;
+ UpdateFolderEventStream* OptionalUpdateFolderEventStream = nullptr;
};
BuildsOperationUpdateFolder(OperationLogOutput& OperationLogOutput,
@@ -164,7 +165,7 @@ public:
const std::vector<IoHash>& LooseChunkHashes,
const Options& Options);
- void Execute(FolderContent& OutLocalFolderState, UpdateFolderStream* OptionalUpdateFolderStream = nullptr);
+ void Execute(FolderContent& OutLocalFolderState);
DiskStatistics m_DiskStats;
CacheMappingStatistics m_CacheMappingStats;
@@ -294,8 +295,7 @@ private:
uint64_t TotalRequestCount,
uint64_t TotalPartWriteCount,
FilteredRate& FilteredDownloadedBytesPerSecond,
- FilteredRate& FilteredWrittenBytesPerSecond,
- UpdateFolderStream* OptionalUpdateFolderStream);
+ FilteredRate& FilteredWrittenBytesPerSecond);
void DownloadBuildBlob(uint32_t RemoteChunkIndex,
const BlobsExistsResult& ExistsResult,
@@ -327,16 +327,14 @@ private:
const std::vector<ChunkedFolderContent>& ScavengedContents,
const std::vector<ChunkedContentLookup>& ScavengedLookups,
const std::vector<std::filesystem::path>& ScavengedPaths,
- BufferedWriteFileCache& WriteCache,
- UpdateFolderStream* OptionalUpdateFolderStream);
+ BufferedWriteFileCache& WriteCache);
bool WriteCompressedChunkToCache(const uint32_t RemoteChunkIndex,
const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs,
BufferedWriteFileCache& WriteCache,
- IoBuffer&& CompressedPart,
- UpdateFolderStream* OptionalUpdateFolderStream);
+ IoBuffer&& CompressedPart);
- void StreamDecompress(uint32_t RemoteSequenceIndex, CompositeBuffer&& CompressedPart, UpdateFolderStream* OptionalUpdateFolderStream);
+ void StreamDecompress(uint32_t RemoteSequenceIndex, CompositeBuffer&& CompressedPart);
void WriteSequenceChunkToCache(BufferedWriteFileCache::Local& LocalWriter,
const CompositeBuffer& Chunk,
@@ -356,16 +354,14 @@ private:
void WriteBlockChunkOpsToCache(std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
const BlockWriteOps& Ops,
BufferedWriteFileCache& WriteCache,
- ParallelWork& Work,
- UpdateFolderStream* OptionalUpdateFolderStream);
+ ParallelWork& Work);
bool WriteChunksBlockToCache(const ChunkBlockDescription& BlockDescription,
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
ParallelWork& Work,
CompositeBuffer&& BlockBuffer,
std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
- BufferedWriteFileCache& WriteCache,
- UpdateFolderStream* OptionalUpdateFolderStream);
+ BufferedWriteFileCache& WriteCache);
bool WritePartialBlockChunksToCache(const ChunkBlockDescription& BlockDescription,
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
@@ -374,8 +370,7 @@ private:
uint32_t FirstIncludedBlockChunkIndex,
uint32_t LastIncludedBlockChunkIndex,
std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
- BufferedWriteFileCache& WriteCache,
- UpdateFolderStream* OptionalUpdateFolderStream);
+ BufferedWriteFileCache& WriteCache);
void AsyncWriteDownloadedChunk(const std::filesystem::path& ZenFolderPath,
uint32_t RemoteChunkIndex,
@@ -387,18 +382,15 @@ private:
std::atomic<uint64_t>& WritePartsComplete,
const uint64_t TotalPartWriteCount,
FilteredRate& FilteredWrittenBytesPerSecond,
- bool EnableBacklog,
- UpdateFolderStream* OptionalUpdateFolderStream);
+ bool EnableBacklog);
- void VerifyAndCompleteChunkSequencesAsync(std::span<const uint32_t> RemoteSequenceIndexes,
- ParallelWork& Work,
- UpdateFolderStream* OptionalUpdateFolderStream);
+ void VerifyAndCompleteChunkSequencesAsync(std::span<const uint32_t> RemoteSequenceIndexes, ParallelWork& Work);
bool CompleteSequenceChunk(uint32_t RemoteSequenceIndex, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters);
std::vector<uint32_t> CompleteChunkTargets(const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs,
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters);
- void FinalizeChunkSequence(uint32_t RemoteSequenceIndex, UpdateFolderStream* OptionalUpdateFolderStream);
- void FinalizeChunkSequences(std::span<const uint32_t> RemoteSequenceIndexes, UpdateFolderStream* OptionalUpdateFolderStream);
- void VerifySequence(uint32_t RemoteSequenceIndex);
+ void FinalizeChunkSequence(uint32_t RemoteSequenceIndex);
+ void FinalizeChunkSequences(std::span<const uint32_t> RemoteSequenceIndexes);
+ void VerifySequence(uint32_t RemoteSequenceIndex);
OperationLogOutput& m_LogOutput;
StorageInstance& m_Storage;
diff --git a/src/zenremotestore/include/zenremotestore/builds/updatefolderstream.h b/src/zenremotestore/include/zenremotestore/builds/updatefoldereventdiskstream.h
index 6fe2188cd..6db03fa80 100644
--- a/src/zenremotestore/include/zenremotestore/builds/updatefolderstream.h
+++ b/src/zenremotestore/include/zenremotestore/builds/updatefoldereventdiskstream.h
@@ -4,59 +4,10 @@
#include <zencore/basicfile.h>
#include <zencore/timer.h>
-
-#include <zenremotestore/chunking/chunkblock.h>
-#include <zenremotestore/chunking/chunkedcontent.h>
+#include <zenremotestore/builds/updatefoldereventstream.h>
namespace zen {
-class UpdateFolderStream
-{
-public:
- virtual ~UpdateFolderStream() {}
-
- struct Sequence
- {
- uint32_t RemoteSequenceIndex;
- };
- struct Chunk
- {
- uint32_t RemoteChunkIndex;
- };
- struct Block
- {
- uint32_t BlockIndex;
- };
- struct BlockRange
- {
- uint32_t BlockIndex;
- uint32_t RangeStart;
- uint32_t RangeLength;
- };
-
- virtual void Initialize(const ChunkedFolderContent& RemoteContent,
- std::span<const ChunkBlockDescription> BlockDescriptions,
- std::span<const Chunk> DownloadChunks,
- std::span<const BlockRange> DownloadBlockRanges,
- std::span<const Block> DownloadBlocks,
- std::span<const Sequence> SequencesToWrite) = 0;
-
- virtual void DownloadedChunk(uint32_t RemoteChunkIndex) = 0;
- virtual void DownloadedBlockRange(uint32_t BlockIndex, uint32_t RangeStart, uint32_t RangeLength) = 0;
- virtual void DownloadedBlock(uint32_t BlockIndex) = 0;
-
- virtual void WroteSequence(uint32_t RemoteSequenceIndex, uint64_t Offset, uint64_t Length) = 0;
-
- virtual void CompletedBlockRange(uint32_t BlockIndex, uint32_t RangeStart, uint32_t RangeLength) = 0;
- virtual void CompletedBlock(uint32_t BlockIndex) = 0;
-
- virtual void CompletedSequence(uint32_t RemoteSequenceIndex) = 0;
-
- virtual void WriteComplete() = 0;
-};
-
-std::unique_ptr<UpdateFolderStream> CreateDiskUpdateFolderStream(const std::filesystem::path& OutputPath);
-
struct ChunkSequenceStatus
{
struct Range
@@ -76,12 +27,12 @@ struct ChunkSequenceStatus
void RemoveCompletedRange(const Range& InRange);
};
-class UpdateFolderDiskStream
+class UpdateFolderEventDiskStream
{
- using Sequence = UpdateFolderStream::Sequence;
- using Chunk = UpdateFolderStream::Chunk;
- using Block = UpdateFolderStream::Block;
- using BlockRange = UpdateFolderStream::BlockRange;
+ using Sequence = UpdateFolderEventStream::Sequence;
+ using Chunk = UpdateFolderEventStream::Chunk;
+ using Block = UpdateFolderEventStream::Block;
+ using BlockRange = UpdateFolderEventStream::BlockRange;
public:
#pragma pack(push)
@@ -179,9 +130,10 @@ public:
#pragma pack(pop)
static_assert(sizeof(Event) == 28u);
- UpdateFolderDiskStream(const std::filesystem::path& OutputPath);
+ UpdateFolderEventDiskStream() {}
+ explicit UpdateFolderEventDiskStream(const std::filesystem::path& OutputPath);
- virtual ~UpdateFolderDiskStream();
+ virtual ~UpdateFolderEventDiskStream();
Event MakeEvent(Event::EventType Type, Event::EventPayload Payload);
@@ -205,8 +157,6 @@ public:
std::vector<Sequence>& OutSequencesToWrite,
std::vector<Event>& OutEvents);
- static void Replay(const std::filesystem::path& InputPath, UpdateFolderStream& UpdateFolderStream);
-
private:
uint16_t GetThreadIndex(int ThreadId);
uint16_t GetCurrentThreadIndex();
diff --git a/src/zenremotestore/include/zenremotestore/builds/updatefoldereventstream.h b/src/zenremotestore/include/zenremotestore/builds/updatefoldereventstream.h
new file mode 100644
index 000000000..7a3f152f8
--- /dev/null
+++ b/src/zenremotestore/include/zenremotestore/builds/updatefoldereventstream.h
@@ -0,0 +1,61 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/basicfile.h>
+#include <zencore/timer.h>
+
+#include <zenremotestore/chunking/chunkblock.h>
+#include <zenremotestore/chunking/chunkedcontent.h>
+
+namespace zen {
+
+class UpdateFolderEventStream
+{
+public:
+ virtual ~UpdateFolderEventStream() {}
+
+ struct Sequence
+ {
+ uint32_t RemoteSequenceIndex;
+ };
+ struct Chunk
+ {
+ uint32_t RemoteChunkIndex;
+ };
+ struct Block
+ {
+ uint32_t BlockIndex;
+ };
+ struct BlockRange
+ {
+ uint32_t BlockIndex;
+ uint32_t RangeStart;
+ uint32_t RangeLength;
+ };
+
+ virtual void Initialize(const ChunkedFolderContent& RemoteContent,
+ std::span<const ChunkBlockDescription> BlockDescriptions,
+ std::span<const Chunk> DownloadChunks,
+ std::span<const BlockRange> DownloadBlockRanges,
+ std::span<const Block> DownloadBlocks,
+ std::span<const Sequence> SequencesToWrite) = 0;
+
+ virtual void DownloadedChunk(uint32_t RemoteChunkIndex) = 0;
+ virtual void DownloadedBlockRange(uint32_t BlockIndex, uint32_t RangeStart, uint32_t RangeLength) = 0;
+ virtual void DownloadedBlock(uint32_t BlockIndex) = 0;
+
+ virtual void WroteSequence(uint32_t RemoteSequenceIndex, uint64_t Offset, uint64_t Length) = 0;
+
+ virtual void CompletedBlockRange(uint32_t BlockIndex, uint32_t RangeStart, uint32_t RangeLength) = 0;
+ virtual void CompletedBlock(uint32_t BlockIndex) = 0;
+
+ virtual void CompletedSequence(uint32_t RemoteSequenceIndex) = 0;
+
+ virtual void WriteComplete() = 0;
+};
+
+std::unique_ptr<UpdateFolderEventStream> CreateDiskUpdateFolderEventStream(const std::filesystem::path& OutputPath);
+void ReplayUpdateFolderEventDiskStream(const std::filesystem::path& InputPath, UpdateFolderEventStream& EventStream);
+
+} // namespace zen