diff options
| author | Dan Engelbrecht <[email protected]> | 2025-03-18 10:00:55 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-03-18 10:00:55 +0100 |
| commit | 7bb818631b3d7431b7ac1c58da0e46bb0ddaa797 (patch) | |
| tree | 2732fffdb32ae67b0e4080f86072c9341e7a035d /src | |
| parent | collapse local writes (#310) (diff) | |
| download | zen-7bb818631b3d7431b7ac1c58da0e46bb0ddaa797.tar.xz zen-7bb818631b3d7431b7ac1c58da0e46bb0ddaa797.zip | |
If a chunk or block write operation results in more than one completed chunk sequence, do the additional verifications as async work (#311)
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 243 |
1 file changed, 155 insertions, 88 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index 8566f1540..7e3f5345f 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -3870,6 +3870,100 @@ namespace { return ChunkTargetPtrs; }; + void FinalizeChunkSequence(const std::filesystem::path& TargetFolder, const IoHash& SequenceRawHash) + { + ZEN_TRACE_CPU("FinalizeChunkSequence"); + ZEN_ASSERT_SLOW(!std::filesystem::exists(GetFinalChunkedSequenceFileName(TargetFolder, SequenceRawHash))); + std::filesystem::rename(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash), + GetFinalChunkedSequenceFileName(TargetFolder, SequenceRawHash)); + } + + void FinalizeChunkSequences(const std::filesystem::path& TargetFolder, + const ChunkedFolderContent& RemoteContent, + std::span<const uint32_t> RemoteSequenceIndexes) + { + ZEN_TRACE_CPU("FinalizeChunkSequences"); + for (uint32_t SequenceIndex : RemoteSequenceIndexes) + { + FinalizeChunkSequence(TargetFolder, RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); + } + } + + void VerifyAndCompleteChunkSequencesAsync(const std::filesystem::path& TargetFolder, + const ChunkedFolderContent& RemoteContent, + std::span<const uint32_t> RemoteSequenceIndexes, + ParallellWork& Work, + WorkerThreadPool& VerifyPool) + { + if (RemoteSequenceIndexes.empty()) + { + return; + } + ZEN_TRACE_CPU("VerifyAndCompleteChunkSequence"); + for (uint32_t RemoteSequenceIndexOffset = 1; RemoteSequenceIndexOffset < RemoteSequenceIndexes.size(); RemoteSequenceIndexOffset++) + { + const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset]; + Work.ScheduleWork( + VerifyPool, + [&RemoteContent, TargetFolder, RemoteSequenceIndex](std::atomic<bool>&) { + if (!AbortFlag) + { + ZEN_TRACE_CPU("VerifyAndCompleteChunkSequenceAsync"); + const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + { + ZEN_TRACE_CPU("HashSequence"); + const IoHash VerifyChunkHash = IoHash::HashBuffer( 
+ IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash))); + if (VerifyChunkHash != SequenceRawHash) + { + throw std::runtime_error(fmt::format("Written chunk sequence {} hash does not match expected hash {}", + VerifyChunkHash, + SequenceRawHash)); + } + } + FinalizeChunkSequence(TargetFolder, SequenceRawHash); + } + }, + Work.DefaultErrorFunction()); + } + const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[0]; + + const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; + { + ZEN_TRACE_CPU("HashSequence"); + const IoHash VerifyChunkHash = + IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash))); + if (VerifyChunkHash != SequenceRawHash) + { + throw std::runtime_error( + fmt::format("Written chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, SequenceRawHash)); + } + } + FinalizeChunkSequence(TargetFolder, SequenceRawHash); + } + + bool CompleteSequenceChunk(uint32_t RemoteSequenceIndex, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters) + { + return SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1; + } + + std::vector<uint32_t> CompleteChunkTargets(const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs, + std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters) + { + ZEN_TRACE_CPU("CompleteChunkTargets"); + + std::vector<uint32_t> CompletedSequenceIndexes; + for (const ChunkedContentLookup::ChunkSequenceLocation* Location : ChunkTargetPtrs) + { + const uint32_t RemoteSequenceIndex = Location->SequenceIndex; + if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters)) + { + CompletedSequenceIndexes.push_back(RemoteSequenceIndex); + } + } + return CompletedSequenceIndexes; + } + struct BlockWriteOps { std::vector<CompositeBuffer> ChunkBuffers; @@ -3886,6 +3980,8 @@ 
namespace { const ChunkedContentLookup& Lookup, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, const BlockWriteOps& Ops, + ParallellWork& Work, + WorkerThreadPool& VerifyPool, DiskStatistics& DiskStats, WriteChunkStatistics& WriteChunkStats) { @@ -3928,29 +4024,16 @@ namespace { if (!AbortFlag) { // Write tracking, updating this must be done without any files open (WriteFileCache) + std::vector<uint32_t> CompletedChunkSequences; for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps) { const uint32_t RemoteSequenceIndex = WriteOp.Target->SequenceIndex; - if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1) + if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters)) { - const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; - { - ZEN_TRACE_CPU("VerifyChunkHash"); - const IoHash VerifyChunkHash = IoHash::HashBuffer( - IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash))); - if (VerifyChunkHash != SequenceRawHash) - { - throw std::runtime_error(fmt::format("Written chunk sequence {} hash does not match expected hash {}", - VerifyChunkHash, - SequenceRawHash)); - } - } - ZEN_TRACE_CPU("VerifyChunkHashes_rename"); - ZEN_ASSERT_SLOW(!std::filesystem::exists(GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash))); - std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash), - GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)); + CompletedChunkSequences.push_back(RemoteSequenceIndex); } } + VerifyAndCompleteChunkSequencesAsync(CacheFolderPath, RemoteContent, CompletedChunkSequences, Work, VerifyPool); } } @@ -4071,6 +4154,8 @@ namespace { const ChunkedFolderContent& RemoteContent, const ChunkBlockDescription& BlockDescription, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, + ParallellWork& Work, + 
WorkerThreadPool& VerifyPool, CompositeBuffer&& BlockBuffer, const ChunkedContentLookup& Lookup, std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, @@ -4107,6 +4192,8 @@ namespace { Lookup, SequenceIndexChunksLeftToWriteCounters, Ops, + Work, + VerifyPool, DiskStats, WriteChunkStats); return true; @@ -4130,6 +4217,8 @@ namespace { Lookup, SequenceIndexChunksLeftToWriteCounters, Ops, + Work, + VerifyPool, DiskStats, WriteChunkStats); return true; @@ -4141,6 +4230,8 @@ namespace { const ChunkedFolderContent& RemoteContent, const ChunkBlockDescription& BlockDescription, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, + ParallellWork& Work, + WorkerThreadPool& VerifyPool, CompositeBuffer&& PartialBlockBuffer, uint32_t FirstIncludedBlockChunkIndex, uint32_t LastIncludedBlockChunkIndex, @@ -4171,6 +4262,8 @@ namespace { Lookup, SequenceIndexChunksLeftToWriteCounters, Ops, + Work, + VerifyPool, DiskStats, WriteChunkStats); return true; @@ -4371,42 +4464,6 @@ namespace { return false; } - void CompleteChunkTargets(const std::filesystem::path& TargetFolder, - const ChunkedFolderContent& RemoteContent, - const IoHash& ChunkHash, - const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs, - std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, - const bool NeedHashVerify) - { - ZEN_TRACE_CPU("CompleteChunkTargets"); - - for (const ChunkedContentLookup::ChunkSequenceLocation* Location : ChunkTargetPtrs) - { - const uint32_t RemoteSequenceIndex = Location->SequenceIndex; - if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1) - { - const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; - if (NeedHashVerify) - { - ZEN_TRACE_CPU("VerifyChunkHash"); - - const IoHash VerifyChunkHash = - IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash))); - if 
(VerifyChunkHash != ChunkHash) - { - throw std::runtime_error( - fmt::format("Written chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, ChunkHash)); - } - } - - ZEN_TRACE_CPU("RenameToFinal"); - ZEN_ASSERT_SLOW(!std::filesystem::exists(GetFinalChunkedSequenceFileName(TargetFolder, SequenceRawHash))); - std::filesystem::rename(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash), - GetFinalChunkedSequenceFileName(TargetFolder, SequenceRawHash)); - } - } - } - void AsyncWriteDownloadedChunk(const std::filesystem::path& Path, const ChunkedFolderContent& RemoteContent, const ChunkedContentLookup& RemoteLookup, @@ -4518,12 +4575,16 @@ namespace { std::filesystem::remove(CompressedChunkPath); - CompleteChunkTargets(TargetFolder, - RemoteContent, - ChunkHash, - ChunkTargetPtrs, - SequenceIndexChunksLeftToWriteCounters, - NeedHashVerify); + std::vector<uint32_t> CompletedSequences = + CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); + if (NeedHashVerify) + { + VerifyAndCompleteChunkSequencesAsync(TargetFolder, RemoteContent, CompletedSequences, Work, WritePool); + } + else + { + FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences); + } } } }, @@ -4694,8 +4755,8 @@ namespace { else { // We must write the sequence - SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = - RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]; + const uint32_t ChunkCount = RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]; + SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = ChunkCount; } } } @@ -5098,6 +5159,8 @@ namespace { &RemoteLookup, &CacheFolderPath, &SequenceIndexChunksLeftToWriteCounters, + &Work, + &WritePool, &DiskStats, &WriteChunkStats, &WritePartsComplete, @@ -5145,12 +5208,20 @@ namespace { std::filesystem::remove(CompressedChunkPath); - CompleteChunkTargets(TargetFolder, - RemoteContent, - ChunkHash, - ChunkTargetPtrs, - 
SequenceIndexChunksLeftToWriteCounters, - NeedHashVerify); + std::vector<uint32_t> CompletedSequences = + CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); + if (NeedHashVerify) + { + VerifyAndCompleteChunkSequencesAsync(TargetFolder, + RemoteContent, + CompletedSequences, + Work, + WritePool); + } + else + { + FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences); + } } } }, @@ -5385,32 +5456,20 @@ namespace { if (!AbortFlag) { // Write tracking, updating this must be done without any files open (WriteFileCache) + std::vector<uint32_t> CompletedChunkSequences; for (const WriteOp& Op : WriteOps) { const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex; - if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1) + if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters)) { - const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; - { - ZEN_TRACE_CPU("VerifyHash"); - const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile( - GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash))); - if (VerifyChunkHash != SequenceRawHash) - { - throw std::runtime_error( - fmt::format("Written chunk sequence {} hash does not match expected hash {}", - VerifyChunkHash, - SequenceRawHash)); - } - } - - ZEN_TRACE_CPU("rename"); - ZEN_ASSERT_SLOW( - !std::filesystem::exists(GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash))); - std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash), - GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)); + CompletedChunkSequences.push_back(RemoteSequenceIndex); } } + VerifyAndCompleteChunkSequencesAsync(CacheFolderPath, + RemoteContent, + CompletedChunkSequences, + Work, + WritePool); ZEN_DEBUG("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), LocalContent.Paths[LocalPathIndex]); } 
WritePartsComplete++; @@ -5452,6 +5511,8 @@ namespace { RemoteContent, BlockDescription, SequenceIndexChunksLeftToWriteCounters, + Work, + WritePool, CompositeBuffer(std::move(BlockBuffer)), RemoteLookup, RemoteChunkIndexNeedsCopyFromSourceFlags, @@ -5588,6 +5649,8 @@ namespace { RemoteContent, BlockDescription, SequenceIndexChunksLeftToWriteCounters, + Work, + WritePool, CompositeBuffer(std::move(BlockPartialBuffer)), BlockRange.ChunkBlockIndexStart, BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1, @@ -5694,7 +5757,9 @@ namespace { { Work.ScheduleWork( WritePool, // WritePool, GetSyncWorkerPool() - [&RemoteContent, + [&Work, + &WritePool, + &RemoteContent, &RemoteLookup, CacheFolderPath, &RemoteChunkIndexNeedsCopyFromSourceFlags, @@ -5735,6 +5800,8 @@ namespace { RemoteContent, BlockDescription, SequenceIndexChunksLeftToWriteCounters, + Work, + WritePool, CompositeBuffer(std::move(BlockBuffer)), RemoteLookup, RemoteChunkIndexNeedsCopyFromSourceFlags, |