diff options
| author | Dan Engelbrecht <[email protected]> | 2025-03-10 18:33:24 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-03-10 18:33:24 +0100 |
| commit | 7de3d4218ee5969af6147f9ab20bda538a136d9a (patch) | |
| tree | 77e10f8e3a165275bbb0bfa516eb65a854cd75ec /src | |
| parent | partial block fetch (#298) (diff) | |
| download | zen-7de3d4218ee5969af6147f9ab20bda538a136d9a.tar.xz zen-7de3d4218ee5969af6147f9ab20bda538a136d9a.zip | |
pick up existing cache (#299)
- Improvement: Scavenge .zen temp folders for existing data (downloaded, decompressed or written) from a previous failed run
- Improvement: Faster abort during stream compression
- Improvement: Try to move downloaded blobs with rename if possible avoiding an extra disk write
- Improvement: Only clean temp folders on a successful or cancelled build - keep them if the download fails
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 2376 | ||||
| -rw-r--r-- | src/zen/cmds/builds_cmd.h | 8 | ||||
| -rw-r--r-- | src/zencore/basicfile.cpp | 37 | ||||
| -rw-r--r-- | src/zencore/compress.cpp | 51 | ||||
| -rw-r--r-- | src/zencore/include/zencore/basicfile.h | 2 | ||||
| -rw-r--r-- | src/zencore/include/zencore/compress.h | 2 | ||||
| -rw-r--r-- | src/zencore/workthreadpool.cpp | 2 | ||||
| -rw-r--r-- | src/zenutil/chunkblock.cpp | 51 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/chunkblock.h | 9 |
9 files changed, 1651 insertions, 887 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index 1c9476b96..2c7a73fb3 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -83,15 +83,24 @@ namespace { const double DefaultLatency = 0; // .0010; const double DefaultDelayPerKBSec = 0; // 0.00005; - const std::string ZenFolderName = ".zen"; - const std::string ZenStateFilePath = fmt::format("{}/current_state.cbo", ZenFolderName); - const std::string ZenStateFileJsonPath = fmt::format("{}/current_state.json", ZenFolderName); - const std::string ZenTempFolderName = fmt::format("{}/tmp", ZenFolderName); - const std::string ZenTempCacheFolderName = fmt::format("{}/cache", ZenTempFolderName); - const std::string ZenTempStorageFolderName = fmt::format("{}/storage", ZenTempFolderName); - const std::string ZenTempBlockFolderName = fmt::format("{}/blocks", ZenTempFolderName); - const std::string ZenTempChunkFolderName = fmt::format("{}/chunks", ZenTempFolderName); - const std::string ZenExcludeManifestName = ".zen_exclude_manifest.txt"; + const std::string ZenFolderName = ".zen"; + const std::string ZenStateFilePath = fmt::format("{}/current_state.cbo", ZenFolderName); + const std::string ZenStateFileJsonPath = fmt::format("{}/current_state.json", ZenFolderName); + const std::string ZenTempFolderName = fmt::format("{}/tmp", ZenFolderName); + + const std::string ZenTempCacheFolderName = + fmt::format("{}/cache", ZenTempFolderName); // Decompressed and verified data - chunks & sequences + const std::string ZenTempBlockFolderName = fmt::format("{}/blocks", ZenTempFolderName); // Temp storage for whole and partial blocks + const std::string ZenTempChunkFolderName = + fmt::format("{}/chunks", ZenTempFolderName); // Temp storage for decompressed and validated chunks + + const std::string ZenTempDownloadFolderName = + fmt::format("{}/download", ZenTempFolderName); // Temp storage for unverfied downloaded blobs + + const std::string ZenTempStorageFolderName = + 
fmt::format("{}/storage", ZenTempFolderName); // Temp storage folder for BuildStorage implementations + + const std::string ZenExcludeManifestName = ".zen_exclude_manifest.txt"; const std::string UnsyncFolderName = ".unsync"; @@ -231,21 +240,31 @@ namespace { return AuthToken; } - CompositeBuffer WriteToTempFileIfNeeded(const CompositeBuffer& Buffer, - const std::filesystem::path& TempFolderPath, - const IoHash& Hash, - const std::string& Suffix = {}) + bool IsBufferDiskBased(const IoBuffer& Buffer) { - // If this is a file based buffer or a compressed buffer with a memory-based header, we don't need to rewrite to disk to save memory - std::span<const SharedBuffer> Segments = Buffer.GetSegments(); - ZEN_ASSERT(Buffer.GetSegments().size() > 0); IoBufferFileReference FileRef; - if (Segments.back().GetFileReference(FileRef)) + if (Buffer.GetFileReference(FileRef)) { - return Buffer; + return true; } + return false; + } + + bool IsBufferDiskBased(const CompositeBuffer& Buffer) + { + // If this is a file based buffer or a compressed buffer with a memory-based header, we don't need to rewrite to disk to save memory + std::span<const SharedBuffer> Segments = Buffer.GetSegments(); + ZEN_ASSERT(Buffer.GetSegments().size() > 0); + return IsBufferDiskBased(Segments.back().AsIoBuffer()); + } + + IoBuffer WriteToTempFile(CompositeBuffer&& Buffer, + const std::filesystem::path& TempFolderPath, + const IoHash& Hash, + const std::string& Suffix = {}) + { std::filesystem::path TempFilePath = (TempFolderPath / (Hash.ToHexString() + Suffix)).make_preferred(); - return CompositeBuffer(WriteToTempFile(Buffer, TempFilePath)); + return WriteToTempFile(std::move(Buffer), TempFilePath); } class FilteredRate @@ -1109,12 +1128,22 @@ namespace { IoHashStream Hash; bool CouldDecompress = Compressed.DecompressToStream(0, RawSize, [&Hash](uint64_t, const CompositeBuffer& RangeBuffer) { - for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) + if (!AbortFlag) { - 
Hash.Append(Segment.GetView()); + for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) + { + Hash.Append(Segment.GetView()); + } + return true; } + return false; }); + if (AbortFlag) + { + return CompositeBuffer{}; + } + if (!CouldDecompress) { throw std::runtime_error( @@ -1299,14 +1328,14 @@ namespace { { Work.ScheduleWork( VerifyPool, - [&, Payload = std::move(Payload), ChunkAttachment](std::atomic<bool>&) { + [&, Payload = std::move(Payload), ChunkAttachment](std::atomic<bool>&) mutable { if (!AbortFlag) { FilteredVerifiedBytesPerSecond.Start(); uint64_t CompressedSize; uint64_t DecompressedSize; - ValidateBlob(IoBuffer(Payload), ChunkAttachment, CompressedSize, DecompressedSize); + ValidateBlob(std::move(Payload), ChunkAttachment, CompressedSize, DecompressedSize); ZEN_CONSOLE_VERBOSE("Chunk attachment {} ({} -> {}) is valid", ChunkAttachment, NiceBytes(CompressedSize), @@ -1349,14 +1378,14 @@ namespace { { Work.ScheduleWork( VerifyPool, - [&, Payload = std::move(Payload), BlockAttachment](std::atomic<bool>&) { + [&, Payload = std::move(Payload), BlockAttachment](std::atomic<bool>&) mutable { if (!AbortFlag) { FilteredVerifiedBytesPerSecond.Start(); uint64_t CompressedSize; uint64_t DecompressedSize; - ValidateChunkBlock(IoBuffer(Payload), BlockAttachment, CompressedSize, DecompressedSize); + ValidateChunkBlock(std::move(Payload), BlockAttachment, CompressedSize, DecompressedSize); ZEN_CONSOLE_VERBOSE("Chunk block {} ({} -> {}) is valid", BlockAttachment, NiceBytes(CompressedSize), @@ -1564,8 +1593,12 @@ namespace { { throw std::runtime_error(fmt::format("Failed to compress large blob {}", ChunkHash)); } - CompositeBuffer TempPayload = WriteToTempFileIfNeeded(CompressedBlob.GetCompressed(), TempFolderPath, ChunkHash); - return CompressedBuffer::FromCompressedNoValidate(std::move(TempPayload)).GetCompressed(); + if (!IsBufferDiskBased(CompressedBlob.GetCompressed())) + { + IoBuffer TempPayload = 
WriteToTempFile(std::move(CompressedBlob).GetCompressed(), TempFolderPath, ChunkHash); + CompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(TempPayload)); + } + return std::move(CompressedBlob).GetCompressed(); } struct GeneratedBlocks @@ -1637,9 +1670,13 @@ namespace { OutBlocks.BlockSizes[BlockIndex] = CompressedBlock.GetCompressedSize(); - CompositeBuffer Payload = WriteToTempFileIfNeeded(CompressedBlock.GetCompressed(), - Path / ZenTempBlockFolderName, - OutBlocks.BlockDescriptions[BlockIndex].BlockHash); + if (!IsBufferDiskBased(CompressedBlock.GetCompressed())) + { + IoBuffer TempPayload = WriteToTempFile(std::move(CompressedBlock).GetCompressed(), + Path / ZenTempBlockFolderName, + OutBlocks.BlockDescriptions[BlockIndex].BlockHash); + CompressedBlock = CompressedBuffer::FromCompressedNoValidate(std::move(TempPayload)); + } { CbObjectWriter Writer; Writer.AddString("createdBy", "zen"); @@ -1663,7 +1700,7 @@ namespace { PendingUploadCount++; Work.ScheduleWork( UploadBlocksPool, - [&, BlockIndex, Payload = std::move(Payload)](std::atomic<bool>&) { + [&, BlockIndex, Payload = std::move(CompressedBlock).GetCompressed()](std::atomic<bool>&) mutable { if (!AbortFlag) { if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount) @@ -1682,12 +1719,16 @@ namespace { BuildChunkBlockDescription(OutBlocks.BlockDescriptions[BlockIndex], OutBlocks.BlockMetaDatas[BlockIndex]); - const IoHash& BlockHash = OutBlocks.BlockDescriptions[BlockIndex].BlockHash; - Storage.PutBuildBlob(BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload); - UploadStats.BlocksBytes += Payload.GetSize(); + const IoHash& BlockHash = OutBlocks.BlockDescriptions[BlockIndex].BlockHash; + const uint64_t CompressedBlockSize = Payload.GetSize(); + Storage.PutBuildBlob(BuildId, + BlockHash, + ZenContentType::kCompressedBinary, + std::move(Payload)); + UploadStats.BlocksBytes += CompressedBlockSize; ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks", 
OutBlocks.BlockDescriptions[BlockIndex].BlockHash, - NiceBytes(Payload.GetSize()), + NiceBytes(CompressedBlockSize), OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); Storage.PutBlockMetadata(BuildId, @@ -1817,7 +1858,7 @@ namespace { auto AsyncUploadBlock = [&](const size_t BlockIndex, const IoHash BlockHash, CompositeBuffer&& Payload) { Work.ScheduleWork( UploadChunkPool, - [&, BlockIndex, BlockHash, Payload = std::move(Payload)](std::atomic<bool>&) { + [&, BlockIndex, BlockHash, Payload = std::move(Payload)](std::atomic<bool>&) mutable { if (!AbortFlag) { FilteredUploadedBytesPerSecond.Start(); @@ -1855,7 +1896,7 @@ namespace { auto AsyncUploadLooseChunk = [&](const IoHash& RawHash, uint64_t RawSize, CompositeBuffer&& Payload) { Work.ScheduleWork( UploadChunkPool, - [&, RawHash, RawSize, Payload = CompositeBuffer(std::move(Payload))](std::atomic<bool>&) { + [&, RawHash, RawSize, Payload = CompositeBuffer(std::move(Payload))](std::atomic<bool>&) mutable { if (!AbortFlag) { const uint64_t PayloadSize = Payload.GetSize(); @@ -1868,7 +1909,7 @@ namespace { ZenContentType::kCompressedBinary, PayloadSize, [Payload = std::move(Payload), &FilteredUploadedBytesPerSecond](uint64_t Offset, - uint64_t Size) -> IoBuffer { + uint64_t Size) mutable -> IoBuffer { FilteredUploadedBytesPerSecond.Start(); IoBuffer PartPayload = Payload.Mid(Offset, Size).Flatten().AsIoBuffer(); @@ -1974,9 +2015,11 @@ namespace { } ZEN_ASSERT(BlockDescription.BlockHash == BlockHash); - CompositeBuffer Payload = WriteToTempFileIfNeeded(CompressedBlock.GetCompressed(), - Path / ZenTempBlockFolderName, - BlockDescription.BlockHash); + CompositeBuffer Payload = IsBufferDiskBased(CompressedBlock.GetCompressed()) + ? 
std::move(CompressedBlock).GetCompressed() + : CompositeBuffer(WriteToTempFile(std::move(CompressedBlock).GetCompressed(), + Path / ZenTempBlockFolderName, + BlockDescription.BlockHash)); GenerateBlocksStats.GeneratedBlockByteCount += NewBlocks.BlockSizes[BlockIndex]; GenerateBlocksStats.GeneratedBlockCount++; @@ -3289,33 +3332,40 @@ namespace { uint64_t FileOffset, uint64_t TargetFinalSize) { + ZEN_TRACE_CPU("WriteFileCache_WriteToFile"); if (!SeenTargetIndexes.empty() && SeenTargetIndexes.back() == TargetIndex) { + ZEN_TRACE_CPU("WriteFileCache_WriteToFile_CacheWrite"); ZEN_ASSERT(OpenFileWriter); OpenFileWriter->Write(Buffer, FileOffset); } else { - Flush(); - const std::filesystem::path& TargetPath = GetTargetPath(TargetIndex); - CreateDirectories(TargetPath.parent_path()); - uint32_t Tries = 5; - std::unique_ptr<BasicFile> NewOutputFile( - std::make_unique<BasicFile>(TargetPath, BasicFile::Mode::kWrite, [&Tries, TargetPath](std::error_code& Ec) { - if (Tries < 3) - { - ZEN_CONSOLE("Failed opening file '{}': {}{}", TargetPath, Ec.message(), Tries > 1 ? " Retrying"sv : ""sv); - } - if (Tries > 1) - { - Sleep(100); - } - return --Tries > 0; - })); + std::unique_ptr<BasicFile> NewOutputFile; + { + ZEN_TRACE_CPU("WriteFileCache_WriteToFile_Open"); + Flush(); + const std::filesystem::path& TargetPath = GetTargetPath(TargetIndex); + CreateDirectories(TargetPath.parent_path()); + uint32_t Tries = 5; + NewOutputFile = + std::make_unique<BasicFile>(TargetPath, BasicFile::Mode::kWrite, [&Tries, TargetPath](std::error_code& Ec) { + if (Tries < 3) + { + ZEN_CONSOLE("Failed opening file '{}': {}{}", TargetPath, Ec.message(), Tries > 1 ? 
" Retrying"sv : ""sv); + } + if (Tries > 1) + { + Sleep(100); + } + return --Tries > 0; + }); + } const bool CacheWriter = TargetFinalSize > Buffer.GetSize(); if (CacheWriter) { + ZEN_TRACE_CPU("WriteFileCache_WriteToFile_CacheWrite"); ZEN_ASSERT_SLOW(std::find(SeenTargetIndexes.begin(), SeenTargetIndexes.end(), TargetIndex) == SeenTargetIndexes.end()); OutputFile = std::move(NewOutputFile); @@ -3325,6 +3375,7 @@ namespace { } else { + ZEN_TRACE_CPU("WriteFileCache_WriteToFile_Write"); NewOutputFile->Write(Buffer, FileOffset); } } @@ -3332,6 +3383,7 @@ namespace { void Flush() { + ZEN_TRACE_CPU("WriteFileCache_Flush"); OpenFileWriter = {}; OutputFile = {}; } @@ -3376,7 +3428,7 @@ namespace { const ChunkedFolderContent& RemoteContent, const ChunkedContentLookup& Lookup, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, - const BlockWriteOps Ops, + const BlockWriteOps& Ops, std::atomic<uint32_t>& OutChunksComplete, std::atomic<uint64_t>& OutBytesWritten) { @@ -3420,13 +3472,19 @@ namespace { if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1) { const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; - const IoHash VerifyChunkHash = - IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash))); - if (VerifyChunkHash != SequenceRawHash) { - throw std::runtime_error( - fmt::format("Written hunk sequence {} hash does not match expected hash {}", VerifyChunkHash, SequenceRawHash)); + ZEN_TRACE_CPU("VerifyChunkHash"); + const IoHash VerifyChunkHash = IoHash::HashBuffer( + IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash))); + if (VerifyChunkHash != SequenceRawHash) + { + throw std::runtime_error(fmt::format("Written chunk sequence {} hash does not match expected hash {}", + VerifyChunkHash, + SequenceRawHash)); + } } + ZEN_TRACE_CPU("VerifyChunkHashes_rename"); + 
ZEN_ASSERT_SLOW(!std::filesystem::exists(GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash))); std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash), GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)); } @@ -3437,139 +3495,38 @@ namespace { bool GetBlockWriteOps(const ChunkedFolderContent& RemoteContent, const ChunkedContentLookup& Lookup, + std::span<const IoHash> ChunkRawHashes, + std::span<const uint32_t> ChunkCompressedLengths, + std::span<const uint32_t> ChunkRawLengths, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, - const CompositeBuffer& DecompressedBlockBuffer, + CompositeBuffer&& PartialBlockBuffer, + uint32_t FirstIncludedBlockChunkIndex, + uint32_t LastIncludedBlockChunkIndex, BlockWriteOps& OutOps) { ZEN_TRACE_CPU("GetBlockWriteOps"); - SharedBuffer BlockBuffer = DecompressedBlockBuffer.Flatten(); - uint64_t HeaderSize = 0; - if (IterateChunkBlock( - BlockBuffer, - [&](CompressedBuffer&& Chunk, const IoHash& ChunkHash) { - if (auto It = Lookup.ChunkHashToChunkIndex.find(ChunkHash); It != Lookup.ChunkHashToChunkIndex.end()) - { - const uint32_t ChunkIndex = It->second; - std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = - GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, Lookup, ChunkIndex); - - if (!ChunkTargetPtrs.empty()) - { - bool NeedsWrite = true; - if (RemoteChunkIndexNeedsCopyFromSourceFlags[ChunkIndex].compare_exchange_strong(NeedsWrite, false)) - { - CompositeBuffer Decompressed = Chunk.DecompressToComposite(); - if (!Decompressed) - { - throw std::runtime_error(fmt::format("Decompression of build blob {} failed", ChunkHash)); - } - ZEN_ASSERT_SLOW(ChunkHash == IoHash::HashBuffer(Decompressed)); - ZEN_ASSERT(Decompressed.GetSize() == RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]); - for (const 
ChunkedContentLookup::ChunkSequenceLocation* Target : ChunkTargetPtrs) - { - OutOps.WriteOps.push_back( - BlockWriteOps::WriteOpData{.Target = Target, .ChunkBufferIndex = OutOps.ChunkBuffers.size()}); - } - OutOps.ChunkBuffers.emplace_back(std::move(Decompressed)); - } - } - } - }, - HeaderSize)) - { - std::sort(OutOps.WriteOps.begin(), - OutOps.WriteOps.end(), - [](const BlockWriteOps::WriteOpData& Lhs, const BlockWriteOps::WriteOpData& Rhs) { - if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex) - { - return true; - } - if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex) - { - return false; - } - return Lhs.Target->Offset < Rhs.Target->Offset; - }); - - return true; + MemoryView BlockMemoryView; + UniqueBuffer BlockMemoryBuffer; + IoBufferFileReference FileRef = {}; + if (PartialBlockBuffer.GetSegments().size() == 1 && PartialBlockBuffer.GetSegments()[0].AsIoBuffer().GetFileReference(FileRef)) + { + BlockMemoryBuffer = UniqueBuffer::Alloc(FileRef.FileChunkSize); + BasicFile Reader; + Reader.Attach(FileRef.FileHandle); + Reader.Read(BlockMemoryBuffer.GetData(), FileRef.FileChunkSize, FileRef.FileChunkOffset); + BlockMemoryView = BlockMemoryBuffer.GetView(); + Reader.Detach(); } else { - return false; - } - } - - bool WriteBlockToDisk(const std::filesystem::path& CacheFolderPath, - const ChunkedFolderContent& RemoteContent, - const ChunkBlockDescription& BlockDescription, - std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, - const CompositeBuffer& BlockBuffer, - const ChunkedContentLookup& Lookup, - std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, - std::atomic<uint32_t>& OutChunksComplete, - std::atomic<uint64_t>& OutBytesWritten) - { - ZEN_TRACE_CPU("WriteBlockToDisk"); - - IoHash BlockRawHash; - uint64_t BlockRawSize; - CompressedBuffer CompressedBlockBuffer = CompressedBuffer::FromCompressed(BlockBuffer, BlockRawHash, BlockRawSize); - if (!CompressedBlockBuffer) - { - throw 
std::runtime_error(fmt::format("Block {} is not a compressed buffer", BlockDescription.BlockHash)); - } - - if (BlockRawHash != BlockDescription.BlockHash) - { - throw std::runtime_error( - fmt::format("Block {} header has a mismatching raw hash {}", BlockDescription.BlockHash, BlockRawHash)); - } - - CompositeBuffer DecompressedBlockBuffer = CompressedBlockBuffer.DecompressToComposite(); - if (!DecompressedBlockBuffer) - { - throw std::runtime_error(fmt::format("Block {} failed to decompress", BlockDescription.BlockHash)); - } - - ZEN_ASSERT_SLOW(BlockDescription.BlockHash == IoHash::HashBuffer(DecompressedBlockBuffer)); - - BlockWriteOps Ops; - if (GetBlockWriteOps(RemoteContent, - Lookup, - SequenceIndexChunksLeftToWriteCounters, - RemoteChunkIndexNeedsCopyFromSourceFlags, - DecompressedBlockBuffer, - Ops)) - { - WriteBlockChunkOps(CacheFolderPath, - RemoteContent, - Lookup, - SequenceIndexChunksLeftToWriteCounters, - Ops, - OutChunksComplete, - OutBytesWritten); - return true; + BlockMemoryView = PartialBlockBuffer.ViewOrCopyRange(0, PartialBlockBuffer.GetSize(), BlockMemoryBuffer); } - return false; - } - - bool GetPartialBlockWriteOps(const ChunkedFolderContent& RemoteContent, - const ChunkedContentLookup& Lookup, - const ChunkBlockDescription& BlockDescription, - std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, - std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, - const CompositeBuffer& PartialBlockBuffer, - uint32_t FirstIncludedBlockChunkIndex, - uint32_t LastIncludedBlockChunkIndex, - BlockWriteOps& OutOps) - { - ZEN_TRACE_CPU("GetPartialBlockWriteOps"); uint32_t OffsetInBlock = 0; for (uint32_t ChunkBlockIndex = FirstIncludedBlockChunkIndex; ChunkBlockIndex <= LastIncludedBlockChunkIndex; ChunkBlockIndex++) { - const uint32_t ChunkCompressedSize = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex]; - const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex]; + const uint32_t 
ChunkCompressedSize = ChunkCompressedLengths[ChunkBlockIndex]; + const IoHash& ChunkHash = ChunkRawHashes[ChunkBlockIndex]; if (auto It = Lookup.ChunkHashToChunkIndex.find(ChunkHash); It != Lookup.ChunkHashToChunkIndex.end()) { const uint32_t ChunkIndex = It->second; @@ -3581,7 +3538,9 @@ namespace { bool NeedsWrite = true; if (RemoteChunkIndexNeedsCopyFromSourceFlags[ChunkIndex].compare_exchange_strong(NeedsWrite, false)) { - CompositeBuffer Chunk = PartialBlockBuffer.Mid(OffsetInBlock, ChunkCompressedSize); + // CompositeBuffer Chunk = PartialBlockBuffer.Mid(OffsetInBlock, ChunkCompressedSize); + MemoryView ChunkMemory = BlockMemoryView.Mid(OffsetInBlock, ChunkCompressedSize); + CompositeBuffer Chunk = CompositeBuffer(IoBuffer(IoBuffer::Wrap, ChunkMemory.GetData(), ChunkMemory.GetSize())); IoHash VerifyChunkHash; uint64_t VerifyRawSize; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Chunk, VerifyChunkHash, VerifyRawSize); @@ -3593,9 +3552,12 @@ namespace { { ZEN_ASSERT(false); } - if (VerifyRawSize != BlockDescription.ChunkRawLengths[ChunkBlockIndex]) + if (!ChunkRawLengths.empty()) { - ZEN_ASSERT(false); + if (VerifyRawSize != ChunkRawLengths[ChunkBlockIndex]) + { + ZEN_ASSERT(false); + } } CompositeBuffer Decompressed = Compressed.DecompressToComposite(); if (!Decompressed) @@ -3632,11 +3594,85 @@ namespace { return true; } + bool WriteBlockToDisk(const std::filesystem::path& CacheFolderPath, + const ChunkedFolderContent& RemoteContent, + const ChunkBlockDescription& BlockDescription, + std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, + CompositeBuffer&& BlockBuffer, + const ChunkedContentLookup& Lookup, + std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, + std::atomic<uint32_t>& OutChunksComplete, + std::atomic<uint64_t>& OutBytesWritten) + { + ZEN_TRACE_CPU("WriteBlockToDisk"); + + BlockWriteOps Ops; + if ((BlockDescription.HeaderSize == 0) || BlockDescription.ChunkCompressedLengths.empty()) + { + 
ZEN_TRACE_CPU("WriteBlockToDisk_Legacy"); + + UniqueBuffer CopyBuffer; + const MemoryView BlockView = BlockBuffer.ViewOrCopyRange(0, BlockBuffer.GetSize(), CopyBuffer); + uint64_t HeaderSize; + const std::vector<uint32_t> ChunkCompressedLengths = ReadChunkBlockHeader(BlockView, HeaderSize); + + CompositeBuffer PartialBlockBuffer = std::move(BlockBuffer).Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder() + HeaderSize); + + if (GetBlockWriteOps(RemoteContent, + Lookup, + BlockDescription.ChunkRawHashes, + ChunkCompressedLengths, + BlockDescription.ChunkRawLengths, + SequenceIndexChunksLeftToWriteCounters, + RemoteChunkIndexNeedsCopyFromSourceFlags, + std::move(PartialBlockBuffer), + 0, + gsl::narrow<uint32_t>(BlockDescription.ChunkRawHashes.size() - 1), + Ops)) + { + WriteBlockChunkOps(CacheFolderPath, + RemoteContent, + Lookup, + SequenceIndexChunksLeftToWriteCounters, + Ops, + OutChunksComplete, + OutBytesWritten); + return true; + } + return false; + } + + CompositeBuffer PartialBlockBuffer = + std::move(BlockBuffer).Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); + if (GetBlockWriteOps(RemoteContent, + Lookup, + BlockDescription.ChunkRawHashes, + BlockDescription.ChunkCompressedLengths, + BlockDescription.ChunkRawLengths, + SequenceIndexChunksLeftToWriteCounters, + RemoteChunkIndexNeedsCopyFromSourceFlags, + std::move(PartialBlockBuffer), + 0, + gsl::narrow<uint32_t>(BlockDescription.ChunkRawHashes.size() - 1), + Ops)) + { + WriteBlockChunkOps(CacheFolderPath, + RemoteContent, + Lookup, + SequenceIndexChunksLeftToWriteCounters, + Ops, + OutChunksComplete, + OutBytesWritten); + return true; + } + return false; + } + bool WritePartialBlockToDisk(const std::filesystem::path& CacheFolderPath, const ChunkedFolderContent& RemoteContent, const ChunkBlockDescription& BlockDescription, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, - const CompositeBuffer& PartialBlockBuffer, + CompositeBuffer&& 
PartialBlockBuffer, uint32_t FirstIncludedBlockChunkIndex, uint32_t LastIncludedBlockChunkIndex, const ChunkedContentLookup& Lookup, @@ -3646,15 +3682,17 @@ namespace { { ZEN_TRACE_CPU("WritePartialBlockToDisk"); BlockWriteOps Ops; - if (GetPartialBlockWriteOps(RemoteContent, - Lookup, - BlockDescription, - SequenceIndexChunksLeftToWriteCounters, - RemoteChunkIndexNeedsCopyFromSourceFlags, - PartialBlockBuffer, - FirstIncludedBlockChunkIndex, - LastIncludedBlockChunkIndex, - Ops)) + if (GetBlockWriteOps(RemoteContent, + Lookup, + BlockDescription.ChunkRawHashes, + BlockDescription.ChunkCompressedLengths, + BlockDescription.ChunkRawLengths, + SequenceIndexChunksLeftToWriteCounters, + RemoteChunkIndexNeedsCopyFromSourceFlags, + std::move(PartialBlockBuffer), + FirstIncludedBlockChunkIndex, + LastIncludedBlockChunkIndex, + Ops)) { WriteBlockChunkOps(CacheFolderPath, RemoteContent, @@ -3671,7 +3709,7 @@ namespace { } } - SharedBuffer Decompress(const CompositeBuffer& CompressedChunk, const IoHash& ChunkHash, const uint64_t ChunkRawSize) + SharedBuffer Decompress(CompositeBuffer&& CompressedChunk, const IoHash& ChunkHash, const uint64_t ChunkRawSize) { ZEN_TRACE_CPU("Decompress"); @@ -3709,7 +3747,7 @@ namespace { const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> ChunkTargets, - const CompositeBuffer& ChunkData, + CompositeBuffer&& ChunkData, WriteFileCache& OpenFileCache, std::atomic<uint64_t>& OutBytesWritten) { @@ -3734,8 +3772,8 @@ namespace { } } - bool CanStreamDecompress(const ChunkedFolderContent& RemoteContent, - const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> Locations) + bool CanDecompressDirectToSequence(const ChunkedFolderContent& RemoteContent, + const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> Locations) { if (Locations.size() == 1) { @@ -3776,13 +3814,25 @@ namespace { } IoHashStream Hash; bool CouldDecompress = 
Compressed.DecompressToStream(0, (uint64_t)-1, [&](uint64_t Offset, const CompositeBuffer& RangeBuffer) { - DecompressedTemp.Write(RangeBuffer, Offset); - for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) + ZEN_TRACE_CPU("StreamDecompress_Write"); + if (!AbortFlag) { - Hash.Append(Segment.GetView()); + DecompressedTemp.Write(RangeBuffer, Offset); + for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) + { + Hash.Append(Segment.GetView()); + } + WriteToDiskBytes += RangeBuffer.GetSize(); + return true; } - WriteToDiskBytes += RangeBuffer.GetSize(); + return false; }); + + if (AbortFlag) + { + return; + } + if (!CouldDecompress) { throw std::runtime_error(fmt::format("Failed to decompress large blob {}", SequenceRawHash)); @@ -3801,9 +3851,82 @@ namespace { } } + bool WriteCompressedChunk(const std::filesystem::path& TargetFolder, + const ChunkedFolderContent& RemoteContent, + const ChunkedContentLookup& RemoteLookup, + const IoHash& ChunkHash, + const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs, + IoBuffer&& CompressedPart, + std::atomic<uint64_t>& WriteToDiskBytes) + { + auto ChunkHashToChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); + ZEN_ASSERT(ChunkHashToChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end()); + if (CanDecompressDirectToSequence(RemoteContent, ChunkTargetPtrs)) + { + const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[ChunkTargetPtrs.front()->SequenceIndex]; + StreamDecompress(TargetFolder, SequenceRawHash, CompositeBuffer(std::move(CompressedPart)), WriteToDiskBytes); + } + else + { + const uint32_t ChunkIndex = ChunkHashToChunkIndexIt->second; + SharedBuffer Chunk = + Decompress(CompositeBuffer(std::move(CompressedPart)), ChunkHash, RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]); + + if (!AbortFlag) + { + WriteFileCache OpenFileCache; + + WriteChunkToDisk(TargetFolder, + RemoteContent, + RemoteLookup, + ChunkTargetPtrs, + 
CompositeBuffer(std::move(Chunk)), + OpenFileCache, + WriteToDiskBytes); + return true; + } + } + return false; + } + + void CompleteChunkTargets(const std::filesystem::path& TargetFolder, + const ChunkedFolderContent& RemoteContent, + const IoHash& ChunkHash, + const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs, + std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, + const bool NeedHashVerify) + { + ZEN_TRACE_CPU("CompleteChunkTargets"); + + for (const ChunkedContentLookup::ChunkSequenceLocation* Location : ChunkTargetPtrs) + { + const uint32_t RemoteSequenceIndex = Location->SequenceIndex; + if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1) + { + const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + if (NeedHashVerify) + { + ZEN_TRACE_CPU("VerifyChunkHash"); + + const IoHash VerifyChunkHash = + IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash))); + if (VerifyChunkHash != ChunkHash) + { + throw std::runtime_error( + fmt::format("Written chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, ChunkHash)); + } + } + + ZEN_TRACE_CPU("RenameToFinal"); + ZEN_ASSERT_SLOW(!std::filesystem::exists(GetFinalChunkedSequenceFileName(TargetFolder, SequenceRawHash))); + std::filesystem::rename(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash), + GetFinalChunkedSequenceFileName(TargetFolder, SequenceRawHash)); + } + } + } + void DownloadLargeBlob(BuildStorage& Storage, - const std::filesystem::path& TempFolderPath, - const std::filesystem::path& CacheFolderPath, + const std::filesystem::path& Path, const ChunkedFolderContent& RemoteContent, const ChunkedContentLookup& RemoteLookup, const Oid& BuildId, @@ -3818,6 +3941,7 @@ namespace { std::atomic<uint64_t>& BytesDownloaded, std::atomic<uint64_t>& MultipartAttachmentCount, std::function<void(uint64_t 
DowloadedBytes)>&& OnDownloadComplete, + std::function<void()>&& OnWriteStart, std::function<void()>&& OnWriteComplete) { ZEN_TRACE_CPU("DownloadLargeBlob"); @@ -3828,8 +3952,11 @@ namespace { }; std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>()); + std::filesystem::path DownloadFolder = Path / ZenTempDownloadFolderName; + std::filesystem::path TargetFolder = Path / ZenTempCacheFolderName; + std::error_code Ec; - Workload->TempFile.CreateTemporary(TempFolderPath, Ec); + Workload->TempFile.CreateTemporary(DownloadFolder, Ec); if (Ec) { throw std::runtime_error( @@ -3839,7 +3966,8 @@ namespace { BuildId, ChunkHash, PreferredMultipartChunkSize, - [&CacheFolderPath, + [DownloadFolder, + TargetFolder, &RemoteContent, &RemoteLookup, &Work, @@ -3849,6 +3977,7 @@ namespace { &BytesDownloaded, OnDownloadComplete = std::move(OnDownloadComplete), OnWriteComplete = std::move(OnWriteComplete), + OnWriteStart = std::move(OnWriteStart), &WriteToDiskBytes, SequenceIndexChunksLeftToWriteCounters, ChunkTargetPtrs = std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>( @@ -3857,6 +3986,7 @@ namespace { if (!AbortFlag.load()) { + ZEN_TRACE_CPU("DownloadLargeBlob_Save"); Workload->TempFile.Write(Chunk.GetView(), Offset); if (Chunk.GetSize() == BytesRemaining) { @@ -3864,103 +3994,61 @@ namespace { Work.ScheduleWork( WritePool, // GetSyncWorkerPool(),// - [&CacheFolderPath, + [DownloadFolder, + TargetFolder, &RemoteContent, &RemoteLookup, ChunkHash, Workload, Offset, OnWriteComplete = std::move(OnWriteComplete), + OnWriteStart = std::move(OnWriteStart), &WriteToDiskBytes, SequenceIndexChunksLeftToWriteCounters, ChunkTargetPtrs](std::atomic<bool>&) { - ZEN_TRACE_CPU("DownloadLargeBlob_Work"); + ZEN_TRACE_CPU("DownloadLargeBlob_Write"); if (!AbortFlag) { - uint64_t CompressedSize = Workload->TempFile.FileSize(); - void* FileHandle = Workload->TempFile.Detach(); - IoBuffer CompressedPart = IoBuffer(IoBuffer::File, - FileHandle, - 0, - CompressedSize, - 
/*IsWholeFile*/ true); - if (!CompressedPart) + const std::filesystem::path CompressedChunkPath = DownloadFolder / ChunkHash.ToHexString(); + std::error_code Ec; + Workload->TempFile.MoveTemporaryIntoPlace(CompressedChunkPath, Ec); + if (Ec) { - throw std::runtime_error( - fmt::format("Multipart build blob {} is not a compressed buffer", ChunkHash)); + throw std::runtime_error(fmt::format("Failed moving downloaded chunk {} file to {}. Reason: {}", + ChunkHash, + CompressedChunkPath, + Ec.message())); } - CompressedPart.SetDeleteOnClose(true); - auto ChunkHashToChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); - ZEN_ASSERT(ChunkHashToChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end()); - bool NeedHashVerify = true; - if (CanStreamDecompress(RemoteContent, ChunkTargetPtrs)) + IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); + if (!CompressedPart) { - const IoHash& SequenceRawHash = - RemoteContent.ChunkedContent.SequenceRawHashes[ChunkTargetPtrs.front()->SequenceIndex]; - StreamDecompress(CacheFolderPath, - SequenceRawHash, - CompositeBuffer(std::move(CompressedPart)), - WriteToDiskBytes); - NeedHashVerify = false; - OnWriteComplete(); + throw std::runtime_error(fmt::format("Could not open dowloaded compressed chunk {} from {}", + ChunkHash, + CompressedChunkPath)); } - else - { - const uint32_t ChunkIndex = ChunkHashToChunkIndexIt->second; - SharedBuffer Chunk = Decompress(CompositeBuffer(std::move(CompressedPart)), - ChunkHash, - RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]); - // ZEN_ASSERT_SLOW(ChunkHash == - // IoHash::HashBuffer(Chunk.AsIoBuffer())); + OnWriteStart(); - if (!AbortFlag) - { - WriteFileCache OpenFileCache; - - WriteChunkToDisk(CacheFolderPath, - RemoteContent, - RemoteLookup, - ChunkTargetPtrs, - CompositeBuffer(Chunk), - OpenFileCache, - WriteToDiskBytes); - OnWriteComplete(); - } - } + bool NeedHashVerify = WriteCompressedChunk(TargetFolder, + RemoteContent, + RemoteLookup, + 
ChunkHash, + ChunkTargetPtrs, + std::move(CompressedPart), + WriteToDiskBytes); if (!AbortFlag) { - // Write tracking, updating this must be done without any files open (WriteFileCache) - for (const ChunkedContentLookup::ChunkSequenceLocation* Location : ChunkTargetPtrs) - { - const uint32_t RemoteSequenceIndex = Location->SequenceIndex; - if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1) - { - const IoHash& SequenceRawHash = - RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; - if (NeedHashVerify) - { - ZEN_TRACE_CPU("VerifyChunkHash"); + std::filesystem::remove(CompressedChunkPath); - const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile( - GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash))); - if (VerifyChunkHash != ChunkHash) - { - throw std::runtime_error( - fmt::format("Written chunk sequence {} hash does not match expected hash {}", - VerifyChunkHash, - ChunkHash)); - } - } - - ZEN_TRACE_CPU("VerifyChunkHashes_rename"); - std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash), - GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)); - } - } + CompleteChunkTargets(TargetFolder, + RemoteContent, + ChunkHash, + ChunkTargetPtrs, + SequenceIndexChunksLeftToWriteCounters, + NeedHashVerify); } } }, @@ -3977,6 +4065,7 @@ namespace { Work.ScheduleWork( NetworkPool, // GetSyncWorkerPool(),// [WorkItem = std::move(WorkItem)](std::atomic<bool>&) { + ZEN_TRACE_CPU("DownloadLargeBlob_Work"); if (!AbortFlag) { WorkItem(); @@ -4023,30 +4112,143 @@ namespace { Stopwatch CacheMappingTimer; - uint64_t CacheMappedBytesForReuse = 0; std::vector<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters(RemoteContent.ChunkedContent.SequenceRawHashes.size()); // std::vector<bool> RemoteSequenceIndexIsCachedFlags(RemoteContent.ChunkedContent.SequenceRawHashes.size(), false); - std::vector<bool> 
RemoteChunkIndexIsCachedFlags(RemoteContent.ChunkedContent.ChunkHashes.size()); + std::vector<bool> RemoteChunkIndexNeedsCopyFromLocalFileFlags(RemoteContent.ChunkedContent.ChunkHashes.size()); // Guard if he same chunks is in multiple blocks (can happen due to block reuse, cache reuse blocks writes directly) std::vector<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags(RemoteContent.ChunkedContent.ChunkHashes.size()); - // Pick up all whole files we can use from current local state - for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < RemoteContent.ChunkedContent.SequenceRawHashes.size(); - RemoteSequenceIndex++) + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedChunkHashesFound; + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedSequenceHashesFound; + uint64_t CachedChunkHashesByteCountFound = 0; + uint64_t CachedSequenceHashesByteCountFound = 0; { - const IoHash& RemoteSequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; - if (auto It = LocalLookup.RawHashToSequenceIndex.find(RemoteSequenceRawHash); It != LocalLookup.RawHashToSequenceIndex.end()) + ZEN_TRACE_CPU("UpdateFolder_CheckChunkCache"); + + DirectoryContent CacheDirContent; + GetDirectoryContent(CacheFolderPath, + DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeFileSizes, + CacheDirContent); + for (size_t Index = 0; Index < CacheDirContent.Files.size(); Index++) { - SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = 0; - const uint32_t RemotePathIndex = GetFirstPathIndexForRawHash(RemoteLookup, RemoteSequenceRawHash); - CacheMappedBytesForReuse += RemoteContent.RawSizes[RemotePathIndex]; + IoHash FileHash; + if (IoHash::TryParse(CacheDirContent.Files[Index].filename().string(), FileHash)) + { + if (auto ChunkIt = RemoteLookup.ChunkHashToChunkIndex.find(FileHash); + ChunkIt != RemoteLookup.ChunkHashToChunkIndex.end()) + { + const uint32_t ChunkIndex = ChunkIt->second; + const uint64_t ChunkSize = 
RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; + if (ChunkSize == CacheDirContent.FileSizes[Index]) + { + CachedChunkHashesFound.insert({FileHash, ChunkIndex}); + CachedChunkHashesByteCountFound += ChunkSize; + continue; + } + } + else if (auto SequenceIt = RemoteLookup.RawHashToSequenceIndex.find(FileHash); + SequenceIt != RemoteLookup.RawHashToSequenceIndex.end()) + { + const uint32_t SequenceIndex = SequenceIt->second; + const uint32_t PathIndex = RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; + const uint64_t SequenceSize = RemoteContent.RawSizes[PathIndex]; + if (SequenceSize == CacheDirContent.FileSizes[Index]) + { + CachedSequenceHashesFound.insert({FileHash, SequenceIndex}); + CachedSequenceHashesByteCountFound += SequenceSize; + continue; + } + } + } + std::filesystem::remove(CacheDirContent.Files[Index]); } - else + } + + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedBlocksFound; + uint64_t CachedBlocksByteCountFound = 0; + { + ZEN_TRACE_CPU("UpdateFolder_CheckBlockCache"); + + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> AllBlockSizes; + AllBlockSizes.reserve(BlockDescriptions.size()); + for (uint32_t BlockIndex = 0; BlockIndex < BlockDescriptions.size(); BlockIndex++) { - SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]; + const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + AllBlockSizes.insert({BlockDescription.BlockHash, BlockIndex}); + } + + DirectoryContent BlockDirContent; + GetDirectoryContent(Path / ZenTempBlockFolderName, + DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeFileSizes, + BlockDirContent); + CachedBlocksFound.reserve(BlockDirContent.Files.size()); + for (size_t Index = 0; Index < BlockDirContent.Files.size(); Index++) + { + IoHash FileHash; + if (IoHash::TryParse(BlockDirContent.Files[Index].filename().string(), FileHash)) + { + if (auto BlockIt = AllBlockSizes.find(FileHash); 
BlockIt != AllBlockSizes.end()) + { + const uint32_t BlockIndex = BlockIt->second; + const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + uint64_t BlockSize = CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize; + for (uint64_t ChunkSize : BlockDescription.ChunkCompressedLengths) + { + BlockSize += ChunkSize; + } + + if (BlockSize == BlockDirContent.FileSizes[Index]) + { + CachedBlocksFound.insert({FileHash, BlockIndex}); + CachedBlocksByteCountFound += BlockSize; + continue; + } + } + } + std::filesystem::remove(BlockDirContent.Files[Index]); } } + std::vector<uint32_t> LocalPathIndexesMatchingSequenceIndexes; + uint64_t LocalPathIndexesByteCountMatchingSequenceIndexes = 0; + // Pick up all whole files we can use from current local state + { + ZEN_TRACE_CPU("UpdateFolder_CheckLocalChunks"); + for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < RemoteContent.ChunkedContent.SequenceRawHashes.size(); + RemoteSequenceIndex++) + { + const IoHash& RemoteSequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + if (auto CacheSequenceIt = CachedSequenceHashesFound.find(RemoteSequenceRawHash); + CacheSequenceIt != CachedSequenceHashesFound.end()) + { + // const uint32_t RemoteSequenceIndex = CacheSequenceIt->second; + // const uint32_t RemotePathIndex = GetFirstPathIndexForSeqeuenceIndex(RemoteLookup, RemoteSequenceIndex); + // RemoteSequenceByteCountFoundInCache += RemoteContent.RawSizes[RemotePathIndex]; + } + else if (auto CacheChunkIt = CachedChunkHashesFound.find(RemoteSequenceRawHash); + CacheChunkIt != CachedChunkHashesFound.end()) + { + // const uint32_t RemoteChunkIndex = CacheChunkIt->second; + // const uint32_t RemotePathIndex = GetFirstPathIndexForSeqeuenceIndex(RemoteLookup, RemoteSequenceIndex); + // RemoteSequenceByteCountFoundInCache += RemoteContent.RawSizes[RemotePathIndex]; + } + else if (auto It = LocalLookup.RawHashToSequenceIndex.find(RemoteSequenceRawHash); 
+ It != LocalLookup.RawHashToSequenceIndex.end()) + { + const uint32_t LocalSequenceIndex = It->second; + const uint32_t LocalPathIndex = GetFirstPathIndexForSeqeuenceIndex(LocalLookup, LocalSequenceIndex); + uint64_t RawSize = LocalContent.RawSizes[LocalPathIndex]; + LocalPathIndexesMatchingSequenceIndexes.push_back(LocalPathIndex); + LocalPathIndexesByteCountMatchingSequenceIndexes += RawSize; + } + else + { + // We must write the sequence + SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = + RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]; + } + } + } // Pick up all chunks in current local state struct CacheCopyData { @@ -4063,75 +4265,103 @@ namespace { tsl::robin_map<IoHash, size_t, IoHash::Hasher> RawHashToCacheCopyDataIndex; std::vector<CacheCopyData> CacheCopyDatas; + uint64_t LocalChunkHashesMatchingRemoteCount = 0; + uint64_t LocalChunkHashesMatchingRemoteByteCount = 0; - for (uint32_t LocalSequenceIndex = 0; LocalSequenceIndex < LocalContent.ChunkedContent.SequenceRawHashes.size(); - LocalSequenceIndex++) { - const IoHash& LocalSequenceRawHash = LocalContent.ChunkedContent.SequenceRawHashes[LocalSequenceIndex]; - const uint32_t LocalOrderOffset = LocalLookup.SequenceIndexChunkOrderOffset[LocalSequenceIndex]; + ZEN_TRACE_CPU("UpdateFolder_GetLocalChunks"); + for (uint32_t LocalSequenceIndex = 0; LocalSequenceIndex < LocalContent.ChunkedContent.SequenceRawHashes.size(); + LocalSequenceIndex++) { - uint64_t SourceOffset = 0; - const uint32_t LocalChunkCount = LocalContent.ChunkedContent.ChunkCounts[LocalSequenceIndex]; - for (uint32_t LocalOrderIndex = 0; LocalOrderIndex < LocalChunkCount; LocalOrderIndex++) - { - const uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[LocalOrderOffset + LocalOrderIndex]; - const IoHash& LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; - const uint64_t LocalChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[LocalChunkIndex]; + const IoHash& 
LocalSequenceRawHash = LocalContent.ChunkedContent.SequenceRawHashes[LocalSequenceIndex]; + const uint32_t LocalOrderOffset = LocalLookup.SequenceIndexChunkOrderOffset[LocalSequenceIndex]; - if (auto RemoteChunkIt = RemoteLookup.ChunkHashToChunkIndex.find(LocalChunkHash); - RemoteChunkIt != RemoteLookup.ChunkHashToChunkIndex.end()) + { + uint64_t SourceOffset = 0; + const uint32_t LocalChunkCount = LocalContent.ChunkedContent.ChunkCounts[LocalSequenceIndex]; + for (uint32_t LocalOrderIndex = 0; LocalOrderIndex < LocalChunkCount; LocalOrderIndex++) { - const uint32_t RemoteChunkIndex = RemoteChunkIt->second; - if (!RemoteChunkIndexIsCachedFlags[RemoteChunkIndex]) - { - std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = - GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex); + const uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[LocalOrderOffset + LocalOrderIndex]; + const IoHash& LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; + const uint64_t LocalChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[LocalChunkIndex]; - if (!ChunkTargetPtrs.empty()) + if (auto RemoteChunkIt = RemoteLookup.ChunkHashToChunkIndex.find(LocalChunkHash); + RemoteChunkIt != RemoteLookup.ChunkHashToChunkIndex.end()) + { + const uint32_t RemoteChunkIndex = RemoteChunkIt->second; + if (!RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) { - CacheCopyData::ChunkTarget Target = { - .TargetChunkLocationCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()), - .ChunkRawSize = LocalChunkRawSize, - .CacheFileOffset = SourceOffset}; - if (auto CopySourceIt = RawHashToCacheCopyDataIndex.find(LocalSequenceRawHash); - CopySourceIt != RawHashToCacheCopyDataIndex.end()) - { - CacheCopyData& Data = CacheCopyDatas[CopySourceIt->second]; - Data.TargetChunkLocationPtrs.insert(Data.TargetChunkLocationPtrs.end(), - ChunkTargetPtrs.begin(), - ChunkTargetPtrs.end()); - 
Data.ChunkTargets.push_back(Target); - } - else + std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = + GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex); + + if (!ChunkTargetPtrs.empty()) { - RawHashToCacheCopyDataIndex.insert_or_assign(LocalSequenceRawHash, CacheCopyDatas.size()); - CacheCopyDatas.push_back( - CacheCopyData{.LocalSequenceIndex = LocalSequenceIndex, - .TargetChunkLocationPtrs = ChunkTargetPtrs, - .ChunkTargets = std::vector<CacheCopyData::ChunkTarget>{Target}}); + CacheCopyData::ChunkTarget Target = { + .TargetChunkLocationCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()), + .ChunkRawSize = LocalChunkRawSize, + .CacheFileOffset = SourceOffset}; + if (auto CopySourceIt = RawHashToCacheCopyDataIndex.find(LocalSequenceRawHash); + CopySourceIt != RawHashToCacheCopyDataIndex.end()) + { + CacheCopyData& Data = CacheCopyDatas[CopySourceIt->second]; + if (Data.TargetChunkLocationPtrs.size() > 1024) + { + RawHashToCacheCopyDataIndex.insert_or_assign(LocalSequenceRawHash, CacheCopyDatas.size()); + CacheCopyDatas.push_back( + CacheCopyData{.LocalSequenceIndex = LocalSequenceIndex, + .TargetChunkLocationPtrs = ChunkTargetPtrs, + .ChunkTargets = std::vector<CacheCopyData::ChunkTarget>{Target}}); + } + else + { + Data.TargetChunkLocationPtrs.insert(Data.TargetChunkLocationPtrs.end(), + ChunkTargetPtrs.begin(), + ChunkTargetPtrs.end()); + Data.ChunkTargets.push_back(Target); + } + } + else + { + RawHashToCacheCopyDataIndex.insert_or_assign(LocalSequenceRawHash, CacheCopyDatas.size()); + CacheCopyDatas.push_back( + CacheCopyData{.LocalSequenceIndex = LocalSequenceIndex, + .TargetChunkLocationPtrs = ChunkTargetPtrs, + .ChunkTargets = std::vector<CacheCopyData::ChunkTarget>{Target}}); + } + LocalChunkHashesMatchingRemoteByteCount += LocalChunkRawSize; + LocalChunkHashesMatchingRemoteCount++; + RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex] = true; } - 
CacheMappedBytesForReuse += LocalChunkRawSize; - RemoteChunkIndexIsCachedFlags[RemoteChunkIndex] = true; } } + SourceOffset += LocalChunkRawSize; } - SourceOffset += LocalChunkRawSize; } } } - if (CacheMappedBytesForReuse > 0) + if (!CachedSequenceHashesFound.empty() || !CachedChunkHashesFound.empty() || !CachedBlocksFound.empty() || + !LocalPathIndexesMatchingSequenceIndexes.empty() || LocalChunkHashesMatchingRemoteCount > 0) { - ZEN_CONSOLE("Mapped {} cached data for reuse in {}", - NiceBytes(CacheMappedBytesForReuse), - NiceTimeSpanMs(CacheMappingTimer.GetElapsedTimeMs())); + ZEN_CONSOLE( + "Cache: {} ({}) chunk sequences, {} ({}) chunks, {} ({}) blocks. Local state: {} ({}) chunk sequences, {} ({}) chunks", + CachedSequenceHashesFound.size(), + NiceBytes(CachedSequenceHashesByteCountFound), + CachedChunkHashesFound.size(), + NiceBytes(CachedChunkHashesByteCountFound), + CachedBlocksFound.size(), + NiceBytes(CachedBlocksByteCountFound), + LocalPathIndexesMatchingSequenceIndexes.size(), + NiceBytes(LocalPathIndexesByteCountMatchingSequenceIndexes), + LocalChunkHashesMatchingRemoteCount, + NiceBytes(LocalChunkHashesMatchingRemoteByteCount)); } uint32_t ChunkCountToWrite = 0; for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++) { - if (RemoteChunkIndexIsCachedFlags[RemoteChunkIndex]) + if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) { ChunkCountToWrite++; } @@ -4154,7 +4384,7 @@ namespace { std::atomic<size_t> WritePartsComplete = 0; { - ZEN_TRACE_CPU("HandleChunks"); + ZEN_TRACE_CPU("WriteChunks"); Stopwatch WriteTimer; @@ -4169,17 +4399,21 @@ namespace { std::atomic<uint64_t> BytesDownloaded = 0; - for (const IoHash ChunkHash : LooseChunkHashes) + struct LooseChunkHashWorkData { - if (AbortFlag) - { - break; - } + std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs; + uint32_t RemoteChunkIndex; + }; + std::vector<LooseChunkHashWorkData> 
LooseChunkHashWorks; + TotalPartWriteCount += CacheCopyDatas.size(); + + for (const IoHash ChunkHash : LooseChunkHashes) + { auto RemoteChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); ZEN_ASSERT(RemoteChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end()); const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second; - if (RemoteChunkIndexIsCachedFlags[RemoteChunkIndex]) + if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) { ZEN_DEBUG("Skipping chunk {} due to cache reuse", ChunkHash); continue; @@ -4198,173 +4432,192 @@ namespace { { TotalRequestCount++; TotalPartWriteCount++; - Work.ScheduleWork( - NetworkPool, // GetSyncWorkerPool(),// - [&, ChunkHash, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) { - ZEN_TRACE_CPU("UpdateFolder_LooseChunk"); + LooseChunkHashWorks.push_back( + LooseChunkHashWorkData{.ChunkTargetPtrs = ChunkTargetPtrs, .RemoteChunkIndex = RemoteChunkIndex}); + } + } + } - if (!AbortFlag) + uint32_t BlockCount = gsl::narrow<uint32_t>(BlockDescriptions.size()); + + std::vector<bool> ChunkIsPickedUpByBlock(RemoteContent.ChunkedContent.ChunkHashes.size(), false); + auto GetNeededChunkBlockIndexes = [&RemoteContent, + &RemoteLookup, + &RemoteChunkIndexNeedsCopyFromSourceFlags, + &ChunkIsPickedUpByBlock](const ChunkBlockDescription& BlockDescription) { + ZEN_TRACE_CPU("UpdateFolder_GetNeededChunkBlockIndexes"); + std::vector<uint32_t> NeededBlockChunkIndexes; + for (uint32_t ChunkBlockIndex = 0; ChunkBlockIndex < BlockDescription.ChunkRawHashes.size(); ChunkBlockIndex++) + { + const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex]; + if (auto It = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); It != RemoteLookup.ChunkHashToChunkIndex.end()) + { + const uint32_t RemoteChunkIndex = It->second; + if (!ChunkIsPickedUpByBlock[RemoteChunkIndex]) + { + if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex]) + { + ChunkIsPickedUpByBlock[RemoteChunkIndex] = true; + 
NeededBlockChunkIndexes.push_back(ChunkBlockIndex); + } + } + } + } + return NeededBlockChunkIndexes; + }; + + std::vector<uint32_t> CachedChunkBlockIndexes; + + struct BlockRangeDescriptor + { + uint32_t BlockIndex = (uint32_t)-1; + uint64_t RangeStart = 0; + uint64_t RangeLength = 0; + uint32_t ChunkBlockIndexStart = 0; + uint32_t ChunkBlockIndexCount = 0; + }; + std::vector<BlockRangeDescriptor> BlockRangeWorks; + + std::vector<uint32_t> FullBlockWorks; + + size_t BlocksNeededCount = 0; + uint64_t AllBlocksSize = 0; + uint64_t AllBlocksFetch = 0; + uint64_t AllBlocksSlack = 0; + uint64_t AllBlockRequests = 0; + uint64_t AllBlockChunksSize = 0; + for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; BlockIndex++) + { + const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + const std::vector<uint32_t> BlockChunkIndexNeeded = GetNeededChunkBlockIndexes(BlockDescription); + if (!BlockChunkIndexNeeded.empty()) + { + bool UsingCachedBlock = false; + if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end()) + { + ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_CacheGet"); + + TotalPartWriteCount++; + + std::filesystem::path BlockPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString(); + if (std::filesystem::exists(BlockPath)) + { + CachedChunkBlockIndexes.push_back(BlockIndex); + UsingCachedBlock = true; + } + } + + if (!UsingCachedBlock) + { + bool WantsToDoPartialBlockDownload = BlockChunkIndexNeeded.size() < BlockDescription.ChunkRawHashes.size(); + bool CanDoPartialBlockDownload = + (BlockDescription.HeaderSize > 0) && + (BlockDescription.ChunkCompressedLengths.size() == BlockDescription.ChunkRawHashes.size()); + if (AllowPartialBlockRequests && WantsToDoPartialBlockDownload && CanDoPartialBlockDownload) + { + std::vector<BlockRangeDescriptor> BlockRanges; + + ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_PartialAnalysis"); + + uint32_t NeedBlockChunkIndexOffset = 0; + uint32_t 
ChunkBlockIndex = 0; + uint32_t CurrentOffset = + gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); + + BlockRangeDescriptor NextRange{.BlockIndex = BlockIndex}; + while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() && + ChunkBlockIndex < BlockDescription.ChunkRawHashes.size()) + { + const uint32_t ChunkCompressedLength = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex]; + if (ChunkBlockIndex < BlockChunkIndexNeeded[NeedBlockChunkIndexOffset]) { - FilteredDownloadedBytesPerSecond.Start(); - if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize) + if (NextRange.RangeLength > 0) { - DownloadLargeBlob( - Storage, - Path / ZenTempChunkFolderName, - CacheFolderPath, - RemoteContent, - RemoteLookup, - BuildId, - ChunkHash, - PreferredMultipartChunkSize, - ChunkTargetPtrs, - SequenceIndexChunksLeftToWriteCounters, - Work, - WritePool, - NetworkPool, - WriteToDiskBytes, - BytesDownloaded, - MultipartAttachmentCount, - [&](uint64_t BytesDownloaded) { - LooseChunksBytes += BytesDownloaded; - RequestsComplete++; - if (RequestsComplete == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - }, - [&]() { - ChunkCountWritten++; - WritePartsComplete++; - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); - } - }); + BlockRanges.push_back(NextRange); + NextRange = {.BlockIndex = BlockIndex}; } - else + ChunkBlockIndex++; + CurrentOffset += ChunkCompressedLength; + } + else if (ChunkBlockIndex == BlockChunkIndexNeeded[NeedBlockChunkIndexOffset]) + { + AllBlockChunksSize += ChunkCompressedLength; + if (NextRange.RangeLength == 0) { - IoBuffer CompressedPart = Storage.GetBuildBlob(BuildId, ChunkHash); - if (!CompressedPart) - { - throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash)); - } - BytesDownloaded += CompressedPart.GetSize(); - LooseChunksBytes += CompressedPart.GetSize(); - RequestsComplete++; - 
if (RequestsComplete == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - CompositeBuffer Payload = WriteToTempFileIfNeeded(CompositeBuffer(std::move(CompressedPart)), - Path / ZenTempChunkFolderName, - ChunkHash); - DownloadedChunks++; + NextRange.RangeStart = CurrentOffset; + NextRange.ChunkBlockIndexStart = ChunkBlockIndex; + } + NextRange.RangeLength += ChunkCompressedLength; + NextRange.ChunkBlockIndexCount++; + ChunkBlockIndex++; + CurrentOffset += ChunkCompressedLength; + NeedBlockChunkIndexOffset++; + } + else + { + ZEN_ASSERT(false); + } + } + AllBlocksSize += CurrentOffset; + if (NextRange.RangeLength > 0) + { + BlockRanges.push_back(NextRange); + } - if (!AbortFlag) - { - Work.ScheduleWork( - WritePool, - [&, ChunkHash, RemoteChunkIndex, ChunkTargetPtrs, CompressedPart = std::move(Payload)]( - std::atomic<bool>&) { - ZEN_TRACE_CPU("UpdateFolder_WriteBlob"); + ZEN_ASSERT(!BlockRanges.empty()); + std::vector<BlockRangeDescriptor> CollapsedBlockRanges; + auto It = BlockRanges.begin(); + CollapsedBlockRanges.push_back(*It++); + uint64_t TotalSlack = 0; + while (It != BlockRanges.end()) + { + BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back(); + uint64_t Slack = It->RangeStart - (LastRange.RangeStart + LastRange.RangeLength); + uint64_t BothRangeSize = It->RangeLength + LastRange.RangeLength; + if (Slack <= Max(BothRangeSize / 8, 64u * 1024u)) // Made up heuristic - we'll see how it pans out + { + LastRange.ChunkBlockIndexCount = + (It->ChunkBlockIndexStart + It->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart; + LastRange.RangeLength = (It->RangeStart + It->RangeLength) - LastRange.RangeStart; + TotalSlack += Slack; + } + else + { + CollapsedBlockRanges.push_back(*It); + } + ++It; + } - if (!AbortFlag) - { - FilteredWrittenBytesPerSecond.Start(); + uint64_t TotalFetch = 0; + for (const BlockRangeDescriptor& Range : CollapsedBlockRanges) + { + TotalFetch += Range.RangeLength; + } - bool NeedHashVerify = true; - if 
(CanStreamDecompress(RemoteContent, ChunkTargetPtrs)) - { - const IoHash& SequenceRawHash = - RemoteContent.ChunkedContent - .SequenceRawHashes[ChunkTargetPtrs.front()->SequenceIndex]; - StreamDecompress(CacheFolderPath, - SequenceRawHash, - CompositeBuffer(CompressedPart), - WriteToDiskBytes); - ChunkCountWritten++; - WritePartsComplete++; - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); - } - NeedHashVerify = false; - } - else - { - SharedBuffer Chunk = - Decompress(CompressedPart, - ChunkHash, - RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex]); - - { - WriteFileCache OpenFileCache; - WriteChunkToDisk(CacheFolderPath, - RemoteContent, - RemoteLookup, - ChunkTargetPtrs, - CompositeBuffer(Chunk), - OpenFileCache, - WriteToDiskBytes); - ChunkCountWritten++; - WritePartsComplete++; - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); - } - } - } - if (!AbortFlag) - { - WritePartsComplete++; - - // Write tracking, updating this must be done without any files open - // (WriteFileCache) - for (const ChunkedContentLookup::ChunkSequenceLocation* Location : - ChunkTargetPtrs) - { - const uint32_t RemoteSequenceIndex = Location->SequenceIndex; - if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub( - 1) == 1) - { - const IoHash& SequenceRawHash = - RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; - if (NeedHashVerify) - { - ZEN_TRACE_CPU("UpdateFolder_VerifyHash"); - - const IoHash VerifyChunkHash = - IoHash::HashBuffer(IoBufferBuilder::MakeFromFile( - GetTempChunkedSequenceFileName(CacheFolderPath, - SequenceRawHash))); - if (VerifyChunkHash != SequenceRawHash) - { - throw std::runtime_error( - fmt::format("Written chunk sequence {} hash does not match " - "expected hash {}", - VerifyChunkHash, - SequenceRawHash)); - } - } - - ZEN_TRACE_CPU("UpdateFolder_rename"); - std::filesystem::rename( - 
GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash), - GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)); - } - } - } - } - }, - Work.DefaultErrorFunction()); - } - } - } - }, - Work.DefaultErrorFunction()); + AllBlocksFetch += TotalFetch; + AllBlocksSlack += TotalSlack; + BlocksNeededCount++; + AllBlockRequests += CollapsedBlockRanges.size(); + + TotalRequestCount += CollapsedBlockRanges.size(); + TotalPartWriteCount += CollapsedBlockRanges.size(); + + BlockRangeWorks.insert(BlockRangeWorks.end(), CollapsedBlockRanges.begin(), CollapsedBlockRanges.end()); + } + else + { + BlocksNeededCount++; + TotalRequestCount++; + TotalPartWriteCount++; + + FullBlockWorks.push_back(BlockIndex); + } } } + else + { + ZEN_DEBUG("Skipping block {} due to cache reuse", BlockDescriptions[BlockIndex].BlockHash); + } } for (size_t CopyDataIndex = 0; CopyDataIndex < CacheCopyDatas.size(); CopyDataIndex++) @@ -4374,14 +4627,12 @@ namespace { break; } - TotalPartWriteCount++; - Work.ScheduleWork( WritePool, // GetSyncWorkerPool(),// [&, CopyDataIndex](std::atomic<bool>&) { if (!AbortFlag) { - ZEN_TRACE_CPU("UpdateFolder_Copy"); + ZEN_TRACE_CPU("UpdateFolder_CopyLocal"); FilteredWrittenBytesPerSecond.Start(); const CacheCopyData& CopyData = CacheCopyDatas[CopyDataIndex]; @@ -4403,39 +4654,45 @@ namespace { }; std::vector<WriteOp> WriteOps; - WriteOps.reserve(AllTargets.size()); - for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets) + if (!AbortFlag) { - std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> TargetRange = - AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount); - for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange) + ZEN_TRACE_CPU("Sort"); + WriteOps.reserve(AllTargets.size()); + for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets) { - WriteOps.push_back(WriteOp{.Target = Target, - .CacheFileOffset = ChunkTarget.CacheFileOffset, - .ChunkSize = 
ChunkTarget.ChunkRawSize}); + std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> TargetRange = + AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount); + for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange) + { + WriteOps.push_back(WriteOp{.Target = Target, + .CacheFileOffset = ChunkTarget.CacheFileOffset, + .ChunkSize = ChunkTarget.ChunkRawSize}); + } + TargetStart += ChunkTarget.TargetChunkLocationCount; } - TargetStart += ChunkTarget.TargetChunkLocationCount; - } - std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) { - if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex) - { - return true; - } - else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex) - { + std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) { + if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex) + { + return true; + } + else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex) + { + return false; + } + if (Lhs.Target->Offset < Rhs.Target->Offset) + { + return true; + } return false; - } - if (Lhs.Target->Offset < Rhs.Target->Offset) - { - return true; - } - return false; - }); + }); + } if (!AbortFlag) { + ZEN_TRACE_CPU("Write"); + BufferedOpenFile SourceFile(LocalFilePath); WriteFileCache OpenFileCache; for (const WriteOp& Op : WriteOps) @@ -4473,23 +4730,26 @@ namespace { // Write tracking, updating this must be done without any files open (WriteFileCache) for (const WriteOp& Op : WriteOps) { - ZEN_TRACE_CPU("UpdateFolder_Copy_VerifyHash"); - const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex; if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1) { const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; - const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile( - GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash))); - 
if (VerifyChunkHash != SequenceRawHash) { - throw std::runtime_error( - fmt::format("Written chunk sequence {} hash does not match expected hash {}", - VerifyChunkHash, - SequenceRawHash)); + ZEN_TRACE_CPU("VerifyHash"); + const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile( + GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash))); + if (VerifyChunkHash != SequenceRawHash) + { + throw std::runtime_error( + fmt::format("Written chunk sequence {} hash does not match expected hash {}", + VerifyChunkHash, + SequenceRawHash)); + } } - ZEN_TRACE_CPU("UpdateFolder_Copy_rename"); + ZEN_TRACE_CPU("rename"); + ZEN_ASSERT_SLOW( + !std::filesystem::exists(GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash))); std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash), GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)); } @@ -4508,300 +4768,643 @@ namespace { Work.DefaultErrorFunction()); } - size_t BlockCount = BlockDescriptions.size(); - - std::vector<bool> ChunkIsPickedUpByBlock(RemoteContent.ChunkedContent.ChunkHashes.size(), false); - auto GetNeededChunkBlockIndexes = [&RemoteContent, - &RemoteLookup, - &RemoteChunkIndexNeedsCopyFromSourceFlags, - &ChunkIsPickedUpByBlock](const ChunkBlockDescription& BlockDescription) { - std::vector<uint32_t> NeededBlockChunkIndexes; - for (uint32_t ChunkBlockIndex = 0; ChunkBlockIndex < BlockDescription.ChunkRawHashes.size(); ChunkBlockIndex++) - { - const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex]; - if (auto It = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); It != RemoteLookup.ChunkHashToChunkIndex.end()) - { - const uint32_t RemoteChunkIndex = It->second; - if (!ChunkIsPickedUpByBlock[RemoteChunkIndex]) - { - if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex]) - { - ChunkIsPickedUpByBlock[RemoteChunkIndex] = true; - NeededBlockChunkIndexes.push_back(ChunkBlockIndex); - } - } - } - } - return 
NeededBlockChunkIndexes; - }; - - size_t BlocksNeededCount = 0; - uint64_t AllBlocksSize = 0; - uint64_t AllBlocksFetch = 0; - uint64_t AllBlocksSlack = 0; - uint64_t AllBlockRequests = 0; - uint64_t AllBlockChunksSize = 0; - for (size_t BlockIndex = 0; BlockIndex < BlockCount; BlockIndex++) + for (uint32_t LooseChunkHashWorkIndex = 0; LooseChunkHashWorkIndex < LooseChunkHashWorks.size(); LooseChunkHashWorkIndex++) { - if (Work.IsAborted()) + if (AbortFlag) { break; } - const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; - const std::vector<uint32_t> BlockChunkIndexNeeded = GetNeededChunkBlockIndexes(BlockDescription); - if (!BlockChunkIndexNeeded.empty()) - { - bool WantsToDoPartialBlockDownload = BlockChunkIndexNeeded.size() < BlockDescription.ChunkRawHashes.size(); - bool CanDoPartialBlockDownload = (BlockDescription.HeaderSize > 0) && (BlockDescription.ChunkCompressedLengths.size() == - BlockDescription.ChunkRawHashes.size()); - if (AllowPartialBlockRequests && WantsToDoPartialBlockDownload && CanDoPartialBlockDownload) - { - struct BlockRangeDescriptor - { - uint64_t RangeStart = 0; - uint64_t RangeLength = 0; - uint32_t ChunkBlockIndexStart = 0; - uint32_t ChunkBlockIndexCount = 0; - }; - std::vector<BlockRangeDescriptor> BlockRanges; - - ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_PartialAnalisys"); - - uint32_t NeedBlockChunkIndexOffset = 0; - uint32_t ChunkBlockIndex = 0; - uint32_t CurrentOffset = - gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); - - BlockRangeDescriptor NextRange; - while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() && - ChunkBlockIndex < BlockDescription.ChunkRawHashes.size()) + + LooseChunkHashWorkData& LooseChunkHashWork = LooseChunkHashWorks[LooseChunkHashWorkIndex]; + + std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = + std::move(LooseChunkHashWork.ChunkTargetPtrs); + const uint32_t RemoteChunkIndex = 
LooseChunkHashWork.RemoteChunkIndex; + + Work.ScheduleWork( + NetworkPool, // NetworkPool, // GetSyncWorkerPool(),// + [&, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) { + if (!AbortFlag) { - const uint32_t ChunkCompressedLength = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex]; - if (ChunkBlockIndex < BlockChunkIndexNeeded[NeedBlockChunkIndexOffset]) - { - if (NextRange.RangeLength > 0) - { - BlockRanges.push_back(NextRange); - NextRange = {}; - } - ChunkBlockIndex++; - CurrentOffset += ChunkCompressedLength; - } - else if (ChunkBlockIndex == BlockChunkIndexNeeded[NeedBlockChunkIndexOffset]) + std::filesystem::path ExistingCompressedChunkPath; { - AllBlockChunksSize += ChunkCompressedLength; - if (NextRange.RangeLength == 0) + const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + std::filesystem::path CompressedChunkPath = Path / ZenTempDownloadFolderName / ChunkHash.ToHexString(); + if (std::filesystem::exists(CompressedChunkPath)) { - NextRange.RangeStart = CurrentOffset; - NextRange.ChunkBlockIndexStart = ChunkBlockIndex; + IoBuffer ExistingCompressedPart = IoBufferBuilder::MakeFromFile(ExistingCompressedChunkPath); + if (ExistingCompressedPart) + { + IoHash RawHash; + uint64_t RawSize; + if (CompressedBuffer::ValidateCompressedHeader(ExistingCompressedPart, RawHash, RawSize)) + { + LooseChunksBytes += ExistingCompressedPart.GetSize(); + RequestsComplete++; + if (RequestsComplete == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + ExistingCompressedChunkPath = std::move(CompressedChunkPath); + } + else + { + std::error_code DummyEc; + std::filesystem::remove(CompressedChunkPath, DummyEc); + } + } } - NextRange.RangeLength += ChunkCompressedLength; - NextRange.ChunkBlockIndexCount++; - ChunkBlockIndex++; - CurrentOffset += ChunkCompressedLength; - NeedBlockChunkIndexOffset++; } - else + if (!ExistingCompressedChunkPath.empty()) { - ZEN_ASSERT(false); - } - } - AllBlocksSize += 
CurrentOffset; - if (NextRange.RangeLength > 0) - { - BlockRanges.push_back(NextRange); - NextRange = {}; - } + Work.ScheduleWork( + WritePool, // WritePool, GetSyncWorkerPool() + [&Path, + &RemoteContent, + &RemoteLookup, + &CacheFolderPath, + &SequenceIndexChunksLeftToWriteCounters, + &WriteToDiskBytes, + &ChunkCountWritten, + &WritePartsComplete, + &TotalPartWriteCount, + &FilteredWrittenBytesPerSecond, + RemoteChunkIndex, + ChunkTargetPtrs, + CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>&) mutable { + if (!AbortFlag) + { + ZEN_TRACE_CPU("UpdateFolder_WritePreDownloaded"); - ZEN_ASSERT(!BlockRanges.empty()); - std::vector<BlockRangeDescriptor> CollapsedBlockRanges; - auto It = BlockRanges.begin(); - CollapsedBlockRanges.push_back(*It++); - uint64_t TotalSlack = 0; - while (It != BlockRanges.end()) - { - BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back(); - uint64_t Slack = It->RangeStart - (LastRange.RangeStart + LastRange.RangeLength); - uint64_t BothRangeSize = It->RangeLength + LastRange.RangeLength; - if (Slack <= Max(BothRangeSize / 8, 64u * 1024u)) // Made up heuristic - we'll see how it pans out - { - LastRange.ChunkBlockIndexCount = - (It->ChunkBlockIndexStart + It->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart; - LastRange.RangeLength = (It->RangeStart + It->RangeLength) - LastRange.RangeStart; - TotalSlack += Slack; + FilteredWrittenBytesPerSecond.Start(); + + const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + + IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); + if (!CompressedPart) + { + throw std::runtime_error(fmt::format("Could not open dowloaded compressed chunk {} from {}", + ChunkHash, + CompressedChunkPath)); + } + + std::filesystem::path TargetFolder = Path / ZenTempCacheFolderName; + bool NeedHashVerify = WriteCompressedChunk(TargetFolder, + RemoteContent, + RemoteLookup, + ChunkHash, + ChunkTargetPtrs, + 
std::move(CompressedPart), + WriteToDiskBytes); + + if (!AbortFlag) + { + ChunkCountWritten++; + WritePartsComplete++; + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } + + std::filesystem::remove(CompressedChunkPath); + + CompleteChunkTargets(TargetFolder, + RemoteContent, + ChunkHash, + ChunkTargetPtrs, + SequenceIndexChunksLeftToWriteCounters, + NeedHashVerify); + } + } + }, + Work.DefaultErrorFunction()); } else { - CollapsedBlockRanges.push_back(*It); - } - ++It; - } - - uint64_t TotalFetch = 0; - for (const BlockRangeDescriptor& Range : CollapsedBlockRanges) - { - TotalFetch += Range.RangeLength; - } - - AllBlocksFetch += TotalFetch; - AllBlocksSlack += TotalSlack; - BlocksNeededCount++; - AllBlockRequests += CollapsedBlockRanges.size(); + FilteredDownloadedBytesPerSecond.Start(); + const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize) + { + ZEN_TRACE_CPU("UpdateFolder_GetLargeChunk"); + DownloadLargeBlob( + Storage, + Path, + RemoteContent, + RemoteLookup, + BuildId, + ChunkHash, + PreferredMultipartChunkSize, + ChunkTargetPtrs, + SequenceIndexChunksLeftToWriteCounters, + Work, + WritePool, + NetworkPool, + WriteToDiskBytes, + BytesDownloaded, + MultipartAttachmentCount, + [&](uint64_t BytesDownloaded) { + LooseChunksBytes += BytesDownloaded; + RequestsComplete++; + if (RequestsComplete == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + }, + [&]() { FilteredWrittenBytesPerSecond.Start(); }, + [&]() { + ChunkCountWritten++; + WritePartsComplete++; + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } + }); + } + else + { + ZEN_TRACE_CPU("UpdateFolder_GetChunk"); - for (size_t BlockRangeIndex = 0; BlockRangeIndex < CollapsedBlockRanges.size(); BlockRangeIndex++) - { - TotalRequestCount++; - TotalPartWriteCount++; - const 
BlockRangeDescriptor BlockRange = CollapsedBlockRanges[BlockRangeIndex]; - // Partial block schedule - Work.ScheduleWork( - NetworkPool, // NetworkPool, // GetSyncWorkerPool() - [&, BlockIndex, BlockRange](std::atomic<bool>&) { - ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_PartialGet"); - const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; - - FilteredDownloadedBytesPerSecond.Start(); - IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, - BlockDescription.BlockHash, - BlockRange.RangeStart, - BlockRange.RangeLength); - if (!BlockBuffer) + IoBuffer BuildBlob = Storage.GetBuildBlob(BuildId, ChunkHash); + if (!BuildBlob) { - throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); + throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash)); } - BytesDownloaded += BlockBuffer.GetSize(); - BlockBytes += BlockBuffer.GetSize(); - DownloadedBlocks++; - CompositeBuffer Payload = WriteToTempFileIfNeeded(CompositeBuffer(std::move(BlockBuffer)), - Path / ZenTempBlockFolderName, - BlockDescription.BlockHash, - fmt::format("_{}", BlockRange.RangeStart)); + uint64_t BlobSize = BuildBlob.GetSize(); + BytesDownloaded += BlobSize; + LooseChunksBytes += BlobSize; RequestsComplete++; if (RequestsComplete == TotalRequestCount) { FilteredDownloadedBytesPerSecond.Stop(); } - if (!AbortFlag) + std::filesystem::path CompressedChunkPath; + + // Check if the dowloaded file is file based and we can move it directly without rewriting it { - Work.ScheduleWork( - WritePool, // WritePool, // GetSyncWorkerPool(), - [&, BlockIndex, BlockRange, BlockPartialBuffer = std::move(Payload)](std::atomic<bool>&) { - if (!AbortFlag) + IoBufferFileReference FileRef; + if (BuildBlob.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && + (FileRef.FileChunkSize == BlobSize)) + { + ZEN_TRACE_CPU("UpdateFolder_MoveTempChunk"); + std::error_code Ec; + std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); + if 
(!Ec) + { + BuildBlob.SetDeleteOnClose(false); + BuildBlob = {}; + CompressedChunkPath = Path / ZenTempDownloadFolderName / ChunkHash.ToHexString(); + std::filesystem::rename(TempBlobPath, CompressedChunkPath, Ec); + if (Ec) { - const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + CompressedChunkPath = std::filesystem::path{}; - FilteredWrittenBytesPerSecond.Start(); - - if (!WritePartialBlockToDisk( - CacheFolderPath, - RemoteContent, - BlockDescription, - SequenceIndexChunksLeftToWriteCounters, - BlockPartialBuffer, - BlockRange.ChunkBlockIndexStart, - BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1, - RemoteLookup, - RemoteChunkIndexNeedsCopyFromSourceFlags, - ChunkCountWritten, - WriteToDiskBytes)) - { - throw std::runtime_error( - fmt::format("Partial block {} is malformed", BlockDescription.BlockHash)); - } - WritePartsComplete++; - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); - } + // Re-open the temp file again + BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); + BuildBlob = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlobSize, true); + BuildBlob.SetDeleteOnClose(true); } - }, - [&, BlockIndex](const std::exception& Ex, std::atomic<bool>&) { - ZEN_ERROR("Failed writing block {}. 
Reason: {}", - BlockDescriptions[BlockIndex].BlockHash, - Ex.what()); - AbortFlag = true; - }); + } + } } - }, - Work.DefaultErrorFunction()); - } - } - else - { - BlocksNeededCount++; - TotalRequestCount++; - TotalPartWriteCount++; - - Work.ScheduleWork( - NetworkPool, // GetSyncWorkerPool(), // NetworkPool, - [&, BlockIndex](std::atomic<bool>&) { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_Get"); - // Full block schedule - - FilteredDownloadedBytesPerSecond.Start(); - IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockDescription.BlockHash); - if (!BlockBuffer) + if (CompressedChunkPath.empty() && (BlobSize > 512u * 1024u)) { - throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); - } - BytesDownloaded += BlockBuffer.GetSize(); - BlockBytes += BlockBuffer.GetSize(); - DownloadedBlocks++; - RequestsComplete++; - if (RequestsComplete == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); + ZEN_TRACE_CPU("UpdateFolder_WriteTempChunk"); + // Could not be moved and rather large, lets store it on disk + CompressedChunkPath = Path / ZenTempDownloadFolderName / ChunkHash.ToHexString(); + TemporaryFile::SafeWriteFile(CompressedChunkPath, BuildBlob); + BuildBlob = {}; } + DownloadedChunks++; - CompositeBuffer Payload = WriteToTempFileIfNeeded(CompositeBuffer(std::move(BlockBuffer)), - Path / ZenTempBlockFolderName, - BlockDescription.BlockHash); if (!AbortFlag) { Work.ScheduleWork( - WritePool, - [&, BlockIndex, BlockBuffer = std::move(Payload)](std::atomic<bool>&) { + WritePool, // WritePool, GetSyncWorkerPool() + [&Path, + &RemoteContent, + &RemoteLookup, + &CacheFolderPath, + &SequenceIndexChunksLeftToWriteCounters, + &WriteToDiskBytes, + &ChunkCountWritten, + &WritePartsComplete, + &TotalPartWriteCount, + &FilteredWrittenBytesPerSecond, + RemoteChunkIndex, + ChunkTargetPtrs, + CompressedChunkPath, + CompressedPart = std::move(BuildBlob)](std::atomic<bool>&) mutable { if (!AbortFlag) { - const 
ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + ZEN_TRACE_CPU("UpdateFolder_WriteChunk"); FilteredWrittenBytesPerSecond.Start(); - if (!WriteBlockToDisk(CacheFolderPath, - RemoteContent, - BlockDescription, - SequenceIndexChunksLeftToWriteCounters, - BlockBuffer, - RemoteLookup, - RemoteChunkIndexNeedsCopyFromSourceFlags, - ChunkCountWritten, - WriteToDiskBytes)) + + const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + if (CompressedChunkPath.empty()) + { + ZEN_ASSERT(CompressedPart); + } + else { - throw std::runtime_error( - fmt::format("Block {} is malformed", BlockDescription.BlockHash)); + ZEN_ASSERT(!CompressedPart); + CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); + if (!CompressedPart) + { + throw std::runtime_error( + fmt::format("Could not open dowloaded compressed chunk {} from {}", + ChunkHash, + CompressedChunkPath)); + } } - WritePartsComplete++; - if (WritePartsComplete == TotalPartWriteCount) + + std::filesystem::path TargetFolder = Path / ZenTempCacheFolderName; + bool NeedHashVerify = WriteCompressedChunk(TargetFolder, + RemoteContent, + RemoteLookup, + ChunkHash, + ChunkTargetPtrs, + std::move(CompressedPart), + WriteToDiskBytes); + + if (!AbortFlag) { - FilteredWrittenBytesPerSecond.Stop(); + ChunkCountWritten++; + WritePartsComplete++; + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } + + if (!CompressedChunkPath.empty()) + { + std::filesystem::remove(CompressedChunkPath); + } + + CompleteChunkTargets(TargetFolder, + RemoteContent, + ChunkHash, + ChunkTargetPtrs, + SequenceIndexChunksLeftToWriteCounters, + NeedHashVerify); } } }, Work.DefaultErrorFunction()); } } - }, - Work.DefaultErrorFunction()); - } + } + } + }, + Work.DefaultErrorFunction()); + } + + for (uint32_t BlockIndex : CachedChunkBlockIndexes) + { + if (AbortFlag) + { + break; } - else + + Work.ScheduleWork( + WritePool, // GetSyncWorkerPool(), // 
WritePool, + [&, BlockIndex](std::atomic<bool>&) mutable { + if (!AbortFlag) + { + ZEN_TRACE_CPU("UpdateFolder_WriteCachedBlock"); + + const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + FilteredWrittenBytesPerSecond.Start(); + + std::filesystem::path BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString(); + IoBuffer BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); + if (!BlockBuffer) + { + throw std::runtime_error( + fmt::format("Can not read block {} at {}", BlockDescription.BlockHash, BlockChunkPath)); + } + + if (!WriteBlockToDisk(CacheFolderPath, + RemoteContent, + BlockDescription, + SequenceIndexChunksLeftToWriteCounters, + CompositeBuffer(std::move(BlockBuffer)), + RemoteLookup, + RemoteChunkIndexNeedsCopyFromSourceFlags, + ChunkCountWritten, + WriteToDiskBytes)) + { + std::error_code DummyEc; + std::filesystem::remove(BlockChunkPath, DummyEc); + throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash)); + } + WritePartsComplete++; + std::filesystem::remove(BlockChunkPath); + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } + } + }, + Work.DefaultErrorFunction()); + } + + for (size_t BlockRangeIndex = 0; BlockRangeIndex < BlockRangeWorks.size(); BlockRangeIndex++) + { + if (AbortFlag) { - ZEN_DEBUG("Skipping block {} due to cache reuse", BlockDescriptions[BlockIndex].BlockHash); + break; } + const BlockRangeDescriptor BlockRange = BlockRangeWorks[BlockRangeIndex]; + ZEN_ASSERT(BlockRange.BlockIndex != (uint32_t)-1); + const uint32_t BlockIndex = BlockRange.BlockIndex; + Work.ScheduleWork( + NetworkPool, // NetworkPool, // GetSyncWorkerPool() + [&, BlockIndex, BlockRange](std::atomic<bool>&) { + if (!AbortFlag) + { + ZEN_TRACE_CPU("UpdateFolder_GetPartialBlock"); + + const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + + FilteredDownloadedBytesPerSecond.Start(); + IoBuffer 
BlockBuffer = + Storage.GetBuildBlob(BuildId, BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength); + if (!BlockBuffer) + { + throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); + } + uint64_t BlockSize = BlockBuffer.GetSize(); + BytesDownloaded += BlockSize; + BlockBytes += BlockSize; + DownloadedBlocks++; + RequestsComplete++; + if (RequestsComplete == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + + std::filesystem::path BlockChunkPath; + + // Check if the dowloaded block is file based and we can move it directly without rewriting it + { + IoBufferFileReference FileRef; + if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && + (FileRef.FileChunkSize == BlockSize)) + { + ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock"); + + std::error_code Ec; + std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); + if (!Ec) + { + BlockBuffer.SetDeleteOnClose(false); + BlockBuffer = {}; + BlockChunkPath = Path / ZenTempBlockFolderName / + fmt::format("{}_{:x}_{:x}", + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength); + std::filesystem::rename(TempBlobPath, BlockChunkPath, Ec); + if (Ec) + { + BlockChunkPath = std::filesystem::path{}; + + // Re-open the temp file again + BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); + BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); + BlockBuffer.SetDeleteOnClose(true); + } + } + } + } + + if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u)) + { + ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock"); + // Could not be moved and rather large, lets store it on disk + BlockChunkPath = + Path / ZenTempBlockFolderName / + fmt::format("{}_{:x}_{:x}", BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength); + TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); + BlockBuffer = {}; + } + + if (!AbortFlag) + { + Work.ScheduleWork( + 
WritePool, // WritePool, // GetSyncWorkerPool(), + [&, BlockIndex, BlockRange, BlockChunkPath, BlockPartialBuffer = std::move(BlockBuffer)]( + std::atomic<bool>&) mutable { + if (!AbortFlag) + { + ZEN_TRACE_CPU("UpdateFolder_WritePartialBlock"); + + const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + + if (BlockChunkPath.empty()) + { + ZEN_ASSERT(BlockPartialBuffer); + } + else + { + ZEN_ASSERT(!BlockPartialBuffer); + BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); + if (!BlockPartialBuffer) + { + throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}", + BlockDescription.BlockHash, + BlockChunkPath)); + } + } + + FilteredWrittenBytesPerSecond.Start(); + + if (!WritePartialBlockToDisk( + CacheFolderPath, + RemoteContent, + BlockDescription, + SequenceIndexChunksLeftToWriteCounters, + CompositeBuffer(std::move(BlockPartialBuffer)), + BlockRange.ChunkBlockIndexStart, + BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1, + RemoteLookup, + RemoteChunkIndexNeedsCopyFromSourceFlags, + ChunkCountWritten, + WriteToDiskBytes)) + { + std::error_code DummyEc; + std::filesystem::remove(BlockChunkPath, DummyEc); + throw std::runtime_error( + fmt::format("Partial block {} is malformed", BlockDescription.BlockHash)); + } + WritePartsComplete++; + + if (!BlockChunkPath.empty()) + { + std::filesystem::remove(BlockChunkPath); + } + + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } + } + }, + Work.DefaultErrorFunction()); + } + } + }, + Work.DefaultErrorFunction()); } + + for (uint32_t BlockIndex : FullBlockWorks) + { + if (AbortFlag) + { + break; + } + Work.ScheduleWork( + NetworkPool, // GetSyncWorkerPool(), // NetworkPool, + [&, BlockIndex](std::atomic<bool>&) { + if (!AbortFlag) + { + ZEN_TRACE_CPU("UpdateFolder_GetFullBlock"); + + const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + + 
FilteredDownloadedBytesPerSecond.Start(); + IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockDescription.BlockHash); + if (!BlockBuffer) + { + throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); + } + uint64_t BlockSize = BlockBuffer.GetSize(); + BytesDownloaded += BlockSize; + BlockBytes += BlockSize; + DownloadedBlocks++; + RequestsComplete++; + if (RequestsComplete == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + + std::filesystem::path BlockChunkPath; + + // Check if the dowloaded block is file based and we can move it directly without rewriting it + { + IoBufferFileReference FileRef; + if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && + (FileRef.FileChunkSize == BlockSize)) + { + ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock"); + std::error_code Ec; + std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); + if (!Ec) + { + BlockBuffer.SetDeleteOnClose(false); + BlockBuffer = {}; + BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString(); + std::filesystem::rename(TempBlobPath, BlockChunkPath, Ec); + if (Ec) + { + BlockChunkPath = std::filesystem::path{}; + + // Re-open the temp file again + BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); + BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); + BlockBuffer.SetDeleteOnClose(true); + } + } + } + } + + if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u)) + { + ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock"); + // Could not be moved and rather large, lets store it on disk + BlockChunkPath = Path / ZenTempBlockFolderName / BlockDescription.BlockHash.ToHexString(); + TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); + BlockBuffer = {}; + } + + if (!AbortFlag) + { + Work.ScheduleWork( + WritePool, // WritePool, GetSyncWorkerPool() + [&RemoteContent, + &RemoteLookup, + &CacheFolderPath, + 
&RemoteChunkIndexNeedsCopyFromSourceFlags, + &SequenceIndexChunksLeftToWriteCounters, + BlockIndex, + &BlockDescriptions, + &ChunkCountWritten, + &WriteToDiskBytes, + &WritePartsComplete, + &TotalPartWriteCount, + &FilteredWrittenBytesPerSecond, + BlockChunkPath, + BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable { + if (!AbortFlag) + { + ZEN_TRACE_CPU("UpdateFolder_WriteFullBlock"); + + const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + + if (BlockChunkPath.empty()) + { + ZEN_ASSERT(BlockBuffer); + } + else + { + ZEN_ASSERT(!BlockBuffer); + BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); + if (!BlockBuffer) + { + throw std::runtime_error(fmt::format("Could not open dowloaded block {} from {}", + BlockDescription.BlockHash, + BlockChunkPath)); + } + } + + FilteredWrittenBytesPerSecond.Start(); + if (!WriteBlockToDisk(CacheFolderPath, + RemoteContent, + BlockDescription, + SequenceIndexChunksLeftToWriteCounters, + CompositeBuffer(std::move(BlockBuffer)), + RemoteLookup, + RemoteChunkIndexNeedsCopyFromSourceFlags, + ChunkCountWritten, + WriteToDiskBytes)) + { + std::error_code DummyEc; + std::filesystem::remove(BlockChunkPath, DummyEc); + throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash)); + } + WritePartsComplete++; + + if (!BlockChunkPath.empty()) + { + std::filesystem::remove(BlockChunkPath); + } + + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } + } + }, + Work.DefaultErrorFunction()); + } + } + }, + Work.DefaultErrorFunction()); + } + ZEN_DEBUG("Fetching {} with {} slack (ideal {}) out of {} using {} requests for {} blocks", NiceBytes(AllBlocksFetch), NiceBytes(AllBlocksSlack), @@ -4809,28 +5412,30 @@ namespace { NiceBytes(AllBlocksSize), AllBlockRequests, BlocksNeededCount); - ZEN_TRACE_CPU("HandleChunks_Wait"); - - Work.Wait(UsePlainProgress ? 
5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(IsAborted, PendingWork); - ZEN_ASSERT(ChunkCountToWrite >= ChunkCountWritten.load()); - FilteredWrittenBytesPerSecond.Update(WriteToDiskBytes.load()); - FilteredDownloadedBytesPerSecond.Update(BytesDownloaded.load()); - std::string Details = fmt::format("{}/{} ({} {}bits/s) downloaded. {}/{} ({} {}B/s) written.", - RequestsComplete.load(), - TotalRequestCount, - NiceBytes(BytesDownloaded.load()), - NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8), - ChunkCountWritten.load(), - ChunkCountToWrite, - NiceBytes(WriteToDiskBytes.load()), - NiceNum(FilteredWrittenBytesPerSecond.GetCurrent())); - WriteProgressBar.UpdateState({.Task = "Writing chunks ", - .Details = Details, - .TotalCount = gsl::narrow<uint64_t>(ChunkCountToWrite), - .RemainingCount = gsl::narrow<uint64_t>(ChunkCountToWrite - ChunkCountWritten.load())}, - false); - }); + { + ZEN_TRACE_CPU("WriteChunks_Wait"); + + Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(IsAborted, PendingWork); + ZEN_ASSERT(ChunkCountToWrite >= ChunkCountWritten.load()); + FilteredWrittenBytesPerSecond.Update(WriteToDiskBytes.load()); + FilteredDownloadedBytesPerSecond.Update(BytesDownloaded.load()); + std::string Details = fmt::format("{}/{} ({} {}bits/s) downloaded. 
{}/{} ({} {}B/s) written.", + RequestsComplete.load(), + TotalRequestCount, + NiceBytes(BytesDownloaded.load()), + NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8), + ChunkCountWritten.load(), + ChunkCountToWrite, + NiceBytes(WriteToDiskBytes.load()), + NiceNum(FilteredWrittenBytesPerSecond.GetCurrent())); + WriteProgressBar.UpdateState({.Task = "Writing chunks ", + .Details = Details, + .TotalCount = gsl::narrow<uint64_t>(ChunkCountToWrite), + .RemainingCount = gsl::narrow<uint64_t>(ChunkCountToWrite - ChunkCountWritten.load())}, + false); + }); + } FilteredWrittenBytesPerSecond.Stop(); FilteredDownloadedBytesPerSecond.Stop(); @@ -4842,19 +5447,32 @@ namespace { WriteProgressBar.Finish(); - ZEN_CONSOLE("Downloaded {} ({}bits/s). Wrote {} ({}B/s). Completed in {}", + uint32_t RawSequencesMissingWriteCount = 0; + for (uint32_t SequenceIndex = 0; SequenceIndex < SequenceIndexChunksLeftToWriteCounters.size(); SequenceIndex++) + { + const auto& SequenceIndexChunksLeftToWriteCounter = SequenceIndexChunksLeftToWriteCounters[SequenceIndex]; + if (SequenceIndexChunksLeftToWriteCounter.load() != 0) + { + RawSequencesMissingWriteCount++; + const uint32_t PathIndex = RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; + const std::filesystem::path& IncompletePath = RemoteContent.Paths[PathIndex]; + ZEN_ASSERT(!IncompletePath.empty()); + const uint32_t ExpectedSequenceCount = RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]; + ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounter.load() <= ExpectedSequenceCount); + } + } + ZEN_ASSERT(RawSequencesMissingWriteCount == 0); + + ZEN_CONSOLE("Downloaded {} ({}bits/s) in {}. Wrote {} ({}B/s) in {}. 
Completed in {}", NiceBytes(BytesDownloaded.load()), NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), BytesDownloaded * 8)), + NiceTimeSpanMs(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS() / 1000), NiceBytes(WriteToDiskBytes.load()), NiceNum(GetBytesPerSecond(FilteredWrittenBytesPerSecond.GetElapsedTimeUS(), WriteToDiskBytes.load())), + NiceTimeSpanMs(FilteredWrittenBytesPerSecond.GetElapsedTimeUS() / 1000), NiceTimeSpanMs(WriteTimer.GetElapsedTimeMs())); } - for (const auto& SequenceIndexChunksLeftToWriteCounter : SequenceIndexChunksLeftToWriteCounters) - { - ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounter.load() == 0); - } - std::vector<std::pair<IoHash, uint32_t>> Targets; Targets.reserve(RemoteContent.Paths.size()); for (uint32_t RemotePathIndex = 0; RemotePathIndex < RemoteContent.Paths.size(); RemotePathIndex++) @@ -4867,17 +5485,24 @@ namespace { // Move all files we will reuse to cache folder // TODO: If WipeTargetFolder is false we could check which files are already correct and leave them in place - for (uint32_t LocalPathIndex = 0; LocalPathIndex < LocalContent.Paths.size(); LocalPathIndex++) + if (!LocalPathIndexesMatchingSequenceIndexes.empty()) { - const IoHash& RawHash = LocalContent.RawHashes[LocalPathIndex]; - if (RemoteLookup.RawHashToSequenceIndex.contains(RawHash)) + ZEN_TRACE_CPU("UpdateFolder_CacheReused"); + uint64_t TotalFullFileSizeCached = 0; + for (uint32_t LocalPathIndex : LocalPathIndexesMatchingSequenceIndexes) { + const IoHash& RawHash = LocalContent.RawHashes[LocalPathIndex]; const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred(); const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash); ZEN_ASSERT_SLOW(std::filesystem::exists(LocalFilePath)); SetFileReadOnly(LocalFilePath, false); + ZEN_ASSERT_SLOW(!std::filesystem::exists(CacheFilePath)); std::filesystem::rename(LocalFilePath, CacheFilePath); + 
TotalFullFileSizeCached += std::filesystem::file_size(CacheFilePath); } + ZEN_CONSOLE("Saved {} ({}) unchanged files in cache", + LocalPathIndexesMatchingSequenceIndexes.size(), + NiceBytes(TotalFullFileSizeCached)); } if (WipeTargetFolder) @@ -4923,6 +5548,8 @@ namespace { } { + ZEN_TRACE_CPU("UpdateFolder_FinalizeTree"); + WorkerThreadPool& WritePool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // ProgressBar RebuildProgressBar(UsePlainProgress); @@ -4943,8 +5570,6 @@ namespace { break; } - ZEN_TRACE_CPU("UpdateFolder_FinalizeTree"); - size_t TargetCount = 1; const IoHash& RawHash = Targets[TargetOffset].first; while (Targets[TargetOffset + TargetCount].first == RawHash) @@ -5038,17 +5663,19 @@ namespace { TargetOffset += TargetCount; } - ZEN_TRACE_CPU("FinalizeTree_Wait"); + { + ZEN_TRACE_CPU("FinalizeTree_Wait"); - Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(IsAborted, PendingWork); - std::string Details = fmt::format("{}/{} files", TargetsComplete.load(), Targets.size()); - RebuildProgressBar.UpdateState({.Task = "Rebuilding state ", - .Details = Details, - .TotalCount = gsl::narrow<uint64_t>(Targets.size()), - .RemainingCount = gsl::narrow<uint64_t>(Targets.size() - TargetsComplete.load())}, - false); - }); + Work.Wait(UsePlainProgress ? 
5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(IsAborted, PendingWork); + std::string Details = fmt::format("{}/{} files", TargetsComplete.load(), Targets.size()); + RebuildProgressBar.UpdateState({.Task = "Rebuilding state ", + .Details = Details, + .TotalCount = gsl::narrow<uint64_t>(Targets.size()), + .RemainingCount = gsl::narrow<uint64_t>(Targets.size() - TargetsComplete.load())}, + false); + }); + } if (AbortFlag) { @@ -5608,15 +6235,10 @@ namespace { const std::filesystem::path ZenTempFolder = Path / ZenTempFolderName; CreateDirectories(ZenTempFolder); - auto _ = MakeGuard([&]() { - CleanDirectory(ZenTempFolder, {}); - std::filesystem::remove(ZenTempFolder); - }); + CreateDirectories(Path / ZenTempBlockFolderName); - CreateDirectories(Path / ZenTempChunkFolderName); // TODO: Don't clear this - pick up files -> chunks to use - CreateDirectories(Path / - ZenTempCacheFolderName); // TODO: Don't clear this - pick up files and use as sequences (non .tmp extension) - // and delete .tmp (maybe?) - chunk them? How do we know the file is worth chunking? 
+ CreateDirectories(Path / ZenTempCacheFolderName); + CreateDirectories(Path / ZenTempDownloadFolderName); std::uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u; @@ -5748,6 +6370,8 @@ namespace { ZEN_CONSOLE("Downloaded build in {}.", NiceTimeSpanMs(DownloadTimer.GetElapsedTimeMs())); } } + CleanDirectory(ZenTempFolder, {}); + std::filesystem::remove(ZenTempFolder); } void DiffFolders(const std::filesystem::path& BasePath, const std::filesystem::path& ComparePath, bool OnlyChunked) @@ -6191,6 +6815,15 @@ BuildsCommand::BuildsCommand() "<name>"); m_ValidateBuildPartOptions.parse_positional({"build-id", "build-part-id"}); m_ValidateBuildPartOptions.positional_help("build-id build-part-id"); + + AddCloudOptions(m_MultiTestDownloadOptions); + AddFileOptions(m_MultiTestDownloadOptions); + AddOutputOptions(m_MultiTestDownloadOptions); + m_MultiTestDownloadOptions + .add_option("", "l", "local-path", "Root file system folder used as base", cxxopts::value(m_Path), "<local-path>"); + m_MultiTestDownloadOptions.add_option("", "", "build-ids", "Build Ids list separated by ','", cxxopts::value(m_BuildIds), "<ids>"); + m_MultiTestDownloadOptions.parse_positional({"local-path"}); + m_MultiTestDownloadOptions.positional_help("local-path"); } BuildsCommand::~BuildsCommand() = default; @@ -6667,6 +7300,79 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) return AbortFlag ? 
11 : 0; } + if (SubOption == &m_MultiTestDownloadOptions) + { + if (m_Path.empty()) + { + throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_DownloadOptions.help())); + } + + ParseStorageOptions(); + ParseAuthOptions(); + + HttpClient Http(m_BuildsUrl, ClientSettings); + // m_StoragePath = "D:\\buildstorage"; + // m_Path = "F:\\Saved\\DownloadedBuilds\\++Fortnite+Main-CL-XXXXXXXX\\WindowsClient"; + // std::vector<std::string> BuildIdStrings{"07d3942f0e7f4ca1b13b0587", + // "07d394eed89d769f2254e75d", + // "07d3953f22fa3f8000fa6f0a", + // "07d3959df47ed1f42ddbe44c", + // "07d395fa7803d50804f14417", + // "07d3964f919d577a321a1fdd", + // "07d396a6ce875004e16b9528"}; + + BuildStorage::Statistics StorageStats; + std::unique_ptr<BuildStorage> Storage; + std::string StorageName; + if (!m_BuildsUrl.empty()) + { + ZEN_CONSOLE("Downloading {} to '{}' from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}'", + FormatArray<std::string>(m_BuildIds, " "sv), + m_Path, + m_BuildsUrl, + Http.GetSessionId(), + m_Namespace, + m_Bucket); + Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName); + StorageName = "Cloud DDC"; + } + else if (!m_StoragePath.empty()) + { + ZEN_CONSOLE("Downloading {}'to '{}' from folder {}", FormatArray<std::string>(m_BuildIds, " "sv), m_Path, m_StoragePath); + Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); + StorageName = fmt::format("Disk {}", m_StoragePath.stem()); + } + else + { + throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); + } + + for (const std::string& BuildIdString : m_BuildIds) + { + Oid BuildId = Oid::FromHexString(BuildIdString); + if (BuildId == Oid::Zero) + { + throw zen::OptionParseException(fmt::format("invalid build id {}\n{}", BuildIdString, m_DownloadOptions.help())); + } + DownloadFolder(*Storage, + BuildId, + {}, + 
{}, + m_Path, + m_AllowMultiparts, + m_AllowPartialBlockRequests, + BuildIdString == m_BuildIds.front(), + true); + if (AbortFlag) + { + ZEN_CONSOLE("Download cancelled"); + return 11; + } + ZEN_CONSOLE("\n"); + } + return 0; + } + if (SubOption == &m_TestOptions) { ParseStorageOptions(); diff --git a/src/zen/cmds/builds_cmd.h b/src/zen/cmds/builds_cmd.h index 838a17807..167a5d29f 100644 --- a/src/zen/cmds/builds_cmd.h +++ b/src/zen/cmds/builds_cmd.h @@ -92,18 +92,22 @@ private: cxxopts::Options m_TestOptions{"test", "Test upload and download with verify"}; + cxxopts::Options m_MultiTestDownloadOptions{"multi-test-download", "Test multiple sequenced downloads with verify"}; + std::vector<std::string> m_BuildIds; + cxxopts::Options m_FetchBlobOptions{"fetch-blob", "Fetch a blob from remote store"}; std::string m_BlobHash; cxxopts::Options m_ValidateBuildPartOptions{"validate-part", "Fetch a build part and validate all referenced attachments"}; - cxxopts::Options* m_SubCommands[7] = {&m_ListOptions, + cxxopts::Options* m_SubCommands[8] = {&m_ListOptions, &m_UploadOptions, &m_DownloadOptions, &m_DiffOptions, &m_TestOptions, &m_FetchBlobOptions, - &m_ValidateBuildPartOptions}; + &m_ValidateBuildPartOptions, + &m_MultiTestDownloadOptions}; }; } // namespace zen diff --git a/src/zencore/basicfile.cpp b/src/zencore/basicfile.cpp index 6e879ca0d..95876cff4 100644 --- a/src/zencore/basicfile.cpp +++ b/src/zencore/basicfile.cpp @@ -858,7 +858,7 @@ BasicFileWriter::Flush() } IoBuffer -WriteToTempFile(const CompositeBuffer& Buffer, const std::filesystem::path& Path) +WriteToTempFile(CompositeBuffer&& Buffer, const std::filesystem::path& Path) { TemporaryFile Temp; std::error_code Ec; @@ -868,6 +868,7 @@ WriteToTempFile(const CompositeBuffer& Buffer, const std::filesystem::path& Path throw std::system_error(Ec, fmt::format("Failed to create temp file for blob at '{}'", Path)); } + uint64_t BufferSize = Buffer.GetSize(); { uint64_t Offset = 0; static const uint64_t BufferingSize 
= 256u * 1024u; @@ -899,21 +900,31 @@ WriteToTempFile(const CompositeBuffer& Buffer, const std::filesystem::path& Path Temp.MoveTemporaryIntoPlace(Path, Ec); if (Ec) { - IoBuffer TmpBuffer = IoBufferBuilder::MakeFromFile(Path); - if (TmpBuffer) + Ec.clear(); + BasicFile OpenTemp(Path, BasicFile::Mode::kDelete, Ec); + if (Ec) { - IoHash ExistingHash = IoHash::HashBuffer(TmpBuffer); - const IoHash ExpectedHash = IoHash::HashBuffer(Buffer); - if (ExistingHash == ExpectedHash) - { - TmpBuffer.SetDeleteOnClose(true); - return TmpBuffer; - } + throw std::system_error(Ec, fmt::format("Failed to move temp file to '{}'", Path)); } - throw std::system_error(Ec, fmt::format("Failed to move temp file to '{}'", Path)); - } + if (OpenTemp.FileSize() != BufferSize) + { + throw std::runtime_error(fmt::format("Failed to move temp file to '{}' - mismatching file size already exists", Path)); + } + IoBuffer TmpBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BufferSize, true); - IoBuffer TmpBuffer = IoBufferBuilder::MakeFromFile(Path); + IoHash ExistingHash = IoHash::HashBuffer(TmpBuffer); + const IoHash ExpectedHash = IoHash::HashBuffer(Buffer); + if (ExistingHash != ExpectedHash) + { + throw std::runtime_error(fmt::format("Failed to move temp file to '{}' - mismatching file hash already exists", Path)); + } + Buffer = CompositeBuffer{}; + TmpBuffer.SetDeleteOnClose(true); + return TmpBuffer; + } + Buffer = CompositeBuffer{}; + BasicFile OpenTemp(Path, BasicFile::Mode::kDelete); + IoBuffer TmpBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BufferSize, true); TmpBuffer.SetDeleteOnClose(true); return TmpBuffer; } diff --git a/src/zencore/compress.cpp b/src/zencore/compress.cpp index 1844f6a63..88c3bb5b9 100644 --- a/src/zencore/compress.cpp +++ b/src/zencore/compress.cpp @@ -193,7 +193,7 @@ public: const CompositeBuffer& CompressedData, uint64_t RawOffset, uint64_t RawSize, - std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const = 0; + 
std::function<bool(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const = 0; }; /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -287,13 +287,16 @@ public: const CompositeBuffer& CompressedData, uint64_t RawOffset, uint64_t RawSize, - std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const final + std::function<bool(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const final { if (Header.Method == CompressionMethod::None && Header.TotalCompressedSize == CompressedData.GetSize() && Header.TotalCompressedSize == Header.TotalRawSize + sizeof(BufferHeader) && RawOffset < Header.TotalRawSize && (RawOffset + RawSize) <= Header.TotalRawSize) { - Callback(0, CompressedData.Mid(sizeof(BufferHeader) + RawOffset, RawSize)); + if (!Callback(0, CompressedData.Mid(sizeof(BufferHeader) + RawOffset, RawSize))) + { + return false; + } return true; } return false; @@ -616,7 +619,7 @@ public: const CompositeBuffer& CompressedData, uint64_t RawOffset, uint64_t RawSize, - std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const final; + std::function<bool(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const final; protected: virtual bool DecompressBlock(MutableMemoryView RawData, MemoryView CompressedData) const = 0; @@ -744,7 +747,7 @@ BlockDecoder::DecompressToStream(const BufferHeader& Header, const CompositeBuffer& CompressedData, uint64_t RawOffset, uint64_t RawSize, - std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const + std::function<bool(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const { if (Header.TotalCompressedSize != CompressedData.GetSize()) { @@ -814,14 +817,23 @@ BlockDecoder::DecompressToStream(const BufferHeader& Header, Source.Detach(); return false; } - Callback(BlockIndex * BlockSize + OffsetInFirstBlock, - CompositeBuffer(IoBuffer(IoBuffer::Wrap, 
RawDataBuffer.GetData(), BytesToUncompress))); + if (!Callback(BlockIndex * BlockSize + OffsetInFirstBlock, + CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawDataBuffer.GetData(), BytesToUncompress)))) + { + Source.Detach(); + return false; + } } else { - Callback(BlockIndex * BlockSize + OffsetInFirstBlock, - CompositeBuffer( - IoBuffer(IoBuffer::Wrap, CompressedBlockCopy.GetView().Mid(OffsetInFirstBlock).GetData(), BytesToUncompress))); + if (!Callback( + BlockIndex * BlockSize + OffsetInFirstBlock, + CompositeBuffer( + IoBuffer(IoBuffer::Wrap, CompressedBlockCopy.GetView().Mid(OffsetInFirstBlock).GetData(), BytesToUncompress)))) + { + Source.Detach(); + return false; + } } OffsetInFirstBlock = 0; @@ -858,14 +870,21 @@ BlockDecoder::DecompressToStream(const BufferHeader& Header, { return false; } - Callback(BlockIndex * BlockSize + OffsetInFirstBlock, - CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawDataBuffer.GetData(), BytesToUncompress))); + if (!Callback(BlockIndex * BlockSize + OffsetInFirstBlock, + CompositeBuffer(IoBuffer(IoBuffer::Wrap, RawDataBuffer.GetData(), BytesToUncompress)))) + { + return false; + } } else { - Callback(BlockIndex * BlockSize + OffsetInFirstBlock, - CompositeBuffer( - IoBuffer(IoBuffer::Wrap, CompressedBlockCopy.GetView().Mid(OffsetInFirstBlock).GetData(), BytesToUncompress))); + if (!Callback( + BlockIndex * BlockSize + OffsetInFirstBlock, + CompositeBuffer( + IoBuffer(IoBuffer::Wrap, CompressedBlockCopy.GetView().Mid(OffsetInFirstBlock).GetData(), BytesToUncompress)))) + { + return false; + } } OffsetInFirstBlock = 0; @@ -1978,7 +1997,7 @@ CompressedBuffer::DecompressToComposite() const bool CompressedBuffer::DecompressToStream(uint64_t RawOffset, uint64_t RawSize, - std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const + std::function<bool(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const { using namespace detail; if (CompressedData) diff --git a/src/zencore/include/zencore/basicfile.h 
b/src/zencore/include/zencore/basicfile.h index a78132879..57798b6f4 100644 --- a/src/zencore/include/zencore/basicfile.h +++ b/src/zencore/include/zencore/basicfile.h @@ -186,7 +186,7 @@ private: uint64_t m_BufferEnd; }; -IoBuffer WriteToTempFile(const CompositeBuffer& Buffer, const std::filesystem::path& Path); +IoBuffer WriteToTempFile(CompositeBuffer&& Buffer, const std::filesystem::path& Path); ZENCORE_API void basicfile_forcelink(); diff --git a/src/zencore/include/zencore/compress.h b/src/zencore/include/zencore/compress.h index 3969e9dbd..74fd5f767 100644 --- a/src/zencore/include/zencore/compress.h +++ b/src/zencore/include/zencore/compress.h @@ -209,7 +209,7 @@ public: */ [[nodiscard]] ZENCORE_API bool DecompressToStream(uint64_t RawOffset, uint64_t RawSize, - std::function<void(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const; + std::function<bool(uint64_t Offset, const CompositeBuffer& Range)>&& Callback) const; /** A null compressed buffer. */ static const CompressedBuffer Null; diff --git a/src/zencore/workthreadpool.cpp b/src/zencore/workthreadpool.cpp index d15fb2e83..445fe939e 100644 --- a/src/zencore/workthreadpool.cpp +++ b/src/zencore/workthreadpool.cpp @@ -274,7 +274,7 @@ WorkerThreadPool::ScheduleWork(Ref<IWork> Work) void WorkerThreadPool::ScheduleWork(std::function<void()>&& Work) { - ScheduleWork(Ref<IWork>(new detail::LambdaWork(Work))); + ScheduleWork(Ref<IWork>(new detail::LambdaWork(std::move(Work)))); } [[nodiscard]] size_t diff --git a/src/zenutil/chunkblock.cpp b/src/zenutil/chunkblock.cpp index a19cf5c1b..f3c14edc4 100644 --- a/src/zenutil/chunkblock.cpp +++ b/src/zenutil/chunkblock.cpp @@ -187,31 +187,54 @@ GenerateChunkBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks, return CompressedBlock; } -bool -IterateChunkBlock(const SharedBuffer& BlockPayload, - std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor, - uint64_t& OutHeaderSize) +std::vector<uint32_t> 
+ReadChunkBlockHeader(const MemoryView BlockView, uint64_t& OutHeaderSize) { - ZEN_ASSERT(BlockPayload); - if (BlockPayload.GetSize() < 1) - { - return false; - } - - MemoryView BlockView = BlockPayload.GetView(); - const uint8_t* ReadPtr = reinterpret_cast<const uint8_t*>(BlockView.GetData()); + const uint8_t* ReadPtr = reinterpret_cast<const uint8_t*>(BlockView.GetData()); uint32_t NumberSize; uint64_t ChunkCount = ReadVarUInt(ReadPtr, NumberSize); ReadPtr += NumberSize; - std::vector<uint64_t> ChunkSizes; + std::vector<uint32_t> ChunkSizes; ChunkSizes.reserve(ChunkCount); while (ChunkCount--) { - ChunkSizes.push_back(ReadVarUInt(ReadPtr, NumberSize)); + if (ReadPtr >= BlockView.GetDataEnd()) + { + throw std::runtime_error("Invalid block header, block data ended unexpectedly"); + } + uint64_t ChunkSize = ReadVarUInt(ReadPtr, NumberSize); + if (ChunkSize > std::numeric_limits<uint32_t>::max()) + { + throw std::runtime_error("Invalid block header, header data is corrupt"); + } + if (ChunkSize < 1) + { + throw std::runtime_error("Invalid block header, header data is corrupt"); + } + ChunkSizes.push_back(gsl::narrow<uint32_t>(ChunkSize)); ReadPtr += NumberSize; } uint64_t Offset = std::distance((const uint8_t*)BlockView.GetData(), ReadPtr); OutHeaderSize = Offset; + return ChunkSizes; +} + +bool +IterateChunkBlock(const SharedBuffer& BlockPayload, + std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor, + uint64_t& OutHeaderSize) +{ + ZEN_ASSERT(BlockPayload); + if (BlockPayload.GetSize() < 1) + { + return false; + } + + MemoryView BlockView = BlockPayload.GetView(); + + std::vector<uint32_t> ChunkSizes = ReadChunkBlockHeader(BlockView, OutHeaderSize); + uint64_t Offset = OutHeaderSize; + OutHeaderSize = Offset; for (uint64_t ChunkSize : ChunkSizes) { IoBuffer Chunk(BlockPayload.AsIoBuffer(), Offset, ChunkSize); diff --git a/src/zenutil/include/zenutil/chunkblock.h b/src/zenutil/include/zenutil/chunkblock.h index 21107fb7c..277580c74 
100644 --- a/src/zenutil/include/zenutil/chunkblock.h +++ b/src/zenutil/include/zenutil/chunkblock.h @@ -31,9 +31,10 @@ CbObject BuildChunkBlockDescription(const ChunkBlockDescription& Block, ChunkBlockDescription GetChunkBlockDescription(const SharedBuffer& BlockPayload, const IoHash& RawHash); typedef std::function<std::pair<uint64_t, CompressedBuffer>(const IoHash& RawHash)> FetchChunkFunc; -CompressedBuffer GenerateChunkBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks, ChunkBlockDescription& OutBlock); -bool IterateChunkBlock(const SharedBuffer& BlockPayload, - std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor, - uint64_t& OutHeaderSize); +CompressedBuffer GenerateChunkBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks, ChunkBlockDescription& OutBlock); +bool IterateChunkBlock(const SharedBuffer& BlockPayload, + std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor, + uint64_t& OutHeaderSize); +std::vector<uint32_t> ReadChunkBlockHeader(const MemoryView BlockView, uint64_t& OutHeaderSize); } // namespace zen |