about summary refs log tree commit diff
diff options
context:
space:
mode:
author Dan Engelbrecht <[email protected]> 2025-10-16 09:55:35 +0200
committer GitHub Enterprise <[email protected]> 2025-10-16 09:55:35 +0200
commit f426eab5d5a361bb4661b814ceb7ed97113130cd (patch)
tree d273892e7d86871eabe5c85fac6f0f7b20a6bf70
parent move builds state functions to buildsavedstate.h/cpp (#577) (diff)
download zen-f426eab5d5a361bb4661b814ceb7ed97113130cd.tar.xz
zen-f426eab5d5a361bb4661b814ceb7ed97113130cd.zip
refactor builds cmd part4 (#579)
* move lambdas to class functions
-rw-r--r-- src/zenremotestore/builds/buildstorageoperations.cpp 1428
-rw-r--r-- src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h 116
2 files changed, 804 insertions, 740 deletions
diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp
index 469d29e0d..ef180935e 100644
--- a/src/zenremotestore/builds/buildstorageoperations.cpp
+++ b/src/zenremotestore/builds/buildstorageoperations.cpp
@@ -195,7 +195,8 @@ namespace {
const std::uint64_t PreferredMultipartChunkSize,
ParallelWork& Work,
WorkerThreadPool& NetworkPool,
- DownloadStatistics& DownloadStats,
+ std::atomic<uint64_t>& DownloadedChunkByteCount,
+ std::atomic<uint64_t>& MultipartAttachmentCount,
std::function<void(IoBuffer&& Payload)>&& OnDownloadComplete)
{
ZEN_TRACE_CPU("DownloadLargeBlob");
@@ -217,8 +218,8 @@ namespace {
BuildId,
ChunkHash,
PreferredMultipartChunkSize,
- [&Work, Workload, &DownloadStats](uint64_t Offset, const IoBuffer& Chunk) {
- DownloadStats.DownloadedChunkByteCount += Chunk.GetSize();
+ [&Work, Workload, &DownloadedChunkByteCount](uint64_t Offset, const IoBuffer& Chunk) {
+ DownloadedChunkByteCount += Chunk.GetSize();
if (!Work.IsAborted())
{
@@ -226,8 +227,7 @@ namespace {
Workload->TempFile.Write(Chunk.GetView(), Offset);
}
},
- [&Work, Workload, &DownloadStats, OnDownloadComplete = std::move(OnDownloadComplete)]() {
- DownloadStats.DownloadedChunkCount++;
+ [&Work, Workload, &DownloadedChunkByteCount, OnDownloadComplete = std::move(OnDownloadComplete)]() {
if (!Work.IsAborted())
{
ZEN_TRACE_CPU("Async_DownloadLargeBlob_OnComplete");
@@ -242,7 +242,7 @@ namespace {
});
if (!WorkItems.empty())
{
- DownloadStats.MultipartAttachmentCount++;
+ MultipartAttachmentCount++;
}
for (auto& WorkItem : WorkItems)
{
@@ -344,293 +344,6 @@ namespace {
} // namespace
-struct BlockRangeDescriptor
-{
- uint32_t BlockIndex = (uint32_t)-1;
- uint64_t RangeStart = 0;
- uint64_t RangeLength = 0;
- uint32_t ChunkBlockIndexStart = 0;
- uint32_t ChunkBlockIndexCount = 0;
-};
-
-BlockRangeDescriptor
-MergeBlockRanges(std::span<const BlockRangeDescriptor> Ranges)
-{
- ZEN_ASSERT(Ranges.size() > 1);
- const BlockRangeDescriptor& First = Ranges.front();
- const BlockRangeDescriptor& Last = Ranges.back();
-
- return BlockRangeDescriptor{.BlockIndex = First.BlockIndex,
- .RangeStart = First.RangeStart,
- .RangeLength = Last.RangeStart + Last.RangeLength - First.RangeStart,
- .ChunkBlockIndexStart = First.ChunkBlockIndexStart,
- .ChunkBlockIndexCount = Last.ChunkBlockIndexStart + Last.ChunkBlockIndexCount - First.ChunkBlockIndexStart};
-}
-
-std::optional<std::vector<BlockRangeDescriptor>>
-MakeOptionalBlockRangeVector(uint64_t TotalBlockSize, const BlockRangeDescriptor& Range)
-{
- if (Range.RangeLength == TotalBlockSize)
- {
- return {};
- }
- else
- {
- return std::vector<BlockRangeDescriptor>{Range};
- }
-};
-
-struct BlockRangeLimit
-{
- uint16_t SizePercent;
- uint16_t MaxRangeCount;
-};
-
-static const uint16_t FullBlockRangePercentLimit = 95;
-
-const std::vector<BlockRangeLimit> ForceMergeLimits = {{.SizePercent = FullBlockRangePercentLimit, .MaxRangeCount = 1},
- {.SizePercent = 90, .MaxRangeCount = 2},
- {.SizePercent = 85, .MaxRangeCount = 8},
- {.SizePercent = 80, .MaxRangeCount = 16},
- {.SizePercent = 70, .MaxRangeCount = 32},
- {.SizePercent = 60, .MaxRangeCount = 48},
- {.SizePercent = 2, .MaxRangeCount = 56},
- {.SizePercent = 0, .MaxRangeCount = 64}};
-
-const BlockRangeLimit*
-GetBlockRangeLimitForRange(std::span<const BlockRangeLimit> Limits, uint64_t TotalBlockSize, std::span<const BlockRangeDescriptor> Ranges)
-{
- if (Ranges.size() > 1)
- {
- const std::uint64_t WantedSize =
- std::accumulate(Ranges.begin(), Ranges.end(), uint64_t(0), [](uint64_t Current, const BlockRangeDescriptor& Range) {
- return Current + Range.RangeLength;
- });
-
- const double RangeRequestedPercent = (WantedSize * 100.0) / TotalBlockSize;
-
- for (const BlockRangeLimit& Limit : Limits)
- {
- if (RangeRequestedPercent >= Limit.SizePercent && Ranges.size() > Limit.MaxRangeCount)
- {
- return &Limit;
- }
- }
- }
- return nullptr;
-};
-
-std::vector<BlockRangeDescriptor>
-CollapseBlockRanges(const uint64_t AlwaysAcceptableGap, std::span<const BlockRangeDescriptor> BlockRanges)
-{
- ZEN_ASSERT(BlockRanges.size() > 1);
- std::vector<BlockRangeDescriptor> CollapsedBlockRanges;
-
- auto BlockRangesIt = BlockRanges.begin();
- CollapsedBlockRanges.push_back(*BlockRangesIt++);
- for (; BlockRangesIt != BlockRanges.end(); BlockRangesIt++)
- {
- BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back();
-
- const uint64_t BothRangeSize = BlockRangesIt->RangeLength + LastRange.RangeLength;
-
- const uint64_t Gap = BlockRangesIt->RangeStart - (LastRange.RangeStart + LastRange.RangeLength);
- if (Gap <= Max(BothRangeSize / 16, AlwaysAcceptableGap))
- {
- LastRange.ChunkBlockIndexCount =
- (BlockRangesIt->ChunkBlockIndexStart + BlockRangesIt->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart;
- LastRange.RangeLength = (BlockRangesIt->RangeStart + BlockRangesIt->RangeLength) - LastRange.RangeStart;
- }
- else
- {
- CollapsedBlockRanges.push_back(*BlockRangesIt);
- }
- }
-
- return CollapsedBlockRanges;
-};
-
-uint64_t
-CalculateNextGap(std::span<const BlockRangeDescriptor> BlockRanges)
-{
- ZEN_ASSERT(BlockRanges.size() > 1);
- uint64_t AcceptableGap = (uint64_t)-1;
- for (size_t RangeIndex = 0; RangeIndex < BlockRanges.size() - 1; RangeIndex++)
- {
- const BlockRangeDescriptor& Range = BlockRanges[RangeIndex];
- const BlockRangeDescriptor& NextRange = BlockRanges[RangeIndex + 1];
-
- const uint64_t Gap = NextRange.RangeStart - (Range.RangeStart + Range.RangeLength);
- AcceptableGap = Min(Gap, AcceptableGap);
- }
- AcceptableGap = RoundUp(AcceptableGap, 16u * 1024u);
- return AcceptableGap;
-};
-
-std::optional<std::vector<BlockRangeDescriptor>>
-CalculateBlockRanges(BuildOpLogOutput& LogOutput,
- bool IsVerbose,
- uint32_t BlockIndex,
- const ChunkBlockDescription& BlockDescription,
- std::span<const uint32_t> BlockChunkIndexNeeded,
- bool LimitToSingleRange,
- const uint64_t ChunkStartOffsetInBlock,
- const uint64_t TotalBlockSize,
- uint64_t& OutTotalWantedChunksSize)
-{
- ZEN_TRACE_CPU("CalculateBlockRanges");
-
- std::vector<BlockRangeDescriptor> BlockRanges;
- {
- uint64_t CurrentOffset = ChunkStartOffsetInBlock;
- uint32_t ChunkBlockIndex = 0;
- uint32_t NeedBlockChunkIndexOffset = 0;
- BlockRangeDescriptor NextRange{.BlockIndex = BlockIndex};
- while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() && ChunkBlockIndex < BlockDescription.ChunkRawHashes.size())
- {
- const uint32_t ChunkCompressedLength = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex];
- if (ChunkBlockIndex < BlockChunkIndexNeeded[NeedBlockChunkIndexOffset])
- {
- if (NextRange.RangeLength > 0)
- {
- BlockRanges.push_back(NextRange);
- NextRange = {.BlockIndex = BlockIndex};
- }
- ChunkBlockIndex++;
- CurrentOffset += ChunkCompressedLength;
- }
- else if (ChunkBlockIndex == BlockChunkIndexNeeded[NeedBlockChunkIndexOffset])
- {
- if (NextRange.RangeLength == 0)
- {
- NextRange.RangeStart = CurrentOffset;
- NextRange.ChunkBlockIndexStart = ChunkBlockIndex;
- }
- NextRange.RangeLength += ChunkCompressedLength;
- NextRange.ChunkBlockIndexCount++;
- ChunkBlockIndex++;
- CurrentOffset += ChunkCompressedLength;
- NeedBlockChunkIndexOffset++;
- }
- else
- {
- ZEN_ASSERT(false);
- }
- }
- if (NextRange.RangeLength > 0)
- {
- BlockRanges.push_back(NextRange);
- }
- }
- ZEN_ASSERT(!BlockRanges.empty());
-
- OutTotalWantedChunksSize =
- std::accumulate(BlockRanges.begin(), BlockRanges.end(), uint64_t(0), [](uint64_t Current, const BlockRangeDescriptor& Range) {
- return Current + Range.RangeLength;
- });
-
- double RangeWantedPercent = (OutTotalWantedChunksSize * 100.0) / TotalBlockSize;
-
- if (BlockRanges.size() == 1)
- {
- if (IsVerbose)
- {
- LOG_OUTPUT(LogOutput,
- "Range request of {} ({:.2f}%) using single range from block {} ({}) as is",
- NiceBytes(OutTotalWantedChunksSize),
- RangeWantedPercent,
- BlockDescription.BlockHash,
- NiceBytes(TotalBlockSize));
- }
- return BlockRanges;
- }
-
- if (LimitToSingleRange)
- {
- const BlockRangeDescriptor MergedRange = MergeBlockRanges(BlockRanges);
- if (IsVerbose)
- {
- const double RangeRequestedPercent = (MergedRange.RangeLength * 100.0) / TotalBlockSize;
- const double WastedPercent = ((MergedRange.RangeLength - OutTotalWantedChunksSize) * 100.0) / MergedRange.RangeLength;
-
- LOG_OUTPUT(
- LogOutput,
- "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) limited to single block range {} ({:.2f}%) wasting "
- "{:.2f}% ({})",
- NiceBytes(OutTotalWantedChunksSize),
- RangeWantedPercent,
- BlockRanges.size(),
- BlockDescription.BlockHash,
- NiceBytes(TotalBlockSize),
- NiceBytes(MergedRange.RangeLength),
- RangeRequestedPercent,
- WastedPercent,
- NiceBytes(MergedRange.RangeLength - OutTotalWantedChunksSize));
- }
- return MakeOptionalBlockRangeVector(TotalBlockSize, MergedRange);
- }
-
- if (RangeWantedPercent > FullBlockRangePercentLimit)
- {
- const BlockRangeDescriptor MergedRange = MergeBlockRanges(BlockRanges);
- if (IsVerbose)
- {
- const double RangeRequestedPercent = (MergedRange.RangeLength * 100.0) / TotalBlockSize;
- const double WastedPercent = ((MergedRange.RangeLength - OutTotalWantedChunksSize) * 100.0) / MergedRange.RangeLength;
-
- LOG_OUTPUT(LogOutput,
- "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) exceeds {}%. Merged to single block range {} "
- "({:.2f}%) wasting {:.2f}% ({})",
- NiceBytes(OutTotalWantedChunksSize),
- RangeWantedPercent,
- BlockRanges.size(),
- BlockDescription.BlockHash,
- NiceBytes(TotalBlockSize),
- FullBlockRangePercentLimit,
- NiceBytes(MergedRange.RangeLength),
- RangeRequestedPercent,
- WastedPercent,
- NiceBytes(MergedRange.RangeLength - OutTotalWantedChunksSize));
- }
- return MakeOptionalBlockRangeVector(TotalBlockSize, MergedRange);
- }
-
- std::vector<BlockRangeDescriptor> CollapsedBlockRanges = CollapseBlockRanges(16u * 1024u, BlockRanges);
- while (GetBlockRangeLimitForRange(ForceMergeLimits, TotalBlockSize, CollapsedBlockRanges))
- {
- CollapsedBlockRanges = CollapseBlockRanges(CalculateNextGap(CollapsedBlockRanges), CollapsedBlockRanges);
- }
-
- const std::uint64_t WantedCollapsedSize =
- std::accumulate(CollapsedBlockRanges.begin(),
- CollapsedBlockRanges.end(),
- uint64_t(0),
- [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; });
-
- const double CollapsedRangeRequestedPercent = (WantedCollapsedSize * 100.0) / TotalBlockSize;
-
- if (IsVerbose)
- {
- const double WastedPercent = ((WantedCollapsedSize - OutTotalWantedChunksSize) * 100.0) / WantedCollapsedSize;
-
- LOG_OUTPUT(
- LogOutput,
- "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) collapsed to {} {:.2f}% using {} ranges wasting {:.2f}% "
- "({})",
- NiceBytes(OutTotalWantedChunksSize),
- RangeWantedPercent,
- BlockRanges.size(),
- BlockDescription.BlockHash,
- NiceBytes(TotalBlockSize),
- NiceBytes(WantedCollapsedSize),
- CollapsedRangeRequestedPercent,
- CollapsedBlockRanges.size(),
- WastedPercent,
- NiceBytes(WantedCollapsedSize - OutTotalWantedChunksSize));
- }
- return CollapsedBlockRanges;
-}
-
bool
IsSingleFileChunk(const ChunkedFolderContent& RemoteContent,
const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> Locations)
@@ -1517,12 +1230,6 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
AllBlockChunkIndexNeeded.emplace_back(std::move(BlockChunkIndexNeeded));
}
- struct BlobsExistsResult
- {
- tsl::robin_set<IoHash> ExistingBlobs;
- uint64_t ElapsedTimeMs = 0;
- };
-
BlobsExistsResult ExistsResult;
if (m_Storage.BuildCacheStorage)
@@ -1622,9 +1329,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
bool LimitToSingleRange =
BlockExistInCache ? false : m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Mixed;
uint64_t TotalWantedChunksSize = 0;
- std::optional<std::vector<BlockRangeDescriptor>> MaybeBlockRanges = CalculateBlockRanges(m_LogOutput,
- m_Options.IsQuiet,
- BlockIndex,
+ std::optional<std::vector<BlockRangeDescriptor>> MaybeBlockRanges = CalculateBlockRanges(BlockIndex,
BlockDescription,
BlockChunkIndexNeeded,
LimitToSingleRange,
@@ -1780,274 +1485,47 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
break;
}
- LooseChunkHashWorkData& LooseChunkHashWork = LooseChunkHashWorks[LooseChunkHashWorkIndex];
-
- std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = std::move(LooseChunkHashWork.ChunkTargetPtrs);
- const uint32_t RemoteChunkIndex = LooseChunkHashWork.RemoteChunkIndex;
-
- if (m_Options.PrimeCacheOnly &&
- ExistsResult.ExistingBlobs.contains(m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]))
+ if (m_Options.PrimeCacheOnly)
{
- m_DownloadStats.RequestsCompleteCount++;
- continue;
+ const uint32_t RemoteChunkIndex = LooseChunkHashWorks[LooseChunkHashWorkIndex].RemoteChunkIndex;
+ if (ExistsResult.ExistingBlobs.contains(m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]))
+ {
+ m_DownloadStats.RequestsCompleteCount++;
+ continue;
+ }
}
- Work.ScheduleWork(
- m_IOWorkerPool,
- [this,
- &SequenceIndexChunksLeftToWriteCounters,
- &Work,
- &ExistsResult,
- &WritePartsComplete,
- RemoteChunkIndex,
- ChunkTargetPtrs,
- TotalRequestCount,
- TotalPartWriteCount,
- &WriteCache,
- &FilteredDownloadedBytesPerSecond,
- &FilteredWrittenBytesPerSecond](std::atomic<bool>&) mutable {
- if (!m_AbortFlag)
- {
- ZEN_TRACE_CPU("Async_ReadPreDownloadedChunk");
-
- std::filesystem::path ExistingCompressedChunkPath;
- if (!m_Options.PrimeCacheOnly)
- {
- const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
- ExistingCompressedChunkPath = FindDownloadedChunk(ChunkHash);
- if (!ExistingCompressedChunkPath.empty())
- {
- m_DownloadStats.RequestsCompleteCount++;
- if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
- }
- }
- if (!m_AbortFlag)
-
- {
- if (!ExistingCompressedChunkPath.empty())
- {
- Work.ScheduleWork(
- m_IOWorkerPool,
- [this,
- &SequenceIndexChunksLeftToWriteCounters,
- &WriteCache,
- &Work,
- &WritePartsComplete,
- TotalPartWriteCount,
- &FilteredWrittenBytesPerSecond,
- RemoteChunkIndex,
- ChunkTargetPtrs,
- CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>& AbortFlag) mutable {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("Async_WritePreDownloadedChunk");
-
- FilteredWrittenBytesPerSecond.Start();
-
- const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
-
- IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath);
- if (!CompressedPart)
- {
- throw std::runtime_error(fmt::format("Could not open dowloaded compressed chunk {} from {}",
- ChunkHash,
- CompressedChunkPath));
- }
-
- bool NeedHashVerify = WriteCompressedChunkToCache(ChunkHash,
- ChunkTargetPtrs,
- WriteCache,
- std::move(CompressedPart));
- WritePartsComplete++;
-
- if (!AbortFlag)
- {
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
- }
-
- std::error_code Ec = TryRemoveFile(CompressedChunkPath);
- if (Ec)
- {
- LOG_OUTPUT_DEBUG(m_LogOutput,
- "Failed removing file '{}', reason: ({}) {}",
- CompressedChunkPath,
- Ec.value(),
- Ec.message());
- }
-
- std::vector<uint32_t> CompletedSequences =
- CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
- WriteCache.Close(CompletedSequences);
- if (NeedHashVerify)
- {
- VerifyAndCompleteChunkSequencesAsync(CompletedSequences, Work);
- }
- else
- {
- FinalizeChunkSequences(CompletedSequences);
- }
- }
- }
- });
- }
- else
- {
- Work.ScheduleWork(
- m_NetworkPool,
- [this,
- &ExistsResult,
- &SequenceIndexChunksLeftToWriteCounters,
- &WriteCache,
- &Work,
- &WritePartsComplete,
- TotalPartWriteCount,
- TotalRequestCount,
- &FilteredDownloadedBytesPerSecond,
- &FilteredWrittenBytesPerSecond,
- RemoteChunkIndex,
- ChunkTargetPtrs](std::atomic<bool>&) mutable {
- if (!m_AbortFlag)
- {
- ZEN_TRACE_CPU("Async_DownloadChunk");
-
- const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
- FilteredDownloadedBytesPerSecond.Start();
- IoBuffer BuildBlob;
- const bool ExistsInCache =
- m_Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash);
- if (ExistsInCache)
- {
- BuildBlob = m_Storage.BuildCacheStorage->GetBuildBlob(m_BuildId, ChunkHash);
- }
- if (BuildBlob)
- {
- uint64_t BlobSize = BuildBlob.GetSize();
- m_DownloadStats.DownloadedChunkCount++;
- m_DownloadStats.DownloadedChunkByteCount += BlobSize;
- m_DownloadStats.RequestsCompleteCount++;
- if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
- AsyncWriteDownloadedChunk(m_Options.ZenFolderPath,
- RemoteChunkIndex,
- std::move(ChunkTargetPtrs),
- WriteCache,
- Work,
- std::move(BuildBlob),
- SequenceIndexChunksLeftToWriteCounters,
- WritePartsComplete,
- TotalPartWriteCount,
- FilteredWrittenBytesPerSecond);
- }
- else
- {
- if (m_RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >=
- m_Options.LargeAttachmentSize)
- {
- DownloadLargeBlob(*m_Storage.BuildStorage,
- m_TempDownloadFolderPath,
- m_BuildId,
- ChunkHash,
- m_Options.PreferredMultipartChunkSize,
- Work,
- m_NetworkPool,
- m_DownloadStats,
- [this,
- &SequenceIndexChunksLeftToWriteCounters,
- &WriteCache,
- &Work,
- ChunkHash,
- TotalPartWriteCount,
- TotalRequestCount,
- &WritePartsComplete,
- &FilteredWrittenBytesPerSecond,
- &FilteredDownloadedBytesPerSecond,
- RemoteChunkIndex,
- ChunkTargetPtrs](IoBuffer&& Payload) mutable {
- m_DownloadStats.RequestsCompleteCount++;
- if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
- if (Payload && m_Storage.BuildCacheStorage)
- {
- m_Storage.BuildCacheStorage->PutBuildBlob(
- m_BuildId,
- ChunkHash,
- ZenContentType::kCompressedBinary,
- CompositeBuffer(SharedBuffer(Payload)));
- }
- if (!m_Options.PrimeCacheOnly)
- {
- if (!m_AbortFlag)
- {
- AsyncWriteDownloadedChunk(
- m_Options.ZenFolderPath,
- RemoteChunkIndex,
- std::move(ChunkTargetPtrs),
- WriteCache,
- Work,
- std::move(Payload),
- SequenceIndexChunksLeftToWriteCounters,
- WritePartsComplete,
- TotalPartWriteCount,
- FilteredWrittenBytesPerSecond);
- }
- }
- });
- }
- else
- {
- BuildBlob = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, ChunkHash);
- if (BuildBlob && m_Storage.BuildCacheStorage)
- {
- m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId,
- ChunkHash,
- ZenContentType::kCompressedBinary,
- CompositeBuffer(SharedBuffer(BuildBlob)));
- }
- if (!BuildBlob)
- {
- throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash));
- }
- if (!m_Options.PrimeCacheOnly)
- {
- if (!m_AbortFlag)
- {
- uint64_t BlobSize = BuildBlob.GetSize();
- m_DownloadStats.DownloadedChunkCount++;
- m_DownloadStats.DownloadedChunkByteCount += BlobSize;
- m_DownloadStats.RequestsCompleteCount++;
- if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
- AsyncWriteDownloadedChunk(m_Options.ZenFolderPath,
- RemoteChunkIndex,
- std::move(ChunkTargetPtrs),
- WriteCache,
- Work,
- std::move(BuildBlob),
- SequenceIndexChunksLeftToWriteCounters,
- WritePartsComplete,
- TotalPartWriteCount,
- FilteredWrittenBytesPerSecond);
- }
- }
- }
- }
- }
- });
- }
- }
- }
- });
+ Work.ScheduleWork(m_IOWorkerPool,
+ [this,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &Work,
+ &ExistsResult,
+ &WritePartsComplete,
+ &LooseChunkHashWorks,
+ LooseChunkHashWorkIndex,
+ TotalRequestCount,
+ TotalPartWriteCount,
+ &WriteCache,
+ &FilteredDownloadedBytesPerSecond,
+ &FilteredWrittenBytesPerSecond](std::atomic<bool>&) mutable {
+ ZEN_TRACE_CPU("Async_ReadPreDownloadedChunk");
+ if (!m_AbortFlag)
+ {
+ LooseChunkHashWorkData& LooseChunkHashWork = LooseChunkHashWorks[LooseChunkHashWorkIndex];
+ const uint32_t RemoteChunkIndex = LooseChunkHashWorks[LooseChunkHashWorkIndex].RemoteChunkIndex;
+ WriteLooseChunk(RemoteChunkIndex,
+ ExistsResult,
+ SequenceIndexChunksLeftToWriteCounters,
+ WritePartsComplete,
+ std::move(LooseChunkHashWork.ChunkTargetPtrs),
+ WriteCache,
+ Work,
+ TotalRequestCount,
+ TotalPartWriteCount,
+ FilteredDownloadedBytesPerSecond,
+ FilteredWrittenBytesPerSecond);
+ }
+ });
}
std::unique_ptr<CloneQueryInterface> CloneQuery;
@@ -2191,9 +1669,6 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
{
break;
}
- const BlockRangeDescriptor BlockRange = BlockRangeWorks[BlockRangeIndex];
- ZEN_ASSERT(BlockRange.BlockIndex != (uint32_t)-1);
- const uint32_t BlockIndex = BlockRange.BlockIndex;
Work.ScheduleWork(
m_NetworkPool,
[this,
@@ -2207,168 +1682,109 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
TotalPartWriteCount,
&FilteredWrittenBytesPerSecond,
&Work,
- BlockIndex,
- BlockRange](std::atomic<bool>&) {
+ &BlockRangeWorks,
+ BlockRangeIndex](std::atomic<bool>&) {
if (!m_AbortFlag)
{
ZEN_TRACE_CPU("Async_GetPartialBlock");
- const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
-
- FilteredDownloadedBytesPerSecond.Start();
- IoBuffer BlockBuffer;
- if (m_Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash))
- {
- BlockBuffer = m_Storage.BuildCacheStorage->GetBuildBlob(m_BuildId,
- BlockDescription.BlockHash,
- BlockRange.RangeStart,
- BlockRange.RangeLength);
- }
- if (!BlockBuffer)
- {
- BlockBuffer = m_Storage.BuildStorage->GetBuildBlob(m_BuildId,
- BlockDescription.BlockHash,
- BlockRange.RangeStart,
- BlockRange.RangeLength);
- }
- if (!BlockBuffer)
- {
- throw std::runtime_error(fmt::format("Block {} is missing when fetching range {} -> {}",
- BlockDescription.BlockHash,
- BlockRange.RangeStart,
- BlockRange.RangeStart + BlockRange.RangeLength));
- }
- if (!m_AbortFlag)
- {
- uint64_t BlockSize = BlockBuffer.GetSize();
- m_DownloadStats.DownloadedBlockCount++;
- m_DownloadStats.DownloadedBlockByteCount += BlockSize;
- m_DownloadStats.RequestsCompleteCount++;
- if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
-
- std::filesystem::path BlockChunkPath;
-
- // Check if the dowloaded block is file based and we can move it directly without rewriting it
- {
- IoBufferFileReference FileRef;
- if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) &&
- (FileRef.FileChunkSize == BlockSize))
+ const BlockRangeDescriptor& BlockRange = BlockRangeWorks[BlockRangeIndex];
+
+ DownloadPartialBlock(
+ BlockRange,
+ ExistsResult,
+ [this,
+ &RemoteChunkIndexNeedsCopyFromSourceFlags,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &WritePartsComplete,
+ &WriteCache,
+ &Work,
+ TotalRequestCount,
+ TotalPartWriteCount,
+ &FilteredDownloadedBytesPerSecond,
+ &FilteredWrittenBytesPerSecond,
+ &BlockRange](IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath) {
+ if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
{
- ZEN_TRACE_CPU("MoveTempPartialBlock");
-
- std::error_code Ec;
- std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
- if (!Ec)
- {
- BlockBuffer.SetDeleteOnClose(false);
- BlockBuffer = {};
- BlockChunkPath = m_TempBlockFolderPath / fmt::format("{}_{:x}_{:x}",
- BlockDescription.BlockHash,
- BlockRange.RangeStart,
- BlockRange.RangeLength);
- RenameFile(TempBlobPath, BlockChunkPath, Ec);
- if (Ec)
- {
- BlockChunkPath = std::filesystem::path{};
-
- // Re-open the temp file again
- BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
- BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
- BlockBuffer.SetDeleteOnClose(true);
- }
- }
+ FilteredDownloadedBytesPerSecond.Stop();
}
- }
- if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u))
- {
- ZEN_TRACE_CPU("WriteTempPartialBlock");
- // Could not be moved and rather large, lets store it on disk
- BlockChunkPath =
- m_TempBlockFolderPath /
- fmt::format("{}_{:x}_{:x}", BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength);
- TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
- BlockBuffer = {};
- }
+ if (!m_AbortFlag)
+ {
+ Work.ScheduleWork(
+ m_IOWorkerPool,
+ [this,
+ &RemoteChunkIndexNeedsCopyFromSourceFlags,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &WritePartsComplete,
+ &WriteCache,
+ &Work,
+ TotalPartWriteCount,
+ &FilteredWrittenBytesPerSecond,
+ &BlockRange,
+ BlockChunkPath = std::move(OnDiskPath),
+ BlockPartialBuffer = std::move(InMemoryBuffer)](std::atomic<bool>&) mutable {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_WritePartialBlock");
- if (!m_AbortFlag)
- {
- Work.ScheduleWork(
- m_IOWorkerPool,
- [this,
- &RemoteChunkIndexNeedsCopyFromSourceFlags,
- &SequenceIndexChunksLeftToWriteCounters,
- &WritePartsComplete,
- &WriteCache,
- &Work,
- TotalPartWriteCount,
- &FilteredWrittenBytesPerSecond,
- BlockIndex,
- BlockRange,
- BlockChunkPath,
- BlockPartialBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable {
- if (!m_AbortFlag)
- {
- ZEN_TRACE_CPU("Async_WritePartialBlock");
+ const uint32_t BlockIndex = BlockRange.BlockIndex;
- const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
- if (BlockChunkPath.empty())
- {
- ZEN_ASSERT(BlockPartialBuffer);
- }
- else
- {
- ZEN_ASSERT(!BlockPartialBuffer);
- BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
- if (!BlockPartialBuffer)
+ if (BlockChunkPath.empty())
{
- throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}",
- BlockDescription.BlockHash,
- BlockChunkPath));
+ ZEN_ASSERT(BlockPartialBuffer);
+ }
+ else
+ {
+ ZEN_ASSERT(!BlockPartialBuffer);
+ BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
+ if (!BlockPartialBuffer)
+ {
+ throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}",
+ BlockDescription.BlockHash,
+ BlockChunkPath));
+ }
}
- }
- FilteredWrittenBytesPerSecond.Start();
-
- if (!WritePartialBlockChunksToCache(
- BlockDescription,
- SequenceIndexChunksLeftToWriteCounters,
- Work,
- CompositeBuffer(std::move(BlockPartialBuffer)),
- BlockRange.ChunkBlockIndexStart,
- BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1,
- RemoteChunkIndexNeedsCopyFromSourceFlags,
- WriteCache))
- {
- std::error_code DummyEc;
- RemoveFile(BlockChunkPath, DummyEc);
- throw std::runtime_error(
- fmt::format("Partial block {} is malformed", BlockDescription.BlockHash));
- }
+ FilteredWrittenBytesPerSecond.Start();
- std::error_code Ec = TryRemoveFile(BlockChunkPath);
- if (Ec)
- {
- LOG_OUTPUT_DEBUG(m_LogOutput,
- "Failed removing file '{}', reason: ({}) {}",
- BlockChunkPath,
- Ec.value(),
- Ec.message());
- }
+ if (!WritePartialBlockChunksToCache(
+ BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ CompositeBuffer(std::move(BlockPartialBuffer)),
+ BlockRange.ChunkBlockIndexStart,
+ BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ WriteCache))
+ {
+ std::error_code DummyEc;
+ RemoveFile(BlockChunkPath, DummyEc);
+ throw std::runtime_error(
+ fmt::format("Partial block {} is malformed", BlockDescription.BlockHash));
+ }
- WritePartsComplete++;
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
+ std::error_code Ec = TryRemoveFile(BlockChunkPath);
+ if (Ec)
+ {
+ LOG_OUTPUT_DEBUG(m_LogOutput,
+ "Failed removing file '{}', reason: ({}) {}",
+ BlockChunkPath,
+ Ec.value(),
+ Ec.message());
+ }
+
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
}
- }
- });
- }
- }
+ });
+ }
+ });
}
});
}
@@ -3716,6 +3132,580 @@ BuildsOperationUpdateFolder::WriteScavengedSequenceToCache(const std::filesystem
RenameFile(TempFilePath, CacheFilePath);
}
+void
+BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkIndex,
+ const BlobsExistsResult& ExistsResult,
+ std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ std::atomic<uint64_t>& WritePartsComplete,
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>&& ChunkTargetPtrs,
+ BufferedWriteFileCache& WriteCache,
+ ParallelWork& Work,
+ uint64_t TotalRequestCount,
+ uint64_t TotalPartWriteCount,
+ FilteredRate& FilteredDownloadedBytesPerSecond,
+ FilteredRate& FilteredWrittenBytesPerSecond)
+{
+ std::filesystem::path ExistingCompressedChunkPath;
+ if (!m_Options.PrimeCacheOnly)
+ {
+ const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+ ExistingCompressedChunkPath = FindDownloadedChunk(ChunkHash);
+ if (!ExistingCompressedChunkPath.empty())
+ {
+ m_DownloadStats.RequestsCompleteCount++;
+ if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ }
+ }
+ if (!m_AbortFlag)
+ {
+ if (!ExistingCompressedChunkPath.empty())
+ {
+ Work.ScheduleWork(
+ m_IOWorkerPool,
+ [this,
+ SequenceIndexChunksLeftToWriteCounters,
+ &WriteCache,
+ &Work,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ &FilteredWrittenBytesPerSecond,
+ RemoteChunkIndex,
+ ChunkTargetPtrs = std::move(ChunkTargetPtrs),
+ CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>& AbortFlag) mutable {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_WritePreDownloadedChunk");
+
+ FilteredWrittenBytesPerSecond.Start();
+
+ const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+
+ IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath);
+ if (!CompressedPart)
+ {
+ throw std::runtime_error(
+ fmt::format("Could not open dowloaded compressed chunk {} from {}", ChunkHash, CompressedChunkPath));
+ }
+
+ bool NeedHashVerify =
+ WriteCompressedChunkToCache(ChunkHash, ChunkTargetPtrs, WriteCache, std::move(CompressedPart));
+ WritePartsComplete++;
+
+ if (!AbortFlag)
+ {
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
+
+ std::error_code Ec = TryRemoveFile(CompressedChunkPath);
+ if (Ec)
+ {
+ LOG_OUTPUT_DEBUG(m_LogOutput,
+ "Failed removing file '{}', reason: ({}) {}",
+ CompressedChunkPath,
+ Ec.value(),
+ Ec.message());
+ }
+
+ std::vector<uint32_t> CompletedSequences =
+ CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
+ WriteCache.Close(CompletedSequences);
+ if (NeedHashVerify)
+ {
+ VerifyAndCompleteChunkSequencesAsync(CompletedSequences, Work);
+ }
+ else
+ {
+ FinalizeChunkSequences(CompletedSequences);
+ }
+ }
+ }
+ });
+ }
+ else
+ {
+ Work.ScheduleWork(m_NetworkPool,
+ [this,
+ &ExistsResult,
+ SequenceIndexChunksLeftToWriteCounters,
+ &WriteCache,
+ &Work,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ TotalRequestCount,
+ &FilteredDownloadedBytesPerSecond,
+ &FilteredWrittenBytesPerSecond,
+ RemoteChunkIndex,
+ ChunkTargetPtrs = std::move(ChunkTargetPtrs)](std::atomic<bool>&) mutable {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_DownloadChunk");
+
+ FilteredDownloadedBytesPerSecond.Start();
+ DownloadBuildBlob(RemoteChunkIndex,
+ ExistsResult,
+ Work,
+ [this,
+ &ExistsResult,
+ SequenceIndexChunksLeftToWriteCounters,
+ &WriteCache,
+ &Work,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ TotalRequestCount,
+ RemoteChunkIndex,
+ &FilteredDownloadedBytesPerSecond,
+ &FilteredWrittenBytesPerSecond,
+ ChunkTargetPtrs = std::move(ChunkTargetPtrs)](IoBuffer&& Payload) mutable {
+ if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ AsyncWriteDownloadedChunk(m_Options.ZenFolderPath,
+ RemoteChunkIndex,
+ std::move(ChunkTargetPtrs),
+ WriteCache,
+ Work,
+ std::move(Payload),
+ SequenceIndexChunksLeftToWriteCounters,
+ WritePartsComplete,
+ TotalPartWriteCount,
+ FilteredWrittenBytesPerSecond);
+ });
+ }
+ });
+ }
+ }
+}
+
+void
+BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkIndex,
+ const BlobsExistsResult& ExistsResult,
+ ParallelWork& Work,
+ std::function<void(IoBuffer&& Payload)>&& OnDownloaded)
+{
+ const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+ // FilteredDownloadedBytesPerSecond.Start();
+ IoBuffer BuildBlob;
+ const bool ExistsInCache = m_Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash);
+ if (ExistsInCache)
+ {
+ BuildBlob = m_Storage.BuildCacheStorage->GetBuildBlob(m_BuildId, ChunkHash);
+ }
+ if (BuildBlob)
+ {
+ uint64_t BlobSize = BuildBlob.GetSize();
+ m_DownloadStats.DownloadedChunkCount++;
+ m_DownloadStats.DownloadedChunkByteCount += BlobSize;
+ m_DownloadStats.RequestsCompleteCount++;
+ OnDownloaded(std::move(BuildBlob));
+ }
+ else
+ {
+ if (m_RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= m_Options.LargeAttachmentSize)
+ {
+ DownloadLargeBlob(
+ *m_Storage.BuildStorage,
+ m_TempDownloadFolderPath,
+ m_BuildId,
+ ChunkHash,
+ m_Options.PreferredMultipartChunkSize,
+ Work,
+ m_NetworkPool,
+ m_DownloadStats.DownloadedChunkByteCount,
+ m_DownloadStats.MultipartAttachmentCount,
+ [this, &Work, ChunkHash, RemoteChunkIndex, OnDownloaded = std::move(OnDownloaded)](IoBuffer&& Payload) mutable {
+ m_DownloadStats.DownloadedChunkCount++;
+ m_DownloadStats.RequestsCompleteCount++;
+
+ if (Payload && m_Storage.BuildCacheStorage)
+ {
+ m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId,
+ ChunkHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(SharedBuffer(Payload)));
+ }
+
+ OnDownloaded(std::move(Payload));
+ });
+ }
+ else
+ {
+ BuildBlob = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, ChunkHash);
+ if (BuildBlob && m_Storage.BuildCacheStorage)
+ {
+ m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId,
+ ChunkHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(SharedBuffer(BuildBlob)));
+ }
+ if (!BuildBlob)
+ {
+ throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash));
+ }
+ if (!m_Options.PrimeCacheOnly)
+ {
+ if (!m_AbortFlag)
+ {
+ uint64_t BlobSize = BuildBlob.GetSize();
+ m_DownloadStats.DownloadedChunkCount++;
+ m_DownloadStats.DownloadedChunkByteCount += BlobSize;
+ m_DownloadStats.RequestsCompleteCount++;
+
+ OnDownloaded(std::move(BuildBlob));
+ }
+ }
+ }
+ }
+}
+
+BuildsOperationUpdateFolder::BlockRangeDescriptor
+BuildsOperationUpdateFolder::MergeBlockRanges(std::span<const BlockRangeDescriptor> Ranges)
+{
+ ZEN_ASSERT(Ranges.size() > 1);
+ const BlockRangeDescriptor& First = Ranges.front();
+ const BlockRangeDescriptor& Last = Ranges.back();
+
+ return BlockRangeDescriptor{.BlockIndex = First.BlockIndex,
+ .RangeStart = First.RangeStart,
+ .RangeLength = Last.RangeStart + Last.RangeLength - First.RangeStart,
+ .ChunkBlockIndexStart = First.ChunkBlockIndexStart,
+ .ChunkBlockIndexCount = Last.ChunkBlockIndexStart + Last.ChunkBlockIndexCount - First.ChunkBlockIndexStart};
+}
+
+std::optional<std::vector<BuildsOperationUpdateFolder::BlockRangeDescriptor>>
+BuildsOperationUpdateFolder::MakeOptionalBlockRangeVector(uint64_t TotalBlockSize, const BlockRangeDescriptor& Range)
+{
+ if (Range.RangeLength == TotalBlockSize)
+ {
+ return {};
+ }
+ else
+ {
+ return std::vector<BlockRangeDescriptor>{Range};
+ }
+};
+
+const BuildsOperationUpdateFolder::BlockRangeLimit*
+BuildsOperationUpdateFolder::GetBlockRangeLimitForRange(std::span<const BlockRangeLimit> Limits,
+ uint64_t TotalBlockSize,
+ std::span<const BlockRangeDescriptor> Ranges)
+{
+ if (Ranges.size() > 1)
+ {
+ const std::uint64_t WantedSize =
+ std::accumulate(Ranges.begin(), Ranges.end(), uint64_t(0), [](uint64_t Current, const BlockRangeDescriptor& Range) {
+ return Current + Range.RangeLength;
+ });
+
+ const double RangeRequestedPercent = (WantedSize * 100.0) / TotalBlockSize;
+
+ for (const BlockRangeLimit& Limit : Limits)
+ {
+ if (RangeRequestedPercent >= Limit.SizePercent && Ranges.size() > Limit.MaxRangeCount)
+ {
+ return &Limit;
+ }
+ }
+ }
+ return nullptr;
+};
+
+std::vector<BuildsOperationUpdateFolder::BlockRangeDescriptor>
+BuildsOperationUpdateFolder::CollapseBlockRanges(const uint64_t AlwaysAcceptableGap, std::span<const BlockRangeDescriptor> BlockRanges)
+{
+ ZEN_ASSERT(BlockRanges.size() > 1);
+ std::vector<BlockRangeDescriptor> CollapsedBlockRanges;
+
+ auto BlockRangesIt = BlockRanges.begin();
+ CollapsedBlockRanges.push_back(*BlockRangesIt++);
+ for (; BlockRangesIt != BlockRanges.end(); BlockRangesIt++)
+ {
+ BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back();
+
+ const uint64_t BothRangeSize = BlockRangesIt->RangeLength + LastRange.RangeLength;
+
+ const uint64_t Gap = BlockRangesIt->RangeStart - (LastRange.RangeStart + LastRange.RangeLength);
+ if (Gap <= Max(BothRangeSize / 16, AlwaysAcceptableGap))
+ {
+ LastRange.ChunkBlockIndexCount =
+ (BlockRangesIt->ChunkBlockIndexStart + BlockRangesIt->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart;
+ LastRange.RangeLength = (BlockRangesIt->RangeStart + BlockRangesIt->RangeLength) - LastRange.RangeStart;
+ }
+ else
+ {
+ CollapsedBlockRanges.push_back(*BlockRangesIt);
+ }
+ }
+
+ return CollapsedBlockRanges;
+};
+
+uint64_t
+BuildsOperationUpdateFolder::CalculateNextGap(std::span<const BlockRangeDescriptor> BlockRanges)
+{
+ ZEN_ASSERT(BlockRanges.size() > 1);
+ uint64_t AcceptableGap = (uint64_t)-1;
+ for (size_t RangeIndex = 0; RangeIndex < BlockRanges.size() - 1; RangeIndex++)
+ {
+ const BlockRangeDescriptor& Range = BlockRanges[RangeIndex];
+ const BlockRangeDescriptor& NextRange = BlockRanges[RangeIndex + 1];
+
+ const uint64_t Gap = NextRange.RangeStart - (Range.RangeStart + Range.RangeLength);
+ AcceptableGap = Min(Gap, AcceptableGap);
+ }
+ AcceptableGap = RoundUp(AcceptableGap, 16u * 1024u);
+ return AcceptableGap;
+};
+
+std::optional<std::vector<BuildsOperationUpdateFolder::BlockRangeDescriptor>>
+BuildsOperationUpdateFolder::CalculateBlockRanges(uint32_t BlockIndex,
+ const ChunkBlockDescription& BlockDescription,
+ std::span<const uint32_t> BlockChunkIndexNeeded,
+ bool LimitToSingleRange,
+ const uint64_t ChunkStartOffsetInBlock,
+ const uint64_t TotalBlockSize,
+ uint64_t& OutTotalWantedChunksSize)
+{
+ ZEN_TRACE_CPU("CalculateBlockRanges");
+
+ std::vector<BlockRangeDescriptor> BlockRanges;
+ {
+ uint64_t CurrentOffset = ChunkStartOffsetInBlock;
+ uint32_t ChunkBlockIndex = 0;
+ uint32_t NeedBlockChunkIndexOffset = 0;
+ BlockRangeDescriptor NextRange{.BlockIndex = BlockIndex};
+ while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() && ChunkBlockIndex < BlockDescription.ChunkRawHashes.size())
+ {
+ const uint32_t ChunkCompressedLength = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex];
+ if (ChunkBlockIndex < BlockChunkIndexNeeded[NeedBlockChunkIndexOffset])
+ {
+ if (NextRange.RangeLength > 0)
+ {
+ BlockRanges.push_back(NextRange);
+ NextRange = {.BlockIndex = BlockIndex};
+ }
+ ChunkBlockIndex++;
+ CurrentOffset += ChunkCompressedLength;
+ }
+ else if (ChunkBlockIndex == BlockChunkIndexNeeded[NeedBlockChunkIndexOffset])
+ {
+ if (NextRange.RangeLength == 0)
+ {
+ NextRange.RangeStart = CurrentOffset;
+ NextRange.ChunkBlockIndexStart = ChunkBlockIndex;
+ }
+ NextRange.RangeLength += ChunkCompressedLength;
+ NextRange.ChunkBlockIndexCount++;
+ ChunkBlockIndex++;
+ CurrentOffset += ChunkCompressedLength;
+ NeedBlockChunkIndexOffset++;
+ }
+ else
+ {
+ ZEN_ASSERT(false);
+ }
+ }
+ if (NextRange.RangeLength > 0)
+ {
+ BlockRanges.push_back(NextRange);
+ }
+ }
+ ZEN_ASSERT(!BlockRanges.empty());
+
+ OutTotalWantedChunksSize =
+ std::accumulate(BlockRanges.begin(), BlockRanges.end(), uint64_t(0), [](uint64_t Current, const BlockRangeDescriptor& Range) {
+ return Current + Range.RangeLength;
+ });
+
+ double RangeWantedPercent = (OutTotalWantedChunksSize * 100.0) / TotalBlockSize;
+
+ if (BlockRanges.size() == 1)
+ {
+ if (m_Options.IsVerbose)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Range request of {} ({:.2f}%) using single range from block {} ({}) as is",
+ NiceBytes(OutTotalWantedChunksSize),
+ RangeWantedPercent,
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize));
+ }
+ return BlockRanges;
+ }
+
+ if (LimitToSingleRange)
+ {
+ const BlockRangeDescriptor MergedRange = MergeBlockRanges(BlockRanges);
+ if (m_Options.IsVerbose)
+ {
+ const double RangeRequestedPercent = (MergedRange.RangeLength * 100.0) / TotalBlockSize;
+ const double WastedPercent = ((MergedRange.RangeLength - OutTotalWantedChunksSize) * 100.0) / MergedRange.RangeLength;
+
+ LOG_OUTPUT(
+ m_LogOutput,
+ "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) limited to single block range {} ({:.2f}%) wasting "
+ "{:.2f}% ({})",
+ NiceBytes(OutTotalWantedChunksSize),
+ RangeWantedPercent,
+ BlockRanges.size(),
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize),
+ NiceBytes(MergedRange.RangeLength),
+ RangeRequestedPercent,
+ WastedPercent,
+ NiceBytes(MergedRange.RangeLength - OutTotalWantedChunksSize));
+ }
+ return MakeOptionalBlockRangeVector(TotalBlockSize, MergedRange);
+ }
+
+ if (RangeWantedPercent > FullBlockRangePercentLimit)
+ {
+ const BlockRangeDescriptor MergedRange = MergeBlockRanges(BlockRanges);
+ if (m_Options.IsVerbose)
+ {
+ const double RangeRequestedPercent = (MergedRange.RangeLength * 100.0) / TotalBlockSize;
+ const double WastedPercent = ((MergedRange.RangeLength - OutTotalWantedChunksSize) * 100.0) / MergedRange.RangeLength;
+
+ LOG_OUTPUT(m_LogOutput,
+ "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) exceeds {}%. Merged to single block range {} "
+ "({:.2f}%) wasting {:.2f}% ({})",
+ NiceBytes(OutTotalWantedChunksSize),
+ RangeWantedPercent,
+ BlockRanges.size(),
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize),
+ FullBlockRangePercentLimit,
+ NiceBytes(MergedRange.RangeLength),
+ RangeRequestedPercent,
+ WastedPercent,
+ NiceBytes(MergedRange.RangeLength - OutTotalWantedChunksSize));
+ }
+ return MakeOptionalBlockRangeVector(TotalBlockSize, MergedRange);
+ }
+
+ std::vector<BlockRangeDescriptor> CollapsedBlockRanges = CollapseBlockRanges(16u * 1024u, BlockRanges);
+ while (GetBlockRangeLimitForRange(ForceMergeLimits, TotalBlockSize, CollapsedBlockRanges))
+ {
+ CollapsedBlockRanges = CollapseBlockRanges(CalculateNextGap(CollapsedBlockRanges), CollapsedBlockRanges);
+ }
+
+ const std::uint64_t WantedCollapsedSize =
+ std::accumulate(CollapsedBlockRanges.begin(),
+ CollapsedBlockRanges.end(),
+ uint64_t(0),
+ [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; });
+
+ const double CollapsedRangeRequestedPercent = (WantedCollapsedSize * 100.0) / TotalBlockSize;
+
+ if (m_Options.IsVerbose)
+ {
+ const double WastedPercent = ((WantedCollapsedSize - OutTotalWantedChunksSize) * 100.0) / WantedCollapsedSize;
+
+ LOG_OUTPUT(
+ m_LogOutput,
+ "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) collapsed to {} {:.2f}% using {} ranges wasting {:.2f}% "
+ "({})",
+ NiceBytes(OutTotalWantedChunksSize),
+ RangeWantedPercent,
+ BlockRanges.size(),
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize),
+ NiceBytes(WantedCollapsedSize),
+ CollapsedRangeRequestedPercent,
+ CollapsedBlockRanges.size(),
+ WastedPercent,
+ NiceBytes(WantedCollapsedSize - OutTotalWantedChunksSize));
+ }
+ return CollapsedBlockRanges;
+}
+
+void
+BuildsOperationUpdateFolder::DownloadPartialBlock(
+ const BlockRangeDescriptor BlockRange,
+ const BlobsExistsResult& ExistsResult,
+ std::function<void(IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath)>&& OnDownloaded)
+{
+ const uint32_t BlockIndex = BlockRange.BlockIndex;
+
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
+
+ IoBuffer BlockBuffer;
+ if (m_Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash))
+ {
+ BlockBuffer =
+ m_Storage.BuildCacheStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength);
+ }
+ if (!BlockBuffer)
+ {
+ BlockBuffer =
+ m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength);
+ }
+ if (!BlockBuffer)
+ {
+ throw std::runtime_error(fmt::format("Block {} is missing when fetching range {} -> {}",
+ BlockDescription.BlockHash,
+ BlockRange.RangeStart,
+ BlockRange.RangeStart + BlockRange.RangeLength));
+ }
+ if (!m_AbortFlag)
+ {
+ uint64_t BlockSize = BlockBuffer.GetSize();
+ m_DownloadStats.DownloadedBlockCount++;
+ m_DownloadStats.DownloadedBlockByteCount += BlockSize;
+ m_DownloadStats.RequestsCompleteCount++;
+
+ std::filesystem::path BlockChunkPath;
+
+ // Check if the downloaded block is file based and we can move it directly without rewriting it
+ {
+ IoBufferFileReference FileRef;
+ if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == BlockSize))
+ {
+ ZEN_TRACE_CPU("MoveTempPartialBlock");
+
+ std::error_code Ec;
+ std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
+ if (!Ec)
+ {
+ BlockBuffer.SetDeleteOnClose(false);
+ BlockBuffer = {};
+ BlockChunkPath = m_TempBlockFolderPath /
+ fmt::format("{}_{:x}_{:x}", BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength);
+ RenameFile(TempBlobPath, BlockChunkPath, Ec);
+ if (Ec)
+ {
+ BlockChunkPath = std::filesystem::path{};
+
+ // Re-open the temp file again
+ BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
+ BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
+ BlockBuffer.SetDeleteOnClose(true);
+ }
+ }
+ }
+ }
+
+ if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u))
+ {
+ ZEN_TRACE_CPU("WriteTempPartialBlock");
+ // Could not be moved and rather large, let's store it on disk
+ BlockChunkPath = m_TempBlockFolderPath /
+ fmt::format("{}_{:x}_{:x}", BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength);
+ TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
+ BlockBuffer = {};
+ }
+ if (!m_AbortFlag)
+ {
+ OnDownloaded(std::move(BlockBuffer), std::move(BlockChunkPath));
+ }
+ }
+}
+
std::vector<uint32_t>
BuildsOperationUpdateFolder::WriteLocalChunkToCache(CloneQueryInterface* CloneQuery,
const CopyChunkData& CopyData,
@@ -5739,7 +5729,6 @@ BuildsOperationUploadFolder::GenerateBuildBlocks(const ChunkedFolderContent&
ZEN_TRACE_CPU("GenerateBuildBlocks_Generate");
FilteredGeneratedBytesPerSecond.Start();
- // TODO: Convert ScheduleWork body to function
Stopwatch GenerateTimer;
CompressedBuffer CompressedBlock =
@@ -5816,7 +5805,6 @@ BuildsOperationUploadFolder::GenerateBuildBlocks(const ChunkedFolderContent&
ZEN_TRACE_CPU("GenerateBuildBlocks_Upload");
FilteredUploadedBytesPerSecond.Start();
- // TODO: Convert ScheduleWork body to function
const CbObject BlockMetaData =
BuildChunkBlockDescription(OutBlocks.BlockDescriptions[BlockIndex],
@@ -6890,13 +6878,15 @@ BuildsOperationValidateBuildPart::Execute()
PreferredMultipartChunkSize,
Work,
m_NetworkPool,
- m_DownloadStats,
+ m_DownloadStats.DownloadedChunkByteCount,
+ m_DownloadStats.MultipartAttachmentCount,
[this,
&Work,
AttachmentsToVerifyCount,
&FilteredDownloadedBytesPerSecond,
&FilteredVerifiedBytesPerSecond,
ChunkHash = ChunkAttachment](IoBuffer&& Payload) {
+ m_DownloadStats.DownloadedChunkCount++;
Payload.SetContentType(ZenContentType::kCompressedBinary);
if (!m_AbortFlag)
{
diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
index be6720d5d..fcf880e48 100644
--- a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
+++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h
@@ -12,6 +12,10 @@
#include <atomic>
#include <memory>
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <tsl/robin_set.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
namespace zen {
class CloneQueryInterface;
@@ -240,6 +244,61 @@ private:
std::filesystem::path Path;
};
+ struct ScavengedSequenceCopyOperation
+ {
+ uint32_t ScavengedContentIndex = (uint32_t)-1;
+ uint32_t ScavengedPathIndex = (uint32_t)-1;
+ uint32_t RemoteSequenceIndex = (uint32_t)-1;
+ uint64_t RawSize = (uint32_t)-1;
+ };
+
+ struct CopyChunkData
+ {
+ uint32_t ScavengeSourceIndex = (uint32_t)-1;
+ uint32_t SourceSequenceIndex = (uint32_t)-1;
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> TargetChunkLocationPtrs;
+ struct ChunkTarget
+ {
+ uint32_t TargetChunkLocationCount = (uint32_t)-1;
+ uint32_t RemoteChunkIndex = (uint32_t)-1;
+ uint64_t CacheFileOffset = (uint64_t)-1;
+ };
+ std::vector<ChunkTarget> ChunkTargets;
+ };
+
+ struct BlobsExistsResult
+ {
+ tsl::robin_set<IoHash> ExistingBlobs;
+ uint64_t ElapsedTimeMs = 0;
+ };
+
+ struct BlockRangeDescriptor
+ {
+ uint32_t BlockIndex = (uint32_t)-1;
+ uint64_t RangeStart = 0;
+ uint64_t RangeLength = 0;
+ uint32_t ChunkBlockIndexStart = 0;
+ uint32_t ChunkBlockIndexCount = 0;
+ };
+
+ struct BlockRangeLimit
+ {
+ uint16_t SizePercent;
+ uint16_t MaxRangeCount;
+ };
+
+ static constexpr uint16_t FullBlockRangePercentLimit = 95;
+
+ static constexpr BuildsOperationUpdateFolder::BlockRangeLimit ForceMergeLimits[] = {
+ {.SizePercent = FullBlockRangePercentLimit, .MaxRangeCount = 1},
+ {.SizePercent = 90, .MaxRangeCount = 2},
+ {.SizePercent = 85, .MaxRangeCount = 8},
+ {.SizePercent = 80, .MaxRangeCount = 16},
+ {.SizePercent = 70, .MaxRangeCount = 32},
+ {.SizePercent = 60, .MaxRangeCount = 48},
+ {.SizePercent = 2, .MaxRangeCount = 56},
+ {.SizePercent = 0, .MaxRangeCount = 64}};
+
void ScanCacheFolder(tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& OutCachedChunkHashesFound,
tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& OutCachedSequenceHashesFound);
void ScanTempBlocksFolder(tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& OutCachedBlocksFound);
@@ -265,31 +324,46 @@ private:
uint64_t GetChunkWriteCount(std::span<const std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, uint32_t ChunkIndex);
- struct ScavengedSequenceCopyOperation
- {
- uint32_t ScavengedContentIndex = (uint32_t)-1;
- uint32_t ScavengedPathIndex = (uint32_t)-1;
- uint32_t RemoteSequenceIndex = (uint32_t)-1;
- uint64_t RawSize = (uint32_t)-1;
- };
-
void WriteScavengedSequenceToCache(const std::filesystem::path& ScavengeRootPath,
const ChunkedFolderContent& ScavengedContent,
const ScavengedSequenceCopyOperation& ScavengeOp);
- struct CopyChunkData
- {
- uint32_t ScavengeSourceIndex = (uint32_t)-1;
- uint32_t SourceSequenceIndex = (uint32_t)-1;
- std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> TargetChunkLocationPtrs;
- struct ChunkTarget
- {
- uint32_t TargetChunkLocationCount = (uint32_t)-1;
- uint32_t RemoteChunkIndex = (uint32_t)-1;
- uint64_t CacheFileOffset = (uint64_t)-1;
- };
- std::vector<ChunkTarget> ChunkTargets;
- };
+ void WriteLooseChunk(const uint32_t RemoteChunkIndex,
+ const BlobsExistsResult& ExistsResult,
+ std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ std::atomic<uint64_t>& WritePartsComplete,
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>&& ChunkTargetPtrs,
+ BufferedWriteFileCache& WriteCache,
+ ParallelWork& Work,
+ uint64_t TotalRequestCount,
+ uint64_t TotalPartWriteCount,
+ FilteredRate& FilteredDownloadedBytesPerSecond,
+ FilteredRate& FilteredWrittenBytesPerSecond);
+
+ void DownloadBuildBlob(uint32_t RemoteChunkIndex,
+ const BlobsExistsResult& ExistsResult,
+ ParallelWork& Work,
+ std::function<void(IoBuffer&& Payload)>&& OnDownloaded);
+
+ BlockRangeDescriptor MergeBlockRanges(std::span<const BlockRangeDescriptor> Ranges);
+ std::optional<std::vector<BlockRangeDescriptor>> MakeOptionalBlockRangeVector(uint64_t TotalBlockSize,
+ const BlockRangeDescriptor& Range);
+ const BlockRangeLimit* GetBlockRangeLimitForRange(std::span<const BlockRangeLimit> Limits,
+ uint64_t TotalBlockSize,
+ std::span<const BlockRangeDescriptor> Ranges);
+ std::vector<BlockRangeDescriptor> CollapseBlockRanges(const uint64_t AlwaysAcceptableGap,
+ std::span<const BlockRangeDescriptor> BlockRanges);
+ uint64_t CalculateNextGap(std::span<const BlockRangeDescriptor> BlockRanges);
+ std::optional<std::vector<BlockRangeDescriptor>> CalculateBlockRanges(uint32_t BlockIndex,
+ const ChunkBlockDescription& BlockDescription,
+ std::span<const uint32_t> BlockChunkIndexNeeded,
+ bool LimitToSingleRange,
+ const uint64_t ChunkStartOffsetInBlock,
+ const uint64_t TotalBlockSize,
+ uint64_t& OutTotalWantedChunksSize);
+ void DownloadPartialBlock(const BlockRangeDescriptor BlockRange,
+ const BlobsExistsResult& ExistsResult,
+ std::function<void(IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath)>&& OnDownloaded);
std::vector<uint32_t> WriteLocalChunkToCache(CloneQueryInterface* CloneQuery,
const CopyChunkData& CopyData,