aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-03-18 10:00:55 +0100
committerGitHub Enterprise <[email protected]>2025-03-18 10:00:55 +0100
commit7bb818631b3d7431b7ac1c58da0e46bb0ddaa797 (patch)
tree2732fffdb32ae67b0e4080f86072c9341e7a035d /src
parentcollapse local writes (#310) (diff)
downloadzen-7bb818631b3d7431b7ac1c58da0e46bb0ddaa797.tar.xz
zen-7bb818631b3d7431b7ac1c58da0e46bb0ddaa797.zip
If a chunk or block write operation results in more than one completed chunk sequence, do the additional verifications as async work (#311)
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/builds_cmd.cpp243
1 files changed, 155 insertions, 88 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index 8566f1540..7e3f5345f 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -3870,6 +3870,100 @@ namespace {
return ChunkTargetPtrs;
};
+ void FinalizeChunkSequence(const std::filesystem::path& TargetFolder, const IoHash& SequenceRawHash)
+ {
+ ZEN_TRACE_CPU("FinalizeChunkSequence");
+ ZEN_ASSERT_SLOW(!std::filesystem::exists(GetFinalChunkedSequenceFileName(TargetFolder, SequenceRawHash)));
+ std::filesystem::rename(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash),
+ GetFinalChunkedSequenceFileName(TargetFolder, SequenceRawHash));
+ }
+
+ void FinalizeChunkSequences(const std::filesystem::path& TargetFolder,
+ const ChunkedFolderContent& RemoteContent,
+ std::span<const uint32_t> RemoteSequenceIndexes)
+ {
+ ZEN_TRACE_CPU("FinalizeChunkSequences");
+ for (uint32_t SequenceIndex : RemoteSequenceIndexes)
+ {
+ FinalizeChunkSequence(TargetFolder, RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
+ }
+ }
+
+ void VerifyAndCompleteChunkSequencesAsync(const std::filesystem::path& TargetFolder,
+ const ChunkedFolderContent& RemoteContent,
+ std::span<const uint32_t> RemoteSequenceIndexes,
+ ParallellWork& Work,
+ WorkerThreadPool& VerifyPool)
+ {
+ if (RemoteSequenceIndexes.empty())
+ {
+ return;
+ }
+ ZEN_TRACE_CPU("VerifyAndCompleteChunkSequence");
+ for (uint32_t RemoteSequenceIndexOffset = 1; RemoteSequenceIndexOffset < RemoteSequenceIndexes.size(); RemoteSequenceIndexOffset++)
+ {
+ const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset];
+ Work.ScheduleWork(
+ VerifyPool,
+ [&RemoteContent, TargetFolder, RemoteSequenceIndex](std::atomic<bool>&) {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("VerifyAndCompleteChunkSequenceAsync");
+ const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ {
+ ZEN_TRACE_CPU("HashSequence");
+ const IoHash VerifyChunkHash = IoHash::HashBuffer(
+ IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash)));
+ if (VerifyChunkHash != SequenceRawHash)
+ {
+ throw std::runtime_error(fmt::format("Written chunk sequence {} hash does not match expected hash {}",
+ VerifyChunkHash,
+ SequenceRawHash));
+ }
+ }
+ FinalizeChunkSequence(TargetFolder, SequenceRawHash);
+ }
+ },
+ Work.DefaultErrorFunction());
+ }
+ const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[0];
+
+ const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ {
+ ZEN_TRACE_CPU("HashSequence");
+ const IoHash VerifyChunkHash =
+ IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash)));
+ if (VerifyChunkHash != SequenceRawHash)
+ {
+ throw std::runtime_error(
+ fmt::format("Written chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, SequenceRawHash));
+ }
+ }
+ FinalizeChunkSequence(TargetFolder, SequenceRawHash);
+ }
+
+ bool CompleteSequenceChunk(uint32_t RemoteSequenceIndex, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters)
+ {
+ return SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1;
+ }
+
+ std::vector<uint32_t> CompleteChunkTargets(const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs,
+ std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters)
+ {
+ ZEN_TRACE_CPU("CompleteChunkTargets");
+
+ std::vector<uint32_t> CompletedSequenceIndexes;
+ for (const ChunkedContentLookup::ChunkSequenceLocation* Location : ChunkTargetPtrs)
+ {
+ const uint32_t RemoteSequenceIndex = Location->SequenceIndex;
+ if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters))
+ {
+ CompletedSequenceIndexes.push_back(RemoteSequenceIndex);
+ }
+ }
+ return CompletedSequenceIndexes;
+ }
+
struct BlockWriteOps
{
std::vector<CompositeBuffer> ChunkBuffers;
@@ -3886,6 +3980,8 @@ namespace {
const ChunkedContentLookup& Lookup,
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
const BlockWriteOps& Ops,
+ ParallellWork& Work,
+ WorkerThreadPool& VerifyPool,
DiskStatistics& DiskStats,
WriteChunkStatistics& WriteChunkStats)
{
@@ -3928,29 +4024,16 @@ namespace {
if (!AbortFlag)
{
// Write tracking, updating this must be done without any files open (WriteFileCache)
+ std::vector<uint32_t> CompletedChunkSequences;
for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps)
{
const uint32_t RemoteSequenceIndex = WriteOp.Target->SequenceIndex;
- if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1)
+ if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters))
{
- const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- {
- ZEN_TRACE_CPU("VerifyChunkHash");
- const IoHash VerifyChunkHash = IoHash::HashBuffer(
- IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
- if (VerifyChunkHash != SequenceRawHash)
- {
- throw std::runtime_error(fmt::format("Written chunk sequence {} hash does not match expected hash {}",
- VerifyChunkHash,
- SequenceRawHash));
- }
- }
- ZEN_TRACE_CPU("VerifyChunkHashes_rename");
- ZEN_ASSERT_SLOW(!std::filesystem::exists(GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
- std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash),
- GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash));
+ CompletedChunkSequences.push_back(RemoteSequenceIndex);
}
}
+ VerifyAndCompleteChunkSequencesAsync(CacheFolderPath, RemoteContent, CompletedChunkSequences, Work, VerifyPool);
}
}
@@ -4071,6 +4154,8 @@ namespace {
const ChunkedFolderContent& RemoteContent,
const ChunkBlockDescription& BlockDescription,
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ ParallellWork& Work,
+ WorkerThreadPool& VerifyPool,
CompositeBuffer&& BlockBuffer,
const ChunkedContentLookup& Lookup,
std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
@@ -4107,6 +4192,8 @@ namespace {
Lookup,
SequenceIndexChunksLeftToWriteCounters,
Ops,
+ Work,
+ VerifyPool,
DiskStats,
WriteChunkStats);
return true;
@@ -4130,6 +4217,8 @@ namespace {
Lookup,
SequenceIndexChunksLeftToWriteCounters,
Ops,
+ Work,
+ VerifyPool,
DiskStats,
WriteChunkStats);
return true;
@@ -4141,6 +4230,8 @@ namespace {
const ChunkedFolderContent& RemoteContent,
const ChunkBlockDescription& BlockDescription,
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ ParallellWork& Work,
+ WorkerThreadPool& VerifyPool,
CompositeBuffer&& PartialBlockBuffer,
uint32_t FirstIncludedBlockChunkIndex,
uint32_t LastIncludedBlockChunkIndex,
@@ -4171,6 +4262,8 @@ namespace {
Lookup,
SequenceIndexChunksLeftToWriteCounters,
Ops,
+ Work,
+ VerifyPool,
DiskStats,
WriteChunkStats);
return true;
@@ -4371,42 +4464,6 @@ namespace {
return false;
}
- void CompleteChunkTargets(const std::filesystem::path& TargetFolder,
- const ChunkedFolderContent& RemoteContent,
- const IoHash& ChunkHash,
- const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs,
- std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
- const bool NeedHashVerify)
- {
- ZEN_TRACE_CPU("CompleteChunkTargets");
-
- for (const ChunkedContentLookup::ChunkSequenceLocation* Location : ChunkTargetPtrs)
- {
- const uint32_t RemoteSequenceIndex = Location->SequenceIndex;
- if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1)
- {
- const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- if (NeedHashVerify)
- {
- ZEN_TRACE_CPU("VerifyChunkHash");
-
- const IoHash VerifyChunkHash =
- IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash)));
- if (VerifyChunkHash != ChunkHash)
- {
- throw std::runtime_error(
- fmt::format("Written chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, ChunkHash));
- }
- }
-
- ZEN_TRACE_CPU("RenameToFinal");
- ZEN_ASSERT_SLOW(!std::filesystem::exists(GetFinalChunkedSequenceFileName(TargetFolder, SequenceRawHash)));
- std::filesystem::rename(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash),
- GetFinalChunkedSequenceFileName(TargetFolder, SequenceRawHash));
- }
- }
- }
-
void AsyncWriteDownloadedChunk(const std::filesystem::path& Path,
const ChunkedFolderContent& RemoteContent,
const ChunkedContentLookup& RemoteLookup,
@@ -4518,12 +4575,16 @@ namespace {
std::filesystem::remove(CompressedChunkPath);
- CompleteChunkTargets(TargetFolder,
- RemoteContent,
- ChunkHash,
- ChunkTargetPtrs,
- SequenceIndexChunksLeftToWriteCounters,
- NeedHashVerify);
+ std::vector<uint32_t> CompletedSequences =
+ CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
+ if (NeedHashVerify)
+ {
+ VerifyAndCompleteChunkSequencesAsync(TargetFolder, RemoteContent, CompletedSequences, Work, WritePool);
+ }
+ else
+ {
+ FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences);
+ }
}
}
},
@@ -4694,8 +4755,8 @@ namespace {
else
{
// We must write the sequence
- SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] =
- RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex];
+ const uint32_t ChunkCount = RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex];
+ SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = ChunkCount;
}
}
}
@@ -5098,6 +5159,8 @@ namespace {
&RemoteLookup,
&CacheFolderPath,
&SequenceIndexChunksLeftToWriteCounters,
+ &Work,
+ &WritePool,
&DiskStats,
&WriteChunkStats,
&WritePartsComplete,
@@ -5145,12 +5208,20 @@ namespace {
std::filesystem::remove(CompressedChunkPath);
- CompleteChunkTargets(TargetFolder,
- RemoteContent,
- ChunkHash,
- ChunkTargetPtrs,
- SequenceIndexChunksLeftToWriteCounters,
- NeedHashVerify);
+ std::vector<uint32_t> CompletedSequences =
+ CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
+ if (NeedHashVerify)
+ {
+ VerifyAndCompleteChunkSequencesAsync(TargetFolder,
+ RemoteContent,
+ CompletedSequences,
+ Work,
+ WritePool);
+ }
+ else
+ {
+ FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences);
+ }
}
}
},
@@ -5385,32 +5456,20 @@ namespace {
if (!AbortFlag)
{
// Write tracking, updating this must be done without any files open (WriteFileCache)
+ std::vector<uint32_t> CompletedChunkSequences;
for (const WriteOp& Op : WriteOps)
{
const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
- if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1)
+ if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters))
{
- const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- {
- ZEN_TRACE_CPU("VerifyHash");
- const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(
- GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
- if (VerifyChunkHash != SequenceRawHash)
- {
- throw std::runtime_error(
- fmt::format("Written chunk sequence {} hash does not match expected hash {}",
- VerifyChunkHash,
- SequenceRawHash));
- }
- }
-
- ZEN_TRACE_CPU("rename");
- ZEN_ASSERT_SLOW(
- !std::filesystem::exists(GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
- std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash),
- GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash));
+ CompletedChunkSequences.push_back(RemoteSequenceIndex);
}
}
+ VerifyAndCompleteChunkSequencesAsync(CacheFolderPath,
+ RemoteContent,
+ CompletedChunkSequences,
+ Work,
+ WritePool);
ZEN_DEBUG("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), LocalContent.Paths[LocalPathIndex]);
}
WritePartsComplete++;
@@ -5452,6 +5511,8 @@ namespace {
RemoteContent,
BlockDescription,
SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ WritePool,
CompositeBuffer(std::move(BlockBuffer)),
RemoteLookup,
RemoteChunkIndexNeedsCopyFromSourceFlags,
@@ -5588,6 +5649,8 @@ namespace {
RemoteContent,
BlockDescription,
SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ WritePool,
CompositeBuffer(std::move(BlockPartialBuffer)),
BlockRange.ChunkBlockIndexStart,
BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1,
@@ -5694,7 +5757,9 @@ namespace {
{
Work.ScheduleWork(
WritePool, // WritePool, GetSyncWorkerPool()
- [&RemoteContent,
+ [&Work,
+ &WritePool,
+ &RemoteContent,
&RemoteLookup,
CacheFolderPath,
&RemoteChunkIndexNeedsCopyFromSourceFlags,
@@ -5735,6 +5800,8 @@ namespace {
RemoteContent,
BlockDescription,
SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ WritePool,
CompositeBuffer(std::move(BlockBuffer)),
RemoteLookup,
RemoteChunkIndexNeedsCopyFromSourceFlags,