diff options
| author | Dan Engelbrecht <[email protected]> | 2025-05-20 18:43:41 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-05-20 18:43:41 +0200 |
| commit | 47cba21817608be9232e9d0a55bf01a313d4be2c (patch) | |
| tree | d6fdc355af5c4b6f4a494b70dbb43ab4db9cf4d7 /src | |
| parent | Merge pull request #402 from ue-foundation/zs/fix-oplog-import-oplog-creation (diff) | |
| download | zen-47cba21817608be9232e9d0a55bf01a313d4be2c.tar.xz zen-47cba21817608be9232e9d0a55bf01a313d4be2c.zip | |
use explicit capture for lambdas (#404)
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 2103 | ||||
| -rw-r--r-- | src/zenutil/jupiter/jupitersession.cpp | 47 |
2 files changed, 1254 insertions, 896 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index 1a570d6da..d998ed3e8 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -1962,41 +1962,131 @@ namespace { for (const IoHash& ChunkAttachment : ChunkAttachments) { - Work.ScheduleWork(NetworkPool, [&, ChunkAttachment](std::atomic<bool>&) { - if (!AbortFlag) - { - ZEN_TRACE_CPU("ValidateBuildPart_GetChunk"); + Work.ScheduleWork(NetworkPool, + [&Storage, + &NetworkPool, + &VerifyPool, + &Work, + &DownloadStats, + &ValidateStats, + AttachmentsToVerifyCount, + &TempFolder, + BuildId = Oid(BuildId), + PreferredMultipartChunkSize, + &FilteredDownloadedBytesPerSecond, + &FilteredVerifiedBytesPerSecond, + ChunkAttachment](std::atomic<bool>&) { + if (!AbortFlag) + { + ZEN_TRACE_CPU("ValidateBuildPart_GetChunk"); + + FilteredDownloadedBytesPerSecond.Start(); + DownloadLargeBlob( + Storage, + TempFolder, + BuildId, + ChunkAttachment, + PreferredMultipartChunkSize, + Work, + NetworkPool, + DownloadStats, + [&Work, + &VerifyPool, + &DownloadStats, + &ValidateStats, + AttachmentsToVerifyCount, + &FilteredDownloadedBytesPerSecond, + &FilteredVerifiedBytesPerSecond, + ChunkHash = ChunkAttachment](IoBuffer&& Payload) { + Payload.SetContentType(ZenContentType::kCompressedBinary); + if (!AbortFlag) + { + Work.ScheduleWork( + VerifyPool, + [&DownloadStats, + &ValidateStats, + AttachmentsToVerifyCount, + &FilteredDownloadedBytesPerSecond, + &FilteredVerifiedBytesPerSecond, + Payload = std::move(Payload), + ChunkHash](std::atomic<bool>&) mutable { + if (!AbortFlag) + { + ZEN_TRACE_CPU("ValidateBuildPart_Validate"); + + if (DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount == + AttachmentsToVerifyCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + + FilteredVerifiedBytesPerSecond.Start(); + + uint64_t CompressedSize; + uint64_t DecompressedSize; + ValidateBlob(std::move(Payload), ChunkHash, CompressedSize, DecompressedSize); + ValidateStats.VerifiedAttachmentCount++; + ValidateStats.VerifiedByteCount += DecompressedSize; + if (ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount) + { + FilteredVerifiedBytesPerSecond.Stop(); + } + } + }); + } + }); + } + }); + } - FilteredDownloadedBytesPerSecond.Start(); - DownloadLargeBlob( - Storage, - TempFolder, - BuildId, - ChunkAttachment, - PreferredMultipartChunkSize, - Work, - NetworkPool, - DownloadStats, - [&, ChunkHash = ChunkAttachment](IoBuffer&& Payload) { - Payload.SetContentType(ZenContentType::kCompressedBinary); - if (!AbortFlag) - { - Work.ScheduleWork(VerifyPool, [&, Payload = std::move(Payload), ChunkHash](std::atomic<bool>&) mutable { + for (const IoHash& BlockAttachment : BlockAttachments) + { + Work.ScheduleWork( + NetworkPool, + [&Storage, + &BuildId, + &Work, + &VerifyPool, + &DownloadStats, + &ValidateStats, + AttachmentsToVerifyCount, + &FilteredDownloadedBytesPerSecond, + &FilteredVerifiedBytesPerSecond, + BlockAttachment](std::atomic<bool>&) { + if (!AbortFlag) + { + ZEN_TRACE_CPU("ValidateBuildPart_GetBlock"); + + FilteredDownloadedBytesPerSecond.Start(); + IoBuffer Payload = Storage.GetBuildBlob(BuildId, BlockAttachment); + DownloadStats.DownloadedBlockCount++; + DownloadStats.DownloadedBlockByteCount += Payload.GetSize(); + if (DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount == AttachmentsToVerifyCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + if (!Payload) + { + throw std::runtime_error(fmt::format("Block attachment {} could not be found", BlockAttachment)); + } + if (!AbortFlag) + { + Work.ScheduleWork( + VerifyPool, + [&FilteredVerifiedBytesPerSecond, + AttachmentsToVerifyCount, + &ValidateStats, + Payload = std::move(Payload), + BlockAttachment](std::atomic<bool>&) mutable { if (!AbortFlag) { - ZEN_TRACE_CPU("ValidateBuildPart_Validate"); - - if (DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount == - AttachmentsToVerifyCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } + ZEN_TRACE_CPU("ValidateBuildPart_ValidateBlock"); FilteredVerifiedBytesPerSecond.Start(); uint64_t CompressedSize; uint64_t DecompressedSize; - ValidateBlob(std::move(Payload), ChunkHash, CompressedSize, DecompressedSize); + ValidateChunkBlock(std::move(Payload), BlockAttachment, CompressedSize, DecompressedSize); ValidateStats.VerifiedAttachmentCount++; ValidateStats.VerifiedByteCount += DecompressedSize; if (ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount) @@ -2005,54 +2095,9 @@ namespace { } } }); - } - }); - } - }); - } - - for (const IoHash& BlockAttachment : BlockAttachments) - { - Work.ScheduleWork(NetworkPool, [&, BlockAttachment](std::atomic<bool>&) { - if (!AbortFlag) - { - ZEN_TRACE_CPU("ValidateBuildPart_GetBlock"); - - FilteredDownloadedBytesPerSecond.Start(); - IoBuffer Payload = Storage.GetBuildBlob(BuildId, BlockAttachment); - DownloadStats.DownloadedBlockCount++; - DownloadStats.DownloadedBlockByteCount += Payload.GetSize(); - if (DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount == AttachmentsToVerifyCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - if (!Payload) - { - throw std::runtime_error(fmt::format("Block attachment {} could not be found", BlockAttachment)); - } - if (!AbortFlag) - { - Work.ScheduleWork(VerifyPool, [&, Payload = std::move(Payload), BlockAttachment](std::atomic<bool>&) mutable { - if (!AbortFlag) - { - ZEN_TRACE_CPU("ValidateBuildPart_ValidateBlock"); - - FilteredVerifiedBytesPerSecond.Start(); - - uint64_t CompressedSize; - uint64_t DecompressedSize; - ValidateChunkBlock(std::move(Payload), BlockAttachment, CompressedSize, DecompressedSize); - ValidateStats.VerifiedAttachmentCount++; - ValidateStats.VerifiedByteCount += DecompressedSize; - if (ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount) - { - FilteredVerifiedBytesPerSecond.Stop(); - } - } - }); + } } - } - }); + }); } Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) { @@ -2321,134 +2366,163 @@ namespace { break; } const std::vector<uint32_t>& ChunksInBlock = NewBlockChunks[BlockIndex]; - Work.ScheduleWork(GenerateBlobsPool, [&, BlockIndex](std::atomic<bool>&) { - if (!AbortFlag) - { - ZEN_TRACE_CPU("GenerateBuildBlocks_Generate"); - - FilteredGeneratedBytesPerSecond.Start(); - // TODO: Convert ScheduleWork body to function - - Stopwatch GenerateTimer; - CompressedBuffer CompressedBlock = - GenerateBlock(Path, Content, Lookup, ChunksInBlock, OutBlocks.BlockDescriptions[BlockIndex], DiskStats); - ZEN_CONSOLE_VERBOSE("Generated block {} ({}) containing {} chunks in {}", - OutBlocks.BlockDescriptions[BlockIndex].BlockHash, - NiceBytes(CompressedBlock.GetCompressedSize()), - OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size(), - NiceTimeSpanMs(GenerateTimer.GetElapsedTimeMs())); - - OutBlocks.BlockSizes[BlockIndex] = CompressedBlock.GetCompressedSize(); + Work.ScheduleWork( + GenerateBlobsPool, + [&Storage, + BuildId, + &Path, + &Content, + &Lookup, + &Work, + &UploadBlocksPool, + NewBlockCount, + ChunksInBlock, + &Lock, + &OutBlocks, + &DiskStats, + &GenerateBlocksStats, + &UploadStats, + &FilteredGeneratedBytesPerSecond, + &QueuedPendingBlocksForUpload, + &FilteredUploadedBytesPerSecond, + BlockIndex](std::atomic<bool>&) { + if (!AbortFlag) { - CbObjectWriter Writer; - Writer.AddString("createdBy", "zen"); - OutBlocks.BlockMetaDatas[BlockIndex] = Writer.Save(); - } - GenerateBlocksStats.GeneratedBlockByteCount += OutBlocks.BlockSizes[BlockIndex]; - GenerateBlocksStats.GeneratedBlockCount++; + ZEN_TRACE_CPU("GenerateBuildBlocks_Generate"); + + FilteredGeneratedBytesPerSecond.Start(); + // TODO: Convert ScheduleWork body to function + + Stopwatch GenerateTimer; + CompressedBuffer CompressedBlock = + GenerateBlock(Path, Content, Lookup, ChunksInBlock, OutBlocks.BlockDescriptions[BlockIndex], DiskStats); + ZEN_CONSOLE_VERBOSE("Generated block {} ({}) containing {} chunks in {}", + OutBlocks.BlockDescriptions[BlockIndex].BlockHash, + NiceBytes(CompressedBlock.GetCompressedSize()), + OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size(), + NiceTimeSpanMs(GenerateTimer.GetElapsedTimeMs())); - Lock.WithExclusiveLock([&]() { - OutBlocks.BlockHashToBlockIndex.insert_or_assign(OutBlocks.BlockDescriptions[BlockIndex].BlockHash, BlockIndex); - }); + OutBlocks.BlockSizes[BlockIndex] = CompressedBlock.GetCompressedSize(); + { + CbObjectWriter Writer; + Writer.AddString("createdBy", "zen"); + OutBlocks.BlockMetaDatas[BlockIndex] = Writer.Save(); + } + GenerateBlocksStats.GeneratedBlockByteCount += OutBlocks.BlockSizes[BlockIndex]; + GenerateBlocksStats.GeneratedBlockCount++; - { - std::span<const SharedBuffer> Segments = CompressedBlock.GetCompressed().GetSegments(); - ZEN_ASSERT(Segments.size() >= 2); - OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]); - } + Lock.WithExclusiveLock([&]() { + OutBlocks.BlockHashToBlockIndex.insert_or_assign(OutBlocks.BlockDescriptions[BlockIndex].BlockHash, + BlockIndex); + }); - if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount) - { - FilteredGeneratedBytesPerSecond.Stop(); - } + { + std::span<const SharedBuffer> Segments = CompressedBlock.GetCompressed().GetSegments(); + ZEN_ASSERT(Segments.size() >= 2); + OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]); + } - if (QueuedPendingBlocksForUpload.load() > 16) - { - std::span<const SharedBuffer> Segments = CompressedBlock.GetCompressed().GetSegments(); - ZEN_ASSERT(Segments.size() >= 2); - OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]); - } - else - { - if (!AbortFlag) + if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount) { - QueuedPendingBlocksForUpload++; + FilteredGeneratedBytesPerSecond.Stop(); + } - Work.ScheduleWork( - UploadBlocksPool, - [&, BlockIndex, Payload = std::move(CompressedBlock)](std::atomic<bool>&) mutable { - auto _ = MakeGuard([&QueuedPendingBlocksForUpload] { QueuedPendingBlocksForUpload--; }); - if (!AbortFlag) - { - if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount) - { - ZEN_TRACE_CPU("GenerateBuildBlocks_Save"); + if (QueuedPendingBlocksForUpload.load() > 16) + { + std::span<const SharedBuffer> Segments = CompressedBlock.GetCompressed().GetSegments(); + ZEN_ASSERT(Segments.size() >= 2); + OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]); + } + else + { + if (!AbortFlag) + { + QueuedPendingBlocksForUpload++; - FilteredUploadedBytesPerSecond.Stop(); - std::span<const SharedBuffer> Segments = Payload.GetCompressed().GetSegments(); - ZEN_ASSERT(Segments.size() >= 2); - OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]); - } - else + Work.ScheduleWork( + UploadBlocksPool, + [&Storage, + BuildId, + NewBlockCount, + &FilteredUploadedBytesPerSecond, + &QueuedPendingBlocksForUpload, + &GenerateBlocksStats, + &UploadStats, + &OutBlocks, + BlockIndex, + Payload = std::move(CompressedBlock)](std::atomic<bool>&) mutable { + auto _ = MakeGuard([&QueuedPendingBlocksForUpload] { QueuedPendingBlocksForUpload--; }); + if (!AbortFlag) { - ZEN_TRACE_CPU("GenerateBuildBlocks_Upload"); + if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount) + { + ZEN_TRACE_CPU("GenerateBuildBlocks_Save"); - FilteredUploadedBytesPerSecond.Start(); - // TODO: Convert ScheduleWork body to function + FilteredUploadedBytesPerSecond.Stop(); + std::span<const SharedBuffer> Segments = Payload.GetCompressed().GetSegments(); + ZEN_ASSERT(Segments.size() >= 2); + OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]); + } + else + { + ZEN_TRACE_CPU("GenerateBuildBlocks_Upload"); - const CbObject BlockMetaData = - BuildChunkBlockDescription(OutBlocks.BlockDescriptions[BlockIndex], - OutBlocks.BlockMetaDatas[BlockIndex]); + FilteredUploadedBytesPerSecond.Start(); + // TODO: Convert ScheduleWork body to function - const IoHash& BlockHash = OutBlocks.BlockDescriptions[BlockIndex].BlockHash; - const uint64_t CompressedBlockSize = Payload.GetCompressedSize(); + const CbObject BlockMetaData = + BuildChunkBlockDescription(OutBlocks.BlockDescriptions[BlockIndex], + OutBlocks.BlockMetaDatas[BlockIndex]); - if (Storage.BuildCacheStorage) - { - Storage.BuildCacheStorage->PutBuildBlob(BuildId, - BlockHash, - ZenContentType::kCompressedBinary, - Payload.GetCompressed()); - } + const IoHash& BlockHash = OutBlocks.BlockDescriptions[BlockIndex].BlockHash; + const uint64_t CompressedBlockSize = Payload.GetCompressedSize(); + + if (Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->PutBuildBlob(BuildId, + BlockHash, + ZenContentType::kCompressedBinary, + Payload.GetCompressed()); + } - Storage.BuildStorage->PutBuildBlob(BuildId, - BlockHash, - ZenContentType::kCompressedBinary, - std::move(Payload).GetCompressed()); - UploadStats.BlocksBytes += CompressedBlockSize; + Storage.BuildStorage->PutBuildBlob(BuildId, + BlockHash, + ZenContentType::kCompressedBinary, + std::move(Payload).GetCompressed()); + UploadStats.BlocksBytes += CompressedBlockSize; - ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks", - BlockHash, - NiceBytes(CompressedBlockSize), - OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); + ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks", + BlockHash, + NiceBytes(CompressedBlockSize), + OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); - if (Storage.BuildCacheStorage) - { - Storage.BuildCacheStorage->PutBlobMetadatas(BuildId, - std::vector<IoHash>({BlockHash}), - std::vector<CbObject>({BlockMetaData})); - } + if (Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->PutBlobMetadatas(BuildId, + std::vector<IoHash>({BlockHash}), + std::vector<CbObject>({BlockMetaData})); + } - Storage.BuildStorage->PutBlockMetadata(BuildId, BlockHash, BlockMetaData); - ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})", - BlockHash, - NiceBytes(BlockMetaData.GetSize())); + Storage.BuildStorage->PutBlockMetadata(BuildId, BlockHash, BlockMetaData); + ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})", + BlockHash, + NiceBytes(BlockMetaData.GetSize())); - OutBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; + OutBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; - UploadStats.BlocksBytes += BlockMetaData.GetSize(); - UploadStats.BlockCount++; - if (UploadStats.BlockCount == NewBlockCount) - { - FilteredUploadedBytesPerSecond.Stop(); + UploadStats.BlocksBytes += BlockMetaData.GetSize(); + UploadStats.BlockCount++; + if (UploadStats.BlockCount == NewBlockCount) + { + FilteredUploadedBytesPerSecond.Stop(); + } } } - } - }); + }); + } } } - } - }); + }); } Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) { @@ -2558,10 +2632,22 @@ namespace { const size_t UploadBlockCount = BlockIndexes.size(); const uint32_t UploadChunkCount = gsl::narrow<uint32_t>(LooseChunkOrderIndexes.size()); - auto AsyncUploadBlock = [&](const size_t BlockIndex, - const IoHash BlockHash, - CompositeBuffer&& Payload, - std::atomic<uint64_t>& QueuedPendingInMemoryBlocksForUpload) { + auto AsyncUploadBlock = [&Storage, + &BuildId, + &Work, + &ZenFolderPath, + &NewBlocks, + UploadBlockCount, + &UploadedBlockCount, + UploadChunkCount, + &UploadedChunkCount, + &UploadedBlockSize, + &UploadStats, + &FilteredUploadedBytesPerSecond, + &UploadChunkPool](const size_t BlockIndex, + const IoHash BlockHash, + CompositeBuffer&& Payload, + std::atomic<uint64_t>& QueuedPendingInMemoryBlocksForUpload) { bool IsInMemoryBlock = true; if (QueuedPendingInMemoryBlocksForUpload.load() > 16) { @@ -2576,7 +2662,21 @@ namespace { Work.ScheduleWork( UploadChunkPool, - [&, IsInMemoryBlock, BlockIndex, BlockHash, Payload = std::move(Payload)](std::atomic<bool>&) mutable { + [&Storage, + &BuildId, + &QueuedPendingInMemoryBlocksForUpload, + &NewBlocks, + UploadBlockCount, + &UploadedBlockCount, + UploadChunkCount, + &UploadedChunkCount, + &UploadedBlockSize, + &UploadStats, + &FilteredUploadedBytesPerSecond, + IsInMemoryBlock, + BlockIndex, + BlockHash, + Payload = std::move(Payload)](std::atomic<bool>&) mutable { auto _ = MakeGuard([IsInMemoryBlock, &QueuedPendingInMemoryBlocksForUpload] { if (IsInMemoryBlock) { @@ -2628,10 +2728,37 @@ namespace { }); }; - auto AsyncUploadLooseChunk = [&](const IoHash& RawHash, uint64_t RawSize, CompositeBuffer&& Payload) { + auto AsyncUploadLooseChunk = [&Storage, + BuildId, + LargeAttachmentSize, + &Work, + &UploadChunkPool, + &FilteredUploadedBytesPerSecond, + &UploadedBlockCount, + &UploadedChunkCount, + UploadBlockCount, + UploadChunkCount, + &UploadedCompressedChunkSize, + &UploadedRawChunkSize, + &UploadStats](const IoHash& RawHash, uint64_t RawSize, CompositeBuffer&& Payload) { Work.ScheduleWork( UploadChunkPool, - [&, RawHash, RawSize, Payload = CompositeBuffer(std::move(Payload))](std::atomic<bool>&) mutable { + [&Storage, + BuildId, + &Work, + LargeAttachmentSize, + &FilteredUploadedBytesPerSecond, + &UploadChunkPool, + &UploadedBlockCount, + &UploadedChunkCount, + UploadBlockCount, + UploadChunkCount, + &UploadedCompressedChunkSize, + &UploadedRawChunkSize, + &UploadStats, + RawHash, + RawSize, + Payload = CompositeBuffer(std::move(Payload))](std::atomic<bool>&) mutable { if (!AbortFlag) { ZEN_TRACE_CPU("AsyncUploadLooseChunk"); @@ -2660,7 +2787,16 @@ namespace { PartPayload.SetContentType(ZenContentType::kBinary); return PartPayload; }, - [&, RawSize](uint64_t SentBytes, bool IsComplete) { + [RawSize, + &UploadStats, + &UploadedCompressedChunkSize, + &UploadChunkPool, + &UploadedBlockCount, + UploadBlockCount, + &UploadedChunkCount, + UploadChunkCount, + &FilteredUploadedBytesPerSecond, + &UploadedRawChunkSize](uint64_t SentBytes, bool IsComplete) { UploadStats.ChunksBytes += SentBytes; UploadedCompressedChunkSize += SentBytes; if (IsComplete) @@ -2718,56 +2854,71 @@ namespace { const IoHash& BlockHash = NewBlocks.BlockDescriptions[BlockIndex].BlockHash; if (!AbortFlag) { - Work.ScheduleWork(ReadChunkPool, [&, BlockIndex](std::atomic<bool>&) { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UploadPartBlobs_GenerateBlock"); + Work.ScheduleWork( + ReadChunkPool, + [BlockHash = IoHash(BlockHash), + BlockIndex, + &FilteredGenerateBlockBytesPerSecond, + Path, + &Content, + &Lookup, + &NewBlocks, + &NewBlockChunks, + &GenerateBlockIndexes, + &GeneratedBlockCount, + &GeneratedBlockByteCount, + &DiskStats, + &AsyncUploadBlock, + &QueuedPendingInMemoryBlocksForUpload](std::atomic<bool>&) { + if (!AbortFlag) + { + ZEN_TRACE_CPU("UploadPartBlobs_GenerateBlock"); - FilteredGenerateBlockBytesPerSecond.Start(); + FilteredGenerateBlockBytesPerSecond.Start(); - Stopwatch GenerateTimer; - CompositeBuffer Payload; - if (NewBlocks.BlockHeaders[BlockIndex]) - { - Payload = RebuildBlock(Path, - Content, - Lookup, - std::move(NewBlocks.BlockHeaders[BlockIndex]), - NewBlockChunks[BlockIndex], - DiskStats) - .GetCompressed(); - } - else - { - ChunkBlockDescription BlockDescription; - CompressedBuffer CompressedBlock = - GenerateBlock(Path, Content, Lookup, NewBlockChunks[BlockIndex], BlockDescription, DiskStats); - if (!CompressedBlock) + Stopwatch GenerateTimer; + CompositeBuffer Payload; + if (NewBlocks.BlockHeaders[BlockIndex]) { - throw std::runtime_error(fmt::format("Failed generating block {}", BlockHash)); + Payload = RebuildBlock(Path, + Content, + Lookup, + std::move(NewBlocks.BlockHeaders[BlockIndex]), + NewBlockChunks[BlockIndex], + DiskStats) + .GetCompressed(); + } + else + { + ChunkBlockDescription BlockDescription; + CompressedBuffer CompressedBlock = + GenerateBlock(Path, Content, Lookup, NewBlockChunks[BlockIndex], BlockDescription, DiskStats); + if (!CompressedBlock) + { + throw std::runtime_error(fmt::format("Failed generating block {}", BlockHash)); + } + ZEN_ASSERT(BlockDescription.BlockHash == BlockHash); + Payload = std::move(CompressedBlock).GetCompressed(); } - ZEN_ASSERT(BlockDescription.BlockHash == BlockHash); - Payload = std::move(CompressedBlock).GetCompressed(); - } - GeneratedBlockByteCount += NewBlocks.BlockSizes[BlockIndex]; - GeneratedBlockCount++; - if (GeneratedBlockCount == GenerateBlockIndexes.size()) - { - FilteredGenerateBlockBytesPerSecond.Stop(); - } - ZEN_CONSOLE_VERBOSE("{} block {} ({}) containing {} chunks in {}", - NewBlocks.BlockHeaders[BlockIndex] ? "Regenerated" : "Generated", - NewBlocks.BlockDescriptions[BlockIndex].BlockHash, - NiceBytes(NewBlocks.BlockSizes[BlockIndex]), - NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size(), - NiceTimeSpanMs(GenerateTimer.GetElapsedTimeMs())); - if (!AbortFlag) - { - AsyncUploadBlock(BlockIndex, BlockHash, std::move(Payload), QueuedPendingInMemoryBlocksForUpload); + GeneratedBlockByteCount += NewBlocks.BlockSizes[BlockIndex]; + GeneratedBlockCount++; + if (GeneratedBlockCount == GenerateBlockIndexes.size()) + { + FilteredGenerateBlockBytesPerSecond.Stop(); + } + ZEN_CONSOLE_VERBOSE("{} block {} ({}) containing {} chunks in {}", + NewBlocks.BlockHeaders[BlockIndex] ? "Regenerated" : "Generated", + NewBlocks.BlockDescriptions[BlockIndex].BlockHash, + NiceBytes(NewBlocks.BlockSizes[BlockIndex]), + NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size(), + NiceTimeSpanMs(GenerateTimer.GetElapsedTimeMs())); + if (!AbortFlag) + { + AsyncUploadBlock(BlockIndex, BlockHash, std::move(Payload), QueuedPendingInMemoryBlocksForUpload); + } } - } - }); + }); } } @@ -2775,32 +2926,43 @@ namespace { for (const uint32_t LooseChunkOrderIndex : LooseChunkOrderIndexes) { const uint32_t ChunkIndex = LooseChunkIndexes[LooseChunkOrderIndex]; - Work.ScheduleWork(ReadChunkPool, [&, ChunkIndex](std::atomic<bool>&) { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UploadPartBlobs_CompressChunk"); - - FilteredCompressedBytesPerSecond.Start(); - Stopwatch CompressTimer; - CompositeBuffer Payload = - CompressChunk(Path, Content, Lookup, ChunkIndex, ZenTempChunkFolderPath(ZenFolderPath), LooseChunksStats); - ZEN_CONSOLE_VERBOSE("Compressed chunk {} ({} -> {}) in {}", - Content.ChunkedContent.ChunkHashes[ChunkIndex], - NiceBytes(Content.ChunkedContent.ChunkRawSizes[ChunkIndex]), - NiceBytes(Payload.GetSize()), - NiceTimeSpanMs(CompressTimer.GetElapsedTimeMs())); - const uint64_t ChunkRawSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; - UploadStats.ReadFromDiskBytes += ChunkRawSize; - if (LooseChunksStats.CompressedChunkCount == LooseChunkOrderIndexes.size()) - { - FilteredCompressedBytesPerSecond.Stop(); - } + Work.ScheduleWork( + ReadChunkPool, + [&Path, + &Content, + &Lookup, + &ZenFolderPath, + &LooseChunksStats, + &LooseChunkOrderIndexes, + &FilteredCompressedBytesPerSecond, + &UploadStats, + &AsyncUploadLooseChunk, + ChunkIndex](std::atomic<bool>&) { if (!AbortFlag) { - AsyncUploadLooseChunk(Content.ChunkedContent.ChunkHashes[ChunkIndex], ChunkRawSize, std::move(Payload)); + ZEN_TRACE_CPU("UploadPartBlobs_CompressChunk"); + + FilteredCompressedBytesPerSecond.Start(); + Stopwatch CompressTimer; + CompositeBuffer Payload = + CompressChunk(Path, Content, Lookup, ChunkIndex, ZenTempChunkFolderPath(ZenFolderPath), LooseChunksStats); + ZEN_CONSOLE_VERBOSE("Compressed chunk {} ({} -> {}) in {}", + Content.ChunkedContent.ChunkHashes[ChunkIndex], + NiceBytes(Content.ChunkedContent.ChunkRawSizes[ChunkIndex]), + NiceBytes(Payload.GetSize()), + NiceTimeSpanMs(CompressTimer.GetElapsedTimeMs())); + const uint64_t ChunkRawSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; + UploadStats.ReadFromDiskBytes += ChunkRawSize; + if (LooseChunksStats.CompressedChunkCount == LooseChunkOrderIndexes.size()) + { + FilteredCompressedBytesPerSecond.Stop(); + } + if (!AbortFlag) + { + AsyncUploadLooseChunk(Content.ChunkedContent.ChunkHashes[ChunkIndex], ChunkRawSize, std::move(Payload)); + } } - } - }); + }); } Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) { @@ -3649,7 +3811,19 @@ namespace { PutBuildPartResult.second.size()); IoHash PartHash = PutBuildPartResult.first; - auto UploadAttachments = [&](std::span<IoHash> RawHashes) { + auto UploadAttachments = [&Storage, + &BuildId, + &Path, + &ZenFolderPath, + &LocalContent, + &LocalLookup, + &NewBlockChunks, + &NewBlocks, + &LooseChunkIndexes, + &LargeAttachmentSize, + &DiskStats, + &UploadStats, + &LooseChunksStats](std::span<IoHash> RawHashes) { if (!AbortFlag) { UploadStatistics TempUploadStats; @@ -4089,7 +4263,8 @@ namespace { Work.ScheduleWork( VerifyPool, - [&, PathIndex](std::atomic<bool>&) { + [&Path, &Content, &Lookup, &ErrorLock, &Errors, &VerifyFolderStats, VerifyFileHash, &IsAcceptedFolder, PathIndex]( + std::atomic<bool>&) { if (!AbortFlag) { ZEN_TRACE_CPU("VerifyFile_work"); @@ -5048,11 +5223,18 @@ namespace { Work.ScheduleWork( WritePool, - [&, + [&ZenFolderPath, + &RemoteContent, + &RemoteLookup, SequenceIndexChunksLeftToWriteCounters, + &Work, + &WritePool, CompressedChunkPath, RemoteChunkIndex, TotalPartWriteCount, + &DiskStats, + &WritePartsComplete, + &FilteredWrittenBytesPerSecond, ChunkTargetPtrs = std::move(ChunkTargetPtrs), CompressedPart = std::move(Payload)](std::atomic<bool>&) mutable { ZEN_TRACE_CPU("UpdateFolder_WriteChunk"); @@ -6215,40 +6397,51 @@ namespace { } if (!PrimeCacheOnly) { - Work.ScheduleWork(WritePool, [&, ScavengeOpIndex](std::atomic<bool>&) mutable { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_WriteScavanged"); + Work.ScheduleWork( + WritePool, + [&RemoteContent, + &CacheFolderPath, + &ScavengedPaths, + &ScavengeCopyOperations, + &ScavengedContents, + &FilteredWrittenBytesPerSecond, + ScavengeOpIndex, + &WritePartsComplete, + TotalPartWriteCount, + &DiskStats](std::atomic<bool>&) mutable { + if (!AbortFlag) + { + ZEN_TRACE_CPU("UpdateFolder_WriteScavanged"); - FilteredWrittenBytesPerSecond.Start(); + FilteredWrittenBytesPerSecond.Start(); - const ScavengeCopyOperation& ScavengeOp = ScavengeCopyOperations[ScavengeOpIndex]; - const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengeOp.ScavengedContentIndex]; - const std::filesystem::path ScavengedPath = ScavengedContent.Paths[ScavengeOp.ScavengedPathIndex]; + const ScavengeCopyOperation& ScavengeOp = ScavengeCopyOperations[ScavengeOpIndex]; + const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengeOp.ScavengedContentIndex]; + const std::filesystem::path ScavengedPath = ScavengedContent.Paths[ScavengeOp.ScavengedPathIndex]; - const std::filesystem::path ScavengedFilePath = - (ScavengedPaths[ScavengeOp.ScavengedContentIndex] / ScavengedPath).make_preferred(); - ZEN_ASSERT_SLOW(FileSizeFromPath(ScavengedFilePath) == ScavengeOp.RawSize); + const std::filesystem::path ScavengedFilePath = + (ScavengedPaths[ScavengeOp.ScavengedContentIndex] / ScavengedPath).make_preferred(); + ZEN_ASSERT_SLOW(FileSizeFromPath(ScavengedFilePath) == ScavengeOp.RawSize); - const IoHash& RemoteSequenceRawHash = - RemoteContent.ChunkedContent.SequenceRawHashes[ScavengeOp.RemoteSequenceIndex]; - const std::filesystem::path TempFilePath = - GetTempChunkedSequenceFileName(CacheFolderPath, RemoteSequenceRawHash); + const IoHash& RemoteSequenceRawHash = + RemoteContent.ChunkedContent.SequenceRawHashes[ScavengeOp.RemoteSequenceIndex]; + const std::filesystem::path TempFilePath = + GetTempChunkedSequenceFileName(CacheFolderPath, RemoteSequenceRawHash); - const uint64_t RawSize = ScavengedContent.RawSizes[ScavengeOp.ScavengedContentIndex]; - CopyFile(ScavengedFilePath, TempFilePath, RawSize, DiskStats.WriteCount, DiskStats.WriteByteCount); + const uint64_t RawSize = ScavengedContent.RawSizes[ScavengeOp.ScavengedContentIndex]; + CopyFile(ScavengedFilePath, TempFilePath, RawSize, DiskStats.WriteCount, DiskStats.WriteByteCount); - const std::filesystem::path CacheFilePath = - GetFinalChunkedSequenceFileName(CacheFolderPath, RemoteSequenceRawHash); - RenameFile(TempFilePath, CacheFilePath); + const std::filesystem::path CacheFilePath = + GetFinalChunkedSequenceFileName(CacheFolderPath, RemoteSequenceRawHash); + RenameFile(TempFilePath, CacheFilePath); - WritePartsComplete++; - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); + WritePartsComplete++; + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } } - } - }); + }); } } @@ -6273,7 +6466,31 @@ namespace { Work.ScheduleWork( WritePool, - [&, RemoteChunkIndex, ChunkTargetPtrs, BuildId, TotalRequestCount, TotalPartWriteCount](std::atomic<bool>&) mutable { + [&Storage, + &Path, + &ZenFolderPath, + &RemoteContent, + &RemoteLookup, + &CacheFolderPath, + &SequenceIndexChunksLeftToWriteCounters, + &Work, + &WritePool, + &NetworkPool, + PrimeCacheOnly, + &ExistsResult, + &DiskStats, + &DownloadStats, + &WriteChunkStats, + &WritePartsComplete, + RemoteChunkIndex, + ChunkTargetPtrs, + BuildId = Oid(BuildId), + LargeAttachmentSize, + PreferredMultipartChunkSize, + TotalRequestCount, + TotalPartWriteCount, + &FilteredDownloadedBytesPerSecond, + &FilteredWrittenBytesPerSecond](std::atomic<bool>&) mutable { if (!AbortFlag) { ZEN_TRACE_CPU("UpdateFolder_ReadPreDownloaded"); @@ -6392,7 +6609,7 @@ namespace { [&Path, &ZenFolderPath, &Storage, - BuildId, + BuildId = Oid(BuildId), &PrimeCacheOnly, &RemoteContent, &RemoteLookup, @@ -6453,50 +6670,67 @@ namespace { if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize) { ZEN_TRACE_CPU("UpdateFolder_GetLargeChunk"); - DownloadLargeBlob( - *Storage.BuildStorage, - ZenTempDownloadFolderPath(ZenFolderPath), - BuildId, - ChunkHash, - PreferredMultipartChunkSize, - Work, - NetworkPool, - DownloadStats, - [&, TotalPartWriteCount, TotalRequestCount, RemoteChunkIndex, ChunkTargetPtrs]( - IoBuffer&& Payload) mutable { - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - if (Payload && Storage.BuildCacheStorage) - { - Storage.BuildCacheStorage->PutBuildBlob( - BuildId, - ChunkHash, - ZenContentType::kCompressedBinary, - CompositeBuffer(SharedBuffer(Payload))); - } - if (!PrimeCacheOnly) - { - if (!AbortFlag) - { - AsyncWriteDownloadedChunk(ZenFolderPath, - RemoteContent, - RemoteLookup, - RemoteChunkIndex, - std::move(ChunkTargetPtrs), - Work, - WritePool, - std::move(Payload), - SequenceIndexChunksLeftToWriteCounters, - WritePartsComplete, - TotalPartWriteCount, - FilteredWrittenBytesPerSecond, - DiskStats); - } - } - }); + DownloadLargeBlob(*Storage.BuildStorage, + ZenTempDownloadFolderPath(ZenFolderPath), + BuildId, + ChunkHash, + PreferredMultipartChunkSize, + Work, + NetworkPool, + DownloadStats, + [&Storage, + &ZenFolderPath, + &RemoteContent, + &RemoteLookup, + BuildId, + PrimeCacheOnly, + &SequenceIndexChunksLeftToWriteCounters, + &Work, + &WritePool, + ChunkHash, + TotalPartWriteCount, + TotalRequestCount, + &WritePartsComplete, + &FilteredWrittenBytesPerSecond, + &FilteredDownloadedBytesPerSecond, + &DownloadStats, + &DiskStats, + RemoteChunkIndex, + ChunkTargetPtrs](IoBuffer&& Payload) mutable { + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + if (Payload && Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->PutBuildBlob( + BuildId, + ChunkHash, + ZenContentType::kCompressedBinary, + CompositeBuffer(SharedBuffer(Payload))); + } + if (!PrimeCacheOnly) + { + if (!AbortFlag) + { + AsyncWriteDownloadedChunk( + ZenFolderPath, + RemoteContent, + RemoteLookup, + RemoteChunkIndex, + std::move(ChunkTargetPtrs), + Work, + WritePool, + std::move(Payload), + SequenceIndexChunksLeftToWriteCounters, + WritePartsComplete, + TotalPartWriteCount, + FilteredWrittenBytesPerSecond, + DiskStats); + } + } + }); } else { @@ -6559,180 +6793,200 @@ namespace { break; } - Work.ScheduleWork(WritePool, [&, CopyDataIndex](std::atomic<bool>&) { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_CopyLocal"); - - FilteredWrittenBytesPerSecond.Start(); - const CacheCopyData& CopyData = CacheCopyDatas[CopyDataIndex]; - - std::filesystem::path SourceFilePath; - - if (CopyData.ScavengeSourceIndex == (uint32_t)-1) - { - const uint32_t LocalPathIndex = LocalLookup.SequenceIndexFirstPathIndex[CopyData.SourceSequenceIndex]; - SourceFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred(); - } - else + Work.ScheduleWork( + WritePool, + [&Path, + &LocalContent, + &RemoteContent, + &RemoteLookup, + &CacheFolderPath, + &LocalLookup, + &SequenceIndexChunksLeftToWriteCounters, + &Work, + &WritePool, + &FilteredWrittenBytesPerSecond, + &CacheCopyDatas, + &ScavengedContents, + &ScavengedLookups, + &ScavengedPaths, + &WritePartsComplete, + TotalPartWriteCount, + &DiskStats, + CopyDataIndex](std::atomic<bool>&) { + if (!AbortFlag) { - const ChunkedFolderContent& ScavengedContent = ScavengedContents[CopyData.ScavengeSourceIndex]; - const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[CopyData.ScavengeSourceIndex]; - const std::filesystem::path ScavengedPath = ScavengedPaths[CopyData.ScavengeSourceIndex]; - const uint32_t ScavengedPathIndex = ScavengedLookup.SequenceIndexFirstPathIndex[CopyData.SourceSequenceIndex]; - SourceFilePath = (ScavengedPath / ScavengedContent.Paths[ScavengedPathIndex]).make_preferred(); - } - ZEN_ASSERT_SLOW(IsFile(SourceFilePath)); - ZEN_ASSERT(!CopyData.TargetChunkLocationPtrs.empty()); - - uint64_t CacheLocalFileBytesRead = 0; + ZEN_TRACE_CPU("UpdateFolder_CopyLocal"); - size_t TargetStart = 0; - const std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> AllTargets( - CopyData.TargetChunkLocationPtrs); - - struct WriteOp - { - const ChunkedContentLookup::ChunkSequenceLocation* Target = nullptr; - uint64_t CacheFileOffset = (uint64_t)-1; - uint32_t ChunkIndex = (uint32_t)-1; - }; + FilteredWrittenBytesPerSecond.Start(); + const CacheCopyData& CopyData = CacheCopyDatas[CopyDataIndex]; - std::vector<WriteOp> WriteOps; + std::filesystem::path SourceFilePath; - if (!AbortFlag) - { - ZEN_TRACE_CPU("Sort"); - WriteOps.reserve(AllTargets.size()); - for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets) + if (CopyData.ScavengeSourceIndex == (uint32_t)-1) { - std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> TargetRange = - AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount); - for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange) - { - WriteOps.push_back(WriteOp{.Target = Target, - .CacheFileOffset = ChunkTarget.CacheFileOffset, - .ChunkIndex = ChunkTarget.RemoteChunkIndex}); - } - TargetStart += ChunkTarget.TargetChunkLocationCount; + const uint32_t LocalPathIndex = LocalLookup.SequenceIndexFirstPathIndex[CopyData.SourceSequenceIndex]; + SourceFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred(); } + else + { + const ChunkedFolderContent& ScavengedContent = ScavengedContents[CopyData.ScavengeSourceIndex]; + const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[CopyData.ScavengeSourceIndex]; + const std::filesystem::path ScavengedPath = ScavengedPaths[CopyData.ScavengeSourceIndex]; + const uint32_t ScavengedPathIndex = + ScavengedLookup.SequenceIndexFirstPathIndex[CopyData.SourceSequenceIndex]; + SourceFilePath = (ScavengedPath / ScavengedContent.Paths[ScavengedPathIndex]).make_preferred(); + } + ZEN_ASSERT_SLOW(IsFile(SourceFilePath)); + ZEN_ASSERT(!CopyData.TargetChunkLocationPtrs.empty()); - std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) { - if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex) - { - return true; - } - else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex) - { - return false; - } - if (Lhs.Target->Offset < Rhs.Target->Offset) - { - return true; - } - return false; - }); - } + uint64_t CacheLocalFileBytesRead = 0; - if (!AbortFlag) - { - ZEN_TRACE_CPU("Write"); + size_t TargetStart = 0; + const std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> AllTargets( + CopyData.TargetChunkLocationPtrs); - tsl::robin_set<uint32_t> ChunkIndexesWritten; + struct WriteOp + { + const ChunkedContentLookup::ChunkSequenceLocation* Target = nullptr; + uint64_t CacheFileOffset = (uint64_t)-1; + uint32_t ChunkIndex = (uint32_t)-1; + }; - BufferedOpenFile SourceFile(SourceFilePath, DiskStats); - WriteFileCache OpenFileCache(DiskStats); - for (size_t WriteOpIndex = 0; WriteOpIndex < WriteOps.size();) + std::vector<WriteOp> WriteOps; + + if (!AbortFlag) { - if (AbortFlag) + ZEN_TRACE_CPU("Sort"); + WriteOps.reserve(AllTargets.size()); + for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets) { - break; + std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> TargetRange = + AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount); + for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange) + { + WriteOps.push_back(WriteOp{.Target = Target, + .CacheFileOffset = ChunkTarget.CacheFileOffset, + .ChunkIndex = ChunkTarget.RemoteChunkIndex}); + } + TargetStart += ChunkTarget.TargetChunkLocationCount; } - const WriteOp& Op = WriteOps[WriteOpIndex]; - - const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex; - ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() <= - RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]); - ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() > 0); - const uint32_t RemotePathIndex = RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]; - const uint64_t ChunkSize = RemoteContent.ChunkedContent.ChunkRawSizes[Op.ChunkIndex]; - - uint64_t ReadLength = ChunkSize; - size_t WriteCount = 1; - uint64_t OpSourceEnd = Op.CacheFileOffset + ChunkSize; - uint64_t OpTargetEnd = Op.Target->Offset + ChunkSize; - while ((WriteOpIndex + WriteCount) < WriteOps.size()) - { - const WriteOp& NextOp = WriteOps[WriteOpIndex + WriteCount]; - if (NextOp.Target->SequenceIndex != Op.Target->SequenceIndex) + + std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) { + if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex) { - break; + return true; } - if (NextOp.Target->Offset != OpTargetEnd) + else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex) { - break; + return false; } - if (NextOp.CacheFileOffset != OpSourceEnd) + if (Lhs.Target->Offset < Rhs.Target->Offset) { - break; + return true; } - const uint64_t NextChunkLength = RemoteContent.ChunkedContent.ChunkRawSizes[NextOp.ChunkIndex]; - if (ReadLength + NextChunkLength > 512u * 1024u) + return false; + }); + } + + if (!AbortFlag) + { + ZEN_TRACE_CPU("Write"); + + tsl::robin_set<uint32_t> ChunkIndexesWritten; + + BufferedOpenFile SourceFile(SourceFilePath, DiskStats); + WriteFileCache OpenFileCache(DiskStats); + for (size_t WriteOpIndex = 0; WriteOpIndex < WriteOps.size();) + { + if (AbortFlag) { break; } - ReadLength += NextChunkLength; - OpSourceEnd += NextChunkLength; - OpTargetEnd += NextChunkLength; - WriteCount++; - } + const WriteOp& Op = WriteOps[WriteOpIndex]; + + const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex; + ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() <= + RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]); + ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() > 0); + const uint32_t RemotePathIndex = RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]; + const uint64_t ChunkSize = RemoteContent.ChunkedContent.ChunkRawSizes[Op.ChunkIndex]; + + uint64_t ReadLength = ChunkSize; + size_t WriteCount = 1; + uint64_t OpSourceEnd = Op.CacheFileOffset + ChunkSize; + uint64_t OpTargetEnd = Op.Target->Offset + ChunkSize; + while ((WriteOpIndex + WriteCount) < WriteOps.size()) + { + const WriteOp& NextOp = WriteOps[WriteOpIndex + WriteCount]; + if (NextOp.Target->SequenceIndex != Op.Target->SequenceIndex) + { + break; + } + if (NextOp.Target->Offset != OpTargetEnd) + { + break; + } + if (NextOp.CacheFileOffset != OpSourceEnd) + { + break; + } + const uint64_t NextChunkLength = RemoteContent.ChunkedContent.ChunkRawSizes[NextOp.ChunkIndex]; + if (ReadLength + NextChunkLength > 512u * 1024u) + { + break; + } + ReadLength += NextChunkLength; + OpSourceEnd += NextChunkLength; + OpTargetEnd += NextChunkLength; + WriteCount++; + } - CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ReadLength); - ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]); + CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ReadLength); + ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]); - OpenFileCache.WriteToFile<CompositeBuffer>( - RemoteSequenceIndex, - [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) { - return GetTempChunkedSequenceFileName( - CacheFolderPath, - RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); - }, - ChunkSource, - Op.Target->Offset, - RemoteContent.RawSizes[RemotePathIndex]); + OpenFileCache.WriteToFile<CompositeBuffer>( + RemoteSequenceIndex, + [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) { + return GetTempChunkedSequenceFileName( + CacheFolderPath, + RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); + }, + ChunkSource, + Op.Target->Offset, + RemoteContent.RawSizes[RemotePathIndex]); - CacheLocalFileBytesRead += ReadLength; // TODO: This should be the sum of unique chunk sizes? + CacheLocalFileBytesRead += ReadLength; // TODO: This should be the sum of unique chunk sizes? - WriteOpIndex += WriteCount; + WriteOpIndex += WriteCount; + } } - } - if (!AbortFlag) - { - // Write tracking, updating this must be done without any files open (WriteFileCache) - std::vector<uint32_t> CompletedChunkSequences; - for (const WriteOp& Op : WriteOps) + if (!AbortFlag) { - const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex; - if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters)) + // Write tracking, updating this must be done without any files open (WriteFileCache) + std::vector<uint32_t> CompletedChunkSequences; + for (const WriteOp& Op : WriteOps) { - CompletedChunkSequences.push_back(RemoteSequenceIndex); + const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex; + if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters)) + { + CompletedChunkSequences.push_back(RemoteSequenceIndex); + } } + VerifyAndCompleteChunkSequencesAsync(CacheFolderPath, + RemoteContent, + RemoteLookup, + CompletedChunkSequences, + Work, + WritePool); + ZEN_CONSOLE_VERBOSE("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), SourceFilePath); + } + WritePartsComplete++; + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); } - VerifyAndCompleteChunkSequencesAsync(CacheFolderPath, - RemoteContent, - RemoteLookup, - CompletedChunkSequences, - Work, - WritePool); - ZEN_CONSOLE_VERBOSE("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), SourceFilePath); - } - WritePartsComplete++; - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); } - } - }); + }); } for (uint32_t BlockIndex : CachedChunkBlockIndexes) @@ -6743,52 +6997,67 @@ namespace { break; } - Work.ScheduleWork(WritePool, [&, BlockIndex](std::atomic<bool>&) mutable { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_WriteCachedBlock"); - - const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; - FilteredWrittenBytesPerSecond.Start(); - - std::filesystem::path BlockChunkPath = - ZenTempBlockFolderPath(ZenFolderPath) / BlockDescription.BlockHash.ToHexString(); - IoBuffer BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); - if (!BlockBuffer) - { - throw std::runtime_error( - fmt::format("Can not read block {} at {}", BlockDescription.BlockHash, BlockChunkPath)); - } - + Work.ScheduleWork( + WritePool, + [&ZenFolderPath, + &CacheFolderPath, + &RemoteContent, + &RemoteLookup, + &RemoteChunkIndexNeedsCopyFromSourceFlags, + &SequenceIndexChunksLeftToWriteCounters, + &Work, + &WritePool, + &BlockDescriptions, + &FilteredWrittenBytesPerSecond, + &DiskStats, + &WritePartsComplete, + TotalPartWriteCount, + BlockIndex](std::atomic<bool>&) mutable { if (!AbortFlag) { - if (!WriteBlockToDisk(CacheFolderPath, - RemoteContent, - BlockDescription, - SequenceIndexChunksLeftToWriteCounters, - Work, - WritePool, - CompositeBuffer(std::move(BlockBuffer)), - RemoteLookup, - RemoteChunkIndexNeedsCopyFromSourceFlags, - DiskStats)) + ZEN_TRACE_CPU("UpdateFolder_WriteCachedBlock"); + + const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + FilteredWrittenBytesPerSecond.Start(); + + std::filesystem::path BlockChunkPath = + ZenTempBlockFolderPath(ZenFolderPath) / BlockDescription.BlockHash.ToHexString(); + IoBuffer BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); + if (!BlockBuffer) { - std::error_code DummyEc; - RemoveFile(BlockChunkPath, DummyEc); - throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash)); + throw std::runtime_error( + fmt::format("Can not read block {} at {}", BlockDescription.BlockHash, BlockChunkPath)); } - TryRemoveFile(BlockChunkPath); + if (!AbortFlag) + { + if (!WriteBlockToDisk(CacheFolderPath, + RemoteContent, + BlockDescription, + SequenceIndexChunksLeftToWriteCounters, + Work, + WritePool, + CompositeBuffer(std::move(BlockBuffer)), + RemoteLookup, + RemoteChunkIndexNeedsCopyFromSourceFlags, + DiskStats)) + { + std::error_code DummyEc; + RemoveFile(BlockChunkPath, DummyEc); + throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash)); + } - WritePartsComplete++; + TryRemoveFile(BlockChunkPath); - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); + WritePartsComplete++; + + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } } } - } - }); + }); } for (size_t BlockRangeIndex = 0; BlockRangeIndex < BlockRangeWorks.size(); BlockRangeIndex++) @@ -6801,214 +7070,66 @@ namespace { const BlockRangeDescriptor BlockRange = BlockRangeWorks[BlockRangeIndex]; ZEN_ASSERT(BlockRange.BlockIndex != (uint32_t)-1); const uint32_t BlockIndex = BlockRange.BlockIndex; - Work.ScheduleWork(NetworkPool, [&, BlockIndex, BlockRange](std::atomic<bool>&) { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_GetPartialBlock"); - - const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; - - FilteredDownloadedBytesPerSecond.Start(); - IoBuffer BlockBuffer; - if (Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash)) - { - BlockBuffer = Storage.BuildCacheStorage->GetBuildBlob(BuildId, - BlockDescription.BlockHash, - BlockRange.RangeStart, - BlockRange.RangeLength); - } - if (!BlockBuffer) - { - BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId, - BlockDescription.BlockHash, - BlockRange.RangeStart, - BlockRange.RangeLength); - } - if (!BlockBuffer) - { - throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); - } + Work.ScheduleWork( + NetworkPool, + [&Storage, + &ZenFolderPath, + BuildId, + &RemoteLookup, + &BlockDescriptions, + &RemoteChunkIndexNeedsCopyFromSourceFlags, + &CacheFolderPath, + &RemoteContent, + &SequenceIndexChunksLeftToWriteCounters, + &ExistsResult, + &FilteredDownloadedBytesPerSecond, + TotalRequestCount, + &WritePartsComplete, + TotalPartWriteCount, + &FilteredWrittenBytesPerSecond, + &DiskStats, + &DownloadStats, + &Work, + &WritePool, + BlockIndex, + BlockRange](std::atomic<bool>&) { if (!AbortFlag) { - uint64_t BlockSize = BlockBuffer.GetSize(); - DownloadStats.DownloadedBlockCount++; - DownloadStats.DownloadedBlockByteCount += BlockSize; - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } + ZEN_TRACE_CPU("UpdateFolder_GetPartialBlock"); - std::filesystem::path BlockChunkPath; + const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; - // Check if the dowloaded block is file based and we can move it directly without rewriting it + FilteredDownloadedBytesPerSecond.Start(); + IoBuffer BlockBuffer; + if (Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash)) { - IoBufferFileReference FileRef; - if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && - (FileRef.FileChunkSize == BlockSize)) - { - ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock"); - - std::error_code Ec; - std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); - if (!Ec) - { - BlockBuffer.SetDeleteOnClose(false); - BlockBuffer = {}; - BlockChunkPath = ZenTempBlockFolderPath(ZenFolderPath) / fmt::format("{}_{:x}_{:x}", - BlockDescription.BlockHash, - BlockRange.RangeStart, - BlockRange.RangeLength); - RenameFile(TempBlobPath, BlockChunkPath, Ec); - if (Ec) - { - BlockChunkPath = std::filesystem::path{}; - - // Re-open the temp file again - BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); - BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); - BlockBuffer.SetDeleteOnClose(true); - } - } - } + BlockBuffer = Storage.BuildCacheStorage->GetBuildBlob(BuildId, + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength); } - - if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u)) + if (!BlockBuffer) { - ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock"); - // Could not be moved and rather large, lets store it on disk - BlockChunkPath = - ZenTempBlockFolderPath(ZenFolderPath) / - fmt::format("{}_{:x}_{:x}", BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength); - TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); - BlockBuffer = {}; + BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId, + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength); } - - if (!AbortFlag) - { - Work.ScheduleWork( - WritePool, - [&, BlockIndex, BlockRange, BlockChunkPath, BlockPartialBuffer = std::move(BlockBuffer)]( - std::atomic<bool>&) mutable { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_WritePartialBlock"); - - const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; - - if (BlockChunkPath.empty()) - { - ZEN_ASSERT(BlockPartialBuffer); - } - else - { - ZEN_ASSERT(!BlockPartialBuffer); - BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); - if (!BlockPartialBuffer) - { - throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}", - BlockDescription.BlockHash, - BlockChunkPath)); - } - } - - FilteredWrittenBytesPerSecond.Start(); - - if (!WritePartialBlockToDisk( - CacheFolderPath, - RemoteContent, - BlockDescription, - SequenceIndexChunksLeftToWriteCounters, - Work, - WritePool, - CompositeBuffer(std::move(BlockPartialBuffer)), - BlockRange.ChunkBlockIndexStart, - BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1, - RemoteLookup, - RemoteChunkIndexNeedsCopyFromSourceFlags, - DiskStats)) - { - std::error_code DummyEc; - RemoveFile(BlockChunkPath, DummyEc); - throw std::runtime_error( - fmt::format("Partial block {} is malformed", BlockDescription.BlockHash)); - } - - if (!BlockChunkPath.empty()) - { - TryRemoveFile(BlockChunkPath); - } - - WritePartsComplete++; - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); - } - } - }); - } - } - } - }); - } - - for (uint32_t BlockIndex : FullBlockWorks) - { - if (AbortFlag) - { - break; - } - - if (PrimeCacheOnly && ExistsResult.ExistingBlobs.contains(BlockDescriptions[BlockIndex].BlockHash)) - { - DownloadStats.RequestsCompleteCount++; - continue; - } - - Work.ScheduleWork(NetworkPool, [&, BlockIndex](std::atomic<bool>&) { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_GetFullBlock"); - - const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; - - FilteredDownloadedBytesPerSecond.Start(); - - IoBuffer BlockBuffer; - const bool ExistsInCache = - Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash); - if (ExistsInCache) - { - BlockBuffer = Storage.BuildCacheStorage->GetBuildBlob(BuildId, BlockDescription.BlockHash); - } - if (!BlockBuffer) - { - BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId, BlockDescription.BlockHash); - if (BlockBuffer && Storage.BuildCacheStorage) + if (!BlockBuffer) { - Storage.BuildCacheStorage->PutBuildBlob(BuildId, - BlockDescription.BlockHash, - BlockBuffer.GetContentType(), - CompositeBuffer(SharedBuffer(BlockBuffer))); + throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); } - } - if (!BlockBuffer) - { - throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); - } - if (!AbortFlag) - { - uint64_t BlockSize = BlockBuffer.GetSize(); - DownloadStats.DownloadedBlockCount++; - DownloadStats.DownloadedBlockByteCount += BlockSize; - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + if (!AbortFlag) { - FilteredDownloadedBytesPerSecond.Stop(); - } + uint64_t BlockSize = BlockBuffer.GetSize(); + DownloadStats.DownloadedBlockCount++; + DownloadStats.DownloadedBlockByteCount += BlockSize; + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } - if (!PrimeCacheOnly) - { std::filesystem::path BlockChunkPath; // Check if the dowloaded block is file based and we can move it directly without rewriting it @@ -7018,14 +7139,17 @@ namespace { (FileRef.FileChunkSize == BlockSize)) { ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock"); + std::error_code Ec; std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); if (!Ec) { BlockBuffer.SetDeleteOnClose(false); - BlockBuffer = {}; - BlockChunkPath = - ZenTempBlockFolderPath(ZenFolderPath) / BlockDescription.BlockHash.ToHexString(); + BlockBuffer = {}; + BlockChunkPath = ZenTempBlockFolderPath(ZenFolderPath) / fmt::format("{}_{:x}_{:x}", + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength); RenameFile(TempBlobPath, BlockChunkPath, Ec); if (Ec) { @@ -7044,7 +7168,10 @@ namespace { { ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock"); // Could not be moved and rather large, lets store it on disk - BlockChunkPath = ZenTempBlockFolderPath(ZenFolderPath) / BlockDescription.BlockHash.ToHexString(); + BlockChunkPath = ZenTempBlockFolderPath(ZenFolderPath) / fmt::format("{}_{:x}_{:x}", + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength); TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); BlockBuffer = {}; } @@ -7053,60 +7180,64 @@ namespace { { Work.ScheduleWork( WritePool, - [&Work, - &WritePool, + [&CacheFolderPath, &RemoteContent, &RemoteLookup, - CacheFolderPath, + &BlockDescriptions, &RemoteChunkIndexNeedsCopyFromSourceFlags, &SequenceIndexChunksLeftToWriteCounters, - BlockIndex, - &BlockDescriptions, - &WriteChunkStats, - &DiskStats, &WritePartsComplete, + &Work, TotalPartWriteCount, + &WritePool, + &DiskStats, &FilteredWrittenBytesPerSecond, + BlockIndex, + BlockRange, BlockChunkPath, - BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable { + BlockPartialBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable { if (!AbortFlag) { - ZEN_TRACE_CPU("UpdateFolder_WriteFullBlock"); + ZEN_TRACE_CPU("UpdateFolder_WritePartialBlock"); const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; if (BlockChunkPath.empty()) { - ZEN_ASSERT(BlockBuffer); + ZEN_ASSERT(BlockPartialBuffer); } else { - ZEN_ASSERT(!BlockBuffer); - BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); - if (!BlockBuffer) + ZEN_ASSERT(!BlockPartialBuffer); + BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); + if (!BlockPartialBuffer) { - throw std::runtime_error(fmt::format("Could not open dowloaded block {} from {}", + throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}", BlockDescription.BlockHash, BlockChunkPath)); } } FilteredWrittenBytesPerSecond.Start(); - if (!WriteBlockToDisk(CacheFolderPath, - RemoteContent, - BlockDescription, - SequenceIndexChunksLeftToWriteCounters, - Work, - WritePool, - CompositeBuffer(std::move(BlockBuffer)), - RemoteLookup, - RemoteChunkIndexNeedsCopyFromSourceFlags, - DiskStats)) + + if (!WritePartialBlockToDisk( + CacheFolderPath, + RemoteContent, + BlockDescription, + SequenceIndexChunksLeftToWriteCounters, + Work, + WritePool, + CompositeBuffer(std::move(BlockPartialBuffer)), + BlockRange.ChunkBlockIndexStart, + BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1, + RemoteLookup, + RemoteChunkIndexNeedsCopyFromSourceFlags, + DiskStats)) { std::error_code DummyEc; RemoveFile(BlockChunkPath, DummyEc); throw std::runtime_error( - fmt::format("Block {} is malformed", BlockDescription.BlockHash)); + fmt::format("Partial block {} is malformed", BlockDescription.BlockHash)); } if (!BlockChunkPath.empty()) @@ -7115,7 +7246,6 @@ namespace { } WritePartsComplete++; - if (WritePartsComplete == TotalPartWriteCount) { FilteredWrittenBytesPerSecond.Stop(); @@ -7125,8 +7255,208 @@ namespace { } } } - } - }); + }); + } + + for (uint32_t BlockIndex : FullBlockWorks) + { + if (AbortFlag) + { + break; + } + + if (PrimeCacheOnly && ExistsResult.ExistingBlobs.contains(BlockDescriptions[BlockIndex].BlockHash)) + { + DownloadStats.RequestsCompleteCount++; + continue; + } + + Work.ScheduleWork( + NetworkPool, + [&Storage, + &ZenFolderPath, + BuildId, + PrimeCacheOnly, + &BlockDescriptions, + &WritePartsComplete, + TotalPartWriteCount, + &FilteredWrittenBytesPerSecond, + &ExistsResult, + &Work, + &WritePool, + &RemoteContent, + &RemoteLookup, + &CacheFolderPath, + &RemoteChunkIndexNeedsCopyFromSourceFlags, + &SequenceIndexChunksLeftToWriteCounters, + &FilteredDownloadedBytesPerSecond, + &WriteChunkStats, + &DiskStats, + &DownloadStats, + TotalRequestCount, + BlockIndex](std::atomic<bool>&) { + if (!AbortFlag) + { + ZEN_TRACE_CPU("UpdateFolder_GetFullBlock"); + + const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + + FilteredDownloadedBytesPerSecond.Start(); + + IoBuffer BlockBuffer; + const bool ExistsInCache = + Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash); + if (ExistsInCache) + { + BlockBuffer = Storage.BuildCacheStorage->GetBuildBlob(BuildId, BlockDescription.BlockHash); + } + if (!BlockBuffer) + { + BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId, BlockDescription.BlockHash); + if (BlockBuffer && Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->PutBuildBlob(BuildId, + BlockDescription.BlockHash, + BlockBuffer.GetContentType(), + CompositeBuffer(SharedBuffer(BlockBuffer))); + } + } + if (!BlockBuffer) + { + throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); + } + if (!AbortFlag) + { + uint64_t BlockSize = BlockBuffer.GetSize(); + DownloadStats.DownloadedBlockCount++; + DownloadStats.DownloadedBlockByteCount += BlockSize; + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + + if (!PrimeCacheOnly) + { + std::filesystem::path BlockChunkPath; + + // Check if the dowloaded block is file based and we can move it directly without rewriting it + { + IoBufferFileReference FileRef; + if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && + (FileRef.FileChunkSize == BlockSize)) + { + ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock"); + std::error_code Ec; + std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); + if (!Ec) + { + BlockBuffer.SetDeleteOnClose(false); + BlockBuffer = {}; + BlockChunkPath = + ZenTempBlockFolderPath(ZenFolderPath) / BlockDescription.BlockHash.ToHexString(); + RenameFile(TempBlobPath, BlockChunkPath, Ec); + if (Ec) + { + BlockChunkPath = std::filesystem::path{}; + + // Re-open the temp file again + BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); + BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); + BlockBuffer.SetDeleteOnClose(true); + } + } + } + } + + if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u)) + { + ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock"); + // Could not be moved and rather large, lets store it on disk + BlockChunkPath = ZenTempBlockFolderPath(ZenFolderPath) / BlockDescription.BlockHash.ToHexString(); + TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); + BlockBuffer = {}; + } + + if (!AbortFlag) + { + Work.ScheduleWork(WritePool, + [&Work, + &WritePool, + &RemoteContent, + &RemoteLookup, + CacheFolderPath, + &RemoteChunkIndexNeedsCopyFromSourceFlags, + &SequenceIndexChunksLeftToWriteCounters, + BlockIndex, + &BlockDescriptions, + &WriteChunkStats, + &DiskStats, + &WritePartsComplete, + TotalPartWriteCount, + &FilteredWrittenBytesPerSecond, + BlockChunkPath, + BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable { + if (!AbortFlag) + { + ZEN_TRACE_CPU("UpdateFolder_WriteFullBlock"); + + const ChunkBlockDescription& BlockDescription = + BlockDescriptions[BlockIndex]; + + if (BlockChunkPath.empty()) + { + ZEN_ASSERT(BlockBuffer); + } + else + { + ZEN_ASSERT(!BlockBuffer); + BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); + if (!BlockBuffer) + { + throw std::runtime_error( + fmt::format("Could not open dowloaded block {} from {}", + BlockDescription.BlockHash, + BlockChunkPath)); + } + } + + FilteredWrittenBytesPerSecond.Start(); + if (!WriteBlockToDisk(CacheFolderPath, + RemoteContent, + BlockDescription, + SequenceIndexChunksLeftToWriteCounters, + Work, + WritePool, + CompositeBuffer(std::move(BlockBuffer)), + RemoteLookup, + RemoteChunkIndexNeedsCopyFromSourceFlags, + DiskStats)) + { + std::error_code DummyEc; + RemoveFile(BlockChunkPath, DummyEc); + throw std::runtime_error( + fmt::format("Block {} is malformed", BlockDescription.BlockHash)); + } + + if (!BlockChunkPath.empty()) + { + TryRemoveFile(BlockChunkPath); + } + + WritePartsComplete++; + + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } + } + }); + } + } + } + } + }); } { @@ -7328,20 +7658,22 @@ namespace { { break; } - Work.ScheduleWork(WritePool, [&, LocalPathIndex](std::atomic<bool>&) { - ZEN_TRACE_CPU("UpdateFolder_AsyncCopyToCache"); - if (!AbortFlag) - { - const IoHash& RawHash = LocalContent.RawHashes[LocalPathIndex]; - const std::filesystem::path& LocalPath = LocalContent.Paths[LocalPathIndex]; - const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash); - ZEN_ASSERT_SLOW(!IsFileWithRetry(CacheFilePath)); - const std::filesystem::path LocalFilePath = (Path / LocalPath).make_preferred(); - RenameFileWithRetry(LocalFilePath, CacheFilePath); - CachedCount++; - CachedByteCount += LocalContent.RawSizes[LocalPathIndex]; - } - }); + Work.ScheduleWork( + WritePool, + [&Path, &LocalContent, &CacheFolderPath, &CachedCount, &CachedByteCount, LocalPathIndex](std::atomic<bool>&) { + ZEN_TRACE_CPU("UpdateFolder_AsyncCopyToCache"); + if (!AbortFlag) + { + const IoHash& RawHash = LocalContent.RawHashes[LocalPathIndex]; + const std::filesystem::path& LocalPath = LocalContent.Paths[LocalPathIndex]; + const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash); + ZEN_ASSERT_SLOW(!IsFileWithRetry(CacheFilePath)); + const std::filesystem::path LocalFilePath = (Path / LocalPath).make_preferred(); + RenameFileWithRetry(LocalFilePath, CacheFilePath); + CachedCount++; + CachedByteCount += LocalContent.RawSizes[LocalPathIndex]; + } + }); } { @@ -7420,7 +7752,7 @@ namespace { { break; } - Work.ScheduleWork(WritePool, [&, LocalPathIndex](std::atomic<bool>&) { + Work.ScheduleWork(WritePool, [&Path, &LocalContent, &DeletedCount, LocalPathIndex](std::atomic<bool>&) { if (!AbortFlag) { const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred(); @@ -7471,164 +7803,183 @@ namespace { TargetCount++; } - Work.ScheduleWork(WritePool, [&, BaseTargetOffset = TargetOffset, TargetCount](std::atomic<bool>&) { - if (!AbortFlag) - { - ZEN_TRACE_CPU("FinalizeTree_Work"); + Work.ScheduleWork( + WritePool, + [&Path, + &LocalContent, + &SequenceHashToLocalPathIndex, + &RemoteContent, + &RemoteLookup, + &CacheFolderPath, + &Targets, + &RemotePathIndexToLocalPathIndex, + &RebuildFolderStateStats, + &OutLocalFolderState, + BaseTargetOffset = TargetOffset, + TargetCount, + &TargetsComplete](std::atomic<bool>&) { + if (!AbortFlag) + { + ZEN_TRACE_CPU("FinalizeTree_Work"); - size_t TargetOffset = BaseTargetOffset; - const IoHash& RawHash = Targets[TargetOffset].RawHash; + size_t TargetOffset = BaseTargetOffset; + const IoHash& RawHash = Targets[TargetOffset].RawHash; - if (RawHash == IoHash::Zero) - { - ZEN_TRACE_CPU("ZeroSize"); - while (TargetOffset < (BaseTargetOffset + TargetCount)) + if (RawHash == IoHash::Zero) { - const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex; - ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash); - const std::filesystem::path& TargetPath = RemoteContent.Paths[RemotePathIndex]; - std::filesystem::path TargetFilePath = (Path / TargetPath).make_preferred(); - if (!RemotePathIndexToLocalPathIndex[RemotePathIndex]) + ZEN_TRACE_CPU("ZeroSize"); + while (TargetOffset < (BaseTargetOffset + TargetCount)) { - if (IsFileWithRetry(TargetFilePath)) + const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex; + ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash); + const std::filesystem::path& TargetPath = RemoteContent.Paths[RemotePathIndex]; + std::filesystem::path TargetFilePath = (Path / TargetPath).make_preferred(); + if (!RemotePathIndexToLocalPathIndex[RemotePathIndex]) { - SetFileReadOnlyWithRetry(TargetFilePath, false); - } - else - { - CreateDirectories(TargetFilePath.parent_path()); + if (IsFileWithRetry(TargetFilePath)) + { + SetFileReadOnlyWithRetry(TargetFilePath, false); + } + else + { + CreateDirectories(TargetFilePath.parent_path()); + } + BasicFile OutputFile; + OutputFile.Open(TargetFilePath, BasicFile::Mode::kTruncate); } - BasicFile OutputFile; - OutputFile.Open(TargetFilePath, BasicFile::Mode::kTruncate); + OutLocalFolderState.Paths[RemotePathIndex] = TargetPath; + OutLocalFolderState.RawSizes[RemotePathIndex] = RemoteContent.RawSizes[RemotePathIndex]; + + OutLocalFolderState.Attributes[RemotePathIndex] = + RemoteContent.Attributes.empty() + ? GetNativeFileAttributes(TargetFilePath) + : SetNativeFileAttributes(TargetFilePath, + RemoteContent.Platform, + RemoteContent.Attributes[RemotePathIndex]); + OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath); + + TargetOffset++; + TargetsComplete++; } - OutLocalFolderState.Paths[RemotePathIndex] = TargetPath; - OutLocalFolderState.RawSizes[RemotePathIndex] = RemoteContent.RawSizes[RemotePathIndex]; - - OutLocalFolderState.Attributes[RemotePathIndex] = - RemoteContent.Attributes.empty() ? GetNativeFileAttributes(TargetFilePath) - : SetNativeFileAttributes(TargetFilePath, - RemoteContent.Platform, - RemoteContent.Attributes[RemotePathIndex]); - OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath); - - TargetOffset++; - TargetsComplete++; - } - } - else - { - ZEN_TRACE_CPU("Files"); - ZEN_ASSERT(RemoteLookup.RawHashToSequenceIndex.contains(RawHash)); - const uint32_t FirstRemotePathIndex = Targets[TargetOffset].RemotePathIndex; - const std::filesystem::path& FirstTargetPath = RemoteContent.Paths[FirstRemotePathIndex]; - std::filesystem::path FirstTargetFilePath = (Path / FirstTargetPath).make_preferred(); - - if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(FirstRemotePathIndex); - InPlaceIt != RemotePathIndexToLocalPathIndex.end()) - { - ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath)); } else { - if (IsFileWithRetry(FirstTargetFilePath)) - { - SetFileReadOnlyWithRetry(FirstTargetFilePath, false); - } - else - { - CreateDirectories(FirstTargetFilePath.parent_path()); - } + ZEN_TRACE_CPU("Files"); + ZEN_ASSERT(RemoteLookup.RawHashToSequenceIndex.contains(RawHash)); + const uint32_t FirstRemotePathIndex = Targets[TargetOffset].RemotePathIndex; + const std::filesystem::path& FirstTargetPath = RemoteContent.Paths[FirstRemotePathIndex]; + std::filesystem::path FirstTargetFilePath = (Path / FirstTargetPath).make_preferred(); - if (auto InplaceIt = SequenceHashToLocalPathIndex.find(RawHash); - InplaceIt != SequenceHashToLocalPathIndex.end()) + if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(FirstRemotePathIndex); + InPlaceIt != RemotePathIndexToLocalPathIndex.end()) { - ZEN_TRACE_CPU("Copy"); - const uint32_t LocalPathIndex = InplaceIt->second; - const std::filesystem::path& SourcePath = LocalContent.Paths[LocalPathIndex]; - std::filesystem::path SourceFilePath = (Path / SourcePath).make_preferred(); - ZEN_ASSERT_SLOW(IsFileWithRetry(SourceFilePath)); - - ZEN_DEBUG("Copying from '{}' -> '{}'", SourceFilePath, FirstTargetFilePath); - const uint64_t RawSize = LocalContent.RawSizes[LocalPathIndex]; - std::atomic<uint64_t> WriteCount; - std::atomic<uint64_t> WriteByteCount; - CopyFile(SourceFilePath, FirstTargetFilePath, RawSize, WriteCount, WriteByteCount); - RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++; + ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath)); } else { - ZEN_TRACE_CPU("Rename"); - const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash); - ZEN_ASSERT_SLOW(IsFileWithRetry(CacheFilePath)); + if (IsFileWithRetry(FirstTargetFilePath)) + { + SetFileReadOnlyWithRetry(FirstTargetFilePath, false); + } + else + { + CreateDirectories(FirstTargetFilePath.parent_path()); + } - RenameFileWithRetry(CacheFilePath, FirstTargetFilePath); + if (auto InplaceIt = SequenceHashToLocalPathIndex.find(RawHash); + InplaceIt != SequenceHashToLocalPathIndex.end()) + { + ZEN_TRACE_CPU("Copy"); + const uint32_t LocalPathIndex = InplaceIt->second; + const std::filesystem::path& SourcePath = LocalContent.Paths[LocalPathIndex]; + std::filesystem::path SourceFilePath = (Path / SourcePath).make_preferred(); + ZEN_ASSERT_SLOW(IsFileWithRetry(SourceFilePath)); + + ZEN_DEBUG("Copying from '{}' -> '{}'", SourceFilePath, FirstTargetFilePath); + const uint64_t RawSize = LocalContent.RawSizes[LocalPathIndex]; + std::atomic<uint64_t> WriteCount; + std::atomic<uint64_t> WriteByteCount; + CopyFile(SourceFilePath, FirstTargetFilePath, RawSize, WriteCount, WriteByteCount); + RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++; + } + else + { + ZEN_TRACE_CPU("Rename"); + const std::filesystem::path CacheFilePath = + GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash); + ZEN_ASSERT_SLOW(IsFileWithRetry(CacheFilePath)); - RebuildFolderStateStats.FinalizeTreeFilesMovedCount++; - } - } + RenameFileWithRetry(CacheFilePath, FirstTargetFilePath); - OutLocalFolderState.Paths[FirstRemotePathIndex] = FirstTargetPath; - OutLocalFolderState.RawSizes[FirstRemotePathIndex] = RemoteContent.RawSizes[FirstRemotePathIndex]; + RebuildFolderStateStats.FinalizeTreeFilesMovedCount++; + } + } - OutLocalFolderState.Attributes[FirstRemotePathIndex] = - RemoteContent.Attributes.empty() ? GetNativeFileAttributes(FirstTargetFilePath) - : SetNativeFileAttributes(FirstTargetFilePath, - RemoteContent.Platform, - RemoteContent.Attributes[FirstRemotePathIndex]); - OutLocalFolderState.ModificationTicks[FirstRemotePathIndex] = GetModificationTickFromPath(FirstTargetFilePath); + OutLocalFolderState.Paths[FirstRemotePathIndex] = FirstTargetPath; + OutLocalFolderState.RawSizes[FirstRemotePathIndex] = RemoteContent.RawSizes[FirstRemotePathIndex]; - TargetOffset++; - TargetsComplete++; + OutLocalFolderState.Attributes[FirstRemotePathIndex] = + RemoteContent.Attributes.empty() + ? GetNativeFileAttributes(FirstTargetFilePath) + : SetNativeFileAttributes(FirstTargetFilePath, + RemoteContent.Platform, + RemoteContent.Attributes[FirstRemotePathIndex]); + OutLocalFolderState.ModificationTicks[FirstRemotePathIndex] = + GetModificationTickFromPath(FirstTargetFilePath); - while (TargetOffset < (BaseTargetOffset + TargetCount)) - { - const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex; - ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash); - const std::filesystem::path& TargetPath = RemoteContent.Paths[RemotePathIndex]; - std::filesystem::path TargetFilePath = (Path / TargetPath).make_preferred(); + TargetOffset++; + TargetsComplete++; - if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(RemotePathIndex); - InPlaceIt != RemotePathIndexToLocalPathIndex.end()) - { - ZEN_ASSERT_SLOW(IsFileWithRetry(TargetFilePath)); - } - else + while (TargetOffset < (BaseTargetOffset + TargetCount)) { - ZEN_TRACE_CPU("Copy"); - if (IsFileWithRetry(TargetFilePath)) + const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex; + ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash); + const std::filesystem::path& TargetPath = RemoteContent.Paths[RemotePathIndex]; + std::filesystem::path TargetFilePath = (Path / TargetPath).make_preferred(); + + if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(RemotePathIndex); + InPlaceIt != RemotePathIndexToLocalPathIndex.end()) { - SetFileReadOnlyWithRetry(TargetFilePath, false); + ZEN_ASSERT_SLOW(IsFileWithRetry(TargetFilePath)); } else { - CreateDirectories(TargetFilePath.parent_path()); - } + ZEN_TRACE_CPU("Copy"); + if (IsFileWithRetry(TargetFilePath)) + { + SetFileReadOnlyWithRetry(TargetFilePath, false); + } + else + { + CreateDirectories(TargetFilePath.parent_path()); + } - ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath)); - ZEN_DEBUG("Copying from '{}' -> '{}'", FirstTargetFilePath, TargetFilePath); - const uint64_t RawSize = RemoteContent.RawSizes[RemotePathIndex]; - std::atomic<uint64_t> WriteCount; - std::atomic<uint64_t> WriteByteCount; - CopyFile(FirstTargetFilePath, TargetFilePath, RawSize, WriteCount, WriteByteCount); - RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++; - } + ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath)); + ZEN_DEBUG("Copying from '{}' -> '{}'", FirstTargetFilePath, TargetFilePath); + const uint64_t RawSize = RemoteContent.RawSizes[RemotePathIndex]; + std::atomic<uint64_t> WriteCount; + std::atomic<uint64_t> WriteByteCount; + CopyFile(FirstTargetFilePath, TargetFilePath, RawSize, WriteCount, WriteByteCount); + RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++; + } - OutLocalFolderState.Paths[RemotePathIndex] = TargetPath; - OutLocalFolderState.RawSizes[RemotePathIndex] = RemoteContent.RawSizes[RemotePathIndex]; + OutLocalFolderState.Paths[RemotePathIndex] = TargetPath; + OutLocalFolderState.RawSizes[RemotePathIndex] = RemoteContent.RawSizes[RemotePathIndex]; - OutLocalFolderState.Attributes[RemotePathIndex] = - RemoteContent.Attributes.empty() ? GetNativeFileAttributes(TargetFilePath) - : SetNativeFileAttributes(TargetFilePath, - RemoteContent.Platform, - RemoteContent.Attributes[RemotePathIndex]); - OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath); + OutLocalFolderState.Attributes[RemotePathIndex] = + RemoteContent.Attributes.empty() + ? GetNativeFileAttributes(TargetFilePath) + : SetNativeFileAttributes(TargetFilePath, + RemoteContent.Platform, + RemoteContent.Attributes[RemotePathIndex]); + OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath); - TargetOffset++; - TargetsComplete++; + TargetOffset++; + TargetsComplete++; + } } } - } - }); + }); TargetOffset += TargetCount; } diff --git a/src/zenutil/jupiter/jupitersession.cpp b/src/zenutil/jupiter/jupitersession.cpp index de138f994..01a703a1b 100644 --- a/src/zenutil/jupiter/jupitersession.cpp +++ b/src/zenutil/jupiter/jupitersession.cpp @@ -697,28 +697,35 @@ JupiterSession::GetMultipartBuildBlob(std::string_view Namespace, while (Offset < TotalSize) { uint64_t PartSize = Min(ChunkSize, TotalSize - Offset); - OutWorkItems.emplace_back( - [this, Namespace, BucketId, BuildId, Hash, TotalSize, Workload, Offset, PartSize]() -> JupiterResult { - std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}", - Namespace, - BucketId, - BuildId, - Hash.ToHexString(), - m_AllowRedirect ? "true"sv : "false"sv); - HttpClient::Response Response = m_HttpClient.Get( - RequestUrl, - HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", Offset, Offset + PartSize - 1)}})); - if (Response.IsSuccess()) + OutWorkItems.emplace_back([this, + Namespace = std::string(Namespace), + BucketId = std::string(BucketId), + BuildId = Oid(BuildId), + Hash = IoHash(Hash), + TotalSize, + Workload, + Offset, + PartSize]() -> JupiterResult { + std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}", + Namespace, + BucketId, + BuildId, + Hash.ToHexString(), + m_AllowRedirect ? "true"sv : "false"sv); + HttpClient::Response Response = m_HttpClient.Get( + RequestUrl, + HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", Offset, Offset + PartSize - 1)}})); + if (Response.IsSuccess()) + { + Workload->OnReceive(Offset, Response.ResponsePayload); + uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Response.ResponsePayload.GetSize()); + if (ByteRemaning == Response.ResponsePayload.GetSize()) { - Workload->OnReceive(Offset, Response.ResponsePayload); - uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Response.ResponsePayload.GetSize()); - if (ByteRemaning == Response.ResponsePayload.GetSize()) - { - Workload->OnComplete(); - } + Workload->OnComplete(); } - return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv); - }); + } + return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv); + }); Offset += PartSize; } } |