aboutsummaryrefslogtreecommitdiff
path: root/src/zenremotestore/builds/buildstorageoperations.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenremotestore/builds/buildstorageoperations.cpp')
-rw-r--r--src/zenremotestore/builds/buildstorageoperations.cpp914
1 files changed, 261 insertions, 653 deletions
diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp
index ade431393..5219e86d8 100644
--- a/src/zenremotestore/builds/buildstorageoperations.cpp
+++ b/src/zenremotestore/builds/buildstorageoperations.cpp
@@ -484,24 +484,6 @@ private:
uint64_t FilteredPerSecond = 0;
};
-EPartialBlockRequestMode
-PartialBlockRequestModeFromString(const std::string_view ModeString)
-{
- switch (HashStringAsLowerDjb2(ModeString))
- {
- case HashStringDjb2("false"):
- return EPartialBlockRequestMode::Off;
- case HashStringDjb2("zencacheonly"):
- return EPartialBlockRequestMode::ZenCacheOnly;
- case HashStringDjb2("mixed"):
- return EPartialBlockRequestMode::Mixed;
- case HashStringDjb2("true"):
- return EPartialBlockRequestMode::All;
- default:
- return EPartialBlockRequestMode::Invalid;
- }
-}
-
std::filesystem::path
ZenStateFilePath(const std::filesystem::path& ZenFolderPath)
{
@@ -579,13 +561,6 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
CreateDirectories(m_TempDownloadFolderPath);
CreateDirectories(m_TempBlockFolderPath);
- Stopwatch IndexTimer;
-
- if (!m_Options.IsQuiet)
- {
- ZEN_OPERATION_LOG_INFO(m_LogOutput, "Indexed local and remote content in {}", NiceTimeSpanMs(IndexTimer.GetElapsedTimeMs()));
- }
-
Stopwatch CacheMappingTimer;
std::vector<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters(m_RemoteContent.ChunkedContent.SequenceRawHashes.size());
@@ -906,343 +881,220 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
CheckRequiredDiskSpace(RemotePathToRemoteIndex);
+ BlobsExistsResult ExistsResult;
{
- ZEN_TRACE_CPU("WriteChunks");
-
- m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::WriteChunks, (uint32_t)TaskSteps::StepCount);
+ ChunkBlockAnalyser BlockAnalyser(m_LogOutput,
+ m_BlockDescriptions,
+ ChunkBlockAnalyser::Options{.IsQuiet = m_Options.IsQuiet,
+ .IsVerbose = m_Options.IsVerbose,
+ .HostLatencySec = m_Storage.BuildStorageLatencySec,
+ .HostHighSpeedLatencySec = m_Storage.CacheLatencySec});
- Stopwatch WriteTimer;
+ std::vector<ChunkBlockAnalyser::NeededBlock> NeededBlocks = BlockAnalyser.GetNeeded(
+ m_RemoteLookup.ChunkHashToChunkIndex,
+ [&](uint32_t RemoteChunkIndex) -> bool { return RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex]; });
- FilteredRate FilteredDownloadedBytesPerSecond;
- FilteredRate FilteredWrittenBytesPerSecond;
+ std::vector<uint32_t> FetchBlockIndexes;
+ std::vector<uint32_t> CachedChunkBlockIndexes;
- std::unique_ptr<OperationLogOutput::ProgressBar> WriteProgressBarPtr(
- m_LogOutput.CreateProgressBar(m_Options.PrimeCacheOnly ? "Downloading" : "Writing"));
- OperationLogOutput::ProgressBar& WriteProgressBar(*WriteProgressBarPtr);
- ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
-
- struct LooseChunkHashWorkData
{
- std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs;
- uint32_t RemoteChunkIndex = (uint32_t)-1;
- };
-
- std::vector<LooseChunkHashWorkData> LooseChunkHashWorks;
- TotalPartWriteCount += CopyChunkDatas.size();
- TotalPartWriteCount += ScavengedSequenceCopyOperations.size();
-
- for (const IoHash ChunkHash : m_LooseChunkHashes)
- {
- auto RemoteChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash);
- ZEN_ASSERT(RemoteChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end());
- const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second;
- if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex])
+ ZEN_TRACE_CPU("BlockCacheFileExists");
+ for (const ChunkBlockAnalyser::NeededBlock& NeededBlock : NeededBlocks)
{
- if (m_Options.IsVerbose)
- {
- ZEN_OPERATION_LOG_INFO(m_LogOutput, "Skipping chunk {} due to cache reuse", ChunkHash);
- }
- continue;
- }
- bool NeedsCopy = true;
- if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex].compare_exchange_strong(NeedsCopy, false))
- {
- std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
- GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex);
-
- if (ChunkTargetPtrs.empty())
+ if (m_Options.PrimeCacheOnly)
{
- if (m_Options.IsVerbose)
- {
- ZEN_OPERATION_LOG_INFO(m_LogOutput, "Skipping chunk {} due to cache reuse", ChunkHash);
- }
+ FetchBlockIndexes.push_back(NeededBlock.BlockIndex);
}
else
{
- TotalRequestCount++;
- TotalPartWriteCount++;
- LooseChunkHashWorks.push_back(
- LooseChunkHashWorkData{.ChunkTargetPtrs = ChunkTargetPtrs, .RemoteChunkIndex = RemoteChunkIndex});
- }
- }
- }
-
- uint32_t BlockCount = gsl::narrow<uint32_t>(m_BlockDescriptions.size());
-
- std::vector<bool> ChunkIsPickedUpByBlock(m_RemoteContent.ChunkedContent.ChunkHashes.size(), false);
- auto GetNeededChunkBlockIndexes = [this, &RemoteChunkIndexNeedsCopyFromSourceFlags, &ChunkIsPickedUpByBlock](
- const ChunkBlockDescription& BlockDescription) {
- ZEN_TRACE_CPU("GetNeededChunkBlockIndexes");
- std::vector<uint32_t> NeededBlockChunkIndexes;
- for (uint32_t ChunkBlockIndex = 0; ChunkBlockIndex < BlockDescription.ChunkRawHashes.size(); ChunkBlockIndex++)
- {
- const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex];
- if (auto It = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); It != m_RemoteLookup.ChunkHashToChunkIndex.end())
- {
- const uint32_t RemoteChunkIndex = It->second;
- if (!ChunkIsPickedUpByBlock[RemoteChunkIndex])
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[NeededBlock.BlockIndex];
+ bool UsingCachedBlock = false;
+ if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end())
{
- if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex])
+ TotalPartWriteCount++;
+
+ std::filesystem::path BlockPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString();
+ if (IsFile(BlockPath))
{
- ChunkIsPickedUpByBlock[RemoteChunkIndex] = true;
- NeededBlockChunkIndexes.push_back(ChunkBlockIndex);
+ CachedChunkBlockIndexes.push_back(NeededBlock.BlockIndex);
+ UsingCachedBlock = true;
}
}
- }
- else
- {
- ZEN_DEBUG("Chunk {} not found in block {}", ChunkHash, BlockDescription.BlockHash);
+ if (!UsingCachedBlock)
+ {
+ FetchBlockIndexes.push_back(NeededBlock.BlockIndex);
+ }
}
}
- return NeededBlockChunkIndexes;
- };
+ }
- std::vector<uint32_t> CachedChunkBlockIndexes;
- std::vector<uint32_t> FetchBlockIndexes;
- std::vector<std::vector<uint32_t>> AllBlockChunkIndexNeeded;
+ std::vector<uint32_t> NeededLooseChunkIndexes;
- for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; BlockIndex++)
{
- const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
-
- std::vector<uint32_t> BlockChunkIndexNeeded = GetNeededChunkBlockIndexes(BlockDescription);
- if (!BlockChunkIndexNeeded.empty())
+ NeededLooseChunkIndexes.reserve(m_LooseChunkHashes.size());
+ for (uint32_t LooseChunkIndex = 0; LooseChunkIndex < m_LooseChunkHashes.size(); LooseChunkIndex++)
{
- if (m_Options.PrimeCacheOnly)
+ const IoHash& ChunkHash = m_LooseChunkHashes[LooseChunkIndex];
+ auto RemoteChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash);
+ ZEN_ASSERT(RemoteChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end());
+ const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second;
+
+ if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex])
{
- FetchBlockIndexes.push_back(BlockIndex);
+ if (m_Options.IsVerbose)
+ {
+ ZEN_OPERATION_LOG_INFO(m_LogOutput,
+ "Skipping chunk {} due to cache reuse",
+ m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]);
+ }
+ continue;
}
- else
+
+ bool NeedsCopy = true;
+ if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex].compare_exchange_strong(NeedsCopy, false))
{
- bool UsingCachedBlock = false;
- if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end())
+ uint64_t WriteCount = GetChunkWriteCount(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex);
+ if (WriteCount == 0)
{
- TotalPartWriteCount++;
-
- std::filesystem::path BlockPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString();
- if (IsFile(BlockPath))
+ if (m_Options.IsVerbose)
{
- CachedChunkBlockIndexes.push_back(BlockIndex);
- UsingCachedBlock = true;
+ ZEN_OPERATION_LOG_INFO(m_LogOutput,
+ "Skipping chunk {} due to cache reuse",
+ m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]);
}
}
- if (!UsingCachedBlock)
+ else
{
- FetchBlockIndexes.push_back(BlockIndex);
+ NeededLooseChunkIndexes.push_back(LooseChunkIndex);
}
}
}
- AllBlockChunkIndexNeeded.emplace_back(std::move(BlockChunkIndexNeeded));
}
- BlobsExistsResult ExistsResult;
-
if (m_Storage.BuildCacheStorage)
{
ZEN_TRACE_CPU("BlobCacheExistCheck");
Stopwatch Timer;
- tsl::robin_set<IoHash> BlobHashesSet;
+ std::vector<IoHash> BlobHashes;
+ BlobHashes.reserve(NeededLooseChunkIndexes.size() + FetchBlockIndexes.size());
- BlobHashesSet.reserve(LooseChunkHashWorks.size() + FetchBlockIndexes.size());
- for (LooseChunkHashWorkData& LooseChunkHashWork : LooseChunkHashWorks)
+ for (const uint32_t LooseChunkIndex : NeededLooseChunkIndexes)
{
- BlobHashesSet.insert(m_RemoteContent.ChunkedContent.ChunkHashes[LooseChunkHashWork.RemoteChunkIndex]);
+ BlobHashes.push_back(m_LooseChunkHashes[LooseChunkIndex]);
}
+
for (uint32_t BlockIndex : FetchBlockIndexes)
{
- const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
- BlobHashesSet.insert(BlockDescription.BlockHash);
+ BlobHashes.push_back(m_BlockDescriptions[BlockIndex].BlockHash);
}
- if (!BlobHashesSet.empty())
- {
- const std::vector<IoHash> BlobHashes(BlobHashesSet.begin(), BlobHashesSet.end());
- const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult =
- m_Storage.BuildCacheStorage->BlobsExists(m_BuildId, BlobHashes);
+ const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult =
+ m_Storage.BuildCacheStorage->BlobsExists(m_BuildId, BlobHashes);
- if (CacheExistsResult.size() == BlobHashes.size())
+ if (CacheExistsResult.size() == BlobHashes.size())
+ {
+ ExistsResult.ExistingBlobs.reserve(CacheExistsResult.size());
+ for (size_t BlobIndex = 0; BlobIndex < BlobHashes.size(); BlobIndex++)
{
- ExistsResult.ExistingBlobs.reserve(CacheExistsResult.size());
- for (size_t BlobIndex = 0; BlobIndex < BlobHashes.size(); BlobIndex++)
+ if (CacheExistsResult[BlobIndex].HasBody)
{
- if (CacheExistsResult[BlobIndex].HasBody)
- {
- ExistsResult.ExistingBlobs.insert(BlobHashes[BlobIndex]);
- }
+ ExistsResult.ExistingBlobs.insert(BlobHashes[BlobIndex]);
}
}
- ExistsResult.ElapsedTimeMs = Timer.GetElapsedTimeMs();
- if (!ExistsResult.ExistingBlobs.empty() && !m_Options.IsQuiet)
- {
- ZEN_OPERATION_LOG_INFO(m_LogOutput,
- "Remote cache : Found {} out of {} needed blobs in {}",
- ExistsResult.ExistingBlobs.size(),
- BlobHashes.size(),
- NiceTimeSpanMs(ExistsResult.ElapsedTimeMs));
- }
+ }
+ ExistsResult.ElapsedTimeMs = Timer.GetElapsedTimeMs();
+ if (!ExistsResult.ExistingBlobs.empty() && !m_Options.IsQuiet)
+ {
+ ZEN_OPERATION_LOG_INFO(m_LogOutput,
+ "Remote cache : Found {} out of {} needed blobs in {}",
+ ExistsResult.ExistingBlobs.size(),
+ BlobHashes.size(),
+ NiceTimeSpanMs(ExistsResult.ElapsedTimeMs));
}
}
- std::vector<BlockRangeDescriptor> BlockRangeWorks;
- std::vector<uint32_t> FullBlockWorks;
+ std::vector<ChunkBlockAnalyser::EPartialBlockDownloadMode> BlockPartialDownloadModes;
+ if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Off)
{
- Stopwatch Timer;
-
- std::vector<uint32_t> PartialBlockIndexes;
-
- for (uint32_t BlockIndex : FetchBlockIndexes)
+ BlockPartialDownloadModes.resize(m_BlockDescriptions.size(), ChunkBlockAnalyser::EPartialBlockDownloadMode::Off);
+ }
+ else
+ {
+ BlockPartialDownloadModes.reserve(m_BlockDescriptions.size());
+ for (uint32_t BlockIndex = 0; BlockIndex < m_BlockDescriptions.size(); BlockIndex++)
{
- const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
-
- const std::vector<uint32_t> BlockChunkIndexNeeded = std::move(AllBlockChunkIndexNeeded[BlockIndex]);
- if (!BlockChunkIndexNeeded.empty())
+ const bool BlockExistInCache = ExistsResult.ExistingBlobs.contains(m_BlockDescriptions[BlockIndex].BlockHash);
+ if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::All)
{
- bool WantsToDoPartialBlockDownload = BlockChunkIndexNeeded.size() < BlockDescription.ChunkRawHashes.size();
- bool CanDoPartialBlockDownload =
- (BlockDescription.HeaderSize > 0) &&
- (BlockDescription.ChunkCompressedLengths.size() == BlockDescription.ChunkRawHashes.size());
-
- bool AllowedToDoPartialRequest = false;
- bool BlockExistInCache = ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash);
- switch (m_Options.PartialBlockRequestMode)
- {
- case EPartialBlockRequestMode::Off:
- break;
- case EPartialBlockRequestMode::ZenCacheOnly:
- AllowedToDoPartialRequest = BlockExistInCache;
- break;
- case EPartialBlockRequestMode::Mixed:
- case EPartialBlockRequestMode::All:
- AllowedToDoPartialRequest = true;
- break;
- default:
- ZEN_ASSERT(false);
- break;
- }
+ BlockPartialDownloadModes.push_back(BlockExistInCache
+ ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed
+ : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange);
+ }
+ else if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::ZenCacheOnly)
+ {
+ BlockPartialDownloadModes.push_back(BlockExistInCache
+ ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed
+ : ChunkBlockAnalyser::EPartialBlockDownloadMode::Off);
+ }
+ else if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Mixed)
+ {
+ BlockPartialDownloadModes.push_back(BlockExistInCache
+ ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed
+ : ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange);
+ }
+ }
+ }
+ ZEN_ASSERT(BlockPartialDownloadModes.size() == m_BlockDescriptions.size());
- const uint32_t ChunkStartOffsetInBlock =
- gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize);
+ ChunkBlockAnalyser::BlockResult PartialBlocks =
+ BlockAnalyser.CalculatePartialBlockDownloads(NeededBlocks, BlockPartialDownloadModes);
- const uint64_t TotalBlockSize = std::accumulate(BlockDescription.ChunkCompressedLengths.begin(),
- BlockDescription.ChunkCompressedLengths.end(),
- std::uint64_t(ChunkStartOffsetInBlock));
+ struct LooseChunkHashWorkData
+ {
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs;
+ uint32_t RemoteChunkIndex = (uint32_t)-1;
+ };
- if (AllowedToDoPartialRequest && WantsToDoPartialBlockDownload && CanDoPartialBlockDownload)
- {
- ZEN_TRACE_CPU("PartialBlockAnalysis");
-
- bool LimitToSingleRange =
- BlockExistInCache ? false : m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Mixed;
- uint64_t TotalWantedChunksSize = 0;
- std::optional<std::vector<BlockRangeDescriptor>> MaybeBlockRanges =
- CalculateBlockRanges(BlockIndex,
- BlockDescription,
- BlockChunkIndexNeeded,
- LimitToSingleRange,
- ChunkStartOffsetInBlock,
- TotalBlockSize,
- TotalWantedChunksSize);
- ZEN_ASSERT(TotalWantedChunksSize <= TotalBlockSize);
-
- if (MaybeBlockRanges.has_value())
- {
- const std::vector<BlockRangeDescriptor>& BlockRanges = MaybeBlockRanges.value();
- ZEN_ASSERT(!BlockRanges.empty());
- BlockRangeWorks.insert(BlockRangeWorks.end(), BlockRanges.begin(), BlockRanges.end());
- TotalRequestCount += BlockRanges.size();
- TotalPartWriteCount += BlockRanges.size();
-
- uint64_t RequestedSize = std::accumulate(
- BlockRanges.begin(),
- BlockRanges.end(),
- uint64_t(0),
- [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; });
- PartialBlockIndexes.push_back(BlockIndex);
-
- if (RequestedSize > TotalWantedChunksSize)
- {
- if (m_Options.IsVerbose)
- {
- ZEN_OPERATION_LOG_INFO(
- m_LogOutput,
- "Requesting {} chunks ({}) from block {} ({}) using {} requests (extra bytes {})",
- BlockChunkIndexNeeded.size(),
- NiceBytes(RequestedSize),
- BlockDescription.BlockHash,
- NiceBytes(TotalBlockSize),
- BlockRanges.size(),
- NiceBytes(RequestedSize - TotalWantedChunksSize));
- }
- }
- }
- else
- {
- FullBlockWorks.push_back(BlockIndex);
- TotalRequestCount++;
- TotalPartWriteCount++;
- }
- }
- else
- {
- FullBlockWorks.push_back(BlockIndex);
- TotalRequestCount++;
- TotalPartWriteCount++;
- }
- }
- }
+ TotalRequestCount += NeededLooseChunkIndexes.size();
+ TotalPartWriteCount += NeededLooseChunkIndexes.size();
+ TotalRequestCount += PartialBlocks.BlockRanges.size();
+ TotalPartWriteCount += PartialBlocks.BlockRanges.size();
+ TotalRequestCount += PartialBlocks.FullBlockIndexes.size();
+ TotalPartWriteCount += PartialBlocks.FullBlockIndexes.size();
- if (!PartialBlockIndexes.empty())
- {
- uint64_t TotalFullBlockRequestBytes = 0;
- for (uint32_t BlockIndex : FullBlockWorks)
- {
- const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
- uint32_t CurrentOffset =
- gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize);
+ std::vector<LooseChunkHashWorkData> LooseChunkHashWorks;
+ for (uint32_t LooseChunkIndex : NeededLooseChunkIndexes)
+ {
+ const IoHash& ChunkHash = m_LooseChunkHashes[LooseChunkIndex];
+ auto RemoteChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash);
+ ZEN_ASSERT(RemoteChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end());
+ const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second;
- TotalFullBlockRequestBytes += std::accumulate(BlockDescription.ChunkCompressedLengths.begin(),
- BlockDescription.ChunkCompressedLengths.end(),
- std::uint64_t(CurrentOffset));
- }
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
+ GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex);
- uint64_t TotalPartialBlockBytes = 0;
- for (uint32_t BlockIndex : PartialBlockIndexes)
- {
- const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
- uint32_t CurrentOffset =
- gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize);
+ ZEN_ASSERT(!ChunkTargetPtrs.empty());
+ LooseChunkHashWorks.push_back(
+ LooseChunkHashWorkData{.ChunkTargetPtrs = ChunkTargetPtrs, .RemoteChunkIndex = RemoteChunkIndex});
+ }
- TotalPartialBlockBytes += std::accumulate(BlockDescription.ChunkCompressedLengths.begin(),
- BlockDescription.ChunkCompressedLengths.end(),
- std::uint64_t(CurrentOffset));
- }
+ ZEN_TRACE_CPU("WriteChunks");
- uint64_t NonPartialTotalBlockBytes = TotalFullBlockRequestBytes + TotalPartialBlockBytes;
+ m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::WriteChunks, (uint32_t)TaskSteps::StepCount);
- const uint64_t TotalPartialBlockRequestBytes =
- std::accumulate(BlockRangeWorks.begin(),
- BlockRangeWorks.end(),
- uint64_t(0),
- [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; });
- uint64_t TotalExtraPartialBlocksRequests = BlockRangeWorks.size() - PartialBlockIndexes.size();
+ Stopwatch WriteTimer;
- uint64_t TotalSavedBlocksSize = TotalPartialBlockBytes - TotalPartialBlockRequestBytes;
- double SavedSizePercent = (TotalSavedBlocksSize * 100.0) / NonPartialTotalBlockBytes;
+ FilteredRate FilteredDownloadedBytesPerSecond;
+ FilteredRate FilteredWrittenBytesPerSecond;
- if (!m_Options.IsQuiet)
- {
- ZEN_OPERATION_LOG_INFO(m_LogOutput,
- "Analysis of partial block requests saves download of {} out of {} ({:.1f}%) using {} extra "
- "requests. Completed in {}",
- NiceBytes(TotalSavedBlocksSize),
- NiceBytes(NonPartialTotalBlockBytes),
- SavedSizePercent,
- TotalExtraPartialBlocksRequests,
- NiceTimeSpanMs(ExistsResult.ElapsedTimeMs));
- }
- }
- }
+ std::unique_ptr<OperationLogOutput::ProgressBar> WriteProgressBarPtr(
+ m_LogOutput.CreateProgressBar(m_Options.PrimeCacheOnly ? "Downloading" : "Writing"));
+ OperationLogOutput::ProgressBar& WriteProgressBar(*WriteProgressBarPtr);
+ ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ TotalPartWriteCount += CopyChunkDatas.size();
+ TotalPartWriteCount += ScavengedSequenceCopyOperations.size();
BufferedWriteFileCache WriteCache;
@@ -1472,13 +1324,23 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
});
}
- for (size_t BlockRangeIndex = 0; BlockRangeIndex < BlockRangeWorks.size(); BlockRangeIndex++)
+ for (size_t BlockRangeIndex = 0; BlockRangeIndex < PartialBlocks.BlockRanges.size();)
{
ZEN_ASSERT(!m_Options.PrimeCacheOnly);
if (m_AbortFlag)
{
break;
}
+
+ size_t RangeCount = 1;
+ size_t RangesLeft = PartialBlocks.BlockRanges.size() - BlockRangeIndex;
+ const ChunkBlockAnalyser::BlockRangeDescriptor& CurrentBlockRange = PartialBlocks.BlockRanges[BlockRangeIndex];
+ while (RangeCount < RangesLeft &&
+ CurrentBlockRange.BlockIndex == PartialBlocks.BlockRanges[BlockRangeIndex + RangeCount].BlockIndex)
+ {
+ RangeCount++;
+ }
+
Work.ScheduleWork(
m_NetworkPool,
[this,
@@ -1492,119 +1354,127 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
TotalPartWriteCount,
&FilteredWrittenBytesPerSecond,
&Work,
- &BlockRangeWorks,
- BlockRangeIndex](std::atomic<bool>&) {
+ &PartialBlocks,
+ BlockRangeStartIndex = BlockRangeIndex,
+ RangeCount](std::atomic<bool>&) {
if (!m_AbortFlag)
{
- ZEN_TRACE_CPU("Async_GetPartialBlock");
-
- const BlockRangeDescriptor& BlockRange = BlockRangeWorks[BlockRangeIndex];
+ ZEN_TRACE_CPU("Async_GetPartialBlockRanges");
FilteredDownloadedBytesPerSecond.Start();
- DownloadPartialBlock(
- BlockRange,
- ExistsResult,
- [this,
- &RemoteChunkIndexNeedsCopyFromSourceFlags,
- &SequenceIndexChunksLeftToWriteCounters,
- &WritePartsComplete,
- &WriteCache,
- &Work,
- TotalRequestCount,
- TotalPartWriteCount,
- &FilteredDownloadedBytesPerSecond,
- &FilteredWrittenBytesPerSecond,
- &BlockRange](IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath) {
- if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
-
- if (!m_AbortFlag)
- {
- Work.ScheduleWork(
- m_IOWorkerPool,
- [this,
- &RemoteChunkIndexNeedsCopyFromSourceFlags,
- &SequenceIndexChunksLeftToWriteCounters,
- &WritePartsComplete,
- &WriteCache,
- &Work,
- TotalPartWriteCount,
- &FilteredWrittenBytesPerSecond,
- &BlockRange,
- BlockChunkPath = std::filesystem::path(OnDiskPath),
- BlockPartialBuffer = std::move(InMemoryBuffer)](std::atomic<bool>&) mutable {
- if (!m_AbortFlag)
- {
- ZEN_TRACE_CPU("Async_WritePartialBlock");
+ for (size_t BlockRangeIndex = BlockRangeStartIndex; BlockRangeIndex < BlockRangeStartIndex + RangeCount;
+ BlockRangeIndex++)
+ {
+ ZEN_TRACE_CPU("GetPartialBlock");
- const uint32_t BlockIndex = BlockRange.BlockIndex;
+ const ChunkBlockAnalyser::BlockRangeDescriptor& BlockRange = PartialBlocks.BlockRanges[BlockRangeIndex];
- const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
+ DownloadPartialBlock(
+ BlockRange,
+ ExistsResult,
+ [this,
+ &RemoteChunkIndexNeedsCopyFromSourceFlags,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &WritePartsComplete,
+ &WriteCache,
+ &Work,
+ TotalRequestCount,
+ TotalPartWriteCount,
+ &FilteredDownloadedBytesPerSecond,
+ &FilteredWrittenBytesPerSecond,
+ &BlockRange](IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath) {
+ if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
- if (BlockChunkPath.empty())
- {
- ZEN_ASSERT(BlockPartialBuffer);
- }
- else
+ if (!m_AbortFlag)
+ {
+ Work.ScheduleWork(
+ m_IOWorkerPool,
+ [this,
+ &RemoteChunkIndexNeedsCopyFromSourceFlags,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &WritePartsComplete,
+ &WriteCache,
+ &Work,
+ TotalPartWriteCount,
+ &FilteredWrittenBytesPerSecond,
+ &BlockRange,
+ BlockChunkPath = std::filesystem::path(OnDiskPath),
+ BlockPartialBuffer = std::move(InMemoryBuffer)](std::atomic<bool>&) mutable {
+ if (!m_AbortFlag)
{
- ZEN_ASSERT(!BlockPartialBuffer);
- BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
- if (!BlockPartialBuffer)
+ ZEN_TRACE_CPU("Async_WritePartialBlock");
+
+ const uint32_t BlockIndex = BlockRange.BlockIndex;
+
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
+
+ if (BlockChunkPath.empty())
{
- throw std::runtime_error(
- fmt::format("Could not open downloaded block {} from {}",
- BlockDescription.BlockHash,
- BlockChunkPath));
+ ZEN_ASSERT(BlockPartialBuffer);
+ }
+ else
+ {
+ ZEN_ASSERT(!BlockPartialBuffer);
+ BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
+ if (!BlockPartialBuffer)
+ {
+ throw std::runtime_error(
+ fmt::format("Could not open downloaded block {} from {}",
+ BlockDescription.BlockHash,
+ BlockChunkPath));
+ }
}
- }
-
- FilteredWrittenBytesPerSecond.Start();
- if (!WritePartialBlockChunksToCache(
- BlockDescription,
- SequenceIndexChunksLeftToWriteCounters,
- Work,
- CompositeBuffer(std::move(BlockPartialBuffer)),
- BlockRange.ChunkBlockIndexStart,
- BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1,
- RemoteChunkIndexNeedsCopyFromSourceFlags,
- WriteCache))
- {
- std::error_code DummyEc;
- RemoveFile(BlockChunkPath, DummyEc);
- throw std::runtime_error(
- fmt::format("Partial block {} is malformed", BlockDescription.BlockHash));
- }
+ FilteredWrittenBytesPerSecond.Start();
+
+ if (!WritePartialBlockChunksToCache(
+ BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ CompositeBuffer(std::move(BlockPartialBuffer)),
+ BlockRange.ChunkBlockIndexStart,
+ BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ WriteCache))
+ {
+ std::error_code DummyEc;
+ RemoveFile(BlockChunkPath, DummyEc);
+ throw std::runtime_error(
+ fmt::format("Partial block {} is malformed", BlockDescription.BlockHash));
+ }
- std::error_code Ec = TryRemoveFile(BlockChunkPath);
- if (Ec)
- {
- ZEN_OPERATION_LOG_DEBUG(m_LogOutput,
- "Failed removing file '{}', reason: ({}) {}",
- BlockChunkPath,
- Ec.value(),
- Ec.message());
- }
+ std::error_code Ec = TryRemoveFile(BlockChunkPath);
+ if (Ec)
+ {
+ ZEN_OPERATION_LOG_DEBUG(m_LogOutput,
+ "Failed removing file '{}', reason: ({}) {}",
+ BlockChunkPath,
+ Ec.value(),
+ Ec.message());
+ }
- WritePartsComplete++;
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
}
- }
- },
- OnDiskPath.empty() ? WorkerThreadPool::EMode::DisableBacklog
- : WorkerThreadPool::EMode::EnableBacklog);
- }
- });
+ },
+ OnDiskPath.empty() ? WorkerThreadPool::EMode::DisableBacklog
+ : WorkerThreadPool::EMode::EnableBacklog);
+ }
+ });
+ }
}
});
+ BlockRangeIndex += RangeCount;
}
- for (uint32_t BlockIndex : FullBlockWorks)
+ for (uint32_t BlockIndex : PartialBlocks.FullBlockIndexes)
{
if (m_AbortFlag)
{
@@ -3289,271 +3159,9 @@ BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkInde
}
}
-BuildsOperationUpdateFolder::BlockRangeDescriptor
-BuildsOperationUpdateFolder::MergeBlockRanges(std::span<const BlockRangeDescriptor> Ranges)
-{
- ZEN_ASSERT(Ranges.size() > 1);
- const BlockRangeDescriptor& First = Ranges.front();
- const BlockRangeDescriptor& Last = Ranges.back();
-
- return BlockRangeDescriptor{.BlockIndex = First.BlockIndex,
- .RangeStart = First.RangeStart,
- .RangeLength = Last.RangeStart + Last.RangeLength - First.RangeStart,
- .ChunkBlockIndexStart = First.ChunkBlockIndexStart,
- .ChunkBlockIndexCount = Last.ChunkBlockIndexStart + Last.ChunkBlockIndexCount - First.ChunkBlockIndexStart};
-}
-
-std::optional<std::vector<BuildsOperationUpdateFolder::BlockRangeDescriptor>>
-BuildsOperationUpdateFolder::MakeOptionalBlockRangeVector(uint64_t TotalBlockSize, const BlockRangeDescriptor& Range)
-{
- if (Range.RangeLength == TotalBlockSize)
- {
- return {};
- }
- else
- {
- return std::vector<BlockRangeDescriptor>{Range};
- }
-};
-
-const BuildsOperationUpdateFolder::BlockRangeLimit*
-BuildsOperationUpdateFolder::GetBlockRangeLimitForRange(std::span<const BlockRangeLimit> Limits,
- uint64_t TotalBlockSize,
- std::span<const BlockRangeDescriptor> Ranges)
-{
- if (Ranges.size() > 1)
- {
- const std::uint64_t WantedSize =
- std::accumulate(Ranges.begin(), Ranges.end(), uint64_t(0), [](uint64_t Current, const BlockRangeDescriptor& Range) {
- return Current + Range.RangeLength;
- });
-
- const double RangeRequestedPercent = (WantedSize * 100.0) / TotalBlockSize;
-
- for (const BlockRangeLimit& Limit : Limits)
- {
- if (RangeRequestedPercent >= Limit.SizePercent && Ranges.size() > Limit.MaxRangeCount)
- {
- return &Limit;
- }
- }
- }
- return nullptr;
-};
-
-std::vector<BuildsOperationUpdateFolder::BlockRangeDescriptor>
-BuildsOperationUpdateFolder::CollapseBlockRanges(const uint64_t AlwaysAcceptableGap, std::span<const BlockRangeDescriptor> BlockRanges)
-{
- ZEN_ASSERT(BlockRanges.size() > 1);
- std::vector<BlockRangeDescriptor> CollapsedBlockRanges;
-
- auto BlockRangesIt = BlockRanges.begin();
- CollapsedBlockRanges.push_back(*BlockRangesIt++);
- for (; BlockRangesIt != BlockRanges.end(); BlockRangesIt++)
- {
- BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back();
-
- const uint64_t BothRangeSize = BlockRangesIt->RangeLength + LastRange.RangeLength;
-
- const uint64_t Gap = BlockRangesIt->RangeStart - (LastRange.RangeStart + LastRange.RangeLength);
- if (Gap <= Max(BothRangeSize / 16, AlwaysAcceptableGap))
- {
- LastRange.ChunkBlockIndexCount =
- (BlockRangesIt->ChunkBlockIndexStart + BlockRangesIt->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart;
- LastRange.RangeLength = (BlockRangesIt->RangeStart + BlockRangesIt->RangeLength) - LastRange.RangeStart;
- }
- else
- {
- CollapsedBlockRanges.push_back(*BlockRangesIt);
- }
- }
-
- return CollapsedBlockRanges;
-};
-
-uint64_t
-BuildsOperationUpdateFolder::CalculateNextGap(std::span<const BlockRangeDescriptor> BlockRanges)
-{
- ZEN_ASSERT(BlockRanges.size() > 1);
- uint64_t AcceptableGap = (uint64_t)-1;
- for (size_t RangeIndex = 0; RangeIndex < BlockRanges.size() - 1; RangeIndex++)
- {
- const BlockRangeDescriptor& Range = BlockRanges[RangeIndex];
- const BlockRangeDescriptor& NextRange = BlockRanges[RangeIndex + 1];
-
- const uint64_t Gap = NextRange.RangeStart - (Range.RangeStart + Range.RangeLength);
- AcceptableGap = Min(Gap, AcceptableGap);
- }
- AcceptableGap = RoundUp(AcceptableGap, 16u * 1024u);
- return AcceptableGap;
-};
-
-std::optional<std::vector<BuildsOperationUpdateFolder::BlockRangeDescriptor>>
-BuildsOperationUpdateFolder::CalculateBlockRanges(uint32_t BlockIndex,
- const ChunkBlockDescription& BlockDescription,
- std::span<const uint32_t> BlockChunkIndexNeeded,
- bool LimitToSingleRange,
- const uint64_t ChunkStartOffsetInBlock,
- const uint64_t TotalBlockSize,
- uint64_t& OutTotalWantedChunksSize)
-{
- ZEN_TRACE_CPU("CalculateBlockRanges");
-
- std::vector<BlockRangeDescriptor> BlockRanges;
- {
- uint64_t CurrentOffset = ChunkStartOffsetInBlock;
- uint32_t ChunkBlockIndex = 0;
- uint32_t NeedBlockChunkIndexOffset = 0;
- BlockRangeDescriptor NextRange{.BlockIndex = BlockIndex};
- while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() && ChunkBlockIndex < BlockDescription.ChunkRawHashes.size())
- {
- const uint32_t ChunkCompressedLength = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex];
- if (ChunkBlockIndex < BlockChunkIndexNeeded[NeedBlockChunkIndexOffset])
- {
- if (NextRange.RangeLength > 0)
- {
- BlockRanges.push_back(NextRange);
- NextRange = {.BlockIndex = BlockIndex};
- }
- ChunkBlockIndex++;
- CurrentOffset += ChunkCompressedLength;
- }
- else if (ChunkBlockIndex == BlockChunkIndexNeeded[NeedBlockChunkIndexOffset])
- {
- if (NextRange.RangeLength == 0)
- {
- NextRange.RangeStart = CurrentOffset;
- NextRange.ChunkBlockIndexStart = ChunkBlockIndex;
- }
- NextRange.RangeLength += ChunkCompressedLength;
- NextRange.ChunkBlockIndexCount++;
- ChunkBlockIndex++;
- CurrentOffset += ChunkCompressedLength;
- NeedBlockChunkIndexOffset++;
- }
- else
- {
- ZEN_ASSERT(false);
- }
- }
- if (NextRange.RangeLength > 0)
- {
- BlockRanges.push_back(NextRange);
- }
- }
- ZEN_ASSERT(!BlockRanges.empty());
-
- OutTotalWantedChunksSize =
- std::accumulate(BlockRanges.begin(), BlockRanges.end(), uint64_t(0), [](uint64_t Current, const BlockRangeDescriptor& Range) {
- return Current + Range.RangeLength;
- });
-
- double RangeWantedPercent = (OutTotalWantedChunksSize * 100.0) / TotalBlockSize;
-
- if (BlockRanges.size() == 1)
- {
- if (m_Options.IsVerbose)
- {
- ZEN_OPERATION_LOG_INFO(m_LogOutput,
- "Range request of {} ({:.2f}%) using single range from block {} ({}) as is",
- NiceBytes(OutTotalWantedChunksSize),
- RangeWantedPercent,
- BlockDescription.BlockHash,
- NiceBytes(TotalBlockSize));
- }
- return BlockRanges;
- }
-
- if (LimitToSingleRange)
- {
- const BlockRangeDescriptor MergedRange = MergeBlockRanges(BlockRanges);
- if (m_Options.IsVerbose)
- {
- const double RangeRequestedPercent = (MergedRange.RangeLength * 100.0) / TotalBlockSize;
- const double WastedPercent = ((MergedRange.RangeLength - OutTotalWantedChunksSize) * 100.0) / MergedRange.RangeLength;
-
- ZEN_OPERATION_LOG_INFO(
- m_LogOutput,
- "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) limited to single block range {} ({:.2f}%) wasting "
- "{:.2f}% ({})",
- NiceBytes(OutTotalWantedChunksSize),
- RangeWantedPercent,
- BlockRanges.size(),
- BlockDescription.BlockHash,
- NiceBytes(TotalBlockSize),
- NiceBytes(MergedRange.RangeLength),
- RangeRequestedPercent,
- WastedPercent,
- NiceBytes(MergedRange.RangeLength - OutTotalWantedChunksSize));
- }
- return MakeOptionalBlockRangeVector(TotalBlockSize, MergedRange);
- }
-
- if (RangeWantedPercent > FullBlockRangePercentLimit)
- {
- const BlockRangeDescriptor MergedRange = MergeBlockRanges(BlockRanges);
- if (m_Options.IsVerbose)
- {
- const double RangeRequestedPercent = (MergedRange.RangeLength * 100.0) / TotalBlockSize;
- const double WastedPercent = ((MergedRange.RangeLength - OutTotalWantedChunksSize) * 100.0) / MergedRange.RangeLength;
-
- ZEN_OPERATION_LOG_INFO(
- m_LogOutput,
- "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) exceeds {}%. Merged to single block range {} "
- "({:.2f}%) wasting {:.2f}% ({})",
- NiceBytes(OutTotalWantedChunksSize),
- RangeWantedPercent,
- BlockRanges.size(),
- BlockDescription.BlockHash,
- NiceBytes(TotalBlockSize),
- FullBlockRangePercentLimit,
- NiceBytes(MergedRange.RangeLength),
- RangeRequestedPercent,
- WastedPercent,
- NiceBytes(MergedRange.RangeLength - OutTotalWantedChunksSize));
- }
- return MakeOptionalBlockRangeVector(TotalBlockSize, MergedRange);
- }
-
- std::vector<BlockRangeDescriptor> CollapsedBlockRanges = CollapseBlockRanges(16u * 1024u, BlockRanges);
- while (GetBlockRangeLimitForRange(ForceMergeLimits, TotalBlockSize, CollapsedBlockRanges))
- {
- CollapsedBlockRanges = CollapseBlockRanges(CalculateNextGap(CollapsedBlockRanges), CollapsedBlockRanges);
- }
-
- const std::uint64_t WantedCollapsedSize =
- std::accumulate(CollapsedBlockRanges.begin(),
- CollapsedBlockRanges.end(),
- uint64_t(0),
- [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; });
-
- const double CollapsedRangeRequestedPercent = (WantedCollapsedSize * 100.0) / TotalBlockSize;
-
- if (m_Options.IsVerbose)
- {
- const double WastedPercent = ((WantedCollapsedSize - OutTotalWantedChunksSize) * 100.0) / WantedCollapsedSize;
-
- ZEN_OPERATION_LOG_INFO(
- m_LogOutput,
- "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) collapsed to {} {:.2f}% using {} ranges wasting {:.2f}% "
- "({})",
- NiceBytes(OutTotalWantedChunksSize),
- RangeWantedPercent,
- BlockRanges.size(),
- BlockDescription.BlockHash,
- NiceBytes(TotalBlockSize),
- NiceBytes(WantedCollapsedSize),
- CollapsedRangeRequestedPercent,
- CollapsedBlockRanges.size(),
- WastedPercent,
- NiceBytes(WantedCollapsedSize - OutTotalWantedChunksSize));
- }
- return CollapsedBlockRanges;
-}
-
void
BuildsOperationUpdateFolder::DownloadPartialBlock(
- const BlockRangeDescriptor BlockRange,
+ const ChunkBlockAnalyser::BlockRangeDescriptor BlockRange,
const BlobsExistsResult& ExistsResult,
std::function<void(IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath)>&& OnDownloaded)
{