diff options
| author | Dan Engelbrecht <[email protected]> | 2025-04-23 08:54:32 +0200 |
|---|---|---|
| committer | Dan Engelbrecht <[email protected]> | 2025-04-23 09:58:23 +0200 |
| commit | 89b06f514a303ef61f0d574dd85ba5d3edb25940 (patch) | |
| tree | e17f2a63a63cb2f16299f11dfd5bde84455484b1 /src | |
| parent | make sure to call MakeSafeAbsolutePathÍnPlace where appropriate (#363) (diff) | |
| download | zen-89b06f514a303ef61f0d574dd85ba5d3edb25940.tar.xz zen-89b06f514a303ef61f0d574dd85ba5d3edb25940.zip | |
keep files open in WriteFileCache
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 157 |
1 files changed, 116 insertions, 41 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index 43e0ed689..06b4f000b 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -4217,59 +4217,83 @@ namespace { uint64_t TargetFinalSize) { ZEN_TRACE_CPU("WriteFileCache_WriteToFile"); - if (!SeenTargetIndexes.empty() && SeenTargetIndexes.back() == TargetIndex) + if (CurrentTargetIndex == TargetIndex) { ZEN_TRACE_CPU("WriteFileCache_WriteToFile_CacheWrite"); - ZEN_ASSERT(OpenFileWriter); - OpenFileWriter->Write(Buffer, FileOffset); + ZEN_ASSERT(OutputFile); + OutputFile->m_Writer.Write(Buffer, FileOffset); m_DiskStats.WriteCount++; m_DiskStats.WriteByteCount += Buffer.GetSize(); } else { - std::unique_ptr<BasicFile> NewOutputFile; - { - ZEN_TRACE_CPU("WriteFileCache_WriteToFile_Open"); - Flush(); - const std::filesystem::path& TargetPath = GetTargetPath(TargetIndex); - CreateDirectories(TargetPath.parent_path()); - uint32_t Tries = 5; - NewOutputFile = - std::make_unique<BasicFile>(TargetPath, BasicFile::Mode::kWrite, [&Tries, TargetPath](std::error_code& Ec) { - if (Tries < 3) - { - ZEN_CONSOLE("Failed opening file '{}': {}{}", TargetPath, Ec.message(), Tries > 1 ? " Retrying"sv : ""sv); - } - if (Tries > 1) - { - Sleep(100); - } - return --Tries > 0; - }); - m_DiskStats.OpenWriteCount++; - m_DiskStats.CurrentOpenFileCount++; - } + Flush(); const bool CacheWriter = TargetFinalSize > Buffer.GetSize(); if (CacheWriter) { - ZEN_TRACE_CPU("WriteFileCache_WriteToFile_CacheWrite"); - ZEN_ASSERT_SLOW(std::find(SeenTargetIndexes.begin(), SeenTargetIndexes.end(), TargetIndex) == SeenTargetIndexes.end()); + ZEN_TRACE_CPU("WriteFileCache_Write_Cached"); + { + RwLock::ExclusiveLockScope _(m_OpenWriteCacheLock); + if (TargetIndex < m_OutputFiles.size()) + { + OutputFile = std::move(m_OutputFiles[TargetIndex]); + } + } + if (!OutputFile) + { + ZEN_TRACE_CPU("WriteFileCache_Write_CachedOpen"); + const std::filesystem::path& TargetPath = GetTargetPath(TargetIndex); + CreateDirectories(TargetPath.parent_path()); + uint32_t Tries = 5; + OutputFile = std::make_unique<OpenCachedFile>(TargetFinalSize, + TargetPath, + BasicFile::Mode::kWrite, + [&Tries, TargetPath](std::error_code& Ec) { + if (Tries < 3) + { + ZEN_CONSOLE("Failed opening file '{}': {}{}", + TargetPath, + Ec.message(), + Tries > 1 ? " Retrying"sv : ""sv); + } + if (Tries > 1) + { + Sleep(100); + } + return --Tries > 0; + }); + m_DiskStats.OpenWriteCount++; + m_DiskStats.CurrentOpenFileCount++; + } - OutputFile = std::move(NewOutputFile); - OpenFileWriter = std::make_unique<BasicFileWriter>(*OutputFile, Min(TargetFinalSize, 256u * 1024u)); - OpenFileWriter->Write(Buffer, FileOffset); + CurrentTargetIndex = TargetIndex; + OutputFile->m_Writer.Write(Buffer, FileOffset); m_DiskStats.WriteCount++; m_DiskStats.WriteByteCount += Buffer.GetSize(); - SeenTargetIndexes.push_back(TargetIndex); } else { - ZEN_TRACE_CPU("WriteFileCache_WriteToFile_Write"); - NewOutputFile->Write(Buffer, FileOffset); + ZEN_TRACE_CPU("WriteFileCache_Write_Uncached"); + const std::filesystem::path& TargetPath = GetTargetPath(TargetIndex); + CreateDirectories(TargetPath.parent_path()); + uint32_t Tries = 5; + BasicFile TmpFile(TargetPath, BasicFile::Mode::kWrite, [&Tries, TargetPath](std::error_code& Ec) { + if (Tries < 3) + { + ZEN_CONSOLE("Failed opening file '{}': {}{}", TargetPath, Ec.message(), Tries > 1 ? " Retrying"sv : ""sv); + } + if (Tries > 1) + { + Sleep(100); + } + return --Tries > 0; + }); + m_DiskStats.OpenWriteCount++; + m_DiskStats.CurrentOpenFileCount++; + TmpFile.Write(Buffer, FileOffset); m_DiskStats.WriteCount++; m_DiskStats.WriteByteCount += Buffer.GetSize(); - NewOutputFile = {}; m_DiskStats.CurrentOpenFileCount--; } } @@ -4278,20 +4302,68 @@ namespace { void Flush() { ZEN_TRACE_CPU("WriteFileCache_Flush"); - if (OutputFile) + if (CurrentTargetIndex != (uint32_t)-1) { + ZEN_ASSERT_SLOW(OutputFile); + + { + RwLock::ExclusiveLockScope _(m_OpenWriteCacheLock); + if (m_OutputFiles.size() <= CurrentTargetIndex) + { + m_OutputFiles.resize(CurrentTargetIndex + 1); + m_OutputFiles[CurrentTargetIndex] = std::move(OutputFile); + CurrentTargetIndex = (uint32_t)-1; + return; + } + if (!m_OutputFiles[CurrentTargetIndex]) + { + m_OutputFiles[CurrentTargetIndex] = std::move(OutputFile); + CurrentTargetIndex = (uint32_t)-1; + return; + } + } + OutputFile = {}; m_DiskStats.CurrentOpenFileCount--; + CurrentTargetIndex = (uint32_t)-1; } + } - OpenFileWriter = {}; - OutputFile = {}; + static void FlushSequence(uint32_t SequenceIndex) + { + RwLock::ExclusiveLockScope _(m_OpenWriteCacheLock); + if (SequenceIndex < m_OutputFiles.size() && m_OutputFiles[SequenceIndex]) + { + m_OutputFiles[SequenceIndex]->m_Writer.Flush(); + m_OutputFiles[SequenceIndex] = {}; + } } - DiskStatistics& m_DiskStats; - std::vector<uint32_t> SeenTargetIndexes; - std::unique_ptr<BasicFile> OutputFile; - std::unique_ptr<BasicFileWriter> OpenFileWriter; + + class OpenCachedFile + { + public: + OpenCachedFile(uint64_t TargetFinalSize, + const std::filesystem::path& FileName, + BasicFile::Mode Mode, + std::function<bool(std::error_code& Ec)>&& RetryCallback) + : m_File(FileName, Mode, std::move(RetryCallback)) + , m_Writer(m_File, Min(TargetFinalSize, 256u * 1024u)) + { + } + BasicFile m_File; + BasicFileWriter m_Writer; + }; + + DiskStatistics& m_DiskStats; + std::unique_ptr<OpenCachedFile> OutputFile; + uint32_t CurrentTargetIndex = (uint32_t)-1; + + static RwLock m_OpenWriteCacheLock; + static std::vector<std::unique_ptr<OpenCachedFile>> m_OutputFiles; }; + RwLock WriteFileCache::m_OpenWriteCacheLock; + std::vector<std::unique_ptr<WriteFileCache::OpenCachedFile>> WriteFileCache::m_OutputFiles; + std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> GetRemainingChunkTargets( std::span<const std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, const ChunkedContentLookup& Lookup, @@ -4441,6 +4513,7 @@ namespace { const uint32_t RemoteSequenceIndex = Location->SequenceIndex; if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters)) { + WriteFileCache::FlushSequence(RemoteSequenceIndex); CompletedSequenceIndexes.push_back(RemoteSequenceIndex); } } @@ -4506,6 +4579,7 @@ namespace { const uint32_t RemoteSequenceIndex = WriteOp.Target->SequenceIndex; if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters)) { + WriteFileCache::FlushSequence(RemoteSequenceIndex); CompletedChunkSequences.push_back(RemoteSequenceIndex); } } @@ -6689,6 +6763,7 @@ namespace { const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex; if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters)) { + WriteFileCache::FlushSequence(RemoteSequenceIndex); CompletedChunkSequences.push_back(RemoteSequenceIndex); } } |