diff options
Diffstat (limited to 'src/zenremotestore/builds/buildstorageoperations.cpp')
| -rw-r--r-- | src/zenremotestore/builds/buildstorageoperations.cpp | 504 |
1 files changed, 200 insertions, 304 deletions
diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp index 3a41cd7eb..389f8614d 100644 --- a/src/zenremotestore/builds/buildstorageoperations.cpp +++ b/src/zenremotestore/builds/buildstorageoperations.cpp @@ -382,6 +382,46 @@ namespace { return CompositeBuffer{}; } + std::filesystem::path TryMoveDownloadedChunk(IoBuffer& BlockBuffer, const std::filesystem::path& Path, bool ForceDiskBased) + { + uint64_t BlockSize = BlockBuffer.GetSize(); + IoBufferFileReference FileRef; + if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == BlockSize)) + { + ZEN_TRACE_CPU("MoveTempFullBlock"); + std::error_code Ec; + std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); + if (!Ec) + { + BlockBuffer.SetDeleteOnClose(false); + BlockBuffer = {}; + RenameFile(TempBlobPath, Path, Ec); + if (Ec) + { + // Re-open the temp file again + BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); + BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); + BlockBuffer.SetDeleteOnClose(true); + } + else + { + return Path; + } + } + } + + if (ForceDiskBased) + { + // Could not be moved and rather large, lets store it on disk + ZEN_TRACE_CPU("WriteTempFullBlock"); + TemporaryFile::SafeWriteFile(Path, BlockBuffer); + BlockBuffer = {}; + return Path; + } + + return {}; + } + } // namespace class ReadFileCache @@ -673,9 +713,6 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) auto EndProgress = MakeGuard([&]() { m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::StepCount, (uint32_t)TaskSteps::StepCount); }); - ZEN_ASSERT((!m_Options.PrimeCacheOnly) || - (m_Options.PrimeCacheOnly && (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Off))); - m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::ScanExistingData, (uint32_t)TaskSteps::StepCount); CreateDirectories(m_CacheFolderPath); @@ -690,20 +727,14 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedChunkHashesFound; tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedSequenceHashesFound; - if (!m_Options.PrimeCacheOnly) - { - ScanCacheFolder(CachedChunkHashesFound, CachedSequenceHashesFound); - } + ScanCacheFolder(CachedChunkHashesFound, CachedSequenceHashesFound); tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedBlocksFound; - if (!m_Options.PrimeCacheOnly) - { - ScanTempBlocksFolder(CachedBlocksFound); - } + ScanTempBlocksFolder(CachedBlocksFound); tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> SequenceIndexesLeftToFindToRemoteIndex; - if (!m_Options.PrimeCacheOnly && m_Options.EnableTargetFolderScavenging) + if (m_Options.EnableTargetFolderScavenging) { // Pick up all whole files we can use from current local state ZEN_TRACE_CPU("GetLocalSequences"); @@ -738,7 +769,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) std::vector<ScavengedSequenceCopyOperation> ScavengedSequenceCopyOperations; uint64_t ScavengedPathsCount = 0; - if (!m_Options.PrimeCacheOnly && m_Options.EnableOtherDownloadsScavenging) + if (m_Options.EnableOtherDownloadsScavenging) { ZEN_TRACE_CPU("GetScavengedSequences"); @@ -882,7 +913,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) tsl::robin_map<IoHash, size_t, IoHash::Hasher> RawHashToCopyChunkDataIndex; std::vector<CopyChunkData> CopyChunkDatas; - if (!m_Options.PrimeCacheOnly && m_Options.EnableTargetFolderScavenging) + if (m_Options.EnableTargetFolderScavenging) { ZEN_TRACE_CPU("GetLocalChunks"); @@ -902,7 +933,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) m_CacheMappingStats.LocalScanElapsedWallTimeUs += LocalTimer.GetElapsedTimeUs(); } - if (!m_Options.PrimeCacheOnly && m_Options.EnableOtherDownloadsScavenging) + if (m_Options.EnableOtherDownloadsScavenging) { ZEN_TRACE_CPU("GetScavengeChunks"); @@ -1025,30 +1056,23 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) ZEN_TRACE_CPU("BlockCacheFileExists"); for (const ChunkBlockAnalyser::NeededBlock& NeededBlock : NeededBlocks) { - if (m_Options.PrimeCacheOnly) + const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[NeededBlock.BlockIndex]; + bool UsingCachedBlock = false; + if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end()) { - FetchBlockIndexes.push_back(NeededBlock.BlockIndex); - } - else - { - const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[NeededBlock.BlockIndex]; - bool UsingCachedBlock = false; - if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end()) - { - TotalPartWriteCount++; + TotalPartWriteCount++; - std::filesystem::path BlockPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); - if (IsFile(BlockPath)) - { - CachedChunkBlockIndexes.push_back(NeededBlock.BlockIndex); - UsingCachedBlock = true; - } - } - if (!UsingCachedBlock) + std::filesystem::path BlockPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); + if (IsFile(BlockPath)) { - FetchBlockIndexes.push_back(NeededBlock.BlockIndex); + CachedChunkBlockIndexes.push_back(NeededBlock.BlockIndex); + UsingCachedBlock = true; } } + if (!UsingCachedBlock) + { + FetchBlockIndexes.push_back(NeededBlock.BlockIndex); + } } } @@ -1229,10 +1253,9 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) FilteredRate FilteredDownloadedBytesPerSecond; FilteredRate FilteredWrittenBytesPerSecond; - std::unique_ptr<OperationLogOutput::ProgressBar> WriteProgressBarPtr( - m_LogOutput.CreateProgressBar(m_Options.PrimeCacheOnly ? "Downloading" : "Writing")); - OperationLogOutput::ProgressBar& WriteProgressBar(*WriteProgressBarPtr); - ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + std::unique_ptr<OperationLogOutput::ProgressBar> WriteProgressBarPtr(m_LogOutput.CreateProgressBar("Writing")); + OperationLogOutput::ProgressBar& WriteProgressBar(*WriteProgressBarPtr); + ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); TotalPartWriteCount += CopyChunkDatas.size(); TotalPartWriteCount += ScavengedSequenceCopyOperations.size(); @@ -1245,37 +1268,34 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) { break; } - if (!m_Options.PrimeCacheOnly) - { - Work.ScheduleWork( - m_IOWorkerPool, - [this, - &ScavengedPaths, - &ScavengedSequenceCopyOperations, - &ScavengedContents, - &FilteredWrittenBytesPerSecond, - ScavengeOpIndex, - &WritePartsComplete, - TotalPartWriteCount](std::atomic<bool>&) mutable { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Async_WriteScavenged"); + Work.ScheduleWork( + m_IOWorkerPool, + [this, + &ScavengedPaths, + &ScavengedSequenceCopyOperations, + &ScavengedContents, + &FilteredWrittenBytesPerSecond, + ScavengeOpIndex, + &WritePartsComplete, + TotalPartWriteCount](std::atomic<bool>&) mutable { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("Async_WriteScavenged"); - FilteredWrittenBytesPerSecond.Start(); + FilteredWrittenBytesPerSecond.Start(); - const ScavengedSequenceCopyOperation& ScavengeOp = ScavengedSequenceCopyOperations[ScavengeOpIndex]; - const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengeOp.ScavengedContentIndex]; - const std::filesystem::path& ScavengeRootPath = ScavengedPaths[ScavengeOp.ScavengedContentIndex]; + const ScavengedSequenceCopyOperation& ScavengeOp = ScavengedSequenceCopyOperations[ScavengeOpIndex]; + const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengeOp.ScavengedContentIndex]; + const std::filesystem::path& ScavengeRootPath = ScavengedPaths[ScavengeOp.ScavengedContentIndex]; - WriteScavengedSequenceToCache(ScavengeRootPath, ScavengedContent, ScavengeOp); + WriteScavengedSequenceToCache(ScavengeRootPath, ScavengedContent, ScavengeOp); - if (WritePartsComplete.fetch_add(1) + 1 == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); - } + if (WritePartsComplete.fetch_add(1) + 1 == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); } - }); - } + } + }); } for (uint32_t LooseChunkHashWorkIndex = 0; LooseChunkHashWorkIndex < LooseChunkHashWorks.size(); LooseChunkHashWorkIndex++) @@ -1285,16 +1305,6 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) break; } - if (m_Options.PrimeCacheOnly) - { - const uint32_t RemoteChunkIndex = LooseChunkHashWorks[LooseChunkHashWorkIndex].RemoteChunkIndex; - if (ExistsResult.ExistingBlobs.contains(m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex])) - { - m_DownloadStats.RequestsCompleteCount++; - continue; - } - } - Work.ScheduleWork( m_IOWorkerPool, [this, @@ -1338,7 +1348,6 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) for (size_t CopyDataIndex = 0; CopyDataIndex < CopyChunkDatas.size(); CopyDataIndex++) { - ZEN_ASSERT(!m_Options.PrimeCacheOnly); if (m_AbortFlag) { break; @@ -1397,7 +1406,6 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) for (uint32_t BlockIndex : CachedChunkBlockIndexes) { - ZEN_ASSERT(!m_Options.PrimeCacheOnly); if (m_AbortFlag) { break; @@ -1464,7 +1472,6 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) for (size_t BlockRangeIndex = 0; BlockRangeIndex < PartialBlocks.BlockRanges.size();) { - ZEN_ASSERT(!m_Options.PrimeCacheOnly); if (m_AbortFlag) { break; @@ -1627,12 +1634,6 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) break; } - if (m_Options.PrimeCacheOnly && ExistsResult.ExistingBlobs.contains(m_BlockDescriptions[BlockIndex].BlockHash)) - { - m_DownloadStats.RequestsCompleteCount++; - continue; - } - Work.ScheduleWork( m_NetworkPool, [this, @@ -1684,14 +1685,6 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); } - if (m_Storage.CacheStorage && m_Options.PopulateCache) - { - m_Storage.CacheStorage->PutBuildBlob(m_BuildId, - BlockDescription.BlockHash, - ZenContentType::kCompressedBinary, - CompositeBuffer(SharedBuffer(BlockBuffer))); - } - uint64_t BlockSize = BlockBuffer.GetSize(); m_DownloadStats.DownloadedBlockCount++; m_DownloadStats.DownloadedBlockByteCount += BlockSize; @@ -1700,47 +1693,27 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) FilteredDownloadedBytesPerSecond.Stop(); } - if (!m_Options.PrimeCacheOnly) - { - std::filesystem::path BlockChunkPath; + const bool PutInCache = !ExistsInCache && m_Storage.CacheStorage && m_Options.PopulateCache; - // Check if the dowloaded block is file based and we can move it directly without rewriting it - { - IoBufferFileReference FileRef; - if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && - (FileRef.FileChunkSize == BlockSize)) - { - ZEN_TRACE_CPU("MoveTempFullBlock"); - std::error_code Ec; - std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); - if (!Ec) - { - BlockBuffer.SetDeleteOnClose(false); - BlockBuffer = {}; - BlockChunkPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); - RenameFile(TempBlobPath, BlockChunkPath, Ec); - if (Ec) - { - BlockChunkPath = std::filesystem::path{}; + std::filesystem::path BlockChunkPath = TryMoveDownloadedChunk( + BlockBuffer, + m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(), + /* ForceDiskBased */ PutInCache || (BlockSize > m_Options.MaximumInMemoryPayloadSize)); - // Re-open the temp file again - BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); - BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); - BlockBuffer.SetDeleteOnClose(true); - } - } - } - } - - if (BlockChunkPath.empty() && (BlockSize > m_Options.MaximumInMemoryPayloadSize)) + if (PutInCache) + { + ZEN_ASSERT(!BlockChunkPath.empty()); + IoBuffer CacheBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); + if (CacheBuffer) { - ZEN_TRACE_CPU("WriteTempFullBlock"); - // Could not be moved and rather large, lets store it on disk - BlockChunkPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); - TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); - BlockBuffer = {}; + m_Storage.CacheStorage->PutBuildBlob(m_BuildId, + BlockDescription.BlockHash, + ZenContentType::kCompressedBinary, + CompositeBuffer(SharedBuffer(CacheBuffer))); } + } + { if (!m_AbortFlag) { Work.ScheduleWork( @@ -1840,12 +1813,11 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) { CloneDetails = fmt::format(" ({} cloned)", NiceBytes(m_DiskStats.CloneByteCount.load())); } - std::string WriteDetails = m_Options.PrimeCacheOnly ? "" - : fmt::format(" {}/{} ({}B/s) written{}", - NiceBytes(m_WrittenChunkByteCount.load()), - NiceBytes(BytesToWrite), - NiceNum(FilteredWrittenBytesPerSecond.GetCurrent()), - CloneDetails); + std::string WriteDetails = fmt::format(" {}/{} ({}B/s) written{}", + NiceBytes(m_WrittenChunkByteCount.load()), + NiceBytes(BytesToWrite), + NiceNum(FilteredWrittenBytesPerSecond.GetCurrent()), + CloneDetails); std::string Details = fmt::format("{}/{} ({}{}) downloaded.{}", m_DownloadStats.RequestsCompleteCount.load(), @@ -1855,11 +1827,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) WriteDetails); std::string Task; - if (m_Options.PrimeCacheOnly) - { - Task = "Downloading "; - } - else if ((m_WrittenChunkByteCount < BytesToWrite) || (BytesToValidate == 0)) + if ((m_WrittenChunkByteCount < BytesToWrite) || (BytesToValidate == 0)) { Task = "Writing chunks "; } @@ -1868,15 +1836,13 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) Task = "Verifying chunks "; } - WriteProgressBar.UpdateState( - {.Task = Task, - .Details = Details, - .TotalCount = m_Options.PrimeCacheOnly ? TotalRequestCount : (BytesToWrite + BytesToValidate), - .RemainingCount = m_Options.PrimeCacheOnly ? (TotalRequestCount - m_DownloadStats.RequestsCompleteCount.load()) - : ((BytesToWrite + BytesToValidate) - - (m_WrittenChunkByteCount.load() + m_ValidatedChunkByteCount.load())), - .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, - false); + WriteProgressBar.UpdateState({.Task = Task, + .Details = Details, + .TotalCount = (BytesToWrite + BytesToValidate), + .RemainingCount = ((BytesToWrite + BytesToValidate) - + (m_WrittenChunkByteCount.load() + m_ValidatedChunkByteCount.load())), + .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, + false); }); } @@ -1891,34 +1857,31 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) return; } - if (!m_Options.PrimeCacheOnly) + uint32_t RawSequencesMissingWriteCount = 0; + for (uint32_t SequenceIndex = 0; SequenceIndex < SequenceIndexChunksLeftToWriteCounters.size(); SequenceIndex++) { - uint32_t RawSequencesMissingWriteCount = 0; - for (uint32_t SequenceIndex = 0; SequenceIndex < SequenceIndexChunksLeftToWriteCounters.size(); SequenceIndex++) + const auto& SequenceIndexChunksLeftToWriteCounter = SequenceIndexChunksLeftToWriteCounters[SequenceIndex]; + if (SequenceIndexChunksLeftToWriteCounter.load() != 0) { - const auto& SequenceIndexChunksLeftToWriteCounter = SequenceIndexChunksLeftToWriteCounters[SequenceIndex]; - if (SequenceIndexChunksLeftToWriteCounter.load() != 0) + RawSequencesMissingWriteCount++; + const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; + const std::filesystem::path& IncompletePath = m_RemoteContent.Paths[PathIndex]; + ZEN_ASSERT(!IncompletePath.empty()); + const uint32_t ExpectedSequenceCount = m_RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]; + if (!m_Options.IsQuiet) { - RawSequencesMissingWriteCount++; - const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; - const std::filesystem::path& IncompletePath = m_RemoteContent.Paths[PathIndex]; - ZEN_ASSERT(!IncompletePath.empty()); - const uint32_t ExpectedSequenceCount = m_RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]; - if (!m_Options.IsQuiet) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "{}: Max count {}, Current count {}", - IncompletePath, - ExpectedSequenceCount, - SequenceIndexChunksLeftToWriteCounter.load()); - } - ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounter.load() <= ExpectedSequenceCount); + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "{}: Max count {}, Current count {}", + IncompletePath, + ExpectedSequenceCount, + SequenceIndexChunksLeftToWriteCounter.load()); } + ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounter.load() <= ExpectedSequenceCount); } - ZEN_ASSERT(RawSequencesMissingWriteCount == 0); - ZEN_ASSERT(m_WrittenChunkByteCount == BytesToWrite); - ZEN_ASSERT(m_ValidatedChunkByteCount == BytesToValidate); } + ZEN_ASSERT(RawSequencesMissingWriteCount == 0); + ZEN_ASSERT(m_WrittenChunkByteCount == BytesToWrite); + ZEN_ASSERT(m_ValidatedChunkByteCount == BytesToValidate); const uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() + m_DownloadStats.DownloadedBlockByteCount.load() + @@ -1948,11 +1911,6 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) m_WriteChunkStats.WriteTimeUs = FilteredWrittenBytesPerSecond.GetElapsedTimeUS(); } - if (m_Options.PrimeCacheOnly) - { - return; - } - m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::PrepareTarget, (uint32_t)TaskSteps::StepCount); tsl::robin_map<uint32_t, uint32_t> RemotePathIndexToLocalPathIndex; @@ -3104,17 +3062,13 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd FilteredRate& FilteredDownloadedBytesPerSecond, FilteredRate& FilteredWrittenBytesPerSecond) { - std::filesystem::path ExistingCompressedChunkPath; - if (!m_Options.PrimeCacheOnly) + const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + std::filesystem::path ExistingCompressedChunkPath = FindDownloadedChunk(ChunkHash); + if (!ExistingCompressedChunkPath.empty()) { - const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; - ExistingCompressedChunkPath = FindDownloadedChunk(ChunkHash); - if (!ExistingCompressedChunkPath.empty()) + if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount) { - if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } + FilteredDownloadedBytesPerSecond.Stop(); } } if (!m_AbortFlag) @@ -3220,10 +3174,9 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd RemoteChunkIndex, &FilteredWrittenBytesPerSecond, ChunkTargetPtrs = std::move(ChunkTargetPtrs)](IoBuffer&& Payload) mutable { - IoBufferFileReference FileRef; - bool EnableBacklog = Payload.GetFileReference(FileRef); AsyncWriteDownloadedChunk(m_Options.ZenFolderPath, RemoteChunkIndex, + ExistsResult, std::move(ChunkTargetPtrs), WriteCache, Work, @@ -3231,8 +3184,7 @@ BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkInd SequenceIndexChunksLeftToWriteCounters, WritePartsComplete, TotalPartWriteCount, - FilteredWrittenBytesPerSecond, - EnableBacklog); + FilteredWrittenBytesPerSecond); }); } }); @@ -3293,14 +3245,6 @@ BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkInde FilteredDownloadedBytesPerSecond.Stop(); } - if (Payload && m_Storage.CacheStorage && m_Options.PopulateCache) - { - m_Storage.CacheStorage->PutBuildBlob(m_BuildId, - ChunkHash, - ZenContentType::kCompressedBinary, - CompositeBuffer(SharedBuffer(Payload))); - } - OnDownloaded(std::move(Payload)); }); } @@ -3320,31 +3264,22 @@ BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkInde } if (!m_AbortFlag) { - if (BuildBlob && m_Storage.CacheStorage && m_Options.PopulateCache) - { - m_Storage.CacheStorage->PutBuildBlob(m_BuildId, - ChunkHash, - ZenContentType::kCompressedBinary, - CompositeBuffer(SharedBuffer(BuildBlob))); - } if (!BuildBlob) { throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash)); } - if (!m_Options.PrimeCacheOnly) + + if (!m_AbortFlag) { - if (!m_AbortFlag) + uint64_t BlobSize = BuildBlob.GetSize(); + m_DownloadStats.DownloadedChunkCount++; + m_DownloadStats.DownloadedChunkByteCount += BlobSize; + if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount) { - uint64_t BlobSize = BuildBlob.GetSize(); - m_DownloadStats.DownloadedChunkCount++; - m_DownloadStats.DownloadedChunkByteCount += BlobSize; - if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - - OnDownloaded(std::move(BuildBlob)); + FilteredDownloadedBytesPerSecond.Stop(); } + + OnDownloaded(std::move(BuildBlob)); } } } @@ -3388,61 +3323,17 @@ BuildsOperationUpdateFolder::DownloadPartialBlock( FilteredDownloadedBytesPerSecond.Stop(); } - std::filesystem::path BlockChunkPath; - - // Check if the dowloaded block is file based and we can move it directly without rewriting it + IoHashStream RangeId; + for (const std::pair<uint64_t, uint64_t>& Range : BlockOffsetAndLengths) { - IoBufferFileReference FileRef; - if (BlockRangeBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && - (FileRef.FileChunkSize == BlockRangeBufferSize)) - { - ZEN_TRACE_CPU("MoveTempPartialBlock"); - - std::error_code Ec; - std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); - if (!Ec) - { - BlockRangeBuffer.SetDeleteOnClose(false); - BlockRangeBuffer = {}; - - IoHashStream RangeId; - for (const std::pair<uint64_t, uint64_t>& Range : BlockOffsetAndLengths) - { - RangeId.Append(&Range.first, sizeof(uint64_t)); - RangeId.Append(&Range.second, sizeof(uint64_t)); - } - - BlockChunkPath = m_TempBlockFolderPath / fmt::format("{}_{}", BlockDescription.BlockHash, RangeId.GetHash()); - RenameFile(TempBlobPath, BlockChunkPath, Ec); - if (Ec) - { - BlockChunkPath = std::filesystem::path{}; - - // Re-open the temp file again - BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); - BlockRangeBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockRangeBufferSize, true); - BlockRangeBuffer.SetDeleteOnClose(true); - } - } - } + RangeId.Append(&Range.first, sizeof(uint64_t)); + RangeId.Append(&Range.second, sizeof(uint64_t)); } + std::filesystem::path BlockChunkPath = + TryMoveDownloadedChunk(BlockRangeBuffer, + m_TempBlockFolderPath / fmt::format("{}_{}", BlockDescription.BlockHash, RangeId.GetHash()), + /* ForceDiskBased */ BlockRangeBufferSize > m_Options.MaximumInMemoryPayloadSize); - if (BlockChunkPath.empty() && (BlockRangeBufferSize > m_Options.MaximumInMemoryPayloadSize)) - { - ZEN_TRACE_CPU("WriteTempPartialBlock"); - - IoHashStream RangeId; - for (const std::pair<uint64_t, uint64_t>& Range : BlockOffsetAndLengths) - { - RangeId.Append(&Range.first, sizeof(uint64_t)); - RangeId.Append(&Range.second, sizeof(uint64_t)); - } - - // Could not be moved and rather large, lets store it on disk - BlockChunkPath = m_TempBlockFolderPath / fmt::format("{}_{}", BlockDescription.BlockHash, RangeId.GetHash()); - TemporaryFile::SafeWriteFile(BlockChunkPath, BlockRangeBuffer); - BlockRangeBuffer = {}; - } if (!m_AbortFlag) { OnDownloaded(std::move(BlockRangeBuffer), std::move(BlockChunkPath), BlockRangeStartIndex, BlockOffsetAndLengths); @@ -3570,16 +3461,36 @@ BuildsOperationUpdateFolder::DownloadPartialBlock( // Jupiter will ignore the ranges and send the whole payload if it fetches the payload from S3 // Upload to cache (if enabled) and use the whole payload for the remaining ranges - if (m_Storage.CacheStorage && m_Options.PopulateCache) + const uint64_t Size = RangeBuffers.PayloadBuffer.GetSize(); + + const bool PopulateCache = !ExistsInCache && m_Storage.CacheStorage && m_Options.PopulateCache; + + std::filesystem::path BlockPath = + TryMoveDownloadedChunk(RangeBuffers.PayloadBuffer, + m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(), + /* ForceDiskBased */ PopulateCache || Size > m_Options.MaximumInMemoryPayloadSize); + if (!BlockPath.empty()) + { + RangeBuffers.PayloadBuffer = IoBufferBuilder::MakeFromFile(BlockPath); + if (!RangeBuffers.PayloadBuffer) + { + throw std::runtime_error( + fmt::format("Failed to read block {} from temporary path '{}'", BlockDescription.BlockHash, BlockPath)); + } + RangeBuffers.PayloadBuffer.SetDeleteOnClose(true); + } + + if (PopulateCache) { m_Storage.CacheStorage->PutBuildBlob(m_BuildId, BlockDescription.BlockHash, ZenContentType::kCompressedBinary, - CompositeBuffer(std::vector<IoBuffer>{RangeBuffers.PayloadBuffer})); - if (m_AbortFlag) - { - break; - } + CompositeBuffer(SharedBuffer(RangeBuffers.PayloadBuffer))); + } + + if (m_AbortFlag) + { + break; } SubRangeCount = Ranges.size() - SubRangeCountComplete; @@ -4331,6 +4242,7 @@ BuildsOperationUpdateFolder::WritePartialBlockChunksToCache(const ChunkBlockDesc void BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::path& ZenFolderPath, uint32_t RemoteChunkIndex, + const BlobsExistsResult& ExistsResult, std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>&& ChunkTargetPtrs, BufferedWriteFileCache& WriteCache, ParallelWork& Work, @@ -4338,8 +4250,7 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, std::atomic<uint64_t>& WritePartsComplete, const uint64_t TotalPartWriteCount, - FilteredRate& FilteredWrittenBytesPerSecond, - bool EnableBacklog) + FilteredRate& FilteredWrittenBytesPerSecond) { ZEN_TRACE_CPU("AsyncWriteDownloadedChunk"); @@ -4347,43 +4258,28 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa const uint64_t Size = Payload.GetSize(); - std::filesystem::path CompressedChunkPath; + const bool ExistsInCache = m_Storage.CacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash); + + const bool PopulateCache = !ExistsInCache && m_Storage.CacheStorage && m_Options.PopulateCache; - // Check if the dowloaded chunk is file based and we can move it directly without rewriting it + std::filesystem::path CompressedChunkPath = + TryMoveDownloadedChunk(Payload, + m_TempDownloadFolderPath / ChunkHash.ToHexString(), + /* ForceDiskBased */ PopulateCache || Size > m_Options.MaximumInMemoryPayloadSize); + if (PopulateCache) { - IoBufferFileReference FileRef; - if (Payload.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == Size)) + IoBuffer CacheBlob = IoBufferBuilder::MakeFromFile(CompressedChunkPath); + if (CacheBlob) { - ZEN_TRACE_CPU("MoveTempChunk"); - std::error_code Ec; - std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); - if (!Ec) - { - Payload.SetDeleteOnClose(false); - Payload = {}; - CompressedChunkPath = m_TempDownloadFolderPath / ChunkHash.ToHexString(); - RenameFile(TempBlobPath, CompressedChunkPath, Ec); - if (Ec) - { - CompressedChunkPath = std::filesystem::path{}; - - // Re-open the temp file again - BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); - Payload = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, Size, true); - Payload.SetDeleteOnClose(true); - } - } + m_Storage.CacheStorage->PutBuildBlob(m_BuildId, + ChunkHash, + ZenContentType::kCompressedBinary, + CompositeBuffer(SharedBuffer(CacheBlob))); } } - if (CompressedChunkPath.empty() && (Size > m_Options.MaximumInMemoryPayloadSize)) - { - ZEN_TRACE_CPU("WriteTempChunk"); - // Could not be moved and rather large, lets store it on disk - CompressedChunkPath = m_TempDownloadFolderPath / ChunkHash.ToHexString(); - TemporaryFile::SafeWriteFile(CompressedChunkPath, Payload); - Payload = {}; - } + IoBufferFileReference FileRef; + bool EnableBacklog = !CompressedChunkPath.empty() || Payload.GetFileReference(FileRef); Work.ScheduleWork( m_IOWorkerPool, |