aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-10-22 11:08:39 +0200
committerGitHub Enterprise <[email protected]>2025-10-22 11:08:39 +0200
commitab9face3992228d1bf632a79bce1240d61eb431f (patch)
treeac3100761b50f7a9d652884d2603aad5fe0da859 /src
parentfix error log when using TryCloneFile on Mac/Linux (#597) (diff)
downloadzen-ab9face3992228d1bf632a79bce1240d61eb431f.tar.xz
zen-ab9face3992228d1bf632a79bce1240d61eb431f.zip
make validation of completed sequences in builds download optional (#596)
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/builds_cmd.cpp1
-rw-r--r--src/zenremotestore/builds/buildstorageoperations.cpp5100
-rw-r--r--src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h3
3 files changed, 2577 insertions, 2527 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index 9c6fd17ab..d220eefb2 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -2143,6 +2143,7 @@ namespace {
.PrimeCacheOnly = Options.PrimeCacheOnly,
.EnableOtherDownloadsScavenging = Options.EnableOtherDownloadsScavenging,
.EnableTargetFolderScavenging = Options.EnableTargetFolderScavenging,
+ .ValidateCompletedSequences = Options.PostDownloadVerify,
.ExcludeFolders = DefaultExcludeFolders,
.ExcludeExtensions = DefaultExcludeExtensions});
{
diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp
index ebb876ed9..c9f142d7a 100644
--- a/src/zenremotestore/builds/buildstorageoperations.cpp
+++ b/src/zenremotestore/builds/buildstorageoperations.cpp
@@ -402,95 +402,6 @@ MakeBufferMemoryBased(const CompositeBuffer& PartialBlockBuffer)
}
}
-void
-StreamDecompress(std::atomic<bool>& AbortFlag,
- const std::filesystem::path& CacheFolderPath,
- const IoHash& SequenceRawHash,
- CompositeBuffer&& CompressedPart,
- std::atomic<uint64_t>& ReadCount,
- std::atomic<uint64_t>& ReadByteCount,
- std::atomic<uint64_t>& WriteCount,
- std::atomic<uint64_t>& WriteByteCount,
- std::atomic<uint64_t>& WrittenChunkByteCount,
- std::atomic<uint64_t>& ValidatedChunkByteCount)
-{
- ZEN_TRACE_CPU("StreamDecompress");
- const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash);
- TemporaryFile DecompressedTemp;
- std::error_code Ec;
- DecompressedTemp.CreateTemporary(TempChunkSequenceFileName.parent_path(), Ec);
- if (Ec)
- {
- throw std::runtime_error(fmt::format("Failed creating temporary file for decompressing large blob {}, reason: ({}) {}",
- SequenceRawHash,
- Ec.value(),
- Ec.message()));
- }
- IoHash RawHash;
- uint64_t RawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompressedPart, RawHash, RawSize);
- if (!Compressed)
- {
- throw std::runtime_error(fmt::format("Failed to parse header of compressed large blob {}", SequenceRawHash));
- }
- if (RawHash != SequenceRawHash)
- {
- throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, SequenceRawHash));
- }
- PrepareFileForScatteredWrite(DecompressedTemp.Handle(), RawSize);
-
- IoHashStream Hash;
- bool CouldDecompress =
- Compressed.DecompressToStream(0,
- (uint64_t)-1,
- [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
- ZEN_UNUSED(SourceOffset);
- ZEN_TRACE_CPU("StreamDecompress_Write");
- ReadCount++;
- ReadByteCount += SourceSize;
- if (!AbortFlag)
- {
- for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
- {
- Hash.Append(Segment.GetView());
- ValidatedChunkByteCount += Segment.GetSize();
- DecompressedTemp.Write(Segment, Offset);
- Offset += Segment.GetSize();
- WriteByteCount += Segment.GetSize();
- WriteCount++;
- WrittenChunkByteCount += Segment.GetSize();
- }
- return true;
- }
- return false;
- });
-
- if (AbortFlag)
- {
- return;
- }
-
- if (!CouldDecompress)
- {
- throw std::runtime_error(fmt::format("Failed to decompress large blob {}", SequenceRawHash));
- }
- const IoHash VerifyHash = Hash.GetHash();
- if (VerifyHash != SequenceRawHash)
- {
- throw std::runtime_error(
- fmt::format("Decompressed blob payload hash {} does not match expected hash {}", VerifyHash, SequenceRawHash));
- }
- DecompressedTemp.MoveTemporaryIntoPlace(TempChunkSequenceFileName, Ec);
- if (Ec)
- {
- throw std::runtime_error(fmt::format("Failed moving temporary file for decompressing large blob {}, reason: ({}) {}",
- SequenceRawHash,
- Ec.value(),
- Ec.message()));
- }
- // WriteChunkStats.ChunkCountWritten++;
-}
-
class FilteredRate
{
public:
@@ -645,346 +556,259 @@ void
BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
{
ZEN_TRACE_CPU("BuildsOperationUpdateFolder::Execute");
-
- enum TaskSteps : uint32_t
+ try
{
- ScanExistingData,
- WriteChunks,
- PrepareTarget,
- FinalizeTarget,
- Cleanup,
- StepCount
- };
+ enum TaskSteps : uint32_t
+ {
+ ScanExistingData,
+ WriteChunks,
+ PrepareTarget,
+ FinalizeTarget,
+ Cleanup,
+ StepCount
+ };
- auto EndProgress = MakeGuard([&]() { m_LogOutput.SetLogOperationProgress(TaskSteps::StepCount, TaskSteps::StepCount); });
+ auto EndProgress = MakeGuard([&]() { m_LogOutput.SetLogOperationProgress(TaskSteps::StepCount, TaskSteps::StepCount); });
- ZEN_ASSERT((!m_Options.PrimeCacheOnly) ||
- (m_Options.PrimeCacheOnly && (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Off)));
+ ZEN_ASSERT((!m_Options.PrimeCacheOnly) ||
+ (m_Options.PrimeCacheOnly && (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Off)));
- m_LogOutput.SetLogOperationProgress(TaskSteps::ScanExistingData, TaskSteps::StepCount);
+ m_LogOutput.SetLogOperationProgress(TaskSteps::ScanExistingData, TaskSteps::StepCount);
- CreateDirectories(m_CacheFolderPath);
- CreateDirectories(m_TempDownloadFolderPath);
- CreateDirectories(m_TempBlockFolderPath);
+ CreateDirectories(m_CacheFolderPath);
+ CreateDirectories(m_TempDownloadFolderPath);
+ CreateDirectories(m_TempBlockFolderPath);
- Stopwatch IndexTimer;
+ Stopwatch IndexTimer;
- if (!m_Options.IsQuiet)
- {
- LOG_OUTPUT(m_LogOutput, "Indexed local and remote content in {}", NiceTimeSpanMs(IndexTimer.GetElapsedTimeMs()));
- }
+ if (!m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput, "Indexed local and remote content in {}", NiceTimeSpanMs(IndexTimer.GetElapsedTimeMs()));
+ }
- Stopwatch CacheMappingTimer;
+ Stopwatch CacheMappingTimer;
- std::vector<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters(m_RemoteContent.ChunkedContent.SequenceRawHashes.size());
- std::vector<bool> RemoteChunkIndexNeedsCopyFromLocalFileFlags(m_RemoteContent.ChunkedContent.ChunkHashes.size());
- std::vector<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags(m_RemoteContent.ChunkedContent.ChunkHashes.size());
+ std::vector<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters(m_RemoteContent.ChunkedContent.SequenceRawHashes.size());
+ std::vector<bool> RemoteChunkIndexNeedsCopyFromLocalFileFlags(m_RemoteContent.ChunkedContent.ChunkHashes.size());
+ std::vector<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags(m_RemoteContent.ChunkedContent.ChunkHashes.size());
- tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedChunkHashesFound;
- tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedSequenceHashesFound;
- if (!m_Options.PrimeCacheOnly)
- {
- ScanCacheFolder(CachedChunkHashesFound, CachedSequenceHashesFound);
- }
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedChunkHashesFound;
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedSequenceHashesFound;
+ if (!m_Options.PrimeCacheOnly)
+ {
+ ScanCacheFolder(CachedChunkHashesFound, CachedSequenceHashesFound);
+ }
- tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedBlocksFound;
- if (!m_Options.PrimeCacheOnly)
- {
- ScanTempBlocksFolder(CachedBlocksFound);
- }
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedBlocksFound;
+ if (!m_Options.PrimeCacheOnly)
+ {
+ ScanTempBlocksFolder(CachedBlocksFound);
+ }
- tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> SequenceIndexesLeftToFindToRemoteIndex;
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> SequenceIndexesLeftToFindToRemoteIndex;
- if (!m_Options.PrimeCacheOnly && m_Options.EnableTargetFolderScavenging)
- {
- // Pick up all whole files we can use from current local state
- ZEN_TRACE_CPU("GetLocalSequences");
+ if (!m_Options.PrimeCacheOnly && m_Options.EnableTargetFolderScavenging)
+ {
+ // Pick up all whole files we can use from current local state
+ ZEN_TRACE_CPU("GetLocalSequences");
- Stopwatch LocalTimer;
+ Stopwatch LocalTimer;
- std::vector<uint32_t> MissingSequenceIndexes = ScanTargetFolder(CachedChunkHashesFound, CachedSequenceHashesFound);
+ std::vector<uint32_t> MissingSequenceIndexes = ScanTargetFolder(CachedChunkHashesFound, CachedSequenceHashesFound);
- for (uint32_t RemoteSequenceIndex : MissingSequenceIndexes)
- {
- // We must write the sequence
- const uint32_t ChunkCount = m_RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex];
- const IoHash& RemoteSequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = ChunkCount;
- SequenceIndexesLeftToFindToRemoteIndex.insert({RemoteSequenceRawHash, RemoteSequenceIndex});
+ for (uint32_t RemoteSequenceIndex : MissingSequenceIndexes)
+ {
+ // We must write the sequence
+ const uint32_t ChunkCount = m_RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex];
+ const IoHash& RemoteSequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = ChunkCount;
+ SequenceIndexesLeftToFindToRemoteIndex.insert({RemoteSequenceRawHash, RemoteSequenceIndex});
+ }
}
- }
- else
- {
- for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < m_RemoteContent.ChunkedContent.SequenceRawHashes.size();
- RemoteSequenceIndex++)
+ else
{
- const uint32_t ChunkCount = m_RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex];
- SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = ChunkCount;
+ for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < m_RemoteContent.ChunkedContent.SequenceRawHashes.size();
+ RemoteSequenceIndex++)
+ {
+ const uint32_t ChunkCount = m_RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex];
+ SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = ChunkCount;
+ }
}
- }
-
- std::vector<ChunkedFolderContent> ScavengedContents;
- std::vector<ChunkedContentLookup> ScavengedLookups;
- std::vector<std::filesystem::path> ScavengedPaths;
- std::vector<ScavengedSequenceCopyOperation> ScavengedSequenceCopyOperations;
- uint64_t ScavengedPathsCount = 0;
-
- if (!m_Options.PrimeCacheOnly && m_Options.EnableOtherDownloadsScavenging)
- {
- ZEN_TRACE_CPU("GetScavengedSequences");
+ std::vector<ChunkedFolderContent> ScavengedContents;
+ std::vector<ChunkedContentLookup> ScavengedLookups;
+ std::vector<std::filesystem::path> ScavengedPaths;
- Stopwatch ScavengeTimer;
+ std::vector<ScavengedSequenceCopyOperation> ScavengedSequenceCopyOperations;
+ uint64_t ScavengedPathsCount = 0;
- if (!SequenceIndexesLeftToFindToRemoteIndex.empty())
+ if (!m_Options.PrimeCacheOnly && m_Options.EnableOtherDownloadsScavenging)
{
- std::vector<ScavengeSource> ScavengeSources = FindScavengeSources();
+ ZEN_TRACE_CPU("GetScavengedSequences");
- const size_t ScavengePathCount = ScavengeSources.size();
+ Stopwatch ScavengeTimer;
- ScavengedContents.resize(ScavengePathCount);
- ScavengedLookups.resize(ScavengePathCount);
- ScavengedPaths.resize(ScavengePathCount);
+ if (!SequenceIndexesLeftToFindToRemoteIndex.empty())
+ {
+ std::vector<ScavengeSource> ScavengeSources = FindScavengeSources();
- std::unique_ptr<BuildOpLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Scavenging"));
- BuildOpLogOutput::ProgressBar& ScavengeProgressBar(*ProgressBarPtr);
+ const size_t ScavengePathCount = ScavengeSources.size();
- ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ ScavengedContents.resize(ScavengePathCount);
+ ScavengedLookups.resize(ScavengePathCount);
+ ScavengedPaths.resize(ScavengePathCount);
- std::atomic<uint64_t> PathsFound(0);
- std::atomic<uint64_t> ChunksFound(0);
- std::atomic<uint64_t> PathsScavenged(0);
+ std::unique_ptr<BuildOpLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Scavenging"));
+ BuildOpLogOutput::ProgressBar& ScavengeProgressBar(*ProgressBarPtr);
- for (size_t ScavengeIndex = 0; ScavengeIndex < ScavengePathCount; ScavengeIndex++)
- {
- Work.ScheduleWork(m_IOWorkerPool,
- [this,
- &ScavengeSources,
- &ScavengedContents,
- &ScavengedPaths,
- &ScavengedLookups,
- &PathsFound,
- &ChunksFound,
- &PathsScavenged,
- ScavengeIndex](std::atomic<bool>&) {
- if (!m_AbortFlag)
- {
- ZEN_TRACE_CPU("Async_FindScavengeContent");
+ ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
- const ScavengeSource& Source = ScavengeSources[ScavengeIndex];
- ChunkedFolderContent& ScavengedLocalContent = ScavengedContents[ScavengeIndex];
- ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengeIndex];
+ std::atomic<uint64_t> PathsFound(0);
+ std::atomic<uint64_t> ChunksFound(0);
+ std::atomic<uint64_t> PathsScavenged(0);
- if (FindScavengeContent(Source, ScavengedLocalContent, ScavengedLookup))
- {
- ScavengedPaths[ScavengeIndex] = Source.Path;
- PathsFound += ScavengedLocalContent.Paths.size();
- ChunksFound += ScavengedLocalContent.ChunkedContent.ChunkHashes.size();
- }
- else
+ for (size_t ScavengeIndex = 0; ScavengeIndex < ScavengePathCount; ScavengeIndex++)
+ {
+ Work.ScheduleWork(m_IOWorkerPool,
+ [this,
+ &ScavengeSources,
+ &ScavengedContents,
+ &ScavengedPaths,
+ &ScavengedLookups,
+ &PathsFound,
+ &ChunksFound,
+ &PathsScavenged,
+ ScavengeIndex](std::atomic<bool>&) {
+ if (!m_AbortFlag)
{
- ScavengedPaths[ScavengeIndex].clear();
- }
- PathsScavenged++;
- }
- });
- }
- {
- ZEN_TRACE_CPU("ScavengeScan_Wait");
+ ZEN_TRACE_CPU("Async_FindScavengeContent");
- Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(PendingWork);
- std::string Details = fmt::format("{}/{} scanned. {} paths and {} chunks found for scavenging",
- PathsScavenged.load(),
- ScavengePathCount,
- PathsFound.load(),
- ChunksFound.load());
- ScavengeProgressBar.UpdateState({.Task = "Scavenging ",
- .Details = Details,
- .TotalCount = ScavengePathCount,
- .RemainingCount = ScavengePathCount - PathsScavenged.load(),
- .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
- false);
- });
- }
+ const ScavengeSource& Source = ScavengeSources[ScavengeIndex];
+ ChunkedFolderContent& ScavengedLocalContent = ScavengedContents[ScavengeIndex];
+ ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengeIndex];
- ScavengeProgressBar.Finish();
- if (m_AbortFlag)
- {
- return;
- }
+ if (FindScavengeContent(Source, ScavengedLocalContent, ScavengedLookup))
+ {
+ ScavengedPaths[ScavengeIndex] = Source.Path;
+ PathsFound += ScavengedLocalContent.Paths.size();
+ ChunksFound += ScavengedLocalContent.ChunkedContent.ChunkHashes.size();
+ }
+ else
+ {
+ ScavengedPaths[ScavengeIndex].clear();
+ }
+ PathsScavenged++;
+ }
+ });
+ }
+ {
+ ZEN_TRACE_CPU("ScavengeScan_Wait");
+
+ Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
+ std::string Details = fmt::format("{}/{} scanned. {} paths and {} chunks found for scavenging",
+ PathsScavenged.load(),
+ ScavengePathCount,
+ PathsFound.load(),
+ ChunksFound.load());
+ ScavengeProgressBar.UpdateState(
+ {.Task = "Scavenging ",
+ .Details = Details,
+ .TotalCount = ScavengePathCount,
+ .RemainingCount = ScavengePathCount - PathsScavenged.load(),
+ .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+ false);
+ });
+ }
- for (uint32_t ScavengedContentIndex = 0;
- ScavengedContentIndex < ScavengedContents.size() && (!SequenceIndexesLeftToFindToRemoteIndex.empty());
- ScavengedContentIndex++)
- {
- const std::filesystem::path& ScavengePath = ScavengedPaths[ScavengedContentIndex];
- if (!ScavengePath.empty())
+ ScavengeProgressBar.Finish();
+ if (m_AbortFlag)
{
- const ChunkedFolderContent& ScavengedLocalContent = ScavengedContents[ScavengedContentIndex];
- const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengedContentIndex];
+ return;
+ }
- for (uint32_t ScavengedSequenceIndex = 0;
- ScavengedSequenceIndex < ScavengedLocalContent.ChunkedContent.SequenceRawHashes.size();
- ScavengedSequenceIndex++)
+ for (uint32_t ScavengedContentIndex = 0;
+ ScavengedContentIndex < ScavengedContents.size() && (!SequenceIndexesLeftToFindToRemoteIndex.empty());
+ ScavengedContentIndex++)
+ {
+ const std::filesystem::path& ScavengePath = ScavengedPaths[ScavengedContentIndex];
+ if (!ScavengePath.empty())
{
- const IoHash& SequenceRawHash = ScavengedLocalContent.ChunkedContent.SequenceRawHashes[ScavengedSequenceIndex];
- if (auto It = SequenceIndexesLeftToFindToRemoteIndex.find(SequenceRawHash);
- It != SequenceIndexesLeftToFindToRemoteIndex.end())
+ const ChunkedFolderContent& ScavengedLocalContent = ScavengedContents[ScavengedContentIndex];
+ const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengedContentIndex];
+
+ for (uint32_t ScavengedSequenceIndex = 0;
+ ScavengedSequenceIndex < ScavengedLocalContent.ChunkedContent.SequenceRawHashes.size();
+ ScavengedSequenceIndex++)
{
- const uint32_t RemoteSequenceIndex = It->second;
- const uint64_t RawSize =
- m_RemoteContent.RawSizes[m_RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]];
- ZEN_ASSERT(RawSize > 0);
+ const IoHash& SequenceRawHash = ScavengedLocalContent.ChunkedContent.SequenceRawHashes[ScavengedSequenceIndex];
+ if (auto It = SequenceIndexesLeftToFindToRemoteIndex.find(SequenceRawHash);
+ It != SequenceIndexesLeftToFindToRemoteIndex.end())
+ {
+ const uint32_t RemoteSequenceIndex = It->second;
+ const uint64_t RawSize =
+ m_RemoteContent.RawSizes[m_RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]];
+ ZEN_ASSERT(RawSize > 0);
- const uint32_t ScavengedPathIndex = ScavengedLookup.SequenceIndexFirstPathIndex[ScavengedSequenceIndex];
- ZEN_ASSERT_SLOW(IsFile((ScavengePath / ScavengedLocalContent.Paths[ScavengedPathIndex]).make_preferred()));
+ const uint32_t ScavengedPathIndex = ScavengedLookup.SequenceIndexFirstPathIndex[ScavengedSequenceIndex];
+ ZEN_ASSERT_SLOW(IsFile((ScavengePath / ScavengedLocalContent.Paths[ScavengedPathIndex]).make_preferred()));
- ScavengedSequenceCopyOperations.push_back({.ScavengedContentIndex = ScavengedContentIndex,
- .ScavengedPathIndex = ScavengedPathIndex,
- .RemoteSequenceIndex = RemoteSequenceIndex,
- .RawSize = RawSize});
+ ScavengedSequenceCopyOperations.push_back({.ScavengedContentIndex = ScavengedContentIndex,
+ .ScavengedPathIndex = ScavengedPathIndex,
+ .RemoteSequenceIndex = RemoteSequenceIndex,
+ .RawSize = RawSize});
- SequenceIndexesLeftToFindToRemoteIndex.erase(SequenceRawHash);
- SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = 0;
+ SequenceIndexesLeftToFindToRemoteIndex.erase(SequenceRawHash);
+ SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = 0;
- m_CacheMappingStats.ScavengedPathsMatchingSequencesCount++;
- m_CacheMappingStats.ScavengedPathsMatchingSequencesByteCount += RawSize;
+ m_CacheMappingStats.ScavengedPathsMatchingSequencesCount++;
+ m_CacheMappingStats.ScavengedPathsMatchingSequencesByteCount += RawSize;
+ }
}
+ ScavengedPathsCount++;
}
- ScavengedPathsCount++;
}
}
+ m_CacheMappingStats.ScavengeElapsedWallTimeUs += ScavengeTimer.GetElapsedTimeUs();
}
- m_CacheMappingStats.ScavengeElapsedWallTimeUs += ScavengeTimer.GetElapsedTimeUs();
- }
-
- uint32_t RemainingChunkCount = 0;
- for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < m_RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++)
- {
- uint64_t ChunkWriteCount = GetChunkWriteCount(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex);
- if (ChunkWriteCount > 0)
- {
- RemainingChunkCount++;
- }
- }
-
- // Pick up all chunks in current local state
- tsl::robin_map<IoHash, size_t, IoHash::Hasher> RawHashToCopyChunkDataIndex;
- std::vector<CopyChunkData> CopyChunkDatas;
-
- if (!m_Options.PrimeCacheOnly && m_Options.EnableTargetFolderScavenging)
- {
- ZEN_TRACE_CPU("GetLocalChunks");
- Stopwatch LocalTimer;
-
- for (uint32_t LocalSequenceIndex = 0;
- LocalSequenceIndex < m_LocalContent.ChunkedContent.SequenceRawHashes.size() && (RemainingChunkCount > 0);
- LocalSequenceIndex++)
+ uint32_t RemainingChunkCount = 0;
+ for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < m_RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++)
{
- const IoHash& LocalSequenceRawHash = m_LocalContent.ChunkedContent.SequenceRawHashes[LocalSequenceIndex];
- const uint32_t LocalOrderOffset = m_LocalLookup.SequenceIndexChunkOrderOffset[LocalSequenceIndex];
-
+ uint64_t ChunkWriteCount = GetChunkWriteCount(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex);
+ if (ChunkWriteCount > 0)
{
- uint64_t SourceOffset = 0;
- const uint32_t LocalChunkCount = m_LocalContent.ChunkedContent.ChunkCounts[LocalSequenceIndex];
- for (uint32_t LocalOrderIndex = 0; LocalOrderIndex < LocalChunkCount; LocalOrderIndex++)
- {
- const uint32_t LocalChunkIndex = m_LocalContent.ChunkedContent.ChunkOrders[LocalOrderOffset + LocalOrderIndex];
- const IoHash& LocalChunkHash = m_LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex];
- const uint64_t LocalChunkRawSize = m_LocalContent.ChunkedContent.ChunkRawSizes[LocalChunkIndex];
-
- if (auto RemoteChunkIt = m_RemoteLookup.ChunkHashToChunkIndex.find(LocalChunkHash);
- RemoteChunkIt != m_RemoteLookup.ChunkHashToChunkIndex.end())
- {
- const uint32_t RemoteChunkIndex = RemoteChunkIt->second;
- if (!RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex])
- {
- std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
- GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex);
-
- if (!ChunkTargetPtrs.empty())
- {
- CopyChunkData::ChunkTarget Target = {
- .TargetChunkLocationCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()),
- .RemoteChunkIndex = RemoteChunkIndex,
- .CacheFileOffset = SourceOffset};
- if (auto CopySourceIt = RawHashToCopyChunkDataIndex.find(LocalSequenceRawHash);
- CopySourceIt != RawHashToCopyChunkDataIndex.end())
- {
- CopyChunkData& Data = CopyChunkDatas[CopySourceIt->second];
- if (Data.TargetChunkLocationPtrs.size() > 1024)
- {
- RawHashToCopyChunkDataIndex.insert_or_assign(LocalSequenceRawHash, CopyChunkDatas.size());
- CopyChunkDatas.push_back(
- CopyChunkData{.ScavengeSourceIndex = (uint32_t)-1,
- .SourceSequenceIndex = LocalSequenceIndex,
- .TargetChunkLocationPtrs = ChunkTargetPtrs,
- .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}});
- }
- else
- {
- Data.TargetChunkLocationPtrs.insert(Data.TargetChunkLocationPtrs.end(),
- ChunkTargetPtrs.begin(),
- ChunkTargetPtrs.end());
- Data.ChunkTargets.push_back(Target);
- }
- }
- else
- {
- RawHashToCopyChunkDataIndex.insert_or_assign(LocalSequenceRawHash, CopyChunkDatas.size());
- CopyChunkDatas.push_back(
- CopyChunkData{.ScavengeSourceIndex = (uint32_t)-1,
- .SourceSequenceIndex = LocalSequenceIndex,
- .TargetChunkLocationPtrs = ChunkTargetPtrs,
- .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}});
- }
- m_CacheMappingStats.LocalChunkMatchingRemoteCount++;
- m_CacheMappingStats.LocalChunkMatchingRemoteByteCount += LocalChunkRawSize;
- RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex] = true;
- RemainingChunkCount--;
- }
- }
- }
- SourceOffset += LocalChunkRawSize;
- }
+ RemainingChunkCount++;
}
}
- m_CacheMappingStats.LocalScanElapsedWallTimeUs += LocalTimer.GetElapsedTimeUs();
- }
-
- if (!m_Options.PrimeCacheOnly && m_Options.EnableOtherDownloadsScavenging)
- {
- ZEN_TRACE_CPU("GetScavengeChunks");
- Stopwatch ScavengeTimer;
+ // Pick up all chunks in current local state
+ tsl::robin_map<IoHash, size_t, IoHash::Hasher> RawHashToCopyChunkDataIndex;
+ std::vector<CopyChunkData> CopyChunkDatas;
- for (uint32_t ScavengedContentIndex = 0; ScavengedContentIndex < ScavengedContents.size() && (RemainingChunkCount > 0);
- ScavengedContentIndex++)
+ if (!m_Options.PrimeCacheOnly && m_Options.EnableTargetFolderScavenging)
{
- const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengedContentIndex];
- // const std::filesystem::path& ScavengedPath = ScavengedPaths[ScavengedContentIndex];
- const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengedContentIndex];
+ ZEN_TRACE_CPU("GetLocalChunks");
+
+ Stopwatch LocalTimer;
- for (uint32_t ScavengedSequenceIndex = 0;
- ScavengedSequenceIndex < ScavengedContent.ChunkedContent.SequenceRawHashes.size() && (RemainingChunkCount > 0);
- ScavengedSequenceIndex++)
+ for (uint32_t LocalSequenceIndex = 0;
+ LocalSequenceIndex < m_LocalContent.ChunkedContent.SequenceRawHashes.size() && (RemainingChunkCount > 0);
+ LocalSequenceIndex++)
{
- const IoHash& ScavengedSequenceRawHash = ScavengedContent.ChunkedContent.SequenceRawHashes[ScavengedSequenceIndex];
- const uint32_t ScavengedOrderOffset = ScavengedLookup.SequenceIndexChunkOrderOffset[ScavengedSequenceIndex];
+ const IoHash& LocalSequenceRawHash = m_LocalContent.ChunkedContent.SequenceRawHashes[LocalSequenceIndex];
+ const uint32_t LocalOrderOffset = m_LocalLookup.SequenceIndexChunkOrderOffset[LocalSequenceIndex];
{
- uint64_t SourceOffset = 0;
- const uint32_t ScavengedChunkCount = ScavengedContent.ChunkedContent.ChunkCounts[ScavengedSequenceIndex];
- for (uint32_t ScavengedOrderIndex = 0; ScavengedOrderIndex < ScavengedChunkCount; ScavengedOrderIndex++)
+ uint64_t SourceOffset = 0;
+ const uint32_t LocalChunkCount = m_LocalContent.ChunkedContent.ChunkCounts[LocalSequenceIndex];
+ for (uint32_t LocalOrderIndex = 0; LocalOrderIndex < LocalChunkCount; LocalOrderIndex++)
{
- const uint32_t ScavengedChunkIndex =
- ScavengedContent.ChunkedContent.ChunkOrders[ScavengedOrderOffset + ScavengedOrderIndex];
- const IoHash& ScavengedChunkHash = ScavengedContent.ChunkedContent.ChunkHashes[ScavengedChunkIndex];
- const uint64_t ScavengedChunkRawSize = ScavengedContent.ChunkedContent.ChunkRawSizes[ScavengedChunkIndex];
+ const uint32_t LocalChunkIndex = m_LocalContent.ChunkedContent.ChunkOrders[LocalOrderOffset + LocalOrderIndex];
+ const IoHash& LocalChunkHash = m_LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex];
+ const uint64_t LocalChunkRawSize = m_LocalContent.ChunkedContent.ChunkRawSizes[LocalChunkIndex];
- if (auto RemoteChunkIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ScavengedChunkHash);
+ if (auto RemoteChunkIt = m_RemoteLookup.ChunkHashToChunkIndex.find(LocalChunkHash);
RemoteChunkIt != m_RemoteLookup.ChunkHashToChunkIndex.end())
{
const uint32_t RemoteChunkIndex = RemoteChunkIt->second;
@@ -999,16 +823,16 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
.TargetChunkLocationCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()),
.RemoteChunkIndex = RemoteChunkIndex,
.CacheFileOffset = SourceOffset};
- if (auto CopySourceIt = RawHashToCopyChunkDataIndex.find(ScavengedSequenceRawHash);
+ if (auto CopySourceIt = RawHashToCopyChunkDataIndex.find(LocalSequenceRawHash);
CopySourceIt != RawHashToCopyChunkDataIndex.end())
{
CopyChunkData& Data = CopyChunkDatas[CopySourceIt->second];
if (Data.TargetChunkLocationPtrs.size() > 1024)
{
- RawHashToCopyChunkDataIndex.insert_or_assign(ScavengedSequenceRawHash, CopyChunkDatas.size());
+ RawHashToCopyChunkDataIndex.insert_or_assign(LocalSequenceRawHash, CopyChunkDatas.size());
CopyChunkDatas.push_back(
- CopyChunkData{.ScavengeSourceIndex = ScavengedContentIndex,
- .SourceSequenceIndex = ScavengedSequenceIndex,
+ CopyChunkData{.ScavengeSourceIndex = (uint32_t)-1,
+ .SourceSequenceIndex = LocalSequenceIndex,
.TargetChunkLocationPtrs = ChunkTargetPtrs,
.ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}});
}
@@ -1022,158 +846,248 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
}
else
{
- RawHashToCopyChunkDataIndex.insert_or_assign(ScavengedSequenceRawHash, CopyChunkDatas.size());
+ RawHashToCopyChunkDataIndex.insert_or_assign(LocalSequenceRawHash, CopyChunkDatas.size());
CopyChunkDatas.push_back(
- CopyChunkData{.ScavengeSourceIndex = ScavengedContentIndex,
- .SourceSequenceIndex = ScavengedSequenceIndex,
+ CopyChunkData{.ScavengeSourceIndex = (uint32_t)-1,
+ .SourceSequenceIndex = LocalSequenceIndex,
.TargetChunkLocationPtrs = ChunkTargetPtrs,
.ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}});
}
- m_CacheMappingStats.ScavengedChunkMatchingRemoteCount++;
- m_CacheMappingStats.ScavengedChunkMatchingRemoteByteCount += ScavengedChunkRawSize;
+ m_CacheMappingStats.LocalChunkMatchingRemoteCount++;
+ m_CacheMappingStats.LocalChunkMatchingRemoteByteCount += LocalChunkRawSize;
RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex] = true;
RemainingChunkCount--;
}
}
}
- SourceOffset += ScavengedChunkRawSize;
+ SourceOffset += LocalChunkRawSize;
}
}
}
- }
- m_CacheMappingStats.ScavengeElapsedWallTimeUs += ScavengeTimer.GetElapsedTimeUs();
- }
- if (!m_Options.IsQuiet)
- {
- if (m_CacheMappingStats.CacheSequenceHashesCount > 0 || m_CacheMappingStats.CacheChunkCount > 0 ||
- m_CacheMappingStats.CacheBlockCount > 0)
- {
- LOG_OUTPUT(m_LogOutput,
- "Download cache: Found {} ({}) chunk sequences, {} ({}) chunks, {} ({}) blocks in {}",
- m_CacheMappingStats.CacheSequenceHashesCount,
- NiceBytes(m_CacheMappingStats.CacheSequenceHashesByteCount),
- m_CacheMappingStats.CacheChunkCount,
- NiceBytes(m_CacheMappingStats.CacheChunkByteCount),
- m_CacheMappingStats.CacheBlockCount,
- NiceBytes(m_CacheMappingStats.CacheBlocksByteCount),
- NiceTimeSpanMs(m_CacheMappingStats.CacheScanElapsedWallTimeUs / 1000));
+ m_CacheMappingStats.LocalScanElapsedWallTimeUs += LocalTimer.GetElapsedTimeUs();
}
- if (m_CacheMappingStats.LocalPathsMatchingSequencesCount > 0 || m_CacheMappingStats.LocalChunkMatchingRemoteCount > 0)
+ if (!m_Options.PrimeCacheOnly && m_Options.EnableOtherDownloadsScavenging)
{
- LOG_OUTPUT(m_LogOutput,
- "Local state : Found {} ({}) chunk sequences, {} ({}) chunks in {}",
- m_CacheMappingStats.LocalPathsMatchingSequencesCount,
- NiceBytes(m_CacheMappingStats.LocalPathsMatchingSequencesByteCount),
- m_CacheMappingStats.LocalChunkMatchingRemoteCount,
- NiceBytes(m_CacheMappingStats.LocalChunkMatchingRemoteByteCount),
- NiceTimeSpanMs(m_CacheMappingStats.LocalScanElapsedWallTimeUs / 1000));
+ ZEN_TRACE_CPU("GetScavengeChunks");
+
+ Stopwatch ScavengeTimer;
+
+ for (uint32_t ScavengedContentIndex = 0; ScavengedContentIndex < ScavengedContents.size() && (RemainingChunkCount > 0);
+ ScavengedContentIndex++)
+ {
+ const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengedContentIndex];
+ // const std::filesystem::path& ScavengedPath = ScavengedPaths[ScavengedContentIndex];
+ const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengedContentIndex];
+
+ for (uint32_t ScavengedSequenceIndex = 0;
+ ScavengedSequenceIndex < ScavengedContent.ChunkedContent.SequenceRawHashes.size() && (RemainingChunkCount > 0);
+ ScavengedSequenceIndex++)
+ {
+ const IoHash& ScavengedSequenceRawHash = ScavengedContent.ChunkedContent.SequenceRawHashes[ScavengedSequenceIndex];
+ const uint32_t ScavengedOrderOffset = ScavengedLookup.SequenceIndexChunkOrderOffset[ScavengedSequenceIndex];
+
+ {
+ uint64_t SourceOffset = 0;
+ const uint32_t ScavengedChunkCount = ScavengedContent.ChunkedContent.ChunkCounts[ScavengedSequenceIndex];
+ for (uint32_t ScavengedOrderIndex = 0; ScavengedOrderIndex < ScavengedChunkCount; ScavengedOrderIndex++)
+ {
+ const uint32_t ScavengedChunkIndex =
+ ScavengedContent.ChunkedContent.ChunkOrders[ScavengedOrderOffset + ScavengedOrderIndex];
+ const IoHash& ScavengedChunkHash = ScavengedContent.ChunkedContent.ChunkHashes[ScavengedChunkIndex];
+ const uint64_t ScavengedChunkRawSize = ScavengedContent.ChunkedContent.ChunkRawSizes[ScavengedChunkIndex];
+
+ if (auto RemoteChunkIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ScavengedChunkHash);
+ RemoteChunkIt != m_RemoteLookup.ChunkHashToChunkIndex.end())
+ {
+ const uint32_t RemoteChunkIndex = RemoteChunkIt->second;
+ if (!RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex])
+ {
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
+ GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex);
+
+ if (!ChunkTargetPtrs.empty())
+ {
+ CopyChunkData::ChunkTarget Target = {
+ .TargetChunkLocationCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()),
+ .RemoteChunkIndex = RemoteChunkIndex,
+ .CacheFileOffset = SourceOffset};
+ if (auto CopySourceIt = RawHashToCopyChunkDataIndex.find(ScavengedSequenceRawHash);
+ CopySourceIt != RawHashToCopyChunkDataIndex.end())
+ {
+ CopyChunkData& Data = CopyChunkDatas[CopySourceIt->second];
+ if (Data.TargetChunkLocationPtrs.size() > 1024)
+ {
+ RawHashToCopyChunkDataIndex.insert_or_assign(ScavengedSequenceRawHash,
+ CopyChunkDatas.size());
+ CopyChunkDatas.push_back(
+ CopyChunkData{.ScavengeSourceIndex = ScavengedContentIndex,
+ .SourceSequenceIndex = ScavengedSequenceIndex,
+ .TargetChunkLocationPtrs = ChunkTargetPtrs,
+ .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}});
+ }
+ else
+ {
+ Data.TargetChunkLocationPtrs.insert(Data.TargetChunkLocationPtrs.end(),
+ ChunkTargetPtrs.begin(),
+ ChunkTargetPtrs.end());
+ Data.ChunkTargets.push_back(Target);
+ }
+ }
+ else
+ {
+ RawHashToCopyChunkDataIndex.insert_or_assign(ScavengedSequenceRawHash, CopyChunkDatas.size());
+ CopyChunkDatas.push_back(
+ CopyChunkData{.ScavengeSourceIndex = ScavengedContentIndex,
+ .SourceSequenceIndex = ScavengedSequenceIndex,
+ .TargetChunkLocationPtrs = ChunkTargetPtrs,
+ .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}});
+ }
+ m_CacheMappingStats.ScavengedChunkMatchingRemoteCount++;
+ m_CacheMappingStats.ScavengedChunkMatchingRemoteByteCount += ScavengedChunkRawSize;
+ RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex] = true;
+ RemainingChunkCount--;
+ }
+ }
+ }
+ SourceOffset += ScavengedChunkRawSize;
+ }
+ }
+ }
+ }
+ m_CacheMappingStats.ScavengeElapsedWallTimeUs += ScavengeTimer.GetElapsedTimeUs();
}
- if (m_CacheMappingStats.ScavengedPathsMatchingSequencesCount > 0 || m_CacheMappingStats.ScavengedChunkMatchingRemoteCount > 0)
+ if (!m_Options.IsQuiet)
{
- LOG_OUTPUT(m_LogOutput,
- "Scavenge of {} paths, found {} ({}) chunk sequences, {} ({}) chunks in {}",
- ScavengedPathsCount,
- m_CacheMappingStats.ScavengedPathsMatchingSequencesCount,
- NiceBytes(m_CacheMappingStats.ScavengedPathsMatchingSequencesByteCount),
- m_CacheMappingStats.ScavengedChunkMatchingRemoteCount,
- NiceBytes(m_CacheMappingStats.ScavengedChunkMatchingRemoteByteCount),
- NiceTimeSpanMs(m_CacheMappingStats.ScavengeElapsedWallTimeUs / 1000));
+ if (m_CacheMappingStats.CacheSequenceHashesCount > 0 || m_CacheMappingStats.CacheChunkCount > 0 ||
+ m_CacheMappingStats.CacheBlockCount > 0)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Download cache: Found {} ({}) chunk sequences, {} ({}) chunks, {} ({}) blocks in {}",
+ m_CacheMappingStats.CacheSequenceHashesCount,
+ NiceBytes(m_CacheMappingStats.CacheSequenceHashesByteCount),
+ m_CacheMappingStats.CacheChunkCount,
+ NiceBytes(m_CacheMappingStats.CacheChunkByteCount),
+ m_CacheMappingStats.CacheBlockCount,
+ NiceBytes(m_CacheMappingStats.CacheBlocksByteCount),
+ NiceTimeSpanMs(m_CacheMappingStats.CacheScanElapsedWallTimeUs / 1000));
+ }
+
+ if (m_CacheMappingStats.LocalPathsMatchingSequencesCount > 0 || m_CacheMappingStats.LocalChunkMatchingRemoteCount > 0)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Local state : Found {} ({}) chunk sequences, {} ({}) chunks in {}",
+ m_CacheMappingStats.LocalPathsMatchingSequencesCount,
+ NiceBytes(m_CacheMappingStats.LocalPathsMatchingSequencesByteCount),
+ m_CacheMappingStats.LocalChunkMatchingRemoteCount,
+ NiceBytes(m_CacheMappingStats.LocalChunkMatchingRemoteByteCount),
+ NiceTimeSpanMs(m_CacheMappingStats.LocalScanElapsedWallTimeUs / 1000));
+ }
+ if (m_CacheMappingStats.ScavengedPathsMatchingSequencesCount > 0 || m_CacheMappingStats.ScavengedChunkMatchingRemoteCount > 0)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Scavenge of {} paths, found {} ({}) chunk sequences, {} ({}) chunks in {}",
+ ScavengedPathsCount,
+ m_CacheMappingStats.ScavengedPathsMatchingSequencesCount,
+ NiceBytes(m_CacheMappingStats.ScavengedPathsMatchingSequencesByteCount),
+ m_CacheMappingStats.ScavengedChunkMatchingRemoteCount,
+ NiceBytes(m_CacheMappingStats.ScavengedChunkMatchingRemoteByteCount),
+ NiceTimeSpanMs(m_CacheMappingStats.ScavengeElapsedWallTimeUs / 1000));
+ }
}
- }
- uint64_t BytesToWrite = 0;
+ uint64_t BytesToWrite = 0;
- for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < m_RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++)
- {
- uint64_t ChunkWriteCount = GetChunkWriteCount(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex);
- if (ChunkWriteCount > 0)
+ for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < m_RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++)
{
- BytesToWrite += m_RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] * ChunkWriteCount;
- if (!RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex])
+ uint64_t ChunkWriteCount = GetChunkWriteCount(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex);
+ if (ChunkWriteCount > 0)
{
- RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex] = true;
+ BytesToWrite += m_RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] * ChunkWriteCount;
+ if (!RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex])
+ {
+ RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex] = true;
+ }
}
}
- }
- for (const ScavengedSequenceCopyOperation& ScavengeCopyOp : ScavengedSequenceCopyOperations)
- {
- BytesToWrite += ScavengeCopyOp.RawSize;
- }
+ for (const ScavengedSequenceCopyOperation& ScavengeCopyOp : ScavengedSequenceCopyOperations)
+ {
+ BytesToWrite += ScavengeCopyOp.RawSize;
+ }
- uint64_t BytesToValidate = BytesToWrite;
+ uint64_t BytesToValidate = m_Options.ValidateCompletedSequences ? BytesToWrite : 0;
- uint64_t TotalRequestCount = 0;
- uint64_t TotalPartWriteCount = 0;
- std::atomic<uint64_t> WritePartsComplete = 0;
+ uint64_t TotalRequestCount = 0;
+ uint64_t TotalPartWriteCount = 0;
+ std::atomic<uint64_t> WritePartsComplete = 0;
- {
- ZEN_TRACE_CPU("WriteChunks");
+ {
+ ZEN_TRACE_CPU("WriteChunks");
- m_LogOutput.SetLogOperationProgress(TaskSteps::WriteChunks, TaskSteps::StepCount);
+ m_LogOutput.SetLogOperationProgress(TaskSteps::WriteChunks, TaskSteps::StepCount);
- Stopwatch WriteTimer;
+ Stopwatch WriteTimer;
- FilteredRate FilteredDownloadedBytesPerSecond;
- FilteredRate FilteredWrittenBytesPerSecond;
+ FilteredRate FilteredDownloadedBytesPerSecond;
+ FilteredRate FilteredWrittenBytesPerSecond;
- std::unique_ptr<BuildOpLogOutput::ProgressBar> WriteProgressBarPtr(
- m_LogOutput.CreateProgressBar(m_Options.PrimeCacheOnly ? "Downloading" : "Writing"));
- BuildOpLogOutput::ProgressBar& WriteProgressBar(*WriteProgressBarPtr);
- ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ std::unique_ptr<BuildOpLogOutput::ProgressBar> WriteProgressBarPtr(
+ m_LogOutput.CreateProgressBar(m_Options.PrimeCacheOnly ? "Downloading" : "Writing"));
+ BuildOpLogOutput::ProgressBar& WriteProgressBar(*WriteProgressBarPtr);
+ ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
- struct LooseChunkHashWorkData
- {
- std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs;
- uint32_t RemoteChunkIndex = (uint32_t)-1;
- };
+ struct LooseChunkHashWorkData
+ {
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs;
+ uint32_t RemoteChunkIndex = (uint32_t)-1;
+ };
- std::vector<LooseChunkHashWorkData> LooseChunkHashWorks;
- TotalPartWriteCount += CopyChunkDatas.size();
- TotalPartWriteCount += ScavengedSequenceCopyOperations.size();
+ std::vector<LooseChunkHashWorkData> LooseChunkHashWorks;
+ TotalPartWriteCount += CopyChunkDatas.size();
+ TotalPartWriteCount += ScavengedSequenceCopyOperations.size();
- for (const IoHash ChunkHash : m_LooseChunkHashes)
- {
- auto RemoteChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash);
- ZEN_ASSERT(RemoteChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end());
- const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second;
- if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex])
+ for (const IoHash ChunkHash : m_LooseChunkHashes)
{
- if (m_Options.IsVerbose)
- {
- LOG_OUTPUT(m_LogOutput, "Skipping chunk {} due to cache reuse", ChunkHash);
- }
- continue;
- }
- bool NeedsCopy = true;
- if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex].compare_exchange_strong(NeedsCopy, false))
- {
- std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
- GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex);
-
- if (ChunkTargetPtrs.empty())
+ auto RemoteChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash);
+ ZEN_ASSERT(RemoteChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end());
+ const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second;
+ if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex])
{
if (m_Options.IsVerbose)
{
LOG_OUTPUT(m_LogOutput, "Skipping chunk {} due to cache reuse", ChunkHash);
}
+ continue;
}
- else
+ bool NeedsCopy = true;
+ if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex].compare_exchange_strong(NeedsCopy, false))
{
- TotalRequestCount++;
- TotalPartWriteCount++;
- LooseChunkHashWorks.push_back(
- LooseChunkHashWorkData{.ChunkTargetPtrs = ChunkTargetPtrs, .RemoteChunkIndex = RemoteChunkIndex});
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
+ GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex);
+
+ if (ChunkTargetPtrs.empty())
+ {
+ if (m_Options.IsVerbose)
+ {
+ LOG_OUTPUT(m_LogOutput, "Skipping chunk {} due to cache reuse", ChunkHash);
+ }
+ }
+ else
+ {
+ TotalRequestCount++;
+ TotalPartWriteCount++;
+ LooseChunkHashWorks.push_back(
+ LooseChunkHashWorkData{.ChunkTargetPtrs = ChunkTargetPtrs, .RemoteChunkIndex = RemoteChunkIndex});
+ }
}
}
- }
- uint32_t BlockCount = gsl::narrow<uint32_t>(m_BlockDescriptions.size());
+ uint32_t BlockCount = gsl::narrow<uint32_t>(m_BlockDescriptions.size());
- std::vector<bool> ChunkIsPickedUpByBlock(m_RemoteContent.ChunkedContent.ChunkHashes.size(), false);
- auto GetNeededChunkBlockIndexes =
- [this, &RemoteChunkIndexNeedsCopyFromSourceFlags, &ChunkIsPickedUpByBlock](const ChunkBlockDescription& BlockDescription) {
+ std::vector<bool> ChunkIsPickedUpByBlock(m_RemoteContent.ChunkedContent.ChunkHashes.size(), false);
+ auto GetNeededChunkBlockIndexes = [this, &RemoteChunkIndexNeedsCopyFromSourceFlags, &ChunkIsPickedUpByBlock](
+ const ChunkBlockDescription& BlockDescription) {
ZEN_TRACE_CPU("GetNeededChunkBlockIndexes");
std::vector<uint32_t> NeededBlockChunkIndexes;
for (uint32_t ChunkBlockIndex = 0; ChunkBlockIndex < BlockDescription.ChunkRawHashes.size(); ChunkBlockIndex++)
@@ -1195,181 +1109,190 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
return NeededBlockChunkIndexes;
};
- std::vector<uint32_t> CachedChunkBlockIndexes;
- std::vector<uint32_t> FetchBlockIndexes;
- std::vector<std::vector<uint32_t>> AllBlockChunkIndexNeeded;
-
- for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; BlockIndex++)
- {
- const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
+ std::vector<uint32_t> CachedChunkBlockIndexes;
+ std::vector<uint32_t> FetchBlockIndexes;
+ std::vector<std::vector<uint32_t>> AllBlockChunkIndexNeeded;
- std::vector<uint32_t> BlockChunkIndexNeeded = GetNeededChunkBlockIndexes(BlockDescription);
- if (!BlockChunkIndexNeeded.empty())
+ for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; BlockIndex++)
{
- if (m_Options.PrimeCacheOnly)
- {
- FetchBlockIndexes.push_back(BlockIndex);
- }
- else
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
+
+ std::vector<uint32_t> BlockChunkIndexNeeded = GetNeededChunkBlockIndexes(BlockDescription);
+ if (!BlockChunkIndexNeeded.empty())
{
- bool UsingCachedBlock = false;
- if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end())
+ if (m_Options.PrimeCacheOnly)
{
- TotalPartWriteCount++;
+ FetchBlockIndexes.push_back(BlockIndex);
+ }
+ else
+ {
+ bool UsingCachedBlock = false;
+ if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end())
+ {
+ TotalPartWriteCount++;
- std::filesystem::path BlockPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString();
- if (IsFile(BlockPath))
+ std::filesystem::path BlockPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString();
+ if (IsFile(BlockPath))
+ {
+ CachedChunkBlockIndexes.push_back(BlockIndex);
+ UsingCachedBlock = true;
+ }
+ }
+ if (!UsingCachedBlock)
{
- CachedChunkBlockIndexes.push_back(BlockIndex);
- UsingCachedBlock = true;
+ FetchBlockIndexes.push_back(BlockIndex);
}
}
- if (!UsingCachedBlock)
- {
- FetchBlockIndexes.push_back(BlockIndex);
- }
}
+ AllBlockChunkIndexNeeded.emplace_back(std::move(BlockChunkIndexNeeded));
}
- AllBlockChunkIndexNeeded.emplace_back(std::move(BlockChunkIndexNeeded));
- }
- BlobsExistsResult ExistsResult;
+ BlobsExistsResult ExistsResult;
- if (m_Storage.BuildCacheStorage)
- {
- ZEN_TRACE_CPU("BlobCacheExistCheck");
- Stopwatch Timer;
-
- tsl::robin_set<IoHash> BlobHashesSet;
-
- BlobHashesSet.reserve(LooseChunkHashWorks.size() + FetchBlockIndexes.size());
- for (LooseChunkHashWorkData& LooseChunkHashWork : LooseChunkHashWorks)
+ if (m_Storage.BuildCacheStorage)
{
- BlobHashesSet.insert(m_RemoteContent.ChunkedContent.ChunkHashes[LooseChunkHashWork.RemoteChunkIndex]);
- }
- for (uint32_t BlockIndex : FetchBlockIndexes)
- {
- const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
- BlobHashesSet.insert(BlockDescription.BlockHash);
- }
+ ZEN_TRACE_CPU("BlobCacheExistCheck");
+ Stopwatch Timer;
- if (!BlobHashesSet.empty())
- {
- const std::vector<IoHash> BlobHashes(BlobHashesSet.begin(), BlobHashesSet.end());
- const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult =
- m_Storage.BuildCacheStorage->BlobsExists(m_BuildId, BlobHashes);
+ tsl::robin_set<IoHash> BlobHashesSet;
- if (CacheExistsResult.size() == BlobHashes.size())
+ BlobHashesSet.reserve(LooseChunkHashWorks.size() + FetchBlockIndexes.size());
+ for (LooseChunkHashWorkData& LooseChunkHashWork : LooseChunkHashWorks)
+ {
+ BlobHashesSet.insert(m_RemoteContent.ChunkedContent.ChunkHashes[LooseChunkHashWork.RemoteChunkIndex]);
+ }
+ for (uint32_t BlockIndex : FetchBlockIndexes)
{
- ExistsResult.ExistingBlobs.reserve(CacheExistsResult.size());
- for (size_t BlobIndex = 0; BlobIndex < BlobHashes.size(); BlobIndex++)
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
+ BlobHashesSet.insert(BlockDescription.BlockHash);
+ }
+
+ if (!BlobHashesSet.empty())
+ {
+ const std::vector<IoHash> BlobHashes(BlobHashesSet.begin(), BlobHashesSet.end());
+ const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult =
+ m_Storage.BuildCacheStorage->BlobsExists(m_BuildId, BlobHashes);
+
+ if (CacheExistsResult.size() == BlobHashes.size())
{
- if (CacheExistsResult[BlobIndex].HasBody)
+ ExistsResult.ExistingBlobs.reserve(CacheExistsResult.size());
+ for (size_t BlobIndex = 0; BlobIndex < BlobHashes.size(); BlobIndex++)
{
- ExistsResult.ExistingBlobs.insert(BlobHashes[BlobIndex]);
+ if (CacheExistsResult[BlobIndex].HasBody)
+ {
+ ExistsResult.ExistingBlobs.insert(BlobHashes[BlobIndex]);
+ }
}
}
- }
- ExistsResult.ElapsedTimeMs = Timer.GetElapsedTimeMs();
- if (!ExistsResult.ExistingBlobs.empty() && !m_Options.IsQuiet)
- {
- LOG_OUTPUT(m_LogOutput,
- "Remote cache : Found {} out of {} needed blobs in {}",
- ExistsResult.ExistingBlobs.size(),
- BlobHashes.size(),
- NiceTimeSpanMs(ExistsResult.ElapsedTimeMs));
+ ExistsResult.ElapsedTimeMs = Timer.GetElapsedTimeMs();
+ if (!ExistsResult.ExistingBlobs.empty() && !m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Remote cache : Found {} out of {} needed blobs in {}",
+ ExistsResult.ExistingBlobs.size(),
+ BlobHashes.size(),
+ NiceTimeSpanMs(ExistsResult.ElapsedTimeMs));
+ }
}
}
- }
- std::vector<BlockRangeDescriptor> BlockRangeWorks;
- std::vector<uint32_t> FullBlockWorks;
- {
- Stopwatch Timer;
-
- std::vector<uint32_t> PartialBlockIndexes;
-
- for (uint32_t BlockIndex : FetchBlockIndexes)
+ std::vector<BlockRangeDescriptor> BlockRangeWorks;
+ std::vector<uint32_t> FullBlockWorks;
{
- const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
+ Stopwatch Timer;
- const std::vector<uint32_t> BlockChunkIndexNeeded = std::move(AllBlockChunkIndexNeeded[BlockIndex]);
- if (!BlockChunkIndexNeeded.empty())
+ std::vector<uint32_t> PartialBlockIndexes;
+
+ for (uint32_t BlockIndex : FetchBlockIndexes)
{
- bool WantsToDoPartialBlockDownload = BlockChunkIndexNeeded.size() < BlockDescription.ChunkRawHashes.size();
- bool CanDoPartialBlockDownload = (BlockDescription.HeaderSize > 0) && (BlockDescription.ChunkCompressedLengths.size() ==
- BlockDescription.ChunkRawHashes.size());
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
- bool AllowedToDoPartialRequest = false;
- bool BlockExistInCache = ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash);
- switch (m_Options.PartialBlockRequestMode)
+ const std::vector<uint32_t> BlockChunkIndexNeeded = std::move(AllBlockChunkIndexNeeded[BlockIndex]);
+ if (!BlockChunkIndexNeeded.empty())
{
- case EPartialBlockRequestMode::Off:
- break;
- case EPartialBlockRequestMode::ZenCacheOnly:
- AllowedToDoPartialRequest = BlockExistInCache;
- break;
- case EPartialBlockRequestMode::Mixed:
- case EPartialBlockRequestMode::All:
- AllowedToDoPartialRequest = true;
- break;
- default:
- ZEN_ASSERT(false);
- break;
- }
+ bool WantsToDoPartialBlockDownload = BlockChunkIndexNeeded.size() < BlockDescription.ChunkRawHashes.size();
+ bool CanDoPartialBlockDownload =
+ (BlockDescription.HeaderSize > 0) &&
+ (BlockDescription.ChunkCompressedLengths.size() == BlockDescription.ChunkRawHashes.size());
+
+ bool AllowedToDoPartialRequest = false;
+ bool BlockExistInCache = ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash);
+ switch (m_Options.PartialBlockRequestMode)
+ {
+ case EPartialBlockRequestMode::Off:
+ break;
+ case EPartialBlockRequestMode::ZenCacheOnly:
+ AllowedToDoPartialRequest = BlockExistInCache;
+ break;
+ case EPartialBlockRequestMode::Mixed:
+ case EPartialBlockRequestMode::All:
+ AllowedToDoPartialRequest = true;
+ break;
+ default:
+ ZEN_ASSERT(false);
+ break;
+ }
- const uint32_t ChunkStartOffsetInBlock =
- gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize);
+ const uint32_t ChunkStartOffsetInBlock =
+ gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize);
- const uint64_t TotalBlockSize = std::accumulate(BlockDescription.ChunkCompressedLengths.begin(),
- BlockDescription.ChunkCompressedLengths.end(),
- std::uint64_t(ChunkStartOffsetInBlock));
+ const uint64_t TotalBlockSize = std::accumulate(BlockDescription.ChunkCompressedLengths.begin(),
+ BlockDescription.ChunkCompressedLengths.end(),
+ std::uint64_t(ChunkStartOffsetInBlock));
- if (AllowedToDoPartialRequest && WantsToDoPartialBlockDownload && CanDoPartialBlockDownload)
- {
- ZEN_TRACE_CPU("PartialBlockAnalysis");
-
- bool LimitToSingleRange =
- BlockExistInCache ? false : m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Mixed;
- uint64_t TotalWantedChunksSize = 0;
- std::optional<std::vector<BlockRangeDescriptor>> MaybeBlockRanges = CalculateBlockRanges(BlockIndex,
- BlockDescription,
- BlockChunkIndexNeeded,
- LimitToSingleRange,
- ChunkStartOffsetInBlock,
- TotalBlockSize,
- TotalWantedChunksSize);
- ZEN_ASSERT(TotalWantedChunksSize <= TotalBlockSize);
-
- if (MaybeBlockRanges.has_value())
+ if (AllowedToDoPartialRequest && WantsToDoPartialBlockDownload && CanDoPartialBlockDownload)
{
- const std::vector<BlockRangeDescriptor>& BlockRanges = MaybeBlockRanges.value();
- ZEN_ASSERT(!BlockRanges.empty());
- BlockRangeWorks.insert(BlockRangeWorks.end(), BlockRanges.begin(), BlockRanges.end());
- TotalRequestCount += BlockRanges.size();
- TotalPartWriteCount += BlockRanges.size();
-
- uint64_t RequestedSize = std::accumulate(
- BlockRanges.begin(),
- BlockRanges.end(),
- uint64_t(0),
- [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; });
- PartialBlockIndexes.push_back(BlockIndex);
-
- if (RequestedSize > TotalWantedChunksSize)
+ ZEN_TRACE_CPU("PartialBlockAnalysis");
+
+ bool LimitToSingleRange =
+ BlockExistInCache ? false : m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Mixed;
+ uint64_t TotalWantedChunksSize = 0;
+ std::optional<std::vector<BlockRangeDescriptor>> MaybeBlockRanges =
+ CalculateBlockRanges(BlockIndex,
+ BlockDescription,
+ BlockChunkIndexNeeded,
+ LimitToSingleRange,
+ ChunkStartOffsetInBlock,
+ TotalBlockSize,
+ TotalWantedChunksSize);
+ ZEN_ASSERT(TotalWantedChunksSize <= TotalBlockSize);
+
+ if (MaybeBlockRanges.has_value())
{
- if (m_Options.IsVerbose)
+ const std::vector<BlockRangeDescriptor>& BlockRanges = MaybeBlockRanges.value();
+ ZEN_ASSERT(!BlockRanges.empty());
+ BlockRangeWorks.insert(BlockRangeWorks.end(), BlockRanges.begin(), BlockRanges.end());
+ TotalRequestCount += BlockRanges.size();
+ TotalPartWriteCount += BlockRanges.size();
+
+ uint64_t RequestedSize = std::accumulate(
+ BlockRanges.begin(),
+ BlockRanges.end(),
+ uint64_t(0),
+ [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; });
+ PartialBlockIndexes.push_back(BlockIndex);
+
+ if (RequestedSize > TotalWantedChunksSize)
{
- LOG_OUTPUT(m_LogOutput,
- "Requesting {} chunks ({}) from block {} ({}) using {} requests (extra bytes {})",
- BlockChunkIndexNeeded.size(),
- NiceBytes(RequestedSize),
- BlockDescription.BlockHash,
- NiceBytes(TotalBlockSize),
- BlockRanges.size(),
- NiceBytes(RequestedSize - TotalWantedChunksSize));
+ if (m_Options.IsVerbose)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Requesting {} chunks ({}) from block {} ({}) using {} requests (extra bytes {})",
+ BlockChunkIndexNeeded.size(),
+ NiceBytes(RequestedSize),
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize),
+ BlockRanges.size(),
+ NiceBytes(RequestedSize - TotalWantedChunksSize));
+ }
}
}
+ else
+ {
+ FullBlockWorks.push_back(BlockIndex);
+ TotalRequestCount++;
+ TotalPartWriteCount++;
+ }
}
else
{
@@ -1378,584 +1301,393 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
TotalPartWriteCount++;
}
}
- else
- {
- FullBlockWorks.push_back(BlockIndex);
- TotalRequestCount++;
- TotalPartWriteCount++;
- }
}
- }
- if (!PartialBlockIndexes.empty())
- {
- uint64_t TotalFullBlockRequestBytes = 0;
- for (uint32_t BlockIndex : FullBlockWorks)
+ if (!PartialBlockIndexes.empty())
{
- const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
- uint32_t CurrentOffset =
- gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize);
+ uint64_t TotalFullBlockRequestBytes = 0;
+ for (uint32_t BlockIndex : FullBlockWorks)
+ {
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
+ uint32_t CurrentOffset =
+ gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize);
- TotalFullBlockRequestBytes += std::accumulate(BlockDescription.ChunkCompressedLengths.begin(),
- BlockDescription.ChunkCompressedLengths.end(),
- std::uint64_t(CurrentOffset));
- }
+ TotalFullBlockRequestBytes += std::accumulate(BlockDescription.ChunkCompressedLengths.begin(),
+ BlockDescription.ChunkCompressedLengths.end(),
+ std::uint64_t(CurrentOffset));
+ }
- uint64_t TotalPartialBlockBytes = 0;
- for (uint32_t BlockIndex : PartialBlockIndexes)
- {
- const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
- uint32_t CurrentOffset =
- gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize);
+ uint64_t TotalPartialBlockBytes = 0;
+ for (uint32_t BlockIndex : PartialBlockIndexes)
+ {
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
+ uint32_t CurrentOffset =
+ gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize);
- TotalPartialBlockBytes += std::accumulate(BlockDescription.ChunkCompressedLengths.begin(),
- BlockDescription.ChunkCompressedLengths.end(),
- std::uint64_t(CurrentOffset));
- }
+ TotalPartialBlockBytes += std::accumulate(BlockDescription.ChunkCompressedLengths.begin(),
+ BlockDescription.ChunkCompressedLengths.end(),
+ std::uint64_t(CurrentOffset));
+ }
- uint64_t NonPartialTotalBlockBytes = TotalFullBlockRequestBytes + TotalPartialBlockBytes;
+ uint64_t NonPartialTotalBlockBytes = TotalFullBlockRequestBytes + TotalPartialBlockBytes;
- const uint64_t TotalPartialBlockRequestBytes =
- std::accumulate(BlockRangeWorks.begin(),
- BlockRangeWorks.end(),
- uint64_t(0),
- [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; });
- uint64_t TotalExtraPartialBlocksRequests = BlockRangeWorks.size() - PartialBlockIndexes.size();
+ const uint64_t TotalPartialBlockRequestBytes =
+ std::accumulate(BlockRangeWorks.begin(),
+ BlockRangeWorks.end(),
+ uint64_t(0),
+ [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; });
+ uint64_t TotalExtraPartialBlocksRequests = BlockRangeWorks.size() - PartialBlockIndexes.size();
- uint64_t TotalSavedBlocksSize = TotalPartialBlockBytes - TotalPartialBlockRequestBytes;
- double SavedSizePercent = (TotalSavedBlocksSize * 100.0) / NonPartialTotalBlockBytes;
+ uint64_t TotalSavedBlocksSize = TotalPartialBlockBytes - TotalPartialBlockRequestBytes;
+ double SavedSizePercent = (TotalSavedBlocksSize * 100.0) / NonPartialTotalBlockBytes;
- if (!m_Options.IsQuiet)
- {
- LOG_OUTPUT(m_LogOutput,
- "Analisys of partial block requests saves download of {} out of {} ({:.1f}%) using {} extra "
- "requests. Completed in {}",
- NiceBytes(TotalSavedBlocksSize),
- NiceBytes(NonPartialTotalBlockBytes),
- SavedSizePercent,
- TotalExtraPartialBlocksRequests,
- NiceTimeSpanMs(ExistsResult.ElapsedTimeMs));
+ if (!m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Analisys of partial block requests saves download of {} out of {} ({:.1f}%) using {} extra "
+ "requests. Completed in {}",
+ NiceBytes(TotalSavedBlocksSize),
+ NiceBytes(NonPartialTotalBlockBytes),
+ SavedSizePercent,
+ TotalExtraPartialBlocksRequests,
+ NiceTimeSpanMs(ExistsResult.ElapsedTimeMs));
+ }
}
}
- }
- BufferedWriteFileCache WriteCache;
+ BufferedWriteFileCache WriteCache;
- for (uint32_t ScavengeOpIndex = 0; ScavengeOpIndex < ScavengedSequenceCopyOperations.size(); ScavengeOpIndex++)
- {
- if (m_AbortFlag)
- {
- break;
- }
- if (!m_Options.PrimeCacheOnly)
+ for (uint32_t ScavengeOpIndex = 0; ScavengeOpIndex < ScavengedSequenceCopyOperations.size(); ScavengeOpIndex++)
{
- Work.ScheduleWork(
- m_IOWorkerPool,
- [this,
- &ScavengedPaths,
- &ScavengedSequenceCopyOperations,
- &ScavengedContents,
- &FilteredWrittenBytesPerSecond,
- ScavengeOpIndex,
- &WritePartsComplete,
- TotalPartWriteCount](std::atomic<bool>&) mutable {
- if (!m_AbortFlag)
- {
- ZEN_TRACE_CPU("Async_WriteScavenged");
+ if (m_AbortFlag)
+ {
+ break;
+ }
+ if (!m_Options.PrimeCacheOnly)
+ {
+ Work.ScheduleWork(
+ m_IOWorkerPool,
+ [this,
+ &ScavengedPaths,
+ &ScavengedSequenceCopyOperations,
+ &ScavengedContents,
+ &FilteredWrittenBytesPerSecond,
+ ScavengeOpIndex,
+ &WritePartsComplete,
+ TotalPartWriteCount](std::atomic<bool>&) mutable {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_WriteScavenged");
- FilteredWrittenBytesPerSecond.Start();
+ FilteredWrittenBytesPerSecond.Start();
- const ScavengedSequenceCopyOperation& ScavengeOp = ScavengedSequenceCopyOperations[ScavengeOpIndex];
- const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengeOp.ScavengedContentIndex];
- const std::filesystem::path& ScavengeRootPath = ScavengedPaths[ScavengeOp.ScavengedContentIndex];
+ const ScavengedSequenceCopyOperation& ScavengeOp = ScavengedSequenceCopyOperations[ScavengeOpIndex];
+ const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengeOp.ScavengedContentIndex];
+ const std::filesystem::path& ScavengeRootPath = ScavengedPaths[ScavengeOp.ScavengedContentIndex];
- WriteScavengedSequenceToCache(ScavengeRootPath, ScavengedContent, ScavengeOp);
+ WriteScavengedSequenceToCache(ScavengeRootPath, ScavengedContent, ScavengeOp);
- WritePartsComplete++;
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
}
- }
- });
- }
- }
-
- for (uint32_t LooseChunkHashWorkIndex = 0; LooseChunkHashWorkIndex < LooseChunkHashWorks.size(); LooseChunkHashWorkIndex++)
- {
- if (m_AbortFlag)
- {
- break;
+ });
+ }
}
- if (m_Options.PrimeCacheOnly)
+ for (uint32_t LooseChunkHashWorkIndex = 0; LooseChunkHashWorkIndex < LooseChunkHashWorks.size(); LooseChunkHashWorkIndex++)
{
- const uint32_t RemoteChunkIndex = LooseChunkHashWorks[LooseChunkHashWorkIndex].RemoteChunkIndex;
- if (ExistsResult.ExistingBlobs.contains(m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]))
+ if (m_AbortFlag)
{
- m_DownloadStats.RequestsCompleteCount++;
- continue;
+ break;
}
- }
-
- Work.ScheduleWork(m_IOWorkerPool,
- [this,
- &SequenceIndexChunksLeftToWriteCounters,
- &Work,
- &ExistsResult,
- &WritePartsComplete,
- &LooseChunkHashWorks,
- LooseChunkHashWorkIndex,
- TotalRequestCount,
- TotalPartWriteCount,
- &WriteCache,
- &FilteredDownloadedBytesPerSecond,
- &FilteredWrittenBytesPerSecond](std::atomic<bool>&) mutable {
- ZEN_TRACE_CPU("Async_ReadPreDownloadedChunk");
- if (!m_AbortFlag)
- {
- LooseChunkHashWorkData& LooseChunkHashWork = LooseChunkHashWorks[LooseChunkHashWorkIndex];
- const uint32_t RemoteChunkIndex = LooseChunkHashWorks[LooseChunkHashWorkIndex].RemoteChunkIndex;
- WriteLooseChunk(RemoteChunkIndex,
- ExistsResult,
- SequenceIndexChunksLeftToWriteCounters,
- WritePartsComplete,
- std::move(LooseChunkHashWork.ChunkTargetPtrs),
- WriteCache,
- Work,
- TotalRequestCount,
- TotalPartWriteCount,
- FilteredDownloadedBytesPerSecond,
- FilteredWrittenBytesPerSecond);
- }
- });
- }
-
- std::unique_ptr<CloneQueryInterface> CloneQuery;
- if (m_Options.AllowFileClone)
- {
- CloneQuery = GetCloneQueryInterface(m_CacheFolderPath);
- }
- for (size_t CopyDataIndex = 0; CopyDataIndex < CopyChunkDatas.size(); CopyDataIndex++)
- {
- ZEN_ASSERT(!m_Options.PrimeCacheOnly);
- if (m_AbortFlag)
- {
- break;
- }
+ if (m_Options.PrimeCacheOnly)
+ {
+ const uint32_t RemoteChunkIndex = LooseChunkHashWorks[LooseChunkHashWorkIndex].RemoteChunkIndex;
+ if (ExistsResult.ExistingBlobs.contains(m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]))
+ {
+ m_DownloadStats.RequestsCompleteCount++;
+ continue;
+ }
+ }
- Work.ScheduleWork(m_IOWorkerPool,
- [this,
- &CloneQuery,
- &SequenceIndexChunksLeftToWriteCounters,
- &WriteCache,
- &Work,
- &FilteredWrittenBytesPerSecond,
- &CopyChunkDatas,
- &ScavengedContents,
- &ScavengedLookups,
- &ScavengedPaths,
- &WritePartsComplete,
- TotalPartWriteCount,
- CopyDataIndex](std::atomic<bool>&) {
- if (!m_AbortFlag)
- {
- ZEN_TRACE_CPU("Async_CopyLocal");
-
- FilteredWrittenBytesPerSecond.Start();
- const CopyChunkData& CopyData = CopyChunkDatas[CopyDataIndex];
-
- std::vector<uint32_t> WrittenSequenceIndexes = WriteLocalChunkToCache(CloneQuery.get(),
- CopyData,
- ScavengedContents,
- ScavengedLookups,
- ScavengedPaths,
- WriteCache);
- WritePartsComplete++;
+ Work.ScheduleWork(m_IOWorkerPool,
+ [this,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &Work,
+ &ExistsResult,
+ &WritePartsComplete,
+ &LooseChunkHashWorks,
+ LooseChunkHashWorkIndex,
+ TotalRequestCount,
+ TotalPartWriteCount,
+ &WriteCache,
+ &FilteredDownloadedBytesPerSecond,
+ &FilteredWrittenBytesPerSecond](std::atomic<bool>&) mutable {
+ ZEN_TRACE_CPU("Async_ReadPreDownloadedChunk");
if (!m_AbortFlag)
{
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
- }
-
- // Write tracking, updating this must be done without any files open
- std::vector<uint32_t> CompletedChunkSequences;
- for (uint32_t RemoteSequenceIndex : WrittenSequenceIndexes)
- {
- if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters))
- {
- CompletedChunkSequences.push_back(RemoteSequenceIndex);
- }
- }
- WriteCache.Close(CompletedChunkSequences);
- VerifyAndCompleteChunkSequencesAsync(CompletedChunkSequences, Work);
+ LooseChunkHashWorkData& LooseChunkHashWork = LooseChunkHashWorks[LooseChunkHashWorkIndex];
+ const uint32_t RemoteChunkIndex = LooseChunkHashWorks[LooseChunkHashWorkIndex].RemoteChunkIndex;
+ WriteLooseChunk(RemoteChunkIndex,
+ ExistsResult,
+ SequenceIndexChunksLeftToWriteCounters,
+ WritePartsComplete,
+ std::move(LooseChunkHashWork.ChunkTargetPtrs),
+ WriteCache,
+ Work,
+ TotalRequestCount,
+ TotalPartWriteCount,
+ FilteredDownloadedBytesPerSecond,
+ FilteredWrittenBytesPerSecond);
}
- }
- });
- }
+ });
+ }
- for (uint32_t BlockIndex : CachedChunkBlockIndexes)
- {
- ZEN_ASSERT(!m_Options.PrimeCacheOnly);
- if (m_AbortFlag)
+ std::unique_ptr<CloneQueryInterface> CloneQuery;
+ if (m_Options.AllowFileClone)
{
- break;
+ CloneQuery = GetCloneQueryInterface(m_CacheFolderPath);
}
- Work.ScheduleWork(m_IOWorkerPool,
- [this,
- &RemoteChunkIndexNeedsCopyFromSourceFlags,
- &SequenceIndexChunksLeftToWriteCounters,
- &WriteCache,
- &Work,
- &FilteredWrittenBytesPerSecond,
- &WritePartsComplete,
- TotalPartWriteCount,
- BlockIndex](std::atomic<bool>&) mutable {
- if (!m_AbortFlag)
- {
- ZEN_TRACE_CPU("Async_WriteCachedBlock");
-
- const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
- FilteredWrittenBytesPerSecond.Start();
-
- std::filesystem::path BlockChunkPath =
- m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString();
- IoBuffer BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
- if (!BlockBuffer)
- {
- throw std::runtime_error(
- fmt::format("Can not read block {} at {}", BlockDescription.BlockHash, BlockChunkPath));
- }
+ for (size_t CopyDataIndex = 0; CopyDataIndex < CopyChunkDatas.size(); CopyDataIndex++)
+ {
+ ZEN_ASSERT(!m_Options.PrimeCacheOnly);
+ if (m_AbortFlag)
+ {
+ break;
+ }
+ Work.ScheduleWork(m_IOWorkerPool,
+ [this,
+ &CloneQuery,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &WriteCache,
+ &Work,
+ &FilteredWrittenBytesPerSecond,
+ &CopyChunkDatas,
+ &ScavengedContents,
+ &ScavengedLookups,
+ &ScavengedPaths,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ CopyDataIndex](std::atomic<bool>&) {
if (!m_AbortFlag)
{
- if (!WriteChunksBlockToCache(BlockDescription,
- SequenceIndexChunksLeftToWriteCounters,
- Work,
- CompositeBuffer(std::move(BlockBuffer)),
- RemoteChunkIndexNeedsCopyFromSourceFlags,
- WriteCache))
- {
- std::error_code DummyEc;
- RemoveFile(BlockChunkPath, DummyEc);
- throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash));
- }
+ ZEN_TRACE_CPU("Async_CopyLocal");
- std::error_code Ec = TryRemoveFile(BlockChunkPath);
- if (Ec)
- {
- LOG_OUTPUT_DEBUG(m_LogOutput,
- "Failed removing file '{}', reason: ({}) {}",
- BlockChunkPath,
- Ec.value(),
- Ec.message());
- }
+ FilteredWrittenBytesPerSecond.Start();
+ const CopyChunkData& CopyData = CopyChunkDatas[CopyDataIndex];
+ std::vector<uint32_t> WrittenSequenceIndexes = WriteLocalChunkToCache(CloneQuery.get(),
+ CopyData,
+ ScavengedContents,
+ ScavengedLookups,
+ ScavengedPaths,
+ WriteCache);
WritePartsComplete++;
-
- if (WritePartsComplete == TotalPartWriteCount)
+ if (!m_AbortFlag)
{
- FilteredWrittenBytesPerSecond.Stop();
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
+
+ // Write tracking, updating this must be done without any files open
+ std::vector<uint32_t> CompletedChunkSequences;
+ for (uint32_t RemoteSequenceIndex : WrittenSequenceIndexes)
+ {
+ if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters))
+ {
+ CompletedChunkSequences.push_back(RemoteSequenceIndex);
+ }
+ }
+ WriteCache.Close(CompletedChunkSequences);
+ VerifyAndCompleteChunkSequencesAsync(CompletedChunkSequences, Work);
}
}
- }
- });
- }
+ });
+ }
- for (size_t BlockRangeIndex = 0; BlockRangeIndex < BlockRangeWorks.size(); BlockRangeIndex++)
- {
- ZEN_ASSERT(!m_Options.PrimeCacheOnly);
- if (m_AbortFlag)
+ for (uint32_t BlockIndex : CachedChunkBlockIndexes)
{
- break;
- }
- Work.ScheduleWork(
- m_NetworkPool,
- [this,
- &RemoteChunkIndexNeedsCopyFromSourceFlags,
- &SequenceIndexChunksLeftToWriteCounters,
- &ExistsResult,
- &WriteCache,
- &FilteredDownloadedBytesPerSecond,
- TotalRequestCount,
- &WritePartsComplete,
- TotalPartWriteCount,
- &FilteredWrittenBytesPerSecond,
- &Work,
- &BlockRangeWorks,
- BlockRangeIndex](std::atomic<bool>&) {
- if (!m_AbortFlag)
- {
- ZEN_TRACE_CPU("Async_GetPartialBlock");
+ ZEN_ASSERT(!m_Options.PrimeCacheOnly);
+ if (m_AbortFlag)
+ {
+ break;
+ }
- const BlockRangeDescriptor& BlockRange = BlockRangeWorks[BlockRangeIndex];
+ Work.ScheduleWork(
+ m_IOWorkerPool,
+ [this,
+ &RemoteChunkIndexNeedsCopyFromSourceFlags,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &WriteCache,
+ &Work,
+ &FilteredWrittenBytesPerSecond,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ BlockIndex](std::atomic<bool>&) mutable {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_WriteCachedBlock");
- FilteredDownloadedBytesPerSecond.Start();
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
+ FilteredWrittenBytesPerSecond.Start();
- DownloadPartialBlock(
- BlockRange,
- ExistsResult,
- [this,
- &RemoteChunkIndexNeedsCopyFromSourceFlags,
- &SequenceIndexChunksLeftToWriteCounters,
- &WritePartsComplete,
- &WriteCache,
- &Work,
- TotalRequestCount,
- TotalPartWriteCount,
- &FilteredDownloadedBytesPerSecond,
- &FilteredWrittenBytesPerSecond,
- &BlockRange](IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath) {
- if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ std::filesystem::path BlockChunkPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString();
+ IoBuffer BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
+ if (!BlockBuffer)
+ {
+ throw std::runtime_error(
+ fmt::format("Can not read block {} at {}", BlockDescription.BlockHash, BlockChunkPath));
+ }
+
+ if (!m_AbortFlag)
+ {
+ if (!WriteChunksBlockToCache(BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ CompositeBuffer(std::move(BlockBuffer)),
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ WriteCache))
{
- FilteredDownloadedBytesPerSecond.Stop();
+ std::error_code DummyEc;
+ RemoveFile(BlockChunkPath, DummyEc);
+ throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash));
}
- if (!m_AbortFlag)
+ std::error_code Ec = TryRemoveFile(BlockChunkPath);
+ if (Ec)
{
- Work.ScheduleWork(
- m_IOWorkerPool,
- [this,
- &RemoteChunkIndexNeedsCopyFromSourceFlags,
- &SequenceIndexChunksLeftToWriteCounters,
- &WritePartsComplete,
- &WriteCache,
- &Work,
- TotalPartWriteCount,
- &FilteredWrittenBytesPerSecond,
- &BlockRange,
- BlockChunkPath = std::move(OnDiskPath),
- BlockPartialBuffer = std::move(InMemoryBuffer)](std::atomic<bool>&) mutable {
- if (!m_AbortFlag)
- {
- ZEN_TRACE_CPU("Async_WritePartialBlock");
-
- const uint32_t BlockIndex = BlockRange.BlockIndex;
-
- const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
-
- if (BlockChunkPath.empty())
- {
- ZEN_ASSERT(BlockPartialBuffer);
- }
- else
- {
- ZEN_ASSERT(!BlockPartialBuffer);
- BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
- if (!BlockPartialBuffer)
- {
- throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}",
- BlockDescription.BlockHash,
- BlockChunkPath));
- }
- }
-
- FilteredWrittenBytesPerSecond.Start();
-
- if (!WritePartialBlockChunksToCache(
- BlockDescription,
- SequenceIndexChunksLeftToWriteCounters,
- Work,
- CompositeBuffer(std::move(BlockPartialBuffer)),
- BlockRange.ChunkBlockIndexStart,
- BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1,
- RemoteChunkIndexNeedsCopyFromSourceFlags,
- WriteCache))
- {
- std::error_code DummyEc;
- RemoveFile(BlockChunkPath, DummyEc);
- throw std::runtime_error(
- fmt::format("Partial block {} is malformed", BlockDescription.BlockHash));
- }
+ LOG_OUTPUT_DEBUG(m_LogOutput,
+ "Failed removing file '{}', reason: ({}) {}",
+ BlockChunkPath,
+ Ec.value(),
+ Ec.message());
+ }
- std::error_code Ec = TryRemoveFile(BlockChunkPath);
- if (Ec)
- {
- LOG_OUTPUT_DEBUG(m_LogOutput,
- "Failed removing file '{}', reason: ({}) {}",
- BlockChunkPath,
- Ec.value(),
- Ec.message());
- }
+ WritePartsComplete++;
- WritePartsComplete++;
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
- }
- }
- });
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
}
- });
- }
- });
- }
-
- for (uint32_t BlockIndex : FullBlockWorks)
- {
- if (m_AbortFlag)
- {
- break;
+ }
+ }
+ });
}
- if (m_Options.PrimeCacheOnly && ExistsResult.ExistingBlobs.contains(m_BlockDescriptions[BlockIndex].BlockHash))
+ for (size_t BlockRangeIndex = 0; BlockRangeIndex < BlockRangeWorks.size(); BlockRangeIndex++)
{
- m_DownloadStats.RequestsCompleteCount++;
- continue;
- }
-
- Work.ScheduleWork(
- m_NetworkPool,
- [this,
- &WritePartsComplete,
- TotalPartWriteCount,
- &FilteredWrittenBytesPerSecond,
- &ExistsResult,
- &Work,
- &WriteCache,
- &RemoteChunkIndexNeedsCopyFromSourceFlags,
- &SequenceIndexChunksLeftToWriteCounters,
- &FilteredDownloadedBytesPerSecond,
- TotalRequestCount,
- BlockIndex](std::atomic<bool>&) {
- if (!m_AbortFlag)
- {
- ZEN_TRACE_CPU("Async_GetFullBlock");
-
- const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
-
- FilteredDownloadedBytesPerSecond.Start();
-
- IoBuffer BlockBuffer;
- const bool ExistsInCache =
- m_Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash);
- if (ExistsInCache)
- {
- BlockBuffer = m_Storage.BuildCacheStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash);
- }
- if (!BlockBuffer)
- {
- BlockBuffer = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash);
- if (BlockBuffer && m_Storage.BuildCacheStorage)
- {
- m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId,
- BlockDescription.BlockHash,
- ZenContentType::kCompressedBinary,
- CompositeBuffer(SharedBuffer(BlockBuffer)));
- }
- }
- if (!BlockBuffer)
- {
- throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
- }
+ ZEN_ASSERT(!m_Options.PrimeCacheOnly);
+ if (m_AbortFlag)
+ {
+ break;
+ }
+ Work.ScheduleWork(
+ m_NetworkPool,
+ [this,
+ &RemoteChunkIndexNeedsCopyFromSourceFlags,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &ExistsResult,
+ &WriteCache,
+ &FilteredDownloadedBytesPerSecond,
+ TotalRequestCount,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ &FilteredWrittenBytesPerSecond,
+ &Work,
+ &BlockRangeWorks,
+ BlockRangeIndex](std::atomic<bool>&) {
if (!m_AbortFlag)
{
- uint64_t BlockSize = BlockBuffer.GetSize();
- m_DownloadStats.DownloadedBlockCount++;
- m_DownloadStats.DownloadedBlockByteCount += BlockSize;
- m_DownloadStats.RequestsCompleteCount++;
- if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
+ ZEN_TRACE_CPU("Async_GetPartialBlock");
- if (!m_Options.PrimeCacheOnly)
- {
- std::filesystem::path BlockChunkPath;
+ const BlockRangeDescriptor& BlockRange = BlockRangeWorks[BlockRangeIndex];
- // Check if the dowloaded block is file based and we can move it directly without rewriting it
- {
- IoBufferFileReference FileRef;
- if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) &&
- (FileRef.FileChunkSize == BlockSize))
- {
- ZEN_TRACE_CPU("MoveTempFullBlock");
- std::error_code Ec;
- std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
- if (!Ec)
- {
- BlockBuffer.SetDeleteOnClose(false);
- BlockBuffer = {};
- BlockChunkPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString();
- RenameFile(TempBlobPath, BlockChunkPath, Ec);
- if (Ec)
- {
- BlockChunkPath = std::filesystem::path{};
+ FilteredDownloadedBytesPerSecond.Start();
- // Re-open the temp file again
- BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
- BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
- BlockBuffer.SetDeleteOnClose(true);
- }
- }
+ DownloadPartialBlock(
+ BlockRange,
+ ExistsResult,
+ [this,
+ &RemoteChunkIndexNeedsCopyFromSourceFlags,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &WritePartsComplete,
+ &WriteCache,
+ &Work,
+ TotalRequestCount,
+ TotalPartWriteCount,
+ &FilteredDownloadedBytesPerSecond,
+ &FilteredWrittenBytesPerSecond,
+ &BlockRange](IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath) {
+ if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
}
- }
- if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u))
- {
- ZEN_TRACE_CPU("WriteTempFullBlock");
- // Could not be moved and rather large, lets store it on disk
- BlockChunkPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString();
- TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
- BlockBuffer = {};
- }
+ if (!m_AbortFlag)
+ {
+ Work.ScheduleWork(
+ m_IOWorkerPool,
+ [this,
+ &RemoteChunkIndexNeedsCopyFromSourceFlags,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &WritePartsComplete,
+ &WriteCache,
+ &Work,
+ TotalPartWriteCount,
+ &FilteredWrittenBytesPerSecond,
+ &BlockRange,
+ BlockChunkPath = std::move(OnDiskPath),
+ BlockPartialBuffer = std::move(InMemoryBuffer)](std::atomic<bool>&) mutable {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_WritePartialBlock");
- if (!m_AbortFlag)
- {
- Work.ScheduleWork(
- m_IOWorkerPool,
- [this,
- &Work,
- &RemoteChunkIndexNeedsCopyFromSourceFlags,
- &SequenceIndexChunksLeftToWriteCounters,
- BlockIndex,
- &WriteCache,
- &WritePartsComplete,
- TotalPartWriteCount,
- &FilteredWrittenBytesPerSecond,
- BlockChunkPath,
- BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable {
- if (!m_AbortFlag)
- {
- ZEN_TRACE_CPU("Async_WriteFullBlock");
+ const uint32_t BlockIndex = BlockRange.BlockIndex;
- const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
- if (BlockChunkPath.empty())
- {
- ZEN_ASSERT(BlockBuffer);
- }
- else
- {
- ZEN_ASSERT(!BlockBuffer);
- BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
- if (!BlockBuffer)
+ if (BlockChunkPath.empty())
{
- throw std::runtime_error(fmt::format("Could not open dowloaded block {} from {}",
- BlockDescription.BlockHash,
- BlockChunkPath));
+ ZEN_ASSERT(BlockPartialBuffer);
+ }
+ else
+ {
+ ZEN_ASSERT(!BlockPartialBuffer);
+ BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
+ if (!BlockPartialBuffer)
+ {
+ throw std::runtime_error(
+ fmt::format("Could not open downloaded block {} from {}",
+ BlockDescription.BlockHash,
+ BlockChunkPath));
+ }
}
- }
- FilteredWrittenBytesPerSecond.Start();
- if (!WriteChunksBlockToCache(BlockDescription,
- SequenceIndexChunksLeftToWriteCounters,
- Work,
- CompositeBuffer(std::move(BlockBuffer)),
- RemoteChunkIndexNeedsCopyFromSourceFlags,
- WriteCache))
- {
- std::error_code DummyEc;
- RemoveFile(BlockChunkPath, DummyEc);
- throw std::runtime_error(
- fmt::format("Block {} is malformed", BlockDescription.BlockHash));
- }
+ FilteredWrittenBytesPerSecond.Start();
+
+ if (!WritePartialBlockChunksToCache(
+ BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ CompositeBuffer(std::move(BlockPartialBuffer)),
+ BlockRange.ChunkBlockIndexStart,
+ BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ WriteCache))
+ {
+ std::error_code DummyEc;
+ RemoveFile(BlockChunkPath, DummyEc);
+ throw std::runtime_error(
+ fmt::format("Partial block {} is malformed", BlockDescription.BlockHash));
+ }
- if (!BlockChunkPath.empty())
- {
std::error_code Ec = TryRemoveFile(BlockChunkPath);
if (Ec)
{
@@ -1965,672 +1697,870 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
Ec.value(),
Ec.message());
}
+
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
}
+ });
+ }
+ });
+ }
+ });
+ }
+
+ for (uint32_t BlockIndex : FullBlockWorks)
+ {
+ if (m_AbortFlag)
+ {
+ break;
+ }
+
+ if (m_Options.PrimeCacheOnly && ExistsResult.ExistingBlobs.contains(m_BlockDescriptions[BlockIndex].BlockHash))
+ {
+ m_DownloadStats.RequestsCompleteCount++;
+ continue;
+ }
- WritePartsComplete++;
+ Work.ScheduleWork(
+ m_NetworkPool,
+ [this,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ &FilteredWrittenBytesPerSecond,
+ &ExistsResult,
+ &Work,
+ &WriteCache,
+ &RemoteChunkIndexNeedsCopyFromSourceFlags,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &FilteredDownloadedBytesPerSecond,
+ TotalRequestCount,
+ BlockIndex](std::atomic<bool>&) {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_GetFullBlock");
- if (WritePartsComplete == TotalPartWriteCount)
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
+
+ FilteredDownloadedBytesPerSecond.Start();
+
+ IoBuffer BlockBuffer;
+ const bool ExistsInCache =
+ m_Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash);
+ if (ExistsInCache)
+ {
+ BlockBuffer = m_Storage.BuildCacheStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash);
+ }
+ if (!BlockBuffer)
+ {
+ BlockBuffer = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash);
+ if (BlockBuffer && m_Storage.BuildCacheStorage)
+ {
+ m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId,
+ BlockDescription.BlockHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(SharedBuffer(BlockBuffer)));
+ }
+ }
+ if (!BlockBuffer)
+ {
+ throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
+ }
+ if (!m_AbortFlag)
+ {
+ uint64_t BlockSize = BlockBuffer.GetSize();
+ m_DownloadStats.DownloadedBlockCount++;
+ m_DownloadStats.DownloadedBlockByteCount += BlockSize;
+ m_DownloadStats.RequestsCompleteCount++;
+ if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+
+ if (!m_Options.PrimeCacheOnly)
+ {
+ std::filesystem::path BlockChunkPath;
+
+ // Check if the dowloaded block is file based and we can move it directly without rewriting it
+ {
+ IoBufferFileReference FileRef;
+ if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) &&
+ (FileRef.FileChunkSize == BlockSize))
+ {
+ ZEN_TRACE_CPU("MoveTempFullBlock");
+ std::error_code Ec;
+ std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
+ if (!Ec)
+ {
+ BlockBuffer.SetDeleteOnClose(false);
+ BlockBuffer = {};
+ BlockChunkPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString();
+ RenameFile(TempBlobPath, BlockChunkPath, Ec);
+ if (Ec)
{
- FilteredWrittenBytesPerSecond.Stop();
+ BlockChunkPath = std::filesystem::path{};
+
+ // Re-open the temp file again
+ BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
+ BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
+ BlockBuffer.SetDeleteOnClose(true);
}
}
- });
+ }
+ }
+
+ if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u))
+ {
+ ZEN_TRACE_CPU("WriteTempFullBlock");
+ // Could not be moved and rather large, lets store it on disk
+ BlockChunkPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString();
+ TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
+ BlockBuffer = {};
+ }
+
+ if (!m_AbortFlag)
+ {
+ Work.ScheduleWork(m_IOWorkerPool,
+ [this,
+ &Work,
+ &RemoteChunkIndexNeedsCopyFromSourceFlags,
+ &SequenceIndexChunksLeftToWriteCounters,
+ BlockIndex,
+ &WriteCache,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ &FilteredWrittenBytesPerSecond,
+ BlockChunkPath,
+ BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_WriteFullBlock");
+
+ const ChunkBlockDescription& BlockDescription =
+ m_BlockDescriptions[BlockIndex];
+
+ if (BlockChunkPath.empty())
+ {
+ ZEN_ASSERT(BlockBuffer);
+ }
+ else
+ {
+ ZEN_ASSERT(!BlockBuffer);
+ BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
+ if (!BlockBuffer)
+ {
+ throw std::runtime_error(
+ fmt::format("Could not open dowloaded block {} from {}",
+ BlockDescription.BlockHash,
+ BlockChunkPath));
+ }
+ }
+
+ FilteredWrittenBytesPerSecond.Start();
+ if (!WriteChunksBlockToCache(BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ CompositeBuffer(std::move(BlockBuffer)),
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ WriteCache))
+ {
+ std::error_code DummyEc;
+ RemoveFile(BlockChunkPath, DummyEc);
+ throw std::runtime_error(
+ fmt::format("Block {} is malformed", BlockDescription.BlockHash));
+ }
+
+ if (!BlockChunkPath.empty())
+ {
+ std::error_code Ec = TryRemoveFile(BlockChunkPath);
+ if (Ec)
+ {
+ LOG_OUTPUT_DEBUG(m_LogOutput,
+ "Failed removing file '{}', reason: ({}) {}",
+ BlockChunkPath,
+ Ec.value(),
+ Ec.message());
+ }
+ }
+
+ WritePartsComplete++;
+
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
+ }
+ });
+ }
}
}
}
- }
- });
- }
+ });
+ }
- {
- ZEN_TRACE_CPU("WriteChunks_Wait");
+ {
+ ZEN_TRACE_CPU("WriteChunks_Wait");
- Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(PendingWork);
- uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() +
- m_DownloadStats.DownloadedBlockByteCount.load() +
- +m_DownloadStats.DownloadedPartialBlockByteCount.load();
- FilteredWrittenBytesPerSecond.Update(m_DiskStats.WriteByteCount.load());
- FilteredDownloadedBytesPerSecond.Update(DownloadedBytes);
- std::string DownloadRateString = (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
- ? ""
- : fmt::format(" {}bits/s", NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8));
- std::string CloneDetails;
- if (m_DiskStats.CloneCount.load() > 0)
- {
- CloneDetails = fmt::format(" ({} cloned)", NiceBytes(m_DiskStats.CloneByteCount.load()));
- }
- std::string WriteDetails = m_Options.PrimeCacheOnly ? ""
- : fmt::format(" {}/{} ({}B/s) written{}",
- NiceBytes(m_WrittenChunkByteCount.load()),
- NiceBytes(BytesToWrite),
- NiceNum(FilteredWrittenBytesPerSecond.GetCurrent()),
- CloneDetails);
-
- std::string Details = fmt::format("{}/{} ({}{}) downloaded.{}",
- m_DownloadStats.RequestsCompleteCount.load(),
- TotalRequestCount,
- NiceBytes(DownloadedBytes),
- DownloadRateString,
- WriteDetails);
-
- std::string Task;
- if (m_Options.PrimeCacheOnly)
- {
- Task = "Downloading ";
- }
- else if (m_WrittenChunkByteCount < BytesToWrite)
- {
- Task = "Writing chunks ";
- }
- else
- {
- Task = "Verifying chunks ";
- }
+ Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
+ uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() +
+ m_DownloadStats.DownloadedBlockByteCount.load() +
+ +m_DownloadStats.DownloadedPartialBlockByteCount.load();
+ FilteredWrittenBytesPerSecond.Update(m_DiskStats.WriteByteCount.load());
+ FilteredDownloadedBytesPerSecond.Update(DownloadedBytes);
+ std::string DownloadRateString =
+ (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ ? ""
+ : fmt::format(" {}bits/s", NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8));
+ std::string CloneDetails;
+ if (m_DiskStats.CloneCount.load() > 0)
+ {
+ CloneDetails = fmt::format(" ({} cloned)", NiceBytes(m_DiskStats.CloneByteCount.load()));
+ }
+ std::string WriteDetails = m_Options.PrimeCacheOnly ? ""
+ : fmt::format(" {}/{} ({}B/s) written{}",
+ NiceBytes(m_WrittenChunkByteCount.load()),
+ NiceBytes(BytesToWrite),
+ NiceNum(FilteredWrittenBytesPerSecond.GetCurrent()),
+ CloneDetails);
+
+ std::string Details = fmt::format("{}/{} ({}{}) downloaded.{}",
+ m_DownloadStats.RequestsCompleteCount.load(),
+ TotalRequestCount,
+ NiceBytes(DownloadedBytes),
+ DownloadRateString,
+ WriteDetails);
- WriteProgressBar.UpdateState(
- {.Task = Task,
- .Details = Details,
- .TotalCount = m_Options.PrimeCacheOnly ? TotalRequestCount : (BytesToWrite + BytesToValidate),
- .RemainingCount = m_Options.PrimeCacheOnly ? (TotalRequestCount - m_DownloadStats.RequestsCompleteCount.load())
- : ((BytesToWrite + BytesToValidate) -
- (m_WrittenChunkByteCount.load() + m_ValidatedChunkByteCount.load())),
- .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
- false);
- });
- }
+ std::string Task;
+ if (m_Options.PrimeCacheOnly)
+ {
+ Task = "Downloading ";
+ }
+ else if (m_WrittenChunkByteCount < BytesToWrite)
+ {
+ Task = "Writing chunks ";
+ }
+ else
+ {
+ Task = "Verifying chunks ";
+ }
- CloneQuery.reset();
+ WriteProgressBar.UpdateState(
+ {.Task = Task,
+ .Details = Details,
+ .TotalCount = m_Options.PrimeCacheOnly ? TotalRequestCount : (BytesToWrite + BytesToValidate),
+ .RemainingCount = m_Options.PrimeCacheOnly ? (TotalRequestCount - m_DownloadStats.RequestsCompleteCount.load())
+ : ((BytesToWrite + BytesToValidate) -
+ (m_WrittenChunkByteCount.load() + m_ValidatedChunkByteCount.load())),
+ .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+ false);
+ });
+ }
- FilteredWrittenBytesPerSecond.Stop();
- FilteredDownloadedBytesPerSecond.Stop();
+ CloneQuery.reset();
- WriteProgressBar.Finish();
- if (m_AbortFlag)
- {
- return;
- }
+ FilteredWrittenBytesPerSecond.Stop();
+ FilteredDownloadedBytesPerSecond.Stop();
- if (!m_Options.PrimeCacheOnly)
- {
- ZEN_ASSERT(m_WrittenChunkByteCount == BytesToWrite);
- ZEN_ASSERT(m_ValidatedChunkByteCount == BytesToValidate);
+ WriteProgressBar.Finish();
+ if (m_AbortFlag)
+ {
+ return;
+ }
- uint32_t RawSequencesMissingWriteCount = 0;
- for (uint32_t SequenceIndex = 0; SequenceIndex < SequenceIndexChunksLeftToWriteCounters.size(); SequenceIndex++)
+ if (!m_Options.PrimeCacheOnly)
{
- const auto& SequenceIndexChunksLeftToWriteCounter = SequenceIndexChunksLeftToWriteCounters[SequenceIndex];
- if (SequenceIndexChunksLeftToWriteCounter.load() != 0)
+ ZEN_ASSERT(m_WrittenChunkByteCount == BytesToWrite);
+ ZEN_ASSERT(m_ValidatedChunkByteCount == BytesToValidate);
+
+ uint32_t RawSequencesMissingWriteCount = 0;
+ for (uint32_t SequenceIndex = 0; SequenceIndex < SequenceIndexChunksLeftToWriteCounters.size(); SequenceIndex++)
{
- RawSequencesMissingWriteCount++;
- const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex];
- const std::filesystem::path& IncompletePath = m_RemoteContent.Paths[PathIndex];
- ZEN_ASSERT(!IncompletePath.empty());
- const uint32_t ExpectedSequenceCount = m_RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex];
- if (!m_Options.IsQuiet)
+ const auto& SequenceIndexChunksLeftToWriteCounter = SequenceIndexChunksLeftToWriteCounters[SequenceIndex];
+ if (SequenceIndexChunksLeftToWriteCounter.load() != 0)
{
- LOG_OUTPUT(m_LogOutput,
- "{}: Max count {}, Current count {}",
- IncompletePath,
- ExpectedSequenceCount,
- SequenceIndexChunksLeftToWriteCounter.load());
+ RawSequencesMissingWriteCount++;
+ const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex];
+ const std::filesystem::path& IncompletePath = m_RemoteContent.Paths[PathIndex];
+ ZEN_ASSERT(!IncompletePath.empty());
+ const uint32_t ExpectedSequenceCount = m_RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex];
+ if (!m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "{}: Max count {}, Current count {}",
+ IncompletePath,
+ ExpectedSequenceCount,
+ SequenceIndexChunksLeftToWriteCounter.load());
+ }
+ ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounter.load() <= ExpectedSequenceCount);
}
- ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounter.load() <= ExpectedSequenceCount);
}
+ ZEN_ASSERT(RawSequencesMissingWriteCount == 0);
}
- ZEN_ASSERT(RawSequencesMissingWriteCount == 0);
- }
- const uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() + m_DownloadStats.DownloadedBlockByteCount.load() +
- m_DownloadStats.DownloadedPartialBlockByteCount.load();
- if (!m_Options.IsQuiet)
- {
- std::string CloneDetails;
- if (m_DiskStats.CloneCount.load() > 0)
+ const uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() +
+ m_DownloadStats.DownloadedBlockByteCount.load() +
+ m_DownloadStats.DownloadedPartialBlockByteCount.load();
+ if (!m_Options.IsQuiet)
{
- CloneDetails = fmt::format(" ({} cloned)", NiceBytes(m_DiskStats.CloneByteCount.load()));
+ std::string CloneDetails;
+ if (m_DiskStats.CloneCount.load() > 0)
+ {
+ CloneDetails = fmt::format(" ({} cloned)", NiceBytes(m_DiskStats.CloneByteCount.load()));
+ }
+ LOG_OUTPUT(m_LogOutput,
+ "Downloaded {} ({}bits/s) in {}. Wrote {} ({}B/s){} in {}. Completed in {}",
+ NiceBytes(DownloadedBytes),
+ NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), DownloadedBytes * 8)),
+ NiceTimeSpanMs(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS() / 1000),
+ NiceBytes(m_WrittenChunkByteCount.load()),
+ NiceNum(GetBytesPerSecond(FilteredWrittenBytesPerSecond.GetElapsedTimeUS(), m_DiskStats.WriteByteCount.load())),
+ CloneDetails,
+ NiceTimeSpanMs(FilteredWrittenBytesPerSecond.GetElapsedTimeUS() / 1000),
+ NiceTimeSpanMs(WriteTimer.GetElapsedTimeMs()));
}
- LOG_OUTPUT(m_LogOutput,
- "Downloaded {} ({}bits/s) in {}. Wrote {} ({}B/s){} in {}. Completed in {}",
- NiceBytes(DownloadedBytes),
- NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), DownloadedBytes * 8)),
- NiceTimeSpanMs(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS() / 1000),
- NiceBytes(m_WrittenChunkByteCount.load()),
- NiceNum(GetBytesPerSecond(FilteredWrittenBytesPerSecond.GetElapsedTimeUS(), m_DiskStats.WriteByteCount.load())),
- CloneDetails,
- NiceTimeSpanMs(FilteredWrittenBytesPerSecond.GetElapsedTimeUS() / 1000),
- NiceTimeSpanMs(WriteTimer.GetElapsedTimeMs()));
- }
-
- m_WriteChunkStats.WriteChunksElapsedWallTimeUs = WriteTimer.GetElapsedTimeUs();
- m_WriteChunkStats.DownloadTimeUs = FilteredDownloadedBytesPerSecond.GetElapsedTimeUS();
- m_WriteChunkStats.WriteTimeUs = FilteredWrittenBytesPerSecond.GetElapsedTimeUS();
- }
- if (m_Options.PrimeCacheOnly)
- {
- return;
- }
+ m_WriteChunkStats.WriteChunksElapsedWallTimeUs = WriteTimer.GetElapsedTimeUs();
+ m_WriteChunkStats.DownloadTimeUs = FilteredDownloadedBytesPerSecond.GetElapsedTimeUS();
+ m_WriteChunkStats.WriteTimeUs = FilteredWrittenBytesPerSecond.GetElapsedTimeUS();
+ }
- m_LogOutput.SetLogOperationProgress(TaskSteps::PrepareTarget, TaskSteps::StepCount);
+ if (m_Options.PrimeCacheOnly)
+ {
+ return;
+ }
- tsl::robin_map<uint32_t, uint32_t> RemotePathIndexToLocalPathIndex;
- RemotePathIndexToLocalPathIndex.reserve(m_RemoteContent.Paths.size());
+ m_LogOutput.SetLogOperationProgress(TaskSteps::PrepareTarget, TaskSteps::StepCount);
- tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> SequenceHashToLocalPathIndex;
- std::vector<uint32_t> RemoveLocalPathIndexes;
+ tsl::robin_map<uint32_t, uint32_t> RemotePathIndexToLocalPathIndex;
+ RemotePathIndexToLocalPathIndex.reserve(m_RemoteContent.Paths.size());
- if (m_AbortFlag)
- {
- return;
- }
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> SequenceHashToLocalPathIndex;
+ std::vector<uint32_t> RemoveLocalPathIndexes;
- {
- ZEN_TRACE_CPU("PrepareTarget");
-
- tsl::robin_set<IoHash, IoHash::Hasher> CachedRemoteSequences;
- tsl::robin_map<std::string, uint32_t> RemotePathToRemoteIndex;
- RemotePathToRemoteIndex.reserve(m_RemoteContent.Paths.size());
- for (uint32_t RemotePathIndex = 0; RemotePathIndex < m_RemoteContent.Paths.size(); RemotePathIndex++)
+ if (m_AbortFlag)
{
- RemotePathToRemoteIndex.insert({m_RemoteContent.Paths[RemotePathIndex].generic_string(), RemotePathIndex});
+ return;
}
- std::vector<uint32_t> FilesToCache;
-
- uint64_t MatchCount = 0;
- uint64_t PathMismatchCount = 0;
- uint64_t HashMismatchCount = 0;
- std::atomic<uint64_t> CachedCount = 0;
- std::atomic<uint64_t> CachedByteCount = 0;
- uint64_t SkippedCount = 0;
- uint64_t DeleteCount = 0;
- for (uint32_t LocalPathIndex = 0; LocalPathIndex < m_LocalContent.Paths.size(); LocalPathIndex++)
{
- if (m_AbortFlag)
+ ZEN_TRACE_CPU("PrepareTarget");
+
+ tsl::robin_set<IoHash, IoHash::Hasher> CachedRemoteSequences;
+ tsl::robin_map<std::string, uint32_t> RemotePathToRemoteIndex;
+ RemotePathToRemoteIndex.reserve(m_RemoteContent.Paths.size());
+ for (uint32_t RemotePathIndex = 0; RemotePathIndex < m_RemoteContent.Paths.size(); RemotePathIndex++)
{
- break;
+ RemotePathToRemoteIndex.insert({m_RemoteContent.Paths[RemotePathIndex].generic_string(), RemotePathIndex});
}
- const IoHash& RawHash = m_LocalContent.RawHashes[LocalPathIndex];
- const std::filesystem::path& LocalPath = m_LocalContent.Paths[LocalPathIndex];
- ZEN_ASSERT_SLOW(IsFile((m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred()));
+ std::vector<uint32_t> FilesToCache;
- if (m_Options.EnableTargetFolderScavenging)
+ uint64_t MatchCount = 0;
+ uint64_t PathMismatchCount = 0;
+ uint64_t HashMismatchCount = 0;
+ std::atomic<uint64_t> CachedCount = 0;
+ std::atomic<uint64_t> CachedByteCount = 0;
+ uint64_t SkippedCount = 0;
+ uint64_t DeleteCount = 0;
+ for (uint32_t LocalPathIndex = 0; LocalPathIndex < m_LocalContent.Paths.size(); LocalPathIndex++)
{
- if (!m_Options.WipeTargetFolder)
+ if (m_AbortFlag)
+ {
+ break;
+ }
+ const IoHash& RawHash = m_LocalContent.RawHashes[LocalPathIndex];
+ const std::filesystem::path& LocalPath = m_LocalContent.Paths[LocalPathIndex];
+
+ ZEN_ASSERT_SLOW(IsFile((m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred()));
+
+ if (m_Options.EnableTargetFolderScavenging)
{
- if (auto RemotePathIt = RemotePathToRemoteIndex.find(LocalPath.generic_string());
- RemotePathIt != RemotePathToRemoteIndex.end())
+ if (!m_Options.WipeTargetFolder)
{
- const uint32_t RemotePathIndex = RemotePathIt->second;
- if (m_RemoteContent.RawHashes[RemotePathIndex] == RawHash)
+ if (auto RemotePathIt = RemotePathToRemoteIndex.find(LocalPath.generic_string());
+ RemotePathIt != RemotePathToRemoteIndex.end())
{
- // It is already in it's desired place
- RemotePathIndexToLocalPathIndex[RemotePathIndex] = LocalPathIndex;
- SequenceHashToLocalPathIndex.insert({RawHash, LocalPathIndex});
- MatchCount++;
- continue;
+ const uint32_t RemotePathIndex = RemotePathIt->second;
+ if (m_RemoteContent.RawHashes[RemotePathIndex] == RawHash)
+ {
+ // It is already in it's desired place
+ RemotePathIndexToLocalPathIndex[RemotePathIndex] = LocalPathIndex;
+ SequenceHashToLocalPathIndex.insert({RawHash, LocalPathIndex});
+ MatchCount++;
+ continue;
+ }
+ else
+ {
+ HashMismatchCount++;
+ }
}
else
{
- HashMismatchCount++;
+ PathMismatchCount++;
}
}
- else
- {
- PathMismatchCount++;
- }
- }
- if (m_RemoteLookup.RawHashToSequenceIndex.contains(RawHash))
- {
- if (!CachedRemoteSequences.contains(RawHash))
+ if (m_RemoteLookup.RawHashToSequenceIndex.contains(RawHash))
{
- ZEN_TRACE_CPU("MoveToCache");
- // We need it
- FilesToCache.push_back(LocalPathIndex);
- CachedRemoteSequences.insert(RawHash);
+ if (!CachedRemoteSequences.contains(RawHash))
+ {
+ ZEN_TRACE_CPU("MoveToCache");
+ // We need it
+ FilesToCache.push_back(LocalPathIndex);
+ CachedRemoteSequences.insert(RawHash);
+ }
+ else
+ {
+ // We already have it
+ SkippedCount++;
+ }
}
- else
+ else if (!m_Options.WipeTargetFolder)
{
- // We already have it
- SkippedCount++;
+ // We don't need it
+ RemoveLocalPathIndexes.push_back(LocalPathIndex);
+ DeleteCount++;
}
}
- else if (!m_Options.WipeTargetFolder)
+ else
{
- // We don't need it
+ // Delete local file as we did not scavenge the folder
RemoveLocalPathIndexes.push_back(LocalPathIndex);
DeleteCount++;
}
}
- else
+
+ if (m_AbortFlag)
{
- // Delete local file as we did not scavenge the folder
- RemoveLocalPathIndexes.push_back(LocalPathIndex);
- DeleteCount++;
+ return;
}
- }
- if (m_AbortFlag)
- {
- return;
- }
-
- {
- ZEN_TRACE_CPU("CopyToCache");
+ {
+ ZEN_TRACE_CPU("CopyToCache");
- Stopwatch Timer;
+ Stopwatch Timer;
- std::unique_ptr<BuildOpLogOutput::ProgressBar> CacheLocalProgressBarPtr(m_LogOutput.CreateProgressBar("Cache Local Data"));
- BuildOpLogOutput::ProgressBar& CacheLocalProgressBar(*CacheLocalProgressBarPtr);
- ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ std::unique_ptr<BuildOpLogOutput::ProgressBar> CacheLocalProgressBarPtr(m_LogOutput.CreateProgressBar("Cache Local Data"));
+ BuildOpLogOutput::ProgressBar& CacheLocalProgressBar(*CacheLocalProgressBarPtr);
+ ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
- for (uint32_t LocalPathIndex : FilesToCache)
- {
- if (m_AbortFlag)
+ for (uint32_t LocalPathIndex : FilesToCache)
{
- break;
- }
- Work.ScheduleWork(m_IOWorkerPool, [this, &CachedCount, &CachedByteCount, LocalPathIndex](std::atomic<bool>&) {
- if (!m_AbortFlag)
+ if (m_AbortFlag)
{
- ZEN_TRACE_CPU("Async_CopyToCache");
+ break;
+ }
+ Work.ScheduleWork(m_IOWorkerPool, [this, &CachedCount, &CachedByteCount, LocalPathIndex](std::atomic<bool>&) {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_CopyToCache");
- const IoHash& RawHash = m_LocalContent.RawHashes[LocalPathIndex];
- const std::filesystem::path& LocalPath = m_LocalContent.Paths[LocalPathIndex];
- const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RawHash);
- ZEN_ASSERT_SLOW(!IsFileWithRetry(CacheFilePath));
- const std::filesystem::path LocalFilePath = (m_Path / LocalPath).make_preferred();
+ const IoHash& RawHash = m_LocalContent.RawHashes[LocalPathIndex];
+ const std::filesystem::path& LocalPath = m_LocalContent.Paths[LocalPathIndex];
+ const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RawHash);
+ ZEN_ASSERT_SLOW(!IsFileWithRetry(CacheFilePath));
+ const std::filesystem::path LocalFilePath = (m_Path / LocalPath).make_preferred();
- std::error_code Ec = RenameFileWithRetry(LocalFilePath, CacheFilePath);
- if (Ec)
- {
- LOG_OUTPUT_WARN(m_LogOutput,
- "Failed to move file from '{}' to '{}', reason: ({}) {}, retrying...",
- LocalFilePath,
- CacheFilePath,
- Ec.value(),
- Ec.message());
- Ec = RenameFileWithRetry(LocalFilePath, CacheFilePath);
+ std::error_code Ec = RenameFileWithRetry(LocalFilePath, CacheFilePath);
if (Ec)
{
- throw std::system_error(std::error_code(Ec.value(), std::system_category()),
- fmt::format("Failed to file from '{}' to '{}', reason: ({}) {}",
- LocalFilePath,
- CacheFilePath,
- Ec.value(),
- Ec.message()));
+ LOG_OUTPUT_WARN(m_LogOutput,
+ "Failed to move file from '{}' to '{}', reason: ({}) {}, retrying...",
+ LocalFilePath,
+ CacheFilePath,
+ Ec.value(),
+ Ec.message());
+ Ec = RenameFileWithRetry(LocalFilePath, CacheFilePath);
+ if (Ec)
+ {
+ throw std::system_error(std::error_code(Ec.value(), std::system_category()),
+ fmt::format("Failed to file from '{}' to '{}', reason: ({}) {}",
+ LocalFilePath,
+ CacheFilePath,
+ Ec.value(),
+ Ec.message()));
+ }
}
+
+ CachedCount++;
+ CachedByteCount += m_LocalContent.RawSizes[LocalPathIndex];
}
+ });
+ }
- CachedCount++;
- CachedByteCount += m_LocalContent.RawSizes[LocalPathIndex];
- }
- });
- }
+ {
+ ZEN_TRACE_CPU("CopyToCache_Wait");
+
+ Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
+ const uint64_t WorkTotal = FilesToCache.size();
+ const uint64_t WorkComplete = CachedCount.load();
+ std::string Details = fmt::format("{}/{} ({}) files", WorkComplete, WorkTotal, NiceBytes(CachedByteCount));
+ CacheLocalProgressBar.UpdateState(
+ {.Task = "Caching local ",
+ .Details = Details,
+ .TotalCount = gsl::narrow<uint64_t>(WorkTotal),
+ .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete),
+ .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+ false);
+ });
+ }
- {
- ZEN_TRACE_CPU("CopyToCache_Wait");
+ CacheLocalProgressBar.Finish();
+ if (m_AbortFlag)
+ {
+ return;
+ }
- Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(PendingWork);
- const uint64_t WorkTotal = FilesToCache.size();
- const uint64_t WorkComplete = CachedCount.load();
- std::string Details = fmt::format("{}/{} ({}) files", WorkComplete, WorkTotal, NiceBytes(CachedByteCount));
- CacheLocalProgressBar.UpdateState(
- {.Task = "Caching local ",
- .Details = Details,
- .TotalCount = gsl::narrow<uint64_t>(WorkTotal),
- .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete),
- .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
- false);
- });
+ LOG_OUTPUT_DEBUG(m_LogOutput,
+ "Local state prep: Match: {}, PathMismatch: {}, HashMismatch: {}, Cached: {} ({}), Skipped: {}, "
+ "Delete: {}",
+ MatchCount,
+ PathMismatchCount,
+ HashMismatchCount,
+ CachedCount.load(),
+ NiceBytes(CachedByteCount.load()),
+ SkippedCount,
+ DeleteCount);
}
+ }
- CacheLocalProgressBar.Finish();
- if (m_AbortFlag)
+ m_LogOutput.SetLogOperationProgress(TaskSteps::FinalizeTarget, TaskSteps::StepCount);
+
+ if (m_Options.WipeTargetFolder)
+ {
+ ZEN_TRACE_CPU("WipeTarget");
+ Stopwatch Timer;
+
+ // Clean target folder
+ if (!CleanDirectory(m_LogOutput, m_IOWorkerPool, m_AbortFlag, m_PauseFlag, m_Options.IsQuiet, m_Path, m_Options.ExcludeFolders))
{
- return;
+ LOG_OUTPUT_WARN(m_LogOutput, "Some files in {} could not be removed", m_Path);
}
-
- LOG_OUTPUT_DEBUG(m_LogOutput,
- "Local state prep: Match: {}, PathMismatch: {}, HashMismatch: {}, Cached: {} ({}), Skipped: {}, "
- "Delete: {}",
- MatchCount,
- PathMismatchCount,
- HashMismatchCount,
- CachedCount.load(),
- NiceBytes(CachedByteCount.load()),
- SkippedCount,
- DeleteCount);
+ m_RebuildFolderStateStats.CleanFolderElapsedWallTimeUs = Timer.GetElapsedTimeUs();
}
- }
- m_LogOutput.SetLogOperationProgress(TaskSteps::FinalizeTarget, TaskSteps::StepCount);
-
- if (m_Options.WipeTargetFolder)
- {
- ZEN_TRACE_CPU("WipeTarget");
- Stopwatch Timer;
-
- // Clean target folder
- if (!CleanDirectory(m_LogOutput, m_IOWorkerPool, m_AbortFlag, m_PauseFlag, m_Options.IsQuiet, m_Path, m_Options.ExcludeFolders))
+ if (m_AbortFlag)
{
- LOG_OUTPUT_WARN(m_LogOutput, "Some files in {} could not be removed", m_Path);
+ return;
}
- m_RebuildFolderStateStats.CleanFolderElapsedWallTimeUs = Timer.GetElapsedTimeUs();
- }
-
- if (m_AbortFlag)
- {
- return;
- }
- {
- ZEN_TRACE_CPU("FinalizeTree");
+ {
+ ZEN_TRACE_CPU("FinalizeTree");
- Stopwatch Timer;
+ Stopwatch Timer;
- std::unique_ptr<BuildOpLogOutput::ProgressBar> RebuildProgressBarPtr(m_LogOutput.CreateProgressBar("Rebuild State"));
- BuildOpLogOutput::ProgressBar& RebuildProgressBar(*RebuildProgressBarPtr);
- ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ std::unique_ptr<BuildOpLogOutput::ProgressBar> RebuildProgressBarPtr(m_LogOutput.CreateProgressBar("Rebuild State"));
+ BuildOpLogOutput::ProgressBar& RebuildProgressBar(*RebuildProgressBarPtr);
+ ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
- OutLocalFolderState.Paths.resize(m_RemoteContent.Paths.size());
- OutLocalFolderState.RawSizes.resize(m_RemoteContent.Paths.size());
- OutLocalFolderState.Attributes.resize(m_RemoteContent.Paths.size());
- OutLocalFolderState.ModificationTicks.resize(m_RemoteContent.Paths.size());
+ OutLocalFolderState.Paths.resize(m_RemoteContent.Paths.size());
+ OutLocalFolderState.RawSizes.resize(m_RemoteContent.Paths.size());
+ OutLocalFolderState.Attributes.resize(m_RemoteContent.Paths.size());
+ OutLocalFolderState.ModificationTicks.resize(m_RemoteContent.Paths.size());
- std::atomic<uint64_t> DeletedCount = 0;
+ std::atomic<uint64_t> DeletedCount = 0;
- for (uint32_t LocalPathIndex : RemoveLocalPathIndexes)
- {
- if (m_AbortFlag)
+ for (uint32_t LocalPathIndex : RemoveLocalPathIndexes)
{
- break;
- }
- Work.ScheduleWork(m_IOWorkerPool, [this, &DeletedCount, LocalPathIndex](std::atomic<bool>&) {
- if (!m_AbortFlag)
+ if (m_AbortFlag)
{
- ZEN_TRACE_CPU("Async_RemoveFile");
-
- const std::filesystem::path LocalFilePath = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred();
- SetFileReadOnlyWithRetry(LocalFilePath, false);
- RemoveFileWithRetry(LocalFilePath);
- DeletedCount++;
+ break;
}
- });
- }
+ Work.ScheduleWork(m_IOWorkerPool, [this, &DeletedCount, LocalPathIndex](std::atomic<bool>&) {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_RemoveFile");
- std::atomic<uint64_t> TargetsComplete = 0;
+ const std::filesystem::path LocalFilePath = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred();
+ SetFileReadOnlyWithRetry(LocalFilePath, false);
+ RemoveFileWithRetry(LocalFilePath);
+ DeletedCount++;
+ }
+ });
+ }
- struct FinalizeTarget
- {
- IoHash RawHash;
- uint32_t RemotePathIndex;
- };
+ std::atomic<uint64_t> TargetsComplete = 0;
- std::vector<FinalizeTarget> Targets;
- Targets.reserve(m_RemoteContent.Paths.size());
- for (uint32_t RemotePathIndex = 0; RemotePathIndex < m_RemoteContent.Paths.size(); RemotePathIndex++)
- {
- Targets.push_back(FinalizeTarget{.RawHash = m_RemoteContent.RawHashes[RemotePathIndex], .RemotePathIndex = RemotePathIndex});
- }
- std::sort(Targets.begin(), Targets.end(), [](const FinalizeTarget& Lhs, const FinalizeTarget& Rhs) {
- if (Lhs.RawHash < Rhs.RawHash)
- {
- return true;
- }
- else if (Lhs.RawHash > Rhs.RawHash)
+ struct FinalizeTarget
{
- return false;
- }
- return Lhs.RemotePathIndex < Rhs.RemotePathIndex;
- });
+ IoHash RawHash;
+ uint32_t RemotePathIndex;
+ };
- size_t TargetOffset = 0;
- while (TargetOffset < Targets.size())
- {
- if (m_AbortFlag)
+ std::vector<FinalizeTarget> Targets;
+ Targets.reserve(m_RemoteContent.Paths.size());
+ for (uint32_t RemotePathIndex = 0; RemotePathIndex < m_RemoteContent.Paths.size(); RemotePathIndex++)
{
- break;
+ Targets.push_back(
+ FinalizeTarget{.RawHash = m_RemoteContent.RawHashes[RemotePathIndex], .RemotePathIndex = RemotePathIndex});
}
+ std::sort(Targets.begin(), Targets.end(), [](const FinalizeTarget& Lhs, const FinalizeTarget& Rhs) {
+ if (Lhs.RawHash < Rhs.RawHash)
+ {
+ return true;
+ }
+ else if (Lhs.RawHash > Rhs.RawHash)
+ {
+ return false;
+ }
+ return Lhs.RemotePathIndex < Rhs.RemotePathIndex;
+ });
- size_t TargetCount = 1;
- while ((TargetOffset + TargetCount) < Targets.size() &&
- (Targets[TargetOffset + TargetCount].RawHash == Targets[TargetOffset].RawHash))
+ size_t TargetOffset = 0;
+ while (TargetOffset < Targets.size())
{
- TargetCount++;
- }
-
- Work.ScheduleWork(
- m_IOWorkerPool,
- [this,
- &SequenceHashToLocalPathIndex,
- &Targets,
- &RemotePathIndexToLocalPathIndex,
- &OutLocalFolderState,
- BaseTargetOffset = TargetOffset,
- TargetCount,
- &TargetsComplete](std::atomic<bool>&) {
- if (!m_AbortFlag)
- {
- ZEN_TRACE_CPU("Async_FinalizeChunkSequence");
+ if (m_AbortFlag)
+ {
+ break;
+ }
- size_t TargetOffset = BaseTargetOffset;
- const IoHash& RawHash = Targets[TargetOffset].RawHash;
+ size_t TargetCount = 1;
+ while ((TargetOffset + TargetCount) < Targets.size() &&
+ (Targets[TargetOffset + TargetCount].RawHash == Targets[TargetOffset].RawHash))
+ {
+ TargetCount++;
+ }
- if (RawHash == IoHash::Zero)
+ Work.ScheduleWork(
+ m_IOWorkerPool,
+ [this,
+ &SequenceHashToLocalPathIndex,
+ &Targets,
+ &RemotePathIndexToLocalPathIndex,
+ &OutLocalFolderState,
+ BaseTargetOffset = TargetOffset,
+ TargetCount,
+ &TargetsComplete](std::atomic<bool>&) {
+ if (!m_AbortFlag)
{
- ZEN_TRACE_CPU("CreateEmptyFiles");
- while (TargetOffset < (BaseTargetOffset + TargetCount))
+ ZEN_TRACE_CPU("Async_FinalizeChunkSequence");
+
+ size_t TargetOffset = BaseTargetOffset;
+ const IoHash& RawHash = Targets[TargetOffset].RawHash;
+
+ if (RawHash == IoHash::Zero)
{
- const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex;
- ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash);
- const std::filesystem::path& TargetPath = m_RemoteContent.Paths[RemotePathIndex];
- std::filesystem::path TargetFilePath = (m_Path / TargetPath).make_preferred();
- if (!RemotePathIndexToLocalPathIndex[RemotePathIndex])
+ ZEN_TRACE_CPU("CreateEmptyFiles");
+ while (TargetOffset < (BaseTargetOffset + TargetCount))
{
- if (IsFileWithRetry(TargetFilePath))
- {
- SetFileReadOnlyWithRetry(TargetFilePath, false);
- }
- else
+ const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex;
+ ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash);
+ const std::filesystem::path& TargetPath = m_RemoteContent.Paths[RemotePathIndex];
+ std::filesystem::path TargetFilePath = (m_Path / TargetPath).make_preferred();
+ if (!RemotePathIndexToLocalPathIndex[RemotePathIndex])
{
- CreateDirectories(TargetFilePath.parent_path());
+ if (IsFileWithRetry(TargetFilePath))
+ {
+ SetFileReadOnlyWithRetry(TargetFilePath, false);
+ }
+ else
+ {
+ CreateDirectories(TargetFilePath.parent_path());
+ }
+ BasicFile OutputFile;
+ OutputFile.Open(TargetFilePath, BasicFile::Mode::kTruncate);
}
- BasicFile OutputFile;
- OutputFile.Open(TargetFilePath, BasicFile::Mode::kTruncate);
+ OutLocalFolderState.Paths[RemotePathIndex] = TargetPath;
+ OutLocalFolderState.RawSizes[RemotePathIndex] = m_RemoteContent.RawSizes[RemotePathIndex];
+
+ OutLocalFolderState.Attributes[RemotePathIndex] =
+ m_RemoteContent.Attributes.empty()
+ ? GetNativeFileAttributes(TargetFilePath)
+ : SetNativeFileAttributes(TargetFilePath,
+ m_RemoteContent.Platform,
+ m_RemoteContent.Attributes[RemotePathIndex]);
+ OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath);
+
+ TargetOffset++;
+ TargetsComplete++;
}
- OutLocalFolderState.Paths[RemotePathIndex] = TargetPath;
- OutLocalFolderState.RawSizes[RemotePathIndex] = m_RemoteContent.RawSizes[RemotePathIndex];
-
- OutLocalFolderState.Attributes[RemotePathIndex] =
- m_RemoteContent.Attributes.empty()
- ? GetNativeFileAttributes(TargetFilePath)
- : SetNativeFileAttributes(TargetFilePath,
- m_RemoteContent.Platform,
- m_RemoteContent.Attributes[RemotePathIndex]);
- OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath);
-
- TargetOffset++;
- TargetsComplete++;
- }
- }
- else
- {
- ZEN_TRACE_CPU("FinalizeFile");
- ZEN_ASSERT(m_RemoteLookup.RawHashToSequenceIndex.contains(RawHash));
- const uint32_t FirstRemotePathIndex = Targets[TargetOffset].RemotePathIndex;
- const std::filesystem::path& FirstTargetPath = m_RemoteContent.Paths[FirstRemotePathIndex];
- std::filesystem::path FirstTargetFilePath = (m_Path / FirstTargetPath).make_preferred();
-
- if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(FirstRemotePathIndex);
- InPlaceIt != RemotePathIndexToLocalPathIndex.end())
- {
- ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath));
}
else
{
- if (IsFileWithRetry(FirstTargetFilePath))
- {
- SetFileReadOnlyWithRetry(FirstTargetFilePath, false);
- }
- else
- {
- CreateDirectories(FirstTargetFilePath.parent_path());
- }
+ ZEN_TRACE_CPU("FinalizeFile");
+ ZEN_ASSERT(m_RemoteLookup.RawHashToSequenceIndex.contains(RawHash));
+ const uint32_t FirstRemotePathIndex = Targets[TargetOffset].RemotePathIndex;
+ const std::filesystem::path& FirstTargetPath = m_RemoteContent.Paths[FirstRemotePathIndex];
+ std::filesystem::path FirstTargetFilePath = (m_Path / FirstTargetPath).make_preferred();
- if (auto InplaceIt = SequenceHashToLocalPathIndex.find(RawHash);
- InplaceIt != SequenceHashToLocalPathIndex.end())
+ if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(FirstRemotePathIndex);
+ InPlaceIt != RemotePathIndexToLocalPathIndex.end())
{
- ZEN_TRACE_CPU("Copy");
- const uint32_t LocalPathIndex = InplaceIt->second;
- const std::filesystem::path& SourcePath = m_LocalContent.Paths[LocalPathIndex];
- std::filesystem::path SourceFilePath = (m_Path / SourcePath).make_preferred();
- ZEN_ASSERT_SLOW(IsFileWithRetry(SourceFilePath));
-
- LOG_OUTPUT_DEBUG(m_LogOutput, "Copying from '{}' -> '{}'", SourceFilePath, FirstTargetFilePath);
- const uint64_t RawSize = m_LocalContent.RawSizes[LocalPathIndex];
- FastCopyFile(m_Options.AllowFileClone,
- m_Options.UseSparseFiles,
- SourceFilePath,
- FirstTargetFilePath,
- RawSize,
- m_DiskStats.WriteCount,
- m_DiskStats.WriteByteCount,
- m_DiskStats.CloneCount,
- m_DiskStats.CloneByteCount);
-
- m_RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++;
+ ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath));
}
else
{
- ZEN_TRACE_CPU("Rename");
- const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RawHash);
- ZEN_ASSERT_SLOW(IsFileWithRetry(CacheFilePath));
+ if (IsFileWithRetry(FirstTargetFilePath))
+ {
+ SetFileReadOnlyWithRetry(FirstTargetFilePath, false);
+ }
+ else
+ {
+ CreateDirectories(FirstTargetFilePath.parent_path());
+ }
- std::error_code Ec = RenameFileWithRetry(CacheFilePath, FirstTargetFilePath);
- if (Ec)
+ if (auto InplaceIt = SequenceHashToLocalPathIndex.find(RawHash);
+ InplaceIt != SequenceHashToLocalPathIndex.end())
+ {
+ ZEN_TRACE_CPU("Copy");
+ const uint32_t LocalPathIndex = InplaceIt->second;
+ const std::filesystem::path& SourcePath = m_LocalContent.Paths[LocalPathIndex];
+ std::filesystem::path SourceFilePath = (m_Path / SourcePath).make_preferred();
+ ZEN_ASSERT_SLOW(IsFileWithRetry(SourceFilePath));
+
+ LOG_OUTPUT_DEBUG(m_LogOutput, "Copying from '{}' -> '{}'", SourceFilePath, FirstTargetFilePath);
+ const uint64_t RawSize = m_LocalContent.RawSizes[LocalPathIndex];
+ FastCopyFile(m_Options.AllowFileClone,
+ m_Options.UseSparseFiles,
+ SourceFilePath,
+ FirstTargetFilePath,
+ RawSize,
+ m_DiskStats.WriteCount,
+ m_DiskStats.WriteByteCount,
+ m_DiskStats.CloneCount,
+ m_DiskStats.CloneByteCount);
+
+ m_RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++;
+ }
+ else
{
- LOG_OUTPUT_WARN(m_LogOutput,
- "Failed to move file from '{}' to '{}', reason: ({}) {}, retrying...",
- CacheFilePath,
- FirstTargetFilePath,
- Ec.value(),
- Ec.message());
- Ec = RenameFileWithRetry(CacheFilePath, FirstTargetFilePath);
+ ZEN_TRACE_CPU("Rename");
+ const std::filesystem::path CacheFilePath =
+ GetFinalChunkedSequenceFileName(m_CacheFolderPath, RawHash);
+ ZEN_ASSERT_SLOW(IsFileWithRetry(CacheFilePath));
+
+ std::error_code Ec = RenameFileWithRetry(CacheFilePath, FirstTargetFilePath);
if (Ec)
{
- throw std::system_error(std::error_code(Ec.value(), std::system_category()),
- fmt::format("Failed to move file from '{}' to '{}', reason: ({}) {}",
- CacheFilePath,
- FirstTargetFilePath,
- Ec.value(),
- Ec.message()));
+ LOG_OUTPUT_WARN(m_LogOutput,
+ "Failed to move file from '{}' to '{}', reason: ({}) {}, retrying...",
+ CacheFilePath,
+ FirstTargetFilePath,
+ Ec.value(),
+ Ec.message());
+ Ec = RenameFileWithRetry(CacheFilePath, FirstTargetFilePath);
+ if (Ec)
+ {
+ throw std::system_error(
+ std::error_code(Ec.value(), std::system_category()),
+ fmt::format("Failed to move file from '{}' to '{}', reason: ({}) {}",
+ CacheFilePath,
+ FirstTargetFilePath,
+ Ec.value(),
+ Ec.message()));
+ }
}
- }
- m_RebuildFolderStateStats.FinalizeTreeFilesMovedCount++;
+ m_RebuildFolderStateStats.FinalizeTreeFilesMovedCount++;
+ }
}
- }
- OutLocalFolderState.Paths[FirstRemotePathIndex] = FirstTargetPath;
- OutLocalFolderState.RawSizes[FirstRemotePathIndex] = m_RemoteContent.RawSizes[FirstRemotePathIndex];
+ OutLocalFolderState.Paths[FirstRemotePathIndex] = FirstTargetPath;
+ OutLocalFolderState.RawSizes[FirstRemotePathIndex] = m_RemoteContent.RawSizes[FirstRemotePathIndex];
- OutLocalFolderState.Attributes[FirstRemotePathIndex] =
- m_RemoteContent.Attributes.empty()
- ? GetNativeFileAttributes(FirstTargetFilePath)
- : SetNativeFileAttributes(FirstTargetFilePath,
- m_RemoteContent.Platform,
- m_RemoteContent.Attributes[FirstRemotePathIndex]);
- OutLocalFolderState.ModificationTicks[FirstRemotePathIndex] = GetModificationTickFromPath(FirstTargetFilePath);
-
- TargetOffset++;
- TargetsComplete++;
+ OutLocalFolderState.Attributes[FirstRemotePathIndex] =
+ m_RemoteContent.Attributes.empty()
+ ? GetNativeFileAttributes(FirstTargetFilePath)
+ : SetNativeFileAttributes(FirstTargetFilePath,
+ m_RemoteContent.Platform,
+ m_RemoteContent.Attributes[FirstRemotePathIndex]);
+ OutLocalFolderState.ModificationTicks[FirstRemotePathIndex] =
+ GetModificationTickFromPath(FirstTargetFilePath);
- while (TargetOffset < (BaseTargetOffset + TargetCount))
- {
- const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex;
- ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash);
- const std::filesystem::path& TargetPath = m_RemoteContent.Paths[RemotePathIndex];
- std::filesystem::path TargetFilePath = (m_Path / TargetPath).make_preferred();
+ TargetOffset++;
+ TargetsComplete++;
- if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(RemotePathIndex);
- InPlaceIt != RemotePathIndexToLocalPathIndex.end())
- {
- ZEN_ASSERT_SLOW(IsFileWithRetry(TargetFilePath));
- }
- else
+ while (TargetOffset < (BaseTargetOffset + TargetCount))
{
- ZEN_TRACE_CPU("Copy");
- if (IsFileWithRetry(TargetFilePath))
+ const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex;
+ ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash);
+ const std::filesystem::path& TargetPath = m_RemoteContent.Paths[RemotePathIndex];
+ std::filesystem::path TargetFilePath = (m_Path / TargetPath).make_preferred();
+
+ if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(RemotePathIndex);
+ InPlaceIt != RemotePathIndexToLocalPathIndex.end())
{
- SetFileReadOnlyWithRetry(TargetFilePath, false);
+ ZEN_ASSERT_SLOW(IsFileWithRetry(TargetFilePath));
}
else
{
- CreateDirectories(TargetFilePath.parent_path());
- }
+ ZEN_TRACE_CPU("Copy");
+ if (IsFileWithRetry(TargetFilePath))
+ {
+ SetFileReadOnlyWithRetry(TargetFilePath, false);
+ }
+ else
+ {
+ CreateDirectories(TargetFilePath.parent_path());
+ }
- ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath));
- LOG_OUTPUT_DEBUG(m_LogOutput, "Copying from '{}' -> '{}'", FirstTargetFilePath, TargetFilePath);
- const uint64_t RawSize = m_RemoteContent.RawSizes[RemotePathIndex];
- FastCopyFile(m_Options.AllowFileClone,
- m_Options.UseSparseFiles,
- FirstTargetFilePath,
- TargetFilePath,
- RawSize,
- m_DiskStats.WriteCount,
- m_DiskStats.WriteByteCount,
- m_DiskStats.CloneCount,
- m_DiskStats.CloneByteCount);
-
- m_RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++;
- }
+ ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath));
+ LOG_OUTPUT_DEBUG(m_LogOutput, "Copying from '{}' -> '{}'", FirstTargetFilePath, TargetFilePath);
+ const uint64_t RawSize = m_RemoteContent.RawSizes[RemotePathIndex];
+ FastCopyFile(m_Options.AllowFileClone,
+ m_Options.UseSparseFiles,
+ FirstTargetFilePath,
+ TargetFilePath,
+ RawSize,
+ m_DiskStats.WriteCount,
+ m_DiskStats.WriteByteCount,
+ m_DiskStats.CloneCount,
+ m_DiskStats.CloneByteCount);
+
+ m_RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++;
+ }
- OutLocalFolderState.Paths[RemotePathIndex] = TargetPath;
- OutLocalFolderState.RawSizes[RemotePathIndex] = m_RemoteContent.RawSizes[RemotePathIndex];
+ OutLocalFolderState.Paths[RemotePathIndex] = TargetPath;
+ OutLocalFolderState.RawSizes[RemotePathIndex] = m_RemoteContent.RawSizes[RemotePathIndex];
- OutLocalFolderState.Attributes[RemotePathIndex] =
- m_RemoteContent.Attributes.empty()
- ? GetNativeFileAttributes(TargetFilePath)
- : SetNativeFileAttributes(TargetFilePath,
- m_RemoteContent.Platform,
- m_RemoteContent.Attributes[RemotePathIndex]);
- OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath);
+ OutLocalFolderState.Attributes[RemotePathIndex] =
+ m_RemoteContent.Attributes.empty()
+ ? GetNativeFileAttributes(TargetFilePath)
+ : SetNativeFileAttributes(TargetFilePath,
+ m_RemoteContent.Platform,
+ m_RemoteContent.Attributes[RemotePathIndex]);
+ OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath);
- TargetOffset++;
- TargetsComplete++;
+ TargetOffset++;
+ TargetsComplete++;
+ }
}
}
- }
- });
+ });
- TargetOffset += TargetCount;
- }
+ TargetOffset += TargetCount;
+ }
- {
- ZEN_TRACE_CPU("FinalizeTree_Wait");
+ {
+ ZEN_TRACE_CPU("FinalizeTree_Wait");
- Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(PendingWork);
- const uint64_t WorkTotal = Targets.size() + RemoveLocalPathIndexes.size();
- const uint64_t WorkComplete = TargetsComplete.load() + DeletedCount.load();
- std::string Details = fmt::format("{}/{} files", WorkComplete, WorkTotal);
- RebuildProgressBar.UpdateState({.Task = "Rebuilding state ",
- .Details = Details,
- .TotalCount = gsl::narrow<uint64_t>(WorkTotal),
- .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete),
- .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
- false);
- });
- }
+ Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
+ const uint64_t WorkTotal = Targets.size() + RemoveLocalPathIndexes.size();
+ const uint64_t WorkComplete = TargetsComplete.load() + DeletedCount.load();
+ std::string Details = fmt::format("{}/{} files", WorkComplete, WorkTotal);
+ RebuildProgressBar.UpdateState({.Task = "Rebuilding state ",
+ .Details = Details,
+ .TotalCount = gsl::narrow<uint64_t>(WorkTotal),
+ .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete),
+ .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+ false);
+ });
+ }
- m_RebuildFolderStateStats.FinalizeTreeElapsedWallTimeUs = Timer.GetElapsedTimeUs();
- RebuildProgressBar.Finish();
+ m_RebuildFolderStateStats.FinalizeTreeElapsedWallTimeUs = Timer.GetElapsedTimeUs();
+ RebuildProgressBar.Finish();
+ }
+ m_LogOutput.SetLogOperationProgress(TaskSteps::Cleanup, TaskSteps::StepCount);
+ }
+ catch (const std::exception&)
+ {
+ m_AbortFlag = true;
+ throw;
}
- m_LogOutput.SetLogOperationProgress(TaskSteps::Cleanup, TaskSteps::StepCount);
}
void
@@ -3152,7 +3082,10 @@ BuildsOperationUpdateFolder::WriteScavengedSequenceToCache(const std::filesystem
RenameFile(TempFilePath, CacheFilePath);
m_WrittenChunkByteCount += RawSize;
- m_ValidatedChunkByteCount += RawSize;
+ if (m_Options.ValidateCompletedSequences)
+ {
+ m_ValidatedChunkByteCount += RawSize;
+ }
}
void
@@ -3971,16 +3904,7 @@ BuildsOperationUpdateFolder::WriteCompressedChunkToCache(
{
const std::uint32_t SequenceIndex = ChunkTargetPtrs.front()->SequenceIndex;
const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex];
- StreamDecompress(m_AbortFlag,
- m_CacheFolderPath,
- SequenceRawHash,
- CompositeBuffer(std::move(CompressedPart)),
- m_DiskStats.ReadCount,
- m_DiskStats.ReadByteCount,
- m_DiskStats.WriteCount,
- m_DiskStats.WriteByteCount,
- m_WrittenChunkByteCount,
- m_ValidatedChunkByteCount);
+ StreamDecompress(SequenceRawHash, CompositeBuffer(std::move(CompressedPart)));
return false;
}
else
@@ -4039,6 +3963,92 @@ BuildsOperationUpdateFolder::WriteCompressedChunkToCache(
}
void
+BuildsOperationUpdateFolder::StreamDecompress(const IoHash& SequenceRawHash, CompositeBuffer&& CompressedPart)
+{
+ ZEN_TRACE_CPU("StreamDecompress");
+ const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash);
+ TemporaryFile DecompressedTemp;
+ std::error_code Ec;
+ DecompressedTemp.CreateTemporary(TempChunkSequenceFileName.parent_path(), Ec);
+ if (Ec)
+ {
+ throw std::runtime_error(fmt::format("Failed creating temporary file for decompressing large blob {}, reason: ({}) {}",
+ SequenceRawHash,
+ Ec.value(),
+ Ec.message()));
+ }
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompressedPart, RawHash, RawSize);
+ if (!Compressed)
+ {
+ throw std::runtime_error(fmt::format("Failed to parse header of compressed large blob {}", SequenceRawHash));
+ }
+ if (RawHash != SequenceRawHash)
+ {
+ throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, SequenceRawHash));
+ }
+ PrepareFileForScatteredWrite(DecompressedTemp.Handle(), RawSize);
+
+ IoHashStream Hash;
+ bool CouldDecompress =
+ Compressed.DecompressToStream(0,
+ (uint64_t)-1,
+ [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
+ ZEN_UNUSED(SourceOffset);
+ ZEN_TRACE_CPU("StreamDecompress_Write");
+ m_DiskStats.ReadCount++;
+ m_DiskStats.ReadByteCount += SourceSize;
+ if (!m_AbortFlag)
+ {
+ for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
+ {
+ if (m_Options.ValidateCompletedSequences)
+ {
+ Hash.Append(Segment.GetView());
+ m_ValidatedChunkByteCount += Segment.GetSize();
+ }
+ DecompressedTemp.Write(Segment, Offset);
+ Offset += Segment.GetSize();
+ m_DiskStats.WriteByteCount += Segment.GetSize();
+ m_DiskStats.WriteCount++;
+ m_WrittenChunkByteCount += Segment.GetSize();
+ }
+ return true;
+ }
+ return false;
+ });
+
+ if (m_AbortFlag)
+ {
+ return;
+ }
+
+ if (!CouldDecompress)
+ {
+ throw std::runtime_error(fmt::format("Failed to decompress large blob {}", SequenceRawHash));
+ }
+ if (m_Options.ValidateCompletedSequences)
+ {
+ const IoHash VerifyHash = Hash.GetHash();
+ if (VerifyHash != SequenceRawHash)
+ {
+ throw std::runtime_error(
+ fmt::format("Decompressed blob payload hash {} does not match expected hash {}", VerifyHash, SequenceRawHash));
+ }
+ }
+ DecompressedTemp.MoveTemporaryIntoPlace(TempChunkSequenceFileName, Ec);
+ if (Ec)
+ {
+ throw std::runtime_error(fmt::format("Failed moving temporary file for decompressing large blob {}, reason: ({}) {}",
+ SequenceRawHash,
+ Ec.value(),
+ Ec.message()));
+ }
+ // WriteChunkStats.ChunkCountWritten++;
+}
+
+void
BuildsOperationUpdateFolder::WriteSequenceChunkToCache(BufferedWriteFileCache::Local& LocalWriter,
const CompositeBuffer& Chunk,
const uint32_t SequenceIndex,
@@ -4458,28 +4468,40 @@ BuildsOperationUpdateFolder::VerifyAndCompleteChunkSequencesAsync(std::span<cons
return;
}
ZEN_TRACE_CPU("VerifyAndCompleteChunkSequence");
- for (uint32_t RemoteSequenceIndexOffset = 1; RemoteSequenceIndexOffset < RemoteSequenceIndexes.size(); RemoteSequenceIndexOffset++)
+ if (m_Options.ValidateCompletedSequences)
{
- const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset];
- Work.ScheduleWork(m_IOWorkerPool, [this, RemoteSequenceIndex](std::atomic<bool>&) {
- if (!m_AbortFlag)
- {
- ZEN_TRACE_CPU("Async_VerifyAndFinalizeSequence");
-
- VerifySequence(RemoteSequenceIndex);
+ for (uint32_t RemoteSequenceIndexOffset = 1; RemoteSequenceIndexOffset < RemoteSequenceIndexes.size(); RemoteSequenceIndexOffset++)
+ {
+ const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset];
+ Work.ScheduleWork(m_IOWorkerPool, [this, RemoteSequenceIndex](std::atomic<bool>&) {
if (!m_AbortFlag)
{
- const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- FinalizeChunkSequence(SequenceRawHash);
+ ZEN_TRACE_CPU("Async_VerifyAndFinalizeSequence");
+
+ VerifySequence(RemoteSequenceIndex);
+ if (!m_AbortFlag)
+ {
+ const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ FinalizeChunkSequence(SequenceRawHash);
+ }
}
- }
- });
- }
- const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[0];
+ });
+ }
+ const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[0];
- VerifySequence(RemoteSequenceIndex);
- const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- FinalizeChunkSequence(SequenceRawHash);
+ VerifySequence(RemoteSequenceIndex);
+ const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ FinalizeChunkSequence(SequenceRawHash);
+ }
+ else
+ {
+ for (uint32_t RemoteSequenceIndexOffset = 0; RemoteSequenceIndexOffset < RemoteSequenceIndexes.size(); RemoteSequenceIndexOffset++)
+ {
+ const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset];
+ const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ FinalizeChunkSequence(SequenceRawHash);
+ }
+ }
}
bool
@@ -4542,6 +4564,8 @@ BuildsOperationUpdateFolder::VerifySequence(uint32_t RemoteSequenceIndex)
{
ZEN_TRACE_CPU("VerifySequence");
+ ZEN_ASSERT(m_Options.ValidateCompletedSequences);
+
const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
{
ZEN_TRACE_CPU("HashSequence");
@@ -4607,763 +4631,773 @@ BuildsOperationUploadFolder::BuildsOperationUploadFolder(BuildOpLogOutput& L
void
BuildsOperationUploadFolder::Execute()
{
- enum TaskSteps : uint32_t
- {
- PrepareBuild,
- CalculateDelta,
- GenerateBlocks,
- BuildPartManifest,
- Upload,
- FinalizeBuild,
- Cleanup,
- StepCount
- };
+ ZEN_TRACE_CPU("BuildsOperationUploadFolder::Execute");
+ try
+ {
+ enum TaskSteps : uint32_t
+ {
+ PrepareBuild,
+ CalculateDelta,
+ GenerateBlocks,
+ BuildPartManifest,
+ Upload,
+ FinalizeBuild,
+ Cleanup,
+ StepCount
+ };
- auto EndProgress = MakeGuard([&]() { m_LogOutput.SetLogOperationProgress(TaskSteps::StepCount, TaskSteps::StepCount); });
+ auto EndProgress = MakeGuard([&]() { m_LogOutput.SetLogOperationProgress(TaskSteps::StepCount, TaskSteps::StepCount); });
- Stopwatch ProcessTimer;
+ Stopwatch ProcessTimer;
- CreateDirectories(m_Options.TempDir);
- CleanDirectory(m_Options.TempDir, {});
- auto _ = MakeGuard([&]() {
- if (CleanDirectory(m_Options.TempDir, {}))
- {
- std::error_code DummyEc;
- RemoveDir(m_Options.TempDir, DummyEc);
- }
- });
-
- m_LogOutput.SetLogOperationProgress(TaskSteps::PrepareBuild, TaskSteps::StepCount);
+ CreateDirectories(m_Options.TempDir);
+ CleanDirectory(m_Options.TempDir, {});
+ auto _ = MakeGuard([&]() {
+ if (CleanDirectory(m_Options.TempDir, {}))
+ {
+ std::error_code DummyEc;
+ RemoveDir(m_Options.TempDir, DummyEc);
+ }
+ });
- std::uint64_t TotalRawSize = 0;
+ m_LogOutput.SetLogOperationProgress(TaskSteps::PrepareBuild, TaskSteps::StepCount);
- CbObject ChunkerParameters;
+ std::uint64_t TotalRawSize = 0;
- struct PrepareBuildResult
- {
- std::vector<ChunkBlockDescription> KnownBlocks;
- uint64_t PreferredMultipartChunkSize = 0;
- uint64_t PayloadSize = 0;
- uint64_t PrepareBuildTimeMs = 0;
- uint64_t FindBlocksTimeMs = 0;
- uint64_t ElapsedTimeMs = 0;
- };
+ CbObject ChunkerParameters;
- std::future<PrepareBuildResult> PrepBuildResultFuture = m_NetworkPool.EnqueueTask(
- std::packaged_task<PrepareBuildResult()>{[this] {
- ZEN_TRACE_CPU("PrepareBuild");
+ struct PrepareBuildResult
+ {
+ std::vector<ChunkBlockDescription> KnownBlocks;
+ uint64_t PreferredMultipartChunkSize = 0;
+ uint64_t PayloadSize = 0;
+ uint64_t PrepareBuildTimeMs = 0;
+ uint64_t FindBlocksTimeMs = 0;
+ uint64_t ElapsedTimeMs = 0;
+ };
- PrepareBuildResult Result;
- Result.PreferredMultipartChunkSize = m_Options.PreferredMultipartChunkSize;
- Stopwatch Timer;
- if (m_CreateBuild)
- {
- ZEN_TRACE_CPU("CreateBuild");
+ std::future<PrepareBuildResult> PrepBuildResultFuture = m_NetworkPool.EnqueueTask(
+ std::packaged_task<PrepareBuildResult()>{[this] {
+ ZEN_TRACE_CPU("PrepareBuild");
- Stopwatch PutBuildTimer;
- CbObject PutBuildResult = m_Storage.BuildStorage->PutBuild(m_BuildId, m_MetaData);
- Result.PrepareBuildTimeMs = PutBuildTimer.GetElapsedTimeMs();
- Result.PreferredMultipartChunkSize = PutBuildResult["chunkSize"sv].AsUInt64(Result.PreferredMultipartChunkSize);
- Result.PayloadSize = m_MetaData.GetSize();
- }
- else
- {
- ZEN_TRACE_CPU("PutBuild");
- Stopwatch GetBuildTimer;
- CbObject Build = m_Storage.BuildStorage->GetBuild(m_BuildId);
- Result.PrepareBuildTimeMs = GetBuildTimer.GetElapsedTimeMs();
- Result.PayloadSize = Build.GetSize();
- if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0)
+ PrepareBuildResult Result;
+ Result.PreferredMultipartChunkSize = m_Options.PreferredMultipartChunkSize;
+ Stopwatch Timer;
+ if (m_CreateBuild)
{
- Result.PreferredMultipartChunkSize = ChunkSize;
+ ZEN_TRACE_CPU("CreateBuild");
+
+ Stopwatch PutBuildTimer;
+ CbObject PutBuildResult = m_Storage.BuildStorage->PutBuild(m_BuildId, m_MetaData);
+ Result.PrepareBuildTimeMs = PutBuildTimer.GetElapsedTimeMs();
+ Result.PreferredMultipartChunkSize = PutBuildResult["chunkSize"sv].AsUInt64(Result.PreferredMultipartChunkSize);
+ Result.PayloadSize = m_MetaData.GetSize();
}
- else if (m_Options.AllowMultiparts)
+ else
{
- LOG_OUTPUT_WARN(m_LogOutput,
- "PreferredMultipartChunkSize is unknown. Defaulting to '{}'",
- NiceBytes(Result.PreferredMultipartChunkSize));
+ ZEN_TRACE_CPU("PutBuild");
+ Stopwatch GetBuildTimer;
+ CbObject Build = m_Storage.BuildStorage->GetBuild(m_BuildId);
+ Result.PrepareBuildTimeMs = GetBuildTimer.GetElapsedTimeMs();
+ Result.PayloadSize = Build.GetSize();
+ if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0)
+ {
+ Result.PreferredMultipartChunkSize = ChunkSize;
+ }
+ else if (m_Options.AllowMultiparts)
+ {
+ LOG_OUTPUT_WARN(m_LogOutput,
+ "PreferredMultipartChunkSize is unknown. Defaulting to '{}'",
+ NiceBytes(Result.PreferredMultipartChunkSize));
+ }
}
- }
- if (!m_Options.IgnoreExistingBlocks)
- {
- ZEN_TRACE_CPU("FindBlocks");
- Stopwatch KnownBlocksTimer;
- CbObject BlockDescriptionList = m_Storage.BuildStorage->FindBlocks(m_BuildId, m_Options.FindBlockMaxCount);
- if (BlockDescriptionList)
+ if (!m_Options.IgnoreExistingBlocks)
{
- Result.KnownBlocks = ParseChunkBlockDescriptionList(BlockDescriptionList);
+ ZEN_TRACE_CPU("FindBlocks");
+ Stopwatch KnownBlocksTimer;
+ CbObject BlockDescriptionList = m_Storage.BuildStorage->FindBlocks(m_BuildId, m_Options.FindBlockMaxCount);
+ if (BlockDescriptionList)
+ {
+ Result.KnownBlocks = ParseChunkBlockDescriptionList(BlockDescriptionList);
+ }
+ m_FindBlocksStats.FindBlockTimeMS = KnownBlocksTimer.GetElapsedTimeMs();
+ m_FindBlocksStats.FoundBlockCount = Result.KnownBlocks.size();
+ Result.FindBlocksTimeMs = KnownBlocksTimer.GetElapsedTimeMs();
}
- m_FindBlocksStats.FindBlockTimeMS = KnownBlocksTimer.GetElapsedTimeMs();
- m_FindBlocksStats.FoundBlockCount = Result.KnownBlocks.size();
- Result.FindBlocksTimeMs = KnownBlocksTimer.GetElapsedTimeMs();
- }
- Result.ElapsedTimeMs = Timer.GetElapsedTimeMs();
- return Result;
- }},
- WorkerThreadPool::EMode::EnableBacklog);
+ Result.ElapsedTimeMs = Timer.GetElapsedTimeMs();
+ return Result;
+ }},
+ WorkerThreadPool::EMode::EnableBacklog);
- ChunkedFolderContent LocalContent;
+ ChunkedFolderContent LocalContent;
- {
- Stopwatch ScanTimer;
- FolderContent Content;
- if (m_ManifestPath.empty())
{
- std::filesystem::path ExcludeManifestPath = m_Path / m_Options.ZenExcludeManifestName;
- tsl::robin_set<std::string> ExcludeAssetPaths;
- if (IsFile(ExcludeManifestPath))
+ Stopwatch ScanTimer;
+ FolderContent Content;
+ if (m_ManifestPath.empty())
{
- std::vector<std::filesystem::path> AssetPaths = ParseManifest(m_Path, ExcludeManifestPath);
- ExcludeAssetPaths.reserve(AssetPaths.size());
- for (const std::filesystem::path& AssetPath : AssetPaths)
+ std::filesystem::path ExcludeManifestPath = m_Path / m_Options.ZenExcludeManifestName;
+ tsl::robin_set<std::string> ExcludeAssetPaths;
+ if (IsFile(ExcludeManifestPath))
{
- ExcludeAssetPaths.insert(AssetPath.generic_string());
- }
- }
- Content = GetFolderContent(
- m_LocalFolderScanStats,
- m_Path,
- [this](const std::string_view& RelativePath) { return IsAcceptedFolder(RelativePath); },
- [this, &ExcludeAssetPaths](const std::string_view& RelativePath, uint64_t Size, uint32_t Attributes) -> bool {
- ZEN_UNUSED(Size, Attributes);
- if (!IsAcceptedFile(RelativePath))
- {
- return false;
- }
- if (ExcludeAssetPaths.contains(std::filesystem::path(RelativePath).generic_string()))
+ std::vector<std::filesystem::path> AssetPaths = ParseManifest(m_Path, ExcludeManifestPath);
+ ExcludeAssetPaths.reserve(AssetPaths.size());
+ for (const std::filesystem::path& AssetPath : AssetPaths)
{
- return false;
+ ExcludeAssetPaths.insert(AssetPath.generic_string());
}
- return true;
- },
- m_IOWorkerPool,
- m_LogOutput.GetProgressUpdateDelayMS(),
- [&](bool, std::ptrdiff_t) {
- LOG_OUTPUT(m_LogOutput, "Found {} files in '{}'...", m_LocalFolderScanStats.AcceptedFileCount.load(), m_Path);
- },
- m_AbortFlag);
- }
- else
- {
- Stopwatch ManifestParseTimer;
- std::vector<std::filesystem::path> AssetPaths = ParseManifest(m_Path, m_ManifestPath);
- for (const std::filesystem::path& AssetPath : AssetPaths)
+ }
+ Content = GetFolderContent(
+ m_LocalFolderScanStats,
+ m_Path,
+ [this](const std::string_view& RelativePath) { return IsAcceptedFolder(RelativePath); },
+ [this, &ExcludeAssetPaths](const std::string_view& RelativePath, uint64_t Size, uint32_t Attributes) -> bool {
+ ZEN_UNUSED(Size, Attributes);
+ if (!IsAcceptedFile(RelativePath))
+ {
+ return false;
+ }
+ if (ExcludeAssetPaths.contains(std::filesystem::path(RelativePath).generic_string()))
+ {
+ return false;
+ }
+ return true;
+ },
+ m_IOWorkerPool,
+ m_LogOutput.GetProgressUpdateDelayMS(),
+ [&](bool, std::ptrdiff_t) {
+ LOG_OUTPUT(m_LogOutput, "Found {} files in '{}'...", m_LocalFolderScanStats.AcceptedFileCount.load(), m_Path);
+ },
+ m_AbortFlag);
+ }
+ else
{
- Content.Paths.push_back(AssetPath);
- const std::filesystem::path AssetFilePath = (m_Path / AssetPath).make_preferred();
- Content.RawSizes.push_back(FileSizeFromPath(AssetFilePath));
+ Stopwatch ManifestParseTimer;
+ std::vector<std::filesystem::path> AssetPaths = ParseManifest(m_Path, m_ManifestPath);
+ for (const std::filesystem::path& AssetPath : AssetPaths)
+ {
+ Content.Paths.push_back(AssetPath);
+ const std::filesystem::path AssetFilePath = (m_Path / AssetPath).make_preferred();
+ Content.RawSizes.push_back(FileSizeFromPath(AssetFilePath));
#if ZEN_PLATFORM_WINDOWS
- Content.Attributes.push_back(GetFileAttributesFromPath(AssetFilePath));
+ Content.Attributes.push_back(GetFileAttributesFromPath(AssetFilePath));
#endif // ZEN_PLATFORM_WINDOWS
#if ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX
- Content.Attributes.push_back(GetFileMode(AssetFilePath));
+ Content.Attributes.push_back(GetFileMode(AssetFilePath));
#endif // ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX
- m_LocalFolderScanStats.AcceptedFileByteCount += Content.RawSizes.back();
- m_LocalFolderScanStats.AcceptedFileCount++;
- }
- if (m_ManifestPath.is_relative())
- {
- Content.Paths.push_back(m_ManifestPath);
- const std::filesystem::path ManifestFilePath = (m_Path / m_ManifestPath).make_preferred();
- Content.RawSizes.push_back(FileSizeFromPath(ManifestFilePath));
+ m_LocalFolderScanStats.AcceptedFileByteCount += Content.RawSizes.back();
+ m_LocalFolderScanStats.AcceptedFileCount++;
+ }
+ if (m_ManifestPath.is_relative())
+ {
+ Content.Paths.push_back(m_ManifestPath);
+ const std::filesystem::path ManifestFilePath = (m_Path / m_ManifestPath).make_preferred();
+ Content.RawSizes.push_back(FileSizeFromPath(ManifestFilePath));
#if ZEN_PLATFORM_WINDOWS
- Content.Attributes.push_back(GetFileAttributesFromPath(ManifestFilePath));
+ Content.Attributes.push_back(GetFileAttributesFromPath(ManifestFilePath));
#endif // ZEN_PLATFORM_WINDOWS
#if ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX
- Content.Attributes.push_back(GetFileMode(ManifestFilePath));
+ Content.Attributes.push_back(GetFileMode(ManifestFilePath));
#endif // ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX
- m_LocalFolderScanStats.AcceptedFileByteCount += Content.RawSizes.back();
- m_LocalFolderScanStats.AcceptedFileCount++;
+ m_LocalFolderScanStats.AcceptedFileByteCount += Content.RawSizes.back();
+ m_LocalFolderScanStats.AcceptedFileCount++;
+ }
+ m_LocalFolderScanStats.FoundFileByteCount.store(m_LocalFolderScanStats.AcceptedFileByteCount);
+ m_LocalFolderScanStats.FoundFileCount.store(m_LocalFolderScanStats.AcceptedFileCount);
+ m_LocalFolderScanStats.ElapsedWallTimeUS = ManifestParseTimer.GetElapsedTimeUs();
}
- m_LocalFolderScanStats.FoundFileByteCount.store(m_LocalFolderScanStats.AcceptedFileByteCount);
- m_LocalFolderScanStats.FoundFileCount.store(m_LocalFolderScanStats.AcceptedFileCount);
- m_LocalFolderScanStats.ElapsedWallTimeUS = ManifestParseTimer.GetElapsedTimeUs();
- }
- std::unique_ptr<ChunkingController> ChunkController = CreateStandardChunkingController(StandardChunkingControllerSettings{});
- {
- CbObjectWriter ChunkParametersWriter;
- ChunkParametersWriter.AddString("name"sv, ChunkController->GetName());
- ChunkParametersWriter.AddObject("parameters"sv, ChunkController->GetParameters());
- ChunkerParameters = ChunkParametersWriter.Save();
- }
+ std::unique_ptr<ChunkingController> ChunkController = CreateStandardChunkingController(StandardChunkingControllerSettings{});
+ {
+ CbObjectWriter ChunkParametersWriter;
+ ChunkParametersWriter.AddString("name"sv, ChunkController->GetName());
+ ChunkParametersWriter.AddObject("parameters"sv, ChunkController->GetParameters());
+ ChunkerParameters = ChunkParametersWriter.Save();
+ }
- TotalRawSize = std::accumulate(Content.RawSizes.begin(), Content.RawSizes.end(), std::uint64_t(0));
+ TotalRawSize = std::accumulate(Content.RawSizes.begin(), Content.RawSizes.end(), std::uint64_t(0));
- {
- std::unique_ptr<BuildOpLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Scan Folder"));
- BuildOpLogOutput::ProgressBar& Progress(*ProgressBarPtr);
+ {
+ std::unique_ptr<BuildOpLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Scan Folder"));
+ BuildOpLogOutput::ProgressBar& Progress(*ProgressBarPtr);
- FilteredRate FilteredBytesHashed;
- FilteredBytesHashed.Start();
- LocalContent = ChunkFolderContent(
- m_ChunkingStats,
- m_IOWorkerPool,
- m_Path,
- Content,
- *ChunkController,
- m_LogOutput.GetProgressUpdateDelayMS(),
- [&](bool IsAborted, bool IsPaused, std::ptrdiff_t) {
- FilteredBytesHashed.Update(m_ChunkingStats.BytesHashed.load());
- std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found",
- m_ChunkingStats.FilesProcessed.load(),
- Content.Paths.size(),
- NiceBytes(m_ChunkingStats.BytesHashed.load()),
- NiceBytes(TotalRawSize),
- NiceNum(FilteredBytesHashed.GetCurrent()),
- m_ChunkingStats.UniqueChunksFound.load(),
- NiceBytes(m_ChunkingStats.UniqueBytesFound.load()));
- Progress.UpdateState({.Task = "Scanning files ",
- .Details = Details,
- .TotalCount = TotalRawSize,
- .RemainingCount = TotalRawSize - m_ChunkingStats.BytesHashed.load(),
- .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
- false);
- },
- m_AbortFlag,
- m_PauseFlag);
- FilteredBytesHashed.Stop();
- Progress.Finish();
- if (m_AbortFlag)
+ FilteredRate FilteredBytesHashed;
+ FilteredBytesHashed.Start();
+ LocalContent = ChunkFolderContent(
+ m_ChunkingStats,
+ m_IOWorkerPool,
+ m_Path,
+ Content,
+ *ChunkController,
+ m_LogOutput.GetProgressUpdateDelayMS(),
+ [&](bool IsAborted, bool IsPaused, std::ptrdiff_t) {
+ FilteredBytesHashed.Update(m_ChunkingStats.BytesHashed.load());
+ std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found",
+ m_ChunkingStats.FilesProcessed.load(),
+ Content.Paths.size(),
+ NiceBytes(m_ChunkingStats.BytesHashed.load()),
+ NiceBytes(TotalRawSize),
+ NiceNum(FilteredBytesHashed.GetCurrent()),
+ m_ChunkingStats.UniqueChunksFound.load(),
+ NiceBytes(m_ChunkingStats.UniqueBytesFound.load()));
+ Progress.UpdateState({.Task = "Scanning files ",
+ .Details = Details,
+ .TotalCount = TotalRawSize,
+ .RemainingCount = TotalRawSize - m_ChunkingStats.BytesHashed.load(),
+ .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+ false);
+ },
+ m_AbortFlag,
+ m_PauseFlag);
+ FilteredBytesHashed.Stop();
+ Progress.Finish();
+ if (m_AbortFlag)
+ {
+ return;
+ }
+ }
+
+ if (!m_Options.IsQuiet)
{
- return;
+ LOG_OUTPUT(m_LogOutput,
+ "Found {} ({}) files divided into {} ({}) unique chunks in '{}' in {}. Average hash rate {}B/sec",
+ LocalContent.Paths.size(),
+ NiceBytes(TotalRawSize),
+ m_ChunkingStats.UniqueChunksFound.load(),
+ NiceBytes(m_ChunkingStats.UniqueBytesFound.load()),
+ m_Path,
+ NiceTimeSpanMs(ScanTimer.GetElapsedTimeMs()),
+ NiceNum(GetBytesPerSecond(m_ChunkingStats.ElapsedWallTimeUS, m_ChunkingStats.BytesHashed)));
}
}
+ const ChunkedContentLookup LocalLookup = BuildChunkedContentLookup(LocalContent);
+
+ std::vector<size_t> ReuseBlockIndexes;
+ std::vector<uint32_t> NewBlockChunkIndexes;
+
+ PrepareBuildResult PrepBuildResult = PrepBuildResultFuture.get();
+
if (!m_Options.IsQuiet)
{
LOG_OUTPUT(m_LogOutput,
- "Found {} ({}) files divided into {} ({}) unique chunks in '{}' in {}. Average hash rate {}B/sec",
- LocalContent.Paths.size(),
- NiceBytes(TotalRawSize),
- m_ChunkingStats.UniqueChunksFound.load(),
- NiceBytes(m_ChunkingStats.UniqueBytesFound.load()),
- m_Path,
- NiceTimeSpanMs(ScanTimer.GetElapsedTimeMs()),
- NiceNum(GetBytesPerSecond(m_ChunkingStats.ElapsedWallTimeUS, m_ChunkingStats.BytesHashed)));
+ "Build prepare took {}. {} took {}, payload size {}{}",
+ NiceTimeSpanMs(PrepBuildResult.ElapsedTimeMs),
+ m_CreateBuild ? "PutBuild" : "GetBuild",
+ NiceTimeSpanMs(PrepBuildResult.PrepareBuildTimeMs),
+ NiceBytes(PrepBuildResult.PayloadSize),
+ m_Options.IgnoreExistingBlocks ? ""
+ : fmt::format(". Found {} blocks in {}",
+ PrepBuildResult.KnownBlocks.size(),
+ NiceTimeSpanMs(PrepBuildResult.FindBlocksTimeMs)));
}
- }
- const ChunkedContentLookup LocalLookup = BuildChunkedContentLookup(LocalContent);
+ m_LogOutput.SetLogOperationProgress(TaskSteps::CalculateDelta, TaskSteps::StepCount);
- std::vector<size_t> ReuseBlockIndexes;
- std::vector<uint32_t> NewBlockChunkIndexes;
+ const std::uint64_t LargeAttachmentSize =
+ m_Options.AllowMultiparts ? PrepBuildResult.PreferredMultipartChunkSize * 4u : (std::uint64_t)-1;
- PrepareBuildResult PrepBuildResult = PrepBuildResultFuture.get();
+ Stopwatch BlockArrangeTimer;
- if (!m_Options.IsQuiet)
- {
- LOG_OUTPUT(m_LogOutput,
- "Build prepare took {}. {} took {}, payload size {}{}",
- NiceTimeSpanMs(PrepBuildResult.ElapsedTimeMs),
- m_CreateBuild ? "PutBuild" : "GetBuild",
- NiceTimeSpanMs(PrepBuildResult.PrepareBuildTimeMs),
- NiceBytes(PrepBuildResult.PayloadSize),
- m_Options.IgnoreExistingBlocks ? ""
- : fmt::format(". Found {} blocks in {}",
- PrepBuildResult.KnownBlocks.size(),
- NiceTimeSpanMs(PrepBuildResult.FindBlocksTimeMs)));
- }
-
- m_LogOutput.SetLogOperationProgress(TaskSteps::CalculateDelta, TaskSteps::StepCount);
-
- const std::uint64_t LargeAttachmentSize =
- m_Options.AllowMultiparts ? PrepBuildResult.PreferredMultipartChunkSize * 4u : (std::uint64_t)-1;
-
- Stopwatch BlockArrangeTimer;
-
- std::vector<std::uint32_t> LooseChunkIndexes;
- {
- bool EnableBlocks = true;
- std::vector<std::uint32_t> BlockChunkIndexes;
- for (uint32_t ChunkIndex = 0; ChunkIndex < LocalContent.ChunkedContent.ChunkHashes.size(); ChunkIndex++)
+ std::vector<std::uint32_t> LooseChunkIndexes;
{
- const uint64_t ChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[ChunkIndex];
- if (!EnableBlocks || ChunkRawSize == 0 || ChunkRawSize > m_Options.BlockParameters.MaxChunkEmbedSize)
+ bool EnableBlocks = true;
+ std::vector<std::uint32_t> BlockChunkIndexes;
+ for (uint32_t ChunkIndex = 0; ChunkIndex < LocalContent.ChunkedContent.ChunkHashes.size(); ChunkIndex++)
{
- LooseChunkIndexes.push_back(ChunkIndex);
- m_LooseChunksStats.ChunkByteCount += ChunkRawSize;
- }
- else
- {
- BlockChunkIndexes.push_back(ChunkIndex);
- m_FindBlocksStats.PotentialChunkByteCount += ChunkRawSize;
+ const uint64_t ChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[ChunkIndex];
+ if (!EnableBlocks || ChunkRawSize == 0 || ChunkRawSize > m_Options.BlockParameters.MaxChunkEmbedSize)
+ {
+ LooseChunkIndexes.push_back(ChunkIndex);
+ m_LooseChunksStats.ChunkByteCount += ChunkRawSize;
+ }
+ else
+ {
+ BlockChunkIndexes.push_back(ChunkIndex);
+ m_FindBlocksStats.PotentialChunkByteCount += ChunkRawSize;
+ }
}
- }
- m_FindBlocksStats.PotentialChunkCount = BlockChunkIndexes.size();
- m_LooseChunksStats.ChunkCount = LooseChunkIndexes.size();
+ m_FindBlocksStats.PotentialChunkCount = BlockChunkIndexes.size();
+ m_LooseChunksStats.ChunkCount = LooseChunkIndexes.size();
- if (m_Options.IgnoreExistingBlocks)
- {
- if (!m_Options.IsQuiet)
+ if (m_Options.IgnoreExistingBlocks)
{
- LOG_OUTPUT(m_LogOutput, "Ignoring any existing blocks in store");
+ if (!m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput, "Ignoring any existing blocks in store");
+ }
+ NewBlockChunkIndexes = std::move(BlockChunkIndexes);
}
- NewBlockChunkIndexes = std::move(BlockChunkIndexes);
- }
- else
- {
- ReuseBlockIndexes = FindReuseBlocks(PrepBuildResult.KnownBlocks,
- LocalContent.ChunkedContent.ChunkHashes,
- BlockChunkIndexes,
- NewBlockChunkIndexes);
- m_FindBlocksStats.AcceptedBlockCount = ReuseBlockIndexes.size();
-
- for (const ChunkBlockDescription& Description : PrepBuildResult.KnownBlocks)
+ else
{
- for (uint32_t ChunkRawLength : Description.ChunkRawLengths)
+ ReuseBlockIndexes = FindReuseBlocks(PrepBuildResult.KnownBlocks,
+ LocalContent.ChunkedContent.ChunkHashes,
+ BlockChunkIndexes,
+ NewBlockChunkIndexes);
+ m_FindBlocksStats.AcceptedBlockCount = ReuseBlockIndexes.size();
+
+ for (const ChunkBlockDescription& Description : PrepBuildResult.KnownBlocks)
{
- m_FindBlocksStats.FoundBlockByteCount += ChunkRawLength;
+ for (uint32_t ChunkRawLength : Description.ChunkRawLengths)
+ {
+ m_FindBlocksStats.FoundBlockByteCount += ChunkRawLength;
+ }
+ m_FindBlocksStats.FoundBlockChunkCount += Description.ChunkRawHashes.size();
}
- m_FindBlocksStats.FoundBlockChunkCount += Description.ChunkRawHashes.size();
}
}
- }
- std::vector<std::vector<uint32_t>> NewBlockChunks;
- ArrangeChunksIntoBlocks(LocalContent, LocalLookup, NewBlockChunkIndexes, NewBlockChunks);
+ std::vector<std::vector<uint32_t>> NewBlockChunks;
+ ArrangeChunksIntoBlocks(LocalContent, LocalLookup, NewBlockChunkIndexes, NewBlockChunks);
- m_FindBlocksStats.NewBlocksCount = NewBlockChunks.size();
- for (uint32_t ChunkIndex : NewBlockChunkIndexes)
- {
- m_FindBlocksStats.NewBlocksChunkByteCount += LocalContent.ChunkedContent.ChunkRawSizes[ChunkIndex];
- }
- m_FindBlocksStats.NewBlocksChunkCount = NewBlockChunkIndexes.size();
-
- const double AcceptedByteCountPercent =
- m_FindBlocksStats.PotentialChunkByteCount > 0
- ? (100.0 * m_FindBlocksStats.AcceptedRawByteCount / m_FindBlocksStats.PotentialChunkByteCount)
- : 0.0;
-
- const double AcceptedReduntantByteCountPercent =
- m_FindBlocksStats.AcceptedByteCount > 0 ? (100.0 * m_FindBlocksStats.AcceptedReduntantByteCount) /
- (m_FindBlocksStats.AcceptedByteCount + m_FindBlocksStats.AcceptedReduntantByteCount)
- : 0.0;
- if (!m_Options.IsQuiet)
- {
- LOG_OUTPUT(m_LogOutput,
- "Found {} chunks in {} ({}) blocks eligible for reuse in {}\n"
- " Reusing {} ({}) matching chunks in {} blocks ({:.1f}%)\n"
- " Accepting {} ({}) redundant chunks ({:.1f}%)\n"
- " Rejected {} ({}) chunks in {} blocks\n"
- " Arranged {} ({}) chunks in {} new blocks\n"
- " Keeping {} ({}) chunks as loose chunks\n"
- " Discovery completed in {}",
- m_FindBlocksStats.FoundBlockChunkCount,
- m_FindBlocksStats.FoundBlockCount,
- NiceBytes(m_FindBlocksStats.FoundBlockByteCount),
- NiceTimeSpanMs(m_FindBlocksStats.FindBlockTimeMS),
-
- m_FindBlocksStats.AcceptedChunkCount,
- NiceBytes(m_FindBlocksStats.AcceptedRawByteCount),
- m_FindBlocksStats.AcceptedBlockCount,
- AcceptedByteCountPercent,
-
- m_FindBlocksStats.AcceptedReduntantChunkCount,
- NiceBytes(m_FindBlocksStats.AcceptedReduntantByteCount),
- AcceptedReduntantByteCountPercent,
-
- m_FindBlocksStats.RejectedChunkCount,
- NiceBytes(m_FindBlocksStats.RejectedByteCount),
- m_FindBlocksStats.RejectedBlockCount,
-
- m_FindBlocksStats.NewBlocksChunkCount,
- NiceBytes(m_FindBlocksStats.NewBlocksChunkByteCount),
- m_FindBlocksStats.NewBlocksCount,
-
- m_LooseChunksStats.ChunkCount,
- NiceBytes(m_LooseChunksStats.ChunkByteCount),
-
- NiceTimeSpanMs(BlockArrangeTimer.GetElapsedTimeMs()));
- }
+ m_FindBlocksStats.NewBlocksCount = NewBlockChunks.size();
+ for (uint32_t ChunkIndex : NewBlockChunkIndexes)
+ {
+ m_FindBlocksStats.NewBlocksChunkByteCount += LocalContent.ChunkedContent.ChunkRawSizes[ChunkIndex];
+ }
+ m_FindBlocksStats.NewBlocksChunkCount = NewBlockChunkIndexes.size();
- m_LogOutput.SetLogOperationProgress(TaskSteps::GenerateBlocks, TaskSteps::StepCount);
- GeneratedBlocks NewBlocks;
+ const double AcceptedByteCountPercent =
+ m_FindBlocksStats.PotentialChunkByteCount > 0
+ ? (100.0 * m_FindBlocksStats.AcceptedRawByteCount / m_FindBlocksStats.PotentialChunkByteCount)
+ : 0.0;
- if (!NewBlockChunks.empty())
- {
- Stopwatch GenerateBuildBlocksTimer;
- auto __ = MakeGuard([&]() {
- uint64_t BlockGenerateTimeUs = GenerateBuildBlocksTimer.GetElapsedTimeUs();
- if (!m_Options.IsQuiet)
- {
- LOG_OUTPUT(m_LogOutput,
- "Generated {} ({}) and uploaded {} ({}) blocks in {}. Generate speed: {}B/sec. Transfer speed {}bits/sec.",
- m_GenerateBlocksStats.GeneratedBlockCount.load(),
- NiceBytes(m_GenerateBlocksStats.GeneratedBlockByteCount),
- m_UploadStats.BlockCount.load(),
- NiceBytes(m_UploadStats.BlocksBytes.load()),
- NiceTimeSpanMs(BlockGenerateTimeUs / 1000),
- NiceNum(GetBytesPerSecond(m_GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS,
- m_GenerateBlocksStats.GeneratedBlockByteCount)),
- NiceNum(GetBytesPerSecond(m_UploadStats.ElapsedWallTimeUS, m_UploadStats.BlocksBytes * 8)));
- }
- });
- GenerateBuildBlocks(LocalContent, LocalLookup, NewBlockChunks, NewBlocks);
- }
+ const double AcceptedReduntantByteCountPercent =
+ m_FindBlocksStats.AcceptedByteCount > 0
+ ? (100.0 * m_FindBlocksStats.AcceptedReduntantByteCount) /
+ (m_FindBlocksStats.AcceptedByteCount + m_FindBlocksStats.AcceptedReduntantByteCount)
+ : 0.0;
+ if (!m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Found {} chunks in {} ({}) blocks eligible for reuse in {}\n"
+ " Reusing {} ({}) matching chunks in {} blocks ({:.1f}%)\n"
+ " Accepting {} ({}) redundant chunks ({:.1f}%)\n"
+ " Rejected {} ({}) chunks in {} blocks\n"
+ " Arranged {} ({}) chunks in {} new blocks\n"
+ " Keeping {} ({}) chunks as loose chunks\n"
+ " Discovery completed in {}",
+ m_FindBlocksStats.FoundBlockChunkCount,
+ m_FindBlocksStats.FoundBlockCount,
+ NiceBytes(m_FindBlocksStats.FoundBlockByteCount),
+ NiceTimeSpanMs(m_FindBlocksStats.FindBlockTimeMS),
- m_LogOutput.SetLogOperationProgress(TaskSteps::BuildPartManifest, TaskSteps::StepCount);
+ m_FindBlocksStats.AcceptedChunkCount,
+ NiceBytes(m_FindBlocksStats.AcceptedRawByteCount),
+ m_FindBlocksStats.AcceptedBlockCount,
+ AcceptedByteCountPercent,
- CbObject PartManifest;
- {
- CbObjectWriter PartManifestWriter;
- Stopwatch ManifestGenerationTimer;
- auto __ = MakeGuard([&]() {
- if (!m_Options.IsQuiet)
- {
- LOG_OUTPUT(m_LogOutput,
- "Generated build part manifest in {} ({})",
- NiceTimeSpanMs(ManifestGenerationTimer.GetElapsedTimeMs()),
- NiceBytes(PartManifestWriter.GetSaveSize()));
- }
- });
- PartManifestWriter.AddObject("chunker"sv, ChunkerParameters);
+ m_FindBlocksStats.AcceptedReduntantChunkCount,
+ NiceBytes(m_FindBlocksStats.AcceptedReduntantByteCount),
+ AcceptedReduntantByteCountPercent,
- std::vector<IoHash> AllChunkBlockHashes;
- std::vector<ChunkBlockDescription> AllChunkBlockDescriptions;
- AllChunkBlockHashes.reserve(ReuseBlockIndexes.size() + NewBlocks.BlockDescriptions.size());
- AllChunkBlockDescriptions.reserve(ReuseBlockIndexes.size() + NewBlocks.BlockDescriptions.size());
- for (size_t ReuseBlockIndex : ReuseBlockIndexes)
- {
- AllChunkBlockDescriptions.push_back(PrepBuildResult.KnownBlocks[ReuseBlockIndex]);
- AllChunkBlockHashes.push_back(PrepBuildResult.KnownBlocks[ReuseBlockIndex].BlockHash);
+ m_FindBlocksStats.RejectedChunkCount,
+ NiceBytes(m_FindBlocksStats.RejectedByteCount),
+ m_FindBlocksStats.RejectedBlockCount,
+
+ m_FindBlocksStats.NewBlocksChunkCount,
+ NiceBytes(m_FindBlocksStats.NewBlocksChunkByteCount),
+ m_FindBlocksStats.NewBlocksCount,
+
+ m_LooseChunksStats.ChunkCount,
+ NiceBytes(m_LooseChunksStats.ChunkByteCount),
+
+ NiceTimeSpanMs(BlockArrangeTimer.GetElapsedTimeMs()));
}
- AllChunkBlockDescriptions.insert(AllChunkBlockDescriptions.end(),
- NewBlocks.BlockDescriptions.begin(),
- NewBlocks.BlockDescriptions.end());
- for (const ChunkBlockDescription& BlockDescription : NewBlocks.BlockDescriptions)
+
+ m_LogOutput.SetLogOperationProgress(TaskSteps::GenerateBlocks, TaskSteps::StepCount);
+ GeneratedBlocks NewBlocks;
+
+ if (!NewBlockChunks.empty())
{
- AllChunkBlockHashes.push_back(BlockDescription.BlockHash);
- }
+ Stopwatch GenerateBuildBlocksTimer;
+ auto __ = MakeGuard([&]() {
+ uint64_t BlockGenerateTimeUs = GenerateBuildBlocksTimer.GetElapsedTimeUs();
+ if (!m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Generated {} ({}) and uploaded {} ({}) blocks in {}. Generate speed: {}B/sec. Transfer speed {}bits/sec.",
+ m_GenerateBlocksStats.GeneratedBlockCount.load(),
+ NiceBytes(m_GenerateBlocksStats.GeneratedBlockByteCount),
+ m_UploadStats.BlockCount.load(),
+ NiceBytes(m_UploadStats.BlocksBytes.load()),
+ NiceTimeSpanMs(BlockGenerateTimeUs / 1000),
+ NiceNum(GetBytesPerSecond(m_GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS,
+ m_GenerateBlocksStats.GeneratedBlockByteCount)),
+ NiceNum(GetBytesPerSecond(m_UploadStats.ElapsedWallTimeUS, m_UploadStats.BlocksBytes * 8)));
+ }
+ });
+ GenerateBuildBlocks(LocalContent, LocalLookup, NewBlockChunks, NewBlocks);
+ }
+
+ m_LogOutput.SetLogOperationProgress(TaskSteps::BuildPartManifest, TaskSteps::StepCount);
+
+ CbObject PartManifest;
+ {
+ CbObjectWriter PartManifestWriter;
+ Stopwatch ManifestGenerationTimer;
+ auto __ = MakeGuard([&]() {
+ if (!m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Generated build part manifest in {} ({})",
+ NiceTimeSpanMs(ManifestGenerationTimer.GetElapsedTimeMs()),
+ NiceBytes(PartManifestWriter.GetSaveSize()));
+ }
+ });
+ PartManifestWriter.AddObject("chunker"sv, ChunkerParameters);
+
+ std::vector<IoHash> AllChunkBlockHashes;
+ std::vector<ChunkBlockDescription> AllChunkBlockDescriptions;
+ AllChunkBlockHashes.reserve(ReuseBlockIndexes.size() + NewBlocks.BlockDescriptions.size());
+ AllChunkBlockDescriptions.reserve(ReuseBlockIndexes.size() + NewBlocks.BlockDescriptions.size());
+ for (size_t ReuseBlockIndex : ReuseBlockIndexes)
+ {
+ AllChunkBlockDescriptions.push_back(PrepBuildResult.KnownBlocks[ReuseBlockIndex]);
+ AllChunkBlockHashes.push_back(PrepBuildResult.KnownBlocks[ReuseBlockIndex].BlockHash);
+ }
+ AllChunkBlockDescriptions.insert(AllChunkBlockDescriptions.end(),
+ NewBlocks.BlockDescriptions.begin(),
+ NewBlocks.BlockDescriptions.end());
+ for (const ChunkBlockDescription& BlockDescription : NewBlocks.BlockDescriptions)
+ {
+ AllChunkBlockHashes.push_back(BlockDescription.BlockHash);
+ }
#if EXTRA_VERIFY
- tsl::robin_map<IoHash, size_t, IoHash::Hasher> ChunkHashToAbsoluteChunkIndex;
- std::vector<IoHash> AbsoluteChunkHashes;
- AbsoluteChunkHashes.reserve(LocalContent.ChunkedContent.ChunkHashes.size());
- for (uint32_t ChunkIndex : LooseChunkIndexes)
- {
- ChunkHashToAbsoluteChunkIndex.insert({LocalContent.ChunkedContent.ChunkHashes[ChunkIndex], AbsoluteChunkHashes.size()});
- AbsoluteChunkHashes.push_back(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]);
- }
- for (const ChunkBlockDescription& Block : AllChunkBlockDescriptions)
- {
- for (const IoHash& ChunkHash : Block.ChunkRawHashes)
+ tsl::robin_map<IoHash, size_t, IoHash::Hasher> ChunkHashToAbsoluteChunkIndex;
+ std::vector<IoHash> AbsoluteChunkHashes;
+ AbsoluteChunkHashes.reserve(LocalContent.ChunkedContent.ChunkHashes.size());
+ for (uint32_t ChunkIndex : LooseChunkIndexes)
{
- ChunkHashToAbsoluteChunkIndex.insert({ChunkHash, AbsoluteChunkHashes.size()});
- AbsoluteChunkHashes.push_back(ChunkHash);
+ ChunkHashToAbsoluteChunkIndex.insert({LocalContent.ChunkedContent.ChunkHashes[ChunkIndex], AbsoluteChunkHashes.size()});
+ AbsoluteChunkHashes.push_back(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]);
+ }
+ for (const ChunkBlockDescription& Block : AllChunkBlockDescriptions)
+ {
+ for (const IoHash& ChunkHash : Block.ChunkRawHashes)
+ {
+ ChunkHashToAbsoluteChunkIndex.insert({ChunkHash, AbsoluteChunkHashes.size()});
+ AbsoluteChunkHashes.push_back(ChunkHash);
+ }
+ }
+ for (const IoHash& ChunkHash : LocalContent.ChunkedContent.ChunkHashes)
+ {
+ ZEN_ASSERT(AbsoluteChunkHashes[ChunkHashToAbsoluteChunkIndex.at(ChunkHash)] == ChunkHash);
+ ZEN_ASSERT(LocalContent.ChunkedContent.ChunkHashes[LocalLookup.ChunkHashToChunkIndex.at(ChunkHash)] == ChunkHash);
+ }
+ for (const uint32_t ChunkIndex : LocalContent.ChunkedContent.ChunkOrders)
+ {
+ ZEN_ASSERT(AbsoluteChunkHashes[ChunkHashToAbsoluteChunkIndex.at(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex])] ==
+ LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]);
+ ZEN_ASSERT(LocalLookup.ChunkHashToChunkIndex.at(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]) == ChunkIndex);
}
- }
- for (const IoHash& ChunkHash : LocalContent.ChunkedContent.ChunkHashes)
- {
- ZEN_ASSERT(AbsoluteChunkHashes[ChunkHashToAbsoluteChunkIndex.at(ChunkHash)] == ChunkHash);
- ZEN_ASSERT(LocalContent.ChunkedContent.ChunkHashes[LocalLookup.ChunkHashToChunkIndex.at(ChunkHash)] == ChunkHash);
- }
- for (const uint32_t ChunkIndex : LocalContent.ChunkedContent.ChunkOrders)
- {
- ZEN_ASSERT(AbsoluteChunkHashes[ChunkHashToAbsoluteChunkIndex.at(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex])] ==
- LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]);
- ZEN_ASSERT(LocalLookup.ChunkHashToChunkIndex.at(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]) == ChunkIndex);
- }
#endif // EXTRA_VERIFY
- std::vector<uint32_t> AbsoluteChunkOrders = CalculateAbsoluteChunkOrders(LocalContent.ChunkedContent.ChunkHashes,
- LocalContent.ChunkedContent.ChunkOrders,
- LocalLookup.ChunkHashToChunkIndex,
- LooseChunkIndexes,
- AllChunkBlockDescriptions);
+ std::vector<uint32_t> AbsoluteChunkOrders = CalculateAbsoluteChunkOrders(LocalContent.ChunkedContent.ChunkHashes,
+ LocalContent.ChunkedContent.ChunkOrders,
+ LocalLookup.ChunkHashToChunkIndex,
+ LooseChunkIndexes,
+ AllChunkBlockDescriptions);
#if EXTRA_VERIFY
- for (uint32_t ChunkOrderIndex = 0; ChunkOrderIndex < LocalContent.ChunkedContent.ChunkOrders.size(); ChunkOrderIndex++)
- {
- uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[ChunkOrderIndex];
- uint32_t AbsoluteChunkIndex = AbsoluteChunkOrders[ChunkOrderIndex];
- const IoHash& LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex];
- const IoHash& AbsoluteChunkHash = AbsoluteChunkHashes[AbsoluteChunkIndex];
- ZEN_ASSERT(LocalChunkHash == AbsoluteChunkHash);
- }
+ for (uint32_t ChunkOrderIndex = 0; ChunkOrderIndex < LocalContent.ChunkedContent.ChunkOrders.size(); ChunkOrderIndex++)
+ {
+ uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[ChunkOrderIndex];
+ uint32_t AbsoluteChunkIndex = AbsoluteChunkOrders[ChunkOrderIndex];
+ const IoHash& LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex];
+ const IoHash& AbsoluteChunkHash = AbsoluteChunkHashes[AbsoluteChunkIndex];
+ ZEN_ASSERT(LocalChunkHash == AbsoluteChunkHash);
+ }
#endif // EXTRA_VERIFY
- WriteBuildContentToCompactBinary(PartManifestWriter,
- LocalContent.Platform,
- LocalContent.Paths,
- LocalContent.RawHashes,
- LocalContent.RawSizes,
- LocalContent.Attributes,
- LocalContent.ChunkedContent.SequenceRawHashes,
- LocalContent.ChunkedContent.ChunkCounts,
- LocalContent.ChunkedContent.ChunkHashes,
- LocalContent.ChunkedContent.ChunkRawSizes,
- AbsoluteChunkOrders,
- LooseChunkIndexes,
- AllChunkBlockHashes);
+ WriteBuildContentToCompactBinary(PartManifestWriter,
+ LocalContent.Platform,
+ LocalContent.Paths,
+ LocalContent.RawHashes,
+ LocalContent.RawSizes,
+ LocalContent.Attributes,
+ LocalContent.ChunkedContent.SequenceRawHashes,
+ LocalContent.ChunkedContent.ChunkCounts,
+ LocalContent.ChunkedContent.ChunkHashes,
+ LocalContent.ChunkedContent.ChunkRawSizes,
+ AbsoluteChunkOrders,
+ LooseChunkIndexes,
+ AllChunkBlockHashes);
#if EXTRA_VERIFY
- {
- ChunkedFolderContent VerifyFolderContent;
-
- std::vector<uint32_t> OutAbsoluteChunkOrders;
- std::vector<IoHash> OutLooseChunkHashes;
- std::vector<uint64_t> OutLooseChunkRawSizes;
- std::vector<IoHash> OutBlockRawHashes;
- ReadBuildContentFromCompactBinary(PartManifestWriter.Save(),
- VerifyFolderContent.Platform,
- VerifyFolderContent.Paths,
- VerifyFolderContent.RawHashes,
- VerifyFolderContent.RawSizes,
- VerifyFolderContent.Attributes,
- VerifyFolderContent.ChunkedContent.SequenceRawHashes,
- VerifyFolderContent.ChunkedContent.ChunkCounts,
- OutAbsoluteChunkOrders,
- OutLooseChunkHashes,
- OutLooseChunkRawSizes,
- OutBlockRawHashes);
- ZEN_ASSERT(OutBlockRawHashes == AllChunkBlockHashes);
-
- for (uint32_t OrderIndex = 0; OrderIndex < OutAbsoluteChunkOrders.size(); OrderIndex++)
{
- uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[OrderIndex];
- const IoHash LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex];
+ ChunkedFolderContent VerifyFolderContent;
+
+ std::vector<uint32_t> OutAbsoluteChunkOrders;
+ std::vector<IoHash> OutLooseChunkHashes;
+ std::vector<uint64_t> OutLooseChunkRawSizes;
+ std::vector<IoHash> OutBlockRawHashes;
+ ReadBuildContentFromCompactBinary(PartManifestWriter.Save(),
+ VerifyFolderContent.Platform,
+ VerifyFolderContent.Paths,
+ VerifyFolderContent.RawHashes,
+ VerifyFolderContent.RawSizes,
+ VerifyFolderContent.Attributes,
+ VerifyFolderContent.ChunkedContent.SequenceRawHashes,
+ VerifyFolderContent.ChunkedContent.ChunkCounts,
+ OutAbsoluteChunkOrders,
+ OutLooseChunkHashes,
+ OutLooseChunkRawSizes,
+ OutBlockRawHashes);
+ ZEN_ASSERT(OutBlockRawHashes == AllChunkBlockHashes);
+
+ for (uint32_t OrderIndex = 0; OrderIndex < OutAbsoluteChunkOrders.size(); OrderIndex++)
+ {
+ uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[OrderIndex];
+ const IoHash LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex];
- uint32_t VerifyChunkIndex = OutAbsoluteChunkOrders[OrderIndex];
- const IoHash VerifyChunkHash = AbsoluteChunkHashes[VerifyChunkIndex];
+ uint32_t VerifyChunkIndex = OutAbsoluteChunkOrders[OrderIndex];
+ const IoHash VerifyChunkHash = AbsoluteChunkHashes[VerifyChunkIndex];
- ZEN_ASSERT(LocalChunkHash == VerifyChunkHash);
- }
+ ZEN_ASSERT(LocalChunkHash == VerifyChunkHash);
+ }
- CalculateLocalChunkOrders(OutAbsoluteChunkOrders,
- OutLooseChunkHashes,
- OutLooseChunkRawSizes,
- AllChunkBlockDescriptions,
- VerifyFolderContent.ChunkedContent.ChunkHashes,
- VerifyFolderContent.ChunkedContent.ChunkRawSizes,
- VerifyFolderContent.ChunkedContent.ChunkOrders);
-
- ZEN_ASSERT(LocalContent.Paths == VerifyFolderContent.Paths);
- ZEN_ASSERT(LocalContent.RawHashes == VerifyFolderContent.RawHashes);
- ZEN_ASSERT(LocalContent.RawSizes == VerifyFolderContent.RawSizes);
- ZEN_ASSERT(LocalContent.Attributes == VerifyFolderContent.Attributes);
- ZEN_ASSERT(LocalContent.ChunkedContent.SequenceRawHashes == VerifyFolderContent.ChunkedContent.SequenceRawHashes);
- ZEN_ASSERT(LocalContent.ChunkedContent.ChunkCounts == VerifyFolderContent.ChunkedContent.ChunkCounts);
-
- for (uint32_t OrderIndex = 0; OrderIndex < LocalContent.ChunkedContent.ChunkOrders.size(); OrderIndex++)
- {
- uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[OrderIndex];
- const IoHash LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex];
- uint64_t LocalChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[LocalChunkIndex];
+ CalculateLocalChunkOrders(OutAbsoluteChunkOrders,
+ OutLooseChunkHashes,
+ OutLooseChunkRawSizes,
+ AllChunkBlockDescriptions,
+ VerifyFolderContent.ChunkedContent.ChunkHashes,
+ VerifyFolderContent.ChunkedContent.ChunkRawSizes,
+ VerifyFolderContent.ChunkedContent.ChunkOrders);
+
+ ZEN_ASSERT(LocalContent.Paths == VerifyFolderContent.Paths);
+ ZEN_ASSERT(LocalContent.RawHashes == VerifyFolderContent.RawHashes);
+ ZEN_ASSERT(LocalContent.RawSizes == VerifyFolderContent.RawSizes);
+ ZEN_ASSERT(LocalContent.Attributes == VerifyFolderContent.Attributes);
+ ZEN_ASSERT(LocalContent.ChunkedContent.SequenceRawHashes == VerifyFolderContent.ChunkedContent.SequenceRawHashes);
+ ZEN_ASSERT(LocalContent.ChunkedContent.ChunkCounts == VerifyFolderContent.ChunkedContent.ChunkCounts);
+
+ for (uint32_t OrderIndex = 0; OrderIndex < LocalContent.ChunkedContent.ChunkOrders.size(); OrderIndex++)
+ {
+ uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[OrderIndex];
+ const IoHash LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex];
+ uint64_t LocalChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[LocalChunkIndex];
- uint32_t VerifyChunkIndex = VerifyFolderContent.ChunkedContent.ChunkOrders[OrderIndex];
- const IoHash VerifyChunkHash = VerifyFolderContent.ChunkedContent.ChunkHashes[VerifyChunkIndex];
- uint64_t VerifyChunkRawSize = VerifyFolderContent.ChunkedContent.ChunkRawSizes[VerifyChunkIndex];
+ uint32_t VerifyChunkIndex = VerifyFolderContent.ChunkedContent.ChunkOrders[OrderIndex];
+ const IoHash VerifyChunkHash = VerifyFolderContent.ChunkedContent.ChunkHashes[VerifyChunkIndex];
+ uint64_t VerifyChunkRawSize = VerifyFolderContent.ChunkedContent.ChunkRawSizes[VerifyChunkIndex];
- ZEN_ASSERT(LocalChunkHash == VerifyChunkHash);
- ZEN_ASSERT(LocalChunkRawSize == VerifyChunkRawSize);
+ ZEN_ASSERT(LocalChunkHash == VerifyChunkHash);
+ ZEN_ASSERT(LocalChunkRawSize == VerifyChunkRawSize);
+ }
}
- }
#endif // EXTRA_VERIFY
- PartManifest = PartManifestWriter.Save();
- }
-
- m_LogOutput.SetLogOperationProgress(TaskSteps::Upload, TaskSteps::StepCount);
-
- Stopwatch PutBuildPartResultTimer;
- std::pair<IoHash, std::vector<IoHash>> PutBuildPartResult =
- m_Storage.BuildStorage->PutBuildPart(m_BuildId, m_BuildPartId, m_BuildPartName, PartManifest);
- if (!m_Options.IsQuiet)
- {
- LOG_OUTPUT(m_LogOutput,
- "PutBuildPart took {}, payload size {}. {} attachments are needed.",
- NiceTimeSpanMs(PutBuildPartResultTimer.GetElapsedTimeMs()),
- NiceBytes(PartManifest.GetSize()),
- PutBuildPartResult.second.size());
- }
- IoHash PartHash = PutBuildPartResult.first;
+ PartManifest = PartManifestWriter.Save();
+ }
- auto UploadAttachments = [this, &LocalContent, &LocalLookup, &NewBlockChunks, &NewBlocks, &LooseChunkIndexes, &LargeAttachmentSize](
- std::span<IoHash> RawHashes) {
- if (!m_AbortFlag)
- {
- UploadStatistics TempUploadStats;
- LooseChunksStatistics TempLooseChunksStats;
+ m_LogOutput.SetLogOperationProgress(TaskSteps::Upload, TaskSteps::StepCount);
- Stopwatch TempUploadTimer;
- auto __ = MakeGuard([&]() {
- if (!m_Options.IsQuiet)
- {
- uint64_t TempChunkUploadTimeUs = TempUploadTimer.GetElapsedTimeUs();
- LOG_OUTPUT(m_LogOutput,
- "Uploaded {} ({}) blocks. "
- "Compressed {} ({} {}B/s) and uploaded {} ({}) chunks. "
- "Transferred {} ({}bits/s) in {}",
- TempUploadStats.BlockCount.load(),
- NiceBytes(TempUploadStats.BlocksBytes),
-
- TempLooseChunksStats.CompressedChunkCount.load(),
- NiceBytes(TempLooseChunksStats.CompressedChunkBytes.load()),
- NiceNum(GetBytesPerSecond(TempLooseChunksStats.CompressChunksElapsedWallTimeUS,
- TempLooseChunksStats.ChunkByteCount)),
- TempUploadStats.ChunkCount.load(),
- NiceBytes(TempUploadStats.ChunksBytes),
-
- NiceBytes(TempUploadStats.BlocksBytes + TempUploadStats.ChunksBytes),
- NiceNum(GetBytesPerSecond(TempUploadStats.ElapsedWallTimeUS, TempUploadStats.ChunksBytes * 8)),
- NiceTimeSpanMs(TempChunkUploadTimeUs / 1000));
- }
- });
- UploadPartBlobs(LocalContent,
- LocalLookup,
- RawHashes,
- NewBlockChunks,
- NewBlocks,
- LooseChunkIndexes,
- LargeAttachmentSize,
- TempUploadStats,
- TempLooseChunksStats);
- m_UploadStats += TempUploadStats;
- m_LooseChunksStats += TempLooseChunksStats;
- }
- };
- if (m_Options.IgnoreExistingBlocks)
- {
- if (m_Options.IsVerbose)
+ Stopwatch PutBuildPartResultTimer;
+ std::pair<IoHash, std::vector<IoHash>> PutBuildPartResult =
+ m_Storage.BuildStorage->PutBuildPart(m_BuildId, m_BuildPartId, m_BuildPartName, PartManifest);
+ if (!m_Options.IsQuiet)
{
LOG_OUTPUT(m_LogOutput,
- "PutBuildPart uploading all attachments, needs are: {}",
- FormatArray<IoHash>(PutBuildPartResult.second, "\n "sv));
+ "PutBuildPart took {}, payload size {}. {} attachments are needed.",
+ NiceTimeSpanMs(PutBuildPartResultTimer.GetElapsedTimeMs()),
+ NiceBytes(PartManifest.GetSize()),
+ PutBuildPartResult.second.size());
}
+ IoHash PartHash = PutBuildPartResult.first;
- std::vector<IoHash> ForceUploadChunkHashes;
- ForceUploadChunkHashes.reserve(LooseChunkIndexes.size());
-
- for (uint32_t ChunkIndex : LooseChunkIndexes)
+ auto UploadAttachments = [this, &LocalContent, &LocalLookup, &NewBlockChunks, &NewBlocks, &LooseChunkIndexes, &LargeAttachmentSize](
+ std::span<IoHash> RawHashes) {
+ if (!m_AbortFlag)
+ {
+ UploadStatistics TempUploadStats;
+ LooseChunksStatistics TempLooseChunksStats;
+
+ Stopwatch TempUploadTimer;
+ auto __ = MakeGuard([&]() {
+ if (!m_Options.IsQuiet)
+ {
+ uint64_t TempChunkUploadTimeUs = TempUploadTimer.GetElapsedTimeUs();
+ LOG_OUTPUT(m_LogOutput,
+ "Uploaded {} ({}) blocks. "
+ "Compressed {} ({} {}B/s) and uploaded {} ({}) chunks. "
+ "Transferred {} ({}bits/s) in {}",
+ TempUploadStats.BlockCount.load(),
+ NiceBytes(TempUploadStats.BlocksBytes),
+
+ TempLooseChunksStats.CompressedChunkCount.load(),
+ NiceBytes(TempLooseChunksStats.CompressedChunkBytes.load()),
+ NiceNum(GetBytesPerSecond(TempLooseChunksStats.CompressChunksElapsedWallTimeUS,
+ TempLooseChunksStats.ChunkByteCount)),
+ TempUploadStats.ChunkCount.load(),
+ NiceBytes(TempUploadStats.ChunksBytes),
+
+ NiceBytes(TempUploadStats.BlocksBytes + TempUploadStats.ChunksBytes),
+ NiceNum(GetBytesPerSecond(TempUploadStats.ElapsedWallTimeUS, TempUploadStats.ChunksBytes * 8)),
+ NiceTimeSpanMs(TempChunkUploadTimeUs / 1000));
+ }
+ });
+ UploadPartBlobs(LocalContent,
+ LocalLookup,
+ RawHashes,
+ NewBlockChunks,
+ NewBlocks,
+ LooseChunkIndexes,
+ LargeAttachmentSize,
+ TempUploadStats,
+ TempLooseChunksStats);
+ m_UploadStats += TempUploadStats;
+ m_LooseChunksStats += TempLooseChunksStats;
+ }
+ };
+ if (m_Options.IgnoreExistingBlocks)
{
- ForceUploadChunkHashes.push_back(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]);
- }
+ if (m_Options.IsVerbose)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "PutBuildPart uploading all attachments, needs are: {}",
+ FormatArray<IoHash>(PutBuildPartResult.second, "\n "sv));
+ }
- for (size_t BlockIndex = 0; BlockIndex < NewBlocks.BlockHeaders.size(); BlockIndex++)
- {
- if (NewBlocks.BlockHeaders[BlockIndex])
+ std::vector<IoHash> ForceUploadChunkHashes;
+ ForceUploadChunkHashes.reserve(LooseChunkIndexes.size());
+
+ for (uint32_t ChunkIndex : LooseChunkIndexes)
{
- // Block was not uploaded during generation
- ForceUploadChunkHashes.push_back(NewBlocks.BlockDescriptions[BlockIndex].BlockHash);
+ ForceUploadChunkHashes.push_back(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]);
}
- }
- UploadAttachments(ForceUploadChunkHashes);
- }
- else if (!PutBuildPartResult.second.empty())
- {
- if (m_Options.IsVerbose)
- {
- LOG_OUTPUT(m_LogOutput, "PutBuildPart needs attachments: {}", FormatArray<IoHash>(PutBuildPartResult.second, "\n "sv));
- }
- UploadAttachments(PutBuildPartResult.second);
- }
- uint32_t FinalizeBuildPartRetryCount = 5;
- while (!m_AbortFlag && (FinalizeBuildPartRetryCount--) > 0)
- {
- Stopwatch FinalizeBuildPartTimer;
- std::vector<IoHash> Needs = m_Storage.BuildStorage->FinalizeBuildPart(m_BuildId, m_BuildPartId, PartHash);
- if (!m_Options.IsQuiet)
- {
- LOG_OUTPUT(m_LogOutput,
- "FinalizeBuildPart took {}. {} attachments are missing.",
- NiceTimeSpanMs(FinalizeBuildPartTimer.GetElapsedTimeMs()),
- Needs.size());
+ for (size_t BlockIndex = 0; BlockIndex < NewBlocks.BlockHeaders.size(); BlockIndex++)
+ {
+ if (NewBlocks.BlockHeaders[BlockIndex])
+ {
+ // Block was not uploaded during generation
+ ForceUploadChunkHashes.push_back(NewBlocks.BlockDescriptions[BlockIndex].BlockHash);
+ }
+ }
+ UploadAttachments(ForceUploadChunkHashes);
}
- if (Needs.empty())
+ else if (!PutBuildPartResult.second.empty())
{
- break;
+ if (m_Options.IsVerbose)
+ {
+ LOG_OUTPUT(m_LogOutput, "PutBuildPart needs attachments: {}", FormatArray<IoHash>(PutBuildPartResult.second, "\n "sv));
+ }
+ UploadAttachments(PutBuildPartResult.second);
}
- if (m_Options.IsVerbose)
+
+ uint32_t FinalizeBuildPartRetryCount = 5;
+ while (!m_AbortFlag && (FinalizeBuildPartRetryCount--) > 0)
{
- LOG_OUTPUT(m_LogOutput, "FinalizeBuildPart needs attachments: {}", FormatArray<IoHash>(Needs, "\n "sv));
+ Stopwatch FinalizeBuildPartTimer;
+ std::vector<IoHash> Needs = m_Storage.BuildStorage->FinalizeBuildPart(m_BuildId, m_BuildPartId, PartHash);
+ if (!m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "FinalizeBuildPart took {}. {} attachments are missing.",
+ NiceTimeSpanMs(FinalizeBuildPartTimer.GetElapsedTimeMs()),
+ Needs.size());
+ }
+ if (Needs.empty())
+ {
+ break;
+ }
+ if (m_Options.IsVerbose)
+ {
+ LOG_OUTPUT(m_LogOutput, "FinalizeBuildPart needs attachments: {}", FormatArray<IoHash>(Needs, "\n "sv));
+ }
+ UploadAttachments(Needs);
}
- UploadAttachments(Needs);
- }
- m_LogOutput.SetLogOperationProgress(TaskSteps::FinalizeBuild, TaskSteps::StepCount);
+ m_LogOutput.SetLogOperationProgress(TaskSteps::FinalizeBuild, TaskSteps::StepCount);
- if (m_CreateBuild && !m_AbortFlag)
- {
- Stopwatch FinalizeBuildTimer;
- m_Storage.BuildStorage->FinalizeBuild(m_BuildId);
- if (!m_Options.IsQuiet)
+ if (m_CreateBuild && !m_AbortFlag)
{
- LOG_OUTPUT(m_LogOutput, "FinalizeBuild took {}", NiceTimeSpanMs(FinalizeBuildTimer.GetElapsedTimeMs()));
+ Stopwatch FinalizeBuildTimer;
+ m_Storage.BuildStorage->FinalizeBuild(m_BuildId);
+ if (!m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput, "FinalizeBuild took {}", NiceTimeSpanMs(FinalizeBuildTimer.GetElapsedTimeMs()));
+ }
}
- }
-
- if (!NewBlocks.BlockDescriptions.empty() && !m_AbortFlag)
- {
- uint64_t UploadBlockMetadataCount = 0;
- Stopwatch UploadBlockMetadataTimer;
- uint32_t FailedMetadataUploadCount = 1;
- int32_t MetadataUploadRetryCount = 3;
- while ((MetadataUploadRetryCount-- > 0) && (FailedMetadataUploadCount > 0))
+ if (!NewBlocks.BlockDescriptions.empty() && !m_AbortFlag)
{
- FailedMetadataUploadCount = 0;
- for (size_t BlockIndex = 0; BlockIndex < NewBlocks.BlockDescriptions.size(); BlockIndex++)
+ uint64_t UploadBlockMetadataCount = 0;
+ Stopwatch UploadBlockMetadataTimer;
+
+ uint32_t FailedMetadataUploadCount = 1;
+ int32_t MetadataUploadRetryCount = 3;
+ while ((MetadataUploadRetryCount-- > 0) && (FailedMetadataUploadCount > 0))
{
- if (m_AbortFlag)
+ FailedMetadataUploadCount = 0;
+ for (size_t BlockIndex = 0; BlockIndex < NewBlocks.BlockDescriptions.size(); BlockIndex++)
{
- break;
- }
- const IoHash& BlockHash = NewBlocks.BlockDescriptions[BlockIndex].BlockHash;
- if (!NewBlocks.MetaDataHasBeenUploaded[BlockIndex])
- {
- const CbObject BlockMetaData =
- BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]);
- if (m_Storage.BuildCacheStorage)
+ if (m_AbortFlag)
{
- m_Storage.BuildCacheStorage->PutBlobMetadatas(m_BuildId,
- std::vector<IoHash>({BlockHash}),
- std::vector<CbObject>({BlockMetaData}));
+ break;
}
- bool MetadataSucceeded = m_Storage.BuildStorage->PutBlockMetadata(m_BuildId, BlockHash, BlockMetaData);
- if (MetadataSucceeded)
+ const IoHash& BlockHash = NewBlocks.BlockDescriptions[BlockIndex].BlockHash;
+ if (!NewBlocks.MetaDataHasBeenUploaded[BlockIndex])
{
- m_UploadStats.BlocksBytes += BlockMetaData.GetSize();
- NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true;
- UploadBlockMetadataCount++;
- }
- else
- {
- FailedMetadataUploadCount++;
+ const CbObject BlockMetaData =
+ BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]);
+ if (m_Storage.BuildCacheStorage)
+ {
+ m_Storage.BuildCacheStorage->PutBlobMetadatas(m_BuildId,
+ std::vector<IoHash>({BlockHash}),
+ std::vector<CbObject>({BlockMetaData}));
+ }
+ bool MetadataSucceeded = m_Storage.BuildStorage->PutBlockMetadata(m_BuildId, BlockHash, BlockMetaData);
+ if (MetadataSucceeded)
+ {
+ m_UploadStats.BlocksBytes += BlockMetaData.GetSize();
+ NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true;
+ UploadBlockMetadataCount++;
+ }
+ else
+ {
+ FailedMetadataUploadCount++;
+ }
}
}
}
- }
- if (UploadBlockMetadataCount > 0)
- {
- uint64_t ElapsedUS = UploadBlockMetadataTimer.GetElapsedTimeUs();
- m_UploadStats.ElapsedWallTimeUS += ElapsedUS;
- if (!m_Options.IsQuiet)
+ if (UploadBlockMetadataCount > 0)
{
- LOG_OUTPUT(m_LogOutput,
- "Uploaded metadata for {} blocks in {}",
- UploadBlockMetadataCount,
- NiceTimeSpanMs(ElapsedUS / 1000));
+ uint64_t ElapsedUS = UploadBlockMetadataTimer.GetElapsedTimeUs();
+ m_UploadStats.ElapsedWallTimeUS += ElapsedUS;
+ if (!m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Uploaded metadata for {} blocks in {}",
+ UploadBlockMetadataCount,
+ NiceTimeSpanMs(ElapsedUS / 1000));
+ }
}
}
- }
- m_Storage.BuildStorage->PutBuildPartStats(
- m_BuildId,
- m_BuildPartId,
- {{"totalSize", double(m_LocalFolderScanStats.FoundFileByteCount.load())},
- {"reusedRatio", AcceptedByteCountPercent / 100.0},
- {"reusedBlockCount", double(m_FindBlocksStats.AcceptedBlockCount)},
- {"reusedBlockByteCount", double(m_FindBlocksStats.AcceptedRawByteCount)},
- {"newBlockCount", double(m_FindBlocksStats.NewBlocksCount)},
- {"newBlockByteCount", double(m_FindBlocksStats.NewBlocksChunkByteCount)},
- {"uploadedCount", double(m_UploadStats.BlockCount.load() + m_UploadStats.ChunkCount.load())},
- {"uploadedByteCount", double(m_UploadStats.BlocksBytes.load() + m_UploadStats.ChunksBytes.load())},
- {"uploadedBytesPerSec",
- double(GetBytesPerSecond(m_UploadStats.ElapsedWallTimeUS, m_UploadStats.ChunksBytes + m_UploadStats.BlocksBytes))},
- {"elapsedTimeSec", double(ProcessTimer.GetElapsedTimeMs() / 1000.0)}});
-
- m_LogOutput.SetLogOperationProgress(TaskSteps::Cleanup, TaskSteps::StepCount);
+ m_Storage.BuildStorage->PutBuildPartStats(
+ m_BuildId,
+ m_BuildPartId,
+ {{"totalSize", double(m_LocalFolderScanStats.FoundFileByteCount.load())},
+ {"reusedRatio", AcceptedByteCountPercent / 100.0},
+ {"reusedBlockCount", double(m_FindBlocksStats.AcceptedBlockCount)},
+ {"reusedBlockByteCount", double(m_FindBlocksStats.AcceptedRawByteCount)},
+ {"newBlockCount", double(m_FindBlocksStats.NewBlocksCount)},
+ {"newBlockByteCount", double(m_FindBlocksStats.NewBlocksChunkByteCount)},
+ {"uploadedCount", double(m_UploadStats.BlockCount.load() + m_UploadStats.ChunkCount.load())},
+ {"uploadedByteCount", double(m_UploadStats.BlocksBytes.load() + m_UploadStats.ChunksBytes.load())},
+ {"uploadedBytesPerSec",
+ double(GetBytesPerSecond(m_UploadStats.ElapsedWallTimeUS, m_UploadStats.ChunksBytes + m_UploadStats.BlocksBytes))},
+ {"elapsedTimeSec", double(ProcessTimer.GetElapsedTimeMs() / 1000.0)}});
+
+ m_LogOutput.SetLogOperationProgress(TaskSteps::Cleanup, TaskSteps::StepCount);
+ }
+ catch (const std::exception&)
+ {
+ m_AbortFlag = true;
+ throw;
+ }
}
std::vector<std::filesystem::path>
@@ -6797,260 +6831,272 @@ void
BuildsOperationValidateBuildPart::Execute()
{
ZEN_TRACE_CPU("ValidateBuildPart");
-
- enum TaskSteps : uint32_t
+ try
{
- FetchBuild,
- FetchBuildPart,
- ValidateBlobs,
- Cleanup,
- StepCount
- };
+ enum TaskSteps : uint32_t
+ {
+ FetchBuild,
+ FetchBuildPart,
+ ValidateBlobs,
+ Cleanup,
+ StepCount
+ };
- auto EndProgress = MakeGuard([&]() { m_LogOutput.SetLogOperationProgress(TaskSteps::StepCount, TaskSteps::StepCount); });
+ auto EndProgress = MakeGuard([&]() { m_LogOutput.SetLogOperationProgress(TaskSteps::StepCount, TaskSteps::StepCount); });
- Stopwatch Timer;
- auto _ = MakeGuard([&]() {
- if (!m_Options.IsQuiet)
- {
- ZEN_CONSOLE("Validated build part {}/{} ('{}') in {}",
- m_BuildId,
- m_BuildPartId,
- m_BuildPartName,
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- }
- });
+ Stopwatch Timer;
+ auto _ = MakeGuard([&]() {
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_CONSOLE("Validated build part {}/{} ('{}') in {}",
+ m_BuildId,
+ m_BuildPartId,
+ m_BuildPartName,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ }
+ });
- m_LogOutput.SetLogOperationProgress(TaskSteps::FetchBuild, TaskSteps::StepCount);
+ m_LogOutput.SetLogOperationProgress(TaskSteps::FetchBuild, TaskSteps::StepCount);
- CbObject Build = m_Storage.GetBuild(m_BuildId);
- if (!m_BuildPartName.empty())
- {
- m_BuildPartId = Build["parts"sv].AsObjectView()[m_BuildPartName].AsObjectId();
- if (m_BuildPartId == Oid::Zero)
+ CbObject Build = m_Storage.GetBuild(m_BuildId);
+ if (!m_BuildPartName.empty())
{
- throw std::runtime_error(fmt::format("Build {} does not have a part named '{}'", m_BuildId, m_BuildPartName));
+ m_BuildPartId = Build["parts"sv].AsObjectView()[m_BuildPartName].AsObjectId();
+ if (m_BuildPartId == Oid::Zero)
+ {
+ throw std::runtime_error(fmt::format("Build {} does not have a part named '{}'", m_BuildId, m_BuildPartName));
+ }
+ }
+ m_ValidateStats.BuildBlobSize = Build.GetSize();
+ uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u;
+ if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0)
+ {
+ PreferredMultipartChunkSize = ChunkSize;
}
- }
- m_ValidateStats.BuildBlobSize = Build.GetSize();
- uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u;
- if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0)
- {
- PreferredMultipartChunkSize = ChunkSize;
- }
-
- m_LogOutput.SetLogOperationProgress(TaskSteps::FetchBuildPart, TaskSteps::StepCount);
-
- CbObject BuildPart = m_Storage.GetBuildPart(m_BuildId, m_BuildPartId);
- m_ValidateStats.BuildPartSize = BuildPart.GetSize();
- if (!m_Options.IsQuiet)
- {
- ZEN_CONSOLE("Validating build part {}/{} ({})", m_BuildId, m_BuildPartId, NiceBytes(BuildPart.GetSize()));
- }
- std::vector<IoHash> ChunkAttachments;
- for (CbFieldView LooseFileView : BuildPart["chunkAttachments"sv].AsObjectView()["rawHashes"sv])
- {
- ChunkAttachments.push_back(LooseFileView.AsBinaryAttachment());
- }
- m_ValidateStats.ChunkAttachmentCount = ChunkAttachments.size();
- std::vector<IoHash> BlockAttachments;
- for (CbFieldView BlocksView : BuildPart["blockAttachments"sv].AsObjectView()["rawHashes"sv])
- {
- BlockAttachments.push_back(BlocksView.AsBinaryAttachment());
- }
- m_ValidateStats.BlockAttachmentCount = BlockAttachments.size();
-
- std::vector<ChunkBlockDescription> VerifyBlockDescriptions =
- ParseChunkBlockDescriptionList(m_Storage.GetBlockMetadatas(m_BuildId, BlockAttachments));
- if (VerifyBlockDescriptions.size() != BlockAttachments.size())
- {
- throw std::runtime_error(fmt::format("Uploaded blocks metadata could not all be found, {} blocks metadata is missing",
- BlockAttachments.size() - VerifyBlockDescriptions.size()));
- }
- ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ m_LogOutput.SetLogOperationProgress(TaskSteps::FetchBuildPart, TaskSteps::StepCount);
- const std::filesystem::path TempFolder = ".zen-tmp";
+ CbObject BuildPart = m_Storage.GetBuildPart(m_BuildId, m_BuildPartId);
+ m_ValidateStats.BuildPartSize = BuildPart.GetSize();
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_CONSOLE("Validating build part {}/{} ({})", m_BuildId, m_BuildPartId, NiceBytes(BuildPart.GetSize()));
+ }
+ std::vector<IoHash> ChunkAttachments;
+ for (CbFieldView LooseFileView : BuildPart["chunkAttachments"sv].AsObjectView()["rawHashes"sv])
+ {
+ ChunkAttachments.push_back(LooseFileView.AsBinaryAttachment());
+ }
+ m_ValidateStats.ChunkAttachmentCount = ChunkAttachments.size();
+ std::vector<IoHash> BlockAttachments;
+ for (CbFieldView BlocksView : BuildPart["blockAttachments"sv].AsObjectView()["rawHashes"sv])
+ {
+ BlockAttachments.push_back(BlocksView.AsBinaryAttachment());
+ }
+ m_ValidateStats.BlockAttachmentCount = BlockAttachments.size();
- CreateDirectories(TempFolder);
- auto __ = MakeGuard([&TempFolder]() {
- if (CleanDirectory(TempFolder, {}))
+ std::vector<ChunkBlockDescription> VerifyBlockDescriptions =
+ ParseChunkBlockDescriptionList(m_Storage.GetBlockMetadatas(m_BuildId, BlockAttachments));
+ if (VerifyBlockDescriptions.size() != BlockAttachments.size())
{
- std::error_code DummyEc;
- RemoveDir(TempFolder, DummyEc);
+ throw std::runtime_error(fmt::format("Uploaded blocks metadata could not all be found, {} blocks metadata is missing",
+ BlockAttachments.size() - VerifyBlockDescriptions.size()));
}
- });
- m_LogOutput.SetLogOperationProgress(TaskSteps::ValidateBlobs, TaskSteps::StepCount);
+ ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
- std::unique_ptr<BuildOpLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Validate Blobs"));
- BuildOpLogOutput::ProgressBar& Progress(*ProgressBarPtr);
+ const std::filesystem::path TempFolder = ".zen-tmp";
- uint64_t AttachmentsToVerifyCount = ChunkAttachments.size() + BlockAttachments.size();
- FilteredRate FilteredDownloadedBytesPerSecond;
- FilteredRate FilteredVerifiedBytesPerSecond;
+ CreateDirectories(TempFolder);
+ auto __ = MakeGuard([&TempFolder]() {
+ if (CleanDirectory(TempFolder, {}))
+ {
+ std::error_code DummyEc;
+ RemoveDir(TempFolder, DummyEc);
+ }
+ });
- std::atomic<uint64_t> MultipartAttachmentCount = 0;
+ m_LogOutput.SetLogOperationProgress(TaskSteps::ValidateBlobs, TaskSteps::StepCount);
- for (const IoHash& ChunkAttachment : ChunkAttachments)
- {
- Work.ScheduleWork(
- m_NetworkPool,
- [this,
- &Work,
- AttachmentsToVerifyCount,
- &TempFolder,
- PreferredMultipartChunkSize,
- &FilteredDownloadedBytesPerSecond,
- &FilteredVerifiedBytesPerSecond,
- ChunkAttachment](std::atomic<bool>&) {
- if (!m_AbortFlag)
- {
- ZEN_TRACE_CPU("ValidateBuildPart_GetChunk");
-
- FilteredDownloadedBytesPerSecond.Start();
- DownloadLargeBlob(
- m_Storage,
- TempFolder,
- m_BuildId,
- ChunkAttachment,
- PreferredMultipartChunkSize,
- Work,
- m_NetworkPool,
- m_DownloadStats.DownloadedChunkByteCount,
- m_DownloadStats.MultipartAttachmentCount,
- [this,
- &Work,
- AttachmentsToVerifyCount,
- &FilteredDownloadedBytesPerSecond,
- &FilteredVerifiedBytesPerSecond,
- ChunkHash = ChunkAttachment](IoBuffer&& Payload) {
- m_DownloadStats.DownloadedChunkCount++;
- Payload.SetContentType(ZenContentType::kCompressedBinary);
- if (!m_AbortFlag)
- {
- Work.ScheduleWork(
- m_IOWorkerPool,
- [this,
- AttachmentsToVerifyCount,
- &FilteredDownloadedBytesPerSecond,
- &FilteredVerifiedBytesPerSecond,
- Payload = std::move(Payload),
- ChunkHash](std::atomic<bool>&) mutable {
- if (!m_AbortFlag)
- {
- ZEN_TRACE_CPU("ValidateBuildPart_Validate");
+ std::unique_ptr<BuildOpLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Validate Blobs"));
+ BuildOpLogOutput::ProgressBar& Progress(*ProgressBarPtr);
- if (m_DownloadStats.DownloadedChunkCount + m_DownloadStats.DownloadedBlockCount ==
- AttachmentsToVerifyCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
+ uint64_t AttachmentsToVerifyCount = ChunkAttachments.size() + BlockAttachments.size();
+ FilteredRate FilteredDownloadedBytesPerSecond;
+ FilteredRate FilteredVerifiedBytesPerSecond;
+
+ std::atomic<uint64_t> MultipartAttachmentCount = 0;
- FilteredVerifiedBytesPerSecond.Start();
+ for (const IoHash& ChunkAttachment : ChunkAttachments)
+ {
+ Work.ScheduleWork(
+ m_NetworkPool,
+ [this,
+ &Work,
+ AttachmentsToVerifyCount,
+ &TempFolder,
+ PreferredMultipartChunkSize,
+ &FilteredDownloadedBytesPerSecond,
+ &FilteredVerifiedBytesPerSecond,
+ ChunkAttachment](std::atomic<bool>&) {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("ValidateBuildPart_GetChunk");
- uint64_t CompressedSize;
- uint64_t DecompressedSize;
- ValidateBlob(m_AbortFlag, std::move(Payload), ChunkHash, CompressedSize, DecompressedSize);
- m_ValidateStats.VerifiedAttachmentCount++;
- m_ValidateStats.VerifiedByteCount += DecompressedSize;
- if (m_ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount)
+ FilteredDownloadedBytesPerSecond.Start();
+ DownloadLargeBlob(
+ m_Storage,
+ TempFolder,
+ m_BuildId,
+ ChunkAttachment,
+ PreferredMultipartChunkSize,
+ Work,
+ m_NetworkPool,
+ m_DownloadStats.DownloadedChunkByteCount,
+ m_DownloadStats.MultipartAttachmentCount,
+ [this,
+ &Work,
+ AttachmentsToVerifyCount,
+ &FilteredDownloadedBytesPerSecond,
+ &FilteredVerifiedBytesPerSecond,
+ ChunkHash = ChunkAttachment](IoBuffer&& Payload) {
+ m_DownloadStats.DownloadedChunkCount++;
+ Payload.SetContentType(ZenContentType::kCompressedBinary);
+ if (!m_AbortFlag)
+ {
+ Work.ScheduleWork(
+ m_IOWorkerPool,
+ [this,
+ AttachmentsToVerifyCount,
+ &FilteredDownloadedBytesPerSecond,
+ &FilteredVerifiedBytesPerSecond,
+ Payload = std::move(Payload),
+ ChunkHash](std::atomic<bool>&) mutable {
+ if (!m_AbortFlag)
{
- FilteredVerifiedBytesPerSecond.Stop();
- }
- }
- });
- }
- });
- }
- });
- }
+ ZEN_TRACE_CPU("ValidateBuildPart_Validate");
- for (const IoHash& BlockAttachment : BlockAttachments)
- {
- Work.ScheduleWork(
- m_NetworkPool,
- [this, &Work, AttachmentsToVerifyCount, &FilteredDownloadedBytesPerSecond, &FilteredVerifiedBytesPerSecond, BlockAttachment](
- std::atomic<bool>&) {
- if (!m_AbortFlag)
- {
- ZEN_TRACE_CPU("ValidateBuildPart_GetBlock");
+ if (m_DownloadStats.DownloadedChunkCount + m_DownloadStats.DownloadedBlockCount ==
+ AttachmentsToVerifyCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
- FilteredDownloadedBytesPerSecond.Start();
- IoBuffer Payload = m_Storage.GetBuildBlob(m_BuildId, BlockAttachment);
- m_DownloadStats.DownloadedBlockCount++;
- m_DownloadStats.DownloadedBlockByteCount += Payload.GetSize();
- if (m_DownloadStats.DownloadedChunkCount + m_DownloadStats.DownloadedBlockCount == AttachmentsToVerifyCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
- if (!Payload)
- {
- throw std::runtime_error(fmt::format("Block attachment {} could not be found", BlockAttachment));
+ FilteredVerifiedBytesPerSecond.Start();
+
+ uint64_t CompressedSize;
+ uint64_t DecompressedSize;
+ ValidateBlob(m_AbortFlag, std::move(Payload), ChunkHash, CompressedSize, DecompressedSize);
+ m_ValidateStats.VerifiedAttachmentCount++;
+ m_ValidateStats.VerifiedByteCount += DecompressedSize;
+ if (m_ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount)
+ {
+ FilteredVerifiedBytesPerSecond.Stop();
+ }
+ }
+ });
+ }
+ });
}
+ });
+ }
+
+ for (const IoHash& BlockAttachment : BlockAttachments)
+ {
+ Work.ScheduleWork(
+ m_NetworkPool,
+ [this,
+ &Work,
+ AttachmentsToVerifyCount,
+ &FilteredDownloadedBytesPerSecond,
+ &FilteredVerifiedBytesPerSecond,
+ BlockAttachment](std::atomic<bool>&) {
if (!m_AbortFlag)
{
- Work.ScheduleWork(m_IOWorkerPool,
- [this,
- &FilteredVerifiedBytesPerSecond,
- AttachmentsToVerifyCount,
- Payload = std::move(Payload),
- BlockAttachment](std::atomic<bool>&) mutable {
- if (!m_AbortFlag)
- {
- ZEN_TRACE_CPU("ValidateBuildPart_ValidateBlock");
+ ZEN_TRACE_CPU("ValidateBuildPart_GetBlock");
+
+ FilteredDownloadedBytesPerSecond.Start();
+ IoBuffer Payload = m_Storage.GetBuildBlob(m_BuildId, BlockAttachment);
+ m_DownloadStats.DownloadedBlockCount++;
+ m_DownloadStats.DownloadedBlockByteCount += Payload.GetSize();
+ if (m_DownloadStats.DownloadedChunkCount + m_DownloadStats.DownloadedBlockCount == AttachmentsToVerifyCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ if (!Payload)
+ {
+ throw std::runtime_error(fmt::format("Block attachment {} could not be found", BlockAttachment));
+ }
+ if (!m_AbortFlag)
+ {
+ Work.ScheduleWork(
+ m_IOWorkerPool,
+ [this,
+ &FilteredVerifiedBytesPerSecond,
+ AttachmentsToVerifyCount,
+ Payload = std::move(Payload),
+ BlockAttachment](std::atomic<bool>&) mutable {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("ValidateBuildPart_ValidateBlock");
- FilteredVerifiedBytesPerSecond.Start();
+ FilteredVerifiedBytesPerSecond.Start();
- uint64_t CompressedSize;
- uint64_t DecompressedSize;
- ValidateChunkBlock(std::move(Payload), BlockAttachment, CompressedSize, DecompressedSize);
- m_ValidateStats.VerifiedAttachmentCount++;
- m_ValidateStats.VerifiedByteCount += DecompressedSize;
- if (m_ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount)
- {
- FilteredVerifiedBytesPerSecond.Stop();
- }
- }
- });
+ uint64_t CompressedSize;
+ uint64_t DecompressedSize;
+ ValidateChunkBlock(std::move(Payload), BlockAttachment, CompressedSize, DecompressedSize);
+ m_ValidateStats.VerifiedAttachmentCount++;
+ m_ValidateStats.VerifiedByteCount += DecompressedSize;
+ if (m_ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount)
+ {
+ FilteredVerifiedBytesPerSecond.Stop();
+ }
+ }
+ });
+ }
}
- }
- });
- }
+ });
+ }
- Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(PendingWork);
-
- const uint64_t DownloadedAttachmentCount = m_DownloadStats.DownloadedChunkCount + m_DownloadStats.DownloadedBlockCount;
- const uint64_t DownloadedByteCount = m_DownloadStats.DownloadedChunkByteCount + m_DownloadStats.DownloadedBlockByteCount;
-
- FilteredDownloadedBytesPerSecond.Update(DownloadedByteCount);
- FilteredVerifiedBytesPerSecond.Update(m_ValidateStats.VerifiedByteCount);
-
- std::string Details = fmt::format("Downloaded {}/{} ({}, {}bits/s). Verified {}/{} ({}, {}B/s)",
- DownloadedAttachmentCount,
- AttachmentsToVerifyCount,
- NiceBytes(DownloadedByteCount),
- NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8),
- m_ValidateStats.VerifiedAttachmentCount.load(),
- AttachmentsToVerifyCount,
- NiceBytes(m_ValidateStats.VerifiedByteCount.load()),
- NiceNum(FilteredVerifiedBytesPerSecond.GetCurrent()));
-
- Progress.UpdateState(
- {.Task = "Validating blobs ",
- .Details = Details,
- .TotalCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2),
- .RemainingCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2 -
- (DownloadedAttachmentCount + m_ValidateStats.VerifiedAttachmentCount.load())),
- .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
- false);
- });
+ Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
- Progress.Finish();
- m_ValidateStats.ElapsedWallTimeUS = Timer.GetElapsedTimeUs();
+ const uint64_t DownloadedAttachmentCount = m_DownloadStats.DownloadedChunkCount + m_DownloadStats.DownloadedBlockCount;
+ const uint64_t DownloadedByteCount = m_DownloadStats.DownloadedChunkByteCount + m_DownloadStats.DownloadedBlockByteCount;
+
+ FilteredDownloadedBytesPerSecond.Update(DownloadedByteCount);
+ FilteredVerifiedBytesPerSecond.Update(m_ValidateStats.VerifiedByteCount);
+
+ std::string Details = fmt::format("Downloaded {}/{} ({}, {}bits/s). Verified {}/{} ({}, {}B/s)",
+ DownloadedAttachmentCount,
+ AttachmentsToVerifyCount,
+ NiceBytes(DownloadedByteCount),
+ NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8),
+ m_ValidateStats.VerifiedAttachmentCount.load(),
+ AttachmentsToVerifyCount,
+ NiceBytes(m_ValidateStats.VerifiedByteCount.load()),
+ NiceNum(FilteredVerifiedBytesPerSecond.GetCurrent()));
+
+ Progress.UpdateState(
+ {.Task = "Validating blobs ",
+ .Details = Details,
+ .TotalCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2),
+ .RemainingCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2 -
+ (DownloadedAttachmentCount + m_ValidateStats.VerifiedAttachmentCount.load())),
+ .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+ false);
+ });
+
+ Progress.Finish();
+ m_ValidateStats.ElapsedWallTimeUS = Timer.GetElapsedTimeUs();
- m_LogOutput.SetLogOperationProgress(TaskSteps::Cleanup, TaskSteps::StepCount);
+ m_LogOutput.SetLogOperationProgress(TaskSteps::Cleanup, TaskSteps::StepCount);
+ }
+ catch (const std::exception&)
+ {
+ m_AbortFlag = true;
+ throw;
+ }
}
CompositeBuffer
diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
index 7e3903892..8b5b63ef9 100644
--- a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
+++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
@@ -197,6 +197,7 @@ public:
bool PrimeCacheOnly = false;
bool EnableOtherDownloadsScavenging = true;
bool EnableTargetFolderScavenging = true;
+ bool ValidateCompletedSequences = true;
std::vector<std::string> ExcludeFolders;
std::vector<std::string> ExcludeExtensions;
};
@@ -378,6 +379,8 @@ private:
BufferedWriteFileCache& WriteCache,
IoBuffer&& CompressedPart);
+ void StreamDecompress(const IoHash& SequenceRawHash, CompositeBuffer&& CompressedPart);
+
void WriteSequenceChunkToCache(BufferedWriteFileCache::Local& LocalWriter,
const CompositeBuffer& Chunk,
const uint32_t SequenceIndex,