aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/zen/cmds/builds_cmd.cpp124
1 files changed, 73 insertions, 51 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index 06b4f000b..d1f321358 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -4305,7 +4305,6 @@ namespace {
if (CurrentTargetIndex != (uint32_t)-1)
{
ZEN_ASSERT_SLOW(OutputFile);
-
{
RwLock::ExclusiveLockScope _(m_OpenWriteCacheLock);
if (m_OutputFiles.size() <= CurrentTargetIndex)
@@ -4321,23 +4320,42 @@ namespace {
CurrentTargetIndex = (uint32_t)-1;
return;
}
+ else
+ {
+ m_DiskStats.CurrentOpenFileCount--;
+ }
}
- OutputFile = {};
- m_DiskStats.CurrentOpenFileCount--;
+ OutputFile.reset();
CurrentTargetIndex = (uint32_t)-1;
}
}
- static void FlushSequence(uint32_t SequenceIndex)
+ void FlushSequence(uint32_t SequenceIndex)
{
- RwLock::ExclusiveLockScope _(m_OpenWriteCacheLock);
- if (SequenceIndex < m_OutputFiles.size() && m_OutputFiles[SequenceIndex])
+ if (SequenceIndex == CurrentTargetIndex)
{
- m_OutputFiles[SequenceIndex]->m_Writer.Flush();
- m_OutputFiles[SequenceIndex] = {};
+ ZEN_ASSERT_SLOW(OutputFile);
+ OutputFile->m_Writer.Flush();
+ OutputFile.reset();
+ CurrentTargetIndex = (uint32_t)-1;
+ }
+ std::unique_ptr<OpenCachedFile> FlushOutputFile;
+ {
+ RwLock::ExclusiveLockScope _(m_OpenWriteCacheLock);
+ if (SequenceIndex < m_OutputFiles.size() && m_OutputFiles[SequenceIndex])
+ {
+ FlushOutputFile = std::move(m_OutputFiles[SequenceIndex]);
+ }
+ }
+ if (FlushOutputFile)
+ {
+ FlushOutputFile->m_Writer.Flush();
+ FlushOutputFile.reset();
+ m_DiskStats.CurrentOpenFileCount--;
}
}
+ private:
class OpenCachedFile
{
public:
@@ -4502,7 +4520,8 @@ namespace {
return PreviousValue == 1;
}
- std::vector<uint32_t> CompleteChunkTargets(const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs,
+ std::vector<uint32_t> CompleteChunkTargets(WriteFileCache& OpenFileCache,
+ const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs,
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters)
{
ZEN_TRACE_CPU("CompleteChunkTargets");
@@ -4513,7 +4532,7 @@ namespace {
const uint32_t RemoteSequenceIndex = Location->SequenceIndex;
if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters))
{
- WriteFileCache::FlushSequence(RemoteSequenceIndex);
+ OpenFileCache.FlushSequence(RemoteSequenceIndex);
CompletedSequenceIndexes.push_back(RemoteSequenceIndex);
}
}
@@ -4541,45 +4560,43 @@ namespace {
DiskStatistics& DiskStats)
{
ZEN_TRACE_CPU("WriteBlockChunkOps");
+
+ WriteFileCache OpenFileCache(DiskStats);
+ for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps)
{
- WriteFileCache OpenFileCache(DiskStats);
- for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps)
+ if (AbortFlag)
{
- if (AbortFlag)
- {
- break;
- }
- const CompositeBuffer& Chunk = Ops.ChunkBuffers[WriteOp.ChunkBufferIndex];
- const uint32_t SequenceIndex = WriteOp.Target->SequenceIndex;
- ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() <=
- RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]);
- ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() > 0);
- const uint64_t ChunkSize = Chunk.GetSize();
- const uint64_t FileOffset = WriteOp.Target->Offset;
- const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex];
- ZEN_ASSERT(FileOffset + ChunkSize <= RemoteContent.RawSizes[PathIndex]);
-
- OpenFileCache.WriteToFile<CompositeBuffer>(
- SequenceIndex,
- [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) {
- return GetTempChunkedSequenceFileName(CacheFolderPath,
- RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
- },
- Chunk,
- FileOffset,
- RemoteContent.RawSizes[PathIndex]);
+ break;
}
+ const CompositeBuffer& Chunk = Ops.ChunkBuffers[WriteOp.ChunkBufferIndex];
+ const uint32_t SequenceIndex = WriteOp.Target->SequenceIndex;
+ ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() <=
+ RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]);
+ ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() > 0);
+ const uint64_t ChunkSize = Chunk.GetSize();
+ const uint64_t FileOffset = WriteOp.Target->Offset;
+ const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex];
+ ZEN_ASSERT(FileOffset + ChunkSize <= RemoteContent.RawSizes[PathIndex]);
+
+ OpenFileCache.WriteToFile<CompositeBuffer>(
+ SequenceIndex,
+ [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) {
+ return GetTempChunkedSequenceFileName(CacheFolderPath, RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
+ },
+ Chunk,
+ FileOffset,
+ RemoteContent.RawSizes[PathIndex]);
}
+
if (!AbortFlag)
{
- // Write tracking, updating this must be done without any files open (WriteFileCache)
std::vector<uint32_t> CompletedChunkSequences;
for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps)
{
const uint32_t RemoteSequenceIndex = WriteOp.Target->SequenceIndex;
if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters))
{
- WriteFileCache::FlushSequence(RemoteSequenceIndex);
+ OpenFileCache.FlushSequence(RemoteSequenceIndex);
CompletedChunkSequences.push_back(RemoteSequenceIndex);
}
}
@@ -4998,6 +5015,7 @@ namespace {
const IoHash& ChunkHash,
const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs,
IoBuffer&& CompressedPart,
+ WriteFileCache& OpenFileCache,
DiskStatistics& DiskStats)
{
ZEN_TRACE_CPU("WriteCompressedChunk");
@@ -5017,7 +5035,6 @@ namespace {
if (!AbortFlag)
{
- WriteFileCache OpenFileCache(DiskStats);
WriteChunkToDisk(TargetFolder,
RemoteContent,
RemoteLookup,
@@ -5121,13 +5138,15 @@ namespace {
std::filesystem::path TargetFolder = ZenTempCacheFolderPath(ZenFolderPath);
- bool NeedHashVerify = WriteCompressedChunk(TargetFolder,
- RemoteContent,
- RemoteLookup,
- ChunkHash,
- ChunkTargetPtrs,
- std::move(CompressedPart),
- DiskStats);
+ WriteFileCache OpenFileCache(DiskStats);
+ bool NeedHashVerify = WriteCompressedChunk(TargetFolder,
+ RemoteContent,
+ RemoteLookup,
+ ChunkHash,
+ ChunkTargetPtrs,
+ std::move(CompressedPart),
+ OpenFileCache,
+ DiskStats);
if (!AbortFlag)
{
WritePartsComplete++;
@@ -5139,7 +5158,7 @@ namespace {
RemoveFileWithRetry(CompressedChunkPath);
std::vector<uint32_t> CompletedSequences =
- CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
+ CompleteChunkTargets(OpenFileCache, ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
if (NeedHashVerify)
{
VerifyAndCompleteChunkSequencesAsync(TargetFolder,
@@ -6389,13 +6408,15 @@ namespace {
CompressedChunkPath));
}
- std::filesystem::path TargetFolder = ZenTempCacheFolderPath(ZenFolderPath);
+ std::filesystem::path TargetFolder = ZenTempCacheFolderPath(ZenFolderPath);
+ WriteFileCache OpenFileCache(DiskStats);
bool NeedHashVerify = WriteCompressedChunk(TargetFolder,
RemoteContent,
RemoteLookup,
ChunkHash,
ChunkTargetPtrs,
std::move(CompressedPart),
+ OpenFileCache,
DiskStats);
WritePartsComplete++;
@@ -6409,7 +6430,9 @@ namespace {
RemoveFileWithRetry(CompressedChunkPath);
std::vector<uint32_t> CompletedSequences =
- CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
+ CompleteChunkTargets(OpenFileCache,
+ ChunkTargetPtrs,
+ SequenceIndexChunksLeftToWriteCounters);
if (NeedHashVerify)
{
VerifyAndCompleteChunkSequencesAsync(TargetFolder,
@@ -6682,6 +6705,7 @@ namespace {
});
}
+ WriteFileCache OpenFileCache(DiskStats);
if (!AbortFlag)
{
ZEN_TRACE_CPU("Write");
@@ -6689,7 +6713,6 @@ namespace {
tsl::robin_set<uint32_t> ChunkIndexesWritten;
BufferedOpenFile SourceFile(SourceFilePath, DiskStats);
- WriteFileCache OpenFileCache(DiskStats);
for (size_t WriteOpIndex = 0; WriteOpIndex < WriteOps.size();)
{
if (AbortFlag)
@@ -6756,14 +6779,13 @@ namespace {
}
if (!AbortFlag)
{
- // Write tracking, updating this must be done without any files open (WriteFileCache)
std::vector<uint32_t> CompletedChunkSequences;
for (const WriteOp& Op : WriteOps)
{
const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters))
{
- WriteFileCache::FlushSequence(RemoteSequenceIndex);
+ OpenFileCache.FlushSequence(RemoteSequenceIndex);
CompletedChunkSequences.push_back(RemoteSequenceIndex);
}
}