diff options
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 124 |
1 files changed, 73 insertions, 51 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index 06b4f000b..d1f321358 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -4305,7 +4305,6 @@ namespace { if (CurrentTargetIndex != (uint32_t)-1) { ZEN_ASSERT_SLOW(OutputFile); - { RwLock::ExclusiveLockScope _(m_OpenWriteCacheLock); if (m_OutputFiles.size() <= CurrentTargetIndex) @@ -4321,23 +4320,42 @@ namespace { CurrentTargetIndex = (uint32_t)-1; return; } + else + { + m_DiskStats.CurrentOpenFileCount--; + } } - OutputFile = {}; - m_DiskStats.CurrentOpenFileCount--; + OutputFile.reset(); CurrentTargetIndex = (uint32_t)-1; } } - static void FlushSequence(uint32_t SequenceIndex) + void FlushSequence(uint32_t SequenceIndex) { - RwLock::ExclusiveLockScope _(m_OpenWriteCacheLock); - if (SequenceIndex < m_OutputFiles.size() && m_OutputFiles[SequenceIndex]) + if (SequenceIndex == CurrentTargetIndex) { - m_OutputFiles[SequenceIndex]->m_Writer.Flush(); - m_OutputFiles[SequenceIndex] = {}; + ZEN_ASSERT_SLOW(OutputFile); + OutputFile->m_Writer.Flush(); + OutputFile.reset(); + CurrentTargetIndex = (uint32_t)-1; + } + std::unique_ptr<OpenCachedFile> FlushOutputFile; + { + RwLock::ExclusiveLockScope _(m_OpenWriteCacheLock); + if (SequenceIndex < m_OutputFiles.size() && m_OutputFiles[SequenceIndex]) + { + FlushOutputFile = std::move(m_OutputFiles[SequenceIndex]); + } + } + if (FlushOutputFile) + { + FlushOutputFile->m_Writer.Flush(); + FlushOutputFile.reset(); + m_DiskStats.CurrentOpenFileCount--; } } + private: class OpenCachedFile { public: @@ -4502,7 +4520,8 @@ namespace { return PreviousValue == 1; } - std::vector<uint32_t> CompleteChunkTargets(const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs, + std::vector<uint32_t> CompleteChunkTargets(WriteFileCache& OpenFileCache, + const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters) { ZEN_TRACE_CPU("CompleteChunkTargets"); @@ -4513,7 +4532,7 @@ namespace { const uint32_t RemoteSequenceIndex = Location->SequenceIndex; if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters)) { - WriteFileCache::FlushSequence(RemoteSequenceIndex); + OpenFileCache.FlushSequence(RemoteSequenceIndex); CompletedSequenceIndexes.push_back(RemoteSequenceIndex); } } @@ -4541,45 +4560,43 @@ namespace { DiskStatistics& DiskStats) { ZEN_TRACE_CPU("WriteBlockChunkOps"); + + WriteFileCache OpenFileCache(DiskStats); + for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps) { - WriteFileCache OpenFileCache(DiskStats); - for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps) + if (AbortFlag) { - if (AbortFlag) - { - break; - } - const CompositeBuffer& Chunk = Ops.ChunkBuffers[WriteOp.ChunkBufferIndex]; - const uint32_t SequenceIndex = WriteOp.Target->SequenceIndex; - ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() <= - RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]); - ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() > 0); - const uint64_t ChunkSize = Chunk.GetSize(); - const uint64_t FileOffset = WriteOp.Target->Offset; - const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex]; - ZEN_ASSERT(FileOffset + ChunkSize <= RemoteContent.RawSizes[PathIndex]); - - OpenFileCache.WriteToFile<CompositeBuffer>( - SequenceIndex, - [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) { - return GetTempChunkedSequenceFileName(CacheFolderPath, - RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); - }, - Chunk, - FileOffset, - RemoteContent.RawSizes[PathIndex]); + break; } + const CompositeBuffer& Chunk = Ops.ChunkBuffers[WriteOp.ChunkBufferIndex]; + const uint32_t SequenceIndex = WriteOp.Target->SequenceIndex; + ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() <= + RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]); + ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() > 0); + const uint64_t ChunkSize = Chunk.GetSize(); + const uint64_t FileOffset = WriteOp.Target->Offset; + const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex]; + ZEN_ASSERT(FileOffset + ChunkSize <= RemoteContent.RawSizes[PathIndex]); + + OpenFileCache.WriteToFile<CompositeBuffer>( + SequenceIndex, + [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) { + return GetTempChunkedSequenceFileName(CacheFolderPath, RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); + }, + Chunk, + FileOffset, + RemoteContent.RawSizes[PathIndex]); } + if (!AbortFlag) { - // Write tracking, updating this must be done without any files open (WriteFileCache) std::vector<uint32_t> CompletedChunkSequences; for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps) { const uint32_t RemoteSequenceIndex = WriteOp.Target->SequenceIndex; if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters)) { - WriteFileCache::FlushSequence(RemoteSequenceIndex); + OpenFileCache.FlushSequence(RemoteSequenceIndex); CompletedChunkSequences.push_back(RemoteSequenceIndex); } } @@ -4998,6 +5015,7 @@ namespace { const IoHash& ChunkHash, const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs, IoBuffer&& CompressedPart, + WriteFileCache& OpenFileCache, DiskStatistics& DiskStats) { ZEN_TRACE_CPU("WriteCompressedChunk"); @@ -5017,7 +5035,6 @@ namespace { if (!AbortFlag) { - WriteFileCache OpenFileCache(DiskStats); WriteChunkToDisk(TargetFolder, RemoteContent, RemoteLookup, @@ -5121,13 +5138,15 @@ namespace { std::filesystem::path TargetFolder = ZenTempCacheFolderPath(ZenFolderPath); - bool NeedHashVerify = WriteCompressedChunk(TargetFolder, - RemoteContent, - RemoteLookup, - ChunkHash, - ChunkTargetPtrs, - std::move(CompressedPart), - DiskStats); + WriteFileCache OpenFileCache(DiskStats); + bool NeedHashVerify = WriteCompressedChunk(TargetFolder, + RemoteContent, + RemoteLookup, + ChunkHash, + ChunkTargetPtrs, + std::move(CompressedPart), + OpenFileCache, + DiskStats); if (!AbortFlag) { WritePartsComplete++; @@ -5139,7 +5158,7 @@ namespace { RemoveFileWithRetry(CompressedChunkPath); std::vector<uint32_t> CompletedSequences = - CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); + CompleteChunkTargets(OpenFileCache, ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); if (NeedHashVerify) { VerifyAndCompleteChunkSequencesAsync(TargetFolder, @@ -6389,13 +6408,15 @@ namespace { CompressedChunkPath)); } - std::filesystem::path TargetFolder = ZenTempCacheFolderPath(ZenFolderPath); + std::filesystem::path TargetFolder = ZenTempCacheFolderPath(ZenFolderPath); + WriteFileCache OpenFileCache(DiskStats); bool NeedHashVerify = WriteCompressedChunk(TargetFolder, RemoteContent, RemoteLookup, ChunkHash, ChunkTargetPtrs, std::move(CompressedPart), + OpenFileCache, DiskStats); WritePartsComplete++; @@ -6409,7 +6430,9 @@ namespace { RemoveFileWithRetry(CompressedChunkPath); std::vector<uint32_t> CompletedSequences = - CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); + CompleteChunkTargets(OpenFileCache, + ChunkTargetPtrs, + SequenceIndexChunksLeftToWriteCounters); if (NeedHashVerify) { VerifyAndCompleteChunkSequencesAsync(TargetFolder, @@ -6682,6 +6705,7 @@ namespace { }); } + WriteFileCache OpenFileCache(DiskStats); if (!AbortFlag) { ZEN_TRACE_CPU("Write"); @@ -6689,7 +6713,6 @@ namespace { tsl::robin_set<uint32_t> ChunkIndexesWritten; BufferedOpenFile SourceFile(SourceFilePath, DiskStats); - WriteFileCache OpenFileCache(DiskStats); for (size_t WriteOpIndex = 0; WriteOpIndex < WriteOps.size();) { if (AbortFlag) @@ -6756,14 +6779,13 @@ namespace { } if (!AbortFlag) { - // Write tracking, updating this must be done without any files open (WriteFileCache) std::vector<uint32_t> CompletedChunkSequences; for (const WriteOp& Op : WriteOps) { const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex; if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters)) { - WriteFileCache::FlushSequence(RemoteSequenceIndex); + OpenFileCache.FlushSequence(RemoteSequenceIndex); CompletedChunkSequences.push_back(RemoteSequenceIndex); } } |