diff options
| author | Dan Engelbrecht <[email protected]> | 2025-03-03 17:53:11 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-03-03 17:53:11 +0100 |
| commit | 1270bfeffbc81b1e4940c5c454ee6acde43e696a (patch) | |
| tree | 9ff53df6b43f2806fb5701b4d10ad37696a1c203 /src | |
| parent | builds download incremental (#290) (diff) | |
| download | zen-1270bfeffbc81b1e4940c5c454ee6acde43e696a.tar.xz zen-1270bfeffbc81b1e4940c5c454ee6acde43e696a.zip | |
refactor use chunk sequence download (#291)
* work on chunk sequences on download, not paths
* write chunksequences to .tmp file and move when complete
* cleanup
* Added on the fly validation `zen builds download` of files built from smaller chunks as each file is completed
Added `--verify` option to `zen builds upload` to verify all uploaded data once entire upload is complete
Added `--verify` option to `zen builds download` to verify all files in target folder once entire download is complete
Fixed/improved progress updated
Multithreaded part validation
* added rates to Write Chunks task
* b/s -> bits/s
* dont validate partial content as complete payload
* handle legacy c# builds
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 1324 | ||||
| -rw-r--r-- | src/zen/cmds/builds_cmd.h | 2 | ||||
| -rw-r--r-- | src/zenhttp/httpclient.cpp | 5 | ||||
| -rw-r--r-- | src/zenutil/chunkedcontent.cpp | 78 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/chunkedcontent.h | 53 |
5 files changed, 910 insertions, 552 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index 28c794559..219d01240 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -286,7 +286,7 @@ namespace { } } - uint64_t GetCurrent() const + uint64_t GetCurrent() const // If Stopped - return total count / total time { if (LastTimeUS == (uint64_t)-1) { @@ -330,6 +330,16 @@ namespace { return Count * 1000000 / ElapsedWallTimeUS; } + std::filesystem::path GetTempChunkedSequenceFileName(const std::filesystem::path& CacheFolderPath, const IoHash& RawHash) + { + return (CacheFolderPath / (RawHash.ToHexString() + ".tmp")).make_preferred(); + } + + std::filesystem::path GetFinalChunkedSequenceFileName(const std::filesystem::path& CacheFolderPath, const IoHash& RawHash) + { + return (CacheFolderPath / RawHash.ToHexString()).make_preferred(); + } + ChunkedFolderContent ScanAndChunkFolder( GetFolderContentStatistics& GetFolderContentStats, ChunkingStatistics& ChunkingStats, @@ -364,15 +374,16 @@ namespace { UsePlainProgress ? 5000 : 200, [&](bool, std::ptrdiff_t) { FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load()); + std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found", + ChunkingStats.FilesProcessed.load(), + GetFolderContentStats.AcceptedFileCount.load(), + NiceBytes(ChunkingStats.BytesHashed.load()), + NiceBytes(GetFolderContentStats.FoundFileByteCount), + NiceNum(FilteredBytesHashed.GetCurrent()), + ChunkingStats.UniqueChunksFound.load(), + NiceBytes(ChunkingStats.UniqueBytesFound.load())); ProgressBar.UpdateState({.Task = "Scanning files ", - .Details = fmt::format("{}/{} ({}/{}, {}B/s) files, {} ({}) chunks found", - ChunkingStats.FilesProcessed.load(), - GetFolderContentStats.AcceptedFileCount.load(), - NiceBytes(ChunkingStats.BytesHashed.load()), - NiceBytes(GetFolderContentStats.FoundFileByteCount), - NiceNum(FilteredBytesHashed.GetCurrent()), - ChunkingStats.UniqueChunksFound.load(), - NiceBytes(ChunkingStats.UniqueBytesFound.load())), + .Details = Details, .TotalCount = GetFolderContentStats.AcceptedFileByteCount, .RemainingCount = GetFolderContentStats.AcceptedFileByteCount - ChunkingStats.BytesHashed.load()}, false); @@ -766,9 +777,19 @@ namespace { } } - if (FilesObject["chunkcounts"sv]) + if (CbObjectView ChunkContentView = BuildPartManifest["chunkedContent"sv].AsObjectView(); ChunkContentView) + { + compactbinary_helpers::ReadArray("sequenceRawHashes"sv, ChunkContentView, OutSequenceRawHashes); + compactbinary_helpers::ReadArray("chunkcounts"sv, ChunkContentView, OutChunkCounts); + if (OutChunkCounts.size() != OutSequenceRawHashes.size()) + { + throw std::runtime_error(fmt::format("Number of chunk count entries does not match number of paths")); + } + compactbinary_helpers::ReadArray("chunkorders"sv, ChunkContentView, OutAbsoluteChunkOrders); + } + else if (FilesObject["chunkcounts"sv]) { - // Legacy style + // Legacy zen style std::vector<uint32_t> LegacyChunkCounts; compactbinary_helpers::ReadArray("chunkcounts"sv, FilesObject, LegacyChunkCounts); @@ -805,15 +826,29 @@ namespace { OrderIndexOffset += LegacyChunkCounts[PathIndex]; } } - if (CbObjectView ChunkContentView = BuildPartManifest["chunkedContent"sv].AsObjectView(); ChunkContentView) + else { - compactbinary_helpers::ReadArray("sequenceRawHashes"sv, ChunkContentView, OutSequenceRawHashes); - compactbinary_helpers::ReadArray("chunkcounts"sv, ChunkContentView, OutChunkCounts); - if (OutChunkCounts.size() != OutSequenceRawHashes.size()) + // Legacy C# style + + tsl::robin_set<IoHash, IoHash::Hasher> FoundRawHashes; + FoundRawHashes.reserve(PathCount); + uint32_t OrderIndexOffset = 0; + for (uint32_t PathIndex = 0; PathIndex < OutPaths.size(); PathIndex++) { - throw std::runtime_error(fmt::format("Number of chunk count entries does not match number of paths")); + if (OutRawSizes[PathIndex] > 0) + { + const IoHash& PathRawHash = OutRawHashes[PathIndex]; + if (FoundRawHashes.insert(PathRawHash).second) + { + OutSequenceRawHashes.push_back(PathRawHash); + OutChunkCounts.push_back(1); + OutAbsoluteChunkOrders.push_back(OrderIndexOffset); + OutLooseChunkHashes.push_back(PathRawHash); + OutLooseChunkRawSizes.push_back(OutRawSizes[PathIndex]); + OrderIndexOffset += 1; + } + } } - compactbinary_helpers::ReadArray("chunkorders"sv, ChunkContentView, OutAbsoluteChunkOrders); } CbObjectView ChunkAttachmentsView = BuildPartManifest["chunkAttachments"sv].AsObjectView(); @@ -963,14 +998,14 @@ namespace { { public: // A buffered file reader that provides CompositeBuffer where the buffers are owned and the memory never overwritten - ReadFileCache(DiskStatistics& DiskStats, - const std::filesystem::path& Path, - const std::vector<std::filesystem::path>& Paths, - const std::vector<uint64_t>& RawSizes, - size_t MaxOpenFileCount) + ReadFileCache(DiskStatistics& DiskStats, + const std::filesystem::path& Path, + const ChunkedFolderContent& LocalContent, + const ChunkedContentLookup& LocalLookup, + size_t MaxOpenFileCount) : m_Path(Path) - , m_Paths(Paths) - , m_RawSizes(RawSizes) + , m_LocalContent(LocalContent) + , m_LocalLookup(LocalLookup) , m_DiskStats(DiskStats) { m_OpenFiles.reserve(MaxOpenFileCount); @@ -981,26 +1016,28 @@ namespace { m_OpenFiles.clear(); } - CompositeBuffer GetRange(uint32_t PathIndex, uint64_t Offset, uint64_t Size) + CompositeBuffer GetRange(uint32_t SequenceIndex, uint64_t Offset, uint64_t Size) { - auto CacheIt = - std::find_if(m_OpenFiles.begin(), m_OpenFiles.end(), [PathIndex](const auto& Lhs) { return Lhs.first == PathIndex; }); + auto CacheIt = std::find_if(m_OpenFiles.begin(), m_OpenFiles.end(), [SequenceIndex](const auto& Lhs) { + return Lhs.first == SequenceIndex; + }); if (CacheIt != m_OpenFiles.end()) { if (CacheIt != m_OpenFiles.begin()) { auto CachedFile(std::move(CacheIt->second)); m_OpenFiles.erase(CacheIt); - m_OpenFiles.insert(m_OpenFiles.begin(), std::make_pair(PathIndex, std::move(CachedFile))); + m_OpenFiles.insert(m_OpenFiles.begin(), std::make_pair(SequenceIndex, std::move(CachedFile))); } CompositeBuffer Result = m_OpenFiles.front().second->GetRange(Offset, Size); m_DiskStats.ReadByteCount += Result.GetSize(); return Result; } - const std::filesystem::path AttachmentPath = (m_Path / m_Paths[PathIndex]).make_preferred(); - if (Size == m_RawSizes[PathIndex]) + const uint32_t LocalPathIndex = m_LocalLookup.SequenceIndexFirstPathIndex[SequenceIndex]; + const std::filesystem::path LocalFilePath = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred(); + if (Size == m_LocalContent.RawSizes[LocalPathIndex]) { - IoBuffer Result = IoBufferBuilder::MakeFromFile(AttachmentPath); + IoBuffer Result = IoBufferBuilder::MakeFromFile(LocalFilePath); m_DiskStats.OpenReadCount++; m_DiskStats.ReadByteCount += Result.GetSize(); return CompositeBuffer(SharedBuffer(Result)); @@ -1010,7 +1047,7 @@ namespace { m_OpenFiles.pop_back(); m_DiskStats.CurrentOpenFileCount--; } - m_OpenFiles.insert(m_OpenFiles.begin(), std::make_pair(PathIndex, std::make_unique<BufferedOpenFile>(AttachmentPath))); + m_OpenFiles.insert(m_OpenFiles.begin(), std::make_pair(SequenceIndex, std::make_unique<BufferedOpenFile>(LocalFilePath))); CompositeBuffer Result = m_OpenFiles.front().second->GetRange(Offset, Size); m_DiskStats.ReadByteCount += Result.GetSize(); m_DiskStats.OpenReadCount++; @@ -1020,23 +1057,14 @@ namespace { private: const std::filesystem::path m_Path; - const std::vector<std::filesystem::path>& m_Paths; - const std::vector<uint64_t>& m_RawSizes; + const ChunkedFolderContent& m_LocalContent; + const ChunkedContentLookup& m_LocalLookup; std::vector<std::pair<uint32_t, std::unique_ptr<BufferedOpenFile>>> m_OpenFiles; DiskStatistics& m_DiskStats; }; - CompositeBuffer ValidateBlob(BuildStorage& Storage, - const Oid& BuildId, - const IoHash& BlobHash, - uint64_t& OutCompressedSize, - uint64_t& OutDecompressedSize) + CompositeBuffer ValidateBlob(IoBuffer&& Payload, const IoHash& BlobHash, uint64_t& OutCompressedSize, uint64_t& OutDecompressedSize) { - IoBuffer Payload = Storage.GetBuildBlob(BuildId, BlobHash); - if (!Payload) - { - throw std::runtime_error(fmt::format("Blob {} could not be found", BlobHash)); - } if (Payload.GetContentType() != ZenContentType::kCompressedBinary) { throw std::runtime_error(fmt::format("Blob {} ({} bytes) has unexpected content type '{}'", @@ -1080,13 +1108,26 @@ namespace { return DecompressedComposite; } - ChunkBlockDescription ValidateChunkBlock(BuildStorage& Storage, - const Oid& BuildId, + CompositeBuffer ValidateBlob(BuildStorage& Storage, + const Oid& BuildId, + const IoHash& BlobHash, + uint64_t& OutCompressedSize, + uint64_t& OutDecompressedSize) + { + IoBuffer Payload = Storage.GetBuildBlob(BuildId, BlobHash); + if (!Payload) + { + throw std::runtime_error(fmt::format("Blob {} could not be found", BlobHash)); + } + return ValidateBlob(std::move(Payload), BlobHash, OutCompressedSize, OutDecompressedSize); + } + + ChunkBlockDescription ValidateChunkBlock(IoBuffer&& Payload, const IoHash& BlobHash, uint64_t& OutCompressedSize, uint64_t& OutDecompressedSize) { - CompositeBuffer BlockBuffer = ValidateBlob(Storage, BuildId, BlobHash, OutCompressedSize, OutDecompressedSize); + CompositeBuffer BlockBuffer = ValidateBlob(std::move(Payload), BlobHash, OutCompressedSize, OutDecompressedSize); return GetChunkBlockDescription(BlockBuffer.Flatten(), BlobHash); } @@ -1097,11 +1138,12 @@ namespace { { auto It = Lookup.ChunkHashToChunkIndex.find(ChunkHash); ZEN_ASSERT(It != Lookup.ChunkHashToChunkIndex.end()); - uint32_t ChunkIndex = It->second; - std::span<const ChunkedContentLookup::ChunkLocation> ChunkLocations = GetChunkLocations(Lookup, ChunkIndex); + uint32_t ChunkIndex = It->second; + std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkLocations = GetChunkSequenceLocations(Lookup, ChunkIndex); ZEN_ASSERT(!ChunkLocations.empty()); - CompositeBuffer Chunk = - OpenFileCache.GetRange(ChunkLocations[0].PathIndex, ChunkLocations[0].Offset, Content.ChunkedContent.ChunkRawSizes[ChunkIndex]); + CompositeBuffer Chunk = OpenFileCache.GetRange(ChunkLocations[0].SequenceIndex, + ChunkLocations[0].Offset, + Content.ChunkedContent.ChunkRawSizes[ChunkIndex]); ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == ChunkHash); return Chunk; }; @@ -1113,7 +1155,7 @@ namespace { ChunkBlockDescription& OutBlockDescription, DiskStatistics& DiskStats) { - ReadFileCache OpenFileCache(DiskStats, Path, Content.Paths, Content.RawSizes, 4); + ReadFileCache OpenFileCache(DiskStats, Path, Content, Lookup, 4); std::vector<std::pair<IoHash, FetchChunkFunc>> BlockContent; BlockContent.reserve(ChunksInBlock.size()); @@ -1135,6 +1177,187 @@ namespace { return GenerateChunkBlock(std::move(BlockContent), OutBlockDescription); }; + void ValidateBuildPart(BuildStorage& Storage, const Oid& BuildId, Oid BuildPartId, const std::string_view BuildPartName) + { + Stopwatch Timer; + auto _ = MakeGuard([&]() { + ZEN_CONSOLE("Validated build part {}/{} ('{}') in {}", + BuildId, + BuildPartId, + BuildPartName, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + }); + CbObject Build = Storage.GetBuild(BuildId); + if (!BuildPartName.empty()) + { + BuildPartId = Build["parts"sv].AsObjectView()[BuildPartName].AsObjectId(); + if (BuildPartId == Oid::Zero) + { + throw std::runtime_error(fmt::format("Build {} does not have a part named '{}'", BuildId, BuildPartName)); + } + } + CbObject BuildPart = Storage.GetBuildPart(BuildId, BuildPartId); + ZEN_CONSOLE("Validating build part {}/{} ({})", BuildId, BuildPartId, NiceBytes(BuildPart.GetSize())); + std::vector<IoHash> ChunkAttachments; + for (CbFieldView LooseFileView : BuildPart["chunkAttachments"sv].AsObjectView()["rawHashes"sv]) + { + ChunkAttachments.push_back(LooseFileView.AsBinaryAttachment()); + } + std::vector<IoHash> BlockAttachments; + for (CbFieldView BlocksView : BuildPart["blockAttachments"sv].AsObjectView()["rawHashes"sv]) + { + BlockAttachments.push_back(BlocksView.AsBinaryAttachment()); + } + + std::vector<ChunkBlockDescription> VerifyBlockDescriptions = Storage.GetBlockMetadata(BuildId, BlockAttachments); + if (VerifyBlockDescriptions.size() != BlockAttachments.size()) + { + throw std::runtime_error(fmt::format("Uploaded blocks metadata could not all be found, {} blocks metadata is missing", + BlockAttachments.size() - VerifyBlockDescriptions.size())); + } + + WorkerThreadPool& NetworkPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // + WorkerThreadPool& VerifyPool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // + ParallellWork Work(AbortFlag); + + ProgressBar ProgressBar(UsePlainProgress); + + uint64_t AttachmentsToVerifyCount = ChunkAttachments.size() + BlockAttachments.size(); + std::atomic<uint64_t> DownloadedAttachmentCount = 0; + std::atomic<uint64_t> VerifiedAttachmentCount = 0; + std::atomic<uint64_t> DownloadedByteCount = 0; + std::atomic<uint64_t> VerifiedByteCount = 0; + FilteredRate FilteredDownloadedBytesPerSecond; + FilteredRate FilteredVerifiedBytesPerSecond; + + for (const IoHash& ChunkAttachment : ChunkAttachments) + { + Work.ScheduleWork( + NetworkPool, + [&, ChunkAttachment](std::atomic<bool>&) { + if (!AbortFlag) + { + FilteredDownloadedBytesPerSecond.Start(); + IoBuffer Payload = Storage.GetBuildBlob(BuildId, ChunkAttachment); + DownloadedAttachmentCount++; + DownloadedByteCount += Payload.GetSize(); + if (DownloadedAttachmentCount.load() == AttachmentsToVerifyCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + if (!Payload) + { + throw std::runtime_error(fmt::format("Chunk attachment {} could not be found", ChunkAttachment)); + } + if (!AbortFlag) + { + Work.ScheduleWork( + VerifyPool, + [&, Payload = std::move(Payload), ChunkAttachment](std::atomic<bool>&) { + if (!AbortFlag) + { + FilteredVerifiedBytesPerSecond.Start(); + + uint64_t CompressedSize; + uint64_t DecompressedSize; + ValidateBlob(IoBuffer(Payload), ChunkAttachment, CompressedSize, DecompressedSize); + ZEN_CONSOLE_VERBOSE("Chunk attachment {} ({} -> {}) is valid", + ChunkAttachment, + NiceBytes(CompressedSize), + NiceBytes(DecompressedSize)); + VerifiedAttachmentCount++; + VerifiedByteCount += DecompressedSize; + if (VerifiedAttachmentCount.load() == AttachmentsToVerifyCount) + { + FilteredVerifiedBytesPerSecond.Stop(); + } + } + }, + Work.DefaultErrorFunction()); + } + } + }, + Work.DefaultErrorFunction()); + } + + for (const IoHash& BlockAttachment : BlockAttachments) + { + Work.ScheduleWork( + NetworkPool, + [&, BlockAttachment](std::atomic<bool>&) { + if (!AbortFlag) + { + FilteredDownloadedBytesPerSecond.Start(); + IoBuffer Payload = Storage.GetBuildBlob(BuildId, BlockAttachment); + DownloadedAttachmentCount++; + DownloadedByteCount += Payload.GetSize(); + if (DownloadedAttachmentCount.load() == AttachmentsToVerifyCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + if (!Payload) + { + throw std::runtime_error(fmt::format("Block attachment {} could not be found", BlockAttachment)); + } + if (!AbortFlag) + { + Work.ScheduleWork( + VerifyPool, + [&, Payload = std::move(Payload), BlockAttachment](std::atomic<bool>&) { + if (!AbortFlag) + { + FilteredVerifiedBytesPerSecond.Start(); + + uint64_t CompressedSize; + uint64_t DecompressedSize; + ValidateChunkBlock(IoBuffer(Payload), BlockAttachment, CompressedSize, DecompressedSize); + ZEN_CONSOLE_VERBOSE("Chunk block {} ({} -> {}) is valid", + BlockAttachment, + NiceBytes(CompressedSize), + NiceBytes(DecompressedSize)); + VerifiedAttachmentCount++; + VerifiedByteCount += DecompressedSize; + if (VerifiedAttachmentCount.load() == AttachmentsToVerifyCount) + { + FilteredVerifiedBytesPerSecond.Stop(); + } + } + }, + Work.DefaultErrorFunction()); + } + } + }, + Work.DefaultErrorFunction()); + } + + Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(IsAborted, PendingWork); + + FilteredDownloadedBytesPerSecond.Update(DownloadedByteCount); + FilteredVerifiedBytesPerSecond.Update(VerifiedByteCount); + + std::string Details = fmt::format("Downloaded {}/{} ({}, {}bits/s). Verified {}/{} ({}, {}B/s)", + DownloadedAttachmentCount.load(), + AttachmentsToVerifyCount, + NiceBytes(DownloadedByteCount.load()), + NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8), + VerifiedAttachmentCount.load(), + AttachmentsToVerifyCount, + NiceBytes(VerifiedByteCount.load()), + NiceNum(FilteredVerifiedBytesPerSecond.GetCurrent())); + + ProgressBar.UpdateState( + {.Task = "Validating blobs ", + .Details = Details, + .TotalCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2), + .RemainingCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2 - + (DownloadedAttachmentCount.load() + VerifiedAttachmentCount.load()))}, + false); + }); + + ProgressBar.Finish(); + } + void ArrangeChunksIntoBlocks(const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, uint64_t MaxBlockSize, @@ -1142,13 +1365,13 @@ namespace { std::vector<std::vector<uint32_t>>& OutBlocks) { std::sort(ChunkIndexes.begin(), ChunkIndexes.end(), [&Content, &Lookup](uint32_t Lhs, uint32_t Rhs) { - const ChunkedContentLookup::ChunkLocation& LhsLocation = GetChunkLocations(Lookup, Lhs)[0]; - const ChunkedContentLookup::ChunkLocation& RhsLocation = GetChunkLocations(Lookup, Rhs)[0]; - if (LhsLocation.PathIndex < RhsLocation.PathIndex) + const ChunkedContentLookup::ChunkSequenceLocation& LhsLocation = GetChunkSequenceLocations(Lookup, Lhs)[0]; + const ChunkedContentLookup::ChunkSequenceLocation& RhsLocation = GetChunkSequenceLocations(Lookup, Rhs)[0]; + if (LhsLocation.SequenceIndex < RhsLocation.SequenceIndex) { return true; } - else if (LhsLocation.PathIndex > RhsLocation.PathIndex) + else if (LhsLocation.SequenceIndex > RhsLocation.SequenceIndex) { return false; } @@ -1171,7 +1394,8 @@ namespace { // between source paths for chunks. Break the block at the last such break if any. ZEN_ASSERT(ChunkIndexOffset > ChunkIndexStart); - const uint32_t ChunkPathIndex = Lookup.ChunkLocations[Lookup.ChunkLocationOffset[ChunkIndex]].PathIndex; + const uint32_t ChunkSequenceIndex = + Lookup.ChunkSequenceLocations[Lookup.ChunkSequenceLocationOffset[ChunkIndex]].SequenceIndex; uint64_t ScanBlockSize = BlockSize; @@ -1185,8 +1409,9 @@ namespace { break; } - const uint32_t TestPathIndex = Lookup.ChunkLocations[Lookup.ChunkLocationOffset[TestChunkIndex]].PathIndex; - if (ChunkPathIndex != TestPathIndex) + const uint32_t TestSequenceIndex = + Lookup.ChunkSequenceLocations[Lookup.ChunkSequenceLocationOffset[TestChunkIndex]].SequenceIndex; + if (ChunkSequenceIndex != TestSequenceIndex) { ChunkIndexOffset = ScanChunkIndexOffset + 1; break; @@ -1235,10 +1460,9 @@ namespace { const IoHash& ChunkHash = Content.ChunkedContent.ChunkHashes[ChunkIndex]; const uint64_t ChunkSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; - const ChunkedContentLookup::ChunkLocation& Source = GetChunkLocations(Lookup, ChunkIndex)[0]; - - IoBuffer RawSource = - IoBufferBuilder::MakeFromFile((Path / Content.Paths[Source.PathIndex]).make_preferred(), Source.Offset, ChunkSize); + const ChunkedContentLookup::ChunkSequenceLocation& Source = GetChunkSequenceLocations(Lookup, ChunkIndex)[0]; + const std::uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[Source.SequenceIndex]; + IoBuffer RawSource = IoBufferBuilder::MakeFromFile((Path / Content.Paths[PathIndex]).make_preferred(), Source.Offset, ChunkSize); if (!RawSource) { throw std::runtime_error(fmt::format("Failed fetching chunk {}", ChunkHash)); @@ -1418,7 +1642,7 @@ namespace { FilteredGeneratedBytesPerSecond.Update(GenerateBlocksStats.GeneratedBlockByteCount.load()); FilteredUploadedBytesPerSecond.Update(UploadStats.BlocksBytes.load()); - std::string Details = fmt::format("Generated {}/{} ({}, {}B/s) and uploaded {}/{} ({}, {}bits/s) blocks", + std::string Details = fmt::format("Generated {}/{} ({}, {}B/s). Uploaded {}/{} ({}, {}bits/s)", GenerateBlocksStats.GeneratedBlockCount.load(), NewBlockCount, NiceBytes(GenerateBlocksStats.GeneratedBlockByteCount.load()), @@ -1470,10 +1694,11 @@ namespace { ParallellWork Work(AbortFlag); - std::atomic<size_t> UploadedBlockSize = 0; - std::atomic<size_t> UploadedBlockCount = 0; - std::atomic<size_t> UploadedChunkSize = 0; - std::atomic<uint32_t> UploadedChunkCount = 0; + std::atomic<size_t> UploadedBlockSize = 0; + std::atomic<size_t> UploadedBlockCount = 0; + std::atomic<size_t> UploadedRawChunkSize = 0; + std::atomic<size_t> UploadedCompressedChunkSize = 0; + std::atomic<uint32_t> UploadedChunkCount = 0; tsl::robin_map<uint32_t, uint32_t> ChunkIndexToLooseChunkOrderIndex; ChunkIndexToLooseChunkOrderIndex.reserve(LooseChunkIndexes.size()); @@ -1485,8 +1710,8 @@ namespace { std::vector<size_t> BlockIndexes; std::vector<uint32_t> LooseChunkOrderIndexes; - uint64_t TotalChunksSize = 0; - uint64_t TotalBlocksSize = 0; + uint64_t TotalLooseChunksSize = 0; + uint64_t TotalBlocksSize = 0; for (const IoHash& RawHash : RawHashes) { if (auto It = NewBlocks.BlockHashToBlockIndex.find(RawHash); It != NewBlocks.BlockHashToBlockIndex.end()) @@ -1501,11 +1726,11 @@ namespace { LooseOrderIndexIt != ChunkIndexToLooseChunkOrderIndex.end()) { LooseChunkOrderIndexes.push_back(LooseOrderIndexIt->second); - TotalChunksSize += Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; + TotalLooseChunksSize += Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; } } } - uint64_t TotalSize = TotalChunksSize + TotalBlocksSize; + uint64_t TotalRawSize = TotalLooseChunksSize + TotalBlocksSize; const size_t UploadBlockCount = BlockIndexes.size(); const uint32_t UploadChunkCount = gsl::narrow<uint32_t>(LooseChunkOrderIndexes.size()); @@ -1548,10 +1773,10 @@ namespace { Work.DefaultErrorFunction()); }; - auto AsyncUploadLooseChunk = [&](const IoHash& RawHash, CompositeBuffer&& Payload) { + auto AsyncUploadLooseChunk = [&](const IoHash& RawHash, uint64_t RawSize, CompositeBuffer&& Payload) { Work.ScheduleWork( UploadChunkPool, - [&, RawHash, Payload = CompositeBuffer(std::move(Payload))](std::atomic<bool>&) { + [&, RawHash, RawSize, Payload = CompositeBuffer(std::move(Payload))](std::atomic<bool>&) { if (!AbortFlag) { const uint64_t PayloadSize = Payload.GetSize(); @@ -1571,9 +1796,9 @@ namespace { PartPayload.SetContentType(ZenContentType::kBinary); return PartPayload; }, - [&](uint64_t SentBytes, bool IsComplete) { + [&, RawSize](uint64_t SentBytes, bool IsComplete) { UploadStats.ChunksBytes += SentBytes; - UploadedChunkSize += SentBytes; + UploadedCompressedChunkSize += SentBytes; if (IsComplete) { UploadStats.ChunkCount++; @@ -1582,6 +1807,7 @@ namespace { { FilteredUploadedBytesPerSecond.Stop(); } + UploadedRawChunkSize += RawSize; } }); for (auto& WorkPart : MultipartWork) @@ -1604,7 +1830,8 @@ namespace { ZEN_CONSOLE_VERBOSE("Uploaded chunk {} ({})", RawHash, NiceBytes(PayloadSize)); UploadStats.ChunksBytes += Payload.GetSize(); UploadStats.ChunkCount++; - UploadedChunkSize += Payload.GetSize(); + UploadedCompressedChunkSize += Payload.GetSize(); + UploadedRawChunkSize += RawSize; UploadedChunkCount++; if (UploadedChunkCount == UploadChunkCount) { @@ -1696,6 +1923,7 @@ namespace { std::atomic<uint64_t> CompressedLooseChunkCount = 0; std::atomic<uint64_t> CompressedLooseChunkByteCount = 0; + std::atomic<uint64_t> RawLooseChunkByteCount = 0; // Start compression of any non-precompressed loose chunks and schedule upload for (const uint32_t CompressLooseChunkOrderIndex : CompressLooseChunkOrderIndexes) @@ -1712,18 +1940,20 @@ namespace { Content.ChunkedContent.ChunkHashes[ChunkIndex], NiceBytes(Content.ChunkedContent.ChunkRawSizes[ChunkIndex]), NiceBytes(Payload.GetSize())); - UploadStats.ReadFromDiskBytes += Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; + const uint64_t ChunkRawSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; + UploadStats.ReadFromDiskBytes += ChunkRawSize; LooseChunksStats.CompressedChunkBytes += Payload.GetSize(); LooseChunksStats.CompressedChunkCount++; CompressedLooseChunkByteCount += Payload.GetSize(); CompressedLooseChunkCount++; + RawLooseChunkByteCount += ChunkRawSize; if (CompressedLooseChunkCount == CompressLooseChunkOrderIndexes.size()) { FilteredCompressedBytesPerSecond.Stop(); } if (!AbortFlag) { - AsyncUploadLooseChunk(Content.ChunkedContent.ChunkHashes[ChunkIndex], std::move(Payload)); + AsyncUploadLooseChunk(Content.ChunkedContent.ChunkHashes[ChunkIndex], ChunkRawSize, std::move(Payload)); } } }, @@ -1734,22 +1964,31 @@ namespace { ZEN_UNUSED(IsAborted, PendingWork); FilteredCompressedBytesPerSecond.Update(CompressedLooseChunkByteCount.load()); FilteredGenerateBlockBytesPerSecond.Update(GeneratedBlockByteCount.load()); - FilteredUploadedBytesPerSecond.Update(UploadedChunkSize.load() + UploadedBlockSize.load()); - uint64_t UploadedSize = UploadedChunkSize.load() + UploadedBlockSize.load(); + FilteredUploadedBytesPerSecond.Update(UploadedCompressedChunkSize.load() + UploadedBlockSize.load()); + uint64_t UploadedRawSize = UploadedRawChunkSize.load() + UploadedBlockSize.load(); + uint64_t UploadedCompressedSize = UploadedCompressedChunkSize.load() + UploadedBlockSize.load(); + + std::string Details = fmt::format( + "Compressed {}/{} ({}/{}) chunks. " + "Uploaded {}/{} ({}/{}) blobs " + "({} {}bits/s)", + CompressedLooseChunkCount.load(), + CompressLooseChunkOrderIndexes.size(), + NiceBytes(RawLooseChunkByteCount), + NiceBytes(TotalLooseChunksSize), + + UploadedBlockCount.load() + UploadedChunkCount.load(), + UploadBlockCount + UploadChunkCount, + NiceBytes(UploadedRawSize), + NiceBytes(TotalRawSize), + + NiceBytes(UploadedCompressedSize), + NiceNum(FilteredUploadedBytesPerSecond.GetCurrent())); + ProgressBar.UpdateState({.Task = "Uploading blobs ", - .Details = fmt::format("Compressed {}/{} chunks. " - "Uploaded {}/{} blobs ({}/{} {}bits/s)", - CompressedLooseChunkCount.load(), - CompressLooseChunkOrderIndexes.size(), - - UploadedBlockCount.load() + UploadedChunkCount.load(), - UploadBlockCount + UploadChunkCount, - - NiceBytes(UploadedChunkSize.load() + UploadedBlockSize.load()), - NiceBytes(TotalSize), - NiceNum(FilteredUploadedBytesPerSecond.GetCurrent())), - .TotalCount = gsl::narrow<uint64_t>(TotalSize), - .RemainingCount = gsl::narrow<uint64_t>(TotalSize - UploadedSize)}, + .Details = Details, + .TotalCount = gsl::narrow<uint64_t>(TotalRawSize), + .RemainingCount = gsl::narrow<uint64_t>(TotalRawSize - UploadedRawSize)}, false); }); @@ -1930,7 +2169,8 @@ namespace { bool AllowMultiparts, const CbObject& MetaData, bool CreateBuild, - bool IgnoreExistingBlocks) + bool IgnoreExistingBlocks, + bool PostUploadVerify) { Stopwatch ProcessTimer; @@ -2121,15 +2361,16 @@ namespace { UsePlainProgress ? 5000 : 200, [&](bool, std::ptrdiff_t) { FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load()); + std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found", + ChunkingStats.FilesProcessed.load(), + Content.Paths.size(), + NiceBytes(ChunkingStats.BytesHashed.load()), + NiceBytes(TotalRawSize), + NiceNum(FilteredBytesHashed.GetCurrent()), + ChunkingStats.UniqueChunksFound.load(), + NiceBytes(ChunkingStats.UniqueBytesFound.load())); ProgressBar.UpdateState({.Task = "Scanning files ", - .Details = fmt::format("{}/{} ({}/{}, {}B/s) files, {} ({}) chunks found", - ChunkingStats.FilesProcessed.load(), - Content.Paths.size(), - NiceBytes(ChunkingStats.BytesHashed.load()), - NiceBytes(TotalRawSize), - NiceNum(FilteredBytesHashed.GetCurrent()), - ChunkingStats.UniqueChunksFound.load(), - NiceBytes(ChunkingStats.UniqueBytesFound.load())), + .Details = Details, .TotalCount = TotalRawSize, .RemainingCount = TotalRawSize - ChunkingStats.BytesHashed.load()}, false); @@ -2422,19 +2663,18 @@ namespace { std::vector<IoHash> OutLooseChunkHashes; std::vector<uint64_t> OutLooseChunkRawSizes; std::vector<IoHash> OutBlockRawHashes; - - ReadBuildContentFromCompactBinary(PartManifestWriter.Save(), - VerifyFolderContent.Platform, - VerifyFolderContent.Paths, - VerifyFolderContent.RawHashes, - VerifyFolderContent.RawSizes, - VerifyFolderContent.Attributes, - VerifyFolderContent.ChunkedContent.SequenceRawHashes, - VerifyFolderContent.ChunkedContent.ChunkCounts, - OutAbsoluteChunkOrders, - OutLooseChunkHashes, - OutLooseChunkRawSizes, - OutBlockRawHashes); + 4 ReadBuildContentFromCompactBinary(PartManifestWriter.Save(), + VerifyFolderContent.Platform, + VerifyFolderContent.Paths, + VerifyFolderContent.RawHashes, + VerifyFolderContent.RawSizes, + VerifyFolderContent.Attributes, + VerifyFolderContent.ChunkedContent.SequenceRawHashes, + VerifyFolderContent.ChunkedContent.ChunkCounts, + OutAbsoluteChunkOrders, + OutLooseChunkHashes, + OutLooseChunkRawSizes, + OutBlockRawHashes); ZEN_ASSERT(OutBlockRawHashes == AllChunkBlockHashes); for (uint32_t OrderIndex = 0; OrderIndex < OutAbsoluteChunkOrders.size(); OrderIndex++) @@ -2483,7 +2723,7 @@ namespace { Stopwatch PutBuildPartResultTimer; std::pair<IoHash, std::vector<IoHash>> PutBuildPartResult = Storage.PutBuildPart(BuildId, BuildPartId, BuildPartName, PartManifest); - ZEN_CONSOLE("PutBuildPart took {}, payload size {}. {} attachments are missing.", + ZEN_CONSOLE("PutBuildPart took {}, payload size {}. {} attachments are needed.", NiceTimeSpanMs(PutBuildPartResultTimer.GetElapsedTimeMs()), NiceBytes(PartManifest.GetSize()), PutBuildPartResult.second.size()); @@ -2504,7 +2744,7 @@ namespace { ZEN_CONSOLE( "Generated {} ({} {}B/s) and uploaded {} ({}) blocks. " "Compressed {} ({} {}B/s) and uploaded {} ({}) chunks. " - "Transferred {} ({}B/s) in {}", + "Transferred {} ({}bits/s) in {}", TempGenerateBlocksStats.GeneratedBlockCount.load(), NiceBytes(TempGenerateBlocksStats.GeneratedBlockByteCount.load()), NiceNum(GetBytesPerSecond(TempGenerateBlocksStats.GenerateBlocksElapsedWallTimeUS, @@ -2571,7 +2811,7 @@ namespace { UploadAttachments(PutBuildPartResult.second); } - while (true) + while (!AbortFlag) { Stopwatch FinalizeBuildPartTimer; std::vector<IoHash> Needs = Storage.FinalizeBuildPart(BuildId, BuildPartId, PartHash); @@ -2586,14 +2826,14 @@ namespace { UploadAttachments(Needs); } - if (CreateBuild) + if (CreateBuild && !AbortFlag) { Stopwatch FinalizeBuildTimer; Storage.FinalizeBuild(BuildId); ZEN_CONSOLE("FinalizeBuild took {}", NiceTimeSpanMs(FinalizeBuildTimer.GetElapsedTimeMs())); } - if (!NewBlocks.BlockDescriptions.empty()) + if (!NewBlocks.BlockDescriptions.empty() && !AbortFlag) { uint64_t UploadBlockMetadataCount = 0; std::vector<IoHash> BlockHashes; @@ -2619,13 +2859,11 @@ namespace { UploadStats.ElapsedWallTimeUS += ElapsedUS; ZEN_CONSOLE("Uploaded metadata for {} blocks in {}", UploadBlockMetadataCount, NiceTimeSpanMs(ElapsedUS / 1000)); } + } - std::vector<ChunkBlockDescription> VerifyBlockDescriptions = Storage.GetBlockMetadata(BuildId, BlockHashes); - if (VerifyBlockDescriptions.size() != BlockHashes.size()) - { - throw std::runtime_error(fmt::format("Uploaded blocks could not all be found, {} blocks are missing", - BlockHashes.size() - VerifyBlockDescriptions.size())); - } + if (PostUploadVerify && !AbortFlag) + { + ValidateBuildPart(Storage, BuildId, BuildPartId, BuildPartName); } const double DeltaByteCountPercent = @@ -2786,7 +3024,7 @@ namespace { NiceTimeSpanMs(ProcessTimer.GetElapsedTimeMs())); } - void VerifyFolder(const ChunkedFolderContent& Content, const std::filesystem::path& Path) + void VerifyFolder(const ChunkedFolderContent& Content, const std::filesystem::path& Path, bool VerifyFileHash) { ProgressBar ProgressBar(UsePlainProgress); std::atomic<uint64_t> FilesVerified(0); @@ -2879,18 +3117,18 @@ namespace { }); FilesFailed++; } - else if (SizeOnDisk > 0) + else if (SizeOnDisk > 0 && VerifyFileHash) { const IoHash& ExpectedRawHash = Content.RawHashes[PathIndex]; IoBuffer Buffer = IoBufferBuilder::MakeFromFile(TargetPath); IoHash RawHash = IoHash::HashBuffer(Buffer); if (RawHash != ExpectedRawHash) { - uint64_t FileOffset = 0; - const uint32_t SequenceRawHashesIndex = Lookup.RawHashToSequenceRawHashIndex.at(ExpectedRawHash); - const uint32_t OrderOffset = Lookup.SequenceRawHashIndexChunkOrderOffset[SequenceRawHashesIndex]; + uint64_t FileOffset = 0; + const uint32_t SequenceIndex = Lookup.RawHashToSequenceIndex.at(ExpectedRawHash); + const uint32_t OrderOffset = Lookup.SequenceIndexChunkOrderOffset[SequenceIndex]; for (uint32_t OrderIndex = OrderOffset; - OrderIndex < OrderOffset + Content.ChunkedContent.ChunkCounts[SequenceRawHashesIndex]; + OrderIndex < OrderOffset + Content.ChunkedContent.ChunkCounts[SequenceIndex]; OrderIndex++) { uint32_t ChunkIndex = Content.ChunkedContent.ChunkOrders[OrderIndex]; @@ -2933,12 +3171,13 @@ namespace { Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) { ZEN_UNUSED(IsAborted, PendingWork); + std::string Details = fmt::format("Verified {}/{} ({}). Failed files: {}", + FilesVerified.load(), + PathCount, + NiceBytes(ReadBytes.load()), + FilesFailed.load()); ProgressBar.UpdateState({.Task = "Verifying files ", - .Details = fmt::format("Verified {} files out of {}. Verfied: {}. Failed files: {}", - FilesVerified.load(), - PathCount, - NiceBytes(ReadBytes.load()), - FilesFailed.load()), + .Details = Details, .TotalCount = gsl::narrow<uint64_t>(PathCount), .RemainingCount = gsl::narrow<uint64_t>(PathCount - FilesVerified.load())}, false); @@ -3018,19 +3257,19 @@ namespace { std::unique_ptr<BasicFileWriter> OpenFileWriter; }; - std::vector<const ChunkedContentLookup::ChunkLocation*> GetRemainingChunkTargets( - const std::vector<bool>& RemotePathIndexWantsCopyFromCacheFlags, - const ChunkedContentLookup& Lookup, - uint32_t ChunkIndex) + std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> GetRemainingChunkTargets( + std::span<const std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, + const ChunkedContentLookup& Lookup, + uint32_t ChunkIndex) { - std::span<const ChunkedContentLookup::ChunkLocation> ChunkSources = GetChunkLocations(Lookup, ChunkIndex); - std::vector<const ChunkedContentLookup::ChunkLocation*> ChunkTargetPtrs; + std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkSources = GetChunkSequenceLocations(Lookup, ChunkIndex); + std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs; if (!ChunkSources.empty()) { ChunkTargetPtrs.reserve(ChunkSources.size()); - for (const ChunkedContentLookup::ChunkLocation& Source : ChunkSources) + for (const ChunkedContentLookup::ChunkSequenceLocation& Source : ChunkSources) { - if (!RemotePathIndexWantsCopyFromCacheFlags[Source.PathIndex]) + if (SequenceIndexChunksLeftToWriteCounters[Source.SequenceIndex].load() > 0) { ChunkTargetPtrs.push_back(&Source); } @@ -3039,20 +3278,20 @@ namespace { return ChunkTargetPtrs; }; - bool WriteBlockToDisk(const std::filesystem::path& CacheFolderPath, - const ChunkedFolderContent& Content, - const std::vector<bool>& RemotePathIndexWantsCopyFromCacheFlags, - const CompositeBuffer& DecompressedBlockBuffer, - const ChunkedContentLookup& Lookup, - std::atomic<bool>* RemoteChunkIndexNeedsCopyFromSourceFlags, - uint32_t& OutChunksComplete, - uint64_t& OutBytesWritten) + bool WriteBlockToDisk(const std::filesystem::path& CacheFolderPath, + const ChunkedFolderContent& RemoteContent, + std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, + const CompositeBuffer& DecompressedBlockBuffer, + const ChunkedContentLookup& Lookup, + std::atomic<bool>* RemoteChunkIndexNeedsCopyFromSourceFlags, + uint32_t& OutChunksComplete, + uint64_t& OutBytesWritten) { std::vector<CompositeBuffer> ChunkBuffers; struct WriteOpData { - const ChunkedContentLookup::ChunkLocation* Target; - size_t ChunkBufferIndex; + const ChunkedContentLookup::ChunkSequenceLocation* Target; + size_t ChunkBufferIndex; }; std::vector<WriteOpData> WriteOps; @@ -3063,9 +3302,9 @@ namespace { [&](CompressedBuffer&& Chunk, const IoHash& ChunkHash) { if (auto It = Lookup.ChunkHashToChunkIndex.find(ChunkHash); It != Lookup.ChunkHashToChunkIndex.end()) { - const uint32_t ChunkIndex = It->second; - std::vector<const ChunkedContentLookup::ChunkLocation*> ChunkTargetPtrs = - GetRemainingChunkTargets(RemotePathIndexWantsCopyFromCacheFlags, Lookup, ChunkIndex); + const uint32_t ChunkIndex = It->second; + std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = + GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, Lookup, ChunkIndex); if (!ChunkTargetPtrs.empty()) { @@ -3078,8 +3317,8 @@ namespace { throw std::runtime_error(fmt::format("Decompression of build blob {} failed", ChunkHash)); } ZEN_ASSERT_SLOW(ChunkHash == IoHash::HashBuffer(Decompressed)); - ZEN_ASSERT(Decompressed.GetSize() == Content.ChunkedContent.ChunkRawSizes[ChunkIndex]); - for (const ChunkedContentLookup::ChunkLocation* Target : ChunkTargetPtrs) + ZEN_ASSERT(Decompressed.GetSize() == RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]); + for (const ChunkedContentLookup::ChunkSequenceLocation* Target : ChunkTargetPtrs) { WriteOps.push_back(WriteOpData{.Target = Target, .ChunkBufferIndex = ChunkBuffers.size()}); } @@ -3095,11 +3334,11 @@ namespace { if (!AbortFlag) { std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOpData& Lhs, const WriteOpData& Rhs) { - if (Lhs.Target->PathIndex < Rhs.Target->PathIndex) + if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex) { return true; } - if (Lhs.Target->PathIndex > Rhs.Target->PathIndex) + if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex) { return false; } @@ -3114,23 +3353,50 @@ namespace { { break; } - const CompositeBuffer& Chunk = ChunkBuffers[WriteOp.ChunkBufferIndex]; - const uint32_t PathIndex = WriteOp.Target->PathIndex; - const uint64_t ChunkSize = Chunk.GetSize(); - const uint64_t FileOffset = WriteOp.Target->Offset; - ZEN_ASSERT(FileOffset + ChunkSize <= Content.RawSizes[PathIndex]); + const CompositeBuffer& Chunk = ChunkBuffers[WriteOp.ChunkBufferIndex]; + const uint32_t SequenceIndex = WriteOp.Target->SequenceIndex; + ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() <= + RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]); + ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() > 0); + const uint64_t ChunkSize = Chunk.GetSize(); + const uint64_t FileOffset = WriteOp.Target->Offset; + const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex]; + ZEN_ASSERT(FileOffset + ChunkSize <= RemoteContent.RawSizes[PathIndex]); OpenFileCache.WriteToFile<CompositeBuffer>( - PathIndex, - [&CacheFolderPath, &Content](uint32_t TargetIndex) { - return (CacheFolderPath / Content.RawHashes[TargetIndex].ToHexString()).make_preferred(); + SequenceIndex, + [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) { + return GetTempChunkedSequenceFileName(CacheFolderPath, + RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); }, Chunk, FileOffset, - Content.RawSizes[PathIndex]); + RemoteContent.RawSizes[PathIndex]); OutBytesWritten += ChunkSize; } } + if (!AbortFlag) + { + // Write tracking, updating this must be done without any files open (WriteFileCache) + for (const WriteOpData& WriteOp : WriteOps) + { + const uint32_t RemoteSequenceIndex = WriteOp.Target->SequenceIndex; + if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1) + { + const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + const IoHash VerifyChunkHash = IoHash::HashBuffer( + IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash))); + if (VerifyChunkHash != SequenceRawHash) + { + throw std::runtime_error(fmt::format("Written hunk sequence {} hash does not match expected hash {}", + VerifyChunkHash, + SequenceRawHash)); + } + std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash), + GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)); + } + } + } OutChunksComplete += gsl::narrow<uint32_t>(ChunkBuffers.size()); } } @@ -3171,50 +3437,52 @@ namespace { return Decompressed; } - void WriteChunkToDisk(const std::filesystem::path& CacheFolderPath, - const ChunkedFolderContent& Content, - std::span<const ChunkedContentLookup::ChunkLocation* const> ChunkTargets, - const CompositeBuffer& ChunkData, - WriteFileCache& OpenFileCache, - uint64_t& OutBytesWritten) + void WriteChunkToDisk(const std::filesystem::path& CacheFolderPath, + const ChunkedFolderContent& Content, + const ChunkedContentLookup& Lookup, + std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> ChunkTargets, + const CompositeBuffer& ChunkData, + WriteFileCache& OpenFileCache, + uint64_t& OutBytesWritten) { - for (const ChunkedContentLookup::ChunkLocation* TargetPtr : ChunkTargets) + for (const ChunkedContentLookup::ChunkSequenceLocation* TargetPtr : ChunkTargets) { - const auto& Target = *TargetPtr; - const uint64_t FileOffset = Target.Offset; + const auto& Target = *TargetPtr; + const uint64_t FileOffset = Target.Offset; + const uint32_t SequenceIndex = Target.SequenceIndex; + const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex]; OpenFileCache.WriteToFile( - Target.PathIndex, - [&CacheFolderPath, &Content](uint32_t TargetIndex) { - return (CacheFolderPath / Content.RawHashes[TargetIndex].ToHexString()).make_preferred(); - // return (Path / Content.Paths[TargetIndex]).make_preferred(); + SequenceIndex, + [&CacheFolderPath, &Content](uint32_t SequenceIndex) { + return GetTempChunkedSequenceFileName(CacheFolderPath, Content.ChunkedContent.SequenceRawHashes[SequenceIndex]); }, ChunkData, FileOffset, - Content.RawSizes[Target.PathIndex]); + Content.RawSizes[PathIndex]); OutBytesWritten += ChunkData.GetSize(); } } - void DownloadLargeBlob(BuildStorage& Storage, - const std::filesystem::path& TempFolderPath, - const std::filesystem::path& CacheFolderPath, - const ChunkedFolderContent& RemoteContent, - const ChunkedContentLookup& RemoteLookup, - const Oid& BuildId, - const IoHash& ChunkHash, - const std::uint64_t PreferredMultipartChunkSize, - const std::vector<const ChunkedContentLookup::ChunkLocation*>& ChunkTargetPtrs, - ParallellWork& Work, - WorkerThreadPool& WritePool, - WorkerThreadPool& NetworkPool, - std::atomic<uint64_t>& BytesWritten, - std::atomic<uint64_t>& WriteToDiskBytes, - std::atomic<uint64_t>& BytesDownloaded, - std::atomic<uint64_t>& LooseChunksBytes, - std::atomic<uint64_t>& DownloadedChunks, - std::atomic<uint32_t>& ChunksComplete, - std::atomic<uint64_t>& MultipartAttachmentCount) + void DownloadLargeBlob(BuildStorage& Storage, + const std::filesystem::path& TempFolderPath, + const std::filesystem::path& CacheFolderPath, + const ChunkedFolderContent& RemoteContent, + const ChunkedContentLookup& RemoteLookup, + const Oid& BuildId, + const IoHash& ChunkHash, + const std::uint64_t PreferredMultipartChunkSize, + const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs, + std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, + ParallellWork& Work, + WorkerThreadPool& WritePool, + WorkerThreadPool& NetworkPool, + std::atomic<uint64_t>& WriteToDiskBytes, + std::atomic<uint64_t>& BytesDownloaded, + std::atomic<uint64_t>& LooseChunksBytes, + std::atomic<uint64_t>& DownloadedChunks, + std::atomic<uint32_t>& ChunksComplete, + std::atomic<uint64_t>& MultipartAttachmentCount) { struct WorkloadData { @@ -3242,11 +3510,11 @@ namespace { ChunkHash, &BytesDownloaded, &LooseChunksBytes, - &BytesWritten, &WriteToDiskBytes, &DownloadedChunks, &ChunksComplete, - ChunkTargetPtrs = std::vector<const ChunkedContentLookup::ChunkLocation*>( + SequenceIndexChunksLeftToWriteCounters, + ChunkTargetPtrs = std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>( ChunkTargetPtrs)](uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining) { BytesDownloaded += Chunk.GetSize(); LooseChunksBytes += Chunk.GetSize(); @@ -3268,8 +3536,8 @@ namespace { Offset, BytesRemaining, &ChunksComplete, - &BytesWritten, &WriteToDiskBytes, + SequenceIndexChunksLeftToWriteCounters, ChunkTargetPtrs](std::atomic<bool>&) { if (!AbortFlag) { @@ -3289,7 +3557,9 @@ namespace { uint64_t TotalBytesWritten = 0; - uint32_t ChunkIndex = RemoteLookup.ChunkHashToChunkIndex.at(ChunkHash); + auto ChunkHashToChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); + ZEN_ASSERT(ChunkHashToChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end()); + uint32_t ChunkIndex = ChunkHashToChunkIndexIt->second; SharedBuffer Chunk = Decompress(CompositeBuffer(std::move(CompressedPart)), ChunkHash, @@ -3304,14 +3574,38 @@ namespace { WriteChunkToDisk(CacheFolderPath, RemoteContent, + RemoteLookup, ChunkTargetPtrs, CompositeBuffer(Chunk), OpenFileCache, TotalBytesWritten); ChunksComplete++; - BytesWritten += TotalBytesWritten; WriteToDiskBytes += TotalBytesWritten; } + if (!AbortFlag) + { + // Write tracking, updating this must be done without any files open (WriteFileCache) + for (const ChunkedContentLookup::ChunkSequenceLocation* Location : ChunkTargetPtrs) + { + const uint32_t RemoteSequenceIndex = Location->SequenceIndex; + if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1) + { + const IoHash& SequenceRawHash = + RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile( + GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash))); + if (VerifyChunkHash != SequenceRawHash) + { + throw std::runtime_error( + fmt::format("Written chunk sequence {} hash does not match expected hash {}", + VerifyChunkHash, + SequenceRawHash)); + } + std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash), + GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)); + } + } + } } }, Work.DefaultErrorFunction()); @@ -3368,45 +3662,37 @@ namespace { const std::filesystem::path CacheFolderPath = Path / ZenTempCacheFolderName; - tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> RawHashToLocalPathIndex; + Stopwatch CacheMappingTimer; - { - Stopwatch CacheTimer; + uint64_t CacheMappedBytesForReuse = 0; + std::vector<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters(RemoteContent.ChunkedContent.SequenceRawHashes.size()); + // std::vector<bool> RemoteSequenceIndexIsCachedFlags(RemoteContent.ChunkedContent.SequenceRawHashes.size(), false); + std::vector<bool> RemoteChunkIndexIsCachedFlags(RemoteContent.ChunkedContent.ChunkHashes.size()); + // Guard if he same chunks is in multiple blocks (can happen due to block reuse, cache reuse blocks writes directly) + std::vector<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags(RemoteContent.ChunkedContent.ChunkHashes.size()); - for (uint32_t LocalPathIndex = 0; LocalPathIndex < LocalContent.Paths.size(); LocalPathIndex++) + // Pick up all whole files we can use from current local state + for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < RemoteContent.ChunkedContent.SequenceRawHashes.size(); + RemoteSequenceIndex++) + { + const IoHash& RemoteSequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + if (auto It = LocalLookup.RawHashToSequenceIndex.find(RemoteSequenceRawHash); It != LocalLookup.RawHashToSequenceIndex.end()) { - ZEN_ASSERT_SLOW(std::filesystem::exists(Path / LocalContent.Paths[LocalPathIndex])); - - if (LocalContent.RawSizes[LocalPathIndex] > 0) - { - const uint32_t SequenceRawHashIndex = - LocalLookup.RawHashToSequenceRawHashIndex.at(LocalContent.RawHashes[LocalPathIndex]); - uint32_t ChunkCount = LocalContent.ChunkedContent.ChunkCounts[SequenceRawHashIndex]; - if (ChunkCount > 0) - { - const IoHash LocalRawHash = LocalContent.RawHashes[LocalPathIndex]; - if (!RawHashToLocalPathIndex.contains(LocalRawHash)) - { - RawHashToLocalPathIndex.insert_or_assign(LocalRawHash, LocalPathIndex); - } - } - } + SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = 0; + const uint32_t RemotePathIndex = GetFirstPathIndexForRawHash(RemoteLookup, RemoteSequenceRawHash); + CacheMappedBytesForReuse += RemoteContent.RawSizes[RemotePathIndex]; + } + else + { + SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]; } } - Stopwatch CacheMappingTimer; - - std::atomic<uint64_t> BytesWritten = 0; - uint64_t CacheMappedBytesForReuse = 0; - - std::vector<bool> RemotePathIndexWantsCopyFromCacheFlags(RemoteContent.Paths.size(), false); - std::vector<bool> RemoteChunkIndexWantsCopyFromCacheFlags(RemoteContent.ChunkedContent.ChunkHashes.size()); - // Guard if he same chunks is in multiple blocks (can happen due to block reuse, cache reuse blocks writes directly) - std::vector<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags(RemoteContent.ChunkedContent.ChunkHashes.size()); + // Pick up all chunks in current local state struct CacheCopyData { - uint32_t LocalPathIndex; - std::vector<const ChunkedContentLookup::ChunkLocation*> TargetChunkLocationPtrs; + uint32_t LocalSequenceIndex; + std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> TargetChunkLocationPtrs; struct ChunkTarget { uint32_t TargetChunkLocationCount; @@ -3418,43 +3704,30 @@ namespace { tsl::robin_map<IoHash, size_t, IoHash::Hasher> RawHashToCacheCopyDataIndex; std::vector<CacheCopyData> CacheCopyDatas; - uint32_t ChunkCountToWrite = 0; - // Pick up all whole files we can use from current local state - for (uint32_t RemotePathIndex = 0; RemotePathIndex < RemoteContent.Paths.size(); RemotePathIndex++) + for (uint32_t LocalSequenceIndex = 0; LocalSequenceIndex < LocalContent.ChunkedContent.SequenceRawHashes.size(); + LocalSequenceIndex++) { - const IoHash& RemoteRawHash = RemoteContent.RawHashes[RemotePathIndex]; - if (auto It = RawHashToLocalPathIndex.find(RemoteRawHash); It != RawHashToLocalPathIndex.end()) - { - RemotePathIndexWantsCopyFromCacheFlags[RemotePathIndex] = true; - CacheMappedBytesForReuse += RemoteContent.RawSizes[RemotePathIndex]; - } - } - - // Pick up all chunks in current local state - for (auto& CachedLocalFile : RawHashToLocalPathIndex) - { - const IoHash& LocalFileRawHash = CachedLocalFile.first; - const uint32_t LocalPathIndex = CachedLocalFile.second; - const uint32_t LocalSequenceRawHashIndex = LocalLookup.RawHashToSequenceRawHashIndex.at(LocalFileRawHash); - const uint32_t LocalOrderOffset = LocalLookup.SequenceRawHashIndexChunkOrderOffset[LocalSequenceRawHashIndex]; + const IoHash& LocalSequenceRawHash = LocalContent.ChunkedContent.SequenceRawHashes[LocalSequenceIndex]; + const uint32_t LocalOrderOffset = LocalLookup.SequenceIndexChunkOrderOffset[LocalSequenceIndex]; { uint64_t SourceOffset = 0; - const uint32_t LocalChunkCount = LocalContent.ChunkedContent.ChunkCounts[LocalSequenceRawHashIndex]; + const uint32_t LocalChunkCount = LocalContent.ChunkedContent.ChunkCounts[LocalSequenceIndex]; for (uint32_t LocalOrderIndex = 0; LocalOrderIndex < LocalChunkCount; LocalOrderIndex++) { const uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[LocalOrderOffset + LocalOrderIndex]; const IoHash& LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; const uint64_t LocalChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[LocalChunkIndex]; + if (auto RemoteChunkIt = RemoteLookup.ChunkHashToChunkIndex.find(LocalChunkHash); RemoteChunkIt != RemoteLookup.ChunkHashToChunkIndex.end()) { const uint32_t RemoteChunkIndex = RemoteChunkIt->second; - if (!RemoteChunkIndexWantsCopyFromCacheFlags[RemoteChunkIndex]) + if (!RemoteChunkIndexIsCachedFlags[RemoteChunkIndex]) { - std::vector<const ChunkedContentLookup::ChunkLocation*> ChunkTargetPtrs = - GetRemainingChunkTargets(RemotePathIndexWantsCopyFromCacheFlags, RemoteLookup, RemoteChunkIndex); + std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = + GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex); if (!ChunkTargetPtrs.empty()) { @@ -3462,7 +3735,7 @@ namespace { .TargetChunkLocationCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()), .ChunkRawSize = LocalChunkRawSize, .CacheFileOffset = SourceOffset}; - if (auto CopySourceIt = RawHashToCacheCopyDataIndex.find(LocalFileRawHash); + if (auto CopySourceIt = RawHashToCacheCopyDataIndex.find(LocalSequenceRawHash); CopySourceIt != RawHashToCacheCopyDataIndex.end()) { CacheCopyData& Data = CacheCopyDatas[CopySourceIt->second]; @@ -3473,14 +3746,14 @@ namespace { } else { - RawHashToCacheCopyDataIndex.insert_or_assign(LocalFileRawHash, CacheCopyDatas.size()); + RawHashToCacheCopyDataIndex.insert_or_assign(LocalSequenceRawHash, CacheCopyDatas.size()); CacheCopyDatas.push_back( - CacheCopyData{.LocalPathIndex = LocalPathIndex, + CacheCopyData{.LocalSequenceIndex = LocalSequenceIndex, .TargetChunkLocationPtrs = ChunkTargetPtrs, .ChunkTargets = std::vector<CacheCopyData::ChunkTarget>{Target}}); } CacheMappedBytesForReuse += LocalChunkRawSize; - RemoteChunkIndexWantsCopyFromCacheFlags[RemoteChunkIndex] = true; + RemoteChunkIndexIsCachedFlags[RemoteChunkIndex] = true; } } } @@ -3489,16 +3762,24 @@ namespace { } } + if (CacheMappedBytesForReuse > 0) + { + ZEN_CONSOLE("Mapped {} cached data for reuse in {}", + NiceBytes(CacheMappedBytesForReuse), + NiceTimeSpanMs(CacheMappingTimer.GetElapsedTimeMs())); + } + + uint32_t ChunkCountToWrite = 0; for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++) { - if (RemoteChunkIndexWantsCopyFromCacheFlags[RemoteChunkIndex]) + if (RemoteChunkIndexIsCachedFlags[RemoteChunkIndex]) { ChunkCountToWrite++; } else { - std::vector<const ChunkedContentLookup::ChunkLocation*> ChunkTargetPtrs = - GetRemainingChunkTargets(RemotePathIndexWantsCopyFromCacheFlags, RemoteLookup, RemoteChunkIndex); + std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = + GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex); if (!ChunkTargetPtrs.empty()) { RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex] = true; @@ -3506,12 +3787,13 @@ namespace { } } } + std::atomic<uint32_t> ChunkCountWritten = 0; - ZEN_CONSOLE("Mapped {} cached data for reuse in {}", - NiceBytes(CacheMappedBytesForReuse), - NiceTimeSpanMs(CacheMappingTimer.GetElapsedTimeMs())); { + FilteredRate FilteredDownloadedBytesPerSecond; + FilteredRate FilteredWrittenBytesPerSecond; + WorkerThreadPool& NetworkPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // WorkerThreadPool& WritePool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // @@ -3520,110 +3802,6 @@ namespace { std::atomic<uint64_t> BytesDownloaded = 0; - for (size_t CopyDataIndex = 0; CopyDataIndex < CacheCopyDatas.size(); CopyDataIndex++) - { - if (AbortFlag) - { - break; - } - - Work.ScheduleWork( - WritePool, // GetSyncWorkerPool(),// - [&, CopyDataIndex](std::atomic<bool>&) { - if (!AbortFlag) - { - const CacheCopyData& CopyData = CacheCopyDatas[CopyDataIndex]; - const std::filesystem::path LocalFilePath = - (Path / LocalContent.Paths[CopyData.LocalPathIndex]).make_preferred(); - if (!CopyData.TargetChunkLocationPtrs.empty()) - { - uint64_t CacheLocalFileBytesRead = 0; - - size_t TargetStart = 0; - const std::span<const ChunkedContentLookup::ChunkLocation* const> AllTargets( - CopyData.TargetChunkLocationPtrs); - - struct WriteOp - { - const ChunkedContentLookup::ChunkLocation* Target; - uint64_t CacheFileOffset; - uint64_t ChunkSize; - }; - - std::vector<WriteOp> WriteOps; - WriteOps.reserve(AllTargets.size()); - - for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets) - { - std::span<const ChunkedContentLookup::ChunkLocation* const> TargetRange = - AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount); - for (const ChunkedContentLookup::ChunkLocation* Target : TargetRange) - { - WriteOps.push_back(WriteOp{.Target = Target, - .CacheFileOffset = ChunkTarget.CacheFileOffset, - .ChunkSize = ChunkTarget.ChunkRawSize}); - } - TargetStart += ChunkTarget.TargetChunkLocationCount; - } - - std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) { - if (Lhs.Target->PathIndex < Rhs.Target->PathIndex) - { - return true; - } - else if (Lhs.Target->PathIndex > Rhs.Target->PathIndex) - { - return false; - } - if (Lhs.Target->Offset < Rhs.Target->Offset) - { - return true; - } - return false; - }); - - { - BufferedOpenFile SourceFile(LocalFilePath); - WriteFileCache OpenFileCache; - for (const WriteOp& Op : WriteOps) - { - if (AbortFlag) - { - break; - } - const uint32_t RemotePathIndex = Op.Target->PathIndex; - const uint64_t ChunkSize = Op.ChunkSize; - CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ChunkSize); - - ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]); - - OpenFileCache.WriteToFile<CompositeBuffer>( - RemotePathIndex, - [&CacheFolderPath, &RemoteContent](uint32_t TargetIndex) { - return (CacheFolderPath / RemoteContent.RawHashes[TargetIndex].ToHexString()) - .make_preferred(); - }, - ChunkSource, - Op.Target->Offset, - RemoteContent.RawSizes[RemotePathIndex]); - BytesWritten += ChunkSize; - WriteToDiskBytes += ChunkSize; - CacheLocalFileBytesRead += ChunkSize; // TODO: This should be the sum of unique chunk sizes? - } - } - if (!AbortFlag) - { - ChunkCountWritten += gsl::narrow<uint32_t>(CopyData.ChunkTargets.size()); - ZEN_DEBUG("Copied {} from {}", - NiceBytes(CacheLocalFileBytesRead), - LocalContent.Paths[CopyData.LocalPathIndex]); - } - } - } - }, - Work.DefaultErrorFunction()); - } - for (const IoHash ChunkHash : LooseChunkHashes) { if (AbortFlag) @@ -3631,8 +3809,10 @@ namespace { break; } - uint32_t RemoteChunkIndex = RemoteLookup.ChunkHashToChunkIndex.at(ChunkHash); - if (RemoteChunkIndexWantsCopyFromCacheFlags[RemoteChunkIndex]) + auto RemoteChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); + ZEN_ASSERT(RemoteChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end()); + const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second; + if (RemoteChunkIndexIsCachedFlags[RemoteChunkIndex]) { ZEN_DEBUG("Skipping chunk {} due to cache reuse", ChunkHash); continue; @@ -3640,8 +3820,8 @@ namespace { bool NeedsCopy = true; if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex].compare_exchange_strong(NeedsCopy, false)) { - std::vector<const ChunkedContentLookup::ChunkLocation*> ChunkTargetPtrs = - GetRemainingChunkTargets(RemotePathIndexWantsCopyFromCacheFlags, RemoteLookup, RemoteChunkIndex); + std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = + GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex); if (ChunkTargetPtrs.empty()) { @@ -3654,6 +3834,7 @@ namespace { [&, ChunkHash, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) { if (!AbortFlag) { + FilteredDownloadedBytesPerSecond.Start(); if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize) { DownloadLargeBlob(Storage, @@ -3665,10 +3846,10 @@ namespace { ChunkHash, PreferredMultipartChunkSize, ChunkTargetPtrs, + SequenceIndexChunksLeftToWriteCounters, Work, WritePool, NetworkPool, - BytesWritten, WriteToDiskBytes, BytesDownloaded, LooseChunksBytes, @@ -3698,6 +3879,7 @@ namespace { std::atomic<bool>&) { if (!AbortFlag) { + FilteredWrittenBytesPerSecond.Start(); uint64_t TotalBytesWritten = 0; SharedBuffer Chunk = Decompress(CompressedPart, @@ -3708,14 +3890,45 @@ namespace { WriteFileCache OpenFileCache; WriteChunkToDisk(CacheFolderPath, RemoteContent, + RemoteLookup, ChunkTargetPtrs, CompositeBuffer(Chunk), OpenFileCache, TotalBytesWritten); } - ChunkCountWritten++; - BytesWritten += TotalBytesWritten; - WriteToDiskBytes += TotalBytesWritten; + if (!AbortFlag) + { + // Write tracking, updating this must be done without any files open + // (WriteFileCache) + for (const ChunkedContentLookup::ChunkSequenceLocation* Location : + ChunkTargetPtrs) + { + const uint32_t RemoteSequenceIndex = Location->SequenceIndex; + if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub( + 1) == 1) + { + const IoHash& SequenceRawHash = + RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + const IoHash VerifyChunkHash = + IoHash::HashBuffer(IoBufferBuilder::MakeFromFile( + GetTempChunkedSequenceFileName(CacheFolderPath, + SequenceRawHash))); + if (VerifyChunkHash != SequenceRawHash) + { + throw std::runtime_error(fmt::format( + "Written hunk sequence {} hash does not match expected hash {}", + VerifyChunkHash, + SequenceRawHash)); + } + std::filesystem::rename( + GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash), + GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)); + } + } + + ChunkCountWritten++; + WriteToDiskBytes += TotalBytesWritten; + } } }, Work.DefaultErrorFunction()); @@ -3728,6 +3941,139 @@ namespace { } } + for (size_t CopyDataIndex = 0; CopyDataIndex < CacheCopyDatas.size(); CopyDataIndex++) + { + if (AbortFlag) + { + break; + } + + Work.ScheduleWork( + WritePool, // GetSyncWorkerPool(),// + [&, CopyDataIndex](std::atomic<bool>&) { + if (!AbortFlag) + { + FilteredWrittenBytesPerSecond.Start(); + const CacheCopyData& CopyData = CacheCopyDatas[CopyDataIndex]; + const uint32_t LocalPathIndex = LocalLookup.SequenceIndexFirstPathIndex[CopyData.LocalSequenceIndex]; + const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred(); + if (!CopyData.TargetChunkLocationPtrs.empty()) + { + uint64_t CacheLocalFileBytesRead = 0; + + size_t TargetStart = 0; + const std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> AllTargets( + CopyData.TargetChunkLocationPtrs); + + struct WriteOp + { + const ChunkedContentLookup::ChunkSequenceLocation* Target; + uint64_t CacheFileOffset; + uint64_t ChunkSize; + }; + + std::vector<WriteOp> WriteOps; + WriteOps.reserve(AllTargets.size()); + + for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets) + { + std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> TargetRange = + AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount); + for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange) + { + WriteOps.push_back(WriteOp{.Target = Target, + .CacheFileOffset = ChunkTarget.CacheFileOffset, + .ChunkSize = ChunkTarget.ChunkRawSize}); + } + TargetStart += ChunkTarget.TargetChunkLocationCount; + } + + std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) { + if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex) + { + return true; + } + else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex) + { + return false; + } + if (Lhs.Target->Offset < Rhs.Target->Offset) + { + return true; + } + return false; + }); + + if (!AbortFlag) + { + BufferedOpenFile SourceFile(LocalFilePath); + WriteFileCache OpenFileCache; + for (const WriteOp& Op : WriteOps) + { + if (AbortFlag) + { + break; + } + const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex; + ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() <= + RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]); + ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() > 0); + const uint32_t RemotePathIndex = RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]; + const uint64_t ChunkSize = Op.ChunkSize; + CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ChunkSize); + + ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]); + + OpenFileCache.WriteToFile<CompositeBuffer>( + RemoteSequenceIndex, + [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) { + return GetTempChunkedSequenceFileName( + CacheFolderPath, + RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); + }, + ChunkSource, + Op.Target->Offset, + RemoteContent.RawSizes[RemotePathIndex]); + WriteToDiskBytes += ChunkSize; + CacheLocalFileBytesRead += ChunkSize; // TODO: This should be the sum of unique chunk sizes? + } + } + if (!AbortFlag) + { + if (!AbortFlag) + { + // Write tracking, updating this must be done without any files open (WriteFileCache) + for (const WriteOp& Op : WriteOps) + { + const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex; + if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1) + { + const IoHash& SequenceRawHash = + RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile( + GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash))); + if (VerifyChunkHash != SequenceRawHash) + { + throw std::runtime_error( + fmt::format("Written hunk sequence {} hash does not match expected hash {}", + VerifyChunkHash, + SequenceRawHash)); + } + std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash), + GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)); + } + } + } + + ChunkCountWritten += gsl::narrow<uint32_t>(CopyData.ChunkTargets.size()); + ZEN_DEBUG("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), LocalContent.Paths[LocalPathIndex]); + } + } + } + }, + Work.DefaultErrorFunction()); + } + size_t BlockCount = BlockDescriptions.size(); std::atomic<size_t> BlocksComplete = 0; @@ -3762,6 +4108,7 @@ namespace { [&, BlockIndex](std::atomic<bool>&) { if (!AbortFlag) { + FilteredDownloadedBytesPerSecond.Start(); IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockDescriptions[BlockIndex].BlockHash); if (!BlockBuffer) { @@ -3781,6 +4128,7 @@ namespace { [&, BlockIndex, BlockBuffer = std::move(Payload)](std::atomic<bool>&) { if (!AbortFlag) { + FilteredWrittenBytesPerSecond.Start(); IoHash BlockRawHash; uint64_t BlockRawSize; CompressedBuffer CompressedBlockBuffer = @@ -3812,14 +4160,13 @@ namespace { uint32_t ChunksReadFromBlock = 0; if (WriteBlockToDisk(CacheFolderPath, RemoteContent, - RemotePathIndexWantsCopyFromCacheFlags, + SequenceIndexChunksLeftToWriteCounters, DecompressedBlockBuffer, RemoteLookup, RemoteChunkIndexNeedsCopyFromSourceFlags.data(), ChunksReadFromBlock, BytesWrittenToDisk)) { - BytesWritten += BytesWrittenToDisk; WriteToDiskBytes += BytesWrittenToDisk; ChunkCountWritten += ChunksReadFromBlock; } @@ -3851,20 +4198,27 @@ namespace { Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) { ZEN_UNUSED(IsAborted, PendingWork); ZEN_ASSERT(ChunkCountToWrite >= ChunkCountWritten.load()); - WriteProgressBar.UpdateState( - {.Task = "Writing chunks ", - .Details = fmt::format("Written {} chunks out of {}. {} ouf of {} blocks complete. Downloaded: {}. Written: {}", - ChunkCountWritten.load(), - ChunkCountToWrite, - BlocksComplete.load(), - BlocksNeededCount, - NiceBytes(BytesDownloaded.load()), - NiceBytes(BytesWritten.load())), - .TotalCount = gsl::narrow<uint64_t>(ChunkCountToWrite), - .RemainingCount = gsl::narrow<uint64_t>(ChunkCountToWrite - ChunkCountWritten.load())}, - false); + FilteredWrittenBytesPerSecond.Update(WriteToDiskBytes.load()); + FilteredDownloadedBytesPerSecond.Update(BytesDownloaded.load()); + std::string Details = fmt::format("{}/{} chunks. {}/{} blocks. {} {}bits/s downloaded. {} {}B/s written", + ChunkCountWritten.load(), + ChunkCountToWrite, + BlocksComplete.load(), + BlocksNeededCount, + NiceBytes(BytesDownloaded.load()), + NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8), + NiceBytes(WriteToDiskBytes.load()), + NiceNum(FilteredWrittenBytesPerSecond.GetCurrent())); + WriteProgressBar.UpdateState({.Task = "Writing chunks ", + .Details = Details, + .TotalCount = gsl::narrow<uint64_t>(ChunkCountToWrite), + .RemainingCount = gsl::narrow<uint64_t>(ChunkCountToWrite - ChunkCountWritten.load())}, + false); }); + FilteredWrittenBytesPerSecond.Stop(); + FilteredDownloadedBytesPerSecond.Stop(); + if (AbortFlag) { return; @@ -3873,6 +4227,11 @@ namespace { WriteProgressBar.Finish(); } + for (const auto& SequenceIndexChunksLeftToWriteCounter : SequenceIndexChunksLeftToWriteCounters) + { + ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounter.load() == 0); + } + std::vector<std::pair<IoHash, uint32_t>> Targets; Targets.reserve(RemoteContent.Paths.size()); for (uint32_t RemotePathIndex = 0; RemotePathIndex < RemoteContent.Paths.size(); RemotePathIndex++) @@ -3884,13 +4243,13 @@ namespace { }); // Move all files we will reuse to cache folder - for (auto It : RawHashToLocalPathIndex) + for (uint32_t LocalPathIndex = 0; LocalPathIndex < LocalContent.Paths.size(); LocalPathIndex++) { - const IoHash& RawHash = It.first; - if (RemoteLookup.RawHashToSequenceRawHashIndex.contains(RawHash)) + const IoHash& RawHash = LocalContent.RawHashes[LocalPathIndex]; + if (RemoteLookup.RawHashToSequenceIndex.contains(RawHash)) { - const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[It.second]).make_preferred(); - const std::filesystem::path CacheFilePath = (CacheFolderPath / RawHash.ToHexString()).make_preferred(); + const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred(); + const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash); ZEN_ASSERT_SLOW(std::filesystem::exists(LocalFilePath)); SetFileReadOnly(LocalFilePath, false); std::filesystem::rename(LocalFilePath, CacheFilePath); @@ -3989,7 +4348,7 @@ namespace { } else { - const std::filesystem::path CacheFilePath = (CacheFolderPath / RawHash.ToHexString()).make_preferred(); + const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash); ZEN_ASSERT_SLOW(std::filesystem::exists(CacheFilePath)); CreateDirectories(FirstTargetFilePath.parent_path()); if (std::filesystem::exists(FirstTargetFilePath)) @@ -4045,12 +4404,12 @@ namespace { Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) { ZEN_UNUSED(IsAborted, PendingWork); - RebuildProgressBar.UpdateState( - {.Task = "Rebuilding state ", - .Details = fmt::format("Written {} files out of {}", TargetsComplete.load(), Targets.size()), - .TotalCount = gsl::narrow<uint64_t>(Targets.size()), - .RemainingCount = gsl::narrow<uint64_t>(Targets.size() - TargetsComplete.load())}, - false); + std::string Details = fmt::format("{}/{} files", TargetsComplete.load(), Targets.size()); + RebuildProgressBar.UpdateState({.Task = "Rebuilding state ", + .Details = Details, + .TotalCount = gsl::narrow<uint64_t>(Targets.size()), + .RemainingCount = gsl::narrow<uint64_t>(Targets.size() - TargetsComplete.load())}, + false); }); if (AbortFlag) @@ -4487,20 +4846,19 @@ namespace { UsePlainProgress ? 5000 : 200, [&](bool, std::ptrdiff_t) { FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load()); - - ProgressBar.UpdateState( - {.Task = "Scanning files ", - .Details = fmt::format("{}/{} ({}/{}, {}B/s) files, {} ({}) chunks found", - ChunkingStats.FilesProcessed.load(), - UpdatedContent.Paths.size(), - NiceBytes(ChunkingStats.BytesHashed.load()), - NiceBytes(ByteCountToScan), - NiceNum(FilteredBytesHashed.GetCurrent()), - ChunkingStats.UniqueChunksFound.load(), - NiceBytes(ChunkingStats.UniqueBytesFound.load())), - .TotalCount = ByteCountToScan, - .RemainingCount = ByteCountToScan - ChunkingStats.BytesHashed.load()}, - false); + std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found", + ChunkingStats.FilesProcessed.load(), + UpdatedContent.Paths.size(), + NiceBytes(ChunkingStats.BytesHashed.load()), + NiceBytes(ByteCountToScan), + NiceNum(FilteredBytesHashed.GetCurrent()), + ChunkingStats.UniqueChunksFound.load(), + NiceBytes(ChunkingStats.UniqueBytesFound.load())); + ProgressBar.UpdateState({.Task = "Scanning files ", + .Details = Details, + .TotalCount = ByteCountToScan, + .RemainingCount = ByteCountToScan - ChunkingStats.BytesHashed.load()}, + false); }, AbortFlag); if (AbortFlag) @@ -4567,15 +4925,16 @@ namespace { UsePlainProgress ? 5000 : 200, [&](bool, std::ptrdiff_t) { FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load()); + std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found", + ChunkingStats.FilesProcessed.load(), + CurrentLocalFolderContent.Paths.size(), + NiceBytes(ChunkingStats.BytesHashed.load()), + ByteCountToScan, + NiceNum(FilteredBytesHashed.GetCurrent()), + ChunkingStats.UniqueChunksFound.load(), + NiceBytes(ChunkingStats.UniqueBytesFound.load())); ProgressBar.UpdateState({.Task = "Scanning files ", - .Details = fmt::format("{}/{} ({}/{}, {}B/s) files, {} ({}) chunks found", - ChunkingStats.FilesProcessed.load(), - CurrentLocalFolderContent.Paths.size(), - NiceBytes(ChunkingStats.BytesHashed.load()), - ByteCountToScan, - NiceNum(FilteredBytesHashed.GetCurrent()), - ChunkingStats.UniqueChunksFound.load(), - NiceBytes(ChunkingStats.UniqueBytesFound.load())), + .Details = Details, .TotalCount = ByteCountToScan, .RemainingCount = (ByteCountToScan - ChunkingStats.BytesHashed.load())}, false); @@ -4599,7 +4958,8 @@ namespace { std::span<const std::string> BuildPartNames, const std::filesystem::path& Path, bool AllowMultiparts, - bool WipeTargetFolder) + bool WipeTargetFolder, + bool PostDownloadVerify) { Stopwatch DownloadTimer; @@ -4610,8 +4970,10 @@ namespace { std::filesystem::remove(ZenTempFolder); }); CreateDirectories(Path / ZenTempBlockFolderName); - CreateDirectories(Path / ZenTempChunkFolderName); - CreateDirectories(Path / ZenTempCacheFolderName); + CreateDirectories(Path / ZenTempChunkFolderName); // TODO: Don't clear this - pick up files -> chunks to use + CreateDirectories(Path / + ZenTempCacheFolderName); // TODO: Don't clear this - pick up files and use as sequences (non .tmp extension) and + // delete .tmp (maybe?) - chunk them? How do we know the file is worth chunking? std::uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u; @@ -4724,7 +5086,7 @@ namespace { if (!AbortFlag) { - VerifyFolder(RemoteContent, Path); + VerifyFolder(RemoteContent, Path, PostDownloadVerify); Stopwatch WriteStateTimer; CbObject StateObject = CreateStateObject(BuildId, AllBuildParts, PartContents, LocalFolderState); @@ -5078,6 +5440,8 @@ BuildsCommand::BuildsCommand() "Path to a text file with one line of <local path>[TAB]<modification date> per file to include.", cxxopts::value(m_ManifestPath), "<manifestpath>"); + m_UploadOptions + .add_option("", "", "verify", "Enable post upload verify of all uploaded data", cxxopts::value(m_PostUploadVerify), "<verify>"); m_UploadOptions.parse_positional({"local-path", "build-id"}); m_UploadOptions.positional_help("local-path build-id"); @@ -5111,6 +5475,8 @@ BuildsCommand::BuildsCommand() "Allow large attachments to be transfered using multipart protocol. Defaults to true.", cxxopts::value(m_AllowMultiparts), "<allowmultipart>"); + m_DownloadOptions + .add_option("", "", "verify", "Enable post download verify of all tracked files", cxxopts::value(m_PostDownloadVerify), "<verify>"); m_DownloadOptions.parse_positional({"local-path", "build-id", "build-part-name"}); m_DownloadOptions.positional_help("local-path build-id build-part-name"); @@ -5503,7 +5869,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) m_AllowMultiparts, MetaData, m_CreateBuild, - m_Clean); + m_Clean, + m_PostUploadVerify); if (false) { @@ -5593,7 +5960,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); } - DownloadFolder(*Storage, BuildId, BuildPartIds, m_BuildPartNames, m_Path, m_AllowMultiparts, m_Clean); + DownloadFolder(*Storage, BuildId, BuildPartIds, m_BuildPartNames, m_Path, m_AllowMultiparts, m_Clean, m_PostDownloadVerify); if (false) { @@ -5727,8 +6094,9 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) m_BlockReuseMinPercentLimit, m_AllowMultiparts, MetaData, - m_CreateBuild, - m_Clean); + true, + false, + true); if (AbortFlag) { ZEN_CONSOLE("Upload failed."); @@ -5737,7 +6105,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) const std::filesystem::path DownloadPath = m_Path.parent_path() / (m_BuildPartName + "_download"); ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}'", BuildId, BuildPartId, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, true); + DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, true, true); if (AbortFlag) { ZEN_CONSOLE("Download failed."); @@ -5749,7 +6117,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) BuildPartId, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false); + DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false, true); if (AbortFlag) { ZEN_CONSOLE("Re-download failed. (identical target)"); @@ -5851,7 +6219,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) BuildPartId, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false); + DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false, true); if (AbortFlag) { ZEN_CONSOLE("Re-download failed. (scrambled target)"); @@ -5880,7 +6248,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) m_AllowMultiparts, MetaData2, true, - false); + false, + true); if (AbortFlag) { ZEN_CONSOLE("Upload of scrambled failed."); @@ -5888,7 +6257,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (original)", BuildId, BuildPartId, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false); + DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false, true); if (AbortFlag) { ZEN_CONSOLE("Re-download failed."); @@ -5896,7 +6265,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, BuildId2, {BuildPartId2}, {}, DownloadPath, m_AllowMultiparts, false); + DownloadFolder(*Storage, BuildId2, {BuildPartId2}, {}, DownloadPath, m_AllowMultiparts, false, true); if (AbortFlag) { ZEN_CONSOLE("Re-download failed."); @@ -5904,7 +6273,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } ZEN_CONSOLE("\nRe-download Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath); - DownloadFolder(*Storage, BuildId2, {BuildPartId2}, {}, DownloadPath, m_AllowMultiparts, false); + DownloadFolder(*Storage, BuildId2, {BuildPartId2}, {}, DownloadPath, m_AllowMultiparts, false, true); if (AbortFlag) { ZEN_CONSOLE("Re-download failed."); @@ -6032,64 +6401,9 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); } - Oid BuildPartId = Oid::TryFromHexString(m_BuildPartId); - CbObject Build = Storage->GetBuild(BuildId); - if (!m_BuildPartName.empty()) - { - BuildPartId = Build["parts"sv].AsObjectView()[m_BuildPartName].AsObjectId(); - if (BuildPartId == Oid::Zero) - { - throw std::runtime_error(fmt::format("Build {} does not have a part named '{}'", m_BuildId, m_BuildPartName)); - } - } - CbObject BuildPart = Storage->GetBuildPart(BuildId, BuildPartId); - ZEN_CONSOLE("Validating build part {}/{} ({})", BuildId, BuildPartId, NiceBytes(BuildPart.GetSize())); - std::vector<IoHash> ChunkAttachments; - for (CbFieldView LooseFileView : BuildPart["chunkAttachments"sv].AsObjectView()["rawHashes"sv]) - { - ChunkAttachments.push_back(LooseFileView.AsBinaryAttachment()); - } - std::vector<IoHash> BlockAttachments; - for (CbFieldView BlocksView : BuildPart["blockAttachments"sv].AsObjectView()["rawHashes"sv]) - { - BlockAttachments.push_back(BlocksView.AsBinaryAttachment()); - } + Oid BuildPartId = Oid::TryFromHexString(m_BuildPartId); - for (const IoHash& ChunkAttachment : ChunkAttachments) - { - uint64_t CompressedSize; - uint64_t DecompressedSize; - try - { - ValidateBlob(*Storage, BuildId, ChunkAttachment, CompressedSize, DecompressedSize); - ZEN_CONSOLE("Chunk attachment {} ({} -> {}) is valid", - ChunkAttachment, - NiceBytes(CompressedSize), - NiceBytes(DecompressedSize)); - } - catch (const std::exception& Ex) - { - ZEN_CONSOLE("Failed validating chunk attachment {}: {}", ChunkAttachment, Ex.what()); - } - } - - for (const IoHash& BlockAttachment : BlockAttachments) - { - uint64_t CompressedSize; - uint64_t DecompressedSize; - try - { - ValidateChunkBlock(*Storage, BuildId, BlockAttachment, CompressedSize, DecompressedSize); - ZEN_CONSOLE("Block attachment {} ({} -> {}) is valid", - BlockAttachment, - NiceBytes(CompressedSize), - NiceBytes(DecompressedSize)); - } - catch (const std::exception& Ex) - { - ZEN_CONSOLE("Failed validating block attachment {}: {}", BlockAttachment, Ex.what()); - } - } + ValidateBuildPart(*Storage, BuildId, BuildPartId, m_BuildPartName); return AbortFlag ? 13 : 0; } diff --git a/src/zen/cmds/builds_cmd.h b/src/zen/cmds/builds_cmd.h index fa223943b..c54fb4db1 100644 --- a/src/zen/cmds/builds_cmd.h +++ b/src/zen/cmds/builds_cmd.h @@ -78,10 +78,12 @@ private: std::filesystem::path m_Path; cxxopts::Options m_UploadOptions{"upload", "Upload a folder"}; + bool m_PostUploadVerify = false; cxxopts::Options m_DownloadOptions{"download", "Download a folder"}; std::vector<std::string> m_BuildPartNames; std::vector<std::string> m_BuildPartIds; + bool m_PostDownloadVerify = false; cxxopts::Options m_DiffOptions{"diff", "Compare two local folders"}; std::filesystem::path m_DiffPath; diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp index bb15a6fce..7f7e70fef 100644 --- a/src/zenhttp/httpclient.cpp +++ b/src/zenhttp/httpclient.cpp @@ -414,6 +414,11 @@ ValidatePayload(cpr::Response& Response, std::unique_ptr<detail::TempPayloadFile } } + if (Response.status_code == (long)HttpResponseCode::PartialContent) + { + return true; + } + if (auto ContentType = Response.header.find("Content-Type"); ContentType != Response.header.end()) { if (ContentType->second == "application/x-ue-comp") diff --git a/src/zenutil/chunkedcontent.cpp b/src/zenutil/chunkedcontent.cpp index 6dc2a20d8..1552ea823 100644 --- a/src/zenutil/chunkedcontent.cpp +++ b/src/zenutil/chunkedcontent.cpp @@ -599,10 +599,10 @@ MergeChunkedFolderContents(const ChunkedFolderContent& Base, std::span<const Chu { RawHashToSequenceRawHashIndex.insert( {RawHash, gsl::narrow<uint32_t>(Result.ChunkedContent.SequenceRawHashes.size())}); - const uint32_t SequenceRawHashIndex = OverlayLookup.RawHashToSequenceRawHashIndex.at(RawHash); - const uint32_t OrderIndexOffset = OverlayLookup.SequenceRawHashIndexChunkOrderOffset[SequenceRawHashIndex]; - const uint32_t ChunkCount = OverlayContent.ChunkedContent.ChunkCounts[SequenceRawHashIndex]; - ChunkingStatistics Stats; + const uint32_t SequenceRawHashIndex = OverlayLookup.RawHashToSequenceIndex.at(RawHash); + const uint32_t OrderIndexOffset = OverlayLookup.SequenceIndexChunkOrderOffset[SequenceRawHashIndex]; + const uint32_t ChunkCount = OverlayContent.ChunkedContent.ChunkCounts[SequenceRawHashIndex]; + ChunkingStatistics Stats; std::span<const uint32_t> OriginalChunkOrder = std::span<const uint32_t>(OverlayContent.ChunkedContent.ChunkOrders).subspan(OrderIndexOffset, ChunkCount); AddCunkSequence(Stats, @@ -667,9 +667,9 @@ DeletePathsFromChunkedContent(const ChunkedFolderContent& BaseContent, std::span { RawHashToSequenceRawHashIndex.insert( {RawHash, gsl::narrow<uint32_t>(Result.ChunkedContent.SequenceRawHashes.size())}); - const uint32_t SequenceRawHashIndex = BaseLookup.RawHashToSequenceRawHashIndex.at(RawHash); - const uint32_t OrderIndexOffset = BaseLookup.SequenceRawHashIndexChunkOrderOffset[SequenceRawHashIndex]; - const uint32_t ChunkCount = BaseContent.ChunkedContent.ChunkCounts[SequenceRawHashIndex]; + const uint32_t SequenceRawHashIndex = BaseLookup.RawHashToSequenceIndex.at(RawHash); + const uint32_t OrderIndexOffset = BaseLookup.SequenceIndexChunkOrderOffset[SequenceRawHashIndex]; + const uint32_t ChunkCount = BaseContent.ChunkedContent.ChunkCounts[SequenceRawHashIndex]; ChunkingStatistics Stats; std::span<const uint32_t> OriginalChunkOrder = std::span<const uint32_t>(BaseContent.ChunkedContent.ChunkOrders).subspan(OrderIndexOffset, ChunkCount); @@ -777,46 +777,40 @@ BuildChunkedContentLookup(const ChunkedFolderContent& Content) { struct ChunkLocationReference { - uint32_t ChunkIndex; - ChunkedContentLookup::ChunkLocation Location; + uint32_t ChunkIndex; + ChunkedContentLookup::ChunkSequenceLocation Location; }; ChunkedContentLookup Result; { const uint32_t SequenceRawHashesCount = gsl::narrow<uint32_t>(Content.ChunkedContent.SequenceRawHashes.size()); - Result.RawHashToSequenceRawHashIndex.reserve(SequenceRawHashesCount); - Result.SequenceRawHashIndexChunkOrderOffset.reserve(SequenceRawHashesCount); + Result.RawHashToSequenceIndex.reserve(SequenceRawHashesCount); + Result.SequenceIndexChunkOrderOffset.reserve(SequenceRawHashesCount); uint32_t OrderOffset = 0; for (uint32_t SequenceRawHashIndex = 0; SequenceRawHashIndex < Content.ChunkedContent.SequenceRawHashes.size(); SequenceRawHashIndex++) { - Result.RawHashToSequenceRawHashIndex.insert( - {Content.ChunkedContent.SequenceRawHashes[SequenceRawHashIndex], SequenceRawHashIndex}); - Result.SequenceRawHashIndexChunkOrderOffset.push_back(OrderOffset); + Result.RawHashToSequenceIndex.insert({Content.ChunkedContent.SequenceRawHashes[SequenceRawHashIndex], SequenceRawHashIndex}); + Result.SequenceIndexChunkOrderOffset.push_back(OrderOffset); OrderOffset += Content.ChunkedContent.ChunkCounts[SequenceRawHashIndex]; } } std::vector<ChunkLocationReference> Locations; Locations.reserve(Content.ChunkedContent.ChunkOrders.size()); - for (uint32_t PathIndex = 0; PathIndex < Content.Paths.size(); PathIndex++) + for (uint32_t SequenceIndex = 0; SequenceIndex < Content.ChunkedContent.SequenceRawHashes.size(); SequenceIndex++) { - if (Content.RawSizes[PathIndex] > 0) + const uint32_t OrderOffset = Result.SequenceIndexChunkOrderOffset[SequenceIndex]; + const uint32_t ChunkCount = Content.ChunkedContent.ChunkCounts[SequenceIndex]; + uint64_t LocationOffset = 0; + for (size_t OrderIndex = OrderOffset; OrderIndex < OrderOffset + ChunkCount; OrderIndex++) { - const IoHash& RawHash = Content.RawHashes[PathIndex]; - uint32_t SequenceRawHashIndex = Result.RawHashToSequenceRawHashIndex.at(RawHash); - const uint32_t OrderOffset = Result.SequenceRawHashIndexChunkOrderOffset[SequenceRawHashIndex]; - const uint32_t ChunkCount = Content.ChunkedContent.ChunkCounts[SequenceRawHashIndex]; - uint64_t LocationOffset = 0; - for (size_t OrderIndex = OrderOffset; OrderIndex < OrderOffset + ChunkCount; OrderIndex++) - { - uint32_t ChunkIndex = Content.ChunkedContent.ChunkOrders[OrderIndex]; + uint32_t ChunkIndex = Content.ChunkedContent.ChunkOrders[OrderIndex]; - Locations.push_back(ChunkLocationReference{ChunkIndex, ChunkedContentLookup::ChunkLocation{PathIndex, LocationOffset}}); + Locations.push_back( + ChunkLocationReference{ChunkIndex, ChunkedContentLookup::ChunkSequenceLocation{SequenceIndex, LocationOffset}}); - LocationOffset += Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; - } - ZEN_ASSERT(LocationOffset == Content.RawSizes[PathIndex]); + LocationOffset += Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; } } @@ -829,18 +823,18 @@ BuildChunkedContentLookup(const ChunkedFolderContent& Content) { return false; } - if (Lhs.Location.PathIndex < Rhs.Location.PathIndex) + if (Lhs.Location.SequenceIndex < Rhs.Location.SequenceIndex) { return true; } - if (Lhs.Location.PathIndex > Rhs.Location.PathIndex) + if (Lhs.Location.SequenceIndex > Rhs.Location.SequenceIndex) { return false; } return Lhs.Location.Offset < Rhs.Location.Offset; }); - Result.ChunkLocations.reserve(Locations.size()); + Result.ChunkSequenceLocations.reserve(Locations.size()); const uint32_t ChunkCount = gsl::narrow<uint32_t>(Content.ChunkedContent.ChunkHashes.size()); Result.ChunkHashToChunkIndex.reserve(ChunkCount); size_t RangeOffset = 0; @@ -850,14 +844,30 @@ BuildChunkedContentLookup(const ChunkedFolderContent& Content) uint32_t Count = 0; while (Locations[RangeOffset + Count].ChunkIndex == ChunkIndex) { - Result.ChunkLocations.push_back(Locations[RangeOffset + Count].Location); + Result.ChunkSequenceLocations.push_back(Locations[RangeOffset + Count].Location); Count++; } - Result.ChunkLocationOffset.push_back(RangeOffset); - Result.ChunkLocationCounts.push_back(Count); + Result.ChunkSequenceLocationOffset.push_back(RangeOffset); + Result.ChunkSequenceLocationCounts.push_back(Count); RangeOffset += Count; } + Result.SequenceIndexFirstPathIndex.resize(Content.ChunkedContent.SequenceRawHashes.size(), (uint32_t)-1); + for (uint32_t PathIndex = 0; PathIndex < Content.Paths.size(); PathIndex++) + { + if (Content.RawSizes[PathIndex] > 0) + { + const IoHash& RawHash = Content.RawHashes[PathIndex]; + auto SequenceIndexIt = Result.RawHashToSequenceIndex.find(RawHash); + ZEN_ASSERT(SequenceIndexIt != Result.RawHashToSequenceIndex.end()); + const uint32_t SequenceIndex = SequenceIndexIt->second; + if (Result.SequenceIndexFirstPathIndex[SequenceIndex] == (uint32_t)-1) + { + Result.SequenceIndexFirstPathIndex[SequenceIndex] = PathIndex; + } + } + } + return Result; } diff --git a/src/zenutil/include/zenutil/chunkedcontent.h b/src/zenutil/include/zenutil/chunkedcontent.h index 15c687462..309341550 100644 --- a/src/zenutil/include/zenutil/chunkedcontent.h +++ b/src/zenutil/include/zenutil/chunkedcontent.h @@ -122,32 +122,59 @@ ChunkedFolderContent ChunkFolderContent(ChunkingStatistics& Stats, struct ChunkedContentLookup { - struct ChunkLocation + struct ChunkSequenceLocation { - uint32_t PathIndex; + uint32_t SequenceIndex; uint64_t Offset; }; tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkHashToChunkIndex; - tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> RawHashToSequenceRawHashIndex; - std::vector<uint32_t> SequenceRawHashIndexChunkOrderOffset; - std::vector<ChunkLocation> ChunkLocations; - std::vector<size_t> ChunkLocationOffset; // ChunkLocations[ChunkLocationOffset[ChunkIndex]] -> start of sources for ChunkIndex - std::vector<uint32_t> ChunkLocationCounts; // ChunkLocationCounts[ChunkIndex] count of chunk locations for ChunkIndex + tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> RawHashToSequenceIndex; + std::vector<uint32_t> SequenceIndexChunkOrderOffset; + std::vector<ChunkSequenceLocation> ChunkSequenceLocations; + std::vector<size_t> + ChunkSequenceLocationOffset; // ChunkSequenceLocations[ChunkLocationOffset[ChunkIndex]] -> start of sources for ChunkIndex + std::vector<uint32_t> ChunkSequenceLocationCounts; // ChunkSequenceLocationCounts[ChunkIndex] count of chunk locations for ChunkIndex + std::vector<uint32_t> SequenceIndexFirstPathIndex; // SequenceIndexFirstPathIndex[SequenceIndex] -> first path index with that RawHash }; ChunkedContentLookup BuildChunkedContentLookup(const ChunkedFolderContent& Content); inline std::pair<size_t, uint32_t> -GetChunkLocationRange(const ChunkedContentLookup& Lookup, uint32_t ChunkIndex) +GetChunkSequenceLocationRange(const ChunkedContentLookup& Lookup, uint32_t ChunkIndex) { - return std::make_pair(Lookup.ChunkLocationOffset[ChunkIndex], Lookup.ChunkLocationCounts[ChunkIndex]); + return std::make_pair(Lookup.ChunkSequenceLocationOffset[ChunkIndex], Lookup.ChunkSequenceLocationCounts[ChunkIndex]); } -inline std::span<const ChunkedContentLookup::ChunkLocation> -GetChunkLocations(const ChunkedContentLookup& Lookup, uint32_t ChunkIndex) +inline std::span<const ChunkedContentLookup::ChunkSequenceLocation> +GetChunkSequenceLocations(const ChunkedContentLookup& Lookup, uint32_t ChunkIndex) { - std::pair<size_t, uint32_t> Range = GetChunkLocationRange(Lookup, ChunkIndex); - return std::span<const ChunkedContentLookup::ChunkLocation>(Lookup.ChunkLocations).subspan(Range.first, Range.second); + std::pair<size_t, uint32_t> Range = GetChunkSequenceLocationRange(Lookup, ChunkIndex); + return std::span<const ChunkedContentLookup::ChunkSequenceLocation>(Lookup.ChunkSequenceLocations).subspan(Range.first, Range.second); +} + +inline uint32_t +GetSequenceIndexForRawHash(const ChunkedContentLookup& Lookup, const IoHash& RawHash) +{ + return Lookup.RawHashToSequenceIndex.at(RawHash); +} + +inline uint32_t +GetChunkIndexForRawHash(const ChunkedContentLookup& Lookup, const IoHash& RawHash) +{ + return Lookup.RawHashToSequenceIndex.at(RawHash); +} + +inline uint32_t +GetFirstPathIndexForSeqeuenceIndex(const ChunkedContentLookup& Lookup, const uint32_t SequenceIndex) +{ + return Lookup.SequenceIndexFirstPathIndex[SequenceIndex]; +} + +inline uint32_t +GetFirstPathIndexForRawHash(const ChunkedContentLookup& Lookup, const IoHash& RawHash) +{ + const uint32_t SequenceIndex = GetSequenceIndexForRawHash(Lookup, RawHash); + return GetFirstPathIndexForSeqeuenceIndex(Lookup, SequenceIndex); } namespace compactbinary_helpers { |