diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 1914 | ||||
| -rw-r--r-- | src/zen/zen.cpp | 2 | ||||
| -rw-r--r-- | src/zenutil/chunkedcontent.cpp | 13 | ||||
| -rw-r--r-- | src/zenutil/chunkedfile.cpp | 11 | ||||
| -rw-r--r-- | src/zenutil/chunkingcontroller.cpp | 12 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/chunkedfile.h | 3 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/chunkingcontroller.h | 7 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/parallellwork.h | 48 |
8 files changed, 1036 insertions, 974 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index ed1eb2d19..18cc7cf9e 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -28,6 +28,7 @@ #include <zenutil/workerpools.h> #include <zenutil/zenserverprocess.h> +#include <signal.h> #include <memory> ZEN_THIRD_PARTY_INCLUDES_START @@ -49,6 +50,21 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { namespace { + static std::atomic<bool> AbortFlag = false; + static void SignalCallbackHandler(int SigNum) + { + if (SigNum == SIGINT) + { + AbortFlag = true; + } +#if ZEN_PLATFORM_WINDOWS + if (SigNum == SIGBREAK) + { + AbortFlag = true; + } +#endif // ZEN_PLATFORM_WINDOWS + } + using namespace std::literals; static const size_t DefaultMaxBlockSize = 64u * 1024u * 1024u; @@ -273,8 +289,7 @@ namespace { const std::filesystem::path& Path, std::function<bool(const std::string_view& RelativePath)>&& IsAcceptedFolder, std::function<bool(std::string_view RelativePath, uint64_t Size, uint32_t Attributes)>&& IsAcceptedFile, - ChunkingController& ChunkController, - std::atomic<bool>& AbortFlag) + ChunkingController& ChunkController) { FolderContent Content = GetFolderContent( GetFolderContentStats, @@ -316,6 +331,10 @@ namespace { false); }, AbortFlag); + if (AbortFlag) + { + return {}; + } FilteredBytesHashed.Stop(); ProgressBar.Finish(); @@ -1214,7 +1233,6 @@ namespace { const ChunkedContentLookup& Lookup, BuildStorage& Storage, const Oid& BuildId, - std::atomic<bool>& AbortFlag, const std::vector<std::vector<uint32_t>>& NewBlockChunks, GeneratedBlocks& OutBlocks, DiskStatistics& DiskStats, @@ -1254,7 +1272,7 @@ namespace { const std::vector<uint32_t>& ChunksInBlock = NewBlockChunks[BlockIndex]; Work.ScheduleWork( GenerateBlobsPool, - [&, BlockIndex](std::atomic<bool>& AbortFlag) { + [&, BlockIndex](std::atomic<bool>&) { if (!AbortFlag) { FilteredGeneratedBytesPerSecond.Start(); @@ -1295,7 +1313,7 @@ namespace { PendingUploadCount++; Work.ScheduleWork( UploadBlocksPool, - [&, BlockIndex, Payload = std::move(Payload)](std::atomic<bool>& AbortFlag) { + [&, BlockIndex, Payload = std::move(Payload)](std::atomic<bool>&) { if (!AbortFlag) { if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount) @@ -1340,17 +1358,11 @@ namespace { } } }, - [&](const std::exception& Ex, std::atomic<bool>& AbortFlag) { - ZEN_CONSOLE("Failed uploading block. Reason: {}", Ex.what()); - AbortFlag = true; - }); + Work.DefaultErrorFunction()); } } }, - [&](const std::exception& Ex, std::atomic<bool>& AbortFlag) { - ZEN_CONSOLE("Failed generating block. Reason: {}", Ex.what()); - AbortFlag = true; - }); + Work.DefaultErrorFunction()); } Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) { @@ -1394,7 +1406,6 @@ namespace { GeneratedBlocks& NewBlocks, std::span<const uint32_t> LooseChunkIndexes, const std::uint64_t LargeAttachmentSize, - std::atomic<bool>& AbortFlag, DiskStatistics& DiskStats, UploadStatistics& UploadStats, GenerateBlocksStatistics& GenerateBlocksStats, @@ -1424,9 +1435,6 @@ namespace { ChunkIndexToLooseChunkOrderIndex.insert_or_assign(LooseChunkIndexes[OrderIndex], OrderIndex); } - std::vector<IoHash> FoundChunkHashes; - FoundChunkHashes.reserve(RawHashes.size()); - std::vector<size_t> BlockIndexes; std::vector<uint32_t> LooseChunkOrderIndexes; @@ -1458,7 +1466,7 @@ namespace { auto AsyncUploadBlock = [&](const size_t BlockIndex, const IoHash BlockHash, CompositeBuffer&& Payload) { Work.ScheduleWork( UploadChunkPool, - [&, BlockIndex, BlockHash, Payload = CompositeBuffer(std::move(Payload))](std::atomic<bool>& AbortFlag) { + [&, BlockIndex, BlockHash, Payload = std::move(Payload)](std::atomic<bool>&) { if (!AbortFlag) { FilteredUploadedBytesPerSecond.Start(); @@ -1490,16 +1498,13 @@ namespace { } } }, - [&](const std::exception& Ex, std::atomic<bool>& AbortFlag) { - ZEN_CONSOLE("Failed uploading block. Reason: {}", Ex.what()); - AbortFlag = true; - }); + Work.DefaultErrorFunction()); }; auto AsyncUploadLooseChunk = [&](const IoHash& RawHash, CompositeBuffer&& Payload) { Work.ScheduleWork( UploadChunkPool, - [&, RawHash, Payload = CompositeBuffer(std::move(Payload))](std::atomic<bool>& AbortFlag) { + [&, RawHash, Payload = CompositeBuffer(std::move(Payload))](std::atomic<bool>&) { if (!AbortFlag) { const uint64_t PayloadSize = Payload.GetSize(); @@ -1536,16 +1541,13 @@ namespace { { Work.ScheduleWork( UploadChunkPool, - [Work = std::move(WorkPart)](std::atomic<bool>& AbortFlag) { + [Work = std::move(WorkPart)](std::atomic<bool>&) { if (!AbortFlag) { Work(); } }, - [&, RawHash](const std::exception& Ex, std::atomic<bool>& AbortFlag) { - ZEN_CONSOLE("Failed uploading multipart blob {}. Reason: {}", RawHash, Ex.what()); - AbortFlag = true; - }); + Work.DefaultErrorFunction()); } ZEN_CONSOLE_VERBOSE("Uploaded multipart chunk {} ({})", RawHash, NiceBytes(PayloadSize)); } @@ -1564,10 +1566,7 @@ namespace { } } }, - [&](const std::exception& Ex, std::atomic<bool>& AbortFlag) { - ZEN_CONSOLE("Failed uploading chunk. Reason: {}", Ex.what()); - AbortFlag = true; - }); + Work.DefaultErrorFunction()); }; std::vector<size_t> GenerateBlockIndexes; @@ -1581,7 +1580,6 @@ namespace { if (CompositeBuffer BlockPayload = std::move(NewBlocks.BlockBuffers[BlockIndex]); BlockPayload) { const IoHash& BlockHash = NewBlocks.BlockDescriptions[BlockIndex].BlockHash; - FoundChunkHashes.push_back(BlockHash); if (!AbortFlag) { AsyncUploadBlock(BlockIndex, BlockHash, std::move(BlockPayload)); @@ -1606,12 +1604,11 @@ namespace { for (const size_t BlockIndex : GenerateBlockIndexes) { const IoHash& BlockHash = NewBlocks.BlockDescriptions[BlockIndex].BlockHash; - FoundChunkHashes.push_back(BlockHash); if (!AbortFlag) { Work.ScheduleWork( ReadChunkPool, - [&, BlockIndex](std::atomic<bool>& AbortFlag) { + [&, BlockIndex](std::atomic<bool>&) { if (!AbortFlag) { FilteredGenerateBlockBytesPerSecond.Start(); @@ -1646,10 +1643,7 @@ namespace { NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); } }, - [&](const std::exception& Ex, std::atomic<bool>& AbortFlag) { - ZEN_CONSOLE("Failed generating block. Reason: {}", Ex.what()); - AbortFlag = true; - }); + Work.DefaultErrorFunction()); } } @@ -1662,7 +1656,7 @@ namespace { const uint32_t ChunkIndex = LooseChunkIndexes[CompressLooseChunkOrderIndex]; Work.ScheduleWork( ReadChunkPool, - [&, ChunkIndex](std::atomic<bool>& AbortFlag) { + [&, ChunkIndex](std::atomic<bool>&) { if (!AbortFlag) { FilteredCompressedBytesPerSecond.Start(); @@ -1686,12 +1680,7 @@ namespace { } } }, - [&, ChunkIndex](const std::exception& Ex, std::atomic<bool>& AbortFlag) { - ZEN_CONSOLE("Failed compressing part blob {}. Reason: {}", - Content.ChunkedContent.ChunkHashes[ChunkIndex], - Ex.what()); - AbortFlag = true; - }); + Work.DefaultErrorFunction()); } Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) { @@ -1884,7 +1873,7 @@ namespace { return FilteredReuseBlockIndexes; }; - bool UploadFolder(BuildStorage& Storage, + void UploadFolder(BuildStorage& Storage, const Oid& BuildId, const Oid& BuildPartId, const std::string_view BuildPartName, @@ -1898,8 +1887,6 @@ namespace { { Stopwatch ProcessTimer; - std::atomic<bool> AbortFlag = false; - const std::filesystem::path ZenTempFolder = Path / ZenTempFolderName; CreateDirectories(ZenTempFolder); CleanDirectory(ZenTempFolder, {}); @@ -2101,15 +2088,14 @@ namespace { false); }, AbortFlag); + if (AbortFlag) + { + return; + } FilteredBytesHashed.Stop(); ProgressBar.Finish(); } - if (AbortFlag) - { - return true; - } - ZEN_CONSOLE("Found {} ({}) files divided into {} ({}) unique chunks in '{}' in {}. Average hash rate {}B/sec", LocalContent.Paths.size(), NiceBytes(TotalRawSize), @@ -2289,7 +2275,6 @@ namespace { LocalLookup, Storage, BuildId, - AbortFlag, NewBlockChunks, NewBlocks, DiskStats, @@ -2297,11 +2282,6 @@ namespace { GenerateBlocksStats); } - if (AbortFlag) - { - return true; - } - CbObject PartManifest; { CbObjectWriter PartManifestWriter; @@ -2508,7 +2488,6 @@ namespace { NewBlocks, LooseChunkIndexes, LargeAttachmentSize, - AbortFlag, DiskStats, TempUploadStats, TempGenerateBlocksStats, @@ -2558,21 +2537,8 @@ namespace { { break; } - if (AbortFlag) - { - return true; - } ZEN_CONSOLE_VERBOSE("FinalizeBuildPart needs attachments: {}", FormatArray<IoHash>(Needs, "\n "sv)); UploadAttachments(Needs); - if (AbortFlag) - { - return true; - } - } - - if (AbortFlag) - { - return true; } if (CreateBuild) @@ -2612,9 +2578,8 @@ namespace { std::vector<ChunkBlockDescription> VerifyBlockDescriptions = Storage.GetBlockMetadata(BuildId, BlockHashes); if (VerifyBlockDescriptions.size() != BlockHashes.size()) { - ZEN_CONSOLE("Uploaded blocks could not all be found, {} blocks are missing", - BlockHashes.size() - VerifyBlockDescriptions.size()); - return true; + throw std::runtime_error(fmt::format("Uploaded blocks could not all be found, {} blocks are missing", + BlockHashes.size() - VerifyBlockDescriptions.size())); } } @@ -2774,10 +2739,9 @@ namespace { BuildPartName, BuildPartId, NiceTimeSpanMs(ProcessTimer.GetElapsedTimeMs())); - return false; } - void VerifyFolder(const ChunkedFolderContent& Content, const std::filesystem::path& Path, std::atomic<bool>& AbortFlag) + void VerifyFolder(const ChunkedFolderContent& Content, const std::filesystem::path& Path) { ProgressBar ProgressBar(UsePlainProgress); std::atomic<uint64_t> FilesVerified(0); @@ -2822,7 +2786,7 @@ namespace { Work.ScheduleWork( VerifyPool, - [&, PathIndex](std::atomic<bool>& AbortFlag) { + [&, PathIndex](std::atomic<bool>&) { if (!AbortFlag) { // TODO: Convert ScheduleWork body to function @@ -2912,9 +2876,7 @@ namespace { FilesVerified++; } }, - [&, PathIndex](const std::exception& Ex, std::atomic<bool>& AbortFlag) { - ZEN_UNUSED(AbortFlag); - + [&, PathIndex](const std::exception& Ex, std::atomic<bool>&) { ErrorLock.WithExclusiveLock([&]() { Errors.push_back(fmt::format("Failed verifying file '{}'. Reason: {}", (Path / Content.Paths[PathIndex]).make_preferred(), @@ -3085,36 +3047,39 @@ namespace { { if (!WriteOps.empty()) { - std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOpData& Lhs, const WriteOpData& Rhs) { - if (Lhs.Target->PathIndex < Rhs.Target->PathIndex) - { - return true; - } - if (Lhs.Target->PathIndex > Rhs.Target->PathIndex) + if (!AbortFlag) + { + std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOpData& Lhs, const WriteOpData& Rhs) { + if (Lhs.Target->PathIndex < Rhs.Target->PathIndex) + { + return true; + } + if (Lhs.Target->PathIndex > Rhs.Target->PathIndex) + { + return false; + } + return Lhs.Target->Offset < Rhs.Target->Offset; + }); + + WriteFileCache OpenFileCache; + for (const WriteOpData& WriteOp : WriteOps) { - return false; + const CompositeBuffer& Chunk = ChunkBuffers[WriteOp.ChunkBufferIndex]; + const uint32_t PathIndex = WriteOp.Target->PathIndex; + const uint64_t ChunkSize = Chunk.GetSize(); + const uint64_t FileOffset = WriteOp.Target->Offset; + ZEN_ASSERT(FileOffset + ChunkSize <= Content.RawSizes[PathIndex]); + + OpenFileCache.WriteToFile<CompositeBuffer>( + PathIndex, + [&Path, &Content](uint32_t TargetIndex) { return (Path / Content.Paths[TargetIndex]).make_preferred(); }, + Chunk, + FileOffset, + Content.RawSizes[PathIndex]); + OutBytesWritten += ChunkSize; } - return Lhs.Target->Offset < Rhs.Target->Offset; - }); - - WriteFileCache OpenFileCache; - for (const WriteOpData& WriteOp : WriteOps) - { - const CompositeBuffer& Chunk = ChunkBuffers[WriteOp.ChunkBufferIndex]; - const uint32_t PathIndex = WriteOp.Target->PathIndex; - const uint64_t ChunkSize = Chunk.GetSize(); - const uint64_t FileOffset = WriteOp.Target->Offset; - ZEN_ASSERT(FileOffset + ChunkSize <= Content.RawSizes[PathIndex]); - - OpenFileCache.WriteToFile<CompositeBuffer>( - PathIndex, - [&Path, &Content](uint32_t TargetIndex) { return (Path / Content.Paths[TargetIndex]).make_preferred(); }, - Chunk, - FileOffset, - Content.RawSizes[PathIndex]); - OutBytesWritten += ChunkSize; + OutChunksComplete += gsl::narrow<uint32_t>(ChunkBuffers.size()); } - OutChunksComplete += gsl::narrow<uint32_t>(ChunkBuffers.size()); } return true; } @@ -3186,7 +3151,6 @@ namespace { ParallellWork& Work, WorkerThreadPool& WritePool, WorkerThreadPool& NetworkPool, - std::atomic<bool>& AbortFlag, std::atomic<uint64_t>& BytesWritten, std::atomic<uint64_t>& WriteToDiskBytes, std::atomic<uint64_t>& BytesDownloaded, @@ -3225,8 +3189,8 @@ namespace { &WriteToDiskBytes, &DownloadedChunks, &ChunksComplete, - ChunkTargetPtrs = std::vector<const ChunkedContentLookup::ChunkLocation*>(ChunkTargetPtrs), - &AbortFlag](uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining) { + ChunkTargetPtrs = std::vector<const ChunkedContentLookup::ChunkLocation*>( + ChunkTargetPtrs)](uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining) { BytesDownloaded += Chunk.GetSize(); LooseChunksBytes += Chunk.GetSize(); @@ -3249,7 +3213,7 @@ namespace { &ChunksComplete, &BytesWritten, &WriteToDiskBytes, - ChunkTargetPtrs](std::atomic<bool>& AbortFlag) { + ChunkTargetPtrs](std::atomic<bool>&) { if (!AbortFlag) { uint64_t CompressedSize = Workload->TempFile.FileSize(); @@ -3276,6 +3240,7 @@ namespace { // ZEN_ASSERT_SLOW(ChunkHash == // IoHash::HashBuffer(Chunk.AsIoBuffer())); + if (!AbortFlag) { WriteFileCache OpenFileCache; @@ -3291,10 +3256,7 @@ namespace { } } }, - [&, ChunkHash](const std::exception& Ex, std::atomic<bool>& AbortFlag) { - ZEN_CONSOLE("Failed writing chunk {}. Reason: {}", ChunkHash, Ex.what()); - AbortFlag = true; - }); + Work.DefaultErrorFunction()); } } }); @@ -3306,20 +3268,17 @@ namespace { { Work.ScheduleWork( NetworkPool, - [WorkItem = std::move(WorkItem)](std::atomic<bool>& AbortFlag) { + [WorkItem = std::move(WorkItem)](std::atomic<bool>&) { if (!AbortFlag) { WorkItem(); } }, - [&, ChunkHash](const std::exception& Ex, std::atomic<bool>& AbortFlag) { - ZEN_CONSOLE("Failed uploading multipart blob {}. Reason: {}", ChunkHash, Ex.what()); - AbortFlag = true; - }); + Work.DefaultErrorFunction()); } } - bool UpdateFolder(BuildStorage& Storage, + void UpdateFolder(BuildStorage& Storage, const Oid& BuildId, const std::filesystem::path& Path, const std::uint64_t LargeAttachmentSize, @@ -3329,7 +3288,6 @@ namespace { const std::vector<ChunkBlockDescription>& BlockDescriptions, const std::vector<IoHash>& LooseChunkHashes, bool WipeTargetFolder, - std::atomic<bool>& AbortFlag, FolderContent& OutLocalFolderState) { std::atomic<uint64_t> DownloadedBlocks = 0; @@ -3433,7 +3391,7 @@ namespace { if (AbortFlag) { - return true; + return; } CleanDirectory(Path, DefaultExcludeFolders); @@ -3646,7 +3604,7 @@ namespace { Work.ScheduleWork( WritePool, // GetSyncWorkerPool(),// - [&, CopyDataIndex](std::atomic<bool>& AbortFlag) { + [&, CopyDataIndex](std::atomic<bool>&) { if (!AbortFlag) { const CacheCopyData& CopyData = CacheCopyDatas[CopyDataIndex]; @@ -3703,6 +3661,10 @@ namespace { WriteFileCache OpenFileCache; for (const WriteOp& Op : WriteOps) { + if (AbortFlag) + { + break; + } const uint32_t RemotePathIndex = Op.Target->PathIndex; const uint64_t ChunkSize = Op.ChunkSize; CompositeBuffer ChunkSource = SourceFile.GetRange(Op.LocalFileOffset, ChunkSize); @@ -3721,15 +3683,18 @@ namespace { WriteToDiskBytes += ChunkSize; CacheLocalFileBytesRead += ChunkSize; // TODO: This should be the sum of unique chunk sizes? } - ChunkCountWritten += gsl::narrow<uint32_t>(CopyData.ChunkTargets.size()); - ZEN_DEBUG("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), CopyData.OriginalSourceFileName); + if (!AbortFlag) + { + ChunkCountWritten += gsl::narrow<uint32_t>(CopyData.ChunkTargets.size()); + ZEN_DEBUG("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), CopyData.OriginalSourceFileName); + } } if (CopyData.RemotePathIndexes.empty()) { std::filesystem::remove(CacheFilePath); } - else + else if (!AbortFlag) { uint64_t LocalBytesWritten = 0; CloneFullFileFromCache(Path, @@ -3753,12 +3718,7 @@ namespace { } } }, - [&, CopyDataIndex](const std::exception& Ex, std::atomic<bool>& AbortFlag) { - ZEN_CONSOLE("Failed reading cached file {}. Reason: {}", - CacheCopyDatas[CopyDataIndex].OriginalSourceFileName, - Ex.what()); - AbortFlag = true; - }); + Work.DefaultErrorFunction()); } for (const IoHash ChunkHash : LooseChunkHashes) @@ -3788,7 +3748,7 @@ namespace { { Work.ScheduleWork( NetworkPool, - [&, ChunkHash, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>& AbortFlag) { + [&, ChunkHash, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) { if (!AbortFlag) { if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize) @@ -3804,7 +3764,6 @@ namespace { Work, WritePool, NetworkPool, - AbortFlag, BytesWritten, WriteToDiskBytes, BytesDownloaded, @@ -3829,7 +3788,7 @@ namespace { Work.ScheduleWork( WritePool, [&, ChunkHash, RemoteChunkIndex, ChunkTargetPtrs, CompressedPart = std::move(CompressedPart)]( - std::atomic<bool>& AbortFlag) { + std::atomic<bool>&) { if (!AbortFlag) { uint64_t TotalBytesWritten = 0; @@ -3850,18 +3809,12 @@ namespace { WriteToDiskBytes += TotalBytesWritten; } }, - [&, ChunkHash](const std::exception& Ex, std::atomic<bool>& AbortFlag) { - ZEN_CONSOLE("Failed writing chunk {}. Reason: {}", ChunkHash, Ex.what()); - AbortFlag = true; - }); + Work.DefaultErrorFunction()); } } } }, - [&, ChunkHash](const std::exception& Ex, std::atomic<bool>& AbortFlag) { - ZEN_CONSOLE("Failed downloading chunk {}. Reason: {}", ChunkHash, Ex.what()); - AbortFlag = true; - }); + Work.DefaultErrorFunction()); } } } @@ -3893,98 +3846,97 @@ namespace { } Work.ScheduleWork( WritePool, - [&, BlockIndex](std::atomic<bool>& AbortFlag) { + [&, BlockIndex](std::atomic<bool>&) { if (!AbortFlag) { if (IsBlockNeeded(BlockDescriptions[BlockIndex])) { Work.ScheduleWork( NetworkPool, - [&, BlockIndex](std::atomic<bool>& AbortFlag) { - IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockDescriptions[BlockIndex].BlockHash); - if (!BlockBuffer) - { - throw std::runtime_error( - fmt::format("Block {} is missing", BlockDescriptions[BlockIndex].BlockHash)); - } - BytesDownloaded += BlockBuffer.GetSize(); - BlockBytes += BlockBuffer.GetSize(); - DownloadedBlocks++; - + [&, BlockIndex](std::atomic<bool>&) { if (!AbortFlag) { - Work.ScheduleWork( - WritePool, - [&, BlockIndex, BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>& AbortFlag) { - if (!AbortFlag) - { - IoHash BlockRawHash; - uint64_t BlockRawSize; - CompressedBuffer CompressedBlockBuffer = - CompressedBuffer::FromCompressed(SharedBuffer(std::move(BlockBuffer)), - BlockRawHash, - BlockRawSize); - if (!CompressedBlockBuffer) - { - throw std::runtime_error(fmt::format("Block {} is not a compressed buffer", - BlockDescriptions[BlockIndex].BlockHash)); - } - - if (BlockRawHash != BlockDescriptions[BlockIndex].BlockHash) - { - throw std::runtime_error( - fmt::format("Block {} header has a mismatching raw hash {}", - BlockDescriptions[BlockIndex].BlockHash, - BlockRawHash)); - } - - CompositeBuffer DecompressedBlockBuffer = CompressedBlockBuffer.DecompressToComposite(); - if (!DecompressedBlockBuffer) - { - throw std::runtime_error(fmt::format("Block {} failed to decompress", - BlockDescriptions[BlockIndex].BlockHash)); - } + IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockDescriptions[BlockIndex].BlockHash); + if (!BlockBuffer) + { + throw std::runtime_error( + fmt::format("Block {} is missing", BlockDescriptions[BlockIndex].BlockHash)); + } + BytesDownloaded += BlockBuffer.GetSize(); + BlockBytes += BlockBuffer.GetSize(); + DownloadedBlocks++; - ZEN_ASSERT_SLOW(BlockDescriptions[BlockIndex].BlockHash == - IoHash::HashBuffer(DecompressedBlockBuffer)); - - uint64_t BytesWrittenToDisk = 0; - uint32_t ChunksReadFromBlock = 0; - if (WriteBlockToDisk(Path, - RemoteContent, - RemotePathIndexWantsCopyFromCacheFlags, - DecompressedBlockBuffer, - RemoteLookup, - RemoteChunkIndexNeedsCopyFromSourceFlags.data(), - ChunksReadFromBlock, - BytesWrittenToDisk)) - { - BytesWritten += BytesWrittenToDisk; - WriteToDiskBytes += BytesWrittenToDisk; - ChunkCountWritten += ChunksReadFromBlock; - } - else + if (!AbortFlag) + { + Work.ScheduleWork( + WritePool, + [&, BlockIndex, BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) { + if (!AbortFlag) { - throw std::runtime_error( - fmt::format("Block {} is malformed", BlockDescriptions[BlockIndex].BlockHash)); + IoHash BlockRawHash; + uint64_t BlockRawSize; + CompressedBuffer CompressedBlockBuffer = + CompressedBuffer::FromCompressed(SharedBuffer(std::move(BlockBuffer)), + BlockRawHash, + BlockRawSize); + if (!CompressedBlockBuffer) + { + throw std::runtime_error(fmt::format("Block {} is not a compressed buffer", + BlockDescriptions[BlockIndex].BlockHash)); + } + + if (BlockRawHash != BlockDescriptions[BlockIndex].BlockHash) + { + throw std::runtime_error( + fmt::format("Block {} header has a mismatching raw hash {}", + BlockDescriptions[BlockIndex].BlockHash, + BlockRawHash)); + } + + CompositeBuffer DecompressedBlockBuffer = + CompressedBlockBuffer.DecompressToComposite(); + if (!DecompressedBlockBuffer) + { + throw std::runtime_error(fmt::format("Block {} failed to decompress", + BlockDescriptions[BlockIndex].BlockHash)); + } + + ZEN_ASSERT_SLOW(BlockDescriptions[BlockIndex].BlockHash == + IoHash::HashBuffer(DecompressedBlockBuffer)); + + uint64_t BytesWrittenToDisk = 0; + uint32_t ChunksReadFromBlock = 0; + if (WriteBlockToDisk(Path, + RemoteContent, + RemotePathIndexWantsCopyFromCacheFlags, + DecompressedBlockBuffer, + RemoteLookup, + RemoteChunkIndexNeedsCopyFromSourceFlags.data(), + ChunksReadFromBlock, + BytesWrittenToDisk)) + { + BytesWritten += BytesWrittenToDisk; + WriteToDiskBytes += BytesWrittenToDisk; + ChunkCountWritten += ChunksReadFromBlock; + } + else + { + throw std::runtime_error(fmt::format("Block {} is malformed", + BlockDescriptions[BlockIndex].BlockHash)); + } + BlocksComplete++; } - BlocksComplete++; - } - }, - [&, BlockIndex](const std::exception& Ex, std::atomic<bool>& AbortFlag) { - ZEN_CONSOLE("Failed writing block {}. Reason: {}", - BlockDescriptions[BlockIndex].BlockHash, - Ex.what()); - AbortFlag = true; - }); + }, + [&, BlockIndex](const std::exception& Ex, std::atomic<bool>&) { + ZEN_ERROR("Failed writing block {}. Reason: {}", + BlockDescriptions[BlockIndex].BlockHash, + Ex.what()); + AbortFlag = true; + }); + } } }, - [&, BlockIndex](const std::exception& Ex, std::atomic<bool>& AbortFlag) { - ZEN_CONSOLE("Failed downloading block {}. Reason: {}", - BlockDescriptions[BlockIndex].BlockHash, - Ex.what()); - AbortFlag = true; - }); + Work.DefaultErrorFunction()); } else { @@ -3993,10 +3945,7 @@ namespace { } } }, - [&, BlockIndex](const std::exception& Ex, std::atomic<bool>& AbortFlag) { - ZEN_CONSOLE("Failed determning if block {} is needed. Reason: {}", BlockDescriptions[BlockIndex].BlockHash, Ex.what()); - AbortFlag = true; - }); + Work.DefaultErrorFunction()); } for (uint32_t PathIndex = 0; PathIndex < RemoteContent.Paths.size(); PathIndex++) { @@ -4008,7 +3957,7 @@ namespace { { Work.ScheduleWork( WritePool, - [&, PathIndex](std::atomic<bool>& AbortFlag) { + [&, PathIndex](std::atomic<bool>&) { if (!AbortFlag) { const std::filesystem::path TargetPath = (Path / RemoteContent.Paths[PathIndex]).make_preferred(); @@ -4017,10 +3966,7 @@ namespace { OutputFile.Open(TargetPath, BasicFile::Mode::kTruncate); } }, - [&, PathIndex](const std::exception& Ex, std::atomic<bool>& AbortFlag) { - ZEN_CONSOLE("Failed creating file at {}. Reason: {}", RemoteContent.Paths[PathIndex], Ex.what()); - AbortFlag = true; - }); + Work.DefaultErrorFunction()); } } @@ -4040,6 +3986,12 @@ namespace { .RemainingCount = gsl::narrow<uint64_t>(ChunkCountToWrite - ChunkCountWritten.load())}, false); }); + + if (AbortFlag) + { + return; + } + WriteProgressBar.Finish(); { @@ -4109,8 +4061,6 @@ namespace { } PremissionsProgressBar.Finish(); } - - return false; } std::vector<std::pair<Oid, std::string>> ResolveBuildPartNames(BuildStorage& Storage, @@ -4421,8 +4371,7 @@ namespace { ChunkedFolderContent GetLocalContent(GetFolderContentStatistics& LocalFolderScanStats, ChunkingStatistics& ChunkingStats, const std::filesystem::path& Path, - ChunkingController& ChunkController, - std::atomic<bool>& AbortFlag) + ChunkingController& ChunkController) { ChunkedFolderContent LocalContent; @@ -4564,6 +4513,26 @@ namespace { } else { + // Remove files from LocalContent no longer in LocalFolderState + tsl::robin_set<std::string> LocalFolderPaths; + LocalFolderPaths.reserve(LocalFolderState.Paths.size()); + for (const std::filesystem::path& LocalFolderPath : LocalFolderState.Paths) + { + LocalFolderPaths.insert(LocalFolderPath.generic_string()); + } + std::vector<std::filesystem::path> DeletedPaths; + for (const std::filesystem::path& LocalContentPath : LocalContent.Paths) + { + if (!LocalFolderPaths.contains(LocalContentPath.generic_string())) + { + DeletedPaths.push_back(LocalContentPath); + } + } + if (!DeletedPaths.empty()) + { + LocalContent = DeletePathsFromChunkedContent(LocalContent, DeletedPaths); + } + ZEN_CONSOLE("Using cached local state"); } ZEN_CONSOLE("Read local state in {}", NiceLatencyNs(ReadStateTimer.GetElapsedTimeUs() * 1000)); @@ -4611,18 +4580,19 @@ namespace { false); }, AbortFlag); - FilteredBytesHashed.Stop(); - ProgressBar.Finish(); if (AbortFlag) { return {}; } + + FilteredBytesHashed.Stop(); + ProgressBar.Finish(); } return LocalContent; } - bool DownloadFolder(BuildStorage& Storage, + void DownloadFolder(BuildStorage& Storage, const Oid& BuildId, const std::vector<Oid>& BuildPartIds, std::span<const std::string> BuildPartNames, @@ -4630,8 +4600,7 @@ namespace { bool AllowMultiparts, bool WipeTargetFolder) { - Stopwatch DownloadTimer; - std::atomic<bool> AbortFlag(false); + Stopwatch DownloadTimer; const std::filesystem::path ZenTempFolder = Path / ZenTempFolderName; CreateDirectories(ZenTempFolder); @@ -4672,19 +4641,19 @@ namespace { { if (!WipeTargetFolder) { - LocalContent = GetLocalContent(LocalFolderScanStats, ChunkingStats, Path, *ChunkController, AbortFlag); + LocalContent = GetLocalContent(LocalFolderScanStats, ChunkingStats, Path, *ChunkController); } } else { CreateDirectories(Path); } - if (AbortFlag.load()) + if (AbortFlag) { - return true; + return; } - auto CompareContent = [](const ChunkedFolderContent& Lsh, const ChunkedFolderContent& Rhs) { + auto CompareContent = [](const ChunkedFolderContent& Lhs, const ChunkedFolderContent& Rhs) { tsl::robin_map<std::string, size_t> RhsPathToIndex; const size_t RhsPathCount = Rhs.Paths.size(); RhsPathToIndex.reserve(RhsPathCount); @@ -4692,14 +4661,14 @@ namespace { { RhsPathToIndex.insert({Rhs.Paths[RhsPathIndex].generic_string(), RhsPathIndex}); } - const size_t LhsPathCount = Lsh.Paths.size(); + const size_t LhsPathCount = Lhs.Paths.size(); for (size_t LhsPathIndex = 0; LhsPathIndex < LhsPathCount; LhsPathIndex++) { - if (auto It = RhsPathToIndex.find(Lsh.Paths[LhsPathIndex].generic_string()); It != RhsPathToIndex.end()) + if (auto It = RhsPathToIndex.find(Lhs.Paths[LhsPathIndex].generic_string()); It != RhsPathToIndex.end()) { const size_t RhsPathIndex = It->second; - if ((Lsh.RawHashes[LhsPathIndex] != Rhs.RawHashes[RhsPathIndex]) || - (!FolderContent::AreFileAttributesEqual(Lsh.Attributes[LhsPathIndex], Rhs.Attributes[RhsPathIndex]))) + if ((Lhs.RawHashes[LhsPathIndex] != Rhs.RawHashes[RhsPathIndex]) || + (!FolderContent::AreFileAttributesEqual(Lhs.Attributes[LhsPathIndex], Rhs.Attributes[RhsPathIndex]))) { return false; } @@ -4709,6 +4678,20 @@ namespace { return false; } } + tsl::robin_set<std::string> LhsPathExists; + LhsPathExists.reserve(LhsPathCount); + for (size_t LhsPathIndex = 0; LhsPathIndex < LhsPathCount; LhsPathIndex++) + { + LhsPathExists.insert({Lhs.Paths[LhsPathIndex].generic_string()}); + } + for (size_t RhsPathIndex = 0; RhsPathIndex < RhsPathCount; RhsPathIndex++) + { + if (!LhsPathExists.contains(Rhs.Paths[RhsPathIndex].generic_string())) + { + return false; + } + } + return true; }; @@ -4726,49 +4709,42 @@ namespace { } ZEN_CONSOLE("Downloading build {}, parts:{}", BuildId, SB.ToView()); FolderContent LocalFolderState; - if (UpdateFolder(Storage, - BuildId, - Path, - LargeAttachmentSize, - PreferredMultipartChunkSize, - LocalContent, - RemoteContent, - BlockDescriptions, - LooseChunkHashes, - WipeTargetFolder, - AbortFlag, - LocalFolderState)) - { - AbortFlag = true; - } + UpdateFolder(Storage, + BuildId, + Path, + LargeAttachmentSize, + PreferredMultipartChunkSize, + LocalContent, + RemoteContent, + BlockDescriptions, + LooseChunkHashes, + WipeTargetFolder, + LocalFolderState); + if (!AbortFlag) { - VerifyFolder(RemoteContent, Path, AbortFlag); - } + VerifyFolder(RemoteContent, Path); - Stopwatch WriteStateTimer; - CbObject StateObject = CreateStateObject(BuildId, AllBuildParts, PartContents, LocalFolderState); + Stopwatch WriteStateTimer; + CbObject StateObject = CreateStateObject(BuildId, AllBuildParts, PartContents, LocalFolderState); - CreateDirectories((Path / ZenStateFilePath).parent_path()); - TemporaryFile::SafeWriteFile(Path / ZenStateFilePath, StateObject.GetView()); - ZEN_CONSOLE("Wrote local state in {}", NiceLatencyNs(WriteStateTimer.GetElapsedTimeUs() * 1000)); + CreateDirectories((Path / ZenStateFilePath).parent_path()); + TemporaryFile::SafeWriteFile(Path / ZenStateFilePath, StateObject.GetView()); + ZEN_CONSOLE("Wrote local state in {}", NiceLatencyNs(WriteStateTimer.GetElapsedTimeUs() * 1000)); #if 0 - ExtendableStringBuilder<1024> SB; - CompactBinaryToJson(StateObject, SB); - WriteFile(Path / ZenStateFileJsonPath, IoBuffer(IoBuffer::Wrap, SB.Data(), SB.Size())); + ExtendableStringBuilder<1024> SB; + CompactBinaryToJson(StateObject, SB); + WriteFile(Path / ZenStateFileJsonPath, IoBuffer(IoBuffer::Wrap, SB.Data(), SB.Size())); #endif // 0 - ZEN_CONSOLE("Downloaded build in {}.", NiceLatencyNs(DownloadTimer.GetElapsedTimeUs() * 1000)); + ZEN_CONSOLE("Downloaded build in {}.", NiceLatencyNs(DownloadTimer.GetElapsedTimeUs() * 1000)); + } } - - return AbortFlag.load(); } - bool DiffFolders(const std::filesystem::path& BasePath, const std::filesystem::path& ComparePath, bool OnlyChunked) + void DiffFolders(const std::filesystem::path& BasePath, const std::filesystem::path& ComparePath, bool OnlyChunked) { - std::atomic<bool> AbortFlag(false); - ChunkedFolderContent BaseFolderContent; ChunkedFolderContent CompareFolderContent; @@ -4818,8 +4794,11 @@ namespace { BasePath, IsAcceptedFolder, IsAcceptedFile, - *ChunkController, - AbortFlag); + *ChunkController); + if (AbortFlag) + { + return; + } GetFolderContentStatistics CompareGetFolderContentStats; ChunkingStatistics CompareChunkingStats; @@ -4828,8 +4807,12 @@ namespace { ComparePath, IsAcceptedFolder, IsAcceptedFile, - *ChunkController, - AbortFlag); + *ChunkController); + + if (AbortFlag) + { + return; + } } std::vector<IoHash> AddedHashes; @@ -4918,8 +4901,6 @@ namespace { NewChunkCount, NiceBytes(NewChunkSize), NewPercent); - - return false; } } // namespace @@ -5190,6 +5171,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { ZEN_UNUSED(GlobalOptions); + signal(SIGINT, SignalCallbackHandler); +#if ZEN_PLATFORM_WINDOWS + signal(SIGBREAK, SignalCallbackHandler); +#endif // ZEN_PLATFORM_WINDOWS + using namespace std::literals; std::vector<char*> SubCommandArguments; @@ -5320,786 +5306,798 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) }; ParseOutputOptions(); - if (SubOption == &m_ListOptions) + try { - ParseStorageOptions(); - ParseAuthOptions(); - - HttpClient Http(m_BuildsUrl, ClientSettings); - - CbObjectWriter QueryWriter; - QueryWriter.BeginObject("query"); + if (SubOption == &m_ListOptions) { - // QueryWriter.BeginObject("platform"); - // { - // QueryWriter.AddString("$eq", "Windows"); - // } - // QueryWriter.EndObject(); // changelist - } - QueryWriter.EndObject(); // query - - BuildStorage::Statistics StorageStats; - std::unique_ptr<BuildStorage> Storage; - if (!m_BuildsUrl.empty()) - { - ZEN_CONSOLE("Querying builds in cloud endpoint '{}'. SessionId: '{}'. Namespace '{}', Bucket '{}'", - m_BuildsUrl, - Http.GetSessionId(), - m_Namespace, - m_Bucket); - Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, std::filesystem::path{}); - } - else if (!m_StoragePath.empty()) - { - ZEN_CONSOLE("Querying builds in folder '{}'.", m_StoragePath); - Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false); // , .0015, 0.00004 - } - else - { - throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); - } - - CbObject Response = Storage->ListBuilds(QueryWriter.Save()); - ExtendableStringBuilder<1024> SB; - CompactBinaryToJson(Response.GetView(), SB); - ZEN_CONSOLE("{}", SB.ToView()); - return 0; - } + ParseStorageOptions(); + ParseAuthOptions(); - if (SubOption == &m_UploadOptions) - { - ParseStorageOptions(); - ParseAuthOptions(); + HttpClient Http(m_BuildsUrl, ClientSettings); - HttpClient Http(m_BuildsUrl, ClientSettings); - - if (m_Path.empty()) - { - throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_UploadOptions.help())); - } - - if (m_CreateBuild) - { - if (m_BuildMetadataPath.empty() && m_BuildMetadata.empty()) + CbObjectWriter QueryWriter; + QueryWriter.BeginObject("query"); { - throw zen::OptionParseException(fmt::format("Options for builds target are missing\n{}", m_UploadOptions.help())); + // QueryWriter.BeginObject("platform"); + // { + // QueryWriter.AddString("$eq", "Windows"); + // } + // QueryWriter.EndObject(); // changelist } - if (!m_BuildMetadataPath.empty() && !m_BuildMetadata.empty()) + QueryWriter.EndObject(); // query + + BuildStorage::Statistics StorageStats; + std::unique_ptr<BuildStorage> Storage; + if (!m_BuildsUrl.empty()) { - throw zen::OptionParseException(fmt::format("Conflicting options for builds target\n{}", m_UploadOptions.help())); + ZEN_CONSOLE("Querying builds in cloud endpoint '{}'. SessionId: '{}'. Namespace '{}', Bucket '{}'", + m_BuildsUrl, + Http.GetSessionId(), + m_Namespace, + m_Bucket); + Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, std::filesystem::path{}); } - } - else - { - if (!m_BuildMetadataPath.empty()) + else if (!m_StoragePath.empty()) { - throw zen::OptionParseException( - fmt::format("metadata-path option is only valid if creating a build\n{}", m_UploadOptions.help())); + ZEN_CONSOLE("Querying builds in folder '{}'.", m_StoragePath); + Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false); // , .0015, 0.00004 } - if (!m_BuildMetadata.empty()) + else { - throw zen::OptionParseException( - fmt::format("metadata option is only valid if creating a build\n{}", m_UploadOptions.help())); + throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); } - } - if (m_BuildPartName.empty()) - { - m_BuildPartName = m_Path.filename().string(); + CbObject Response = Storage->ListBuilds(QueryWriter.Save()); + ExtendableStringBuilder<1024> SB; + CompactBinaryToJson(Response.GetView(), SB); + ZEN_CONSOLE("{}", SB.ToView()); + return 0; } - const bool GeneratedBuildId = m_BuildId.empty(); - if (GeneratedBuildId) + if (SubOption == &m_UploadOptions) { - m_BuildId = Oid::NewOid().ToString(); - } - else if (m_BuildId.length() != Oid::StringLength) - { - throw zen::OptionParseException(fmt::format("Invalid build id\n{}", m_UploadOptions.help())); - } - else if (Oid::FromHexString(m_BuildId) == Oid::Zero) - { - throw zen::OptionParseException(fmt::format("Invalid build id\n{}", m_UploadOptions.help())); - } + ParseStorageOptions(); + ParseAuthOptions(); - const bool GeneratedBuildPartId = m_BuildPartId.empty(); - if (GeneratedBuildPartId) - { - m_BuildPartId = Oid::NewOid().ToString(); - } - else if (m_BuildPartId.length() != Oid::StringLength) - { - throw zen::OptionParseException(fmt::format("Invalid build id\n{}", m_UploadOptions.help())); - } - else if (Oid::FromHexString(m_BuildPartId) == Oid::Zero) - { - throw zen::OptionParseException(fmt::format("Invalid build part id\n{}", m_UploadOptions.help())); - } + HttpClient Http(m_BuildsUrl, ClientSettings); - BuildStorage::Statistics StorageStats; - const Oid BuildId = Oid::FromHexString(m_BuildId); - const Oid BuildPartId = Oid::FromHexString(m_BuildPartId); - std::unique_ptr<BuildStorage> Storage; - std::string StorageName; - if (!m_BuildsUrl.empty()) - { - ZEN_CONSOLE("Uploading '{}' from '{}' to cloud endpoint '{}'. SessionId: '{}'. Namespace '{}', Bucket '{}', {}BuildId '{}'", - m_BuildPartName, - m_Path, - m_BuildsUrl, - Http.GetSessionId(), - m_Namespace, - m_Bucket, - GeneratedBuildId ? "Generated " : "", - BuildId); - CreateDirectories(m_Path / ZenTempStorageFolderName); - Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName); - StorageName = "Cloud DDC"; - } - else if (!m_StoragePath.empty()) - { - ZEN_CONSOLE("Uploading '{}' from '{}' to folder '{}'. {}BuildId '{}'", - m_BuildPartName, - m_Path, - m_StoragePath, - GeneratedBuildId ? "Generated " : "", - BuildId); - Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, m_WriteMetadataAsJson); // , .0015, 0.00004 - StorageName = fmt::format("Disk {}", m_StoragePath.stem()); - } - else - { - throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); - } + if (m_Path.empty()) + { + throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_UploadOptions.help())); + } - CbObject MetaData; - if (m_CreateBuild) - { - if (!m_BuildMetadataPath.empty()) + if (m_CreateBuild) { - std::filesystem::path MetadataPath(m_BuildMetadataPath); - IoBuffer MetaDataJson = ReadFile(MetadataPath).Flatten(); - std::string_view Json(reinterpret_cast<const char*>(MetaDataJson.GetData()), MetaDataJson.GetSize()); - std::string JsonError; - MetaData = LoadCompactBinaryFromJson(Json, JsonError).AsObject(); - if (!JsonError.empty()) + if (m_BuildMetadataPath.empty() && m_BuildMetadata.empty()) { - throw std::runtime_error( - fmt::format("build metadata file '{}' is malformed. Reason: '{}'", m_BuildMetadataPath, JsonError)); + throw zen::OptionParseException(fmt::format("Options for builds target are missing\n{}", m_UploadOptions.help())); + } + if (!m_BuildMetadataPath.empty() && !m_BuildMetadata.empty()) + { + throw zen::OptionParseException(fmt::format("Conflicting options for builds target\n{}", m_UploadOptions.help())); } } - if (!m_BuildMetadata.empty()) + else + { + if (!m_BuildMetadataPath.empty()) + { + throw zen::OptionParseException( + fmt::format("metadata-path option is only valid if creating a build\n{}", m_UploadOptions.help())); + } + if (!m_BuildMetadata.empty()) + { + throw zen::OptionParseException( + fmt::format("metadata option is only valid if creating a build\n{}", m_UploadOptions.help())); + } + } + + if (m_BuildPartName.empty()) + { + m_BuildPartName = m_Path.filename().string(); + } + + const bool GeneratedBuildId = m_BuildId.empty(); + if (GeneratedBuildId) + { + m_BuildId = Oid::NewOid().ToString(); + } + else if (m_BuildId.length() != Oid::StringLength) + { + throw zen::OptionParseException(fmt::format("Invalid build id\n{}", m_UploadOptions.help())); + } + else if (Oid::FromHexString(m_BuildId) == Oid::Zero) + { + throw zen::OptionParseException(fmt::format("Invalid build id\n{}", m_UploadOptions.help())); + } + + const bool GeneratedBuildPartId = m_BuildPartId.empty(); + if (GeneratedBuildPartId) + { + m_BuildPartId = Oid::NewOid().ToString(); + } + else if (m_BuildPartId.length() != Oid::StringLength) + { + throw zen::OptionParseException(fmt::format("Invalid build id\n{}", m_UploadOptions.help())); + } + else if (Oid::FromHexString(m_BuildPartId) == Oid::Zero) + { + throw zen::OptionParseException(fmt::format("Invalid build part id\n{}", m_UploadOptions.help())); + } + + BuildStorage::Statistics StorageStats; + const Oid BuildId = Oid::FromHexString(m_BuildId); + const Oid BuildPartId = Oid::FromHexString(m_BuildPartId); + std::unique_ptr<BuildStorage> Storage; + std::string StorageName; + if (!m_BuildsUrl.empty()) { - CbObjectWriter MetaDataWriter(1024); - ForEachStrTok(m_BuildMetadata, ';', [&](std::string_view Pair) { - size_t SplitPos = Pair.find('='); - if (SplitPos == std::string::npos || SplitPos == 0) + ZEN_CONSOLE("Uploading '{}' from '{}' to cloud endpoint '{}'. SessionId: '{}'. Namespace '{}', Bucket '{}', {}BuildId '{}'", + m_BuildPartName, + m_Path, + m_BuildsUrl, + Http.GetSessionId(), + m_Namespace, + m_Bucket, + GeneratedBuildId ? "Generated " : "", + BuildId); + CreateDirectories(m_Path / ZenTempStorageFolderName); + Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName); + StorageName = "Cloud DDC"; + } + else if (!m_StoragePath.empty()) + { + ZEN_CONSOLE("Uploading '{}' from '{}' to folder '{}'. {}BuildId '{}'", + m_BuildPartName, + m_Path, + m_StoragePath, + GeneratedBuildId ? "Generated " : "", + BuildId); + Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, m_WriteMetadataAsJson); // , .0015, 0.00004 + StorageName = fmt::format("Disk {}", m_StoragePath.stem()); + } + else + { + throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); + } + + CbObject MetaData; + if (m_CreateBuild) + { + if (!m_BuildMetadataPath.empty()) + { + std::filesystem::path MetadataPath(m_BuildMetadataPath); + IoBuffer MetaDataJson = ReadFile(MetadataPath).Flatten(); + std::string_view Json(reinterpret_cast<const char*>(MetaDataJson.GetData()), MetaDataJson.GetSize()); + std::string JsonError; + MetaData = LoadCompactBinaryFromJson(Json, JsonError).AsObject(); + if (!JsonError.empty()) { - throw std::runtime_error(fmt::format("build metadata key-value pair '{}' is malformed", Pair)); + throw std::runtime_error( + fmt::format("build metadata file '{}' is malformed. Reason: '{}'", m_BuildMetadataPath, JsonError)); } - MetaDataWriter.AddString(Pair.substr(0, SplitPos), Pair.substr(SplitPos + 1)); - return true; - }); - MetaData = MetaDataWriter.Save(); + } + if (!m_BuildMetadata.empty()) + { + CbObjectWriter MetaDataWriter(1024); + ForEachStrTok(m_BuildMetadata, ';', [&](std::string_view Pair) { + size_t SplitPos = Pair.find('='); + if (SplitPos == std::string::npos || SplitPos == 0) + { + throw std::runtime_error(fmt::format("build metadata key-value pair '{}' is malformed", Pair)); + } + MetaDataWriter.AddString(Pair.substr(0, SplitPos), Pair.substr(SplitPos + 1)); + return true; + }); + MetaData = MetaDataWriter.Save(); + } + } + + UploadFolder(*Storage, + BuildId, + BuildPartId, + m_BuildPartName, + m_Path, + m_ManifestPath, + m_BlockReuseMinPercentLimit, + m_AllowMultiparts, + MetaData, + m_CreateBuild, + m_Clean); + + if (false) + { + ZEN_CONSOLE( + "{}:\n" + "Read: {}\n" + "Write: {}\n" + "Requests: {}\n" + "Avg Request Time: {}\n" + "Avg I/O Time: {}", + StorageName, + NiceBytes(StorageStats.TotalBytesRead.load()), + NiceBytes(StorageStats.TotalBytesWritten.load()), + StorageStats.TotalRequestCount.load(), + StorageStats.TotalExecutionTimeUs.load() > 0 + ? NiceTimeSpanMs(StorageStats.TotalExecutionTimeUs.load() / 1000 / StorageStats.TotalRequestCount.load()) + : 0, + StorageStats.TotalRequestCount.load() > 0 + ? NiceTimeSpanMs(StorageStats.TotalRequestTimeUs.load() / 1000 / StorageStats.TotalRequestCount.load()) + : 0); } + return AbortFlag ? 11 : 0; } - bool Aborted = UploadFolder(*Storage, - BuildId, - BuildPartId, - m_BuildPartName, - m_Path, - m_ManifestPath, - m_BlockReuseMinPercentLimit, - m_AllowMultiparts, - MetaData, - m_CreateBuild, - m_Clean); - if (Aborted) - { - ZEN_CONSOLE("Upload failed."); - } - - if (false) - { - ZEN_CONSOLE( - "{}:\n" - "Read: {}\n" - "Write: {}\n" - "Requests: {}\n" - "Avg Request Time: {}\n" - "Avg I/O Time: {}", - StorageName, - NiceBytes(StorageStats.TotalBytesRead.load()), - NiceBytes(StorageStats.TotalBytesWritten.load()), - StorageStats.TotalRequestCount.load(), - StorageStats.TotalExecutionTimeUs.load() > 0 - ? NiceTimeSpanMs(StorageStats.TotalExecutionTimeUs.load() / 1000 / StorageStats.TotalRequestCount.load()) - : 0, - StorageStats.TotalRequestCount.load() > 0 - ? NiceTimeSpanMs(StorageStats.TotalRequestTimeUs.load() / 1000 / StorageStats.TotalRequestCount.load()) - : 0); - } - return Aborted ? 11 : 0; - } + if (SubOption == &m_DownloadOptions) + { + ParseStorageOptions(); + ParseAuthOptions(); - if (SubOption == &m_DownloadOptions) - { - ParseStorageOptions(); - ParseAuthOptions(); + HttpClient Http(m_BuildsUrl, ClientSettings); - HttpClient Http(m_BuildsUrl, ClientSettings); + if (m_Path.empty()) + { + throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_DownloadOptions.help())); + } + if (m_BuildId.empty()) + { + throw zen::OptionParseException(fmt::format("build-id is required\n{}", m_DownloadOptions.help())); + } + Oid BuildId = Oid::TryFromHexString(m_BuildId); + if (BuildId == Oid::Zero) + { + throw zen::OptionParseException(fmt::format("build-id is invalid\n{}", m_DownloadOptions.help())); + } - if (m_Path.empty()) - { - throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_DownloadOptions.help())); - } - if (m_BuildId.empty()) - { - throw zen::OptionParseException(fmt::format("build-id is required\n{}", m_DownloadOptions.help())); - } - Oid BuildId = Oid::TryFromHexString(m_BuildId); - if (BuildId == Oid::Zero) - { - throw zen::OptionParseException(fmt::format("build-id is invalid\n{}", m_DownloadOptions.help())); - } + if (!m_BuildPartName.empty() && !m_BuildPartId.empty()) + { + throw zen::OptionParseException(fmt::format("build-part-id conflicts with build-part-name\n{}", m_DownloadOptions.help())); + } - if (!m_BuildPartName.empty() && !m_BuildPartId.empty()) - { - throw zen::OptionParseException(fmt::format("build-part-id conflicts with build-part-name\n{}", m_DownloadOptions.help())); - } + std::vector<Oid> BuildPartIds; + for (const std::string& BuildPartId : m_BuildPartIds) + { + BuildPartIds.push_back(Oid::TryFromHexString(BuildPartId)); + if (BuildPartIds.back() == Oid::Zero) + { + throw zen::OptionParseException( + fmt::format("build-part-id '{}' is invalid\n{}", BuildPartId, m_DownloadOptions.help())); + } + } - std::vector<Oid> BuildPartIds; - for (const std::string& BuildPartId : m_BuildPartIds) - { - BuildPartIds.push_back(Oid::TryFromHexString(BuildPartId)); - if (BuildPartIds.back() == Oid::Zero) + BuildStorage::Statistics StorageStats; + std::unique_ptr<BuildStorage> Storage; + std::string StorageName; + if (!m_BuildsUrl.empty()) { - throw zen::OptionParseException(fmt::format("build-part-id '{}' is invalid\n{}", BuildPartId, m_DownloadOptions.help())); + ZEN_CONSOLE("Downloading '{}' to '{}' from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'", + BuildId, + m_Path, + m_BuildsUrl, + Http.GetSessionId(), + m_Namespace, + m_Bucket, + BuildId); + CreateDirectories(m_Path / ZenTempStorageFolderName); + Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName); + StorageName = "Cloud DDC"; + } + else if (!m_StoragePath.empty()) + { + ZEN_CONSOLE("Downloading '{}' to '{}' from folder {}. BuildId '{}'", BuildId, m_Path, m_StoragePath, BuildId); + Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false); // , .0015, 0.00004 + StorageName = fmt::format("Disk {}", m_StoragePath.stem()); + } + else + { + throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); } - } - BuildStorage::Statistics StorageStats; - std::unique_ptr<BuildStorage> Storage; - std::string StorageName; - if (!m_BuildsUrl.empty()) - { - ZEN_CONSOLE("Downloading '{}' to '{}' from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'", - BuildId, - m_Path, - m_BuildsUrl, - Http.GetSessionId(), - m_Namespace, - m_Bucket, - BuildId); - CreateDirectories(m_Path / ZenTempStorageFolderName); - Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName); - StorageName = "Cloud DDC"; - } - else if (!m_StoragePath.empty()) - { - ZEN_CONSOLE("Downloading '{}' to '{}' from folder {}. BuildId '{}'", BuildId, m_Path, m_StoragePath, BuildId); - Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false); // , .0015, 0.00004 - StorageName = fmt::format("Disk {}", m_StoragePath.stem()); - } - else - { - throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); - } + DownloadFolder(*Storage, BuildId, BuildPartIds, m_BuildPartNames, m_Path, m_AllowMultiparts, m_Clean); - bool Aborted = DownloadFolder(*Storage, BuildId, BuildPartIds, m_BuildPartNames, m_Path, m_AllowMultiparts, m_Clean); - if (Aborted) - { - ZEN_CONSOLE("Download failed."); - } - if (false) - { - ZEN_CONSOLE( - "{}:\n" - "Read: {}\n" - "Write: {}\n" - "Requests: {}\n" - "Avg Request Time: {}\n" - "Avg I/O Time: {}", - StorageName, - NiceBytes(StorageStats.TotalBytesRead.load()), - NiceBytes(StorageStats.TotalBytesWritten.load()), - StorageStats.TotalRequestCount.load(), - StorageStats.TotalExecutionTimeUs.load() > 0 - ? NiceTimeSpanMs(StorageStats.TotalExecutionTimeUs.load() / 1000 / StorageStats.TotalRequestCount.load()) - : 0, - StorageStats.TotalRequestCount.load() > 0 - ? NiceTimeSpanMs(StorageStats.TotalRequestTimeUs.load() / 1000 / StorageStats.TotalRequestCount.load()) - : 0); - } + if (false) + { + ZEN_CONSOLE( + "{}:\n" + "Read: {}\n" + "Write: {}\n" + "Requests: {}\n" + "Avg Request Time: {}\n" + "Avg I/O Time: {}", + StorageName, + NiceBytes(StorageStats.TotalBytesRead.load()), + NiceBytes(StorageStats.TotalBytesWritten.load()), + StorageStats.TotalRequestCount.load(), + StorageStats.TotalExecutionTimeUs.load() > 0 + ? NiceTimeSpanMs(StorageStats.TotalExecutionTimeUs.load() / 1000 / StorageStats.TotalRequestCount.load()) + : 0, + StorageStats.TotalRequestCount.load() > 0 + ? NiceTimeSpanMs(StorageStats.TotalRequestTimeUs.load() / 1000 / StorageStats.TotalRequestCount.load()) + : 0); + } - return Aborted ? 11 : 0; - } - if (SubOption == &m_DiffOptions) - { - if (m_Path.empty()) - { - throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_DownloadOptions.help())); + return AbortFlag ? 11 : 0; } - if (m_DiffPath.empty()) + if (SubOption == &m_DiffOptions) { - throw zen::OptionParseException(fmt::format("compare-path is required\n{}", m_DownloadOptions.help())); + if (m_Path.empty()) + { + throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_DownloadOptions.help())); + } + if (m_DiffPath.empty()) + { + throw zen::OptionParseException(fmt::format("compare-path is required\n{}", m_DownloadOptions.help())); + } + DiffFolders(m_Path, m_DiffPath, m_OnlyChunked); + return AbortFlag ? 11 : 0; } - bool Aborted = DiffFolders(m_Path, m_DiffPath, m_OnlyChunked); - return Aborted ? 11 : 0; - } - if (SubOption == &m_TestOptions) - { - ParseStorageOptions(); - ParseAuthOptions(); + if (SubOption == &m_TestOptions) + { + ParseStorageOptions(); + ParseAuthOptions(); - HttpClient Http(m_BuildsUrl, ClientSettings); + HttpClient Http(m_BuildsUrl, ClientSettings); - if (m_Path.empty()) - { - throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_DownloadOptions.help())); - } + if (m_Path.empty()) + { + throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_DownloadOptions.help())); + } - m_BuildId = Oid::NewOid().ToString(); - m_BuildPartName = m_Path.filename().string(); - m_BuildPartId = Oid::NewOid().ToString(); - m_CreateBuild = true; + m_BuildId = Oid::NewOid().ToString(); + m_BuildPartName = m_Path.filename().string(); + m_BuildPartId = Oid::NewOid().ToString(); + m_CreateBuild = true; - BuildStorage::Statistics StorageStats; - const Oid BuildId = Oid::FromHexString(m_BuildId); - const Oid BuildPartId = Oid::FromHexString(m_BuildPartId); - std::unique_ptr<BuildStorage> Storage; - std::string StorageName; + BuildStorage::Statistics StorageStats; + const Oid BuildId = Oid::FromHexString(m_BuildId); + const Oid BuildPartId = Oid::FromHexString(m_BuildPartId); + std::unique_ptr<BuildStorage> Storage; + std::string StorageName; - if (m_BuildsUrl.empty() && m_StoragePath.empty()) - { - m_StoragePath = GetRunningExecutablePath().parent_path() / ".tmpstore"; - CreateDirectories(m_StoragePath); - CleanDirectory(m_StoragePath); - } - auto _ = MakeGuard([&]() { if (m_BuildsUrl.empty() && m_StoragePath.empty()) { - DeleteDirectories(m_StoragePath); + m_StoragePath = GetRunningExecutablePath().parent_path() / ".tmpstore"; + CreateDirectories(m_StoragePath); + CleanDirectory(m_StoragePath); } - }); - - if (!m_BuildsUrl.empty()) - { - ZEN_CONSOLE("Using '{}' to '{}' from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'", - m_BuildPartName.empty() ? m_BuildPartId : m_BuildPartName, - m_Path, - m_BuildsUrl, - Http.GetSessionId(), - m_Namespace, - m_Bucket, - BuildId); - CreateDirectories(m_Path / ZenTempStorageFolderName); - Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName); - StorageName = "Cloud DDC"; - } - else if (!m_StoragePath.empty()) - { - ZEN_CONSOLE("Using '{}' to '{}' from folder {}. BuildId '{}'", - m_BuildPartName.empty() ? m_BuildPartId : m_BuildPartName, - m_Path, - m_StoragePath, - BuildId); - Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false); // , .0015, 0.00004 - StorageName = fmt::format("Disk {}", m_StoragePath.stem()); - } - else - { - throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); - } + auto _ = MakeGuard([&]() { + if (m_BuildsUrl.empty() && m_StoragePath.empty()) + { + DeleteDirectories(m_StoragePath); + } + }); - auto MakeMetaData = [](const Oid& BuildId) -> CbObject { - CbObjectWriter BuildMetaDataWriter; + if (!m_BuildsUrl.empty()) + { + ZEN_CONSOLE("Using '{}' to '{}' from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'", + m_BuildPartName.empty() ? m_BuildPartId : m_BuildPartName, + m_Path, + m_BuildsUrl, + Http.GetSessionId(), + m_Namespace, + m_Bucket, + BuildId); + CreateDirectories(m_Path / ZenTempStorageFolderName); + Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName); + StorageName = "Cloud DDC"; + } + else if (!m_StoragePath.empty()) + { + ZEN_CONSOLE("Using '{}' to '{}' from folder {}. BuildId '{}'", + m_BuildPartName.empty() ? m_BuildPartId : m_BuildPartName, + m_Path, + m_StoragePath, + BuildId); + Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false); // , .0015, 0.00004 + StorageName = fmt::format("Disk {}", m_StoragePath.stem()); + } + else { - const uint32_t CL = BuildId.OidBits[2]; - BuildMetaDataWriter.AddString("name", fmt::format("++Test+Main-CL-{}", CL)); - BuildMetaDataWriter.AddString("branch", "ZenTestBuild"); - BuildMetaDataWriter.AddString("baselineBranch", "ZenTestBuild"); - BuildMetaDataWriter.AddString("platform", "Windows"); - BuildMetaDataWriter.AddString("project", "Test"); - BuildMetaDataWriter.AddInteger("changelist", CL); - BuildMetaDataWriter.AddString("buildType", "test-folder"); + throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); } - return BuildMetaDataWriter.Save(); - }; - CbObject MetaData = MakeMetaData(Oid::TryFromHexString(m_BuildId)); - { - ExtendableStringBuilder<256> SB; - CompactBinaryToJson(MetaData, SB); - ZEN_CONSOLE("Upload Build {}, Part {} ({})\n{}", m_BuildId, BuildPartId, m_BuildPartName, SB.ToView()); - } - bool Aborted = UploadFolder(*Storage, - BuildId, - BuildPartId, - m_BuildPartName, - m_Path, - {}, - m_BlockReuseMinPercentLimit, - m_AllowMultiparts, - MetaData, - m_CreateBuild, - m_Clean); - if (Aborted) - { - ZEN_CONSOLE("Upload failed."); - return 11; - } + auto MakeMetaData = [](const Oid& BuildId) -> CbObject { + CbObjectWriter BuildMetaDataWriter; + { + const uint32_t CL = BuildId.OidBits[2]; + BuildMetaDataWriter.AddString("name", fmt::format("++Test+Main-CL-{}", CL)); + BuildMetaDataWriter.AddString("branch", "ZenTestBuild"); + BuildMetaDataWriter.AddString("baselineBranch", "ZenTestBuild"); + BuildMetaDataWriter.AddString("platform", "Windows"); + BuildMetaDataWriter.AddString("project", "Test"); + BuildMetaDataWriter.AddInteger("changelist", CL); + BuildMetaDataWriter.AddString("buildType", "test-folder"); + } + return BuildMetaDataWriter.Save(); + }; + CbObject MetaData = MakeMetaData(Oid::TryFromHexString(m_BuildId)); + { + ExtendableStringBuilder<256> SB; + CompactBinaryToJson(MetaData, SB); + ZEN_CONSOLE("Upload Build {}, Part {} ({})\n{}", m_BuildId, BuildPartId, m_BuildPartName, SB.ToView()); + } + + UploadFolder(*Storage, + BuildId, + BuildPartId, + m_BuildPartName, + m_Path, + {}, + m_BlockReuseMinPercentLimit, + m_AllowMultiparts, + MetaData, + m_CreateBuild, + m_Clean); + if (AbortFlag) + { + ZEN_CONSOLE("Upload failed."); + return 11; + } - const std::filesystem::path DownloadPath = m_Path.parent_path() / (m_BuildPartName + "_download"); - ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}'", BuildId, BuildPartId, m_BuildPartName, DownloadPath); - Aborted = DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, true); - if (Aborted) - { - ZEN_CONSOLE("Download failed."); - return 11; - } + const std::filesystem::path DownloadPath = m_Path.parent_path() / (m_BuildPartName + "_download"); + ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}'", BuildId, BuildPartId, m_BuildPartName, DownloadPath); + DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, true); + if (AbortFlag) + { + ZEN_CONSOLE("Download failed."); + return 11; + } - ZEN_CONSOLE("\nRe-download Build {}, Part {} ({}) to '{}' (identical target)", BuildId, BuildPartId, m_BuildPartName, DownloadPath); - Aborted = DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false); - if (Aborted) - { - ZEN_CONSOLE("Re-download failed. (identical target)"); - return 11; - } + ZEN_CONSOLE("\nRe-download Build {}, Part {} ({}) to '{}' (identical target)", + BuildId, + BuildPartId, + m_BuildPartName, + DownloadPath); + DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false); + if (AbortFlag) + { + ZEN_CONSOLE("Re-download failed. (identical target)"); + return 11; + } - auto ScrambleDir = [](const std::filesystem::path& Path) { - ZEN_CONSOLE("\nScrambling '{}'", Path); - Stopwatch Timer; - DirectoryContent DownloadContent; - GetDirectoryContent( - Path, - DirectoryContentFlags::Recursive | DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeFileSizes, - DownloadContent); - auto IsAcceptedFolder = [ExcludeFolders = DefaultExcludeFolders, Path](const std::filesystem::path& AbsolutePath) -> bool { - std::string RelativePath = std::filesystem::relative(AbsolutePath, Path).generic_string(); - for (const std::string_view& ExcludeFolder : ExcludeFolders) - { - if (RelativePath.starts_with(ExcludeFolder)) + auto ScrambleDir = [](const std::filesystem::path& Path) { + ZEN_CONSOLE("\nScrambling '{}'", Path); + Stopwatch Timer; + DirectoryContent DownloadContent; + GetDirectoryContent( + Path, + DirectoryContentFlags::Recursive | DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeFileSizes, + DownloadContent); + auto IsAcceptedFolder = [ExcludeFolders = DefaultExcludeFolders, Path](const std::filesystem::path& AbsolutePath) -> bool { + std::string RelativePath = std::filesystem::relative(AbsolutePath, Path).generic_string(); + for (const std::string_view& ExcludeFolder : ExcludeFolders) { - if (RelativePath.length() == ExcludeFolder.length()) + if (RelativePath.starts_with(ExcludeFolder)) { - return false; - } - else if (RelativePath[ExcludeFolder.length()] == '/') - { - return false; + if (RelativePath.length() == ExcludeFolder.length()) + { + return false; + } + else if (RelativePath[ExcludeFolder.length()] == '/') + { + return false; + } } } - } - return true; - }; + return true; + }; - std::atomic<bool> AbortFlag = false; - ParallellWork Work(AbortFlag); + ParallellWork Work(AbortFlag); - uint32_t Randomizer = 0; - auto FileSizeIt = DownloadContent.FileSizes.begin(); - for (const std::filesystem::path& FilePath : DownloadContent.Files) - { - if (IsAcceptedFolder(FilePath)) + uint32_t Randomizer = 0; + auto FileSizeIt = DownloadContent.FileSizes.begin(); + for (const std::filesystem::path& FilePath : DownloadContent.Files) { - uint32_t Case = (Randomizer++) % 7; - switch (Case) + if (IsAcceptedFolder(FilePath)) { - case 0: - { - uint64_t SourceSize = *FileSizeIt; - if (SourceSize > 0) + uint32_t Case = (Randomizer++) % 7; + switch (Case) + { + case 0: { - Work.ScheduleWork( - GetMediumWorkerPool(EWorkloadType::Burst), - [SourceSize, FilePath](std::atomic<bool>& AbortFlag) { - if (!AbortFlag) - { - IoBuffer Scrambled(SourceSize); - { - IoBuffer Source = IoBufferBuilder::MakeFromFile(FilePath); - Scrambled.GetMutableView().CopyFrom( - Source.GetView().Mid(SourceSize / 3, SourceSize / 3)); - Scrambled.GetMutableView() - .Mid(SourceSize / 3) - .CopyFrom(Source.GetView().Mid(0, SourceSize / 3)); - Scrambled.GetMutableView() - .Mid((SourceSize / 3) * 2) - .CopyFrom(Source.GetView().Mid(SourceSize / 2, SourceSize / 3)); - } - bool IsReadOnly = SetFileReadOnly(FilePath, false); - WriteFile(FilePath, Scrambled); - if (IsReadOnly) + uint64_t SourceSize = *FileSizeIt; + if (SourceSize > 0) + { + Work.ScheduleWork( + GetMediumWorkerPool(EWorkloadType::Burst), + [SourceSize, FilePath](std::atomic<bool>&) { + if (!AbortFlag) { - SetFileReadOnly(FilePath, true); + IoBuffer Scrambled(SourceSize); + { + IoBuffer Source = IoBufferBuilder::MakeFromFile(FilePath); + Scrambled.GetMutableView().CopyFrom( + Source.GetView().Mid(SourceSize / 3, SourceSize / 3)); + Scrambled.GetMutableView() + .Mid(SourceSize / 3) + .CopyFrom(Source.GetView().Mid(0, SourceSize / 3)); + Scrambled.GetMutableView() + .Mid((SourceSize / 3) * 2) + .CopyFrom(Source.GetView().Mid(SourceSize / 2, SourceSize / 3)); + } + bool IsReadOnly = SetFileReadOnly(FilePath, false); + WriteFile(FilePath, Scrambled); + if (IsReadOnly) + { + SetFileReadOnly(FilePath, true); + } } - } - }, - [FilePath](const std::exception& Ex, std::atomic<bool>& AbortFlag) { - ZEN_CONSOLE("Failed scrambling file {}. Reason: {}", FilePath, Ex.what()); - AbortFlag = true; - }); + }, + Work.DefaultErrorFunction()); + } } - } - break; - case 1: - std::filesystem::remove(FilePath); - break; - default: - break; + break; + case 1: + std::filesystem::remove(FilePath); + break; + default: + break; + } } + FileSizeIt++; } - FileSizeIt++; - } - Work.Wait(5000, [&](bool IsAborted, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(IsAborted); - ZEN_CONSOLE("Scrambling files, {} remaining", PendingWork); - }); - ZEN_ASSERT(!AbortFlag.load()); - ZEN_CONSOLE("Scrambled files in {}", NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - }; - - ScrambleDir(DownloadPath); - ZEN_CONSOLE("\nRe-download Build {}, Part {} ({}) to '{}' (scrambled target)", BuildId, BuildPartId, m_BuildPartName, DownloadPath); - Aborted = DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false); - if (Aborted) - { - ZEN_CONSOLE("Re-download failed. (scrambled target)"); - return 11; - } - - ScrambleDir(DownloadPath); - - Oid BuildId2 = Oid::NewOid(); - Oid BuildPartId2 = Oid::NewOid(); + Work.Wait(5000, [&](bool IsAborted, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(IsAborted); + ZEN_CONSOLE("Scrambling files, {} remaining", PendingWork); + }); + ZEN_ASSERT(!AbortFlag.load()); + ZEN_CONSOLE("Scrambled files in {}", NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + }; - CbObject MetaData2 = MakeMetaData(BuildId2); - { - ExtendableStringBuilder<256> SB; - CompactBinaryToJson(MetaData, SB); - ZEN_CONSOLE("\nUpload scrambled Build {}, Part {} ({})\n{}\n", BuildId2, BuildPartId2, m_BuildPartName, SB.ToView()); - } + ScrambleDir(DownloadPath); + ZEN_CONSOLE("\nRe-download Build {}, Part {} ({}) to '{}' (scrambled target)", + BuildId, + BuildPartId, + m_BuildPartName, + DownloadPath); + DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false); + if (AbortFlag) + { + ZEN_CONSOLE("Re-download failed. (scrambled target)"); + return 11; + } - Aborted = UploadFolder(*Storage, - BuildId2, - BuildPartId2, - m_BuildPartName, - DownloadPath, - {}, - m_BlockReuseMinPercentLimit, - m_AllowMultiparts, - MetaData2, - true, - false); - if (Aborted) - { - ZEN_CONSOLE("Upload of scrambled failed."); - return 11; - } + ScrambleDir(DownloadPath); - ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (original)", BuildId, BuildPartId, m_BuildPartName, DownloadPath); - Aborted = DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false); - if (Aborted) - { - ZEN_CONSOLE("Re-download failed."); - return 11; - } + Oid BuildId2 = Oid::NewOid(); + Oid BuildPartId2 = Oid::NewOid(); - ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath); - Aborted = DownloadFolder(*Storage, BuildId2, {BuildPartId2}, {}, DownloadPath, m_AllowMultiparts, false); - if (Aborted) - { - ZEN_CONSOLE("Re-download failed."); - return 11; - } + CbObject MetaData2 = MakeMetaData(BuildId2); + { + ExtendableStringBuilder<256> SB; + CompactBinaryToJson(MetaData, SB); + ZEN_CONSOLE("\nUpload scrambled Build {}, Part {} ({})\n{}\n", BuildId2, BuildPartId2, m_BuildPartName, SB.ToView()); + } - ZEN_CONSOLE("\nRe-download Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath); - Aborted = DownloadFolder(*Storage, BuildId2, {BuildPartId2}, {}, DownloadPath, m_AllowMultiparts, false); - if (Aborted) - { - ZEN_CONSOLE("Re-download failed."); - return 11; - } + UploadFolder(*Storage, + BuildId2, + BuildPartId2, + m_BuildPartName, + DownloadPath, + {}, + m_BlockReuseMinPercentLimit, + m_AllowMultiparts, + MetaData2, + true, + false); + if (AbortFlag) + { + ZEN_CONSOLE("Upload of scrambled failed."); + return 11; + } - return 0; - } + ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (original)", BuildId, BuildPartId, m_BuildPartName, DownloadPath); + DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false); + if (AbortFlag) + { + ZEN_CONSOLE("Re-download failed."); + return 11; + } - if (SubOption == &m_FetchBlobOptions) - { - ParseStorageOptions(); - ParseAuthOptions(); + ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath); + DownloadFolder(*Storage, BuildId2, {BuildPartId2}, {}, DownloadPath, m_AllowMultiparts, false); + if (AbortFlag) + { + ZEN_CONSOLE("Re-download failed."); + return 11; + } - HttpClient Http(m_BuildsUrl, ClientSettings); + ZEN_CONSOLE("\nRe-download Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath); + DownloadFolder(*Storage, BuildId2, {BuildPartId2}, {}, DownloadPath, m_AllowMultiparts, false); + if (AbortFlag) + { + ZEN_CONSOLE("Re-download failed."); + return 11; + } - if (m_BlobHash.empty()) - { - throw zen::OptionParseException(fmt::format("Blob hash string is missing\n{}", m_UploadOptions.help())); + return 0; } - IoHash BlobHash; - if (!IoHash::TryParse(m_BlobHash, BlobHash)) + if (SubOption == &m_FetchBlobOptions) { - throw zen::OptionParseException(fmt::format("Blob hash string is invalid\n{}", m_UploadOptions.help())); - } + ParseStorageOptions(); + ParseAuthOptions(); - if (m_BuildsUrl.empty() && m_StoragePath.empty()) - { - throw zen::OptionParseException(fmt::format("At least one storage option is required\n{}", m_UploadOptions.help())); - } + HttpClient Http(m_BuildsUrl, ClientSettings); - BuildStorage::Statistics StorageStats; - const Oid BuildId = Oid::FromHexString(m_BuildId); - std::unique_ptr<BuildStorage> Storage; - std::string StorageName; + if (m_BlobHash.empty()) + { + throw zen::OptionParseException(fmt::format("Blob hash string is missing\n{}", m_UploadOptions.help())); + } - if (!m_BuildsUrl.empty()) - { - ZEN_CONSOLE("Using from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'", - m_BuildsUrl, - Http.GetSessionId(), - m_Namespace, - m_Bucket, - BuildId); - CreateDirectories(m_Path / ZenTempStorageFolderName); - Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName); - StorageName = "Cloud DDC"; - } - else if (!m_StoragePath.empty()) - { - ZEN_CONSOLE("Using folder {}. BuildId '{}'", m_StoragePath, BuildId); - Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false); // , .0015, 0.00004 - StorageName = fmt::format("Disk {}", m_StoragePath.stem()); - } - else - { - throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); - } + IoHash BlobHash; + if (!IoHash::TryParse(m_BlobHash, BlobHash)) + { + throw zen::OptionParseException(fmt::format("Blob hash string is invalid\n{}", m_UploadOptions.help())); + } - uint64_t CompressedSize; - uint64_t DecompressedSize; - ValidateBlob(*Storage, BuildId, BlobHash, CompressedSize, DecompressedSize); - ZEN_CONSOLE("Blob '{}' has a compressed size {} and a decompressed size of {} bytes", BlobHash, CompressedSize, DecompressedSize); - return 0; - } + if (m_BuildsUrl.empty() && m_StoragePath.empty()) + { + throw zen::OptionParseException(fmt::format("At least one storage option is required\n{}", m_UploadOptions.help())); + } - if (SubOption == &m_ValidateBuildPartOptions) - { - ParseStorageOptions(); - ParseAuthOptions(); + BuildStorage::Statistics StorageStats; + const Oid BuildId = Oid::FromHexString(m_BuildId); + std::unique_ptr<BuildStorage> Storage; + std::string StorageName; - HttpClient Http(m_BuildsUrl, ClientSettings); + if (!m_BuildsUrl.empty()) + { + ZEN_CONSOLE("Using from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'", + m_BuildsUrl, + Http.GetSessionId(), + m_Namespace, + m_Bucket, + BuildId); + CreateDirectories(m_Path / ZenTempStorageFolderName); + Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName); + StorageName = "Cloud DDC"; + } + else if (!m_StoragePath.empty()) + { + ZEN_CONSOLE("Using folder {}. BuildId '{}'", m_StoragePath, BuildId); + Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false); // , .0015, 0.00004 + StorageName = fmt::format("Disk {}", m_StoragePath.stem()); + } + else + { + throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); + } - if (m_BuildsUrl.empty() && m_StoragePath.empty()) - { - throw zen::OptionParseException(fmt::format("At least one storage option is required\n{}", m_UploadOptions.help())); + uint64_t CompressedSize; + uint64_t DecompressedSize; + ValidateBlob(*Storage, BuildId, BlobHash, CompressedSize, DecompressedSize); + if (AbortFlag) + { + return 11; + } + ZEN_CONSOLE("Blob '{}' has a compressed size {} and a decompressed size of {} bytes", + BlobHash, + CompressedSize, + DecompressedSize); + return 0; } - if (m_BuildId.empty()) - { - throw zen::OptionParseException(fmt::format("build-id is required\n{}", m_DownloadOptions.help())); - } - Oid BuildId = Oid::TryFromHexString(m_BuildId); - if (BuildId == Oid::Zero) + if (SubOption == &m_ValidateBuildPartOptions) { - throw zen::OptionParseException(fmt::format("build-id is invalid\n{}", m_DownloadOptions.help())); - } + ParseStorageOptions(); + ParseAuthOptions(); - if (!m_BuildPartName.empty() && !m_BuildPartId.empty()) - { - throw zen::OptionParseException(fmt::format("build-part-id conflicts with build-part-name\n{}", m_DownloadOptions.help())); - } + HttpClient Http(m_BuildsUrl, ClientSettings); - BuildStorage::Statistics StorageStats; - std::unique_ptr<BuildStorage> Storage; - std::string StorageName; + if (m_BuildsUrl.empty() && m_StoragePath.empty()) + { + throw zen::OptionParseException(fmt::format("At least one storage option is required\n{}", m_UploadOptions.help())); + } - if (!m_BuildsUrl.empty()) - { - ZEN_CONSOLE("Using from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'", - m_BuildsUrl, - Http.GetSessionId(), - m_Namespace, - m_Bucket, - BuildId); - CreateDirectories(m_Path / ZenTempStorageFolderName); - Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName); - StorageName = "Cloud DDC"; - } - else if (!m_StoragePath.empty()) - { - ZEN_CONSOLE("Using folder {}. BuildId '{}'", m_StoragePath, BuildId); - Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false); // , .0015, 0.00004 - StorageName = fmt::format("Disk {}", m_StoragePath.stem()); - } - else - { - throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); - } - Oid BuildPartId = Oid::TryFromHexString(m_BuildPartId); - CbObject Build = Storage->GetBuild(BuildId); - if (!m_BuildPartName.empty()) - { - BuildPartId = Build["parts"sv].AsObjectView()[m_BuildPartName].AsObjectId(); - if (BuildPartId == Oid::Zero) + if (m_BuildId.empty()) { - throw std::runtime_error(fmt::format("Build {} does not have a part named '{}'", m_BuildId, m_BuildPartName)); + throw zen::OptionParseException(fmt::format("build-id is required\n{}", m_DownloadOptions.help())); + } + Oid BuildId = Oid::TryFromHexString(m_BuildId); + if (BuildId == Oid::Zero) + { + throw zen::OptionParseException(fmt::format("build-id is invalid\n{}", m_DownloadOptions.help())); } - } - CbObject BuildPart = Storage->GetBuildPart(BuildId, BuildPartId); - ZEN_CONSOLE("Validating build part {}/{} ({})", BuildId, BuildPartId, NiceBytes(BuildPart.GetSize())); - std::vector<IoHash> ChunkAttachments; - for (CbFieldView LooseFileView : BuildPart["chunkAttachments"sv].AsObjectView()["rawHashes"sv]) - { - ChunkAttachments.push_back(LooseFileView.AsBinaryAttachment()); - } - std::vector<IoHash> BlockAttachments; - for (CbFieldView BlocksView : BuildPart["blockAttachments"sv].AsObjectView()["rawHashes"sv]) - { - BlockAttachments.push_back(BlocksView.AsBinaryAttachment()); - } - for (const IoHash& ChunkAttachment : ChunkAttachments) - { - uint64_t CompressedSize; - uint64_t DecompressedSize; - try + if (!m_BuildPartName.empty() && !m_BuildPartId.empty()) { - ValidateBlob(*Storage, BuildId, ChunkAttachment, CompressedSize, DecompressedSize); - ZEN_CONSOLE("Chunk attachment {} ({} -> {}) is valid", - ChunkAttachment, - NiceBytes(CompressedSize), - NiceBytes(DecompressedSize)); + throw zen::OptionParseException(fmt::format("build-part-id conflicts with build-part-name\n{}", m_DownloadOptions.help())); } - catch (const std::exception& Ex) + + BuildStorage::Statistics StorageStats; + std::unique_ptr<BuildStorage> Storage; + std::string StorageName; + + if (!m_BuildsUrl.empty()) { - ZEN_CONSOLE("Failed validating chunk attachment {}: {}", ChunkAttachment, Ex.what()); + ZEN_CONSOLE("Using from cloud endpoint {}. SessionId: '{}'. Namespace '{}', Bucket '{}', BuildId '{}'", + m_BuildsUrl, + Http.GetSessionId(), + m_Namespace, + m_Bucket, + BuildId); + CreateDirectories(m_Path / ZenTempStorageFolderName); + Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName); + StorageName = "Cloud DDC"; + } + else if (!m_StoragePath.empty()) + { + ZEN_CONSOLE("Using folder {}. BuildId '{}'", m_StoragePath, BuildId); + Storage = CreateFileBuildStorage(m_StoragePath, StorageStats, false); // , .0015, 0.00004 + StorageName = fmt::format("Disk {}", m_StoragePath.stem()); + } + else + { + throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); + } + Oid BuildPartId = Oid::TryFromHexString(m_BuildPartId); + CbObject Build = Storage->GetBuild(BuildId); + if (!m_BuildPartName.empty()) + { + BuildPartId = Build["parts"sv].AsObjectView()[m_BuildPartName].AsObjectId(); + if (BuildPartId == Oid::Zero) + { + throw std::runtime_error(fmt::format("Build {} does not have a part named '{}'", m_BuildId, m_BuildPartName)); + } + } + CbObject BuildPart = Storage->GetBuildPart(BuildId, BuildPartId); + ZEN_CONSOLE("Validating build part {}/{} ({})", BuildId, BuildPartId, NiceBytes(BuildPart.GetSize())); + std::vector<IoHash> ChunkAttachments; + for (CbFieldView LooseFileView : BuildPart["chunkAttachments"sv].AsObjectView()["rawHashes"sv]) + { + ChunkAttachments.push_back(LooseFileView.AsBinaryAttachment()); + } + std::vector<IoHash> BlockAttachments; + for (CbFieldView BlocksView : BuildPart["blockAttachments"sv].AsObjectView()["rawHashes"sv]) + { + BlockAttachments.push_back(BlocksView.AsBinaryAttachment()); } - } - for (const IoHash& BlockAttachment : BlockAttachments) - { - uint64_t CompressedSize; - uint64_t DecompressedSize; - try + for (const IoHash& ChunkAttachment : ChunkAttachments) { - ValidateChunkBlock(*Storage, BuildId, BlockAttachment, CompressedSize, DecompressedSize); - ZEN_CONSOLE("Block attachment {} ({} -> {}) is valid", - BlockAttachment, - NiceBytes(CompressedSize), - NiceBytes(DecompressedSize)); + uint64_t CompressedSize; + uint64_t DecompressedSize; + try + { + ValidateBlob(*Storage, BuildId, ChunkAttachment, CompressedSize, DecompressedSize); + ZEN_CONSOLE("Chunk attachment {} ({} -> {}) is valid", + ChunkAttachment, + NiceBytes(CompressedSize), + NiceBytes(DecompressedSize)); + } + catch (const std::exception& Ex) + { + ZEN_CONSOLE("Failed validating chunk attachment {}: {}", ChunkAttachment, Ex.what()); + } } - catch (const std::exception& Ex) + + for (const IoHash& BlockAttachment : BlockAttachments) { - ZEN_CONSOLE("Failed validating block attachment {}: {}", BlockAttachment, Ex.what()); + uint64_t CompressedSize; + uint64_t DecompressedSize; + try + { + ValidateChunkBlock(*Storage, BuildId, BlockAttachment, CompressedSize, DecompressedSize); + ZEN_CONSOLE("Block attachment {} ({} -> {}) is valid", + BlockAttachment, + NiceBytes(CompressedSize), + NiceBytes(DecompressedSize)); + } + catch (const std::exception& Ex) + { + ZEN_CONSOLE("Failed validating block attachment {}: {}", BlockAttachment, Ex.what()); + } } - } - return 0; + return AbortFlag ? 13 : 0; + } + } + catch (const std::exception& Ex) + { + ZEN_ERROR("{}", Ex.what()); + return 3; } - ZEN_ASSERT(false); } diff --git a/src/zen/zen.cpp b/src/zen/zen.cpp index 2e230ed53..bee1fd676 100644 --- a/src/zen/zen.cpp +++ b/src/zen/zen.cpp @@ -264,7 +264,7 @@ ProgressBar::~ProgressBar() { try { - Finish(); + ForceLinebreak(); } catch (const std::exception& Ex) { diff --git a/src/zenutil/chunkedcontent.cpp b/src/zenutil/chunkedcontent.cpp index a41b71972..6dc2a20d8 100644 --- a/src/zenutil/chunkedcontent.cpp +++ b/src/zenutil/chunkedcontent.cpp @@ -92,7 +92,8 @@ namespace { tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& RawHashToSequenceRawHashIndex, RwLock& Lock, const std::filesystem::path& FolderPath, - uint32_t PathIndex) + uint32_t PathIndex, + std::atomic<bool>& AbortFlag) { const uint64_t RawSize = OutChunkedContent.RawSizes[PathIndex]; const std::filesystem::path& Path = OutChunkedContent.Paths[PathIndex]; @@ -105,7 +106,7 @@ namespace { { ChunkedInfoWithSource Chunked; const bool DidChunking = - InChunkingController.ProcessFile((FolderPath / Path).make_preferred(), RawSize, Chunked, Stats.BytesHashed); + InChunkingController.ProcessFile((FolderPath / Path).make_preferred(), RawSize, Chunked, Stats.BytesHashed, AbortFlag); if (DidChunking) { Lock.WithExclusiveLock([&]() { @@ -753,15 +754,13 @@ ChunkFolderContent(ChunkingStatistics& Stats, RawHashToSequenceRawHashIndex, Lock, RootPath, - PathIndex); + PathIndex, + AbortFlag); Lock.WithExclusiveLock([&]() { Result.RawHashes[PathIndex] = RawHash; }); Stats.FilesProcessed++; } }, - [&, PathIndex](const std::exception& Ex, std::atomic<bool>& AbortFlag) { - ZEN_CONSOLE("Failed scanning file {}. Reason: {}", Result.Paths[PathIndex], Ex.what()); - AbortFlag = true; - }); + Work.DefaultErrorFunction()); } Work.Wait(UpdateInteralMS, [&](bool IsAborted, std::ptrdiff_t PendingWork) { diff --git a/src/zenutil/chunkedfile.cpp b/src/zenutil/chunkedfile.cpp index 3f3a6661c..4f9344039 100644 --- a/src/zenutil/chunkedfile.cpp +++ b/src/zenutil/chunkedfile.cpp @@ -112,7 +112,12 @@ Reconstruct(const ChunkedInfo& Info, const std::filesystem::path& TargetPath, st } ChunkedInfoWithSource -ChunkData(BasicFile& RawData, uint64_t Offset, uint64_t Size, ChunkedParams Params, std::atomic<uint64_t>* BytesProcessed) +ChunkData(BasicFile& RawData, + uint64_t Offset, + uint64_t Size, + ChunkedParams Params, + std::atomic<uint64_t>* BytesProcessed, + std::atomic<bool>* AbortFlag) { ChunkedInfoWithSource Result; tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> FoundChunks; @@ -129,6 +134,10 @@ ChunkData(BasicFile& RawData, uint64_t Offset, uint64_t Size, ChunkedParams Para IoHashStream RawHashStream; while (Offset < End) { + if (AbortFlag != nullptr && AbortFlag->load()) + { + return {}; + } size_t ScanLength = Chunker.ScanChunk(SliceView.GetData(), SliceSize); if (ScanLength == ZenChunkHelper::kNoBoundaryFound) { diff --git a/src/zenutil/chunkingcontroller.cpp b/src/zenutil/chunkingcontroller.cpp index bc0e57b14..017d12433 100644 --- a/src/zenutil/chunkingcontroller.cpp +++ b/src/zenutil/chunkingcontroller.cpp @@ -58,7 +58,8 @@ public: virtual bool ProcessFile(const std::filesystem::path& InputPath, uint64_t RawSize, ChunkedInfoWithSource& OutChunked, - std::atomic<uint64_t>& BytesProcessed) const override + std::atomic<uint64_t>& BytesProcessed, + std::atomic<bool>& AbortFlag) const override { const bool ExcludeFromChunking = std::find(m_ChunkExcludeExtensions.begin(), m_ChunkExcludeExtensions.end(), InputPath.extension()) != @@ -70,7 +71,7 @@ public: } BasicFile Buffer(InputPath, BasicFile::Mode::kRead); - OutChunked = ChunkData(Buffer, 0, RawSize, m_ChunkingParams, &BytesProcessed); + OutChunked = ChunkData(Buffer, 0, RawSize, m_ChunkingParams, &BytesProcessed, &AbortFlag); return true; } @@ -132,7 +133,8 @@ public: virtual bool ProcessFile(const std::filesystem::path& InputPath, uint64_t RawSize, ChunkedInfoWithSource& OutChunked, - std::atomic<uint64_t>& BytesProcessed) const override + std::atomic<uint64_t>& BytesProcessed, + std::atomic<bool>& AbortFlag) const override { if (RawSize < m_ChunkFileSizeLimit) { @@ -150,6 +152,10 @@ public: ChunkHashToChunkIndex.reserve(1 + (RawSize / m_FixedChunkingChunkSize)); while (Offset < RawSize) { + if (AbortFlag) + { + return false; + } uint64_t ChunkSize = std::min<uint64_t>(RawSize - Offset, m_FixedChunkingChunkSize); IoBuffer Chunk(Source, Offset, ChunkSize); MemoryView ChunkData = Chunk.GetView(); diff --git a/src/zenutil/include/zenutil/chunkedfile.h b/src/zenutil/include/zenutil/chunkedfile.h index 7110ad317..4cec80fdb 100644 --- a/src/zenutil/include/zenutil/chunkedfile.h +++ b/src/zenutil/include/zenutil/chunkedfile.h @@ -47,7 +47,8 @@ ChunkedInfoWithSource ChunkData(BasicFile& RawData, uint64_t Offset, uint64_t Size, ChunkedParams Params = {}, - std::atomic<uint64_t>* BytesProcessed = nullptr); + std::atomic<uint64_t>* BytesProcessed = nullptr, + std::atomic<bool>* AbortFlag = nullptr); void Reconstruct(const ChunkedInfo& Info, const std::filesystem::path& TargetPath, std::function<IoBuffer(const IoHash& ChunkHash)> GetChunk); diff --git a/src/zenutil/include/zenutil/chunkingcontroller.h b/src/zenutil/include/zenutil/chunkingcontroller.h index fe4fc1bb5..ebc80e207 100644 --- a/src/zenutil/include/zenutil/chunkingcontroller.h +++ b/src/zenutil/include/zenutil/chunkingcontroller.h @@ -32,9 +32,10 @@ public: virtual bool ProcessFile(const std::filesystem::path& InputPath, uint64_t RawSize, ChunkedInfoWithSource& OutChunked, - std::atomic<uint64_t>& BytesProcessed) const = 0; - virtual std::string_view GetName() const = 0; - virtual CbObject GetParameters() const = 0; + std::atomic<uint64_t>& BytesProcessed, + std::atomic<bool>& AbortFlag) const = 0; + virtual std::string_view GetName() const = 0; + virtual CbObject GetParameters() const = 0; }; std::unique_ptr<ChunkingController> CreateBasicChunkingController( diff --git a/src/zenutil/include/zenutil/parallellwork.h b/src/zenutil/include/zenutil/parallellwork.h index 7a8218c51..79798fc8d 100644 --- a/src/zenutil/include/zenutil/parallellwork.h +++ b/src/zenutil/include/zenutil/parallellwork.h @@ -2,6 +2,8 @@ #pragma once +#include <zencore/except.h> +#include <zencore/fmtutils.h> #include <zencore/thread.h> #include <zencore/workthreadpool.h> @@ -20,6 +22,14 @@ public: ZEN_ASSERT(m_PendingWork.Remaining() == 0); } + std::function<void(const std::exception& Ex, std::atomic<bool>& AbortFlag)> DefaultErrorFunction() + { + return [&](const std::exception& Ex, std::atomic<bool>& AbortFlag) { + m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex.what()); }); + AbortFlag = true; + }; + } + void ScheduleWork(WorkerThreadPool& WorkerPool, std::function<void(std::atomic<bool>& AbortFlag)>&& Work, std::function<void(const std::exception& Ex, std::atomic<bool>& AbortFlag)>&& OnError) @@ -32,6 +42,27 @@ public: { Work(m_AbortFlag); } + catch (const AssertException& AssertEx) + { + OnError( + std::runtime_error(fmt::format("Caught assert exception while handling request: {}", AssertEx.FullDescription())), + m_AbortFlag); + } + catch (const std::system_error& SystemError) + { + if (IsOOM(SystemError.code())) + { + OnError(std::runtime_error(fmt::format("Out of memory. Reason: {}", SystemError.what())), m_AbortFlag); + } + else if (IsOOD(SystemError.code())) + { + OnError(std::runtime_error(fmt::format("Out of disk. Reason: {}", SystemError.what())), m_AbortFlag); + } + else + { + OnError(std::runtime_error(fmt::format("System error. Reason: {}", SystemError.what())), m_AbortFlag); + } + } catch (const std::exception& Ex) { OnError(Ex, m_AbortFlag); @@ -58,12 +89,29 @@ public: { UpdateCallback(m_AbortFlag.load(), m_PendingWork.Remaining()); } + if (m_Errors.size() == 1) + { + throw std::runtime_error(m_Errors.front()); + } + else if (m_Errors.size() > 1) + { + ExtendableStringBuilder<128> SB; + SB.Append("Multiple errors:"); + for (const std::string& Error : m_Errors) + { + SB.Append(fmt::format("\n {}", Error)); + } + throw std::runtime_error(SB.ToString()); + } } Latch& PendingWork() { return m_PendingWork; } private: std::atomic<bool>& m_AbortFlag; Latch m_PendingWork; + + RwLock m_ErrorLock; + std::vector<std::string> m_Errors; }; } // namespace zen |