diff options
| author | Dan Engelbrecht <[email protected]> | 2025-03-07 09:58:20 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-03-07 09:58:20 +0100 |
| commit | 9b24647facccc9c7848a52f1f4c5e32055bf2f01 (patch) | |
| tree | d798889a2b3f6a0d72331890b75e92b0159ce0c3 /src | |
| parent | reduced memory churn using fixed_xxx containers (#236) (diff) | |
| download | zen-9b24647facccc9c7848a52f1f4c5e32055bf2f01.tar.xz zen-9b24647facccc9c7848a52f1f4c5e32055bf2f01.zip | |
partial block fetch (#298)
- Improvement: Do partial requests of blocks if not all of the block is needed
- Improvement: Better progress/statistics on download
- Bugfix: Ensure that temporary folder for Jupiter downloads exists during verify phase
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 1099 | ||||
| -rw-r--r-- | src/zen/cmds/builds_cmd.h | 1 | ||||
| -rw-r--r-- | src/zenhttp/httpclient.cpp | 161 | ||||
| -rw-r--r-- | src/zenutil/filebuildstorage.cpp | 17 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/buildstorage.h | 5 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/jupiter/jupitersession.h | 4 | ||||
| -rw-r--r-- | src/zenutil/jupiter/jupiterbuildstorage.cpp | 10 | ||||
| -rw-r--r-- | src/zenutil/jupiter/jupitersession.cpp | 12 |
8 files changed, 907 insertions, 402 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index eb0650c4d..1c9476b96 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -231,7 +231,10 @@ namespace { return AuthToken; } - CompositeBuffer WriteToTempFileIfNeeded(const CompositeBuffer& Buffer, const std::filesystem::path& TempFolderPath, const IoHash& Hash) + CompositeBuffer WriteToTempFileIfNeeded(const CompositeBuffer& Buffer, + const std::filesystem::path& TempFolderPath, + const IoHash& Hash, + const std::string& Suffix = {}) { // If this is a file based buffer or a compressed buffer with a memory-based header, we don't need to rewrite to disk to save memory std::span<const SharedBuffer> Segments = Buffer.GetSegments(); @@ -241,7 +244,7 @@ namespace { { return Buffer; } - std::filesystem::path TempFilePath = (TempFolderPath / Hash.ToHexString()).make_preferred(); + std::filesystem::path TempFilePath = (TempFolderPath / (Hash.ToHexString() + Suffix)).make_preferred(); return CompositeBuffer(WriteToTempFile(Buffer, TempFilePath)); } @@ -302,7 +305,7 @@ namespace { return FilteredPerSecond; } - uint64_t GetElapsedTime() const + uint64_t GetElapsedTimeUS() const { if (StartTimeUS == (uint64_t)-1) { @@ -1738,8 +1741,8 @@ namespace { ProgressBar.Finish(); - GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGeneratedBytesPerSecond.GetElapsedTime(); - UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTime(); + GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGeneratedBytesPerSecond.GetElapsedTimeUS(); + UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS(); } } @@ -2069,9 +2072,9 @@ namespace { }); ProgressBar.Finish(); - UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTime(); - GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGenerateBlockBytesPerSecond.GetElapsedTime(); - LooseChunksStats.CompressChunksElapsedWallTimeUS = FilteredCompressedBytesPerSecond.GetElapsedTime(); + UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS(); + GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGenerateBlockBytesPerSecond.GetElapsedTimeUS(); + LooseChunksStats.CompressChunksElapsedWallTimeUS = FilteredCompressedBytesPerSecond.GetElapsedTimeUS(); } } @@ -3358,17 +3361,8 @@ namespace { return ChunkTargetPtrs; }; - bool WriteBlockToDisk(const std::filesystem::path& CacheFolderPath, - const ChunkedFolderContent& RemoteContent, - std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, - const CompositeBuffer& DecompressedBlockBuffer, - const ChunkedContentLookup& Lookup, - std::atomic<bool>* RemoteChunkIndexNeedsCopyFromSourceFlags, - std::atomic<uint32_t>& OutChunksComplete, - std::atomic<uint64_t>& OutBytesWritten) + struct BlockWriteOps { - ZEN_TRACE_CPU("WriteBlockToDisk"); - std::vector<CompositeBuffer> ChunkBuffers; struct WriteOpData { @@ -3376,7 +3370,79 @@ namespace { size_t ChunkBufferIndex; }; std::vector<WriteOpData> WriteOps; + }; + void WriteBlockChunkOps(const std::filesystem::path& CacheFolderPath, + const ChunkedFolderContent& RemoteContent, + const ChunkedContentLookup& Lookup, + std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, + const BlockWriteOps Ops, + std::atomic<uint32_t>& OutChunksComplete, + std::atomic<uint64_t>& OutBytesWritten) + { + ZEN_TRACE_CPU("WriteBlockChunkOps"); + { + WriteFileCache OpenFileCache; + for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps) + { + if (AbortFlag) + { + break; + } + const CompositeBuffer& Chunk = Ops.ChunkBuffers[WriteOp.ChunkBufferIndex]; + const uint32_t SequenceIndex = WriteOp.Target->SequenceIndex; + ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() <= + RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]); + ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() > 0); + const uint64_t ChunkSize = Chunk.GetSize(); + const uint64_t FileOffset = WriteOp.Target->Offset; + const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex]; + ZEN_ASSERT(FileOffset + ChunkSize <= RemoteContent.RawSizes[PathIndex]); + + OpenFileCache.WriteToFile<CompositeBuffer>( + SequenceIndex, + [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) { + return GetTempChunkedSequenceFileName(CacheFolderPath, + RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); + }, + Chunk, + FileOffset, + RemoteContent.RawSizes[PathIndex]); + OutBytesWritten += ChunkSize; + } + } + if (!AbortFlag) + { + // Write tracking, updating this must be done without any files open (WriteFileCache) + for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps) + { + const uint32_t RemoteSequenceIndex = WriteOp.Target->SequenceIndex; + if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1) + { + const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + const IoHash VerifyChunkHash = + IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash))); + if (VerifyChunkHash != SequenceRawHash) + { + throw std::runtime_error( + fmt::format("Written hunk sequence {} hash does not match expected hash {}", VerifyChunkHash, SequenceRawHash)); + } + std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash), + GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)); + } + } + OutChunksComplete += gsl::narrow<uint32_t>(Ops.ChunkBuffers.size()); + } + } + + bool GetBlockWriteOps(const ChunkedFolderContent& RemoteContent, + const ChunkedContentLookup& Lookup, + std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, + std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, + const CompositeBuffer& DecompressedBlockBuffer, + BlockWriteOps& OutOps) + { + ZEN_TRACE_CPU("GetBlockWriteOps"); SharedBuffer BlockBuffer = DecompressedBlockBuffer.Flatten(); uint64_t HeaderSize = 0; if (IterateChunkBlock( @@ -3402,91 +3468,207 @@ namespace { ZEN_ASSERT(Decompressed.GetSize() == RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]); for (const ChunkedContentLookup::ChunkSequenceLocation* Target : ChunkTargetPtrs) { - WriteOps.push_back(WriteOpData{.Target = Target, .ChunkBufferIndex = ChunkBuffers.size()}); + OutOps.WriteOps.push_back( + BlockWriteOps::WriteOpData{.Target = Target, .ChunkBufferIndex = OutOps.ChunkBuffers.size()}); } - ChunkBuffers.emplace_back(std::move(Decompressed)); + OutOps.ChunkBuffers.emplace_back(std::move(Decompressed)); } } } }, HeaderSize)) { - if (!WriteOps.empty()) + std::sort(OutOps.WriteOps.begin(), + OutOps.WriteOps.end(), + [](const BlockWriteOps::WriteOpData& Lhs, const BlockWriteOps::WriteOpData& Rhs) { + if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex) + { + return true; + } + if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex) + { + return false; + } + return Lhs.Target->Offset < Rhs.Target->Offset; + }); + + return true; + } + else + { + return false; + } + } + + bool WriteBlockToDisk(const std::filesystem::path& CacheFolderPath, + const ChunkedFolderContent& RemoteContent, + const ChunkBlockDescription& BlockDescription, + std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, + const CompositeBuffer& BlockBuffer, + const ChunkedContentLookup& Lookup, + std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, + std::atomic<uint32_t>& OutChunksComplete, + std::atomic<uint64_t>& OutBytesWritten) + { + ZEN_TRACE_CPU("WriteBlockToDisk"); + + IoHash BlockRawHash; + uint64_t BlockRawSize; + CompressedBuffer CompressedBlockBuffer = CompressedBuffer::FromCompressed(BlockBuffer, BlockRawHash, BlockRawSize); + if (!CompressedBlockBuffer) + { + throw std::runtime_error(fmt::format("Block {} is not a compressed buffer", BlockDescription.BlockHash)); + } + + if (BlockRawHash != BlockDescription.BlockHash) + { + throw std::runtime_error( + fmt::format("Block {} header has a mismatching raw hash {}", BlockDescription.BlockHash, BlockRawHash)); + } + + CompositeBuffer DecompressedBlockBuffer = CompressedBlockBuffer.DecompressToComposite(); + if (!DecompressedBlockBuffer) + { + throw std::runtime_error(fmt::format("Block {} failed to decompress", BlockDescription.BlockHash)); + } + + ZEN_ASSERT_SLOW(BlockDescription.BlockHash == IoHash::HashBuffer(DecompressedBlockBuffer)); + + BlockWriteOps Ops; + if (GetBlockWriteOps(RemoteContent, + Lookup, + SequenceIndexChunksLeftToWriteCounters, + RemoteChunkIndexNeedsCopyFromSourceFlags, + DecompressedBlockBuffer, + Ops)) + { + WriteBlockChunkOps(CacheFolderPath, + RemoteContent, + Lookup, + SequenceIndexChunksLeftToWriteCounters, + Ops, + OutChunksComplete, + OutBytesWritten); + return true; + } + return false; + } + + bool GetPartialBlockWriteOps(const ChunkedFolderContent& RemoteContent, + const ChunkedContentLookup& Lookup, + const ChunkBlockDescription& BlockDescription, + std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, + std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, + const CompositeBuffer& PartialBlockBuffer, + uint32_t FirstIncludedBlockChunkIndex, + uint32_t LastIncludedBlockChunkIndex, + BlockWriteOps& OutOps) + { + ZEN_TRACE_CPU("GetPartialBlockWriteOps"); + uint32_t OffsetInBlock = 0; + for (uint32_t ChunkBlockIndex = FirstIncludedBlockChunkIndex; ChunkBlockIndex <= LastIncludedBlockChunkIndex; ChunkBlockIndex++) + { + const uint32_t ChunkCompressedSize = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex]; + const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex]; + if (auto It = Lookup.ChunkHashToChunkIndex.find(ChunkHash); It != Lookup.ChunkHashToChunkIndex.end()) { - if (!AbortFlag) + const uint32_t ChunkIndex = It->second; + std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = + GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, Lookup, ChunkIndex); + + if (!ChunkTargetPtrs.empty()) { - std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOpData& Lhs, const WriteOpData& Rhs) { - if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex) + bool NeedsWrite = true; + if (RemoteChunkIndexNeedsCopyFromSourceFlags[ChunkIndex].compare_exchange_strong(NeedsWrite, false)) + { + CompositeBuffer Chunk = PartialBlockBuffer.Mid(OffsetInBlock, ChunkCompressedSize); + IoHash VerifyChunkHash; + uint64_t VerifyRawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Chunk, VerifyChunkHash, VerifyRawSize); + if (!Compressed) { - return true; + ZEN_ASSERT(false); } - if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex) + if (VerifyChunkHash != ChunkHash) { - return false; + ZEN_ASSERT(false); } - return Lhs.Target->Offset < Rhs.Target->Offset; - }); - - { - WriteFileCache OpenFileCache; - for (const WriteOpData& WriteOp : WriteOps) + if (VerifyRawSize != BlockDescription.ChunkRawLengths[ChunkBlockIndex]) { - if (AbortFlag) - { - break; - } - const CompositeBuffer& Chunk = ChunkBuffers[WriteOp.ChunkBufferIndex]; - const uint32_t SequenceIndex = WriteOp.Target->SequenceIndex; - ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() <= - RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]); - ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() > 0); - const uint64_t ChunkSize = Chunk.GetSize(); - const uint64_t FileOffset = WriteOp.Target->Offset; - const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex]; - ZEN_ASSERT(FileOffset + ChunkSize <= RemoteContent.RawSizes[PathIndex]); - - OpenFileCache.WriteToFile<CompositeBuffer>( - SequenceIndex, - [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) { - return GetTempChunkedSequenceFileName(CacheFolderPath, - RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); - }, - Chunk, - FileOffset, - RemoteContent.RawSizes[PathIndex]); - OutBytesWritten += ChunkSize; + ZEN_ASSERT(false); } - } - if (!AbortFlag) - { - ZEN_TRACE_CPU("WriteBlockToDisk_VerifyHash"); - - // Write tracking, updating this must be done without any files open (WriteFileCache) - for (const WriteOpData& WriteOp : WriteOps) + CompositeBuffer Decompressed = Compressed.DecompressToComposite(); + if (!Decompressed) { - const uint32_t RemoteSequenceIndex = WriteOp.Target->SequenceIndex; - if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1) - { - const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; - const IoHash VerifyChunkHash = IoHash::HashBuffer( - IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash))); - if (VerifyChunkHash != SequenceRawHash) - { - throw std::runtime_error(fmt::format("Written chunk sequence {} hash does not match expected hash {}", - VerifyChunkHash, - SequenceRawHash)); - } - std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash), - GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)); - } + throw std::runtime_error(fmt::format("Decompression of build blob {} failed", ChunkHash)); } + ZEN_ASSERT_SLOW(ChunkHash == IoHash::HashBuffer(Decompressed)); + ZEN_ASSERT(Decompressed.GetSize() == RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]); + for (const ChunkedContentLookup::ChunkSequenceLocation* Target : ChunkTargetPtrs) + { + OutOps.WriteOps.push_back( + BlockWriteOps::WriteOpData{.Target = Target, .ChunkBufferIndex = OutOps.ChunkBuffers.size()}); + } + OutOps.ChunkBuffers.emplace_back(std::move(Decompressed)); } - OutChunksComplete += gsl::narrow<uint32_t>(ChunkBuffers.size()); } } + + OffsetInBlock += ChunkCompressedSize; + } + std::sort(OutOps.WriteOps.begin(), + OutOps.WriteOps.end(), + [](const BlockWriteOps::WriteOpData& Lhs, const BlockWriteOps::WriteOpData& Rhs) { + if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex) + { + return true; + } + if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex) + { + return false; + } + return Lhs.Target->Offset < Rhs.Target->Offset; + }); + return true; + } + + bool WritePartialBlockToDisk(const std::filesystem::path& CacheFolderPath, + const ChunkedFolderContent& RemoteContent, + const ChunkBlockDescription& BlockDescription, + std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, + const CompositeBuffer& PartialBlockBuffer, + uint32_t FirstIncludedBlockChunkIndex, + uint32_t LastIncludedBlockChunkIndex, + const ChunkedContentLookup& Lookup, + std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, + std::atomic<uint32_t>& OutChunksComplete, + std::atomic<uint64_t>& OutBytesWritten) + { + ZEN_TRACE_CPU("WritePartialBlockToDisk"); + BlockWriteOps Ops; + if (GetPartialBlockWriteOps(RemoteContent, + Lookup, + BlockDescription, + SequenceIndexChunksLeftToWriteCounters, + RemoteChunkIndexNeedsCopyFromSourceFlags, + PartialBlockBuffer, + FirstIncludedBlockChunkIndex, + LastIncludedBlockChunkIndex, + Ops)) + { + WriteBlockChunkOps(CacheFolderPath, + RemoteContent, + Lookup, + SequenceIndexChunksLeftToWriteCounters, + Ops, + OutChunksComplete, + OutBytesWritten); return true; } - return false; + else + { + return false; + } } SharedBuffer Decompress(const CompositeBuffer& CompressedChunk, const IoHash& ChunkHash, const uint64_t ChunkRawSize) @@ -3571,6 +3753,7 @@ namespace { CompositeBuffer&& CompressedPart, std::atomic<uint64_t>& WriteToDiskBytes) { + ZEN_TRACE_CPU("StreamDecompress"); const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash); TemporaryFile DecompressedTemp; std::error_code Ec; @@ -3633,10 +3816,9 @@ namespace { WorkerThreadPool& NetworkPool, std::atomic<uint64_t>& WriteToDiskBytes, std::atomic<uint64_t>& BytesDownloaded, - std::atomic<uint64_t>& LooseChunksBytes, - std::atomic<uint64_t>& DownloadedChunks, - std::atomic<uint32_t>& ChunksComplete, - std::atomic<uint64_t>& MultipartAttachmentCount) + std::atomic<uint64_t>& MultipartAttachmentCount, + std::function<void(uint64_t DowloadedBytes)>&& OnDownloadComplete, + std::function<void()>&& OnWriteComplete) { ZEN_TRACE_CPU("DownloadLargeBlob"); @@ -3665,22 +3847,20 @@ namespace { Workload, ChunkHash, &BytesDownloaded, - &LooseChunksBytes, + OnDownloadComplete = std::move(OnDownloadComplete), + OnWriteComplete = std::move(OnWriteComplete), &WriteToDiskBytes, - &DownloadedChunks, - &ChunksComplete, SequenceIndexChunksLeftToWriteCounters, ChunkTargetPtrs = std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>( ChunkTargetPtrs)](uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining) { BytesDownloaded += Chunk.GetSize(); - LooseChunksBytes += Chunk.GetSize(); if (!AbortFlag.load()) { Workload->TempFile.Write(Chunk.GetView(), Offset); if (Chunk.GetSize() == BytesRemaining) { - DownloadedChunks++; + OnDownloadComplete(Workload->TempFile.FileSize()); Work.ScheduleWork( WritePool, // GetSyncWorkerPool(),// @@ -3690,8 +3870,7 @@ namespace { ChunkHash, Workload, Offset, - BytesRemaining, - &ChunksComplete, + OnWriteComplete = std::move(OnWriteComplete), &WriteToDiskBytes, SequenceIndexChunksLeftToWriteCounters, ChunkTargetPtrs](std::atomic<bool>&) { @@ -3725,7 +3904,7 @@ namespace { CompositeBuffer(std::move(CompressedPart)), WriteToDiskBytes); NeedHashVerify = false; - ChunksComplete++; + OnWriteComplete(); } else { @@ -3748,7 +3927,7 @@ namespace { CompositeBuffer(Chunk), OpenFileCache, WriteToDiskBytes); - ChunksComplete++; + OnWriteComplete(); } } @@ -3760,12 +3939,12 @@ namespace { const uint32_t RemoteSequenceIndex = Location->SequenceIndex; if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1) { - ZEN_TRACE_CPU("VerifyChunkHash"); - const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; if (NeedHashVerify) { + ZEN_TRACE_CPU("VerifyChunkHash"); + const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile( GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash))); if (VerifyChunkHash != ChunkHash) @@ -3816,6 +3995,7 @@ namespace { const ChunkedFolderContent& RemoteContent, const std::vector<ChunkBlockDescription>& BlockDescriptions, const std::vector<IoHash>& LooseChunkHashes, + bool AllowPartialBlockRequests, bool WipeTargetFolder, FolderContent& OutLocalFolderState) { @@ -3967,11 +4147,17 @@ namespace { } } - std::atomic<uint32_t> ChunkCountWritten = 0; + uint64_t TotalRequestCount = 0; + std::atomic<uint64_t> RequestsComplete = 0; + std::atomic<uint32_t> ChunkCountWritten = 0; + std::atomic<size_t> TotalPartWriteCount = 0; + std::atomic<size_t> WritePartsComplete = 0; { ZEN_TRACE_CPU("HandleChunks"); + Stopwatch WriteTimer; + FilteredRate FilteredDownloadedBytesPerSecond; FilteredRate FilteredWrittenBytesPerSecond; @@ -4010,6 +4196,8 @@ namespace { } else { + TotalRequestCount++; + TotalPartWriteCount++; Work.ScheduleWork( NetworkPool, // GetSyncWorkerPool(),// [&, ChunkHash, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) { @@ -4020,25 +4208,39 @@ namespace { FilteredDownloadedBytesPerSecond.Start(); if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize) { - DownloadLargeBlob(Storage, - Path / ZenTempChunkFolderName, - CacheFolderPath, - RemoteContent, - RemoteLookup, - BuildId, - ChunkHash, - PreferredMultipartChunkSize, - ChunkTargetPtrs, - SequenceIndexChunksLeftToWriteCounters, - Work, - WritePool, - NetworkPool, - WriteToDiskBytes, - BytesDownloaded, - LooseChunksBytes, - DownloadedChunks, - ChunkCountWritten, - MultipartAttachmentCount); + DownloadLargeBlob( + Storage, + Path / ZenTempChunkFolderName, + CacheFolderPath, + RemoteContent, + RemoteLookup, + BuildId, + ChunkHash, + PreferredMultipartChunkSize, + ChunkTargetPtrs, + SequenceIndexChunksLeftToWriteCounters, + Work, + WritePool, + NetworkPool, + WriteToDiskBytes, + BytesDownloaded, + MultipartAttachmentCount, + [&](uint64_t BytesDownloaded) { + LooseChunksBytes += BytesDownloaded; + RequestsComplete++; + if (RequestsComplete == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + }, + [&]() { + ChunkCountWritten++; + WritePartsComplete++; + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } + }); } else { @@ -4049,6 +4251,11 @@ namespace { } BytesDownloaded += CompressedPart.GetSize(); LooseChunksBytes += CompressedPart.GetSize(); + RequestsComplete++; + if (RequestsComplete == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } CompositeBuffer Payload = WriteToTempFileIfNeeded(CompositeBuffer(std::move(CompressedPart)), Path / ZenTempChunkFolderName, ChunkHash); @@ -4077,6 +4284,11 @@ namespace { CompositeBuffer(CompressedPart), WriteToDiskBytes); ChunkCountWritten++; + WritePartsComplete++; + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } NeedHashVerify = false; } else @@ -4096,10 +4308,17 @@ namespace { OpenFileCache, WriteToDiskBytes); ChunkCountWritten++; + WritePartsComplete++; + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } } } if (!AbortFlag) { + WritePartsComplete++; + // Write tracking, updating this must be done without any files open // (WriteFileCache) for (const ChunkedContentLookup::ChunkSequenceLocation* Location : @@ -4109,12 +4328,12 @@ namespace { if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub( 1) == 1) { - ZEN_TRACE_CPU("UpdateFolder_VerifyHash"); - const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; if (NeedHashVerify) { + ZEN_TRACE_CPU("UpdateFolder_VerifyHash"); + const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile( GetTempChunkedSequenceFileName(CacheFolderPath, @@ -4155,6 +4374,8 @@ namespace { break; } + TotalPartWriteCount++; + Work.ScheduleWork( WritePool, // GetSyncWorkerPool(),// [&, CopyDataIndex](std::atomic<bool>&) { @@ -4166,245 +4387,428 @@ namespace { const CacheCopyData& CopyData = CacheCopyDatas[CopyDataIndex]; const uint32_t LocalPathIndex = LocalLookup.SequenceIndexFirstPathIndex[CopyData.LocalSequenceIndex]; const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred(); - if (!CopyData.TargetChunkLocationPtrs.empty()) - { - uint64_t CacheLocalFileBytesRead = 0; + ZEN_ASSERT(!CopyData.TargetChunkLocationPtrs.empty()); - size_t TargetStart = 0; - const std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> AllTargets( - CopyData.TargetChunkLocationPtrs); + uint64_t CacheLocalFileBytesRead = 0; - struct WriteOp - { - const ChunkedContentLookup::ChunkSequenceLocation* Target; - uint64_t CacheFileOffset; - uint64_t ChunkSize; - }; + size_t TargetStart = 0; + const std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> AllTargets( + CopyData.TargetChunkLocationPtrs); + + struct WriteOp + { + const ChunkedContentLookup::ChunkSequenceLocation* Target; + uint64_t CacheFileOffset; + uint64_t ChunkSize; + }; - std::vector<WriteOp> WriteOps; - WriteOps.reserve(AllTargets.size()); + std::vector<WriteOp> WriteOps; + WriteOps.reserve(AllTargets.size()); - for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets) + for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets) + { + std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> TargetRange = + AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount); + for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange) { - std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> TargetRange = - AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount); - for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange) - { - WriteOps.push_back(WriteOp{.Target = Target, - .CacheFileOffset = ChunkTarget.CacheFileOffset, - .ChunkSize = ChunkTarget.ChunkRawSize}); - } - TargetStart += ChunkTarget.TargetChunkLocationCount; + WriteOps.push_back(WriteOp{.Target = Target, + .CacheFileOffset = ChunkTarget.CacheFileOffset, + .ChunkSize = ChunkTarget.ChunkRawSize}); } + TargetStart += ChunkTarget.TargetChunkLocationCount; + } - std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) { - if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex) - { - return true; - } - else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex) - { - return false; - } - if (Lhs.Target->Offset < Rhs.Target->Offset) - { - return true; - } + std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) { + if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex) + { + return true; + } + else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex) + { return false; - }); + } + if (Lhs.Target->Offset < Rhs.Target->Offset) + { + return true; + } + return false; + }); - if (!AbortFlag) + if (!AbortFlag) + { + BufferedOpenFile SourceFile(LocalFilePath); + WriteFileCache OpenFileCache; + for (const WriteOp& Op : WriteOps) { - BufferedOpenFile SourceFile(LocalFilePath); - WriteFileCache OpenFileCache; - for (const WriteOp& Op : WriteOps) + if (AbortFlag) { - if (AbortFlag) - { - break; - } - const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex; - ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() <= - RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]); - ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() > 0); - const uint32_t RemotePathIndex = RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]; - const uint64_t ChunkSize = Op.ChunkSize; - CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ChunkSize); - - ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]); - - OpenFileCache.WriteToFile<CompositeBuffer>( - RemoteSequenceIndex, - [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) { - return GetTempChunkedSequenceFileName( - CacheFolderPath, - RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); - }, - ChunkSource, - Op.Target->Offset, - RemoteContent.RawSizes[RemotePathIndex]); - WriteToDiskBytes += ChunkSize; - CacheLocalFileBytesRead += ChunkSize; // TODO: This should be the sum of unique chunk sizes? + break; } + const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex; + ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() <= + RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]); + ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() > 0); + const uint32_t RemotePathIndex = RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]; + const uint64_t ChunkSize = Op.ChunkSize; + CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ChunkSize); + + ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]); + + OpenFileCache.WriteToFile<CompositeBuffer>( + RemoteSequenceIndex, + [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) { + return GetTempChunkedSequenceFileName( + CacheFolderPath, + RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); + }, + ChunkSource, + Op.Target->Offset, + RemoteContent.RawSizes[RemotePathIndex]); + WriteToDiskBytes += ChunkSize; + CacheLocalFileBytesRead += ChunkSize; // TODO: This should be the sum of unique chunk sizes? } - if (!AbortFlag) + } + if (!AbortFlag) + { + // Write tracking, updating this must be done without any files open (WriteFileCache) + for (const WriteOp& Op : WriteOps) { - if (!AbortFlag) + ZEN_TRACE_CPU("UpdateFolder_Copy_VerifyHash"); + + const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex; + if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1) { - // Write tracking, updating this must be done without any files open (WriteFileCache) - for (const WriteOp& Op : WriteOps) + const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile( + GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash))); + if (VerifyChunkHash != SequenceRawHash) { - ZEN_TRACE_CPU("UpdateFolder_Copy_VerifyHash"); - - const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex; - if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1) - { - const IoHash& SequenceRawHash = - RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; - const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile( - GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash))); - if (VerifyChunkHash != SequenceRawHash) - { - throw std::runtime_error( - fmt::format("Written chunk sequence {} hash does not match expected hash {}", - VerifyChunkHash, - SequenceRawHash)); - } - - ZEN_TRACE_CPU("UpdateFolder_Copy_rename"); - std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash), - GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)); - } + throw std::runtime_error( + fmt::format("Written chunk sequence {} hash does not match expected hash {}", + VerifyChunkHash, + SequenceRawHash)); } - } - ChunkCountWritten += gsl::narrow<uint32_t>(CopyData.ChunkTargets.size()); - ZEN_DEBUG("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), LocalContent.Paths[LocalPathIndex]); + ZEN_TRACE_CPU("UpdateFolder_Copy_rename"); + std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash), + GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)); + } } + + ChunkCountWritten += gsl::narrow<uint32_t>(CopyData.ChunkTargets.size()); + ZEN_DEBUG("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), LocalContent.Paths[LocalPathIndex]); + } + WritePartsComplete++; + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); } } }, Work.DefaultErrorFunction()); } - size_t BlockCount = BlockDescriptions.size(); - std::atomic<size_t> BlocksComplete = 0; - - auto IsBlockNeeded = [&RemoteContent, &RemoteLookup, &RemoteChunkIndexNeedsCopyFromSourceFlags]( - const ChunkBlockDescription& BlockDescription) -> bool { - for (const IoHash& ChunkHash : BlockDescription.ChunkRawHashes) - { - if (auto It = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); It != RemoteLookup.ChunkHashToChunkIndex.end()) - { - const uint32_t RemoteChunkIndex = It->second; - if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex]) - { - return true; - } - } - } - return false; + size_t BlockCount = BlockDescriptions.size(); + + std::vector<bool> ChunkIsPickedUpByBlock(RemoteContent.ChunkedContent.ChunkHashes.size(), false); + auto GetNeededChunkBlockIndexes = [&RemoteContent, + &RemoteLookup, + &RemoteChunkIndexNeedsCopyFromSourceFlags, + &ChunkIsPickedUpByBlock](const ChunkBlockDescription& BlockDescription) { + std::vector<uint32_t> NeededBlockChunkIndexes; + for (uint32_t ChunkBlockIndex = 0; ChunkBlockIndex < BlockDescription.ChunkRawHashes.size(); ChunkBlockIndex++) + { + const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex]; + if (auto It = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); It != RemoteLookup.ChunkHashToChunkIndex.end()) + { + const uint32_t RemoteChunkIndex = It->second; + if (!ChunkIsPickedUpByBlock[RemoteChunkIndex]) + { + if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex]) + { + ChunkIsPickedUpByBlock[RemoteChunkIndex] = true; + NeededBlockChunkIndexes.push_back(ChunkBlockIndex); + } + } + } + } + return NeededBlockChunkIndexes; }; - size_t BlocksNeededCount = 0; + size_t BlocksNeededCount = 0; + uint64_t AllBlocksSize = 0; + uint64_t AllBlocksFetch = 0; + uint64_t AllBlocksSlack = 0; + uint64_t AllBlockRequests = 0; + uint64_t AllBlockChunksSize = 0; for (size_t BlockIndex = 0; BlockIndex < BlockCount; BlockIndex++) { if (Work.IsAborted()) { break; } - if (IsBlockNeeded(BlockDescriptions[BlockIndex])) + const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + const std::vector<uint32_t> BlockChunkIndexNeeded = GetNeededChunkBlockIndexes(BlockDescription); + if (!BlockChunkIndexNeeded.empty()) { - BlocksNeededCount++; - Work.ScheduleWork( - NetworkPool, - [&, BlockIndex](std::atomic<bool>&) { - if (!AbortFlag) + bool WantsToDoPartialBlockDownload = BlockChunkIndexNeeded.size() < BlockDescription.ChunkRawHashes.size(); + bool CanDoPartialBlockDownload = (BlockDescription.HeaderSize > 0) && (BlockDescription.ChunkCompressedLengths.size() == + BlockDescription.ChunkRawHashes.size()); + if (AllowPartialBlockRequests && WantsToDoPartialBlockDownload && CanDoPartialBlockDownload) + { + struct BlockRangeDescriptor + { + uint64_t RangeStart = 0; + uint64_t RangeLength = 0; + uint32_t ChunkBlockIndexStart = 0; + uint32_t ChunkBlockIndexCount = 0; + }; + std::vector<BlockRangeDescriptor> BlockRanges; + + ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_PartialAnalisys"); + + uint32_t NeedBlockChunkIndexOffset = 0; + uint32_t ChunkBlockIndex = 0; + uint32_t CurrentOffset = + gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); + + BlockRangeDescriptor NextRange; + while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() && + ChunkBlockIndex < BlockDescription.ChunkRawHashes.size()) + { + const uint32_t ChunkCompressedLength = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex]; + if (ChunkBlockIndex < BlockChunkIndexNeeded[NeedBlockChunkIndexOffset]) { - ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_Read"); - - FilteredDownloadedBytesPerSecond.Start(); - IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockDescriptions[BlockIndex].BlockHash); - if (!BlockBuffer) + if (NextRange.RangeLength > 0) { - throw std::runtime_error(fmt::format("Block {} is missing", BlockDescriptions[BlockIndex].BlockHash)); + BlockRanges.push_back(NextRange); + NextRange = {}; } - BytesDownloaded += BlockBuffer.GetSize(); - BlockBytes += BlockBuffer.GetSize(); - DownloadedBlocks++; - CompositeBuffer Payload = WriteToTempFileIfNeeded(CompositeBuffer(std::move(BlockBuffer)), - Path / ZenTempBlockFolderName, - BlockDescriptions[BlockIndex].BlockHash); - - if (!AbortFlag) + ChunkBlockIndex++; + CurrentOffset += ChunkCompressedLength; + } + else if (ChunkBlockIndex == BlockChunkIndexNeeded[NeedBlockChunkIndexOffset]) + { + AllBlockChunksSize += ChunkCompressedLength; + if (NextRange.RangeLength == 0) { - Work.ScheduleWork( - WritePool, - [&, BlockIndex, BlockBuffer = std::move(Payload)](std::atomic<bool>&) { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_Write"); - - FilteredWrittenBytesPerSecond.Start(); - IoHash BlockRawHash; - uint64_t BlockRawSize; - CompressedBuffer CompressedBlockBuffer = - CompressedBuffer::FromCompressed(std::move(BlockBuffer), BlockRawHash, BlockRawSize); - if (!CompressedBlockBuffer) - { - throw std::runtime_error(fmt::format("Block {} is not a compressed buffer", - BlockDescriptions[BlockIndex].BlockHash)); - } + NextRange.RangeStart = CurrentOffset; + NextRange.ChunkBlockIndexStart = ChunkBlockIndex; + } + NextRange.RangeLength += ChunkCompressedLength; + NextRange.ChunkBlockIndexCount++; + ChunkBlockIndex++; + CurrentOffset += ChunkCompressedLength; + NeedBlockChunkIndexOffset++; + } + else + { + ZEN_ASSERT(false); + } + } + AllBlocksSize += CurrentOffset; + if (NextRange.RangeLength > 0) + { + BlockRanges.push_back(NextRange); + NextRange = {}; + } - if (BlockRawHash != BlockDescriptions[BlockIndex].BlockHash) - { - throw std::runtime_error(fmt::format("Block {} header has a mismatching raw hash {}", - BlockDescriptions[BlockIndex].BlockHash, - BlockRawHash)); - } + ZEN_ASSERT(!BlockRanges.empty()); + std::vector<BlockRangeDescriptor> CollapsedBlockRanges; + auto It = BlockRanges.begin(); + CollapsedBlockRanges.push_back(*It++); + uint64_t TotalSlack = 0; + while (It != BlockRanges.end()) + { + BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back(); + uint64_t Slack = It->RangeStart - (LastRange.RangeStart + LastRange.RangeLength); + uint64_t BothRangeSize = It->RangeLength + LastRange.RangeLength; + if (Slack <= Max(BothRangeSize / 8, 64u * 1024u)) // Made up heuristic - we'll see how it pans out + { + LastRange.ChunkBlockIndexCount = + (It->ChunkBlockIndexStart + It->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart; + LastRange.RangeLength = (It->RangeStart + It->RangeLength) - LastRange.RangeStart; + TotalSlack += Slack; + } + else + { + CollapsedBlockRanges.push_back(*It); + } + ++It; + } + + uint64_t TotalFetch = 0; + for (const BlockRangeDescriptor& Range : CollapsedBlockRanges) + { + TotalFetch += Range.RangeLength; + } + + AllBlocksFetch += TotalFetch; + AllBlocksSlack += TotalSlack; + BlocksNeededCount++; + AllBlockRequests += CollapsedBlockRanges.size(); + + for (size_t BlockRangeIndex = 0; BlockRangeIndex < CollapsedBlockRanges.size(); BlockRangeIndex++) + { + TotalRequestCount++; + TotalPartWriteCount++; + const BlockRangeDescriptor BlockRange = CollapsedBlockRanges[BlockRangeIndex]; + // Partial block schedule + Work.ScheduleWork( + NetworkPool, // NetworkPool, // GetSyncWorkerPool() + [&, BlockIndex, BlockRange](std::atomic<bool>&) { + ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_PartialGet"); + const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + + FilteredDownloadedBytesPerSecond.Start(); + IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength); + if (!BlockBuffer) + { + throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); + } + BytesDownloaded += BlockBuffer.GetSize(); + BlockBytes += BlockBuffer.GetSize(); + DownloadedBlocks++; + CompositeBuffer Payload = WriteToTempFileIfNeeded(CompositeBuffer(std::move(BlockBuffer)), + Path / ZenTempBlockFolderName, + BlockDescription.BlockHash, + fmt::format("_{}", BlockRange.RangeStart)); + RequestsComplete++; + if (RequestsComplete == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } - CompositeBuffer DecompressedBlockBuffer = CompressedBlockBuffer.DecompressToComposite(); - if (!DecompressedBlockBuffer) + if (!AbortFlag) + { + Work.ScheduleWork( + WritePool, // WritePool, // GetSyncWorkerPool(), + [&, BlockIndex, BlockRange, BlockPartialBuffer = std::move(Payload)](std::atomic<bool>&) { + if (!AbortFlag) { - throw std::runtime_error(fmt::format("Block {} failed to decompress", - BlockDescriptions[BlockIndex].BlockHash)); + const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + + FilteredWrittenBytesPerSecond.Start(); + + if (!WritePartialBlockToDisk( + CacheFolderPath, + RemoteContent, + BlockDescription, + SequenceIndexChunksLeftToWriteCounters, + BlockPartialBuffer, + BlockRange.ChunkBlockIndexStart, + BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1, + RemoteLookup, + RemoteChunkIndexNeedsCopyFromSourceFlags, + ChunkCountWritten, + WriteToDiskBytes)) + { + throw std::runtime_error( + fmt::format("Partial block {} is malformed", BlockDescription.BlockHash)); + } + WritePartsComplete++; + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } } + }, + [&, BlockIndex](const std::exception& Ex, std::atomic<bool>&) { + ZEN_ERROR("Failed writing block {}. Reason: {}", + BlockDescriptions[BlockIndex].BlockHash, + Ex.what()); + AbortFlag = true; + }); + } + }, + Work.DefaultErrorFunction()); + } + } + else + { + BlocksNeededCount++; + TotalRequestCount++; + TotalPartWriteCount++; + + Work.ScheduleWork( + NetworkPool, // GetSyncWorkerPool(), // NetworkPool, + [&, BlockIndex](std::atomic<bool>&) { + if (!AbortFlag) + { + ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_Get"); + + // Full block schedule - ZEN_ASSERT_SLOW(BlockDescriptions[BlockIndex].BlockHash == - IoHash::HashBuffer(DecompressedBlockBuffer)); - - if (!WriteBlockToDisk(CacheFolderPath, - RemoteContent, - SequenceIndexChunksLeftToWriteCounters, - DecompressedBlockBuffer, - RemoteLookup, - RemoteChunkIndexNeedsCopyFromSourceFlags.data(), - ChunkCountWritten, - WriteToDiskBytes)) + FilteredDownloadedBytesPerSecond.Start(); + IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockDescription.BlockHash); + if (!BlockBuffer) + { + throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); + } + BytesDownloaded += BlockBuffer.GetSize(); + BlockBytes += BlockBuffer.GetSize(); + DownloadedBlocks++; + RequestsComplete++; + if (RequestsComplete == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + + CompositeBuffer Payload = WriteToTempFileIfNeeded(CompositeBuffer(std::move(BlockBuffer)), + Path / ZenTempBlockFolderName, + BlockDescription.BlockHash); + if (!AbortFlag) + { + Work.ScheduleWork( + WritePool, + [&, BlockIndex, BlockBuffer = std::move(Payload)](std::atomic<bool>&) { + if (!AbortFlag) { - throw std::runtime_error( - fmt::format("Block {} is malformed", BlockDescriptions[BlockIndex].BlockHash)); + const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + + FilteredWrittenBytesPerSecond.Start(); + if (!WriteBlockToDisk(CacheFolderPath, + RemoteContent, + BlockDescription, + SequenceIndexChunksLeftToWriteCounters, + BlockBuffer, + RemoteLookup, + RemoteChunkIndexNeedsCopyFromSourceFlags, + ChunkCountWritten, + WriteToDiskBytes)) + { + throw std::runtime_error( + fmt::format("Block {} is malformed", BlockDescription.BlockHash)); + } + WritePartsComplete++; + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } } - BlocksComplete++; - } - }, - [&, BlockIndex](const std::exception& Ex, std::atomic<bool>&) { - ZEN_ERROR("Failed writing block {}. Reason: {}", - BlockDescriptions[BlockIndex].BlockHash, - Ex.what()); - AbortFlag = true; - }); + }, + Work.DefaultErrorFunction()); + } } - } - }, - Work.DefaultErrorFunction()); + }, + Work.DefaultErrorFunction()); + } } else { ZEN_DEBUG("Skipping block {} due to cache reuse", BlockDescriptions[BlockIndex].BlockHash); } } - + ZEN_DEBUG("Fetching {} with {} slack (ideal {}) out of {} using {} requests for {} blocks", + NiceBytes(AllBlocksFetch), + NiceBytes(AllBlocksSlack), + NiceBytes(AllBlockChunksSize), + NiceBytes(AllBlocksSize), + AllBlockRequests, + BlocksNeededCount); ZEN_TRACE_CPU("HandleChunks_Wait"); Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) { @@ -4412,13 +4816,13 @@ namespace { ZEN_ASSERT(ChunkCountToWrite >= ChunkCountWritten.load()); FilteredWrittenBytesPerSecond.Update(WriteToDiskBytes.load()); FilteredDownloadedBytesPerSecond.Update(BytesDownloaded.load()); - std::string Details = fmt::format("{}/{} chunks. {}/{} blocks. {} {}bits/s downloaded. {} {}B/s written", - ChunkCountWritten.load(), - ChunkCountToWrite, - BlocksComplete.load(), - BlocksNeededCount, + std::string Details = fmt::format("{}/{} ({} {}bits/s) downloaded. {}/{} ({} {}B/s) written.", + RequestsComplete.load(), + TotalRequestCount, NiceBytes(BytesDownloaded.load()), NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8), + ChunkCountWritten.load(), + ChunkCountToWrite, NiceBytes(WriteToDiskBytes.load()), NiceNum(FilteredWrittenBytesPerSecond.GetCurrent())); WriteProgressBar.UpdateState({.Task = "Writing chunks ", @@ -4437,6 +4841,13 @@ namespace { } WriteProgressBar.Finish(); + + ZEN_CONSOLE("Downloaded {} ({}bits/s). Wrote {} ({}B/s). Completed in {}", + NiceBytes(BytesDownloaded.load()), + NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), BytesDownloaded * 8)), + NiceBytes(WriteToDiskBytes.load()), + NiceNum(GetBytesPerSecond(FilteredWrittenBytesPerSecond.GetElapsedTimeUS(), WriteToDiskBytes.load())), + NiceTimeSpanMs(WriteTimer.GetElapsedTimeMs())); } for (const auto& SequenceIndexChunksLeftToWriteCounter : SequenceIndexChunksLeftToWriteCounters) @@ -4455,6 +4866,7 @@ namespace { }); // Move all files we will reuse to cache folder + // TODO: If WipeTargetFolder is false we could check which files are already correct and leave them in place for (uint32_t LocalPathIndex = 0; LocalPathIndex < LocalContent.Paths.size(); LocalPathIndex++) { const IoHash& RawHash = LocalContent.RawHashes[LocalPathIndex]; @@ -5186,6 +5598,7 @@ namespace { std::span<const std::string> BuildPartNames, const std::filesystem::path& Path, bool AllowMultiparts, + bool AllowPartialBlockRequests, bool WipeTargetFolder, bool PostDownloadVerify) { @@ -5202,8 +5615,8 @@ namespace { CreateDirectories(Path / ZenTempBlockFolderName); CreateDirectories(Path / ZenTempChunkFolderName); // TODO: Don't clear this - pick up files -> chunks to use CreateDirectories(Path / - ZenTempCacheFolderName); // TODO: Don't clear this - pick up files and use as sequences (non .tmp extension) and - // delete .tmp (maybe?) - chunk them? How do we know the file is worth chunking? + ZenTempCacheFolderName); // TODO: Don't clear this - pick up files and use as sequences (non .tmp extension) + // and delete .tmp (maybe?) - chunk them? How do we know the file is worth chunking? std::uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u; @@ -5311,6 +5724,7 @@ namespace { RemoteContent, BlockDescriptions, LooseChunkHashes, + AllowPartialBlockRequests, WipeTargetFolder, LocalFolderState); @@ -5705,6 +6119,12 @@ BuildsCommand::BuildsCommand() "Allow large attachments to be transfered using multipart protocol. Defaults to true.", cxxopts::value(m_AllowMultiparts), "<allowmultipart>"); + m_DownloadOptions.add_option("", + "", + "allow-partial-block-requests", + "Allow request for partial chunk blocks. Defaults to true.", + cxxopts::value(m_AllowPartialBlockRequests), + "<allowpartialblockrequests>"); m_DownloadOptions .add_option("", "", "verify", "Enable post download verify of all tracked files", cxxopts::value(m_PostDownloadVerify), "<verify>"); m_DownloadOptions.parse_positional({"local-path", "build-id", "build-part-name"}); @@ -5728,6 +6148,18 @@ BuildsCommand::BuildsCommand() AddOutputOptions(m_TestOptions); m_TestOptions.add_options()("h,help", "Print help"); m_TestOptions.add_option("", "l", "local-path", "Root file system folder used as base", cxxopts::value(m_Path), "<local-path>"); + m_TestOptions.add_option("", + "", + "allow-multipart", + "Allow large attachments to be transfered using multipart protocol. Defaults to true.", + cxxopts::value(m_AllowMultiparts), + "<allowmultipart>"); + m_TestOptions.add_option("", + "", + "allow-partial-block-requests", + "Allow request for partial chunk blocks. Defaults to true.", + cxxopts::value(m_AllowPartialBlockRequests), + "<allowpartialblockrequests>"); m_TestOptions.parse_positional({"local-path"}); m_TestOptions.positional_help("local-path"); @@ -6037,7 +6469,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) m_Bucket, GeneratedBuildId ? "Generated " : "", BuildId); - CreateDirectories(m_Path / ZenTempStorageFolderName); Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName); StorageName = "Cloud DDC"; } @@ -6175,7 +6606,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) m_Namespace, m_Bucket, BuildId); - CreateDirectories(m_Path / ZenTempStorageFolderName); Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName); StorageName = "Cloud DDC"; } @@ -6190,7 +6620,15 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); } - DownloadFolder(*Storage, BuildId, BuildPartIds, m_BuildPartNames, m_Path, m_AllowMultiparts, m_Clean, m_PostDownloadVerify); + DownloadFolder(*Storage, + BuildId, + BuildPartIds, + m_BuildPartNames, + m_Path, + m_AllowMultiparts, + m_AllowPartialBlockRequests, + m_Clean, + m_PostDownloadVerify); if (false) { @@ -6275,7 +6713,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) m_Namespace, m_Bucket, BuildId); - CreateDirectories(m_Path / ZenTempStorageFolderName); Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName); StorageName = "Cloud DDC"; } @@ -6335,7 +6772,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) const std::filesystem::path DownloadPath = m_Path.parent_path() / (m_BuildPartName + "_download"); ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}'", BuildId, BuildPartId, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, true, true); + DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, true, true); if (AbortFlag) { ZEN_CONSOLE("Download failed."); @@ -6347,7 +6784,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) BuildPartId, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false, true); + DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true); if (AbortFlag) { ZEN_CONSOLE("Re-download failed. (identical target)"); @@ -6449,7 +6886,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) BuildPartId, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false, true); + DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true); if (AbortFlag) { ZEN_CONSOLE("Re-download failed. (scrambled target)"); @@ -6487,7 +6924,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (original)", BuildId, BuildPartId, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false, true); + DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true); if (AbortFlag) { ZEN_CONSOLE("Re-download failed."); @@ -6495,7 +6932,15 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, BuildId2, {BuildPartId2}, {}, DownloadPath, m_AllowMultiparts, false, true); + DownloadFolder(*Storage, + BuildId2, + {BuildPartId2}, + {}, + DownloadPath, + m_AllowMultiparts, + m_AllowPartialBlockRequests, + false, + true); if (AbortFlag) { ZEN_CONSOLE("Re-download failed."); @@ -6503,7 +6948,15 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } ZEN_CONSOLE("\nRe-download Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, BuildId2, {BuildPartId2}, {}, DownloadPath, m_AllowMultiparts, false, true); + DownloadFolder(*Storage, + BuildId2, + {BuildPartId2}, + {}, + DownloadPath, + m_AllowMultiparts, + m_AllowPartialBlockRequests, + false, + true); if (AbortFlag) { ZEN_CONSOLE("Re-download failed."); @@ -6549,7 +7002,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) m_Namespace, m_Bucket, BuildId); - CreateDirectories(m_Path / ZenTempStorageFolderName); Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName); StorageName = "Cloud DDC"; } @@ -6617,7 +7069,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) m_Namespace, m_Bucket, BuildId); - CreateDirectories(m_Path / ZenTempStorageFolderName); Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName); StorageName = "Cloud DDC"; } diff --git a/src/zen/cmds/builds_cmd.h b/src/zen/cmds/builds_cmd.h index c54fb4db1..838a17807 100644 --- a/src/zen/cmds/builds_cmd.h +++ b/src/zen/cmds/builds_cmd.h @@ -49,6 +49,7 @@ private: bool m_Clean = false; uint8_t m_BlockReuseMinPercentLimit = 85; bool m_AllowMultiparts = true; + bool m_AllowPartialBlockRequests = true; std::filesystem::path m_ManifestPath; // Direct access token (may expire) diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp index e4c6d243d..30711a432 100644 --- a/src/zenhttp/httpclient.cpp +++ b/src/zenhttp/httpclient.cpp @@ -1148,6 +1148,30 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold return true; }; + uint64_t RequestedContentLength = (uint64_t)-1; + if (auto RangeIt = AdditionalHeader.Entries.find("Range"); RangeIt != AdditionalHeader.Entries.end()) + { + if (RangeIt->second.starts_with("bytes")) + { + size_t RangeStartPos = RangeIt->second.find('=', 5); + if (RangeStartPos != std::string::npos) + { + RangeStartPos++; + size_t RangeSplitPos = RangeIt->second.find('-', RangeStartPos); + if (RangeSplitPos != std::string::npos) + { + std::optional<size_t> RequestedRangeStart = + ParseInt<size_t>(RangeIt->second.substr(RangeStartPos, RangeSplitPos - RangeStartPos)); + std::optional<size_t> RequestedRangeEnd = ParseInt<size_t>(RangeIt->second.substr(RangeStartPos + 1)); + if (RequestedRangeStart.has_value() && RequestedRangeEnd.has_value()) + { + RequestedContentLength = RequestedRangeEnd.value() - 1; + } + } + } + } + } + cpr::Response Response; { std::vector<std::pair<std::string, std::string>> ReceivedHeaders; @@ -1155,10 +1179,10 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold std::pair<std::string, std::string> Header = GetHeader(header); if (Header.first == "Content-Length"sv) { - std::optional<size_t> ContentSize = ParseInt<size_t>(Header.second); - if (ContentSize.has_value()) + std::optional<size_t> ContentLength = ParseInt<size_t>(Header.second); + if (ContentLength.has_value()) { - if (ContentSize.value() > 1024 * 1024) + if (ContentLength.value() > 1024 * 1024) { PayloadFile = std::make_unique<detail::TempPayloadFile>(); std::error_code Ec = PayloadFile->Open(TempFolderPath); @@ -1172,7 +1196,7 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold } else { - PayloadString.reserve(ContentSize.value()); + PayloadString.reserve(ContentLength.value()); } } } @@ -1218,85 +1242,90 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold auto It = Response.header.find("Content-Length"); if (It != Response.header.end()) { - std::optional<int64_t> ContentLength = ParseInt<int64_t>(It->second); - if (ContentLength) - { - std::vector<std::pair<std::string, std::string>> ReceivedHeaders; + std::vector<std::pair<std::string, std::string>> ReceivedHeaders; - auto HeaderCallback = [&](std::string header, intptr_t) { - std::pair<std::string, std::string> Header = GetHeader(header); - if (!Header.first.empty()) - { - ReceivedHeaders.emplace_back(std::move(Header)); - } + auto HeaderCallback = [&](std::string header, intptr_t) { + std::pair<std::string, std::string> Header = GetHeader(header); + if (!Header.first.empty()) + { + ReceivedHeaders.emplace_back(std::move(Header)); + } - if (Header.first == "Content-Range"sv) + if (Header.first == "Content-Range"sv) + { + if (Header.second.starts_with("bytes "sv)) { - if (Header.second.starts_with("bytes "sv)) + size_t RangeStartEnd = Header.second.find('-', 6); + if (RangeStartEnd != std::string::npos) { - size_t RangeStartEnd = Header.second.find('-', 6); - if (RangeStartEnd != std::string::npos) + const auto Start = ParseInt<uint64_t>(Header.second.substr(6, RangeStartEnd - 6)); + if (Start) { - const auto Start = ParseInt<uint64_t>(Header.second.substr(6, RangeStartEnd - 6)); - if (Start) + uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length(); + if (Start.value() == DownloadedSize) { - uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length(); - if (Start.value() == DownloadedSize) - { - return 1; - } - else if (Start.value() > DownloadedSize) - { - return 0; - } - if (PayloadFile) - { - PayloadFile->ResetWritePos(Start.value()); - } - else - { - PayloadString = PayloadString.substr(0, Start.value()); - } return 1; } + else if (Start.value() > DownloadedSize) + { + return 0; + } + if (PayloadFile) + { + PayloadFile->ResetWritePos(Start.value()); + } + else + { + PayloadString = PayloadString.substr(0, Start.value()); + } + return 1; } } - return 0; } - return 1; - }; + return 0; + } + return 1; + }; - KeyValueMap HeadersWithRange(AdditionalHeader); - do - { - uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length(); + KeyValueMap HeadersWithRange(AdditionalHeader); + do + { + uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length(); - std::string Range = fmt::format("bytes={}-{}", DownloadedSize, DownloadedSize + ContentLength.value() - 1); - if (auto RangeIt = HeadersWithRange.Entries.find("Range"); RangeIt != HeadersWithRange.Entries.end()) + uint64_t ContentLength = RequestedContentLength; + if (ContentLength == uint64_t(-1)) + { + if (auto ParsedContentLength = ParseInt<int64_t>(It->second); ParsedContentLength.has_value()) { - if (RangeIt->second == Range) - { - // If we didn't make any progress, abort - break; - } + ContentLength = ParsedContentLength.value(); } - HeadersWithRange.Entries.insert_or_assign("Range", Range); - - Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, - Url, - m_ConnectionSettings, - HeadersWithRange, - {}, - m_SessionId, - GetAccessToken()); - Response = Sess.Download(cpr::WriteCallback{DownloadCallback}, cpr::HeaderCallback{HeaderCallback}); - for (const std::pair<std::string, std::string>& H : ReceivedHeaders) + } + + std::string Range = fmt::format("bytes={}-{}", DownloadedSize, DownloadedSize + ContentLength - 1); + if (auto RangeIt = HeadersWithRange.Entries.find("Range"); RangeIt != HeadersWithRange.Entries.end()) + { + if (RangeIt->second == Range) { - Response.header.insert_or_assign(H.first, H.second); + // If we didn't make any progress, abort + break; } - ReceivedHeaders.clear(); - } while (ShouldResume(Response)); - } + } + HeadersWithRange.Entries.insert_or_assign("Range", Range); + + Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, + Url, + m_ConnectionSettings, + HeadersWithRange, + {}, + m_SessionId, + GetAccessToken()); + Response = Sess.Download(cpr::WriteCallback{DownloadCallback}, cpr::HeaderCallback{HeaderCallback}); + for (const std::pair<std::string, std::string>& H : ReceivedHeaders) + { + Response.header.insert_or_assign(H.first, H.second); + } + ReceivedHeaders.clear(); + } while (ShouldResume(Response)); } } } diff --git a/src/zenutil/filebuildstorage.cpp b/src/zenutil/filebuildstorage.cpp index a4bb759e7..e57109006 100644 --- a/src/zenutil/filebuildstorage.cpp +++ b/src/zenutil/filebuildstorage.cpp @@ -325,7 +325,7 @@ public: return {}; } - virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash) override + virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override { ZEN_UNUSED(BuildId); SimulateLatency(0, 0); @@ -337,10 +337,19 @@ public: if (std::filesystem::is_regular_file(BlockPath)) { BasicFile File(BlockPath, BasicFile::Mode::kRead); - IoBuffer Payload = File.ReadAll(); - ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, CompositeBuffer(SharedBuffer(Payload)))); - m_Stats.TotalBytesRead += Payload.GetSize(); + IoBuffer Payload; + if (RangeOffset != 0 || RangeBytes != (uint64_t)-1) + { + Payload = IoBuffer(RangeBytes); + File.Read(Payload.GetMutableView().GetData(), RangeBytes, RangeOffset); + } + else + { + Payload = File.ReadAll(); + ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, CompositeBuffer(SharedBuffer(Payload)))); + } Payload.SetContentType(ZenContentType::kCompressedBinary); + m_Stats.TotalBytesRead += Payload.GetSize(); SimulateLatency(0, Payload.GetSize()); return Payload; } diff --git a/src/zenutil/include/zenutil/buildstorage.h b/src/zenutil/include/zenutil/buildstorage.h index 9c236310f..9d2bab170 100644 --- a/src/zenutil/include/zenutil/buildstorage.h +++ b/src/zenutil/include/zenutil/buildstorage.h @@ -40,7 +40,10 @@ public: std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter, std::function<void(uint64_t, bool)>&& OnSentBytes) = 0; - virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash) = 0; + virtual IoBuffer GetBuildBlob(const Oid& BuildId, + const IoHash& RawHash, + uint64_t RangeOffset = 0, + uint64_t RangeBytes = (uint64_t)-1) = 0; virtual std::vector<std::function<void()>> GetLargeBuildBlob( const Oid& BuildId, const IoHash& RawHash, diff --git a/src/zenutil/include/zenutil/jupiter/jupitersession.h b/src/zenutil/include/zenutil/jupiter/jupitersession.h index 852271868..2c5fc73b8 100644 --- a/src/zenutil/include/zenutil/jupiter/jupitersession.h +++ b/src/zenutil/include/zenutil/jupiter/jupitersession.h @@ -123,7 +123,9 @@ public: std::string_view BucketId, const Oid& BuildId, const IoHash& Hash, - std::filesystem::path TempFolderPath); + std::filesystem::path TempFolderPath, + uint64_t Offset = 0, + uint64_t Size = (uint64_t)-1); JupiterResult PutMultipartBuildBlob(std::string_view Namespace, std::string_view BucketId, diff --git a/src/zenutil/jupiter/jupiterbuildstorage.cpp b/src/zenutil/jupiter/jupiterbuildstorage.cpp index 309885b05..bf89ce785 100644 --- a/src/zenutil/jupiter/jupiterbuildstorage.cpp +++ b/src/zenutil/jupiter/jupiterbuildstorage.cpp @@ -217,13 +217,15 @@ public: return WorkList; } - virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash) override + virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override { ZEN_TRACE_CPU("Jupiter::GetBuildBlob"); - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - JupiterResult GetBuildBlobResult = m_Session.GetBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, m_TempFolderPath); + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + CreateDirectories(m_TempFolderPath); + JupiterResult GetBuildBlobResult = + m_Session.GetBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, m_TempFolderPath, RangeOffset, RangeBytes); AddStatistic(GetBuildBlobResult); if (!GetBuildBlobResult.Success) { diff --git a/src/zenutil/jupiter/jupitersession.cpp b/src/zenutil/jupiter/jupitersession.cpp index 06ac6ae36..68f214c06 100644 --- a/src/zenutil/jupiter/jupitersession.cpp +++ b/src/zenutil/jupiter/jupitersession.cpp @@ -698,11 +698,19 @@ JupiterSession::GetBuildBlob(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoHash& Hash, - std::filesystem::path TempFolderPath) + std::filesystem::path TempFolderPath, + uint64_t Offset, + uint64_t Size) { + HttpClient::KeyValueMap Headers; + if (Offset != 0 || Size != (uint64_t)-1) + { + Headers.Entries.insert({"Range", fmt::format("bytes={}-{}", Offset, Offset + Size - 1)}); + } HttpClient::Response Response = m_HttpClient.Download(fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString()), - TempFolderPath); + TempFolderPath, + Headers); return detail::ConvertResponse(Response, "JupiterSession::GetBuildBlob"sv); } |