diff options
| author | Dan Engelbrecht <[email protected]> | 2025-05-16 19:51:36 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-05-16 19:51:36 +0200 |
| commit | 4e2efa1051e3eb86ab48d92b3f6ad5896cda5d81 (patch) | |
| tree | 1fbab083b3fe8919a36caa2d925c933f696a5791 /src | |
| parent | validate custom fields (#399) (diff) | |
| download | zen-4e2efa1051e3eb86ab48d92b3f6ad5896cda5d81.tar.xz zen-4e2efa1051e3eb86ab48d92b3f6ad5896cda5d81.zip | |
parallel work handle dispatch exception (#400)
- Bugfix: Wait for async threads if dispatching of work using ParallellWork throws exception
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 2361 | ||||
| -rw-r--r-- | src/zen/cmds/wipe_cmd.cpp | 4 | ||||
| -rw-r--r-- | src/zencore/filesystem.cpp | 5 | ||||
| -rw-r--r-- | src/zenutil/buildstoragecache.cpp | 4 | ||||
| -rw-r--r-- | src/zenutil/chunkedcontent.cpp | 52 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/buildstoragecache.h | 2 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/chunkedcontent.h | 4 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/parallellwork.h | 119 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/parallelwork.h | 71 | ||||
| -rw-r--r-- | src/zenutil/parallelwork.cpp | 192 | ||||
| -rw-r--r-- | src/zenutil/zenutil.cpp | 2 |
11 files changed, 1444 insertions, 1372 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index 117d0b291..815bb7597 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -30,7 +30,7 @@ #include <zenutil/filebuildstorage.h> #include <zenutil/jupiter/jupiterbuildstorage.h> #include <zenutil/jupiter/jupitersession.h> -#include <zenutil/parallellwork.h> +#include <zenutil/parallelwork.h> #include <zenutil/workerpools.h> #include <zenutil/zenserverprocess.h> @@ -356,7 +356,7 @@ namespace { std::atomic<uint64_t> DiscoveredItemCount = 0; std::atomic<uint64_t> DeletedItemCount = 0; std::atomic<uint64_t> DeletedByteCount = 0; - ParallellWork Work(AbortFlag); + ParallelWork Work(AbortFlag); struct AsyncVisitor : public GetDirectoryContentVisitor { @@ -1749,7 +1749,7 @@ namespace { const Oid& BuildId, const IoHash& ChunkHash, const std::uint64_t PreferredMultipartChunkSize, - ParallellWork& Work, + ParallelWork& Work, WorkerThreadPool& NetworkPool, DownloadStatistics& DownloadStats, std::function<void(IoBuffer&& Payload)>&& OnDownloadComplete) @@ -1800,16 +1800,13 @@ namespace { } for (auto& WorkItem : WorkItems) { - Work.ScheduleWork( - NetworkPool, - [WorkItem = std::move(WorkItem)](std::atomic<bool>&) { - ZEN_TRACE_CPU("DownloadLargeBlob_Work"); - if (!AbortFlag) - { - WorkItem(); - } - }, - Work.DefaultErrorFunction()); + Work.ScheduleWork(NetworkPool, [WorkItem = std::move(WorkItem)](std::atomic<bool>&) { + ZEN_TRACE_CPU("DownloadLargeBlob_Work"); + if (!AbortFlag) + { + WorkItem(); + } + }); } } @@ -1902,7 +1899,7 @@ namespace { WorkerThreadPool& NetworkPool = GetNetworkPool(); WorkerThreadPool& VerifyPool = GetIOWorkerPool(); - ParallellWork Work(AbortFlag); + ParallelWork Work(AbortFlag); const std::filesystem::path TempFolder = ".zen-tmp"; @@ -1927,95 +1924,41 @@ namespace { for (const IoHash& ChunkAttachment : ChunkAttachments) { - Work.ScheduleWork( - NetworkPool, - [&, ChunkAttachment](std::atomic<bool>&) { - if (!AbortFlag) - { - ZEN_TRACE_CPU("ValidateBuildPart_GetChunk"); - - FilteredDownloadedBytesPerSecond.Start(); - DownloadLargeBlob(Storage, - TempFolder, - BuildId, - ChunkAttachment, - PreferredMultipartChunkSize, - Work, - NetworkPool, - DownloadStats, - [&, ChunkHash = ChunkAttachment](IoBuffer&& Payload) { - Payload.SetContentType(ZenContentType::kCompressedBinary); - if (!AbortFlag) - { - Work.ScheduleWork( - VerifyPool, - [&, Payload = std::move(Payload), ChunkHash](std::atomic<bool>&) mutable { - if (!AbortFlag) - { - ZEN_TRACE_CPU("ValidateBuildPart_Validate"); - - if (DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount == - AttachmentsToVerifyCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - - FilteredVerifiedBytesPerSecond.Start(); - - uint64_t CompressedSize; - uint64_t DecompressedSize; - ValidateBlob(std::move(Payload), ChunkHash, CompressedSize, DecompressedSize); - ValidateStats.VerifiedAttachmentCount++; - ValidateStats.VerifiedByteCount += DecompressedSize; - if (ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount) - { - FilteredVerifiedBytesPerSecond.Stop(); - } - } - }, - Work.DefaultErrorFunction()); - } - }); - } - }, - Work.DefaultErrorFunction()); - } - - for (const IoHash& BlockAttachment : BlockAttachments) - { - Work.ScheduleWork( - NetworkPool, - [&, BlockAttachment](std::atomic<bool>&) { - if (!AbortFlag) - { - ZEN_TRACE_CPU("ValidateBuildPart_GetBlock"); + Work.ScheduleWork(NetworkPool, [&, ChunkAttachment](std::atomic<bool>&) { + if (!AbortFlag) + { + ZEN_TRACE_CPU("ValidateBuildPart_GetChunk"); - FilteredDownloadedBytesPerSecond.Start(); - IoBuffer Payload = Storage.GetBuildBlob(BuildId, BlockAttachment); - DownloadStats.DownloadedBlockCount++; - DownloadStats.DownloadedBlockByteCount += Payload.GetSize(); - if (DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount == AttachmentsToVerifyCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - if (!Payload) - { - throw std::runtime_error(fmt::format("Block attachment {} could not be found", BlockAttachment)); - } - if (!AbortFlag) - { - Work.ScheduleWork( - VerifyPool, - [&, Payload = std::move(Payload), BlockAttachment](std::atomic<bool>&) mutable { + FilteredDownloadedBytesPerSecond.Start(); + DownloadLargeBlob( + Storage, + TempFolder, + BuildId, + ChunkAttachment, + PreferredMultipartChunkSize, + Work, + NetworkPool, + DownloadStats, + [&, ChunkHash = ChunkAttachment](IoBuffer&& Payload) { + Payload.SetContentType(ZenContentType::kCompressedBinary); + if (!AbortFlag) + { + Work.ScheduleWork(VerifyPool, [&, Payload = std::move(Payload), ChunkHash](std::atomic<bool>&) mutable { if (!AbortFlag) { - ZEN_TRACE_CPU("ValidateBuildPart_ValidateBlock"); + ZEN_TRACE_CPU("ValidateBuildPart_Validate"); + + if (DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount == + AttachmentsToVerifyCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } FilteredVerifiedBytesPerSecond.Start(); uint64_t CompressedSize; uint64_t DecompressedSize; - ValidateChunkBlock(std::move(Payload), BlockAttachment, CompressedSize, DecompressedSize); + ValidateBlob(std::move(Payload), ChunkHash, CompressedSize, DecompressedSize); ValidateStats.VerifiedAttachmentCount++; ValidateStats.VerifiedByteCount += DecompressedSize; if (ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount) @@ -2023,12 +1966,55 @@ namespace { FilteredVerifiedBytesPerSecond.Stop(); } } - }, - Work.DefaultErrorFunction()); - } + }); + } + }); + } + }); + } + + for (const IoHash& BlockAttachment : BlockAttachments) + { + Work.ScheduleWork(NetworkPool, [&, BlockAttachment](std::atomic<bool>&) { + if (!AbortFlag) + { + ZEN_TRACE_CPU("ValidateBuildPart_GetBlock"); + + FilteredDownloadedBytesPerSecond.Start(); + IoBuffer Payload = Storage.GetBuildBlob(BuildId, BlockAttachment); + DownloadStats.DownloadedBlockCount++; + DownloadStats.DownloadedBlockByteCount += Payload.GetSize(); + if (DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount == AttachmentsToVerifyCount) + { + FilteredDownloadedBytesPerSecond.Stop(); } - }, - Work.DefaultErrorFunction()); + if (!Payload) + { + throw std::runtime_error(fmt::format("Block attachment {} could not be found", BlockAttachment)); + } + if (!AbortFlag) + { + Work.ScheduleWork(VerifyPool, [&, Payload = std::move(Payload), BlockAttachment](std::atomic<bool>&) mutable { + if (!AbortFlag) + { + ZEN_TRACE_CPU("ValidateBuildPart_ValidateBlock"); + + FilteredVerifiedBytesPerSecond.Start(); + + uint64_t CompressedSize; + uint64_t DecompressedSize; + ValidateChunkBlock(std::move(Payload), BlockAttachment, CompressedSize, DecompressedSize); + ValidateStats.VerifiedAttachmentCount++; + ValidateStats.VerifiedByteCount += DecompressedSize; + if (ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount) + { + FilteredVerifiedBytesPerSecond.Stop(); + } + } + }); + } + } + }); } Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) { @@ -2286,7 +2272,7 @@ namespace { FilteredRate FilteredGeneratedBytesPerSecond; FilteredRate FilteredUploadedBytesPerSecond; - ParallellWork Work(AbortFlag); + ParallelWork Work(AbortFlag); std::atomic<uint64_t> QueuedPendingBlocksForUpload = 0; @@ -2297,139 +2283,134 @@ namespace { break; } const std::vector<uint32_t>& ChunksInBlock = NewBlockChunks[BlockIndex]; - Work.ScheduleWork( - GenerateBlobsPool, - [&, BlockIndex](std::atomic<bool>&) { - if (!AbortFlag) - { - ZEN_TRACE_CPU("GenerateBuildBlocks_Generate"); - - FilteredGeneratedBytesPerSecond.Start(); - // TODO: Convert ScheduleWork body to function - - Stopwatch GenerateTimer; - CompressedBuffer CompressedBlock = - GenerateBlock(Path, Content, Lookup, ChunksInBlock, OutBlocks.BlockDescriptions[BlockIndex], DiskStats); - ZEN_CONSOLE_VERBOSE("Generated block {} ({}) containing {} chunks in {}", - OutBlocks.BlockDescriptions[BlockIndex].BlockHash, - NiceBytes(CompressedBlock.GetCompressedSize()), - OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size(), - NiceTimeSpanMs(GenerateTimer.GetElapsedTimeMs())); + Work.ScheduleWork(GenerateBlobsPool, [&, BlockIndex](std::atomic<bool>&) { + if (!AbortFlag) + { + ZEN_TRACE_CPU("GenerateBuildBlocks_Generate"); - OutBlocks.BlockSizes[BlockIndex] = CompressedBlock.GetCompressedSize(); - { - CbObjectWriter Writer; - Writer.AddString("createdBy", "zen"); - OutBlocks.BlockMetaDatas[BlockIndex] = Writer.Save(); - } - GenerateBlocksStats.GeneratedBlockByteCount += OutBlocks.BlockSizes[BlockIndex]; - GenerateBlocksStats.GeneratedBlockCount++; + FilteredGeneratedBytesPerSecond.Start(); + // TODO: Convert ScheduleWork body to function - Lock.WithExclusiveLock([&]() { - OutBlocks.BlockHashToBlockIndex.insert_or_assign(OutBlocks.BlockDescriptions[BlockIndex].BlockHash, - BlockIndex); - }); + Stopwatch GenerateTimer; + CompressedBuffer CompressedBlock = + GenerateBlock(Path, Content, Lookup, ChunksInBlock, OutBlocks.BlockDescriptions[BlockIndex], DiskStats); + ZEN_CONSOLE_VERBOSE("Generated block {} ({}) containing {} chunks in {}", + OutBlocks.BlockDescriptions[BlockIndex].BlockHash, + NiceBytes(CompressedBlock.GetCompressedSize()), + OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size(), + NiceTimeSpanMs(GenerateTimer.GetElapsedTimeMs())); - { - std::span<const SharedBuffer> Segments = CompressedBlock.GetCompressed().GetSegments(); - ZEN_ASSERT(Segments.size() >= 2); - OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]); - } + OutBlocks.BlockSizes[BlockIndex] = CompressedBlock.GetCompressedSize(); + { + CbObjectWriter Writer; + Writer.AddString("createdBy", "zen"); + OutBlocks.BlockMetaDatas[BlockIndex] = Writer.Save(); + } + GenerateBlocksStats.GeneratedBlockByteCount += OutBlocks.BlockSizes[BlockIndex]; + GenerateBlocksStats.GeneratedBlockCount++; - if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount) - { - FilteredGeneratedBytesPerSecond.Stop(); - } + Lock.WithExclusiveLock([&]() { + OutBlocks.BlockHashToBlockIndex.insert_or_assign(OutBlocks.BlockDescriptions[BlockIndex].BlockHash, BlockIndex); + }); - if (QueuedPendingBlocksForUpload.load() > 16) - { - std::span<const SharedBuffer> Segments = CompressedBlock.GetCompressed().GetSegments(); - ZEN_ASSERT(Segments.size() >= 2); - OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]); - } - else + { + std::span<const SharedBuffer> Segments = CompressedBlock.GetCompressed().GetSegments(); + ZEN_ASSERT(Segments.size() >= 2); + OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]); + } + + if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount) + { + FilteredGeneratedBytesPerSecond.Stop(); + } + + if (QueuedPendingBlocksForUpload.load() > 16) + { + std::span<const SharedBuffer> Segments = CompressedBlock.GetCompressed().GetSegments(); + ZEN_ASSERT(Segments.size() >= 2); + OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]); + } + else + { + if (!AbortFlag) { - if (!AbortFlag) - { - QueuedPendingBlocksForUpload++; + QueuedPendingBlocksForUpload++; - Work.ScheduleWork( - UploadBlocksPool, - [&, BlockIndex, Payload = std::move(CompressedBlock)](std::atomic<bool>&) mutable { - auto _ = MakeGuard([&QueuedPendingBlocksForUpload] { QueuedPendingBlocksForUpload--; }); - if (!AbortFlag) + Work.ScheduleWork( + UploadBlocksPool, + [&, BlockIndex, Payload = std::move(CompressedBlock)](std::atomic<bool>&) mutable { + auto _ = MakeGuard([&QueuedPendingBlocksForUpload] { QueuedPendingBlocksForUpload--; }); + if (!AbortFlag) + { + if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount) { - if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount) - { - ZEN_TRACE_CPU("GenerateBuildBlocks_Save"); + ZEN_TRACE_CPU("GenerateBuildBlocks_Save"); - FilteredUploadedBytesPerSecond.Stop(); - std::span<const SharedBuffer> Segments = Payload.GetCompressed().GetSegments(); - ZEN_ASSERT(Segments.size() >= 2); - OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]); - } - else - { - ZEN_TRACE_CPU("GenerateBuildBlocks_Upload"); + FilteredUploadedBytesPerSecond.Stop(); + std::span<const SharedBuffer> Segments = Payload.GetCompressed().GetSegments(); + ZEN_ASSERT(Segments.size() >= 2); + OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]); + } + else + { + ZEN_TRACE_CPU("GenerateBuildBlocks_Upload"); - FilteredUploadedBytesPerSecond.Start(); - // TODO: Convert ScheduleWork body to function + FilteredUploadedBytesPerSecond.Start(); + // TODO: Convert ScheduleWork body to function - const CbObject BlockMetaData = - BuildChunkBlockDescription(OutBlocks.BlockDescriptions[BlockIndex], - OutBlocks.BlockMetaDatas[BlockIndex]); + const CbObject BlockMetaData = + BuildChunkBlockDescription(OutBlocks.BlockDescriptions[BlockIndex], + OutBlocks.BlockMetaDatas[BlockIndex]); - const IoHash& BlockHash = OutBlocks.BlockDescriptions[BlockIndex].BlockHash; - const uint64_t CompressedBlockSize = Payload.GetCompressedSize(); + const IoHash& BlockHash = OutBlocks.BlockDescriptions[BlockIndex].BlockHash; + const uint64_t CompressedBlockSize = Payload.GetCompressedSize(); - if (Storage.BuildCacheStorage) - { - Storage.BuildCacheStorage->PutBuildBlob(BuildId, - BlockHash, - ZenContentType::kCompressedBinary, - Payload.GetCompressed()); - } + if (Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->PutBuildBlob(BuildId, + BlockHash, + ZenContentType::kCompressedBinary, + Payload.GetCompressed()); + } - Storage.BuildStorage->PutBuildBlob(BuildId, - BlockHash, - ZenContentType::kCompressedBinary, - std::move(Payload).GetCompressed()); - UploadStats.BlocksBytes += CompressedBlockSize; + Storage.BuildStorage->PutBuildBlob(BuildId, + BlockHash, + ZenContentType::kCompressedBinary, + std::move(Payload).GetCompressed()); + UploadStats.BlocksBytes += CompressedBlockSize; - ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks", - BlockHash, - NiceBytes(CompressedBlockSize), - OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); + ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks", + BlockHash, + NiceBytes(CompressedBlockSize), + OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); - if (Storage.BuildCacheStorage) - { - Storage.BuildCacheStorage->PutBlobMetadatas(BuildId, - std::vector<IoHash>({BlockHash}), - std::vector<CbObject>({BlockMetaData})); - } + if (Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->PutBlobMetadatas(BuildId, + std::vector<IoHash>({BlockHash}), + std::vector<CbObject>({BlockMetaData})); + } - Storage.BuildStorage->PutBlockMetadata(BuildId, BlockHash, BlockMetaData); - ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})", - BlockHash, - NiceBytes(BlockMetaData.GetSize())); + Storage.BuildStorage->PutBlockMetadata(BuildId, BlockHash, BlockMetaData); + ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})", + BlockHash, + NiceBytes(BlockMetaData.GetSize())); - OutBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; + OutBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; - UploadStats.BlocksBytes += BlockMetaData.GetSize(); - UploadStats.BlockCount++; - if (UploadStats.BlockCount == NewBlockCount) - { - FilteredUploadedBytesPerSecond.Stop(); - } + UploadStats.BlocksBytes += BlockMetaData.GetSize(); + UploadStats.BlockCount++; + if (UploadStats.BlockCount == NewBlockCount) + { + FilteredUploadedBytesPerSecond.Stop(); } } - }, - Work.DefaultErrorFunction()); - } + } + }); } } - }, - Work.DefaultErrorFunction()); + } + }); } Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) { @@ -2491,7 +2472,7 @@ namespace { FilteredRate FilteredCompressedBytesPerSecond; FilteredRate FilteredUploadedBytesPerSecond; - ParallellWork Work(AbortFlag); + ParallelWork Work(AbortFlag); std::atomic<size_t> UploadedBlockSize = 0; std::atomic<size_t> UploadedBlockCount = 0; @@ -2606,8 +2587,7 @@ namespace { FilteredUploadedBytesPerSecond.Stop(); } } - }, - Work.DefaultErrorFunction()); + }); }; auto AsyncUploadLooseChunk = [&](const IoHash& RawHash, uint64_t RawSize, CompositeBuffer&& Payload) { @@ -2658,16 +2638,13 @@ namespace { }); for (auto& WorkPart : MultipartWork) { - Work.ScheduleWork( - UploadChunkPool, - [Work = std::move(WorkPart)](std::atomic<bool>&) { - ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart_Work"); - if (!AbortFlag) - { - Work(); - } - }, - Work.DefaultErrorFunction()); + Work.ScheduleWork(UploadChunkPool, [Work = std::move(WorkPart)](std::atomic<bool>&) { + ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart_Work"); + if (!AbortFlag) + { + Work(); + } + }); } ZEN_CONSOLE_VERBOSE("Uploaded multipart chunk {} ({})", RawHash, NiceBytes(PayloadSize)); } @@ -2687,8 +2664,7 @@ namespace { } } } - }, - Work.DefaultErrorFunction()); + }); }; std::vector<size_t> GenerateBlockIndexes; @@ -2704,59 +2680,56 @@ namespace { const IoHash& BlockHash = NewBlocks.BlockDescriptions[BlockIndex].BlockHash; if (!AbortFlag) { - Work.ScheduleWork( - ReadChunkPool, - [&, BlockIndex](std::atomic<bool>&) { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UploadPartBlobs_GenerateBlock"); + Work.ScheduleWork(ReadChunkPool, [&, BlockIndex](std::atomic<bool>&) { + if (!AbortFlag) + { + ZEN_TRACE_CPU("UploadPartBlobs_GenerateBlock"); - FilteredGenerateBlockBytesPerSecond.Start(); + FilteredGenerateBlockBytesPerSecond.Start(); - Stopwatch GenerateTimer; - CompositeBuffer Payload; - if (NewBlocks.BlockHeaders[BlockIndex]) - { - Payload = RebuildBlock(Path, - Content, - Lookup, - std::move(NewBlocks.BlockHeaders[BlockIndex]), - NewBlockChunks[BlockIndex], - DiskStats) - .GetCompressed(); - } - else + Stopwatch GenerateTimer; + CompositeBuffer Payload; + if (NewBlocks.BlockHeaders[BlockIndex]) + { + Payload = RebuildBlock(Path, + Content, + Lookup, + std::move(NewBlocks.BlockHeaders[BlockIndex]), + NewBlockChunks[BlockIndex], + DiskStats) + .GetCompressed(); + } + else + { + ChunkBlockDescription BlockDescription; + CompressedBuffer CompressedBlock = + GenerateBlock(Path, Content, Lookup, NewBlockChunks[BlockIndex], BlockDescription, DiskStats); + if (!CompressedBlock) { - ChunkBlockDescription BlockDescription; - CompressedBuffer CompressedBlock = - GenerateBlock(Path, Content, Lookup, NewBlockChunks[BlockIndex], BlockDescription, DiskStats); - if (!CompressedBlock) - { - throw std::runtime_error(fmt::format("Failed generating block {}", BlockHash)); - } - ZEN_ASSERT(BlockDescription.BlockHash == BlockHash); - Payload = std::move(CompressedBlock).GetCompressed(); + throw std::runtime_error(fmt::format("Failed generating block {}", BlockHash)); } + ZEN_ASSERT(BlockDescription.BlockHash == BlockHash); + Payload = std::move(CompressedBlock).GetCompressed(); + } - GeneratedBlockByteCount += NewBlocks.BlockSizes[BlockIndex]; - GeneratedBlockCount++; - if (GeneratedBlockCount == GenerateBlockIndexes.size()) - { - FilteredGenerateBlockBytesPerSecond.Stop(); - } - ZEN_CONSOLE_VERBOSE("{} block {} ({}) containing {} chunks in {}", - NewBlocks.BlockHeaders[BlockIndex] ? "Regenerated" : "Generated", - NewBlocks.BlockDescriptions[BlockIndex].BlockHash, - NiceBytes(NewBlocks.BlockSizes[BlockIndex]), - NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size(), - NiceTimeSpanMs(GenerateTimer.GetElapsedTimeMs())); - if (!AbortFlag) - { - AsyncUploadBlock(BlockIndex, BlockHash, std::move(Payload), QueuedPendingInMemoryBlocksForUpload); - } + GeneratedBlockByteCount += NewBlocks.BlockSizes[BlockIndex]; + GeneratedBlockCount++; + if (GeneratedBlockCount == GenerateBlockIndexes.size()) + { + FilteredGenerateBlockBytesPerSecond.Stop(); } - }, - Work.DefaultErrorFunction()); + ZEN_CONSOLE_VERBOSE("{} block {} ({}) containing {} chunks in {}", + NewBlocks.BlockHeaders[BlockIndex] ? "Regenerated" : "Generated", + NewBlocks.BlockDescriptions[BlockIndex].BlockHash, + NiceBytes(NewBlocks.BlockSizes[BlockIndex]), + NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size(), + NiceTimeSpanMs(GenerateTimer.GetElapsedTimeMs())); + if (!AbortFlag) + { + AsyncUploadBlock(BlockIndex, BlockHash, std::move(Payload), QueuedPendingInMemoryBlocksForUpload); + } + } + }); } } @@ -2764,35 +2737,32 @@ namespace { for (const uint32_t LooseChunkOrderIndex : LooseChunkOrderIndexes) { const uint32_t ChunkIndex = LooseChunkIndexes[LooseChunkOrderIndex]; - Work.ScheduleWork( - ReadChunkPool, - [&, ChunkIndex](std::atomic<bool>&) { + Work.ScheduleWork(ReadChunkPool, [&, ChunkIndex](std::atomic<bool>&) { + if (!AbortFlag) + { + ZEN_TRACE_CPU("UploadPartBlobs_CompressChunk"); + + FilteredCompressedBytesPerSecond.Start(); + Stopwatch CompressTimer; + CompositeBuffer Payload = + CompressChunk(Path, Content, Lookup, ChunkIndex, ZenTempChunkFolderPath(ZenFolderPath), LooseChunksStats); + ZEN_CONSOLE_VERBOSE("Compressed chunk {} ({} -> {}) in {}", + Content.ChunkedContent.ChunkHashes[ChunkIndex], + NiceBytes(Content.ChunkedContent.ChunkRawSizes[ChunkIndex]), + NiceBytes(Payload.GetSize()), + NiceTimeSpanMs(CompressTimer.GetElapsedTimeMs())); + const uint64_t ChunkRawSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; + UploadStats.ReadFromDiskBytes += ChunkRawSize; + if (LooseChunksStats.CompressedChunkCount == LooseChunkOrderIndexes.size()) + { + FilteredCompressedBytesPerSecond.Stop(); + } if (!AbortFlag) { - ZEN_TRACE_CPU("UploadPartBlobs_CompressChunk"); - - FilteredCompressedBytesPerSecond.Start(); - Stopwatch CompressTimer; - CompositeBuffer Payload = - CompressChunk(Path, Content, Lookup, ChunkIndex, ZenTempChunkFolderPath(ZenFolderPath), LooseChunksStats); - ZEN_CONSOLE_VERBOSE("Compressed chunk {} ({} -> {}) in {}", - Content.ChunkedContent.ChunkHashes[ChunkIndex], - NiceBytes(Content.ChunkedContent.ChunkRawSizes[ChunkIndex]), - NiceBytes(Payload.GetSize()), - NiceTimeSpanMs(CompressTimer.GetElapsedTimeMs())); - const uint64_t ChunkRawSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; - UploadStats.ReadFromDiskBytes += ChunkRawSize; - if (LooseChunksStats.CompressedChunkCount == LooseChunkOrderIndexes.size()) - { - FilteredCompressedBytesPerSecond.Stop(); - } - if (!AbortFlag) - { - AsyncUploadLooseChunk(Content.ChunkedContent.ChunkHashes[ChunkIndex], ChunkRawSize, std::move(Payload)); - } + AsyncUploadLooseChunk(Content.ChunkedContent.ChunkHashes[ChunkIndex], ChunkRawSize, std::move(Payload)); } - }, - Work.DefaultErrorFunction()); + } + }); } Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) { @@ -4045,7 +4015,7 @@ namespace { WorkerThreadPool& VerifyPool = GetIOWorkerPool(); - ParallellWork Work(AbortFlag); + ParallelWork Work(AbortFlag); const uint32_t PathCount = gsl::narrow<uint32_t>(Content.Paths.size()); @@ -4173,11 +4143,20 @@ namespace { VerifyFolderStats.FilesVerified++; } }, - [&, PathIndex](const std::exception& Ex, std::atomic<bool>&) { + [&, PathIndex](std::exception_ptr Ex, std::atomic<bool>&) { + std::string Description; + try + { + std::rethrow_exception(Ex); + } + catch (const std::exception& Ex) + { + Description = Ex.what(); + } ErrorLock.WithExclusiveLock([&]() { Errors.push_back(fmt::format("Failed verifying file '{}'. Reason: {}", (Path / Content.Paths[PathIndex]).make_preferred(), - Ex.what())); + Description)); }); VerifyFolderStats.FilesFailed++; }); @@ -4405,7 +4384,7 @@ namespace { const ChunkedFolderContent& RemoteContent, const ChunkedContentLookup& Lookup, std::span<const uint32_t> RemoteSequenceIndexes, - ParallellWork& Work, + ParallelWork& Work, WorkerThreadPool& VerifyPool) { if (RemoteSequenceIndexes.empty()) @@ -4416,21 +4395,18 @@ namespace { for (uint32_t RemoteSequenceIndexOffset = 1; RemoteSequenceIndexOffset < RemoteSequenceIndexes.size(); RemoteSequenceIndexOffset++) { const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset]; - Work.ScheduleWork( - VerifyPool, - [&RemoteContent, &Lookup, TargetFolder, RemoteSequenceIndex](std::atomic<bool>&) { + Work.ScheduleWork(VerifyPool, [&RemoteContent, &Lookup, TargetFolder, RemoteSequenceIndex](std::atomic<bool>&) { + if (!AbortFlag) + { + ZEN_TRACE_CPU("VerifyAndCompleteChunkSequenceAsync"); + VerifySequence(TargetFolder, RemoteContent, Lookup, RemoteSequenceIndex); if (!AbortFlag) { - ZEN_TRACE_CPU("VerifyAndCompleteChunkSequenceAsync"); - VerifySequence(TargetFolder, RemoteContent, Lookup, RemoteSequenceIndex); - if (!AbortFlag) - { - const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; - FinalizeChunkSequence(TargetFolder, SequenceRawHash); - } + const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + FinalizeChunkSequence(TargetFolder, SequenceRawHash); } - }, - Work.DefaultErrorFunction()); + } + }); } const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[0]; @@ -4480,7 +4456,7 @@ namespace { const ChunkedContentLookup& Lookup, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, const BlockWriteOps& Ops, - ParallellWork& Work, + ParallelWork& Work, WorkerThreadPool& VerifyPool, DiskStatistics& DiskStats) { @@ -4672,7 +4648,7 @@ namespace { const ChunkedFolderContent& RemoteContent, const ChunkBlockDescription& BlockDescription, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, - ParallellWork& Work, + ParallelWork& Work, WorkerThreadPool& VerifyPool, CompositeBuffer&& BlockBuffer, const ChunkedContentLookup& Lookup, @@ -4745,7 +4721,7 @@ namespace { const ChunkedFolderContent& RemoteContent, const ChunkBlockDescription& BlockDescription, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, - ParallellWork& Work, + ParallelWork& Work, WorkerThreadPool& VerifyPool, CompositeBuffer&& PartialBlockBuffer, uint32_t FirstIncludedBlockChunkIndex, @@ -4978,7 +4954,7 @@ namespace { const ChunkedContentLookup& RemoteLookup, uint32_t RemoteChunkIndex, std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>&& ChunkTargetPtrs, - ParallellWork& Work, + ParallelWork& Work, WorkerThreadPool& WritePool, IoBuffer&& Payload, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, @@ -5098,8 +5074,7 @@ namespace { } } } - }, - Work.DefaultErrorFunction()); + }); }; bool ReadStateFile(const std::filesystem::path& StateFilePath, @@ -5167,36 +5142,35 @@ namespace { ProgressBar ProgressBar(ProgressMode, "Check Files"); - ParallellWork Work(AbortFlag); + ParallelWork Work(AbortFlag); std::atomic<uint64_t> CompletedPathCount = 0; uint32_t PathIndex = 0; while (PathIndex < PathCount) { uint32_t PathRangeCount = Min(128u, PathCount - PathIndex); - Work.ScheduleWork( - GetIOWorkerPool(), - [PathIndex, PathRangeCount, &PathsToCheck, &Path, &Result, &CompletedPathCount, &LocalFolderScanStats]( - std::atomic<bool>&) { - for (uint32_t PathRangeIndex = PathIndex; PathRangeIndex < PathIndex + PathRangeCount; PathRangeIndex++) - { - const std::filesystem::path& FilePath = PathsToCheck[PathRangeIndex]; - std::filesystem::path LocalFilePath = (Path / FilePath).make_preferred(); - if (TryGetFileProperties(LocalFilePath, - Result.RawSizes[PathRangeIndex], - Result.ModificationTicks[PathRangeIndex], - Result.Attributes[PathRangeIndex])) - { - Result.Paths[PathRangeIndex] = std::move(FilePath); - LocalFolderScanStats.FoundFileCount++; - LocalFolderScanStats.FoundFileByteCount += Result.RawSizes[PathRangeIndex]; - LocalFolderScanStats.AcceptedFileCount++; - LocalFolderScanStats.AcceptedFileByteCount += Result.RawSizes[PathRangeIndex]; - } - CompletedPathCount++; - } - }, - Work.DefaultErrorFunction()); + Work.ScheduleWork(GetIOWorkerPool(), + [PathIndex, PathRangeCount, &PathsToCheck, &Path, &Result, &CompletedPathCount, &LocalFolderScanStats]( + std::atomic<bool>&) { + for (uint32_t PathRangeIndex = PathIndex; PathRangeIndex < PathIndex + PathRangeCount; + PathRangeIndex++) + { + const std::filesystem::path& FilePath = PathsToCheck[PathRangeIndex]; + std::filesystem::path LocalFilePath = (Path / FilePath).make_preferred(); + if (TryGetFileProperties(LocalFilePath, + Result.RawSizes[PathRangeIndex], + Result.ModificationTicks[PathRangeIndex], + Result.Attributes[PathRangeIndex])) + { + Result.Paths[PathRangeIndex] = std::move(FilePath); + LocalFolderScanStats.FoundFileCount++; + LocalFolderScanStats.FoundFileByteCount += Result.RawSizes[PathRangeIndex]; + LocalFolderScanStats.AcceptedFileCount++; + LocalFolderScanStats.AcceptedFileByteCount += Result.RawSizes[PathRangeIndex]; + } + CompletedPathCount++; + } + }); PathIndex += PathRangeCount; } Work.Wait(200, [&](bool, ptrdiff_t) { @@ -5831,8 +5805,8 @@ namespace { WorkerThreadPool& NetworkPool = GetNetworkPool(); WorkerThreadPool& WritePool = GetIOWorkerPool(); - ProgressBar WriteProgressBar(ProgressMode, PrimeCacheOnly ? "Downloading" : "Writing"); - ParallellWork Work(AbortFlag); + ProgressBar WriteProgressBar(ProgressMode, PrimeCacheOnly ? "Downloading" : "Writing"); + ParallelWork Work(AbortFlag); struct LooseChunkHashWorkData { @@ -6199,41 +6173,38 @@ namespace { } if (!PrimeCacheOnly) { - Work.ScheduleWork( - WritePool, - [&, ScavengeOpIndex](std::atomic<bool>&) mutable { - if (!AbortFlag) - { - const ScavengeCopyOperation& ScavengeOp = ScavengeCopyOperations[ScavengeOpIndex]; - const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengeOp.ScavengedContentIndex]; - const std::filesystem::path ScavengedPath = ScavengedContent.Paths[ScavengeOp.ScavengedPathIndex]; + Work.ScheduleWork(WritePool, [&, ScavengeOpIndex](std::atomic<bool>&) mutable { + if (!AbortFlag) + { + const ScavengeCopyOperation& ScavengeOp = ScavengeCopyOperations[ScavengeOpIndex]; + const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengeOp.ScavengedContentIndex]; + const std::filesystem::path ScavengedPath = ScavengedContent.Paths[ScavengeOp.ScavengedPathIndex]; - const std::filesystem::path ScavengedFilePath = - (ScavengedPaths[ScavengeOp.ScavengedContentIndex] / ScavengedPath).make_preferred(); - ZEN_ASSERT_SLOW(FileSizeFromPath(ScavengedFilePath) == ScavengeOp.RawSize); + const std::filesystem::path ScavengedFilePath = + (ScavengedPaths[ScavengeOp.ScavengedContentIndex] / ScavengedPath).make_preferred(); + ZEN_ASSERT_SLOW(FileSizeFromPath(ScavengedFilePath) == ScavengeOp.RawSize); - const IoHash& RemoteSequenceRawHash = - RemoteContent.ChunkedContent.SequenceRawHashes[ScavengeOp.RemoteSequenceIndex]; - const std::filesystem::path TempFilePath = - GetTempChunkedSequenceFileName(CacheFolderPath, RemoteSequenceRawHash); + const IoHash& RemoteSequenceRawHash = + RemoteContent.ChunkedContent.SequenceRawHashes[ScavengeOp.RemoteSequenceIndex]; + const std::filesystem::path TempFilePath = + GetTempChunkedSequenceFileName(CacheFolderPath, RemoteSequenceRawHash); - CopyFile(ScavengedFilePath, TempFilePath, {.EnableClone = false}); + CopyFile(ScavengedFilePath, TempFilePath, {.EnableClone = false}); - DiskStats.WriteCount++; - DiskStats.WriteByteCount += ScavengeOp.RawSize; + DiskStats.WriteCount++; + DiskStats.WriteByteCount += ScavengeOp.RawSize; - const std::filesystem::path CacheFilePath = - GetFinalChunkedSequenceFileName(CacheFolderPath, RemoteSequenceRawHash); - RenameFile(TempFilePath, CacheFilePath); + const std::filesystem::path CacheFilePath = + GetFinalChunkedSequenceFileName(CacheFolderPath, RemoteSequenceRawHash); + RenameFile(TempFilePath, CacheFilePath); - WritePartsComplete++; - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); - } + WritePartsComplete++; + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); } - }, - Work.DefaultErrorFunction()); + } + }); } } @@ -6256,287 +6227,279 @@ namespace { continue; } - Work.ScheduleWork( - WritePool, - [&, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) mutable { - if (!AbortFlag) + Work.ScheduleWork(WritePool, [&, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) mutable { + if (!AbortFlag) + { + ZEN_TRACE_CPU("UpdateFolder_ReadPreDownloaded"); + std::filesystem::path ExistingCompressedChunkPath; + if (!PrimeCacheOnly) { - ZEN_TRACE_CPU("UpdateFolder_ReadPreDownloaded"); - std::filesystem::path ExistingCompressedChunkPath; - if (!PrimeCacheOnly) + const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + std::filesystem::path CompressedChunkPath = ZenTempDownloadFolderPath(ZenFolderPath) / ChunkHash.ToHexString(); + if (IsFile(CompressedChunkPath)) { - const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; - std::filesystem::path CompressedChunkPath = - ZenTempDownloadFolderPath(ZenFolderPath) / ChunkHash.ToHexString(); - if (IsFile(CompressedChunkPath)) + IoBuffer ExistingCompressedPart = IoBufferBuilder::MakeFromFile(ExistingCompressedChunkPath); + if (ExistingCompressedPart) { - IoBuffer ExistingCompressedPart = IoBufferBuilder::MakeFromFile(ExistingCompressedChunkPath); - if (ExistingCompressedPart) + IoHash RawHash; + uint64_t RawSize; + if (CompressedBuffer::ValidateCompressedHeader(ExistingCompressedPart, RawHash, RawSize)) { - IoHash RawHash; - uint64_t RawSize; - if (CompressedBuffer::ValidateCompressedHeader(ExistingCompressedPart, RawHash, RawSize)) - { - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - ExistingCompressedChunkPath = std::move(CompressedChunkPath); - } - else + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) { - std::error_code DummyEc; - RemoveFile(CompressedChunkPath, DummyEc); + FilteredDownloadedBytesPerSecond.Stop(); } + ExistingCompressedChunkPath = std::move(CompressedChunkPath); + } + else + { + std::error_code DummyEc; + RemoveFile(CompressedChunkPath, DummyEc); } } } - if (!AbortFlag) + } + if (!AbortFlag) + { + if (!ExistingCompressedChunkPath.empty()) { - if (!ExistingCompressedChunkPath.empty()) - { - Work.ScheduleWork( - WritePool, - [&Path, - &ZenFolderPath, - &RemoteContent, - &RemoteLookup, - &CacheFolderPath, - &SequenceIndexChunksLeftToWriteCounters, - &Work, - &WritePool, - &DiskStats, - &WriteChunkStats, - &WritePartsComplete, - TotalPartWriteCount, - &FilteredWrittenBytesPerSecond, - RemoteChunkIndex, - ChunkTargetPtrs, - CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>&) mutable { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_WritePreDownloaded"); + Work.ScheduleWork( + WritePool, + [&Path, + &ZenFolderPath, + &RemoteContent, + &RemoteLookup, + &CacheFolderPath, + &SequenceIndexChunksLeftToWriteCounters, + &Work, + &WritePool, + &DiskStats, + &WriteChunkStats, + &WritePartsComplete, + TotalPartWriteCount, + &FilteredWrittenBytesPerSecond, + RemoteChunkIndex, + ChunkTargetPtrs, + CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>&) mutable { + if (!AbortFlag) + { + ZEN_TRACE_CPU("UpdateFolder_WritePreDownloaded"); - FilteredWrittenBytesPerSecond.Start(); + FilteredWrittenBytesPerSecond.Start(); - const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; - IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); - if (!CompressedPart) - { - throw std::runtime_error( - fmt::format("Could not open dowloaded compressed chunk {} from {}", - ChunkHash, - CompressedChunkPath)); - } + IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); + if (!CompressedPart) + { + throw std::runtime_error(fmt::format("Could not open dowloaded compressed chunk {} from {}", + ChunkHash, + CompressedChunkPath)); + } - std::filesystem::path TargetFolder = ZenTempCacheFolderPath(ZenFolderPath); - bool NeedHashVerify = WriteCompressedChunk(TargetFolder, - RemoteContent, - RemoteLookup, - ChunkHash, - ChunkTargetPtrs, - std::move(CompressedPart), - DiskStats); - WritePartsComplete++; + std::filesystem::path TargetFolder = ZenTempCacheFolderPath(ZenFolderPath); + bool NeedHashVerify = WriteCompressedChunk(TargetFolder, + RemoteContent, + RemoteLookup, + ChunkHash, + ChunkTargetPtrs, + std::move(CompressedPart), + DiskStats); + WritePartsComplete++; - if (!AbortFlag) + if (!AbortFlag) + { + if (WritePartsComplete == TotalPartWriteCount) { - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); - } + FilteredWrittenBytesPerSecond.Stop(); + } - RemoveFileWithRetry(CompressedChunkPath); + RemoveFileWithRetry(CompressedChunkPath); - std::vector<uint32_t> CompletedSequences = - CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); - if (NeedHashVerify) - { - VerifyAndCompleteChunkSequencesAsync(TargetFolder, - RemoteContent, - RemoteLookup, - CompletedSequences, - Work, - WritePool); - } - else - { - FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences); - } + std::vector<uint32_t> CompletedSequences = + CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); + if (NeedHashVerify) + { + VerifyAndCompleteChunkSequencesAsync(TargetFolder, + RemoteContent, + RemoteLookup, + CompletedSequences, + Work, + WritePool); + } + else + { + FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences); } } - }, - Work.DefaultErrorFunction()); - } - else - { - Work.ScheduleWork( - NetworkPool, - [&Path, - &ZenFolderPath, - &Storage, - BuildId, - &PrimeCacheOnly, - &RemoteContent, - &RemoteLookup, - &ExistsResult, - &SequenceIndexChunksLeftToWriteCounters, - &Work, - &WritePool, - &NetworkPool, - &DiskStats, - &WriteChunkStats, - &WritePartsComplete, - TotalPartWriteCount, - TotalRequestCount, - &FilteredDownloadedBytesPerSecond, - &FilteredWrittenBytesPerSecond, - LargeAttachmentSize, - PreferredMultipartChunkSize, - RemoteChunkIndex, - ChunkTargetPtrs, - &DownloadStats](std::atomic<bool>&) mutable { - if (!AbortFlag) + } + }); + } + else + { + Work.ScheduleWork( + NetworkPool, + [&Path, + &ZenFolderPath, + &Storage, + BuildId, + &PrimeCacheOnly, + &RemoteContent, + &RemoteLookup, + &ExistsResult, + &SequenceIndexChunksLeftToWriteCounters, + &Work, + &WritePool, + &NetworkPool, + &DiskStats, + &WriteChunkStats, + &WritePartsComplete, + TotalPartWriteCount, + TotalRequestCount, + &FilteredDownloadedBytesPerSecond, + &FilteredWrittenBytesPerSecond, + LargeAttachmentSize, + PreferredMultipartChunkSize, + RemoteChunkIndex, + ChunkTargetPtrs, + &DownloadStats](std::atomic<bool>&) mutable { + if (!AbortFlag) + { + const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + FilteredDownloadedBytesPerSecond.Start(); + IoBuffer BuildBlob; + const bool ExistsInCache = + Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash); + if (ExistsInCache) { - const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; - FilteredDownloadedBytesPerSecond.Start(); - IoBuffer BuildBlob; - const bool ExistsInCache = - Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash); - if (ExistsInCache) + BuildBlob = Storage.BuildCacheStorage->GetBuildBlob(BuildId, ChunkHash); + } + if (BuildBlob) + { + uint64_t BlobSize = BuildBlob.GetSize(); + DownloadStats.DownloadedChunkCount++; + DownloadStats.DownloadedChunkByteCount += BlobSize; + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) { - BuildBlob = Storage.BuildCacheStorage->GetBuildBlob(BuildId, ChunkHash); + FilteredDownloadedBytesPerSecond.Stop(); } - if (BuildBlob) + AsyncWriteDownloadedChunk(ZenFolderPath, + RemoteContent, + RemoteLookup, + RemoteChunkIndex, + std::move(ChunkTargetPtrs), + Work, + WritePool, + std::move(BuildBlob), + SequenceIndexChunksLeftToWriteCounters, + WritePartsComplete, + TotalPartWriteCount, + FilteredWrittenBytesPerSecond, + DiskStats); + } + else + { + if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize) { - uint64_t BlobSize = BuildBlob.GetSize(); - DownloadStats.DownloadedChunkCount++; - DownloadStats.DownloadedChunkByteCount += BlobSize; - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - AsyncWriteDownloadedChunk(ZenFolderPath, - RemoteContent, - RemoteLookup, - RemoteChunkIndex, - std::move(ChunkTargetPtrs), - Work, - WritePool, - std::move(BuildBlob), - SequenceIndexChunksLeftToWriteCounters, - WritePartsComplete, - TotalPartWriteCount, - FilteredWrittenBytesPerSecond, - DiskStats); + ZEN_TRACE_CPU("UpdateFolder_GetLargeChunk"); + DownloadLargeBlob( + *Storage.BuildStorage, + ZenTempDownloadFolderPath(ZenFolderPath), + BuildId, + ChunkHash, + PreferredMultipartChunkSize, + Work, + NetworkPool, + DownloadStats, + [&, TotalPartWriteCount, TotalRequestCount, RemoteChunkIndex, ChunkTargetPtrs]( + IoBuffer&& Payload) mutable { + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + if (Payload && Storage.BuildCacheStorage) + { + Storage.BuildCacheStorage->PutBuildBlob( + BuildId, + ChunkHash, + ZenContentType::kCompressedBinary, + CompositeBuffer(SharedBuffer(Payload))); + } + if (!PrimeCacheOnly) + { + if (!AbortFlag) + { + AsyncWriteDownloadedChunk(ZenFolderPath, + RemoteContent, + RemoteLookup, + RemoteChunkIndex, + std::move(ChunkTargetPtrs), + Work, + WritePool, + std::move(Payload), + SequenceIndexChunksLeftToWriteCounters, + WritePartsComplete, + TotalPartWriteCount, + FilteredWrittenBytesPerSecond, + DiskStats); + } + } + }); } else { - if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize) + ZEN_TRACE_CPU("UpdateFolder_GetChunk"); + BuildBlob = Storage.BuildStorage->GetBuildBlob(BuildId, ChunkHash); + if (BuildBlob && Storage.BuildCacheStorage) { - ZEN_TRACE_CPU("UpdateFolder_GetLargeChunk"); - DownloadLargeBlob( - *Storage.BuildStorage, - ZenTempDownloadFolderPath(ZenFolderPath), - BuildId, - ChunkHash, - PreferredMultipartChunkSize, - Work, - NetworkPool, - DownloadStats, - [&, TotalPartWriteCount, TotalRequestCount, RemoteChunkIndex, ChunkTargetPtrs]( - IoBuffer&& Payload) mutable { - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - if (Payload && Storage.BuildCacheStorage) - { - Storage.BuildCacheStorage->PutBuildBlob( - BuildId, - ChunkHash, - ZenContentType::kCompressedBinary, - CompositeBuffer(SharedBuffer(Payload))); - } - if (!PrimeCacheOnly) - { - if (!AbortFlag) - { - AsyncWriteDownloadedChunk(ZenFolderPath, - RemoteContent, - RemoteLookup, - RemoteChunkIndex, - std::move(ChunkTargetPtrs), - Work, - WritePool, - std::move(Payload), - SequenceIndexChunksLeftToWriteCounters, - WritePartsComplete, - TotalPartWriteCount, - FilteredWrittenBytesPerSecond, - DiskStats); - } - } - }); + Storage.BuildCacheStorage->PutBuildBlob(BuildId, + ChunkHash, + BuildBlob.GetContentType(), + CompositeBuffer(SharedBuffer(BuildBlob))); } - else + if (!BuildBlob) { - ZEN_TRACE_CPU("UpdateFolder_GetChunk"); - BuildBlob = Storage.BuildStorage->GetBuildBlob(BuildId, ChunkHash); - if (BuildBlob && Storage.BuildCacheStorage) - { - Storage.BuildCacheStorage->PutBuildBlob( - BuildId, - ChunkHash, - BuildBlob.GetContentType(), - CompositeBuffer(SharedBuffer(BuildBlob))); - } - if (!BuildBlob) - { - throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash)); - } - if (!PrimeCacheOnly) + throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash)); + } + if (!PrimeCacheOnly) + { + if (!AbortFlag) { - if (!AbortFlag) + uint64_t BlobSize = BuildBlob.GetSize(); + DownloadStats.DownloadedChunkCount++; + DownloadStats.DownloadedChunkByteCount += BlobSize; + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) { - uint64_t BlobSize = BuildBlob.GetSize(); - DownloadStats.DownloadedChunkCount++; - DownloadStats.DownloadedChunkByteCount += BlobSize; - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - AsyncWriteDownloadedChunk(ZenFolderPath, - RemoteContent, - RemoteLookup, - RemoteChunkIndex, - std::move(ChunkTargetPtrs), - Work, - WritePool, - std::move(BuildBlob), - SequenceIndexChunksLeftToWriteCounters, - WritePartsComplete, - TotalPartWriteCount, - FilteredWrittenBytesPerSecond, - DiskStats); + FilteredDownloadedBytesPerSecond.Stop(); } + AsyncWriteDownloadedChunk(ZenFolderPath, + RemoteContent, + RemoteLookup, + RemoteChunkIndex, + std::move(ChunkTargetPtrs), + Work, + WritePool, + std::move(BuildBlob), + SequenceIndexChunksLeftToWriteCounters, + WritePartsComplete, + TotalPartWriteCount, + FilteredWrittenBytesPerSecond, + DiskStats); } } } } - }, - Work.DefaultErrorFunction()); - } + } + }); } } - }, - Work.DefaultErrorFunction()); + } + }); } for (size_t CopyDataIndex = 0; CopyDataIndex < CacheCopyDatas.size(); CopyDataIndex++) @@ -6547,184 +6510,180 @@ namespace { break; } - Work.ScheduleWork( - WritePool, - [&, CopyDataIndex](std::atomic<bool>&) { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_CopyLocal"); + Work.ScheduleWork(WritePool, [&, CopyDataIndex](std::atomic<bool>&) { + if (!AbortFlag) + { + ZEN_TRACE_CPU("UpdateFolder_CopyLocal"); - FilteredWrittenBytesPerSecond.Start(); - const CacheCopyData& CopyData = CacheCopyDatas[CopyDataIndex]; + FilteredWrittenBytesPerSecond.Start(); + const CacheCopyData& CopyData = CacheCopyDatas[CopyDataIndex]; - std::filesystem::path SourceFilePath; + std::filesystem::path SourceFilePath; - if (CopyData.ScavengeSourceIndex == (uint32_t)-1) - { - const uint32_t LocalPathIndex = LocalLookup.SequenceIndexFirstPathIndex[CopyData.SourceSequenceIndex]; - SourceFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred(); - } - else - { - const ChunkedFolderContent& ScavengedContent = ScavengedContents[CopyData.ScavengeSourceIndex]; - const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[CopyData.ScavengeSourceIndex]; - const std::filesystem::path ScavengedPath = ScavengedPaths[CopyData.ScavengeSourceIndex]; - const uint32_t ScavengedPathIndex = - ScavengedLookup.SequenceIndexFirstPathIndex[CopyData.SourceSequenceIndex]; - SourceFilePath = (ScavengedPath / ScavengedContent.Paths[ScavengedPathIndex]).make_preferred(); - } - ZEN_ASSERT_SLOW(IsFile(SourceFilePath)); - ZEN_ASSERT(!CopyData.TargetChunkLocationPtrs.empty()); + if (CopyData.ScavengeSourceIndex == (uint32_t)-1) + { + const uint32_t LocalPathIndex = LocalLookup.SequenceIndexFirstPathIndex[CopyData.SourceSequenceIndex]; + SourceFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred(); + } + else + { + const ChunkedFolderContent& ScavengedContent = ScavengedContents[CopyData.ScavengeSourceIndex]; + const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[CopyData.ScavengeSourceIndex]; + const std::filesystem::path ScavengedPath = ScavengedPaths[CopyData.ScavengeSourceIndex]; + const uint32_t ScavengedPathIndex = ScavengedLookup.SequenceIndexFirstPathIndex[CopyData.SourceSequenceIndex]; + SourceFilePath = (ScavengedPath / ScavengedContent.Paths[ScavengedPathIndex]).make_preferred(); + } + ZEN_ASSERT_SLOW(IsFile(SourceFilePath)); + ZEN_ASSERT(!CopyData.TargetChunkLocationPtrs.empty()); - uint64_t CacheLocalFileBytesRead = 0; + uint64_t CacheLocalFileBytesRead = 0; - size_t TargetStart = 0; - const std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> AllTargets( - CopyData.TargetChunkLocationPtrs); + size_t TargetStart = 0; + const std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> AllTargets( + CopyData.TargetChunkLocationPtrs); - struct WriteOp - { - const ChunkedContentLookup::ChunkSequenceLocation* Target = nullptr; - uint64_t CacheFileOffset = (uint64_t)-1; - uint32_t ChunkIndex = (uint32_t)-1; - }; + struct WriteOp + { + const ChunkedContentLookup::ChunkSequenceLocation* Target = nullptr; + uint64_t CacheFileOffset = (uint64_t)-1; + uint32_t ChunkIndex = (uint32_t)-1; + }; - std::vector<WriteOp> WriteOps; + std::vector<WriteOp> WriteOps; - if (!AbortFlag) + if (!AbortFlag) + { + ZEN_TRACE_CPU("Sort"); + WriteOps.reserve(AllTargets.size()); + for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets) { - ZEN_TRACE_CPU("Sort"); - WriteOps.reserve(AllTargets.size()); - for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets) + std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> TargetRange = + AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount); + for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange) { - std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> TargetRange = - AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount); - for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange) - { - WriteOps.push_back(WriteOp{.Target = Target, - .CacheFileOffset = ChunkTarget.CacheFileOffset, - .ChunkIndex = ChunkTarget.RemoteChunkIndex}); - } - TargetStart += ChunkTarget.TargetChunkLocationCount; + WriteOps.push_back(WriteOp{.Target = Target, + .CacheFileOffset = ChunkTarget.CacheFileOffset, + .ChunkIndex = ChunkTarget.RemoteChunkIndex}); } + TargetStart += ChunkTarget.TargetChunkLocationCount; + } - std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) { - if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex) - { - return true; - } - else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex) - { - return false; - } - if (Lhs.Target->Offset < Rhs.Target->Offset) - { - return true; - } + std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) { + if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex) + { + return true; + } + else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex) + { return false; - }); - } + } + if (Lhs.Target->Offset < Rhs.Target->Offset) + { + return true; + } + return false; + }); + } - if (!AbortFlag) - { - ZEN_TRACE_CPU("Write"); + if (!AbortFlag) + { + ZEN_TRACE_CPU("Write"); - tsl::robin_set<uint32_t> ChunkIndexesWritten; + tsl::robin_set<uint32_t> ChunkIndexesWritten; - BufferedOpenFile SourceFile(SourceFilePath, DiskStats); - WriteFileCache OpenFileCache(DiskStats); - for (size_t WriteOpIndex = 0; WriteOpIndex < WriteOps.size();) + BufferedOpenFile SourceFile(SourceFilePath, DiskStats); + WriteFileCache OpenFileCache(DiskStats); + for (size_t WriteOpIndex = 0; WriteOpIndex < WriteOps.size();) + { + if (AbortFlag) { - if (AbortFlag) + break; + } + const WriteOp& Op = WriteOps[WriteOpIndex]; + + const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex; + ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() <= + RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]); + ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() > 0); + const uint32_t RemotePathIndex = RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]; + const uint64_t ChunkSize = RemoteContent.ChunkedContent.ChunkRawSizes[Op.ChunkIndex]; + + uint64_t ReadLength = ChunkSize; + size_t WriteCount = 1; + uint64_t OpSourceEnd = Op.CacheFileOffset + ChunkSize; + uint64_t OpTargetEnd = Op.Target->Offset + ChunkSize; + while ((WriteOpIndex + WriteCount) < WriteOps.size()) + { + const WriteOp& NextOp = WriteOps[WriteOpIndex + WriteCount]; + if (NextOp.Target->SequenceIndex != Op.Target->SequenceIndex) { break; } - const WriteOp& Op = WriteOps[WriteOpIndex]; - - const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex; - ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() <= - RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]); - ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() > 0); - const uint32_t RemotePathIndex = RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]; - const uint64_t ChunkSize = RemoteContent.ChunkedContent.ChunkRawSizes[Op.ChunkIndex]; - - uint64_t ReadLength = ChunkSize; - size_t WriteCount = 1; - uint64_t OpSourceEnd = Op.CacheFileOffset + ChunkSize; - uint64_t OpTargetEnd = Op.Target->Offset + ChunkSize; - while ((WriteOpIndex + WriteCount) < WriteOps.size()) + if (NextOp.Target->Offset != OpTargetEnd) { - const WriteOp& NextOp = WriteOps[WriteOpIndex + WriteCount]; - if (NextOp.Target->SequenceIndex != Op.Target->SequenceIndex) - { - break; - } - if (NextOp.Target->Offset != OpTargetEnd) - { - break; - } - if (NextOp.CacheFileOffset != OpSourceEnd) - { - break; - } - const uint64_t NextChunkLength = RemoteContent.ChunkedContent.ChunkRawSizes[NextOp.ChunkIndex]; - if (ReadLength + NextChunkLength > 512u * 1024u) - { - break; - } - ReadLength += NextChunkLength; - OpSourceEnd += NextChunkLength; - OpTargetEnd += NextChunkLength; - WriteCount++; + break; + } + if (NextOp.CacheFileOffset != OpSourceEnd) + { + break; + } + const uint64_t NextChunkLength = RemoteContent.ChunkedContent.ChunkRawSizes[NextOp.ChunkIndex]; + if (ReadLength + NextChunkLength > 512u * 1024u) + { + break; } + ReadLength += NextChunkLength; + OpSourceEnd += NextChunkLength; + OpTargetEnd += NextChunkLength; + WriteCount++; + } - CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ReadLength); - ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]); + CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ReadLength); + ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]); - OpenFileCache.WriteToFile<CompositeBuffer>( - RemoteSequenceIndex, - [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) { - return GetTempChunkedSequenceFileName( - CacheFolderPath, - RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); - }, - ChunkSource, - Op.Target->Offset, - RemoteContent.RawSizes[RemotePathIndex]); + OpenFileCache.WriteToFile<CompositeBuffer>( + RemoteSequenceIndex, + [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) { + return GetTempChunkedSequenceFileName( + CacheFolderPath, + RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); + }, + ChunkSource, + Op.Target->Offset, + RemoteContent.RawSizes[RemotePathIndex]); - CacheLocalFileBytesRead += ReadLength; // TODO: This should be the sum of unique chunk sizes? + CacheLocalFileBytesRead += ReadLength; // TODO: This should be the sum of unique chunk sizes? - WriteOpIndex += WriteCount; - } + WriteOpIndex += WriteCount; } - if (!AbortFlag) + } + if (!AbortFlag) + { + // Write tracking, updating this must be done without any files open (WriteFileCache) + std::vector<uint32_t> CompletedChunkSequences; + for (const WriteOp& Op : WriteOps) { - // Write tracking, updating this must be done without any files open (WriteFileCache) - std::vector<uint32_t> CompletedChunkSequences; - for (const WriteOp& Op : WriteOps) + const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex; + if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters)) { - const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex; - if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters)) - { - CompletedChunkSequences.push_back(RemoteSequenceIndex); - } + CompletedChunkSequences.push_back(RemoteSequenceIndex); } - VerifyAndCompleteChunkSequencesAsync(CacheFolderPath, - RemoteContent, - RemoteLookup, - CompletedChunkSequences, - Work, - WritePool); - ZEN_CONSOLE_VERBOSE("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), SourceFilePath); - } - WritePartsComplete++; - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); } + VerifyAndCompleteChunkSequencesAsync(CacheFolderPath, + RemoteContent, + RemoteLookup, + CompletedChunkSequences, + Work, + WritePool); + ZEN_CONSOLE_VERBOSE("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), SourceFilePath); } - }, - Work.DefaultErrorFunction()); + WritePartsComplete++; + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } + } + }); } for (uint32_t BlockIndex : CachedChunkBlockIndexes) @@ -6735,52 +6694,49 @@ namespace { break; } - Work.ScheduleWork( - WritePool, - [&, BlockIndex](std::atomic<bool>&) mutable { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_WriteCachedBlock"); + Work.ScheduleWork(WritePool, [&, BlockIndex](std::atomic<bool>&) mutable { + if (!AbortFlag) + { + ZEN_TRACE_CPU("UpdateFolder_WriteCachedBlock"); - const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; - FilteredWrittenBytesPerSecond.Start(); + const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + FilteredWrittenBytesPerSecond.Start(); - std::filesystem::path BlockChunkPath = - ZenTempBlockFolderPath(ZenFolderPath) / BlockDescription.BlockHash.ToHexString(); - IoBuffer BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); - if (!BlockBuffer) + std::filesystem::path BlockChunkPath = + ZenTempBlockFolderPath(ZenFolderPath) / BlockDescription.BlockHash.ToHexString(); + IoBuffer BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); + if (!BlockBuffer) + { + throw std::runtime_error( + fmt::format("Can not read block {} at {}", BlockDescription.BlockHash, BlockChunkPath)); + } + + if (!AbortFlag) + { + if (!WriteBlockToDisk(CacheFolderPath, + RemoteContent, + BlockDescription, + SequenceIndexChunksLeftToWriteCounters, + Work, + WritePool, + CompositeBuffer(std::move(BlockBuffer)), + RemoteLookup, + RemoteChunkIndexNeedsCopyFromSourceFlags, + DiskStats)) { - throw std::runtime_error( - fmt::format("Can not read block {} at {}", BlockDescription.BlockHash, BlockChunkPath)); + std::error_code DummyEc; + RemoveFile(BlockChunkPath, DummyEc); + throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash)); } - - if (!AbortFlag) + WritePartsComplete++; + RemoveFileWithRetry(BlockChunkPath); + if (WritePartsComplete == TotalPartWriteCount) { - if (!WriteBlockToDisk(CacheFolderPath, - RemoteContent, - BlockDescription, - SequenceIndexChunksLeftToWriteCounters, - Work, - WritePool, - CompositeBuffer(std::move(BlockBuffer)), - RemoteLookup, - RemoteChunkIndexNeedsCopyFromSourceFlags, - DiskStats)) - { - std::error_code DummyEc; - RemoveFile(BlockChunkPath, DummyEc); - throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash)); - } - WritePartsComplete++; - RemoveFileWithRetry(BlockChunkPath); - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); - } + FilteredWrittenBytesPerSecond.Stop(); } } - }, - Work.DefaultErrorFunction()); + } + }); } for (size_t BlockRangeIndex = 0; BlockRangeIndex < BlockRangeWorks.size(); BlockRangeIndex++) @@ -6793,46 +6749,214 @@ namespace { const BlockRangeDescriptor BlockRange = BlockRangeWorks[BlockRangeIndex]; ZEN_ASSERT(BlockRange.BlockIndex != (uint32_t)-1); const uint32_t BlockIndex = BlockRange.BlockIndex; - Work.ScheduleWork( - NetworkPool, - [&, BlockIndex, BlockRange](std::atomic<bool>&) { + Work.ScheduleWork(NetworkPool, [&, BlockIndex, BlockRange](std::atomic<bool>&) { + if (!AbortFlag) + { + ZEN_TRACE_CPU("UpdateFolder_GetPartialBlock"); + + const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + + FilteredDownloadedBytesPerSecond.Start(); + IoBuffer BlockBuffer; + if (Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash)) + { + BlockBuffer = Storage.BuildCacheStorage->GetBuildBlob(BuildId, + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength); + } + if (!BlockBuffer) + { + BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId, + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength); + } + if (!BlockBuffer) + { + throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); + } if (!AbortFlag) { - ZEN_TRACE_CPU("UpdateFolder_GetPartialBlock"); + uint64_t BlockSize = BlockBuffer.GetSize(); + DownloadStats.DownloadedBlockCount++; + DownloadStats.DownloadedBlockByteCount += BlockSize; + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } - const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + std::filesystem::path BlockChunkPath; - FilteredDownloadedBytesPerSecond.Start(); - IoBuffer BlockBuffer; - if (Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash)) + // Check if the dowloaded block is file based and we can move it directly without rewriting it { - BlockBuffer = Storage.BuildCacheStorage->GetBuildBlob(BuildId, - BlockDescription.BlockHash, - BlockRange.RangeStart, - BlockRange.RangeLength); + IoBufferFileReference FileRef; + if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && + (FileRef.FileChunkSize == BlockSize)) + { + ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock"); + + std::error_code Ec; + std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); + if (!Ec) + { + BlockBuffer.SetDeleteOnClose(false); + BlockBuffer = {}; + BlockChunkPath = ZenTempBlockFolderPath(ZenFolderPath) / fmt::format("{}_{:x}_{:x}", + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength); + RenameFile(TempBlobPath, BlockChunkPath, Ec); + if (Ec) + { + BlockChunkPath = std::filesystem::path{}; + + // Re-open the temp file again + BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); + BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); + BlockBuffer.SetDeleteOnClose(true); + } + } + } } - if (!BlockBuffer) + + if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u)) { - BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId, - BlockDescription.BlockHash, - BlockRange.RangeStart, - BlockRange.RangeLength); + ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock"); + // Could not be moved and rather large, lets store it on disk + BlockChunkPath = + ZenTempBlockFolderPath(ZenFolderPath) / + fmt::format("{}_{:x}_{:x}", BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength); + TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); + BlockBuffer = {}; } - if (!BlockBuffer) + + if (!AbortFlag) { - throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); + Work.ScheduleWork( + WritePool, + [&, BlockIndex, BlockRange, BlockChunkPath, BlockPartialBuffer = std::move(BlockBuffer)]( + std::atomic<bool>&) mutable { + if (!AbortFlag) + { + ZEN_TRACE_CPU("UpdateFolder_WritePartialBlock"); + + const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + + if (BlockChunkPath.empty()) + { + ZEN_ASSERT(BlockPartialBuffer); + } + else + { + ZEN_ASSERT(!BlockPartialBuffer); + BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); + if (!BlockPartialBuffer) + { + throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}", + BlockDescription.BlockHash, + BlockChunkPath)); + } + } + + FilteredWrittenBytesPerSecond.Start(); + + if (!WritePartialBlockToDisk( + CacheFolderPath, + RemoteContent, + BlockDescription, + SequenceIndexChunksLeftToWriteCounters, + Work, + WritePool, + CompositeBuffer(std::move(BlockPartialBuffer)), + BlockRange.ChunkBlockIndexStart, + BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1, + RemoteLookup, + RemoteChunkIndexNeedsCopyFromSourceFlags, + DiskStats)) + { + std::error_code DummyEc; + RemoveFile(BlockChunkPath, DummyEc); + throw std::runtime_error( + fmt::format("Partial block {} is malformed", BlockDescription.BlockHash)); + } + + if (!BlockChunkPath.empty()) + { + RemoveFileWithRetry(BlockChunkPath); + } + + WritePartsComplete++; + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } + } + }); } - if (!AbortFlag) + } + } + }); + } + + for (uint32_t BlockIndex : FullBlockWorks) + { + if (AbortFlag) + { + break; + } + + if (PrimeCacheOnly && ExistsResult.ExistingBlobs.contains(BlockDescriptions[BlockIndex].BlockHash)) + { + DownloadStats.RequestsCompleteCount++; + continue; + } + + Work.ScheduleWork(NetworkPool, [&, BlockIndex](std::atomic<bool>&) { + if (!AbortFlag) + { + ZEN_TRACE_CPU("UpdateFolder_GetFullBlock"); + + const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; + + FilteredDownloadedBytesPerSecond.Start(); + + IoBuffer BlockBuffer; + const bool ExistsInCache = + Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash); + if (ExistsInCache) + { + BlockBuffer = Storage.BuildCacheStorage->GetBuildBlob(BuildId, BlockDescription.BlockHash); + } + if (!BlockBuffer) + { + BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId, BlockDescription.BlockHash); + if (BlockBuffer && Storage.BuildCacheStorage) { - uint64_t BlockSize = BlockBuffer.GetSize(); - DownloadStats.DownloadedBlockCount++; - DownloadStats.DownloadedBlockByteCount += BlockSize; - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } + Storage.BuildCacheStorage->PutBuildBlob(BuildId, + BlockDescription.BlockHash, + BlockBuffer.GetContentType(), + CompositeBuffer(SharedBuffer(BlockBuffer))); + } + } + if (!BlockBuffer) + { + throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); + } + if (!AbortFlag) + { + uint64_t BlockSize = BlockBuffer.GetSize(); + DownloadStats.DownloadedBlockCount++; + DownloadStats.DownloadedBlockByteCount += BlockSize; + DownloadStats.RequestsCompleteCount++; + if (DownloadStats.RequestsCompleteCount == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + if (!PrimeCacheOnly) + { std::filesystem::path BlockChunkPath; // Check if the dowloaded block is file based and we can move it directly without rewriting it @@ -6842,17 +6966,14 @@ namespace { (FileRef.FileChunkSize == BlockSize)) { ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock"); - std::error_code Ec; std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); if (!Ec) { BlockBuffer.SetDeleteOnClose(false); - BlockBuffer = {}; - BlockChunkPath = ZenTempBlockFolderPath(ZenFolderPath) / fmt::format("{}_{:x}_{:x}", - BlockDescription.BlockHash, - BlockRange.RangeStart, - BlockRange.RangeLength); + BlockBuffer = {}; + BlockChunkPath = + ZenTempBlockFolderPath(ZenFolderPath) / BlockDescription.BlockHash.ToHexString(); RenameFile(TempBlobPath, BlockChunkPath, Ec); if (Ec) { @@ -6871,10 +6992,7 @@ namespace { { ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock"); // Could not be moved and rather large, lets store it on disk - BlockChunkPath = ZenTempBlockFolderPath(ZenFolderPath) / fmt::format("{}_{:x}_{:x}", - BlockDescription.BlockHash, - BlockRange.RangeStart, - BlockRange.RangeLength); + BlockChunkPath = ZenTempBlockFolderPath(ZenFolderPath) / BlockDescription.BlockHash.ToHexString(); TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); BlockBuffer = {}; } @@ -6883,50 +7001,60 @@ namespace { { Work.ScheduleWork( WritePool, - [&, BlockIndex, BlockRange, BlockChunkPath, BlockPartialBuffer = std::move(BlockBuffer)]( - std::atomic<bool>&) mutable { + [&Work, + &WritePool, + &RemoteContent, + &RemoteLookup, + CacheFolderPath, + &RemoteChunkIndexNeedsCopyFromSourceFlags, + &SequenceIndexChunksLeftToWriteCounters, + BlockIndex, + &BlockDescriptions, + &WriteChunkStats, + &DiskStats, + &WritePartsComplete, + TotalPartWriteCount, + &FilteredWrittenBytesPerSecond, + BlockChunkPath, + BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable { if (!AbortFlag) { - ZEN_TRACE_CPU("UpdateFolder_WritePartialBlock"); + ZEN_TRACE_CPU("UpdateFolder_WriteFullBlock"); const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; if (BlockChunkPath.empty()) { - ZEN_ASSERT(BlockPartialBuffer); + ZEN_ASSERT(BlockBuffer); } else { - ZEN_ASSERT(!BlockPartialBuffer); - BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); - if (!BlockPartialBuffer) + ZEN_ASSERT(!BlockBuffer); + BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); + if (!BlockBuffer) { - throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}", + throw std::runtime_error(fmt::format("Could not open dowloaded block {} from {}", BlockDescription.BlockHash, BlockChunkPath)); } } FilteredWrittenBytesPerSecond.Start(); - - if (!WritePartialBlockToDisk( - CacheFolderPath, - RemoteContent, - BlockDescription, - SequenceIndexChunksLeftToWriteCounters, - Work, - WritePool, - CompositeBuffer(std::move(BlockPartialBuffer)), - BlockRange.ChunkBlockIndexStart, - BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1, - RemoteLookup, - RemoteChunkIndexNeedsCopyFromSourceFlags, - DiskStats)) + if (!WriteBlockToDisk(CacheFolderPath, + RemoteContent, + BlockDescription, + SequenceIndexChunksLeftToWriteCounters, + Work, + WritePool, + CompositeBuffer(std::move(BlockBuffer)), + RemoteLookup, + RemoteChunkIndexNeedsCopyFromSourceFlags, + DiskStats)) { std::error_code DummyEc; RemoveFile(BlockChunkPath, DummyEc); throw std::runtime_error( - fmt::format("Partial block {} is malformed", BlockDescription.BlockHash)); + fmt::format("Block {} is malformed", BlockDescription.BlockHash)); } if (!BlockChunkPath.empty()) @@ -6935,200 +7063,18 @@ namespace { } WritePartsComplete++; + if (WritePartsComplete == TotalPartWriteCount) { FilteredWrittenBytesPerSecond.Stop(); } } - }, - Work.DefaultErrorFunction()); - } - } - } - }, - Work.DefaultErrorFunction()); - } - - for (uint32_t BlockIndex : FullBlockWorks) - { - if (AbortFlag) - { - break; - } - - if (PrimeCacheOnly && ExistsResult.ExistingBlobs.contains(BlockDescriptions[BlockIndex].BlockHash)) - { - DownloadStats.RequestsCompleteCount++; - continue; - } - - Work.ScheduleWork( - NetworkPool, - [&, BlockIndex](std::atomic<bool>&) { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_GetFullBlock"); - - const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; - - FilteredDownloadedBytesPerSecond.Start(); - - IoBuffer BlockBuffer; - const bool ExistsInCache = - Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash); - if (ExistsInCache) - { - BlockBuffer = Storage.BuildCacheStorage->GetBuildBlob(BuildId, BlockDescription.BlockHash); - } - if (!BlockBuffer) - { - BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId, BlockDescription.BlockHash); - if (BlockBuffer && Storage.BuildCacheStorage) - { - Storage.BuildCacheStorage->PutBuildBlob(BuildId, - BlockDescription.BlockHash, - BlockBuffer.GetContentType(), - CompositeBuffer(SharedBuffer(BlockBuffer))); - } - } - if (!BlockBuffer) - { - throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); - } - if (!AbortFlag) - { - uint64_t BlockSize = BlockBuffer.GetSize(); - DownloadStats.DownloadedBlockCount++; - DownloadStats.DownloadedBlockByteCount += BlockSize; - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - - if (!PrimeCacheOnly) - { - std::filesystem::path BlockChunkPath; - - // Check if the dowloaded block is file based and we can move it directly without rewriting it - { - IoBufferFileReference FileRef; - if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && - (FileRef.FileChunkSize == BlockSize)) - { - ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock"); - std::error_code Ec; - std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); - if (!Ec) - { - BlockBuffer.SetDeleteOnClose(false); - BlockBuffer = {}; - BlockChunkPath = - ZenTempBlockFolderPath(ZenFolderPath) / BlockDescription.BlockHash.ToHexString(); - RenameFile(TempBlobPath, BlockChunkPath, Ec); - if (Ec) - { - BlockChunkPath = std::filesystem::path{}; - - // Re-open the temp file again - BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); - BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); - BlockBuffer.SetDeleteOnClose(true); - } - } - } - } - - if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u)) - { - ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock"); - // Could not be moved and rather large, lets store it on disk - BlockChunkPath = ZenTempBlockFolderPath(ZenFolderPath) / BlockDescription.BlockHash.ToHexString(); - TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); - BlockBuffer = {}; - } - - if (!AbortFlag) - { - Work.ScheduleWork( - WritePool, - [&Work, - &WritePool, - &RemoteContent, - &RemoteLookup, - CacheFolderPath, - &RemoteChunkIndexNeedsCopyFromSourceFlags, - &SequenceIndexChunksLeftToWriteCounters, - BlockIndex, - &BlockDescriptions, - &WriteChunkStats, - &DiskStats, - &WritePartsComplete, - TotalPartWriteCount, - &FilteredWrittenBytesPerSecond, - BlockChunkPath, - BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_WriteFullBlock"); - - const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; - - if (BlockChunkPath.empty()) - { - ZEN_ASSERT(BlockBuffer); - } - else - { - ZEN_ASSERT(!BlockBuffer); - BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); - if (!BlockBuffer) - { - throw std::runtime_error( - fmt::format("Could not open dowloaded block {} from {}", - BlockDescription.BlockHash, - BlockChunkPath)); - } - } - - FilteredWrittenBytesPerSecond.Start(); - if (!WriteBlockToDisk(CacheFolderPath, - RemoteContent, - BlockDescription, - SequenceIndexChunksLeftToWriteCounters, - Work, - WritePool, - CompositeBuffer(std::move(BlockBuffer)), - RemoteLookup, - RemoteChunkIndexNeedsCopyFromSourceFlags, - DiskStats)) - { - std::error_code DummyEc; - RemoveFile(BlockChunkPath, DummyEc); - throw std::runtime_error( - fmt::format("Block {} is malformed", BlockDescription.BlockHash)); - } - - if (!BlockChunkPath.empty()) - { - RemoveFileWithRetry(BlockChunkPath); - } - - WritePartsComplete++; - - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); - } - } - }, - Work.DefaultErrorFunction()); - } + }); } } } - }, - Work.DefaultErrorFunction()); + } + }); } { @@ -7321,8 +7267,8 @@ namespace { WorkerThreadPool& WritePool = GetIOWorkerPool(); - ProgressBar CacheLocalProgressBar(ProgressMode, "Cache Local Data"); - ParallellWork Work(AbortFlag); + ProgressBar CacheLocalProgressBar(ProgressMode, "Cache Local Data"); + ParallelWork Work(AbortFlag); for (uint32_t LocalPathIndex : FilesToCache) { @@ -7330,23 +7276,20 @@ namespace { { break; } - Work.ScheduleWork( - WritePool, - [&, LocalPathIndex](std::atomic<bool>&) { - ZEN_TRACE_CPU("UpdateFolder_AsyncCopyToCache"); - if (!AbortFlag) - { - const IoHash& RawHash = LocalContent.RawHashes[LocalPathIndex]; - const std::filesystem::path& LocalPath = LocalContent.Paths[LocalPathIndex]; - const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash); - ZEN_ASSERT_SLOW(!IsFileWithRetry(CacheFilePath)); - const std::filesystem::path LocalFilePath = (Path / LocalPath).make_preferred(); - RenameFileWithRetry(LocalFilePath, CacheFilePath); - CachedCount++; - CachedByteCount += LocalContent.RawSizes[LocalPathIndex]; - } - }, - Work.DefaultErrorFunction()); + Work.ScheduleWork(WritePool, [&, LocalPathIndex](std::atomic<bool>&) { + ZEN_TRACE_CPU("UpdateFolder_AsyncCopyToCache"); + if (!AbortFlag) + { + const IoHash& RawHash = LocalContent.RawHashes[LocalPathIndex]; + const std::filesystem::path& LocalPath = LocalContent.Paths[LocalPathIndex]; + const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash); + ZEN_ASSERT_SLOW(!IsFileWithRetry(CacheFilePath)); + const std::filesystem::path LocalFilePath = (Path / LocalPath).make_preferred(); + RenameFileWithRetry(LocalFilePath, CacheFilePath); + CachedCount++; + CachedByteCount += LocalContent.RawSizes[LocalPathIndex]; + } + }); } { @@ -7409,8 +7352,8 @@ namespace { WorkerThreadPool& WritePool = GetIOWorkerPool(); - ProgressBar RebuildProgressBar(ProgressMode, "Rebuild State"); - ParallellWork Work(AbortFlag); + ProgressBar RebuildProgressBar(ProgressMode, "Rebuild State"); + ParallelWork Work(AbortFlag); OutLocalFolderState.Paths.resize(RemoteContent.Paths.size()); OutLocalFolderState.RawSizes.resize(RemoteContent.Paths.size()); @@ -7425,18 +7368,15 @@ namespace { { break; } - Work.ScheduleWork( - WritePool, - [&, LocalPathIndex](std::atomic<bool>&) { - if (!AbortFlag) - { - const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred(); - SetFileReadOnlyWithRetry(LocalFilePath, false); - RemoveFileWithRetry(LocalFilePath); - DeletedCount++; - } - }, - Work.DefaultErrorFunction()); + Work.ScheduleWork(WritePool, [&, LocalPathIndex](std::atomic<bool>&) { + if (!AbortFlag) + { + const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred(); + SetFileReadOnlyWithRetry(LocalFilePath, false); + RemoveFileWithRetry(LocalFilePath); + DeletedCount++; + } + }); } std::atomic<uint64_t> TargetsComplete = 0; @@ -7479,166 +7419,158 @@ namespace { TargetCount++; } - Work.ScheduleWork( - WritePool, - [&, BaseTargetOffset = TargetOffset, TargetCount](std::atomic<bool>&) { - if (!AbortFlag) - { - ZEN_TRACE_CPU("FinalizeTree_Work"); + Work.ScheduleWork(WritePool, [&, BaseTargetOffset = TargetOffset, TargetCount](std::atomic<bool>&) { + if (!AbortFlag) + { + ZEN_TRACE_CPU("FinalizeTree_Work"); - size_t TargetOffset = BaseTargetOffset; - const IoHash& RawHash = Targets[TargetOffset].RawHash; + size_t TargetOffset = BaseTargetOffset; + const IoHash& RawHash = Targets[TargetOffset].RawHash; - if (RawHash == IoHash::Zero) + if (RawHash == IoHash::Zero) + { + ZEN_TRACE_CPU("ZeroSize"); + while (TargetOffset < (BaseTargetOffset + TargetCount)) { - ZEN_TRACE_CPU("ZeroSize"); - while (TargetOffset < (BaseTargetOffset + TargetCount)) + const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex; + ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash); + const std::filesystem::path& TargetPath = RemoteContent.Paths[RemotePathIndex]; + std::filesystem::path TargetFilePath = (Path / TargetPath).make_preferred(); + if (!RemotePathIndexToLocalPathIndex[RemotePathIndex]) { - const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex; - ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash); - const std::filesystem::path& TargetPath = RemoteContent.Paths[RemotePathIndex]; - std::filesystem::path TargetFilePath = (Path / TargetPath).make_preferred(); - if (!RemotePathIndexToLocalPathIndex[RemotePathIndex]) + if (IsFileWithRetry(TargetFilePath)) { - if (IsFileWithRetry(TargetFilePath)) - { - SetFileReadOnlyWithRetry(TargetFilePath, false); - } - else - { - CreateDirectories(TargetFilePath.parent_path()); - } - BasicFile OutputFile; - OutputFile.Open(TargetFilePath, BasicFile::Mode::kTruncate); + SetFileReadOnlyWithRetry(TargetFilePath, false); + } + else + { + CreateDirectories(TargetFilePath.parent_path()); } - OutLocalFolderState.Paths[RemotePathIndex] = TargetPath; - OutLocalFolderState.RawSizes[RemotePathIndex] = RemoteContent.RawSizes[RemotePathIndex]; - - OutLocalFolderState.Attributes[RemotePathIndex] = - RemoteContent.Attributes.empty() - ? GetNativeFileAttributes(TargetFilePath) - : SetNativeFileAttributes(TargetFilePath, - RemoteContent.Platform, - RemoteContent.Attributes[RemotePathIndex]); - OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath); - - TargetOffset++; - TargetsComplete++; + BasicFile OutputFile; + OutputFile.Open(TargetFilePath, BasicFile::Mode::kTruncate); } + OutLocalFolderState.Paths[RemotePathIndex] = TargetPath; + OutLocalFolderState.RawSizes[RemotePathIndex] = RemoteContent.RawSizes[RemotePathIndex]; + + OutLocalFolderState.Attributes[RemotePathIndex] = + RemoteContent.Attributes.empty() ? GetNativeFileAttributes(TargetFilePath) + : SetNativeFileAttributes(TargetFilePath, + RemoteContent.Platform, + RemoteContent.Attributes[RemotePathIndex]); + OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath); + + TargetOffset++; + TargetsComplete++; + } + } + else + { + ZEN_TRACE_CPU("Files"); + ZEN_ASSERT(RemoteLookup.RawHashToSequenceIndex.contains(RawHash)); + const uint32_t FirstRemotePathIndex = Targets[TargetOffset].RemotePathIndex; + const std::filesystem::path& FirstTargetPath = RemoteContent.Paths[FirstRemotePathIndex]; + std::filesystem::path FirstTargetFilePath = (Path / FirstTargetPath).make_preferred(); + + if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(FirstRemotePathIndex); + InPlaceIt != RemotePathIndexToLocalPathIndex.end()) + { + ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath)); } else { - ZEN_TRACE_CPU("Files"); - ZEN_ASSERT(RemoteLookup.RawHashToSequenceIndex.contains(RawHash)); - const uint32_t FirstRemotePathIndex = Targets[TargetOffset].RemotePathIndex; - const std::filesystem::path& FirstTargetPath = RemoteContent.Paths[FirstRemotePathIndex]; - std::filesystem::path FirstTargetFilePath = (Path / FirstTargetPath).make_preferred(); - - if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(FirstRemotePathIndex); - InPlaceIt != RemotePathIndexToLocalPathIndex.end()) + if (IsFileWithRetry(FirstTargetFilePath)) { - ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath)); + SetFileReadOnlyWithRetry(FirstTargetFilePath, false); } else { - if (IsFileWithRetry(FirstTargetFilePath)) - { - SetFileReadOnlyWithRetry(FirstTargetFilePath, false); - } - else - { - CreateDirectories(FirstTargetFilePath.parent_path()); - } + CreateDirectories(FirstTargetFilePath.parent_path()); + } - if (auto InplaceIt = SequenceHashToLocalPathIndex.find(RawHash); - InplaceIt != SequenceHashToLocalPathIndex.end()) - { - ZEN_TRACE_CPU("Copy"); - const uint32_t LocalPathIndex = InplaceIt->second; - const std::filesystem::path& SourcePath = LocalContent.Paths[LocalPathIndex]; - std::filesystem::path SourceFilePath = (Path / SourcePath).make_preferred(); - ZEN_ASSERT_SLOW(IsFileWithRetry(SourceFilePath)); - - ZEN_DEBUG("Copying from '{}' -> '{}'", SourceFilePath, FirstTargetFilePath); - CopyFile(SourceFilePath, FirstTargetFilePath, {.EnableClone = false}); - RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++; - } - else - { - ZEN_TRACE_CPU("Rename"); - const std::filesystem::path CacheFilePath = - GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash); - ZEN_ASSERT_SLOW(IsFileWithRetry(CacheFilePath)); + if (auto InplaceIt = SequenceHashToLocalPathIndex.find(RawHash); + InplaceIt != SequenceHashToLocalPathIndex.end()) + { + ZEN_TRACE_CPU("Copy"); + const uint32_t LocalPathIndex = InplaceIt->second; + const std::filesystem::path& SourcePath = LocalContent.Paths[LocalPathIndex]; + std::filesystem::path SourceFilePath = (Path / SourcePath).make_preferred(); + ZEN_ASSERT_SLOW(IsFileWithRetry(SourceFilePath)); + + ZEN_DEBUG("Copying from '{}' -> '{}'", SourceFilePath, FirstTargetFilePath); + CopyFile(SourceFilePath, FirstTargetFilePath, {.EnableClone = false}); + RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++; + } + else + { + ZEN_TRACE_CPU("Rename"); + const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash); + ZEN_ASSERT_SLOW(IsFileWithRetry(CacheFilePath)); - RenameFileWithRetry(CacheFilePath, FirstTargetFilePath); + RenameFileWithRetry(CacheFilePath, FirstTargetFilePath); - RebuildFolderStateStats.FinalizeTreeFilesMovedCount++; - } + RebuildFolderStateStats.FinalizeTreeFilesMovedCount++; } + } - OutLocalFolderState.Paths[FirstRemotePathIndex] = FirstTargetPath; - OutLocalFolderState.RawSizes[FirstRemotePathIndex] = RemoteContent.RawSizes[FirstRemotePathIndex]; + OutLocalFolderState.Paths[FirstRemotePathIndex] = FirstTargetPath; + OutLocalFolderState.RawSizes[FirstRemotePathIndex] = RemoteContent.RawSizes[FirstRemotePathIndex]; - OutLocalFolderState.Attributes[FirstRemotePathIndex] = - RemoteContent.Attributes.empty() - ? GetNativeFileAttributes(FirstTargetFilePath) - : SetNativeFileAttributes(FirstTargetFilePath, - RemoteContent.Platform, - RemoteContent.Attributes[FirstRemotePathIndex]); - OutLocalFolderState.ModificationTicks[FirstRemotePathIndex] = - GetModificationTickFromPath(FirstTargetFilePath); + OutLocalFolderState.Attributes[FirstRemotePathIndex] = + RemoteContent.Attributes.empty() ? GetNativeFileAttributes(FirstTargetFilePath) + : SetNativeFileAttributes(FirstTargetFilePath, + RemoteContent.Platform, + RemoteContent.Attributes[FirstRemotePathIndex]); + OutLocalFolderState.ModificationTicks[FirstRemotePathIndex] = GetModificationTickFromPath(FirstTargetFilePath); - TargetOffset++; - TargetsComplete++; + TargetOffset++; + TargetsComplete++; - while (TargetOffset < (BaseTargetOffset + TargetCount)) - { - const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex; - ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash); - const std::filesystem::path& TargetPath = RemoteContent.Paths[RemotePathIndex]; - std::filesystem::path TargetFilePath = (Path / TargetPath).make_preferred(); + while (TargetOffset < (BaseTargetOffset + TargetCount)) + { + const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex; + ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash); + const std::filesystem::path& TargetPath = RemoteContent.Paths[RemotePathIndex]; + std::filesystem::path TargetFilePath = (Path / TargetPath).make_preferred(); - if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(RemotePathIndex); - InPlaceIt != RemotePathIndexToLocalPathIndex.end()) + if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(RemotePathIndex); + InPlaceIt != RemotePathIndexToLocalPathIndex.end()) + { + ZEN_ASSERT_SLOW(IsFileWithRetry(TargetFilePath)); + } + else + { + ZEN_TRACE_CPU("Copy"); + if (IsFileWithRetry(TargetFilePath)) { - ZEN_ASSERT_SLOW(IsFileWithRetry(TargetFilePath)); + SetFileReadOnlyWithRetry(TargetFilePath, false); } else { - ZEN_TRACE_CPU("Copy"); - if (IsFileWithRetry(TargetFilePath)) - { - SetFileReadOnlyWithRetry(TargetFilePath, false); - } - else - { - CreateDirectories(TargetFilePath.parent_path()); - } - - ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath)); - ZEN_DEBUG("Copying from '{}' -> '{}'", FirstTargetFilePath, TargetFilePath); - CopyFile(FirstTargetFilePath, TargetFilePath, {.EnableClone = false}); - RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++; + CreateDirectories(TargetFilePath.parent_path()); } - OutLocalFolderState.Paths[RemotePathIndex] = TargetPath; - OutLocalFolderState.RawSizes[RemotePathIndex] = RemoteContent.RawSizes[RemotePathIndex]; + ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath)); + ZEN_DEBUG("Copying from '{}' -> '{}'", FirstTargetFilePath, TargetFilePath); + CopyFile(FirstTargetFilePath, TargetFilePath, {.EnableClone = false}); + RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++; + } - OutLocalFolderState.Attributes[RemotePathIndex] = - RemoteContent.Attributes.empty() - ? GetNativeFileAttributes(TargetFilePath) - : SetNativeFileAttributes(TargetFilePath, - RemoteContent.Platform, - RemoteContent.Attributes[RemotePathIndex]); - OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath); + OutLocalFolderState.Paths[RemotePathIndex] = TargetPath; + OutLocalFolderState.RawSizes[RemotePathIndex] = RemoteContent.RawSizes[RemotePathIndex]; - TargetOffset++; - TargetsComplete++; - } + OutLocalFolderState.Attributes[RemotePathIndex] = + RemoteContent.Attributes.empty() ? GetNativeFileAttributes(TargetFilePath) + : SetNativeFileAttributes(TargetFilePath, + RemoteContent.Platform, + RemoteContent.Attributes[RemotePathIndex]); + OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath); + + TargetOffset++; + TargetsComplete++; } } - }, - Work.DefaultErrorFunction()); + } + }); TargetOffset += TargetCount; } @@ -10470,7 +10402,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) return true; }; - ParallellWork Work(AbortFlag); + ParallelWork Work(AbortFlag); uint32_t Randomizer = 0; auto FileSizeIt = DownloadContent.FileSizes.begin(); @@ -10512,8 +10444,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) SetFileReadOnly(FilePath, true); } } - }, - Work.DefaultErrorFunction()); + }); } } break; @@ -10677,14 +10608,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) return 0; } } - catch (const ParallellWorkException& Ex) - { - for (const std::string& Error : Ex.m_Errors) - { - ZEN_ERROR("{}", Error); - } - return 3; - } catch (const std::exception& Ex) { ZEN_ERROR("{}", Ex.what()); diff --git a/src/zen/cmds/wipe_cmd.cpp b/src/zen/cmds/wipe_cmd.cpp index 269f95417..9dfdca0a1 100644 --- a/src/zen/cmds/wipe_cmd.cpp +++ b/src/zen/cmds/wipe_cmd.cpp @@ -8,7 +8,7 @@ #include <zencore/string.h> #include <zencore/timer.h> #include <zencore/trace.h> -#include <zenutil/parallellwork.h> +#include <zenutil/parallelwork.h> #include <zenutil/workerpools.h> #include <signal.h> @@ -248,7 +248,7 @@ namespace { return Added; }; - ParallellWork Work(AbortFlag); + ParallelWork Work(AbortFlag); struct AsyncVisitor : public GetDirectoryContentVisitor { diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp index c1df6d53e..f71397922 100644 --- a/src/zencore/filesystem.cpp +++ b/src/zencore/filesystem.cpp @@ -2322,12 +2322,17 @@ GetDirectoryContent(const std::filesystem::path& RootDir, RelativeRoot = RelativeRoot / DirectoryName]() { ZEN_ASSERT(Visitor); auto _ = MakeGuard([&]() { PendingWorkCount->CountDown(); }); + try { MultithreadedVisitor SubVisitor(*WorkerPool, *PendingWorkCount, RelativeRoot, Flags, Visitor); FileSystemTraversal Traversal; Traversal.TraverseFileSystem(Path, SubVisitor); Visitor->AsyncVisitDirectory(SubVisitor.RelativeRoot, std::move(SubVisitor.Content)); } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed scheduling work to scan subfolder '{}'. Reason: '{}'", Path / RelativeRoot, Ex.what()); + } }); } catch (const std::exception Ex) diff --git a/src/zenutil/buildstoragecache.cpp b/src/zenutil/buildstoragecache.cpp index f273ac699..88238effd 100644 --- a/src/zenutil/buildstoragecache.cpp +++ b/src/zenutil/buildstoragecache.cpp @@ -338,7 +338,7 @@ public: return {}; } - virtual void Flush(int32_t UpdateInteralMS, std::function<bool(intptr_t Remaining)>&& UpdateCallback) override + virtual void Flush(int32_t UpdateIntervalMS, std::function<bool(intptr_t Remaining)>&& UpdateCallback) override { if (IsFlushed) { @@ -358,7 +358,7 @@ public: intptr_t Remaining = m_PendingBackgroundWorkCount.Remaining(); if (UpdateCallback(Remaining)) { - if (m_PendingBackgroundWorkCount.Wait(UpdateInteralMS)) + if (m_PendingBackgroundWorkCount.Wait(UpdateIntervalMS)) { UpdateCallback(0); return; diff --git a/src/zenutil/chunkedcontent.cpp b/src/zenutil/chunkedcontent.cpp index 17b348f8d..ae129324e 100644 --- a/src/zenutil/chunkedcontent.cpp +++ b/src/zenutil/chunkedcontent.cpp @@ -11,7 +11,7 @@ #include <zenutil/chunkedfile.h> #include <zenutil/chunkingcontroller.h> -#include <zenutil/parallellwork.h> +#include <zenutil/parallelwork.h> #include <zenutil/workerpools.h> ZEN_THIRD_PARTY_INCLUDES_START @@ -378,7 +378,7 @@ GetFolderContent(GetFolderContentStatistics& Stats, std::function<bool(const std::string_view& RelativePath)>&& AcceptDirectory, std::function<bool(std::string_view RelativePath, uint64_t Size, uint32_t Attributes)>&& AcceptFile, WorkerThreadPool& WorkerPool, - int32_t UpdateInteralMS, + int32_t UpdateIntervalMS, std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback, std::atomic<bool>& AbortFlag) { @@ -467,7 +467,7 @@ GetFolderContent(GetFolderContentStatistics& Stats, WorkerPool, PendingWork); PendingWork.CountDown(); - while (!PendingWork.Wait(UpdateInteralMS)) + while (!PendingWork.Wait(UpdateIntervalMS)) { UpdateCallback(AbortFlag.load(), PendingWork.Remaining()); } @@ -731,7 +731,7 @@ ChunkFolderContent(ChunkingStatistics& Stats, const std::filesystem::path& RootPath, const FolderContent& Content, const ChunkingController& InChunkingController, - int32_t UpdateInteralMS, + int32_t UpdateIntervalMS, std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback, std::atomic<bool>& AbortFlag) { @@ -772,7 +772,7 @@ ChunkFolderContent(ChunkingStatistics& Stats, RwLock Lock; - ParallellWork Work(AbortFlag); + ParallelWork Work(AbortFlag); for (uint32_t PathIndex : Order) { @@ -780,28 +780,26 @@ ChunkFolderContent(ChunkingStatistics& Stats, { break; } - Work.ScheduleWork( - WorkerPool, // GetSyncWorkerPool() - [&, PathIndex](std::atomic<bool>& AbortFlag) { - if (!AbortFlag) - { - IoHash RawHash = HashOneFile(Stats, - InChunkingController, - Result, - ChunkHashToChunkIndex, - RawHashToSequenceRawHashIndex, - Lock, - RootPath, - PathIndex, - AbortFlag); - Lock.WithExclusiveLock([&]() { Result.RawHashes[PathIndex] = RawHash; }); - Stats.FilesProcessed++; - } - }, - Work.DefaultErrorFunction()); - } - - Work.Wait(UpdateInteralMS, [&](bool IsAborted, std::ptrdiff_t PendingWork) { + Work.ScheduleWork(WorkerPool, // GetSyncWorkerPool() + [&, PathIndex](std::atomic<bool>& AbortFlag) { + if (!AbortFlag) + { + IoHash RawHash = HashOneFile(Stats, + InChunkingController, + Result, + ChunkHashToChunkIndex, + RawHashToSequenceRawHashIndex, + Lock, + RootPath, + PathIndex, + AbortFlag); + Lock.WithExclusiveLock([&]() { Result.RawHashes[PathIndex] = RawHash; }); + Stats.FilesProcessed++; + } + }); + } + + Work.Wait(UpdateIntervalMS, [&](bool IsAborted, std::ptrdiff_t PendingWork) { ZEN_UNUSED(IsAborted); ZEN_UNUSED(PendingWork); UpdateCallback(Work.IsAborted(), Work.PendingWork().Remaining()); diff --git a/src/zenutil/include/zenutil/buildstoragecache.h b/src/zenutil/include/zenutil/buildstoragecache.h index cab35328d..e1fb73fd4 100644 --- a/src/zenutil/include/zenutil/buildstoragecache.h +++ b/src/zenutil/include/zenutil/buildstoragecache.h @@ -44,7 +44,7 @@ public: virtual std::vector<BlobExistsResult> BlobsExists(const Oid& BuildId, std::span<const IoHash> BlobHashes) = 0; virtual void Flush( - int32_t UpdateInteralMS, + int32_t UpdateIntervalMS, std::function<bool(intptr_t Remaining)>&& UpdateCallback = [](intptr_t) { return true; }) = 0; }; diff --git a/src/zenutil/include/zenutil/chunkedcontent.h b/src/zenutil/include/zenutil/chunkedcontent.h index 57b55cb8e..d33869be2 100644 --- a/src/zenutil/include/zenutil/chunkedcontent.h +++ b/src/zenutil/include/zenutil/chunkedcontent.h @@ -67,7 +67,7 @@ FolderContent GetFolderContent(GetFolderContentStatistics& Stats, std::function<bool(const std::string_view& RelativePath)>&& AcceptDirectory, std::function<bool(std::string_view RelativePath, uint64_t Size, uint32_t Attributes)>&& AcceptFile, WorkerThreadPool& WorkerPool, - int32_t UpdateInteralMS, + int32_t UpdateIntervalMS, std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback, std::atomic<bool>& AbortFlag); @@ -116,7 +116,7 @@ ChunkedFolderContent ChunkFolderContent(ChunkingStatistics& Stats, const std::filesystem::path& RootPath, const FolderContent& Content, const ChunkingController& InChunkingController, - int32_t UpdateInteralMS, + int32_t UpdateIntervalMS, std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback, std::atomic<bool>& AbortFlag); diff --git a/src/zenutil/include/zenutil/parallellwork.h b/src/zenutil/include/zenutil/parallellwork.h deleted file mode 100644 index 8ea77c65d..000000000 --- a/src/zenutil/include/zenutil/parallellwork.h +++ /dev/null @@ -1,119 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/except.h> -#include <zencore/fmtutils.h> -#include <zencore/thread.h> -#include <zencore/workthreadpool.h> - -#include <atomic> - -class ParallellWorkException : public std::runtime_error -{ -public: - explicit ParallellWorkException(std::vector<std::string>&& Errors) : std::runtime_error(Errors.front()), m_Errors(std::move(Errors)) {} - - const std::vector<std::string> m_Errors; -}; - -namespace zen { - -class ParallellWork -{ -public: - ParallellWork(std::atomic<bool>& AbortFlag) : m_AbortFlag(AbortFlag), m_PendingWork(1) {} - - ~ParallellWork() - { - // Make sure to call Wait before destroying - ZEN_ASSERT(m_PendingWork.Remaining() == 0); - } - - std::function<void(const std::exception& Ex, std::atomic<bool>& AbortFlag)> DefaultErrorFunction() - { - return [&](const std::exception& Ex, std::atomic<bool>& AbortFlag) { - m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex.what()); }); - AbortFlag = true; - }; - } - - void ScheduleWork(WorkerThreadPool& WorkerPool, - std::function<void(std::atomic<bool>& AbortFlag)>&& Work, - std::function<void(const std::exception& Ex, std::atomic<bool>& AbortFlag)>&& OnError) - { - m_PendingWork.AddCount(1); - try - { - WorkerPool.ScheduleWork([this, Work = std::move(Work), OnError = std::move(OnError)] { - try - { - Work(m_AbortFlag); - } - catch (const AssertException& AssertEx) - { - OnError( - std::runtime_error(fmt::format("Caught assert exception while handling request: {}", AssertEx.FullDescription())), - m_AbortFlag); - } - catch (const std::system_error& SystemError) - { - if (IsOOM(SystemError.code())) - { - OnError(std::runtime_error(fmt::format("Out of memory. Reason: {}", SystemError.what())), m_AbortFlag); - } - else if (IsOOD(SystemError.code())) - { - OnError(std::runtime_error(fmt::format("Out of disk. Reason: {}", SystemError.what())), m_AbortFlag); - } - else - { - OnError(std::runtime_error(fmt::format("System error. Reason: {}", SystemError.what())), m_AbortFlag); - } - } - catch (const std::exception& Ex) - { - OnError(Ex, m_AbortFlag); - } - m_PendingWork.CountDown(); - }); - } - catch (const std::exception&) - { - m_PendingWork.CountDown(); - throw; - } - } - - void Abort() { m_AbortFlag = true; } - - bool IsAborted() const { return m_AbortFlag.load(); } - - void Wait(int32_t UpdateInteralMS, std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback) - { - ZEN_ASSERT(m_PendingWork.Remaining() > 0); - m_PendingWork.CountDown(); - while (!m_PendingWork.Wait(UpdateInteralMS)) - { - UpdateCallback(m_AbortFlag.load(), m_PendingWork.Remaining()); - } - if (m_Errors.size() == 1) - { - throw std::runtime_error(m_Errors.front()); - } - else if (m_Errors.size() > 1) - { - throw ParallellWorkException(std::move(m_Errors)); - } - } - Latch& PendingWork() { return m_PendingWork; } - -private: - std::atomic<bool>& m_AbortFlag; - Latch m_PendingWork; - - RwLock m_ErrorLock; - std::vector<std::string> m_Errors; -}; - -} // namespace zen diff --git a/src/zenutil/include/zenutil/parallelwork.h b/src/zenutil/include/zenutil/parallelwork.h new file mode 100644 index 000000000..08e730b28 --- /dev/null +++ b/src/zenutil/include/zenutil/parallelwork.h @@ -0,0 +1,71 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/thread.h> +#include <zencore/workthreadpool.h> + +#include <atomic> + +namespace zen { + +class ParallelWork +{ +public: + ParallelWork(std::atomic<bool>& AbortFlag); + + ~ParallelWork(); + + typedef std::function<void(std::atomic<bool>& AbortFlag)> WorkCallback; + typedef std::function<void(std::exception_ptr Ex, std::atomic<bool>& AbortFlag)> ExceptionCallback; + typedef std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)> UpdateCallback; + + void ScheduleWork(WorkerThreadPool& WorkerPool, WorkCallback&& Work, ExceptionCallback&& OnError = {}) + { + m_PendingWork.AddCount(1); + try + { + WorkerPool.ScheduleWork([this, Work = std::move(Work), OnError = OnError ? std::move(OnError) : DefaultErrorFunction()] { + try + { + Work(m_AbortFlag); + } + catch (...) + { + OnError(std::current_exception(), m_AbortFlag); + } + m_PendingWork.CountDown(); + }); + } + catch (const std::exception&) + { + m_PendingWork.CountDown(); + throw; + } + } + + void Abort() { m_AbortFlag = true; } + + bool IsAborted() const { return m_AbortFlag.load(); } + + void Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback); + + void Wait(); + + Latch& PendingWork() { return m_PendingWork; } + +private: + ExceptionCallback DefaultErrorFunction(); + void RethrowErrors(); + + std::atomic<bool>& m_AbortFlag; + bool m_DispatchComplete = false; + Latch m_PendingWork; + + RwLock m_ErrorLock; + std::vector<std::exception_ptr> m_Errors; +}; + +void parallellwork_forcelink(); + +} // namespace zen diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp new file mode 100644 index 000000000..516d70e28 --- /dev/null +++ b/src/zenutil/parallelwork.cpp @@ -0,0 +1,192 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenutil/parallelwork.h> + +#include <zencore/except.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> + +#include <typeinfo> + +#if ZEN_WITH_TESTS +# include <zencore/testing.h> +#endif // ZEN_WITH_TESTS + +namespace zen { + +ParallelWork::ParallelWork(std::atomic<bool>& AbortFlag) : m_AbortFlag(AbortFlag), m_PendingWork(1) +{ +} + +ParallelWork::~ParallelWork() +{ + try + { + if (!m_DispatchComplete) + { + ZEN_ASSERT(m_PendingWork.Remaining() > 0); + ZEN_WARN( + "ParallelWork disposed without explicit wait for completion, likely caused by an exception, waiting for dispatched threads " + "to complete"); + m_PendingWork.CountDown(); + } + m_AbortFlag.store(true); + m_PendingWork.Wait(); + ZEN_ASSERT(m_PendingWork.Remaining() == 0); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Exception in ~ParallelWork: {}", Ex.what()); + } +} + +ParallelWork::ExceptionCallback +ParallelWork::DefaultErrorFunction() +{ + return [&](std::exception_ptr Ex, std::atomic<bool>& AbortFlag) { + m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex); }); + AbortFlag = true; + }; +} + +void +ParallelWork::Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback) +{ + ZEN_ASSERT(!m_DispatchComplete); + m_DispatchComplete = true; + + ZEN_ASSERT(m_PendingWork.Remaining() > 0); + m_PendingWork.CountDown(); + + while (!m_PendingWork.Wait(UpdateIntervalMS)) + { + UpdateCallback(m_AbortFlag.load(), m_PendingWork.Remaining()); + } + + RethrowErrors(); +} + +void +ParallelWork::Wait() +{ + ZEN_ASSERT(!m_DispatchComplete); + m_DispatchComplete = true; + + ZEN_ASSERT(m_PendingWork.Remaining() > 0); + m_PendingWork.CountDown(); + m_PendingWork.Wait(); + + RethrowErrors(); +} + +void +ParallelWork::RethrowErrors() +{ + if (!m_Errors.empty()) + { + if (m_Errors.size() > 1) + { + ZEN_INFO("Multiple exceptions throwm during ParallelWork execution, dropping the following exceptions:"); + auto It = m_Errors.begin() + 1; + while (It != m_Errors.end()) + { + try + { + std::rethrow_exception(*It); + } + catch (const std::exception& Ex) + { + ZEN_INFO(" {}", Ex.what()); + } + It++; + } + } + std::exception_ptr Ex = m_Errors.front(); + m_Errors.clear(); + std::rethrow_exception(Ex); + } +} + +#if ZEN_WITH_TESTS + +TEST_CASE("parallellwork.nowork") +{ + std::atomic<bool> AbortFlag; + ParallelWork Work(AbortFlag); + Work.Wait(); +} + +TEST_CASE("parallellwork.basic") +{ + WorkerThreadPool WorkerPool(2); + + std::atomic<bool> AbortFlag; + ParallelWork Work(AbortFlag); + for (uint32_t I = 0; I < 5; I++) + { + Work.ScheduleWork(WorkerPool, [](std::atomic<bool>& AbortFlag) { CHECK(!AbortFlag); }); + } + Work.Wait(); +} + +TEST_CASE("parallellwork.throws_in_work") +{ + WorkerThreadPool WorkerPool(2); + + std::atomic<bool> AbortFlag; + ParallelWork Work(AbortFlag); + for (uint32_t I = 0; I < 10; I++) + { + Work.ScheduleWork(WorkerPool, [I](std::atomic<bool>& AbortFlag) { + ZEN_UNUSED(AbortFlag); + if (I > 3) + { + throw std::runtime_error("We throw in async thread"); + } + else + { + Sleep(10); + } + }); + } + CHECK_THROWS_WITH(Work.Wait(), "We throw in async thread"); +} + +TEST_CASE("parallellwork.throws_in_dispatch") +{ + WorkerThreadPool WorkerPool(2); + std::atomic<uint32_t> ExecutedCount; + try + { + std::atomic<bool> AbortFlag; + ParallelWork Work(AbortFlag); + for (uint32_t I = 0; I < 5; I++) + { + Work.ScheduleWork(WorkerPool, [I, &ExecutedCount](std::atomic<bool>& AbortFlag) { + if (AbortFlag.load()) + { + return; + } + ExecutedCount++; + }); + if (I == 3) + { + throw std::runtime_error("We throw in dispatcher thread"); + } + } + CHECK(false); + } + catch (const std::runtime_error& Ex) + { + CHECK_EQ("We throw in dispatcher thread", std::string(Ex.what())); + CHECK_LE(ExecutedCount.load(), 4); + } +} + +void +parallellwork_forcelink() +{ +} +#endif // ZEN_WITH_TESTS + +} // namespace zen diff --git a/src/zenutil/zenutil.cpp b/src/zenutil/zenutil.cpp index aff9156f4..fe23b00c1 100644 --- a/src/zenutil/zenutil.cpp +++ b/src/zenutil/zenutil.cpp @@ -8,6 +8,7 @@ # include <zenutil/cache/rpcrecording.h> # include <zenutil/chunkedfile.h> # include <zenutil/commandlineoptions.h> +# include <zenutil/parallelwork.h> namespace zen { @@ -19,6 +20,7 @@ zenutil_forcelinktests() cacherequests_forcelink(); chunkedfile_forcelink(); commandlineoptions_forcelink(); + parallellwork_forcelink(); } } // namespace zen |