aboutsummaryrefslogtreecommitdiff
path: root/src/zenremotestore/builds/buildstorageoperations.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenremotestore/builds/buildstorageoperations.cpp')
-rw-r--r--src/zenremotestore/builds/buildstorageoperations.cpp1636
1 files changed, 830 insertions, 806 deletions
diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp
index 2319ad66d..f4b167b73 100644
--- a/src/zenremotestore/builds/buildstorageoperations.cpp
+++ b/src/zenremotestore/builds/buildstorageoperations.cpp
@@ -38,6 +38,7 @@ ZEN_THIRD_PARTY_INCLUDES_END
#if ZEN_WITH_TESTS
# include <zencore/testing.h>
# include <zencore/testutils.h>
+# include <zenhttp/httpclientauth.h>
# include <zenremotestore/builds/filebuildstorage.h>
#endif // ZEN_WITH_TESTS
@@ -484,24 +485,6 @@ private:
uint64_t FilteredPerSecond = 0;
};
-EPartialBlockRequestMode
-PartialBlockRequestModeFromString(const std::string_view ModeString)
-{
- switch (HashStringAsLowerDjb2(ModeString))
- {
- case HashStringDjb2("false"):
- return EPartialBlockRequestMode::Off;
- case HashStringDjb2("zencacheonly"):
- return EPartialBlockRequestMode::ZenCacheOnly;
- case HashStringDjb2("mixed"):
- return EPartialBlockRequestMode::Mixed;
- case HashStringDjb2("true"):
- return EPartialBlockRequestMode::All;
- default:
- return EPartialBlockRequestMode::Invalid;
- }
-}
-
std::filesystem::path
ZenStateFilePath(const std::filesystem::path& ZenFolderPath)
{
@@ -579,13 +562,6 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
CreateDirectories(m_TempDownloadFolderPath);
CreateDirectories(m_TempBlockFolderPath);
- Stopwatch IndexTimer;
-
- if (!m_Options.IsQuiet)
- {
- ZEN_OPERATION_LOG_INFO(m_LogOutput, "Indexed local and remote content in {}", NiceTimeSpanMs(IndexTimer.GetElapsedTimeMs()));
- }
-
Stopwatch CacheMappingTimer;
std::vector<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters(m_RemoteContent.ChunkedContent.SequenceRawHashes.size());
@@ -906,343 +882,240 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
CheckRequiredDiskSpace(RemotePathToRemoteIndex);
+ BlobsExistsResult ExistsResult;
{
- ZEN_TRACE_CPU("WriteChunks");
-
- m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::WriteChunks, (uint32_t)TaskSteps::StepCount);
-
- Stopwatch WriteTimer;
-
- FilteredRate FilteredDownloadedBytesPerSecond;
- FilteredRate FilteredWrittenBytesPerSecond;
-
- std::unique_ptr<OperationLogOutput::ProgressBar> WriteProgressBarPtr(
- m_LogOutput.CreateProgressBar(m_Options.PrimeCacheOnly ? "Downloading" : "Writing"));
- OperationLogOutput::ProgressBar& WriteProgressBar(*WriteProgressBarPtr);
- ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ ChunkBlockAnalyser BlockAnalyser(
+ m_LogOutput,
+ m_BlockDescriptions,
+ ChunkBlockAnalyser::Options{.IsQuiet = m_Options.IsQuiet,
+ .IsVerbose = m_Options.IsVerbose,
+ .HostLatencySec = m_Storage.BuildStorageHost.LatencySec,
+ .HostHighSpeedLatencySec = m_Storage.CacheHost.LatencySec,
+ .HostMaxRangeCountPerRequest = m_Storage.BuildStorageHost.Caps.MaxRangeCountPerRequest,
+ .HostHighSpeedMaxRangeCountPerRequest = m_Storage.CacheHost.Caps.MaxRangeCountPerRequest});
- struct LooseChunkHashWorkData
- {
- std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs;
- uint32_t RemoteChunkIndex = (uint32_t)-1;
- };
+ std::vector<ChunkBlockAnalyser::NeededBlock> NeededBlocks = BlockAnalyser.GetNeeded(
+ m_RemoteLookup.ChunkHashToChunkIndex,
+ [&](uint32_t RemoteChunkIndex) -> bool { return RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex]; });
- std::vector<LooseChunkHashWorkData> LooseChunkHashWorks;
- TotalPartWriteCount += CopyChunkDatas.size();
- TotalPartWriteCount += ScavengedSequenceCopyOperations.size();
+ std::vector<uint32_t> FetchBlockIndexes;
+ std::vector<uint32_t> CachedChunkBlockIndexes;
- for (const IoHash ChunkHash : m_LooseChunkHashes)
{
- auto RemoteChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash);
- ZEN_ASSERT(RemoteChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end());
- const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second;
- if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex])
+ ZEN_TRACE_CPU("BlockCacheFileExists");
+ for (const ChunkBlockAnalyser::NeededBlock& NeededBlock : NeededBlocks)
{
- if (m_Options.IsVerbose)
+ if (m_Options.PrimeCacheOnly)
{
- ZEN_OPERATION_LOG_INFO(m_LogOutput, "Skipping chunk {} due to cache reuse", ChunkHash);
- }
- continue;
- }
- bool NeedsCopy = true;
- if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex].compare_exchange_strong(NeedsCopy, false))
- {
- std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
- GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex);
-
- if (ChunkTargetPtrs.empty())
- {
- if (m_Options.IsVerbose)
- {
- ZEN_OPERATION_LOG_INFO(m_LogOutput, "Skipping chunk {} due to cache reuse", ChunkHash);
- }
+ FetchBlockIndexes.push_back(NeededBlock.BlockIndex);
}
else
{
- TotalRequestCount++;
- TotalPartWriteCount++;
- LooseChunkHashWorks.push_back(
- LooseChunkHashWorkData{.ChunkTargetPtrs = ChunkTargetPtrs, .RemoteChunkIndex = RemoteChunkIndex});
- }
- }
- }
-
- uint32_t BlockCount = gsl::narrow<uint32_t>(m_BlockDescriptions.size());
-
- std::vector<bool> ChunkIsPickedUpByBlock(m_RemoteContent.ChunkedContent.ChunkHashes.size(), false);
- auto GetNeededChunkBlockIndexes = [this, &RemoteChunkIndexNeedsCopyFromSourceFlags, &ChunkIsPickedUpByBlock](
- const ChunkBlockDescription& BlockDescription) {
- ZEN_TRACE_CPU("GetNeededChunkBlockIndexes");
- std::vector<uint32_t> NeededBlockChunkIndexes;
- for (uint32_t ChunkBlockIndex = 0; ChunkBlockIndex < BlockDescription.ChunkRawHashes.size(); ChunkBlockIndex++)
- {
- const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex];
- if (auto It = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); It != m_RemoteLookup.ChunkHashToChunkIndex.end())
- {
- const uint32_t RemoteChunkIndex = It->second;
- if (!ChunkIsPickedUpByBlock[RemoteChunkIndex])
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[NeededBlock.BlockIndex];
+ bool UsingCachedBlock = false;
+ if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end())
{
- if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex])
+ TotalPartWriteCount++;
+
+ std::filesystem::path BlockPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString();
+ if (IsFile(BlockPath))
{
- ChunkIsPickedUpByBlock[RemoteChunkIndex] = true;
- NeededBlockChunkIndexes.push_back(ChunkBlockIndex);
+ CachedChunkBlockIndexes.push_back(NeededBlock.BlockIndex);
+ UsingCachedBlock = true;
}
}
- }
- else
- {
- ZEN_DEBUG("Chunk {} not found in block {}", ChunkHash, BlockDescription.BlockHash);
+ if (!UsingCachedBlock)
+ {
+ FetchBlockIndexes.push_back(NeededBlock.BlockIndex);
+ }
}
}
- return NeededBlockChunkIndexes;
- };
+ }
- std::vector<uint32_t> CachedChunkBlockIndexes;
- std::vector<uint32_t> FetchBlockIndexes;
- std::vector<std::vector<uint32_t>> AllBlockChunkIndexNeeded;
+ std::vector<uint32_t> NeededLooseChunkIndexes;
- for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; BlockIndex++)
{
- const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
-
- std::vector<uint32_t> BlockChunkIndexNeeded = GetNeededChunkBlockIndexes(BlockDescription);
- if (!BlockChunkIndexNeeded.empty())
+ NeededLooseChunkIndexes.reserve(m_LooseChunkHashes.size());
+ for (uint32_t LooseChunkIndex = 0; LooseChunkIndex < m_LooseChunkHashes.size(); LooseChunkIndex++)
{
- if (m_Options.PrimeCacheOnly)
+ const IoHash& ChunkHash = m_LooseChunkHashes[LooseChunkIndex];
+ auto RemoteChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash);
+ ZEN_ASSERT(RemoteChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end());
+ const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second;
+
+ if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex])
{
- FetchBlockIndexes.push_back(BlockIndex);
+ if (m_Options.IsVerbose)
+ {
+ ZEN_OPERATION_LOG_INFO(m_LogOutput,
+ "Skipping chunk {} due to cache reuse",
+ m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]);
+ }
+ continue;
}
- else
+
+ bool NeedsCopy = true;
+ if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex].compare_exchange_strong(NeedsCopy, false))
{
- bool UsingCachedBlock = false;
- if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end())
+ uint64_t WriteCount = GetChunkWriteCount(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex);
+ if (WriteCount == 0)
{
- TotalPartWriteCount++;
-
- std::filesystem::path BlockPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString();
- if (IsFile(BlockPath))
+ if (m_Options.IsVerbose)
{
- CachedChunkBlockIndexes.push_back(BlockIndex);
- UsingCachedBlock = true;
+ ZEN_OPERATION_LOG_INFO(m_LogOutput,
+ "Skipping chunk {} due to cache reuse",
+ m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]);
}
}
- if (!UsingCachedBlock)
+ else
{
- FetchBlockIndexes.push_back(BlockIndex);
+ NeededLooseChunkIndexes.push_back(LooseChunkIndex);
}
}
}
- AllBlockChunkIndexNeeded.emplace_back(std::move(BlockChunkIndexNeeded));
}
- BlobsExistsResult ExistsResult;
-
- if (m_Storage.BuildCacheStorage)
+ if (m_Storage.CacheStorage)
{
ZEN_TRACE_CPU("BlobCacheExistCheck");
Stopwatch Timer;
- tsl::robin_set<IoHash> BlobHashesSet;
+ std::vector<IoHash> BlobHashes;
+ BlobHashes.reserve(NeededLooseChunkIndexes.size() + FetchBlockIndexes.size());
- BlobHashesSet.reserve(LooseChunkHashWorks.size() + FetchBlockIndexes.size());
- for (LooseChunkHashWorkData& LooseChunkHashWork : LooseChunkHashWorks)
+ for (const uint32_t LooseChunkIndex : NeededLooseChunkIndexes)
{
- BlobHashesSet.insert(m_RemoteContent.ChunkedContent.ChunkHashes[LooseChunkHashWork.RemoteChunkIndex]);
+ BlobHashes.push_back(m_LooseChunkHashes[LooseChunkIndex]);
}
+
for (uint32_t BlockIndex : FetchBlockIndexes)
{
- const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
- BlobHashesSet.insert(BlockDescription.BlockHash);
+ BlobHashes.push_back(m_BlockDescriptions[BlockIndex].BlockHash);
}
- if (!BlobHashesSet.empty())
- {
- const std::vector<IoHash> BlobHashes(BlobHashesSet.begin(), BlobHashesSet.end());
- const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult =
- m_Storage.BuildCacheStorage->BlobsExists(m_BuildId, BlobHashes);
+ const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult =
+ m_Storage.CacheStorage->BlobsExists(m_BuildId, BlobHashes);
- if (CacheExistsResult.size() == BlobHashes.size())
+ if (CacheExistsResult.size() == BlobHashes.size())
+ {
+ ExistsResult.ExistingBlobs.reserve(CacheExistsResult.size());
+ for (size_t BlobIndex = 0; BlobIndex < BlobHashes.size(); BlobIndex++)
{
- ExistsResult.ExistingBlobs.reserve(CacheExistsResult.size());
- for (size_t BlobIndex = 0; BlobIndex < BlobHashes.size(); BlobIndex++)
+ if (CacheExistsResult[BlobIndex].HasBody)
{
- if (CacheExistsResult[BlobIndex].HasBody)
- {
- ExistsResult.ExistingBlobs.insert(BlobHashes[BlobIndex]);
- }
+ ExistsResult.ExistingBlobs.insert(BlobHashes[BlobIndex]);
}
}
- ExistsResult.ElapsedTimeMs = Timer.GetElapsedTimeMs();
- if (!ExistsResult.ExistingBlobs.empty() && !m_Options.IsQuiet)
- {
- ZEN_OPERATION_LOG_INFO(m_LogOutput,
- "Remote cache : Found {} out of {} needed blobs in {}",
- ExistsResult.ExistingBlobs.size(),
- BlobHashes.size(),
- NiceTimeSpanMs(ExistsResult.ElapsedTimeMs));
- }
+ }
+ ExistsResult.ElapsedTimeMs = Timer.GetElapsedTimeMs();
+ if (!ExistsResult.ExistingBlobs.empty() && !m_Options.IsQuiet)
+ {
+ ZEN_OPERATION_LOG_INFO(m_LogOutput,
+ "Remote cache : Found {} out of {} needed blobs in {}",
+ ExistsResult.ExistingBlobs.size(),
+ BlobHashes.size(),
+ NiceTimeSpanMs(ExistsResult.ElapsedTimeMs));
}
}
- std::vector<BlockRangeDescriptor> BlockRangeWorks;
- std::vector<uint32_t> FullBlockWorks;
+ std::vector<ChunkBlockAnalyser::EPartialBlockDownloadMode> BlockPartialDownloadModes;
+
+ if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Off)
{
- Stopwatch Timer;
+ BlockPartialDownloadModes.resize(m_BlockDescriptions.size(), ChunkBlockAnalyser::EPartialBlockDownloadMode::Off);
+ }
+ else
+ {
+ ChunkBlockAnalyser::EPartialBlockDownloadMode CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::Off;
+ ChunkBlockAnalyser::EPartialBlockDownloadMode CachePartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::Off;
- std::vector<uint32_t> PartialBlockIndexes;
+ switch (m_Options.PartialBlockRequestMode)
+ {
+ case EPartialBlockRequestMode::Off:
+ break;
+ case EPartialBlockRequestMode::ZenCacheOnly:
+ CachePartialDownloadMode = m_Storage.CacheHost.Caps.MaxRangeCountPerRequest > 1
+ ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed
+ : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange;
+ CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::Off;
+ break;
+ case EPartialBlockRequestMode::Mixed:
+ CachePartialDownloadMode = m_Storage.CacheHost.Caps.MaxRangeCountPerRequest > 1
+ ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed
+ : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange;
+ CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange;
+ break;
+ case EPartialBlockRequestMode::All:
+ CachePartialDownloadMode = m_Storage.CacheHost.Caps.MaxRangeCountPerRequest > 1
+ ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed
+ : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange;
+ CloudPartialDownloadMode = m_Storage.BuildStorageHost.Caps.MaxRangeCountPerRequest > 1
+ ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange
+ : ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange;
+ break;
+ default:
+ ZEN_ASSERT(false);
+ break;
+ }
- for (uint32_t BlockIndex : FetchBlockIndexes)
+ BlockPartialDownloadModes.reserve(m_BlockDescriptions.size());
+ for (uint32_t BlockIndex = 0; BlockIndex < m_BlockDescriptions.size(); BlockIndex++)
{
- const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
+ const bool BlockExistInCache = ExistsResult.ExistingBlobs.contains(m_BlockDescriptions[BlockIndex].BlockHash);
+ BlockPartialDownloadModes.push_back(BlockExistInCache ? CachePartialDownloadMode : CloudPartialDownloadMode);
+ }
+ }
- const std::vector<uint32_t> BlockChunkIndexNeeded = std::move(AllBlockChunkIndexNeeded[BlockIndex]);
- if (!BlockChunkIndexNeeded.empty())
- {
- bool WantsToDoPartialBlockDownload = BlockChunkIndexNeeded.size() < BlockDescription.ChunkRawHashes.size();
- bool CanDoPartialBlockDownload =
- (BlockDescription.HeaderSize > 0) &&
- (BlockDescription.ChunkCompressedLengths.size() == BlockDescription.ChunkRawHashes.size());
-
- bool AllowedToDoPartialRequest = false;
- bool BlockExistInCache = ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash);
- switch (m_Options.PartialBlockRequestMode)
- {
- case EPartialBlockRequestMode::Off:
- break;
- case EPartialBlockRequestMode::ZenCacheOnly:
- AllowedToDoPartialRequest = BlockExistInCache;
- break;
- case EPartialBlockRequestMode::Mixed:
- case EPartialBlockRequestMode::All:
- AllowedToDoPartialRequest = true;
- break;
- default:
- ZEN_ASSERT(false);
- break;
- }
+ ZEN_ASSERT(BlockPartialDownloadModes.size() == m_BlockDescriptions.size());
- const uint32_t ChunkStartOffsetInBlock =
- gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize);
+ ChunkBlockAnalyser::BlockResult PartialBlocks =
+ BlockAnalyser.CalculatePartialBlockDownloads(NeededBlocks, BlockPartialDownloadModes);
- const uint64_t TotalBlockSize = std::accumulate(BlockDescription.ChunkCompressedLengths.begin(),
- BlockDescription.ChunkCompressedLengths.end(),
- std::uint64_t(ChunkStartOffsetInBlock));
+ struct LooseChunkHashWorkData
+ {
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs;
+ uint32_t RemoteChunkIndex = (uint32_t)-1;
+ };
- if (AllowedToDoPartialRequest && WantsToDoPartialBlockDownload && CanDoPartialBlockDownload)
- {
- ZEN_TRACE_CPU("PartialBlockAnalysis");
-
- bool LimitToSingleRange =
- BlockExistInCache ? false : m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Mixed;
- uint64_t TotalWantedChunksSize = 0;
- std::optional<std::vector<BlockRangeDescriptor>> MaybeBlockRanges =
- CalculateBlockRanges(BlockIndex,
- BlockDescription,
- BlockChunkIndexNeeded,
- LimitToSingleRange,
- ChunkStartOffsetInBlock,
- TotalBlockSize,
- TotalWantedChunksSize);
- ZEN_ASSERT(TotalWantedChunksSize <= TotalBlockSize);
-
- if (MaybeBlockRanges.has_value())
- {
- const std::vector<BlockRangeDescriptor>& BlockRanges = MaybeBlockRanges.value();
- ZEN_ASSERT(!BlockRanges.empty());
- BlockRangeWorks.insert(BlockRangeWorks.end(), BlockRanges.begin(), BlockRanges.end());
- TotalRequestCount += BlockRanges.size();
- TotalPartWriteCount += BlockRanges.size();
-
- uint64_t RequestedSize = std::accumulate(
- BlockRanges.begin(),
- BlockRanges.end(),
- uint64_t(0),
- [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; });
- PartialBlockIndexes.push_back(BlockIndex);
-
- if (RequestedSize > TotalWantedChunksSize)
- {
- if (m_Options.IsVerbose)
- {
- ZEN_OPERATION_LOG_INFO(
- m_LogOutput,
- "Requesting {} chunks ({}) from block {} ({}) using {} requests (extra bytes {})",
- BlockChunkIndexNeeded.size(),
- NiceBytes(RequestedSize),
- BlockDescription.BlockHash,
- NiceBytes(TotalBlockSize),
- BlockRanges.size(),
- NiceBytes(RequestedSize - TotalWantedChunksSize));
- }
- }
- }
- else
- {
- FullBlockWorks.push_back(BlockIndex);
- TotalRequestCount++;
- TotalPartWriteCount++;
- }
- }
- else
- {
- FullBlockWorks.push_back(BlockIndex);
- TotalRequestCount++;
- TotalPartWriteCount++;
- }
- }
- }
+ TotalRequestCount += NeededLooseChunkIndexes.size();
+ TotalPartWriteCount += NeededLooseChunkIndexes.size();
+ TotalRequestCount += PartialBlocks.BlockRanges.size();
+ TotalPartWriteCount += PartialBlocks.BlockRanges.size();
+ TotalRequestCount += PartialBlocks.FullBlockIndexes.size();
+ TotalPartWriteCount += PartialBlocks.FullBlockIndexes.size();
- if (!PartialBlockIndexes.empty())
- {
- uint64_t TotalFullBlockRequestBytes = 0;
- for (uint32_t BlockIndex : FullBlockWorks)
- {
- const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
- uint32_t CurrentOffset =
- gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize);
+ std::vector<LooseChunkHashWorkData> LooseChunkHashWorks;
+ for (uint32_t LooseChunkIndex : NeededLooseChunkIndexes)
+ {
+ const IoHash& ChunkHash = m_LooseChunkHashes[LooseChunkIndex];
+ auto RemoteChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash);
+ ZEN_ASSERT(RemoteChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end());
+ const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second;
- TotalFullBlockRequestBytes += std::accumulate(BlockDescription.ChunkCompressedLengths.begin(),
- BlockDescription.ChunkCompressedLengths.end(),
- std::uint64_t(CurrentOffset));
- }
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
+ GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex);
- uint64_t TotalPartialBlockBytes = 0;
- for (uint32_t BlockIndex : PartialBlockIndexes)
- {
- const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
- uint32_t CurrentOffset =
- gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize);
+ ZEN_ASSERT(!ChunkTargetPtrs.empty());
+ LooseChunkHashWorks.push_back(
+ LooseChunkHashWorkData{.ChunkTargetPtrs = ChunkTargetPtrs, .RemoteChunkIndex = RemoteChunkIndex});
+ }
- TotalPartialBlockBytes += std::accumulate(BlockDescription.ChunkCompressedLengths.begin(),
- BlockDescription.ChunkCompressedLengths.end(),
- std::uint64_t(CurrentOffset));
- }
+ ZEN_TRACE_CPU("WriteChunks");
- uint64_t NonPartialTotalBlockBytes = TotalFullBlockRequestBytes + TotalPartialBlockBytes;
+ m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::WriteChunks, (uint32_t)TaskSteps::StepCount);
- const uint64_t TotalPartialBlockRequestBytes =
- std::accumulate(BlockRangeWorks.begin(),
- BlockRangeWorks.end(),
- uint64_t(0),
- [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; });
- uint64_t TotalExtraPartialBlocksRequests = BlockRangeWorks.size() - PartialBlockIndexes.size();
+ Stopwatch WriteTimer;
- uint64_t TotalSavedBlocksSize = TotalPartialBlockBytes - TotalPartialBlockRequestBytes;
- double SavedSizePercent = (TotalSavedBlocksSize * 100.0) / NonPartialTotalBlockBytes;
+ FilteredRate FilteredDownloadedBytesPerSecond;
+ FilteredRate FilteredWrittenBytesPerSecond;
- if (!m_Options.IsQuiet)
- {
- ZEN_OPERATION_LOG_INFO(m_LogOutput,
- "Analysis of partial block requests saves download of {} out of {} ({:.1f}%) using {} extra "
- "requests. Completed in {}",
- NiceBytes(TotalSavedBlocksSize),
- NiceBytes(NonPartialTotalBlockBytes),
- SavedSizePercent,
- TotalExtraPartialBlocksRequests,
- NiceTimeSpanMs(ExistsResult.ElapsedTimeMs));
- }
- }
- }
+ std::unique_ptr<OperationLogOutput::ProgressBar> WriteProgressBarPtr(
+ m_LogOutput.CreateProgressBar(m_Options.PrimeCacheOnly ? "Downloading" : "Writing"));
+ OperationLogOutput::ProgressBar& WriteProgressBar(*WriteProgressBarPtr);
+ ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ TotalPartWriteCount += CopyChunkDatas.size();
+ TotalPartWriteCount += ScavengedSequenceCopyOperations.size();
BufferedWriteFileCache WriteCache;
@@ -1472,13 +1345,23 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
});
}
- for (size_t BlockRangeIndex = 0; BlockRangeIndex < BlockRangeWorks.size(); BlockRangeIndex++)
+ for (size_t BlockRangeIndex = 0; BlockRangeIndex < PartialBlocks.BlockRanges.size();)
{
ZEN_ASSERT(!m_Options.PrimeCacheOnly);
if (m_AbortFlag)
{
break;
}
+
+ size_t RangeCount = 1;
+ size_t RangesLeft = PartialBlocks.BlockRanges.size() - BlockRangeIndex;
+ const ChunkBlockAnalyser::BlockRangeDescriptor& CurrentBlockRange = PartialBlocks.BlockRanges[BlockRangeIndex];
+ while (RangeCount < RangesLeft &&
+ CurrentBlockRange.BlockIndex == PartialBlocks.BlockRanges[BlockRangeIndex + RangeCount].BlockIndex)
+ {
+ RangeCount++;
+ }
+
Work.ScheduleWork(
m_NetworkPool,
[this,
@@ -1492,18 +1375,19 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
TotalPartWriteCount,
&FilteredWrittenBytesPerSecond,
&Work,
- &BlockRangeWorks,
- BlockRangeIndex](std::atomic<bool>&) {
+ &PartialBlocks,
+ BlockRangeStartIndex = BlockRangeIndex,
+ RangeCount = RangeCount](std::atomic<bool>&) {
if (!m_AbortFlag)
{
- ZEN_TRACE_CPU("Async_GetPartialBlock");
-
- const BlockRangeDescriptor& BlockRange = BlockRangeWorks[BlockRangeIndex];
+ ZEN_TRACE_CPU("Async_GetPartialBlockRanges");
FilteredDownloadedBytesPerSecond.Start();
DownloadPartialBlock(
- BlockRange,
+ PartialBlocks.BlockRanges,
+ BlockRangeStartIndex,
+ RangeCount,
ExistsResult,
[this,
&RemoteChunkIndexNeedsCopyFromSourceFlags,
@@ -1515,7 +1399,10 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
TotalPartWriteCount,
&FilteredDownloadedBytesPerSecond,
&FilteredWrittenBytesPerSecond,
- &BlockRange](IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath) {
+ &PartialBlocks](IoBuffer&& InMemoryBuffer,
+ const std::filesystem::path& OnDiskPath,
+ size_t BlockRangeStartIndex,
+ std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengths) {
if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
{
FilteredDownloadedBytesPerSecond.Stop();
@@ -1533,14 +1420,18 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
&Work,
TotalPartWriteCount,
&FilteredWrittenBytesPerSecond,
- &BlockRange,
+ &PartialBlocks,
+ BlockRangeStartIndex,
BlockChunkPath = std::filesystem::path(OnDiskPath),
- BlockPartialBuffer = std::move(InMemoryBuffer)](std::atomic<bool>&) mutable {
+ BlockPartialBuffer = std::move(InMemoryBuffer),
+ OffsetAndLengths = std::vector<std::pair<uint64_t, uint64_t>>(OffsetAndLengths.begin(),
+ OffsetAndLengths.end())](
+ std::atomic<bool>&) mutable {
if (!m_AbortFlag)
{
ZEN_TRACE_CPU("Async_WritePartialBlock");
- const uint32_t BlockIndex = BlockRange.BlockIndex;
+ const uint32_t BlockIndex = PartialBlocks.BlockRanges[BlockRangeStartIndex].BlockIndex;
const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
@@ -1563,22 +1454,41 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
FilteredWrittenBytesPerSecond.Start();
- if (!WritePartialBlockChunksToCache(
- BlockDescription,
- SequenceIndexChunksLeftToWriteCounters,
- Work,
- CompositeBuffer(std::move(BlockPartialBuffer)),
- BlockRange.ChunkBlockIndexStart,
- BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1,
- RemoteChunkIndexNeedsCopyFromSourceFlags,
- WriteCache))
+ size_t RangeCount = OffsetAndLengths.size();
+
+ for (size_t PartialRangeIndex = 0; PartialRangeIndex < RangeCount; PartialRangeIndex++)
{
- std::error_code DummyEc;
- RemoveFile(BlockChunkPath, DummyEc);
- throw std::runtime_error(
- fmt::format("Partial block {} is malformed", BlockDescription.BlockHash));
- }
+ const std::pair<uint64_t, uint64_t>& OffsetAndLength =
+ OffsetAndLengths[PartialRangeIndex];
+ IoBuffer BlockRangeBuffer(BlockPartialBuffer,
+ OffsetAndLength.first,
+ OffsetAndLength.second);
+
+ const ChunkBlockAnalyser::BlockRangeDescriptor& RangeDescriptor =
+ PartialBlocks.BlockRanges[BlockRangeStartIndex + PartialRangeIndex];
+
+ if (!WritePartialBlockChunksToCache(BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ CompositeBuffer(std::move(BlockRangeBuffer)),
+ RangeDescriptor.ChunkBlockIndexStart,
+ RangeDescriptor.ChunkBlockIndexStart +
+ RangeDescriptor.ChunkBlockIndexCount - 1,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ WriteCache))
+ {
+ std::error_code DummyEc;
+ RemoveFile(BlockChunkPath, DummyEc);
+ throw std::runtime_error(
+ fmt::format("Partial block {} is malformed", BlockDescription.BlockHash));
+ }
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
+ }
std::error_code Ec = TryRemoveFile(BlockChunkPath);
if (Ec)
{
@@ -1588,12 +1498,6 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
Ec.value(),
Ec.message());
}
-
- WritePartsComplete++;
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
- }
}
},
OnDiskPath.empty() ? WorkerThreadPool::EMode::DisableBacklog
@@ -1602,9 +1506,10 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
});
}
});
+ BlockRangeIndex += RangeCount;
}
- for (uint32_t BlockIndex : FullBlockWorks)
+ for (uint32_t BlockIndex : PartialBlocks.FullBlockIndexes)
{
if (m_AbortFlag)
{
@@ -1641,20 +1546,20 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
IoBuffer BlockBuffer;
const bool ExistsInCache =
- m_Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash);
+ m_Storage.CacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash);
if (ExistsInCache)
{
- BlockBuffer = m_Storage.BuildCacheStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash);
+ BlockBuffer = m_Storage.CacheStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash);
}
if (!BlockBuffer)
{
BlockBuffer = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash);
- if (BlockBuffer && m_Storage.BuildCacheStorage && m_Options.PopulateCache)
+ if (BlockBuffer && m_Storage.CacheStorage && m_Options.PopulateCache)
{
- m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId,
- BlockDescription.BlockHash,
- ZenContentType::kCompressedBinary,
- CompositeBuffer(SharedBuffer(BlockBuffer)));
+ m_Storage.CacheStorage->PutBuildBlob(m_BuildId,
+ BlockDescription.BlockHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(SharedBuffer(BlockBuffer)));
}
}
if (!BlockBuffer)
@@ -3217,10 +3122,10 @@ BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkInde
const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
// FilteredDownloadedBytesPerSecond.Start();
IoBuffer BuildBlob;
- const bool ExistsInCache = m_Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash);
+ const bool ExistsInCache = m_Storage.CacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash);
if (ExistsInCache)
{
- BuildBlob = m_Storage.BuildCacheStorage->GetBuildBlob(m_BuildId, ChunkHash);
+ BuildBlob = m_Storage.CacheStorage->GetBuildBlob(m_BuildId, ChunkHash);
}
if (BuildBlob)
{
@@ -3248,12 +3153,12 @@ BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkInde
m_DownloadStats.DownloadedChunkCount++;
m_DownloadStats.RequestsCompleteCount++;
- if (Payload && m_Storage.BuildCacheStorage && m_Options.PopulateCache)
+ if (Payload && m_Storage.CacheStorage && m_Options.PopulateCache)
{
- m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId,
- ChunkHash,
- ZenContentType::kCompressedBinary,
- CompositeBuffer(SharedBuffer(Payload)));
+ m_Storage.CacheStorage->PutBuildBlob(m_BuildId,
+ ChunkHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(SharedBuffer(Payload)));
}
OnDownloaded(std::move(Payload));
@@ -3262,12 +3167,12 @@ BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkInde
else
{
BuildBlob = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, ChunkHash);
- if (BuildBlob && m_Storage.BuildCacheStorage && m_Options.PopulateCache)
+ if (BuildBlob && m_Storage.CacheStorage && m_Options.PopulateCache)
{
- m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId,
- ChunkHash,
- ZenContentType::kCompressedBinary,
- CompositeBuffer(SharedBuffer(BuildBlob)));
+ m_Storage.CacheStorage->PutBuildBlob(m_BuildId,
+ ChunkHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(SharedBuffer(BuildBlob)));
}
if (!BuildBlob)
{
@@ -3289,347 +3194,241 @@ BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkInde
}
}
-BuildsOperationUpdateFolder::BlockRangeDescriptor
-BuildsOperationUpdateFolder::MergeBlockRanges(std::span<const BlockRangeDescriptor> Ranges)
+void
+BuildsOperationUpdateFolder::DownloadPartialBlock(
+ std::span<const ChunkBlockAnalyser::BlockRangeDescriptor> BlockRanges,
+ size_t BlockRangeStartIndex,
+ size_t BlockRangeCount,
+ const BlobsExistsResult& ExistsResult,
+ std::function<void(IoBuffer&& InMemoryBuffer,
+ const std::filesystem::path& OnDiskPath,
+ size_t BlockRangeStartIndex,
+ std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengths)>&& OnDownloaded)
{
- ZEN_ASSERT(Ranges.size() > 1);
- const BlockRangeDescriptor& First = Ranges.front();
- const BlockRangeDescriptor& Last = Ranges.back();
-
- return BlockRangeDescriptor{.BlockIndex = First.BlockIndex,
- .RangeStart = First.RangeStart,
- .RangeLength = Last.RangeStart + Last.RangeLength - First.RangeStart,
- .ChunkBlockIndexStart = First.ChunkBlockIndexStart,
- .ChunkBlockIndexCount = Last.ChunkBlockIndexStart + Last.ChunkBlockIndexCount - First.ChunkBlockIndexStart};
-}
+ const uint32_t BlockIndex = BlockRanges[BlockRangeStartIndex].BlockIndex;
-std::optional<std::vector<BuildsOperationUpdateFolder::BlockRangeDescriptor>>
-BuildsOperationUpdateFolder::MakeOptionalBlockRangeVector(uint64_t TotalBlockSize, const BlockRangeDescriptor& Range)
-{
- if (Range.RangeLength == TotalBlockSize)
- {
- return {};
- }
- else
- {
- return std::vector<BlockRangeDescriptor>{Range};
- }
-};
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
-const BuildsOperationUpdateFolder::BlockRangeLimit*
-BuildsOperationUpdateFolder::GetBlockRangeLimitForRange(std::span<const BlockRangeLimit> Limits,
- uint64_t TotalBlockSize,
- std::span<const BlockRangeDescriptor> Ranges)
-{
- if (Ranges.size() > 1)
- {
- const std::uint64_t WantedSize =
- std::accumulate(Ranges.begin(), Ranges.end(), uint64_t(0), [](uint64_t Current, const BlockRangeDescriptor& Range) {
- return Current + Range.RangeLength;
- });
+ auto ProcessDownload = [this](
+ const ChunkBlockDescription& BlockDescription,
+ IoBuffer&& BlockRangeBuffer,
+ size_t BlockRangeStartIndex,
+ std::span<const std::pair<uint64_t, uint64_t>> BlockOffsetAndLengths,
+ const std::function<void(IoBuffer && InMemoryBuffer,
+ const std::filesystem::path& OnDiskPath,
+ size_t BlockRangeStartIndex,
+ std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengths)>& OnDownloaded) {
+ uint64_t BlockRangeBufferSize = BlockRangeBuffer.GetSize();
+ m_DownloadStats.DownloadedBlockCount++;
+ m_DownloadStats.DownloadedBlockByteCount += BlockRangeBufferSize;
+ m_DownloadStats.RequestsCompleteCount += BlockOffsetAndLengths.size();
- const double RangeRequestedPercent = (WantedSize * 100.0) / TotalBlockSize;
+ std::filesystem::path BlockChunkPath;
- for (const BlockRangeLimit& Limit : Limits)
+ // Check if the dowloaded block is file based and we can move it directly without rewriting it
{
- if (RangeRequestedPercent >= Limit.SizePercent && Ranges.size() > Limit.MaxRangeCount)
+ IoBufferFileReference FileRef;
+ if (BlockRangeBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) &&
+ (FileRef.FileChunkSize == BlockRangeBufferSize))
{
- return &Limit;
- }
- }
- }
- return nullptr;
-};
+ ZEN_TRACE_CPU("MoveTempPartialBlock");
-std::vector<BuildsOperationUpdateFolder::BlockRangeDescriptor>
-BuildsOperationUpdateFolder::CollapseBlockRanges(const uint64_t AlwaysAcceptableGap, std::span<const BlockRangeDescriptor> BlockRanges)
-{
- ZEN_ASSERT(BlockRanges.size() > 1);
- std::vector<BlockRangeDescriptor> CollapsedBlockRanges;
+ std::error_code Ec;
+ std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
+ if (!Ec)
+ {
+ BlockRangeBuffer.SetDeleteOnClose(false);
+ BlockRangeBuffer = {};
- auto BlockRangesIt = BlockRanges.begin();
- CollapsedBlockRanges.push_back(*BlockRangesIt++);
- for (; BlockRangesIt != BlockRanges.end(); BlockRangesIt++)
- {
- BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back();
+ IoHashStream RangeId;
+ for (const std::pair<uint64_t, uint64_t>& Range : BlockOffsetAndLengths)
+ {
+ RangeId.Append(&Range.first, sizeof(uint64_t));
+ RangeId.Append(&Range.second, sizeof(uint64_t));
+ }
+
+ BlockChunkPath = m_TempBlockFolderPath / fmt::format("{}_{}", BlockDescription.BlockHash, RangeId.GetHash());
+ RenameFile(TempBlobPath, BlockChunkPath, Ec);
+ if (Ec)
+ {
+ BlockChunkPath = std::filesystem::path{};
- const uint64_t BothRangeSize = BlockRangesIt->RangeLength + LastRange.RangeLength;
+ // Re-open the temp file again
+ BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
+ BlockRangeBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockRangeBufferSize, true);
+ BlockRangeBuffer.SetDeleteOnClose(true);
+ }
+ }
+ }
+ }
- const uint64_t Gap = BlockRangesIt->RangeStart - (LastRange.RangeStart + LastRange.RangeLength);
- if (Gap <= Max(BothRangeSize / 16, AlwaysAcceptableGap))
+ if (BlockChunkPath.empty() && (BlockRangeBufferSize > m_Options.MaximumInMemoryPayloadSize))
{
- LastRange.ChunkBlockIndexCount =
- (BlockRangesIt->ChunkBlockIndexStart + BlockRangesIt->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart;
- LastRange.RangeLength = (BlockRangesIt->RangeStart + BlockRangesIt->RangeLength) - LastRange.RangeStart;
+ ZEN_TRACE_CPU("WriteTempPartialBlock");
+
+ IoHashStream RangeId;
+ for (const std::pair<uint64_t, uint64_t>& Range : BlockOffsetAndLengths)
+ {
+ RangeId.Append(&Range.first, sizeof(uint64_t));
+ RangeId.Append(&Range.second, sizeof(uint64_t));
+ }
+
+ // Could not be moved and rather large, lets store it on disk
+ BlockChunkPath = m_TempBlockFolderPath / fmt::format("{}_{}", BlockDescription.BlockHash, RangeId.GetHash());
+ TemporaryFile::SafeWriteFile(BlockChunkPath, BlockRangeBuffer);
+ BlockRangeBuffer = {};
}
- else
+ if (!m_AbortFlag)
{
- CollapsedBlockRanges.push_back(*BlockRangesIt);
+ OnDownloaded(std::move(BlockRangeBuffer), std::move(BlockChunkPath), BlockRangeStartIndex, BlockOffsetAndLengths);
}
- }
-
- return CollapsedBlockRanges;
-};
+ };
-uint64_t
-BuildsOperationUpdateFolder::CalculateNextGap(std::span<const BlockRangeDescriptor> BlockRanges)
-{
- ZEN_ASSERT(BlockRanges.size() > 1);
- uint64_t AcceptableGap = (uint64_t)-1;
- for (size_t RangeIndex = 0; RangeIndex < BlockRanges.size() - 1; RangeIndex++)
+ std::vector<std::pair<uint64_t, uint64_t>> Ranges;
+ Ranges.reserve(BlockRangeCount);
+ for (size_t BlockRangeIndex = BlockRangeStartIndex; BlockRangeIndex < BlockRangeStartIndex + BlockRangeCount; BlockRangeIndex++)
{
- const BlockRangeDescriptor& Range = BlockRanges[RangeIndex];
- const BlockRangeDescriptor& NextRange = BlockRanges[RangeIndex + 1];
-
- const uint64_t Gap = NextRange.RangeStart - (Range.RangeStart + Range.RangeLength);
- AcceptableGap = Min(Gap, AcceptableGap);
+ const ChunkBlockAnalyser::BlockRangeDescriptor& BlockRange = BlockRanges[BlockRangeIndex];
+ Ranges.push_back(std::make_pair(BlockRange.RangeStart, BlockRange.RangeLength));
}
- AcceptableGap = RoundUp(AcceptableGap, 16u * 1024u);
- return AcceptableGap;
-};
-std::optional<std::vector<BuildsOperationUpdateFolder::BlockRangeDescriptor>>
-BuildsOperationUpdateFolder::CalculateBlockRanges(uint32_t BlockIndex,
- const ChunkBlockDescription& BlockDescription,
- std::span<const uint32_t> BlockChunkIndexNeeded,
- bool LimitToSingleRange,
- const uint64_t ChunkStartOffsetInBlock,
- const uint64_t TotalBlockSize,
- uint64_t& OutTotalWantedChunksSize)
-{
- ZEN_TRACE_CPU("CalculateBlockRanges");
+ const bool ExistsInCache = m_Storage.CacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash);
- std::vector<BlockRangeDescriptor> BlockRanges;
+ size_t SubBlockRangeCount = BlockRangeCount;
+ size_t SubRangeCountComplete = 0;
+ std::span<const std::pair<uint64_t, uint64_t>> RangesSpan(Ranges);
+ while (SubRangeCountComplete < SubBlockRangeCount)
{
- uint64_t CurrentOffset = ChunkStartOffsetInBlock;
- uint32_t ChunkBlockIndex = 0;
- uint32_t NeedBlockChunkIndexOffset = 0;
- BlockRangeDescriptor NextRange{.BlockIndex = BlockIndex};
- while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() && ChunkBlockIndex < BlockDescription.ChunkRawHashes.size())
+ if (m_AbortFlag)
+ {
+ break;
+ }
+
+ // First try to get subrange from cache.
+ // If not successful, try to get the ranges from the build store and adapt SubRangeCount...
+
+ size_t SubRangeStartIndex = BlockRangeStartIndex + SubRangeCountComplete;
+ if (ExistsInCache)
{
- const uint32_t ChunkCompressedLength = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex];
- if (ChunkBlockIndex < BlockChunkIndexNeeded[NeedBlockChunkIndexOffset])
+ size_t SubRangeCount = Min(BlockRangeCount - SubRangeCountComplete, m_Storage.CacheHost.Caps.MaxRangeCountPerRequest);
+
+ if (SubRangeCount == 1)
{
- if (NextRange.RangeLength > 0)
+ // Legacy single-range path, prefer that for max compatibility
+
+ const std::pair<uint64_t, uint64_t> SubRange = RangesSpan[SubRangeCountComplete];
+ IoBuffer PayloadBuffer =
+ m_Storage.CacheStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash, SubRange.first, SubRange.second);
+ if (m_AbortFlag)
{
- BlockRanges.push_back(NextRange);
- NextRange = {.BlockIndex = BlockIndex};
+ break;
}
- ChunkBlockIndex++;
- CurrentOffset += ChunkCompressedLength;
- }
- else if (ChunkBlockIndex == BlockChunkIndexNeeded[NeedBlockChunkIndexOffset])
- {
- if (NextRange.RangeLength == 0)
+ if (PayloadBuffer)
{
- NextRange.RangeStart = CurrentOffset;
- NextRange.ChunkBlockIndexStart = ChunkBlockIndex;
+ ProcessDownload(BlockDescription,
+ std::move(PayloadBuffer),
+ SubRangeStartIndex,
+ std::vector<std::pair<uint64_t, uint64_t>>{std::make_pair(0u, SubRange.second)},
+ OnDownloaded);
+ SubRangeCountComplete += SubRangeCount;
+ continue;
}
- NextRange.RangeLength += ChunkCompressedLength;
- NextRange.ChunkBlockIndexCount++;
- ChunkBlockIndex++;
- CurrentOffset += ChunkCompressedLength;
- NeedBlockChunkIndexOffset++;
}
else
{
- ZEN_ASSERT(false);
- }
- }
- if (NextRange.RangeLength > 0)
- {
- BlockRanges.push_back(NextRange);
- }
- }
- ZEN_ASSERT(!BlockRanges.empty());
-
- OutTotalWantedChunksSize =
- std::accumulate(BlockRanges.begin(), BlockRanges.end(), uint64_t(0), [](uint64_t Current, const BlockRangeDescriptor& Range) {
- return Current + Range.RangeLength;
- });
+ auto SubRanges = RangesSpan.subspan(SubRangeCountComplete, SubRangeCount);
- double RangeWantedPercent = (OutTotalWantedChunksSize * 100.0) / TotalBlockSize;
-
- if (BlockRanges.size() == 1)
- {
- if (m_Options.IsVerbose)
- {
- ZEN_OPERATION_LOG_INFO(m_LogOutput,
- "Range request of {} ({:.2f}%) using single range from block {} ({}) as is",
- NiceBytes(OutTotalWantedChunksSize),
- RangeWantedPercent,
- BlockDescription.BlockHash,
- NiceBytes(TotalBlockSize));
+ BuildStorageCache::BuildBlobRanges RangeBuffers =
+ m_Storage.CacheStorage->GetBuildBlobRanges(m_BuildId, BlockDescription.BlockHash, SubRanges);
+ if (m_AbortFlag)
+ {
+ break;
+ }
+ if (RangeBuffers.PayloadBuffer)
+ {
+ if (RangeBuffers.Ranges.empty())
+ {
+ SubRangeCount = Ranges.size() - SubRangeCountComplete;
+ ProcessDownload(BlockDescription,
+ std::move(RangeBuffers.PayloadBuffer),
+ SubRangeStartIndex,
+ RangesSpan.subspan(SubRangeCountComplete, SubRangeCount),
+ OnDownloaded);
+ SubRangeCountComplete += SubRangeCount;
+ continue;
+ }
+ else if (RangeBuffers.Ranges.size() == SubRangeCount)
+ {
+ ProcessDownload(BlockDescription,
+ std::move(RangeBuffers.PayloadBuffer),
+ SubRangeStartIndex,
+ RangeBuffers.Ranges,
+ OnDownloaded);
+ SubRangeCountComplete += SubRangeCount;
+ continue;
+ }
+ }
+ }
}
- return BlockRanges;
- }
- if (LimitToSingleRange)
- {
- const BlockRangeDescriptor MergedRange = MergeBlockRanges(BlockRanges);
- if (m_Options.IsVerbose)
- {
- const double RangeRequestedPercent = (MergedRange.RangeLength * 100.0) / TotalBlockSize;
- const double WastedPercent = ((MergedRange.RangeLength - OutTotalWantedChunksSize) * 100.0) / MergedRange.RangeLength;
+ size_t SubRangeCount = Min(BlockRangeCount - SubRangeCountComplete, m_Storage.BuildStorageHost.Caps.MaxRangeCountPerRequest);
- ZEN_OPERATION_LOG_INFO(
- m_LogOutput,
- "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) limited to single block range {} ({:.2f}%) wasting "
- "{:.2f}% ({})",
- NiceBytes(OutTotalWantedChunksSize),
- RangeWantedPercent,
- BlockRanges.size(),
- BlockDescription.BlockHash,
- NiceBytes(TotalBlockSize),
- NiceBytes(MergedRange.RangeLength),
- RangeRequestedPercent,
- WastedPercent,
- NiceBytes(MergedRange.RangeLength - OutTotalWantedChunksSize));
- }
- return MakeOptionalBlockRangeVector(TotalBlockSize, MergedRange);
- }
+ auto SubRanges = RangesSpan.subspan(SubRangeCountComplete, SubRangeCount);
- if (RangeWantedPercent > FullBlockRangePercentLimit)
- {
- const BlockRangeDescriptor MergedRange = MergeBlockRanges(BlockRanges);
- if (m_Options.IsVerbose)
+ BuildStorageBase::BuildBlobRanges RangeBuffers =
+ m_Storage.BuildStorage->GetBuildBlobRanges(m_BuildId, BlockDescription.BlockHash, SubRanges);
+ if (m_AbortFlag)
{
- const double RangeRequestedPercent = (MergedRange.RangeLength * 100.0) / TotalBlockSize;
- const double WastedPercent = ((MergedRange.RangeLength - OutTotalWantedChunksSize) * 100.0) / MergedRange.RangeLength;
-
- ZEN_OPERATION_LOG_INFO(
- m_LogOutput,
- "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) exceeds {}%. Merged to single block range {} "
- "({:.2f}%) wasting {:.2f}% ({})",
- NiceBytes(OutTotalWantedChunksSize),
- RangeWantedPercent,
- BlockRanges.size(),
- BlockDescription.BlockHash,
- NiceBytes(TotalBlockSize),
- FullBlockRangePercentLimit,
- NiceBytes(MergedRange.RangeLength),
- RangeRequestedPercent,
- WastedPercent,
- NiceBytes(MergedRange.RangeLength - OutTotalWantedChunksSize));
+ break;
}
- return MakeOptionalBlockRangeVector(TotalBlockSize, MergedRange);
- }
-
- std::vector<BlockRangeDescriptor> CollapsedBlockRanges = CollapseBlockRanges(16u * 1024u, BlockRanges);
- while (GetBlockRangeLimitForRange(ForceMergeLimits, TotalBlockSize, CollapsedBlockRanges))
- {
- CollapsedBlockRanges = CollapseBlockRanges(CalculateNextGap(CollapsedBlockRanges), CollapsedBlockRanges);
- }
-
- const std::uint64_t WantedCollapsedSize =
- std::accumulate(CollapsedBlockRanges.begin(),
- CollapsedBlockRanges.end(),
- uint64_t(0),
- [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; });
-
- const double CollapsedRangeRequestedPercent = (WantedCollapsedSize * 100.0) / TotalBlockSize;
-
- if (m_Options.IsVerbose)
- {
- const double WastedPercent = ((WantedCollapsedSize - OutTotalWantedChunksSize) * 100.0) / WantedCollapsedSize;
-
- ZEN_OPERATION_LOG_INFO(
- m_LogOutput,
- "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) collapsed to {} {:.2f}% using {} ranges wasting {:.2f}% "
- "({})",
- NiceBytes(OutTotalWantedChunksSize),
- RangeWantedPercent,
- BlockRanges.size(),
- BlockDescription.BlockHash,
- NiceBytes(TotalBlockSize),
- NiceBytes(WantedCollapsedSize),
- CollapsedRangeRequestedPercent,
- CollapsedBlockRanges.size(),
- WastedPercent,
- NiceBytes(WantedCollapsedSize - OutTotalWantedChunksSize));
- }
- return CollapsedBlockRanges;
-}
-
-void
-BuildsOperationUpdateFolder::DownloadPartialBlock(
- const BlockRangeDescriptor BlockRange,
- const BlobsExistsResult& ExistsResult,
- std::function<void(IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath)>&& OnDownloaded)
-{
- const uint32_t BlockIndex = BlockRange.BlockIndex;
-
- const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
-
- IoBuffer BlockBuffer;
- if (m_Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash))
- {
- BlockBuffer =
- m_Storage.BuildCacheStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength);
- }
- if (!BlockBuffer)
- {
- BlockBuffer =
- m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength);
- }
- if (!BlockBuffer)
- {
- throw std::runtime_error(fmt::format("Block {} is missing when fetching range {} -> {}",
- BlockDescription.BlockHash,
- BlockRange.RangeStart,
- BlockRange.RangeStart + BlockRange.RangeLength));
- }
- if (!m_AbortFlag)
- {
- uint64_t BlockSize = BlockBuffer.GetSize();
- m_DownloadStats.DownloadedBlockCount++;
- m_DownloadStats.DownloadedBlockByteCount += BlockSize;
- m_DownloadStats.RequestsCompleteCount++;
-
- std::filesystem::path BlockChunkPath;
-
- // Check if the dowloaded block is file based and we can move it directly without rewriting it
+ if (RangeBuffers.PayloadBuffer)
{
- IoBufferFileReference FileRef;
- if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == BlockSize))
+ if (RangeBuffers.Ranges.empty())
{
- ZEN_TRACE_CPU("MoveTempPartialBlock");
+ // Jupiter will ignore the ranges and send the whole payload if it fetches the payload from S3
+ // Upload to cache (if enabled) and use the whole payload for the remaining ranges
- std::error_code Ec;
- std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
- if (!Ec)
+ if (m_Storage.CacheStorage && m_Options.PopulateCache)
{
- BlockBuffer.SetDeleteOnClose(false);
- BlockBuffer = {};
- BlockChunkPath = m_TempBlockFolderPath /
- fmt::format("{}_{:x}_{:x}", BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength);
- RenameFile(TempBlobPath, BlockChunkPath, Ec);
- if (Ec)
+ m_Storage.CacheStorage->PutBuildBlob(m_BuildId,
+ BlockDescription.BlockHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(std::vector<IoBuffer>{RangeBuffers.PayloadBuffer}));
+ if (m_AbortFlag)
{
- BlockChunkPath = std::filesystem::path{};
-
- // Re-open the temp file again
- BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
- BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
- BlockBuffer.SetDeleteOnClose(true);
+ break;
}
}
- }
- }
- if (BlockChunkPath.empty() && (BlockSize > m_Options.MaximumInMemoryPayloadSize))
- {
- ZEN_TRACE_CPU("WriteTempPartialBlock");
- // Could not be moved and rather large, lets store it on disk
- BlockChunkPath = m_TempBlockFolderPath /
- fmt::format("{}_{:x}_{:x}", BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength);
- TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
- BlockBuffer = {};
+ SubRangeCount = Ranges.size() - SubRangeCountComplete;
+ ProcessDownload(BlockDescription,
+ std::move(RangeBuffers.PayloadBuffer),
+ SubRangeStartIndex,
+ RangesSpan.subspan(SubRangeCountComplete, SubRangeCount),
+ OnDownloaded);
+ }
+ else
+ {
+ if (RangeBuffers.Ranges.size() != SubRanges.size())
+ {
+ throw std::runtime_error(fmt::format("Fetching {} ranges from {} resulted in {} ranges",
+ SubRanges.size(),
+ BlockDescription.BlockHash,
+ RangeBuffers.Ranges.size()));
+ }
+ ProcessDownload(BlockDescription,
+ std::move(RangeBuffers.PayloadBuffer),
+ SubRangeStartIndex,
+ RangeBuffers.Ranges,
+ OnDownloaded);
+ }
}
- if (!m_AbortFlag)
+ else
{
- OnDownloaded(std::move(BlockBuffer), std::move(BlockChunkPath));
+ throw std::runtime_error(fmt::format("Block {} is missing when fetching {} ranges", BlockDescription.BlockHash, SubRangeCount));
}
+
+ SubRangeCountComplete += SubRangeCount;
}
}
@@ -4083,7 +3882,8 @@ BuildsOperationUpdateFolder::WriteSequenceChunkToCache(BufferedWriteFileCache::L
}
bool
-BuildsOperationUpdateFolder::GetBlockWriteOps(std::span<const IoHash> ChunkRawHashes,
+BuildsOperationUpdateFolder::GetBlockWriteOps(const IoHash& BlockRawHash,
+ std::span<const IoHash> ChunkRawHashes,
std::span<const uint32_t> ChunkCompressedLengths,
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
@@ -4115,9 +3915,34 @@ BuildsOperationUpdateFolder::GetBlockWriteOps(std::span<const IoHash> ChunkR
uint64_t VerifyChunkSize;
CompressedBuffer CompressedChunk =
CompressedBuffer::FromCompressed(SharedBuffer::MakeView(ChunkMemoryView), VerifyChunkHash, VerifyChunkSize);
- ZEN_ASSERT(CompressedChunk);
- ZEN_ASSERT(VerifyChunkHash == ChunkHash);
- ZEN_ASSERT(VerifyChunkSize == m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]);
+ if (!CompressedChunk)
+ {
+ throw std::runtime_error(fmt::format("Chunk {} at {}, size {} in block {} is not a valid compressed buffer",
+ ChunkHash,
+ OffsetInBlock,
+ ChunkCompressedSize,
+ BlockRawHash));
+ }
+ if (VerifyChunkHash != ChunkHash)
+ {
+ throw std::runtime_error(fmt::format("Chunk {} at {}, size {} in block {} has a mismatching content hash {}",
+ ChunkHash,
+ OffsetInBlock,
+ ChunkCompressedSize,
+ BlockRawHash,
+ VerifyChunkHash));
+ }
+ if (VerifyChunkSize != m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex])
+ {
+ throw std::runtime_error(
+ fmt::format("Chunk {} at {}, size {} in block {} has a mismatching raw size {}, expected {}",
+ ChunkHash,
+ OffsetInBlock,
+ ChunkCompressedSize,
+ BlockRawHash,
+ VerifyChunkSize,
+ m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]));
+ }
OodleCompressor ChunkCompressor;
OodleCompressionLevel ChunkCompressionLevel;
@@ -4138,7 +3963,18 @@ BuildsOperationUpdateFolder::GetBlockWriteOps(std::span<const IoHash> ChunkR
{
Decompressed = CompressedChunk.Decompress().AsIoBuffer();
}
- ZEN_ASSERT(Decompressed.GetSize() == m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]);
+
+ if (Decompressed.GetSize() != m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex])
+ {
+ throw std::runtime_error(fmt::format("Chunk {} at {}, size {} in block {} decompressed to size {}, expected {}",
+ ChunkHash,
+ OffsetInBlock,
+ ChunkCompressedSize,
+ BlockRawHash,
+ Decompressed.GetSize(),
+ m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]));
+ }
+
ZEN_ASSERT_SLOW(ChunkHash == IoHash::HashBuffer(Decompressed));
for (const ChunkedContentLookup::ChunkSequenceLocation* Target : ChunkTargetPtrs)
{
@@ -4237,7 +4073,8 @@ BuildsOperationUpdateFolder::WriteChunksBlockToCache(const ChunkBlockDescription
const std::vector<uint32_t> ChunkCompressedLengths =
ReadChunkBlockHeader(BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder()), HeaderSize);
- if (GetBlockWriteOps(BlockDescription.ChunkRawHashes,
+ if (GetBlockWriteOps(BlockDescription.BlockHash,
+ BlockDescription.ChunkRawHashes,
ChunkCompressedLengths,
SequenceIndexChunksLeftToWriteCounters,
RemoteChunkIndexNeedsCopyFromSourceFlags,
@@ -4252,7 +4089,8 @@ BuildsOperationUpdateFolder::WriteChunksBlockToCache(const ChunkBlockDescription
return false;
}
- if (GetBlockWriteOps(BlockDescription.ChunkRawHashes,
+ if (GetBlockWriteOps(BlockDescription.BlockHash,
+ BlockDescription.ChunkRawHashes,
BlockDescription.ChunkCompressedLengths,
SequenceIndexChunksLeftToWriteCounters,
RemoteChunkIndexNeedsCopyFromSourceFlags,
@@ -4283,7 +4121,8 @@ BuildsOperationUpdateFolder::WritePartialBlockChunksToCache(const ChunkBlockDesc
const MemoryView BlockView = BlockMemoryBuffer.GetView();
BlockWriteOps Ops;
- if (GetBlockWriteOps(BlockDescription.ChunkRawHashes,
+ if (GetBlockWriteOps(BlockDescription.BlockHash,
+ BlockDescription.ChunkRawHashes,
BlockDescription.ChunkCompressedLengths,
SequenceIndexChunksLeftToWriteCounters,
RemoteChunkIndexNeedsCopyFromSourceFlags,
@@ -5156,12 +4995,12 @@ BuildsOperationUploadFolder::GenerateBuildBlocks(const ChunkedFolderContent&
const IoHash& BlockHash = OutBlocks.BlockDescriptions[BlockIndex].BlockHash;
const uint64_t CompressedBlockSize = Payload.GetCompressedSize();
- if (m_Storage.BuildCacheStorage && m_Options.PopulateCache)
+ if (m_Storage.CacheStorage && m_Options.PopulateCache)
{
- m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId,
- BlockHash,
- ZenContentType::kCompressedBinary,
- Payload.GetCompressed());
+ m_Storage.CacheStorage->PutBuildBlob(m_BuildId,
+ BlockHash,
+ ZenContentType::kCompressedBinary,
+ Payload.GetCompressed());
}
m_Storage.BuildStorage->PutBuildBlob(m_BuildId,
@@ -5179,11 +5018,11 @@ BuildsOperationUploadFolder::GenerateBuildBlocks(const ChunkedFolderContent&
OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size());
}
- if (m_Storage.BuildCacheStorage && m_Options.PopulateCache)
+ if (m_Storage.CacheStorage && m_Options.PopulateCache)
{
- m_Storage.BuildCacheStorage->PutBlobMetadatas(m_BuildId,
- std::vector<IoHash>({BlockHash}),
- std::vector<CbObject>({BlockMetaData}));
+ m_Storage.CacheStorage->PutBlobMetadatas(m_BuildId,
+ std::vector<IoHash>({BlockHash}),
+ std::vector<CbObject>({BlockMetaData}));
}
bool MetadataSucceeded =
@@ -5334,6 +5173,13 @@ BuildsOperationUploadFolder::FetchChunk(const ChunkedFolderContent& Content,
ZEN_ASSERT(!ChunkLocations.empty());
CompositeBuffer Chunk =
OpenFileCache.GetRange(ChunkLocations[0].SequenceIndex, ChunkLocations[0].Offset, Content.ChunkedContent.ChunkRawSizes[ChunkIndex]);
+ if (!Chunk)
+ {
+ throw std::runtime_error(fmt::format("Unable to read chunk at {}, size {} from '{}'",
+ ChunkLocations[0].Offset,
+ Content.ChunkedContent.ChunkRawSizes[ChunkIndex],
+ Content.Paths[Lookup.SequenceIndexFirstPathIndex[ChunkLocations[0].SequenceIndex]]));
+ }
ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == ChunkHash);
return Chunk;
};
@@ -5362,10 +5208,7 @@ BuildsOperationUploadFolder::GenerateBlock(const ChunkedFolderContent& Content,
Content.ChunkedContent.ChunkHashes[ChunkIndex],
[this, &Content, &Lookup, &OpenFileCache, ChunkIndex](const IoHash& ChunkHash) -> std::pair<uint64_t, CompressedBuffer> {
CompositeBuffer Chunk = FetchChunk(Content, Lookup, ChunkHash, OpenFileCache);
- if (!Chunk)
- {
- ZEN_ASSERT(false);
- }
+ ZEN_ASSERT(Chunk);
uint64_t RawSize = Chunk.GetSize();
const bool ShouldCompressChunk = RawSize >= m_Options.MinimumSizeForCompressInBlock &&
@@ -6023,11 +5866,11 @@ BuildsOperationUploadFolder::UploadBuildPart(ChunkingController& ChunkController
{
const CbObject BlockMetaData =
BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]);
- if (m_Storage.BuildCacheStorage && m_Options.PopulateCache)
+ if (m_Storage.CacheStorage && m_Options.PopulateCache)
{
- m_Storage.BuildCacheStorage->PutBlobMetadatas(m_BuildId,
- std::vector<IoHash>({BlockHash}),
- std::vector<CbObject>({BlockMetaData}));
+ m_Storage.CacheStorage->PutBlobMetadatas(m_BuildId,
+ std::vector<IoHash>({BlockHash}),
+ std::vector<CbObject>({BlockMetaData}));
}
bool MetadataSucceeded = m_Storage.BuildStorage->PutBlockMetadata(m_BuildId, BlockHash, BlockMetaData);
if (MetadataSucceeded)
@@ -6221,9 +6064,9 @@ BuildsOperationUploadFolder::UploadPartBlobs(const ChunkedFolderContent& Co
const CbObject BlockMetaData =
BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]);
- if (m_Storage.BuildCacheStorage && m_Options.PopulateCache)
+ if (m_Storage.CacheStorage && m_Options.PopulateCache)
{
- m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload);
+ m_Storage.CacheStorage->PutBuildBlob(m_BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload);
}
m_Storage.BuildStorage->PutBuildBlob(m_BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload);
if (m_Options.IsVerbose)
@@ -6237,11 +6080,11 @@ BuildsOperationUploadFolder::UploadPartBlobs(const ChunkedFolderContent& Co
UploadedBlockSize += PayloadSize;
TempUploadStats.BlocksBytes += PayloadSize;
- if (m_Storage.BuildCacheStorage && m_Options.PopulateCache)
+ if (m_Storage.CacheStorage && m_Options.PopulateCache)
{
- m_Storage.BuildCacheStorage->PutBlobMetadatas(m_BuildId,
- std::vector<IoHash>({BlockHash}),
- std::vector<CbObject>({BlockMetaData}));
+ m_Storage.CacheStorage->PutBlobMetadatas(m_BuildId,
+ std::vector<IoHash>({BlockHash}),
+ std::vector<CbObject>({BlockMetaData}));
}
bool MetadataSucceeded = m_Storage.BuildStorage->PutBlockMetadata(m_BuildId, BlockHash, BlockMetaData);
if (MetadataSucceeded)
@@ -6304,9 +6147,9 @@ BuildsOperationUploadFolder::UploadPartBlobs(const ChunkedFolderContent& Co
const uint64_t PayloadSize = Payload.GetSize();
- if (m_Storage.BuildCacheStorage && m_Options.PopulateCache)
+ if (m_Storage.CacheStorage && m_Options.PopulateCache)
{
- m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload);
+ m_Storage.CacheStorage->PutBuildBlob(m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload);
}
if (PayloadSize >= LargeAttachmentSize)
@@ -7050,14 +6893,14 @@ BuildsOperationPrimeCache::Execute()
std::vector<IoHash> BlobsToDownload;
BlobsToDownload.reserve(BuildBlobs.size());
- if (m_Storage.BuildCacheStorage && !BuildBlobs.empty() && !m_Options.ForceUpload)
+ if (m_Storage.CacheStorage && !BuildBlobs.empty() && !m_Options.ForceUpload)
{
ZEN_TRACE_CPU("BlobCacheExistCheck");
Stopwatch Timer;
const std::vector<IoHash> BlobHashes(BuildBlobs.begin(), BuildBlobs.end());
const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult =
- m_Storage.BuildCacheStorage->BlobsExists(m_BuildId, BlobHashes);
+ m_Storage.CacheStorage->BlobsExists(m_BuildId, BlobHashes);
if (CacheExistsResult.size() == BlobHashes.size())
{
@@ -7104,33 +6947,33 @@ BuildsOperationPrimeCache::Execute()
for (size_t BlobIndex = 0; BlobIndex < BlobCount; BlobIndex++)
{
- Work.ScheduleWork(
- m_NetworkPool,
- [this,
- &Work,
- &BlobsToDownload,
- BlobCount,
- &LooseChunkRawSizes,
- &CompletedDownloadCount,
- &FilteredDownloadedBytesPerSecond,
- &MultipartAttachmentCount,
- BlobIndex](std::atomic<bool>&) {
- if (!m_AbortFlag)
- {
- const IoHash& BlobHash = BlobsToDownload[BlobIndex];
+ Work.ScheduleWork(m_NetworkPool,
+ [this,
+ &Work,
+ &BlobsToDownload,
+ BlobCount,
+ &LooseChunkRawSizes,
+ &CompletedDownloadCount,
+ &FilteredDownloadedBytesPerSecond,
+ &MultipartAttachmentCount,
+ BlobIndex](std::atomic<bool>&) {
+ if (!m_AbortFlag)
+ {
+ const IoHash& BlobHash = BlobsToDownload[BlobIndex];
- bool IsLargeBlob = false;
+ bool IsLargeBlob = false;
- if (auto It = LooseChunkRawSizes.find(BlobHash); It != LooseChunkRawSizes.end())
- {
- IsLargeBlob = It->second >= m_Options.LargeAttachmentSize;
- }
+ if (auto It = LooseChunkRawSizes.find(BlobHash); It != LooseChunkRawSizes.end())
+ {
+ IsLargeBlob = It->second >= m_Options.LargeAttachmentSize;
+ }
- FilteredDownloadedBytesPerSecond.Start();
+ FilteredDownloadedBytesPerSecond.Start();
- if (IsLargeBlob)
- {
- DownloadLargeBlob(*m_Storage.BuildStorage,
+ if (IsLargeBlob)
+ {
+ DownloadLargeBlob(
+ *m_Storage.BuildStorage,
m_TempPath,
m_BuildId,
BlobHash,
@@ -7146,12 +6989,12 @@ BuildsOperationPrimeCache::Execute()
if (!m_AbortFlag)
{
- if (Payload && m_Storage.BuildCacheStorage)
+ if (Payload && m_Storage.CacheStorage)
{
- m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId,
- BlobHash,
- ZenContentType::kCompressedBinary,
- CompositeBuffer(SharedBuffer(Payload)));
+ m_Storage.CacheStorage->PutBuildBlob(m_BuildId,
+ BlobHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(SharedBuffer(Payload)));
}
}
CompletedDownloadCount++;
@@ -7160,32 +7003,32 @@ BuildsOperationPrimeCache::Execute()
FilteredDownloadedBytesPerSecond.Stop();
}
});
- }
- else
- {
- IoBuffer Payload = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlobHash);
- m_DownloadStats.DownloadedBlockCount++;
- m_DownloadStats.DownloadedBlockByteCount += Payload.GetSize();
- m_DownloadStats.RequestsCompleteCount++;
+ }
+ else
+ {
+ IoBuffer Payload = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlobHash);
+ m_DownloadStats.DownloadedBlockCount++;
+ m_DownloadStats.DownloadedBlockByteCount += Payload.GetSize();
+ m_DownloadStats.RequestsCompleteCount++;
- if (!m_AbortFlag)
- {
- if (Payload && m_Storage.BuildCacheStorage)
- {
- m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId,
- BlobHash,
- ZenContentType::kCompressedBinary,
- CompositeBuffer(SharedBuffer(std::move(Payload))));
- }
- }
- CompletedDownloadCount++;
- if (CompletedDownloadCount == BlobCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
- }
- }
- });
+ if (!m_AbortFlag)
+ {
+ if (Payload && m_Storage.CacheStorage)
+ {
+ m_Storage.CacheStorage->PutBuildBlob(m_BuildId,
+ BlobHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(SharedBuffer(std::move(Payload))));
+ }
+ }
+ CompletedDownloadCount++;
+ if (CompletedDownloadCount == BlobCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ }
+ }
+ });
}
Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
@@ -7197,10 +7040,10 @@ BuildsOperationPrimeCache::Execute()
std::string DownloadRateString = (CompletedDownloadCount == BlobCount)
? ""
: fmt::format(" {}bits/s", NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8));
- std::string UploadDetails = m_Storage.BuildCacheStorage ? fmt::format(" {} ({}) uploaded.",
- m_StorageCacheStats.PutBlobCount.load(),
- NiceBytes(m_StorageCacheStats.PutBlobByteCount.load()))
- : "";
+ std::string UploadDetails = m_Storage.CacheStorage ? fmt::format(" {} ({}) uploaded.",
+ m_StorageCacheStats.PutBlobCount.load(),
+ NiceBytes(m_StorageCacheStats.PutBlobByteCount.load()))
+ : "";
std::string Details = fmt::format("{}/{} ({}{}) downloaded.{}",
CompletedDownloadCount.load(),
@@ -7225,13 +7068,13 @@ BuildsOperationPrimeCache::Execute()
return;
}
- if (m_Storage.BuildCacheStorage)
+ if (m_Storage.CacheStorage)
{
- m_Storage.BuildCacheStorage->Flush(m_LogOutput.GetProgressUpdateDelayMS(), [this](intptr_t Remaining) -> bool {
+ m_Storage.CacheStorage->Flush(m_LogOutput.GetProgressUpdateDelayMS(), [this](intptr_t Remaining) -> bool {
ZEN_UNUSED(Remaining);
if (!m_Options.IsQuiet)
{
- ZEN_OPERATION_LOG_INFO(m_LogOutput, "Waiting for {} blobs to finish upload to '{}'", Remaining, m_Storage.CacheName);
+ ZEN_OPERATION_LOG_INFO(m_LogOutput, "Waiting for {} blobs to finish upload to '{}'", Remaining, m_Storage.CacheHost.Name);
}
return !m_AbortFlag;
});
@@ -7431,16 +7274,31 @@ GetRemoteContent(OperationLogOutput& Output,
// TODO: GetBlockDescriptions for all BlockRawHashes in one go - check for local block descriptions when we cache them
{
+ if (!IsQuiet)
+ {
+ ZEN_OPERATION_LOG_INFO(Output, "Fetching metadata for {} blocks", BlockRawHashes.size());
+ }
+
+ Stopwatch GetBlockMetadataTimer;
+
bool AttemptFallback = false;
OutBlockDescriptions = GetBlockDescriptions(Output,
*Storage.BuildStorage,
- Storage.BuildCacheStorage.get(),
+ Storage.CacheStorage.get(),
BuildId,
- BuildPartId,
BlockRawHashes,
AttemptFallback,
IsQuiet,
IsVerbose);
+
+ if (!IsQuiet)
+ {
+ ZEN_OPERATION_LOG_INFO(Output,
+ "GetBlockMetadata for {} took {}. Found {} blocks",
+ BuildPartId,
+ NiceTimeSpanMs(GetBlockMetadataTimer.GetElapsedTimeMs()),
+ OutBlockDescriptions.size());
+ }
}
CalculateLocalChunkOrders(AbsoluteChunkOrders,
@@ -7989,6 +7847,8 @@ namespace buildstorageoperations_testutils {
} // namespace buildstorageoperations_testutils
+TEST_SUITE_BEGIN("remotestore.buildstorageoperations");
+
TEST_CASE("buildstorageoperations.upload.folder")
{
using namespace buildstorageoperations_testutils;
@@ -8176,106 +8036,270 @@ TEST_CASE("buildstorageoperations.memorychunkingcache")
TEST_CASE("buildstorageoperations.upload.multipart")
{
- using namespace buildstorageoperations_testutils;
+ // Disabled since it relies on authentication and specific block being present in cloud storage
+ if (false)
+ {
+ using namespace buildstorageoperations_testutils;
- FastRandom BaseRandom;
+ FastRandom BaseRandom;
- const size_t FileCount = 11;
+ const size_t FileCount = 11;
- const std::string Paths[FileCount] = {{"file_1"},
- {"file_2.exe"},
- {"file_3.txt"},
- {"dir_1/dir1_file_1.exe"},
- {"dir_1/dir1_file_2.pdb"},
- {"dir_1/dir1_file_3.txt"},
- {"dir_2/dir2_dir1/dir2_dir1_file_1.exe"},
- {"dir_2/dir2_dir1/dir2_dir1_file_2.pdb"},
- {"dir_2/dir2_dir1/dir2_dir1_file_3.dll"},
- {"dir_2/dir2_dir2/dir2_dir2_file_1.txt"},
- {"dir_2/dir2_dir2/dir2_dir2_file_2.json"}};
- const uint64_t Sizes[FileCount] =
- {6u * 1024u, 0, 798, 19u * 1024u, 7u * 1024u, 93, 31u * 1024u, 17u * 1024u, 13u * 1024u, 2u * 1024u, 3u * 1024u};
+ const std::string Paths[FileCount] = {{"file_1"},
+ {"file_2.exe"},
+ {"file_3.txt"},
+ {"dir_1/dir1_file_1.exe"},
+ {"dir_1/dir1_file_2.pdb"},
+ {"dir_1/dir1_file_3.txt"},
+ {"dir_2/dir2_dir1/dir2_dir1_file_1.exe"},
+ {"dir_2/dir2_dir1/dir2_dir1_file_2.pdb"},
+ {"dir_2/dir2_dir1/dir2_dir1_file_3.dll"},
+ {"dir_2/dir2_dir2/dir2_dir2_file_1.txt"},
+ {"dir_2/dir2_dir2/dir2_dir2_file_2.json"}};
+ const uint64_t Sizes[FileCount] =
+ {6u * 1024u, 0, 798, 19u * 1024u, 7u * 1024u, 93, 31u * 1024u, 17u * 1024u, 13u * 1024u, 2u * 1024u, 3u * 1024u};
- ScopedTemporaryDirectory SourceFolder;
- TestState State(SourceFolder.Path());
- State.Initialize();
- State.CreateSourceData("source", Paths, Sizes);
+ ScopedTemporaryDirectory SourceFolder;
+ TestState State(SourceFolder.Path());
+ State.Initialize();
+ State.CreateSourceData("source", Paths, Sizes);
- std::span<const std::string> ManifestFiles1(Paths);
- ManifestFiles1 = ManifestFiles1.subspan(0, FileCount / 2);
+ std::span<const std::string> ManifestFiles1(Paths);
+ ManifestFiles1 = ManifestFiles1.subspan(0, FileCount / 2);
- std::span<const uint64_t> ManifestSizes1(Sizes);
- ManifestSizes1 = ManifestSizes1.subspan(0, FileCount / 2);
+ std::span<const uint64_t> ManifestSizes1(Sizes);
+ ManifestSizes1 = ManifestSizes1.subspan(0, FileCount / 2);
- std::span<const std::string> ManifestFiles2(Paths);
- ManifestFiles2 = ManifestFiles2.subspan(FileCount / 2 - 1);
+ std::span<const std::string> ManifestFiles2(Paths);
+ ManifestFiles2 = ManifestFiles2.subspan(FileCount / 2 - 1);
- std::span<const uint64_t> ManifestSizes2(Sizes);
- ManifestSizes2 = ManifestSizes2.subspan(FileCount / 2 - 1);
+ std::span<const uint64_t> ManifestSizes2(Sizes);
+ ManifestSizes2 = ManifestSizes2.subspan(FileCount / 2 - 1);
- const Oid BuildPart1Id = Oid::NewOid();
- const std::string BuildPart1Name = "part1";
- const Oid BuildPart2Id = Oid::NewOid();
- const std::string BuildPart2Name = "part2";
- {
- CbObjectWriter Writer;
- Writer.BeginObject("parts"sv);
+ const Oid BuildPart1Id = Oid::NewOid();
+ const std::string BuildPart1Name = "part1";
+ const Oid BuildPart2Id = Oid::NewOid();
+ const std::string BuildPart2Name = "part2";
{
- Writer.BeginObject(BuildPart1Name);
+ CbObjectWriter Writer;
+ Writer.BeginObject("parts"sv);
{
- Writer.AddObjectId("partId"sv, BuildPart1Id);
- Writer.BeginArray("files"sv);
- for (const std::string& ManifestFile : ManifestFiles1)
+ Writer.BeginObject(BuildPart1Name);
{
- Writer.AddString(ManifestFile);
+ Writer.AddObjectId("partId"sv, BuildPart1Id);
+ Writer.BeginArray("files"sv);
+ for (const std::string& ManifestFile : ManifestFiles1)
+ {
+ Writer.AddString(ManifestFile);
+ }
+ Writer.EndArray(); // files
+ }
+ Writer.EndObject(); // part1
+
+ Writer.BeginObject(BuildPart2Name);
+ {
+ Writer.AddObjectId("partId"sv, BuildPart2Id);
+ Writer.BeginArray("files"sv);
+ for (const std::string& ManifestFile : ManifestFiles2)
+ {
+ Writer.AddString(ManifestFile);
+ }
+ Writer.EndArray(); // files
}
- Writer.EndArray(); // files
+ Writer.EndObject(); // part2
+ }
+ Writer.EndObject(); // parts
+
+ ExtendableStringBuilder<1024> Manifest;
+ CompactBinaryToJson(Writer.Save(), Manifest);
+ WriteFile(State.RootPath / "manifest.json", IoBuffer(IoBuffer::Wrap, Manifest.Data(), Manifest.Size()));
+ }
+
+ const Oid BuildId = Oid::NewOid();
+
+ auto Result = State.Upload(BuildId, {}, {}, "source", State.RootPath / "manifest.json");
+
+ CHECK_EQ(Result.size(), 2u);
+ CHECK_EQ(Result[0].first, BuildPart1Id);
+ CHECK_EQ(Result[0].second, BuildPart1Name);
+ CHECK_EQ(Result[1].first, BuildPart2Id);
+ CHECK_EQ(Result[1].second, BuildPart2Name);
+ State.ValidateUpload(BuildId, Result);
+
+ FolderContent DownloadContent = State.Download(BuildId, Oid::Zero, {}, "download", /* Append */ false);
+ State.ValidateDownload(Paths, Sizes, "source", "download", DownloadContent);
+
+ FolderContent Part1DownloadContent = State.Download(BuildId, BuildPart1Id, {}, "download_part1", /* Append */ false);
+ State.ValidateDownload(ManifestFiles1, ManifestSizes1, "source", "download_part1", Part1DownloadContent);
+
+ FolderContent Part2DownloadContent = State.Download(BuildId, Oid::Zero, BuildPart2Name, "download_part2", /* Append */ false);
+ State.ValidateDownload(ManifestFiles2, ManifestSizes2, "source", "download_part2", Part2DownloadContent);
+
+ (void)State.Download(BuildId, BuildPart1Id, BuildPart1Name, "download_part1+2", /* Append */ false);
+ FolderContent Part1And2DownloadContent = State.Download(BuildId, BuildPart2Id, {}, "download_part1+2", /* Append */ true);
+ State.ValidateDownload(Paths, Sizes, "source", "download_part1+2", Part1And2DownloadContent);
+ }
+}
+
+TEST_CASE("buildstorageoperations.partial.block.download" * doctest::skip(true))
+{
+ const std::string OidcExecutableName = "OidcToken" ZEN_EXE_SUFFIX_LITERAL;
+ std::filesystem::path OidcTokenExePath = (GetRunningExecutablePath().parent_path() / OidcExecutableName).make_preferred();
+
+ HttpClientSettings ClientSettings{
+ .LogCategory = "httpbuildsclient",
+ .AccessTokenProvider =
+ httpclientauth::CreateFromOidcTokenExecutable(OidcTokenExePath, "https://jupiter.devtools.epicgames.com", true, false, false),
+ .AssumeHttp2 = false,
+ .AllowResume = true,
+ .RetryCount = 0,
+ .Verbose = false};
+
+ HttpClient HttpClient("https://euc.jupiter.devtools.epicgames.com", ClientSettings);
+
+ const std::string_view Namespace = "fortnite.oplog";
+ const std::string_view Bucket = "fortnitegame.staged-build.fortnite-main.ps4-client";
+ const Oid BuildId = Oid::FromHexString("09a76ea92ad301d4724fafad");
+
+ {
+ HttpClient::Response Response = HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, Bucket, BuildId),
+ HttpClient::Accept(ZenContentType::kCbObject));
+ CbValidateError ValidateResult = CbValidateError::None;
+ CbObject Object = ValidateAndReadCompactBinaryObject(IoBuffer(Response.ResponsePayload), ValidateResult);
+ REQUIRE(ValidateResult == CbValidateError::None);
+ }
+
+ std::vector<ChunkBlockDescription> BlockDescriptions;
+ {
+ CbObjectWriter Request;
+
+ Request.BeginArray("blocks"sv);
+ {
+ Request.AddHash(IoHash::FromHexString("7c353ed782675a5e8f968e61e51fc797ecdc2882"));
+ }
+ Request.EndArray();
+
+ IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer();
+ Payload.SetContentType(ZenContentType::kCbObject);
+
+ HttpClient::Response BlockDescriptionsResponse =
+ HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/blocks/getBlockMetadata", Namespace, Bucket, BuildId),
+ Payload,
+ HttpClient::Accept(ZenContentType::kCbObject));
+ REQUIRE(BlockDescriptionsResponse.IsSuccess());
+
+ CbValidateError ValidateResult = CbValidateError::None;
+ CbObject Object = ValidateAndReadCompactBinaryObject(IoBuffer(BlockDescriptionsResponse.ResponsePayload), ValidateResult);
+ REQUIRE(ValidateResult == CbValidateError::None);
+
+ {
+ CbArrayView BlocksArray = Object["blocks"sv].AsArrayView();
+ for (CbFieldView Block : BlocksArray)
+ {
+ ChunkBlockDescription Description = ParseChunkBlockDescription(Block.AsObjectView());
+ BlockDescriptions.emplace_back(std::move(Description));
}
- Writer.EndObject(); // part1
+ }
+ }
+
+ REQUIRE(!BlockDescriptions.empty());
- Writer.BeginObject(BuildPart2Name);
+ const IoHash BlockHash = BlockDescriptions.back().BlockHash;
+
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions.front();
+ REQUIRE(!BlockDescription.ChunkRawHashes.empty());
+ REQUIRE(!BlockDescription.ChunkCompressedLengths.empty());
+
+ std::vector<std::pair<uint64_t, uint64_t>> ChunkOffsetAndSizes;
+ uint64_t Offset = gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize);
+
+ for (uint32_t ChunkCompressedSize : BlockDescription.ChunkCompressedLengths)
+ {
+ ChunkOffsetAndSizes.push_back(std::make_pair(Offset, ChunkCompressedSize));
+ Offset += ChunkCompressedSize;
+ }
+
+ ScopedTemporaryDirectory SourceFolder;
+
+ auto Validate = [&](std::span<const uint32_t> ChunkIndexesToFetch) {
+ std::vector<std::pair<uint64_t, uint64_t>> Ranges;
+ for (uint32_t ChunkIndex : ChunkIndexesToFetch)
+ {
+ Ranges.push_back(ChunkOffsetAndSizes[ChunkIndex]);
+ }
+
+ HttpClient::KeyValueMap Headers;
+ if (!Ranges.empty())
+ {
+ ExtendableStringBuilder<512> SB;
+ for (const std::pair<uint64_t, uint64_t>& R : Ranges)
{
- Writer.AddObjectId("partId"sv, BuildPart2Id);
- Writer.BeginArray("files"sv);
- for (const std::string& ManifestFile : ManifestFiles2)
+ if (SB.Size() > 0)
{
- Writer.AddString(ManifestFile);
+ SB << ", ";
}
- Writer.EndArray(); // files
+ SB << R.first << "-" << R.first + R.second - 1;
}
- Writer.EndObject(); // part2
+ Headers.Entries.insert({"Range", fmt::format("bytes={}", SB.ToView())});
}
- Writer.EndObject(); // parts
- ExtendableStringBuilder<1024> Manifest;
- CompactBinaryToJson(Writer.Save(), Manifest);
- WriteFile(State.RootPath / "manifest.json", IoBuffer(IoBuffer::Wrap, Manifest.Data(), Manifest.Size()));
- }
+ HttpClient::Response GetBlobRangesResponse = HttpClient.Download(
+ fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect=false", Namespace, Bucket, BuildId, BlockHash),
+ SourceFolder.Path(),
+ Headers);
- const Oid BuildId = Oid::NewOid();
+ REQUIRE(GetBlobRangesResponse.IsSuccess());
+ [[maybe_unused]] MemoryView RangesMemoryView = GetBlobRangesResponse.ResponsePayload.GetView();
- auto Result = State.Upload(BuildId, {}, {}, "source", State.RootPath / "manifest.json");
+ std::vector<std::pair<uint64_t, uint64_t>> PayloadRanges = GetBlobRangesResponse.GetRanges(Ranges);
+ if (PayloadRanges.empty())
+ {
+ // We got the whole blob, use the ranges as is
+ PayloadRanges = Ranges;
+ }
- CHECK_EQ(Result.size(), 2u);
- CHECK_EQ(Result[0].first, BuildPart1Id);
- CHECK_EQ(Result[0].second, BuildPart1Name);
- CHECK_EQ(Result[1].first, BuildPart2Id);
- CHECK_EQ(Result[1].second, BuildPart2Name);
- State.ValidateUpload(BuildId, Result);
+ REQUIRE(PayloadRanges.size() == Ranges.size());
- FolderContent DownloadContent = State.Download(BuildId, Oid::Zero, {}, "download", /* Append */ false);
- State.ValidateDownload(Paths, Sizes, "source", "download", DownloadContent);
+ for (uint32_t RangeIndex = 0; RangeIndex < PayloadRanges.size(); RangeIndex++)
+ {
+ const std::pair<uint64_t, uint64_t>& PayloadRange = PayloadRanges[RangeIndex];
+
+ CHECK_EQ(PayloadRange.second, Ranges[RangeIndex].second);
- FolderContent Part1DownloadContent = State.Download(BuildId, BuildPart1Id, {}, "download_part1", /* Append */ false);
- State.ValidateDownload(ManifestFiles1, ManifestSizes1, "source", "download_part1", Part1DownloadContent);
+ IoBuffer ChunkPayload(GetBlobRangesResponse.ResponsePayload, PayloadRange.first, PayloadRange.second);
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(SharedBuffer(ChunkPayload), RawHash, RawSize);
+ CHECK(CompressedChunk);
+ CHECK_EQ(RawHash, BlockDescription.ChunkRawHashes[ChunkIndexesToFetch[RangeIndex]]);
+ CHECK_EQ(RawSize, BlockDescription.ChunkRawLengths[ChunkIndexesToFetch[RangeIndex]]);
+ }
+ };
- FolderContent Part2DownloadContent = State.Download(BuildId, Oid::Zero, BuildPart2Name, "download_part2", /* Append */ false);
- State.ValidateDownload(ManifestFiles2, ManifestSizes2, "source", "download_part2", Part2DownloadContent);
+ {
+ // Single
+ std::vector<uint32_t> ChunkIndexesToFetch{uint32_t(BlockDescription.ChunkCompressedLengths.size() / 2)};
+ Validate(ChunkIndexesToFetch);
+ }
+ {
+ // Many
+ std::vector<uint32_t> ChunkIndexesToFetch;
+ for (uint32_t Index = 0; Index < BlockDescription.ChunkCompressedLengths.size() / 16; Index++)
+ {
+ ChunkIndexesToFetch.push_back(uint32_t(BlockDescription.ChunkCompressedLengths.size() / 6 + Index * 7));
+ ChunkIndexesToFetch.push_back(uint32_t(BlockDescription.ChunkCompressedLengths.size() / 6 + Index * 7 + 1));
+ ChunkIndexesToFetch.push_back(uint32_t(BlockDescription.ChunkCompressedLengths.size() / 6 + Index * 7 + 3));
+ }
+ Validate(ChunkIndexesToFetch);
+ }
- (void)State.Download(BuildId, BuildPart1Id, BuildPart1Name, "download_part1+2", /* Append */ false);
- FolderContent Part1And2DownloadContent = State.Download(BuildId, BuildPart2Id, {}, "download_part1+2", /* Append */ true);
- State.ValidateDownload(Paths, Sizes, "source", "download_part1+2", Part1And2DownloadContent);
+ {
+ // First and last
+ std::vector<uint32_t> ChunkIndexesToFetch{0, uint32_t(BlockDescription.ChunkCompressedLengths.size() - 1)};
+ Validate(ChunkIndexesToFetch);
+ }
}
+TEST_SUITE_END();
void
buildstorageoperations_forcelink()