aboutsummaryrefslogtreecommitdiff
path: root/src/zen/cmds/builds_cmd.cpp
diff options
context:
space:
mode:
author Dan Engelbrecht <[email protected]> 2025-03-07 09:58:20 +0100
committer GitHub Enterprise <[email protected]> 2025-03-07 09:58:20 +0100
commit 9b24647facccc9c7848a52f1f4c5e32055bf2f01 (patch)
tree d798889a2b3f6a0d72331890b75e92b0159ce0c3 /src/zen/cmds/builds_cmd.cpp
parent reduced memory churn using fixed_xxx containers (#236) (diff)
download archived-zen-9b24647facccc9c7848a52f1f4c5e32055bf2f01.tar.xz
archived-zen-9b24647facccc9c7848a52f1f4c5e32055bf2f01.zip
partial block fetch (#298)
- Improvement: Do partial requests of blocks if not all of the block is needed
- Improvement: Better progress/statistics on download
- Bugfix: Ensure that temporary folder for Jupiter downloads exists during verify phase
Diffstat (limited to 'src/zen/cmds/builds_cmd.cpp')
-rw-r--r-- src/zen/cmds/builds_cmd.cpp 1099
1 files changed, 775 insertions, 324 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index eb0650c4d..1c9476b96 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -231,7 +231,10 @@ namespace {
return AuthToken;
}
- CompositeBuffer WriteToTempFileIfNeeded(const CompositeBuffer& Buffer, const std::filesystem::path& TempFolderPath, const IoHash& Hash)
+ CompositeBuffer WriteToTempFileIfNeeded(const CompositeBuffer& Buffer,
+ const std::filesystem::path& TempFolderPath,
+ const IoHash& Hash,
+ const std::string& Suffix = {})
{
// If this is a file based buffer or a compressed buffer with a memory-based header, we don't need to rewrite to disk to save memory
std::span<const SharedBuffer> Segments = Buffer.GetSegments();
@@ -241,7 +244,7 @@ namespace {
{
return Buffer;
}
- std::filesystem::path TempFilePath = (TempFolderPath / Hash.ToHexString()).make_preferred();
+ std::filesystem::path TempFilePath = (TempFolderPath / (Hash.ToHexString() + Suffix)).make_preferred();
return CompositeBuffer(WriteToTempFile(Buffer, TempFilePath));
}
@@ -302,7 +305,7 @@ namespace {
return FilteredPerSecond;
}
- uint64_t GetElapsedTime() const
+ uint64_t GetElapsedTimeUS() const
{
if (StartTimeUS == (uint64_t)-1)
{
@@ -1738,8 +1741,8 @@ namespace {
ProgressBar.Finish();
- GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGeneratedBytesPerSecond.GetElapsedTime();
- UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTime();
+ GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGeneratedBytesPerSecond.GetElapsedTimeUS();
+ UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS();
}
}
@@ -2069,9 +2072,9 @@ namespace {
});
ProgressBar.Finish();
- UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTime();
- GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGenerateBlockBytesPerSecond.GetElapsedTime();
- LooseChunksStats.CompressChunksElapsedWallTimeUS = FilteredCompressedBytesPerSecond.GetElapsedTime();
+ UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS();
+ GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGenerateBlockBytesPerSecond.GetElapsedTimeUS();
+ LooseChunksStats.CompressChunksElapsedWallTimeUS = FilteredCompressedBytesPerSecond.GetElapsedTimeUS();
}
}
@@ -3358,17 +3361,8 @@ namespace {
return ChunkTargetPtrs;
};
- bool WriteBlockToDisk(const std::filesystem::path& CacheFolderPath,
- const ChunkedFolderContent& RemoteContent,
- std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
- const CompositeBuffer& DecompressedBlockBuffer,
- const ChunkedContentLookup& Lookup,
- std::atomic<bool>* RemoteChunkIndexNeedsCopyFromSourceFlags,
- std::atomic<uint32_t>& OutChunksComplete,
- std::atomic<uint64_t>& OutBytesWritten)
+ struct BlockWriteOps
{
- ZEN_TRACE_CPU("WriteBlockToDisk");
-
std::vector<CompositeBuffer> ChunkBuffers;
struct WriteOpData
{
@@ -3376,7 +3370,79 @@ namespace {
size_t ChunkBufferIndex;
};
std::vector<WriteOpData> WriteOps;
+ };
+ // Applies the write operations collected for one block: every chunk buffer in Ops is written
+ // to the temporary file of each sequence that references it, at the target's offset.
+ //
+ // CacheFolderPath   - folder used to build the temporary and final sequence file names.
+ // RemoteContent     - supplies sequence raw hashes, per-sequence chunk counts and raw sizes.
+ // Lookup            - maps a sequence index to its first path index.
+ // SequenceIndexChunksLeftToWriteCounters - per-sequence counter, decremented once per write op.
+ // Ops               - chunk buffers plus the write ops referring to them; taken by reference so
+ //                     the buffers are not copied for every call.
+ // OutChunksComplete - increased by the number of chunk buffers in Ops when not aborted.
+ // OutBytesWritten   - increased by the size of every chunk written.
+ //
+ // Throws std::runtime_error when a completed sequence file does not hash to its expected raw hash.
+ void WriteBlockChunkOps(const std::filesystem::path& CacheFolderPath,
+                         const ChunkedFolderContent& RemoteContent,
+                         const ChunkedContentLookup& Lookup,
+                         std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+                         const BlockWriteOps& Ops,
+                         std::atomic<uint32_t>& OutChunksComplete,
+                         std::atomic<uint64_t>& OutBytesWritten)
+ {
+     ZEN_TRACE_CPU("WriteBlockChunkOps");
+     {
+         // Scoped so the WriteFileCache is destroyed before the tracking pass below (see comment there)
+         WriteFileCache OpenFileCache;
+         for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps)
+         {
+             if (AbortFlag)
+             {
+                 break;
+             }
+             const CompositeBuffer& Chunk = Ops.ChunkBuffers[WriteOp.ChunkBufferIndex];
+             const uint32_t SequenceIndex = WriteOp.Target->SequenceIndex;
+             ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() <=
+                        RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]);
+             ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() > 0);
+             const uint64_t ChunkSize = Chunk.GetSize();
+             const uint64_t FileOffset = WriteOp.Target->Offset;
+             const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex];
+             ZEN_ASSERT(FileOffset + ChunkSize <= RemoteContent.RawSizes[PathIndex]);
+
+             OpenFileCache.WriteToFile<CompositeBuffer>(
+                 SequenceIndex,
+                 [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) {
+                     return GetTempChunkedSequenceFileName(CacheFolderPath,
+                                                           RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
+                 },
+                 Chunk,
+                 FileOffset,
+                 RemoteContent.RawSizes[PathIndex]);
+             OutBytesWritten += ChunkSize;
+         }
+     }
+     if (!AbortFlag)
+     {
+         // Write tracking, updating this must be done without any files open (WriteFileCache)
+         for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps)
+         {
+             const uint32_t RemoteSequenceIndex = WriteOp.Target->SequenceIndex;
+             // fetch_sub returns the previous value, so 1 means this op was the last outstanding write
+             if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1)
+             {
+                 const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+                 const std::filesystem::path TempSequencePath = GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash);
+                 const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(TempSequencePath));
+                 if (VerifyChunkHash != SequenceRawHash)
+                 {
+                     throw std::runtime_error(
+                         fmt::format("Written chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, SequenceRawHash));
+                 }
+                 // The sequence is complete and verified: move it from its temporary name to its final name
+                 std::filesystem::rename(TempSequencePath, GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash));
+             }
+         }
+         OutChunksComplete += gsl::narrow<uint32_t>(Ops.ChunkBuffers.size());
+     }
+ }
+
+ bool GetBlockWriteOps(const ChunkedFolderContent& RemoteContent,
+ const ChunkedContentLookup& Lookup,
+ std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
+ const CompositeBuffer& DecompressedBlockBuffer,
+ BlockWriteOps& OutOps)
+ {
+ ZEN_TRACE_CPU("GetBlockWriteOps");
SharedBuffer BlockBuffer = DecompressedBlockBuffer.Flatten();
uint64_t HeaderSize = 0;
if (IterateChunkBlock(
@@ -3402,91 +3468,207 @@ namespace {
ZEN_ASSERT(Decompressed.GetSize() == RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]);
for (const ChunkedContentLookup::ChunkSequenceLocation* Target : ChunkTargetPtrs)
{
- WriteOps.push_back(WriteOpData{.Target = Target, .ChunkBufferIndex = ChunkBuffers.size()});
+ OutOps.WriteOps.push_back(
+ BlockWriteOps::WriteOpData{.Target = Target, .ChunkBufferIndex = OutOps.ChunkBuffers.size()});
}
- ChunkBuffers.emplace_back(std::move(Decompressed));
+ OutOps.ChunkBuffers.emplace_back(std::move(Decompressed));
}
}
}
},
HeaderSize))
{
- if (!WriteOps.empty())
+ std::sort(OutOps.WriteOps.begin(),
+ OutOps.WriteOps.end(),
+ [](const BlockWriteOps::WriteOpData& Lhs, const BlockWriteOps::WriteOpData& Rhs) {
+ if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
+ {
+ return true;
+ }
+ if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
+ {
+ return false;
+ }
+ return Lhs.Target->Offset < Rhs.Target->Offset;
+ });
+
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ // Validates a downloaded block, decompresses it and writes the chunks it contains to disk.
+ //
+ // BlockBuffer must parse as a compressed buffer whose header raw hash equals
+ // BlockDescription.BlockHash, and it must decompress; otherwise std::runtime_error is thrown.
+ // The write ops are gathered with GetBlockWriteOps and applied with WriteBlockChunkOps.
+ //
+ // Returns false when GetBlockWriteOps reports failure, true once the ops have been handed to
+ // WriteBlockChunkOps.
+ bool WriteBlockToDisk(const std::filesystem::path& CacheFolderPath,
+                       const ChunkedFolderContent& RemoteContent,
+                       const ChunkBlockDescription& BlockDescription,
+                       std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+                       const CompositeBuffer& BlockBuffer,
+                       const ChunkedContentLookup& Lookup,
+                       std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
+                       std::atomic<uint32_t>& OutChunksComplete,
+                       std::atomic<uint64_t>& OutBytesWritten)
+ {
+     ZEN_TRACE_CPU("WriteBlockToDisk");
+
+     // Header values are filled in by FromCompressed
+     IoHash HeaderRawHash;
+     uint64_t HeaderRawSize;
+     CompressedBuffer Compressed = CompressedBuffer::FromCompressed(BlockBuffer, HeaderRawHash, HeaderRawSize);
+     if (!Compressed)
+     {
+         throw std::runtime_error(fmt::format("Block {} is not a compressed buffer", BlockDescription.BlockHash));
+     }
+     if (HeaderRawHash != BlockDescription.BlockHash)
+     {
+         throw std::runtime_error(
+             fmt::format("Block {} header has a mismatching raw hash {}", BlockDescription.BlockHash, HeaderRawHash));
+     }
+
+     CompositeBuffer Decompressed = Compressed.DecompressToComposite();
+     if (!Decompressed)
+     {
+         throw std::runtime_error(fmt::format("Block {} failed to decompress", BlockDescription.BlockHash));
+     }
+     ZEN_ASSERT_SLOW(BlockDescription.BlockHash == IoHash::HashBuffer(Decompressed));
+
+     BlockWriteOps PendingOps;
+     const bool GotOps = GetBlockWriteOps(RemoteContent,
+                                          Lookup,
+                                          SequenceIndexChunksLeftToWriteCounters,
+                                          RemoteChunkIndexNeedsCopyFromSourceFlags,
+                                          Decompressed,
+                                          PendingOps);
+     if (!GotOps)
+     {
+         return false;
+     }
+
+     WriteBlockChunkOps(CacheFolderPath,
+                        RemoteContent,
+                        Lookup,
+                        SequenceIndexChunksLeftToWriteCounters,
+                        PendingOps,
+                        OutChunksComplete,
+                        OutBytesWritten);
+     return true;
+ }
+
+ bool GetPartialBlockWriteOps(const ChunkedFolderContent& RemoteContent,
+ const ChunkedContentLookup& Lookup,
+ const ChunkBlockDescription& BlockDescription,
+ std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
+ const CompositeBuffer& PartialBlockBuffer,
+ uint32_t FirstIncludedBlockChunkIndex,
+ uint32_t LastIncludedBlockChunkIndex,
+ BlockWriteOps& OutOps)
+ {
+ ZEN_TRACE_CPU("GetPartialBlockWriteOps");
+ uint32_t OffsetInBlock = 0;
+ for (uint32_t ChunkBlockIndex = FirstIncludedBlockChunkIndex; ChunkBlockIndex <= LastIncludedBlockChunkIndex; ChunkBlockIndex++)
+ {
+ const uint32_t ChunkCompressedSize = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex];
+ const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex];
+ if (auto It = Lookup.ChunkHashToChunkIndex.find(ChunkHash); It != Lookup.ChunkHashToChunkIndex.end())
{
- if (!AbortFlag)
+ const uint32_t ChunkIndex = It->second;
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
+ GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, Lookup, ChunkIndex);
+
+ if (!ChunkTargetPtrs.empty())
{
- std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOpData& Lhs, const WriteOpData& Rhs) {
- if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
+ bool NeedsWrite = true;
+ if (RemoteChunkIndexNeedsCopyFromSourceFlags[ChunkIndex].compare_exchange_strong(NeedsWrite, false))
+ {
+ CompositeBuffer Chunk = PartialBlockBuffer.Mid(OffsetInBlock, ChunkCompressedSize);
+ IoHash VerifyChunkHash;
+ uint64_t VerifyRawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Chunk, VerifyChunkHash, VerifyRawSize);
+ if (!Compressed)
{
- return true;
+ ZEN_ASSERT(false);
}
- if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
+ if (VerifyChunkHash != ChunkHash)
{
- return false;
+ ZEN_ASSERT(false);
}
- return Lhs.Target->Offset < Rhs.Target->Offset;
- });
-
- {
- WriteFileCache OpenFileCache;
- for (const WriteOpData& WriteOp : WriteOps)
+ if (VerifyRawSize != BlockDescription.ChunkRawLengths[ChunkBlockIndex])
{
- if (AbortFlag)
- {
- break;
- }
- const CompositeBuffer& Chunk = ChunkBuffers[WriteOp.ChunkBufferIndex];
- const uint32_t SequenceIndex = WriteOp.Target->SequenceIndex;
- ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() <=
- RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]);
- ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() > 0);
- const uint64_t ChunkSize = Chunk.GetSize();
- const uint64_t FileOffset = WriteOp.Target->Offset;
- const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex];
- ZEN_ASSERT(FileOffset + ChunkSize <= RemoteContent.RawSizes[PathIndex]);
-
- OpenFileCache.WriteToFile<CompositeBuffer>(
- SequenceIndex,
- [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) {
- return GetTempChunkedSequenceFileName(CacheFolderPath,
- RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
- },
- Chunk,
- FileOffset,
- RemoteContent.RawSizes[PathIndex]);
- OutBytesWritten += ChunkSize;
+ ZEN_ASSERT(false);
}
- }
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("WriteBlockToDisk_VerifyHash");
-
- // Write tracking, updating this must be done without any files open (WriteFileCache)
- for (const WriteOpData& WriteOp : WriteOps)
+ CompositeBuffer Decompressed = Compressed.DecompressToComposite();
+ if (!Decompressed)
{
- const uint32_t RemoteSequenceIndex = WriteOp.Target->SequenceIndex;
- if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1)
- {
- const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- const IoHash VerifyChunkHash = IoHash::HashBuffer(
- IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
- if (VerifyChunkHash != SequenceRawHash)
- {
- throw std::runtime_error(fmt::format("Written chunk sequence {} hash does not match expected hash {}",
- VerifyChunkHash,
- SequenceRawHash));
- }
- std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash),
- GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash));
- }
+ throw std::runtime_error(fmt::format("Decompression of build blob {} failed", ChunkHash));
}
+ ZEN_ASSERT_SLOW(ChunkHash == IoHash::HashBuffer(Decompressed));
+ ZEN_ASSERT(Decompressed.GetSize() == RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]);
+ for (const ChunkedContentLookup::ChunkSequenceLocation* Target : ChunkTargetPtrs)
+ {
+ OutOps.WriteOps.push_back(
+ BlockWriteOps::WriteOpData{.Target = Target, .ChunkBufferIndex = OutOps.ChunkBuffers.size()});
+ }
+ OutOps.ChunkBuffers.emplace_back(std::move(Decompressed));
}
- OutChunksComplete += gsl::narrow<uint32_t>(ChunkBuffers.size());
}
}
+
+ OffsetInBlock += ChunkCompressedSize;
+ }
+ std::sort(OutOps.WriteOps.begin(),
+ OutOps.WriteOps.end(),
+ [](const BlockWriteOps::WriteOpData& Lhs, const BlockWriteOps::WriteOpData& Rhs) {
+ if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
+ {
+ return true;
+ }
+ if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
+ {
+ return false;
+ }
+ return Lhs.Target->Offset < Rhs.Target->Offset;
+ });
+ return true;
+ }
+
+ bool WritePartialBlockToDisk(const std::filesystem::path& CacheFolderPath,
+ const ChunkedFolderContent& RemoteContent,
+ const ChunkBlockDescription& BlockDescription,
+ std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ const CompositeBuffer& PartialBlockBuffer,
+ uint32_t FirstIncludedBlockChunkIndex,
+ uint32_t LastIncludedBlockChunkIndex,
+ const ChunkedContentLookup& Lookup,
+ std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
+ std::atomic<uint32_t>& OutChunksComplete,
+ std::atomic<uint64_t>& OutBytesWritten)
+ {
+ ZEN_TRACE_CPU("WritePartialBlockToDisk");
+ BlockWriteOps Ops;
+ if (GetPartialBlockWriteOps(RemoteContent,
+ Lookup,
+ BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ PartialBlockBuffer,
+ FirstIncludedBlockChunkIndex,
+ LastIncludedBlockChunkIndex,
+ Ops))
+ {
+ WriteBlockChunkOps(CacheFolderPath,
+ RemoteContent,
+ Lookup,
+ SequenceIndexChunksLeftToWriteCounters,
+ Ops,
+ OutChunksComplete,
+ OutBytesWritten);
return true;
}
- return false;
+ else
+ {
+ return false;
+ }
}
SharedBuffer Decompress(const CompositeBuffer& CompressedChunk, const IoHash& ChunkHash, const uint64_t ChunkRawSize)
@@ -3571,6 +3753,7 @@ namespace {
CompositeBuffer&& CompressedPart,
std::atomic<uint64_t>& WriteToDiskBytes)
{
+ ZEN_TRACE_CPU("StreamDecompress");
const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash);
TemporaryFile DecompressedTemp;
std::error_code Ec;
@@ -3633,10 +3816,9 @@ namespace {
WorkerThreadPool& NetworkPool,
std::atomic<uint64_t>& WriteToDiskBytes,
std::atomic<uint64_t>& BytesDownloaded,
- std::atomic<uint64_t>& LooseChunksBytes,
- std::atomic<uint64_t>& DownloadedChunks,
- std::atomic<uint32_t>& ChunksComplete,
- std::atomic<uint64_t>& MultipartAttachmentCount)
+ std::atomic<uint64_t>& MultipartAttachmentCount,
+ std::function<void(uint64_t DowloadedBytes)>&& OnDownloadComplete,
+ std::function<void()>&& OnWriteComplete)
{
ZEN_TRACE_CPU("DownloadLargeBlob");
@@ -3665,22 +3847,20 @@ namespace {
Workload,
ChunkHash,
&BytesDownloaded,
- &LooseChunksBytes,
+ OnDownloadComplete = std::move(OnDownloadComplete),
+ OnWriteComplete = std::move(OnWriteComplete),
&WriteToDiskBytes,
- &DownloadedChunks,
- &ChunksComplete,
SequenceIndexChunksLeftToWriteCounters,
ChunkTargetPtrs = std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>(
ChunkTargetPtrs)](uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining) {
BytesDownloaded += Chunk.GetSize();
- LooseChunksBytes += Chunk.GetSize();
if (!AbortFlag.load())
{
Workload->TempFile.Write(Chunk.GetView(), Offset);
if (Chunk.GetSize() == BytesRemaining)
{
- DownloadedChunks++;
+ OnDownloadComplete(Workload->TempFile.FileSize());
Work.ScheduleWork(
WritePool, // GetSyncWorkerPool(),//
@@ -3690,8 +3870,7 @@ namespace {
ChunkHash,
Workload,
Offset,
- BytesRemaining,
- &ChunksComplete,
+ OnWriteComplete = std::move(OnWriteComplete),
&WriteToDiskBytes,
SequenceIndexChunksLeftToWriteCounters,
ChunkTargetPtrs](std::atomic<bool>&) {
@@ -3725,7 +3904,7 @@ namespace {
CompositeBuffer(std::move(CompressedPart)),
WriteToDiskBytes);
NeedHashVerify = false;
- ChunksComplete++;
+ OnWriteComplete();
}
else
{
@@ -3748,7 +3927,7 @@ namespace {
CompositeBuffer(Chunk),
OpenFileCache,
WriteToDiskBytes);
- ChunksComplete++;
+ OnWriteComplete();
}
}
@@ -3760,12 +3939,12 @@ namespace {
const uint32_t RemoteSequenceIndex = Location->SequenceIndex;
if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1)
{
- ZEN_TRACE_CPU("VerifyChunkHash");
-
const IoHash& SequenceRawHash =
RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
if (NeedHashVerify)
{
+ ZEN_TRACE_CPU("VerifyChunkHash");
+
const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(
GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
if (VerifyChunkHash != ChunkHash)
@@ -3816,6 +3995,7 @@ namespace {
const ChunkedFolderContent& RemoteContent,
const std::vector<ChunkBlockDescription>& BlockDescriptions,
const std::vector<IoHash>& LooseChunkHashes,
+ bool AllowPartialBlockRequests,
bool WipeTargetFolder,
FolderContent& OutLocalFolderState)
{
@@ -3967,11 +4147,17 @@ namespace {
}
}
- std::atomic<uint32_t> ChunkCountWritten = 0;
+ uint64_t TotalRequestCount = 0;
+ std::atomic<uint64_t> RequestsComplete = 0;
+ std::atomic<uint32_t> ChunkCountWritten = 0;
+ std::atomic<size_t> TotalPartWriteCount = 0;
+ std::atomic<size_t> WritePartsComplete = 0;
{
ZEN_TRACE_CPU("HandleChunks");
+ Stopwatch WriteTimer;
+
FilteredRate FilteredDownloadedBytesPerSecond;
FilteredRate FilteredWrittenBytesPerSecond;
@@ -4010,6 +4196,8 @@ namespace {
}
else
{
+ TotalRequestCount++;
+ TotalPartWriteCount++;
Work.ScheduleWork(
NetworkPool, // GetSyncWorkerPool(),//
[&, ChunkHash, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) {
@@ -4020,25 +4208,39 @@ namespace {
FilteredDownloadedBytesPerSecond.Start();
if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize)
{
- DownloadLargeBlob(Storage,
- Path / ZenTempChunkFolderName,
- CacheFolderPath,
- RemoteContent,
- RemoteLookup,
- BuildId,
- ChunkHash,
- PreferredMultipartChunkSize,
- ChunkTargetPtrs,
- SequenceIndexChunksLeftToWriteCounters,
- Work,
- WritePool,
- NetworkPool,
- WriteToDiskBytes,
- BytesDownloaded,
- LooseChunksBytes,
- DownloadedChunks,
- ChunkCountWritten,
- MultipartAttachmentCount);
+ DownloadLargeBlob(
+ Storage,
+ Path / ZenTempChunkFolderName,
+ CacheFolderPath,
+ RemoteContent,
+ RemoteLookup,
+ BuildId,
+ ChunkHash,
+ PreferredMultipartChunkSize,
+ ChunkTargetPtrs,
+ SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ WritePool,
+ NetworkPool,
+ WriteToDiskBytes,
+ BytesDownloaded,
+ MultipartAttachmentCount,
+ [&](uint64_t BytesDownloaded) {
+ LooseChunksBytes += BytesDownloaded;
+ RequestsComplete++;
+ if (RequestsComplete == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ },
+ [&]() {
+ ChunkCountWritten++;
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
+ });
}
else
{
@@ -4049,6 +4251,11 @@ namespace {
}
BytesDownloaded += CompressedPart.GetSize();
LooseChunksBytes += CompressedPart.GetSize();
+ RequestsComplete++;
+ if (RequestsComplete == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
CompositeBuffer Payload = WriteToTempFileIfNeeded(CompositeBuffer(std::move(CompressedPart)),
Path / ZenTempChunkFolderName,
ChunkHash);
@@ -4077,6 +4284,11 @@ namespace {
CompositeBuffer(CompressedPart),
WriteToDiskBytes);
ChunkCountWritten++;
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
NeedHashVerify = false;
}
else
@@ -4096,10 +4308,17 @@ namespace {
OpenFileCache,
WriteToDiskBytes);
ChunkCountWritten++;
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
}
}
if (!AbortFlag)
{
+ WritePartsComplete++;
+
// Write tracking, updating this must be done without any files open
// (WriteFileCache)
for (const ChunkedContentLookup::ChunkSequenceLocation* Location :
@@ -4109,12 +4328,12 @@ namespace {
if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(
1) == 1)
{
- ZEN_TRACE_CPU("UpdateFolder_VerifyHash");
-
const IoHash& SequenceRawHash =
RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
if (NeedHashVerify)
{
+ ZEN_TRACE_CPU("UpdateFolder_VerifyHash");
+
const IoHash VerifyChunkHash =
IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(
GetTempChunkedSequenceFileName(CacheFolderPath,
@@ -4155,6 +4374,8 @@ namespace {
break;
}
+ TotalPartWriteCount++;
+
Work.ScheduleWork(
WritePool, // GetSyncWorkerPool(),//
[&, CopyDataIndex](std::atomic<bool>&) {
@@ -4166,245 +4387,428 @@ namespace {
const CacheCopyData& CopyData = CacheCopyDatas[CopyDataIndex];
const uint32_t LocalPathIndex = LocalLookup.SequenceIndexFirstPathIndex[CopyData.LocalSequenceIndex];
const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred();
- if (!CopyData.TargetChunkLocationPtrs.empty())
- {
- uint64_t CacheLocalFileBytesRead = 0;
+ ZEN_ASSERT(!CopyData.TargetChunkLocationPtrs.empty());
- size_t TargetStart = 0;
- const std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> AllTargets(
- CopyData.TargetChunkLocationPtrs);
+ uint64_t CacheLocalFileBytesRead = 0;
- struct WriteOp
- {
- const ChunkedContentLookup::ChunkSequenceLocation* Target;
- uint64_t CacheFileOffset;
- uint64_t ChunkSize;
- };
+ size_t TargetStart = 0;
+ const std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> AllTargets(
+ CopyData.TargetChunkLocationPtrs);
+
+ struct WriteOp
+ {
+ const ChunkedContentLookup::ChunkSequenceLocation* Target;
+ uint64_t CacheFileOffset;
+ uint64_t ChunkSize;
+ };
- std::vector<WriteOp> WriteOps;
- WriteOps.reserve(AllTargets.size());
+ std::vector<WriteOp> WriteOps;
+ WriteOps.reserve(AllTargets.size());
- for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets)
+ for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets)
+ {
+ std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> TargetRange =
+ AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount);
+ for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange)
{
- std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> TargetRange =
- AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount);
- for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange)
- {
- WriteOps.push_back(WriteOp{.Target = Target,
- .CacheFileOffset = ChunkTarget.CacheFileOffset,
- .ChunkSize = ChunkTarget.ChunkRawSize});
- }
- TargetStart += ChunkTarget.TargetChunkLocationCount;
+ WriteOps.push_back(WriteOp{.Target = Target,
+ .CacheFileOffset = ChunkTarget.CacheFileOffset,
+ .ChunkSize = ChunkTarget.ChunkRawSize});
}
+ TargetStart += ChunkTarget.TargetChunkLocationCount;
+ }
- std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) {
- if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
- {
- return true;
- }
- else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
- {
- return false;
- }
- if (Lhs.Target->Offset < Rhs.Target->Offset)
- {
- return true;
- }
+ std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) {
+ if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
+ {
+ return true;
+ }
+ else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
+ {
return false;
- });
+ }
+ if (Lhs.Target->Offset < Rhs.Target->Offset)
+ {
+ return true;
+ }
+ return false;
+ });
- if (!AbortFlag)
+ if (!AbortFlag)
+ {
+ BufferedOpenFile SourceFile(LocalFilePath);
+ WriteFileCache OpenFileCache;
+ for (const WriteOp& Op : WriteOps)
{
- BufferedOpenFile SourceFile(LocalFilePath);
- WriteFileCache OpenFileCache;
- for (const WriteOp& Op : WriteOps)
+ if (AbortFlag)
{
- if (AbortFlag)
- {
- break;
- }
- const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
- ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() <=
- RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]);
- ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() > 0);
- const uint32_t RemotePathIndex = RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex];
- const uint64_t ChunkSize = Op.ChunkSize;
- CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ChunkSize);
-
- ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]);
-
- OpenFileCache.WriteToFile<CompositeBuffer>(
- RemoteSequenceIndex,
- [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) {
- return GetTempChunkedSequenceFileName(
- CacheFolderPath,
- RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
- },
- ChunkSource,
- Op.Target->Offset,
- RemoteContent.RawSizes[RemotePathIndex]);
- WriteToDiskBytes += ChunkSize;
- CacheLocalFileBytesRead += ChunkSize; // TODO: This should be the sum of unique chunk sizes?
+ break;
}
+ const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
+ ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() <=
+ RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]);
+ ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() > 0);
+ const uint32_t RemotePathIndex = RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex];
+ const uint64_t ChunkSize = Op.ChunkSize;
+ CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ChunkSize);
+
+ ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]);
+
+ OpenFileCache.WriteToFile<CompositeBuffer>(
+ RemoteSequenceIndex,
+ [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) {
+ return GetTempChunkedSequenceFileName(
+ CacheFolderPath,
+ RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
+ },
+ ChunkSource,
+ Op.Target->Offset,
+ RemoteContent.RawSizes[RemotePathIndex]);
+ WriteToDiskBytes += ChunkSize;
+ CacheLocalFileBytesRead += ChunkSize; // TODO: This should be the sum of unique chunk sizes?
}
- if (!AbortFlag)
+ }
+ if (!AbortFlag)
+ {
+ // Write tracking, updating this must be done without any files open (WriteFileCache)
+ for (const WriteOp& Op : WriteOps)
{
- if (!AbortFlag)
+ ZEN_TRACE_CPU("UpdateFolder_Copy_VerifyHash");
+
+ const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
+ if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1)
{
- // Write tracking, updating this must be done without any files open (WriteFileCache)
- for (const WriteOp& Op : WriteOps)
+ const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(
+ GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
+ if (VerifyChunkHash != SequenceRawHash)
{
- ZEN_TRACE_CPU("UpdateFolder_Copy_VerifyHash");
-
- const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
- if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1)
- {
- const IoHash& SequenceRawHash =
- RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(
- GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
- if (VerifyChunkHash != SequenceRawHash)
- {
- throw std::runtime_error(
- fmt::format("Written chunk sequence {} hash does not match expected hash {}",
- VerifyChunkHash,
- SequenceRawHash));
- }
-
- ZEN_TRACE_CPU("UpdateFolder_Copy_rename");
- std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash),
- GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash));
- }
+ throw std::runtime_error(
+ fmt::format("Written chunk sequence {} hash does not match expected hash {}",
+ VerifyChunkHash,
+ SequenceRawHash));
}
- }
- ChunkCountWritten += gsl::narrow<uint32_t>(CopyData.ChunkTargets.size());
- ZEN_DEBUG("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), LocalContent.Paths[LocalPathIndex]);
+ ZEN_TRACE_CPU("UpdateFolder_Copy_rename");
+ std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash),
+ GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash));
+ }
}
+
+ ChunkCountWritten += gsl::narrow<uint32_t>(CopyData.ChunkTargets.size());
+ ZEN_DEBUG("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), LocalContent.Paths[LocalPathIndex]);
+ }
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
}
}
},
Work.DefaultErrorFunction());
}
- size_t BlockCount = BlockDescriptions.size();
- std::atomic<size_t> BlocksComplete = 0;
-
- auto IsBlockNeeded = [&RemoteContent, &RemoteLookup, &RemoteChunkIndexNeedsCopyFromSourceFlags](
- const ChunkBlockDescription& BlockDescription) -> bool {
- for (const IoHash& ChunkHash : BlockDescription.ChunkRawHashes)
- {
- if (auto It = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); It != RemoteLookup.ChunkHashToChunkIndex.end())
- {
- const uint32_t RemoteChunkIndex = It->second;
- if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex])
- {
- return true;
- }
- }
- }
- return false;
+ size_t BlockCount = BlockDescriptions.size();
+
+ std::vector<bool> ChunkIsPickedUpByBlock(RemoteContent.ChunkedContent.ChunkHashes.size(), false);  // One flag per remote chunk hash; set once a block has been chosen to supply that chunk
+ auto GetNeededChunkBlockIndexes = [&RemoteContent,  // Collects the in-block positions of the chunks this block should supply. NOTE(review): RemoteContent is captured but not referenced in the body
+ &RemoteLookup,
+ &RemoteChunkIndexNeedsCopyFromSourceFlags,
+ &ChunkIsPickedUpByBlock](const ChunkBlockDescription& BlockDescription) {
+ std::vector<uint32_t> NeededBlockChunkIndexes;
+ for (uint32_t ChunkBlockIndex = 0; ChunkBlockIndex < BlockDescription.ChunkRawHashes.size(); ChunkBlockIndex++)
+ {
+ const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex];
+ if (auto It = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); It != RemoteLookup.ChunkHashToChunkIndex.end())  // Hashes missing from the lookup are ignored
+ {
+ const uint32_t RemoteChunkIndex = It->second;
+ if (!ChunkIsPickedUpByBlock[RemoteChunkIndex])  // Skip chunks already claimed by a previously examined block
+ {
+ if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex])  // Only chunks whose flag is set are claimed
+ {
+ ChunkIsPickedUpByBlock[RemoteChunkIndex] = true;  // Claim it so later blocks holding the same hash skip it; the result therefore depends on call order
+ NeededBlockChunkIndexes.push_back(ChunkBlockIndex);
+ }
+ }
+ }
+ }
+ return NeededBlockChunkIndexes;  // Ascending in-block indexes; empty when this block contributes nothing
};
- size_t BlocksNeededCount = 0;
+ size_t BlocksNeededCount = 0;  // Blocks with at least one needed chunk; incremented for both partial and full downloads
+ uint64_t AllBlocksSize = 0;  // Sum of the byte offset reached when scanning each partially fetched block. NOTE(review): the scan stops after the last needed chunk, so this may undercount the real block size - confirm
+ uint64_t AllBlocksFetch = 0;  // Total bytes covered by the collapsed ranges of partially fetched blocks
+ uint64_t AllBlocksSlack = 0;  // Bytes of gap between needed ranges that were absorbed when ranges were merged
+ uint64_t AllBlockRequests = 0;  // Number of collapsed ranges, i.e. range requests scheduled for partially fetched blocks
+ uint64_t AllBlockChunksSize = 0;  // Compressed size of the needed chunks in partially fetched blocks (logged as "ideal")
for (size_t BlockIndex = 0; BlockIndex < BlockCount; BlockIndex++)
{
if (Work.IsAborted())
{
break;
}
- if (IsBlockNeeded(BlockDescriptions[BlockIndex]))
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+ const std::vector<uint32_t> BlockChunkIndexNeeded = GetNeededChunkBlockIndexes(BlockDescription);
+ if (!BlockChunkIndexNeeded.empty())
{
- BlocksNeededCount++;
- Work.ScheduleWork(
- NetworkPool,
- [&, BlockIndex](std::atomic<bool>&) {
- if (!AbortFlag)
+ bool WantsToDoPartialBlockDownload = BlockChunkIndexNeeded.size() < BlockDescription.ChunkRawHashes.size();
+ bool CanDoPartialBlockDownload = (BlockDescription.HeaderSize > 0) && (BlockDescription.ChunkCompressedLengths.size() ==
+ BlockDescription.ChunkRawHashes.size());
+ if (AllowPartialBlockRequests && WantsToDoPartialBlockDownload && CanDoPartialBlockDownload)
+ {
+ struct BlockRangeDescriptor  // One contiguous byte range of a block to request, together with the in-block chunks it spans
+ {
+ uint64_t RangeStart = 0;  // Byte offset where the range begins; taken from CurrentOffset, which starts past the header sizes
+ uint64_t RangeLength = 0;  // Byte count of the range; 0 means no range has been started
+ uint32_t ChunkBlockIndexStart = 0;  // In-block index of the first chunk in the range
+ uint32_t ChunkBlockIndexCount = 0;  // Number of consecutive in-block chunks spanned; after merging this also counts unneeded chunks lying in the gaps
+ };
+ std::vector<BlockRangeDescriptor> BlockRanges;
+
+ ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_PartialAnalisys");
+
+ uint32_t NeedBlockChunkIndexOffset = 0;  // Cursor into BlockChunkIndexNeeded (the next needed chunk to match)
+ uint32_t ChunkBlockIndex = 0;  // Cursor over every chunk stored in the block
+ uint32_t CurrentOffset =  // Byte offset of chunk ChunkBlockIndex; the first chunk starts at the sum of the two header sizes below
+ gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize);  // NOTE(review): 32-bit here but copied into the 64-bit RangeStart - confirm a block can never exceed 4 GiB
+
+ BlockRangeDescriptor NextRange;  // Range currently being extended; RangeLength == 0 means none is open
+ while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() &&
+ ChunkBlockIndex < BlockDescription.ChunkRawHashes.size())
+ {
+ const uint32_t ChunkCompressedLength = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex];
+ if (ChunkBlockIndex < BlockChunkIndexNeeded[NeedBlockChunkIndexOffset])
{
- ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_Read");
-
- FilteredDownloadedBytesPerSecond.Start();
- IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockDescriptions[BlockIndex].BlockHash);
- if (!BlockBuffer)
+ if (NextRange.RangeLength > 0)
{
- throw std::runtime_error(fmt::format("Block {} is missing", BlockDescriptions[BlockIndex].BlockHash));
+ BlockRanges.push_back(NextRange);
+ NextRange = {};
}
- BytesDownloaded += BlockBuffer.GetSize();
- BlockBytes += BlockBuffer.GetSize();
- DownloadedBlocks++;
- CompositeBuffer Payload = WriteToTempFileIfNeeded(CompositeBuffer(std::move(BlockBuffer)),
- Path / ZenTempBlockFolderName,
- BlockDescriptions[BlockIndex].BlockHash);
-
- if (!AbortFlag)
+ ChunkBlockIndex++;
+ CurrentOffset += ChunkCompressedLength;
+ }
+ else if (ChunkBlockIndex == BlockChunkIndexNeeded[NeedBlockChunkIndexOffset])
+ {
+ AllBlockChunksSize += ChunkCompressedLength;
+ if (NextRange.RangeLength == 0)
{
- Work.ScheduleWork(
- WritePool,
- [&, BlockIndex, BlockBuffer = std::move(Payload)](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_Write");
-
- FilteredWrittenBytesPerSecond.Start();
- IoHash BlockRawHash;
- uint64_t BlockRawSize;
- CompressedBuffer CompressedBlockBuffer =
- CompressedBuffer::FromCompressed(std::move(BlockBuffer), BlockRawHash, BlockRawSize);
- if (!CompressedBlockBuffer)
- {
- throw std::runtime_error(fmt::format("Block {} is not a compressed buffer",
- BlockDescriptions[BlockIndex].BlockHash));
- }
+ NextRange.RangeStart = CurrentOffset;
+ NextRange.ChunkBlockIndexStart = ChunkBlockIndex;
+ }
+ NextRange.RangeLength += ChunkCompressedLength;
+ NextRange.ChunkBlockIndexCount++;
+ ChunkBlockIndex++;
+ CurrentOffset += ChunkCompressedLength;
+ NeedBlockChunkIndexOffset++;
+ }
+ else
+ {
+ ZEN_ASSERT(false);
+ }
+ }
+ AllBlocksSize += CurrentOffset;
+ if (NextRange.RangeLength > 0)
+ {
+ BlockRanges.push_back(NextRange);
+ NextRange = {};
+ }
- if (BlockRawHash != BlockDescriptions[BlockIndex].BlockHash)
- {
- throw std::runtime_error(fmt::format("Block {} header has a mismatching raw hash {}",
- BlockDescriptions[BlockIndex].BlockHash,
- BlockRawHash));
- }
+ ZEN_ASSERT(!BlockRanges.empty());  // The first range seeds the collapsed list below, so there must be at least one
+ std::vector<BlockRangeDescriptor> CollapsedBlockRanges;  // Ranges after merging neighbours that are close enough to fetch in one request
+ auto It = BlockRanges.begin();
+ CollapsedBlockRanges.push_back(*It++);
+ uint64_t TotalSlack = 0;  // Unneeded bytes absorbed by merges for this block
+ while (It != BlockRanges.end())
+ {
+ BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back();
+ uint64_t Slack = It->RangeStart - (LastRange.RangeStart + LastRange.RangeLength);  // Gap in bytes between the end of the last collapsed range and the start of the next range
+ uint64_t BothRangeSize = It->RangeLength + LastRange.RangeLength;  // LastRange may already be the result of earlier merges
+ if (Slack <= Max(BothRangeSize / 8, 64u * 1024u)) // Made up heuristic - we'll see how it pans out: merge when the gap is no larger than the greater of (combined size / 8) and 64 KiB
+ {
+ LastRange.ChunkBlockIndexCount =  // Extend the previous range up to the end of this one, covering the chunks in the gap as well
+ (It->ChunkBlockIndexStart + It->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart;
+ LastRange.RangeLength = (It->RangeStart + It->RangeLength) - LastRange.RangeStart;
+ TotalSlack += Slack;
+ }
+ else
+ {
+ CollapsedBlockRanges.push_back(*It);  // Gap too large: keep this range as a separate request
+ }
+ ++It;
+ }
+
+ uint64_t TotalFetch = 0;  // Bytes covered by the collapsed ranges of this block (needed chunks plus absorbed gaps)
+ for (const BlockRangeDescriptor& Range : CollapsedBlockRanges)
+ {
+ TotalFetch += Range.RangeLength;
+ }
+
+ AllBlocksFetch += TotalFetch;  // Fold this block's numbers into the totals reported by the ZEN_DEBUG summary
+ AllBlocksSlack += TotalSlack;
+ BlocksNeededCount++;
+ AllBlockRequests += CollapsedBlockRanges.size();
+
+ for (size_t BlockRangeIndex = 0; BlockRangeIndex < CollapsedBlockRanges.size(); BlockRangeIndex++)
+ {
+ TotalRequestCount++;
+ TotalPartWriteCount++;
+ const BlockRangeDescriptor BlockRange = CollapsedBlockRanges[BlockRangeIndex];
+ // Partial block schedule
+ Work.ScheduleWork(
+ NetworkPool, // NetworkPool, // GetSyncWorkerPool()
+ [&, BlockIndex, BlockRange](std::atomic<bool>&) {
+ ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_PartialGet");
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+
+ FilteredDownloadedBytesPerSecond.Start();
+ IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId,
+ BlockDescription.BlockHash,
+ BlockRange.RangeStart,
+ BlockRange.RangeLength);
+ if (!BlockBuffer)
+ {
+ throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
+ }
+ BytesDownloaded += BlockBuffer.GetSize();
+ BlockBytes += BlockBuffer.GetSize();
+ DownloadedBlocks++;
+ CompositeBuffer Payload = WriteToTempFileIfNeeded(CompositeBuffer(std::move(BlockBuffer)),
+ Path / ZenTempBlockFolderName,
+ BlockDescription.BlockHash,
+ fmt::format("_{}", BlockRange.RangeStart));
+ RequestsComplete++;
+ if (RequestsComplete == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
- CompositeBuffer DecompressedBlockBuffer = CompressedBlockBuffer.DecompressToComposite();
- if (!DecompressedBlockBuffer)
+ if (!AbortFlag)
+ {
+ Work.ScheduleWork(
+ WritePool, // WritePool, // GetSyncWorkerPool(),
+ [&, BlockIndex, BlockRange, BlockPartialBuffer = std::move(Payload)](std::atomic<bool>&) {
+ if (!AbortFlag)
{
- throw std::runtime_error(fmt::format("Block {} failed to decompress",
- BlockDescriptions[BlockIndex].BlockHash));
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+
+ FilteredWrittenBytesPerSecond.Start();
+
+ if (!WritePartialBlockToDisk(
+ CacheFolderPath,
+ RemoteContent,
+ BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ BlockPartialBuffer,
+ BlockRange.ChunkBlockIndexStart,
+ BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1,
+ RemoteLookup,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ ChunkCountWritten,
+ WriteToDiskBytes))
+ {
+ throw std::runtime_error(
+ fmt::format("Partial block {} is malformed", BlockDescription.BlockHash));
+ }
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
}
+ },
+ [&, BlockIndex](const std::exception& Ex, std::atomic<bool>&) {
+ ZEN_ERROR("Failed writing block {}. Reason: {}",
+ BlockDescriptions[BlockIndex].BlockHash,
+ Ex.what());
+ AbortFlag = true;
+ });
+ }
+ },
+ Work.DefaultErrorFunction());
+ }
+ }
+ else
+ {
+ BlocksNeededCount++;
+ TotalRequestCount++;
+ TotalPartWriteCount++;
+
+ Work.ScheduleWork(
+ NetworkPool, // GetSyncWorkerPool(), // NetworkPool,
+ [&, BlockIndex](std::atomic<bool>&) {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_Get");
+
+ // Full block schedule
- ZEN_ASSERT_SLOW(BlockDescriptions[BlockIndex].BlockHash ==
- IoHash::HashBuffer(DecompressedBlockBuffer));
-
- if (!WriteBlockToDisk(CacheFolderPath,
- RemoteContent,
- SequenceIndexChunksLeftToWriteCounters,
- DecompressedBlockBuffer,
- RemoteLookup,
- RemoteChunkIndexNeedsCopyFromSourceFlags.data(),
- ChunkCountWritten,
- WriteToDiskBytes))
+ FilteredDownloadedBytesPerSecond.Start();
+ IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockDescription.BlockHash);
+ if (!BlockBuffer)
+ {
+ throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
+ }
+ BytesDownloaded += BlockBuffer.GetSize();
+ BlockBytes += BlockBuffer.GetSize();
+ DownloadedBlocks++;
+ RequestsComplete++;
+ if (RequestsComplete == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+
+ CompositeBuffer Payload = WriteToTempFileIfNeeded(CompositeBuffer(std::move(BlockBuffer)),
+ Path / ZenTempBlockFolderName,
+ BlockDescription.BlockHash);
+ if (!AbortFlag)
+ {
+ Work.ScheduleWork(
+ WritePool,
+ [&, BlockIndex, BlockBuffer = std::move(Payload)](std::atomic<bool>&) {
+ if (!AbortFlag)
{
- throw std::runtime_error(
- fmt::format("Block {} is malformed", BlockDescriptions[BlockIndex].BlockHash));
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+
+ FilteredWrittenBytesPerSecond.Start();
+ if (!WriteBlockToDisk(CacheFolderPath,
+ RemoteContent,
+ BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ BlockBuffer,
+ RemoteLookup,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ ChunkCountWritten,
+ WriteToDiskBytes))
+ {
+ throw std::runtime_error(
+ fmt::format("Block {} is malformed", BlockDescription.BlockHash));
+ }
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
}
- BlocksComplete++;
- }
- },
- [&, BlockIndex](const std::exception& Ex, std::atomic<bool>&) {
- ZEN_ERROR("Failed writing block {}. Reason: {}",
- BlockDescriptions[BlockIndex].BlockHash,
- Ex.what());
- AbortFlag = true;
- });
+ },
+ Work.DefaultErrorFunction());
+ }
}
- }
- },
- Work.DefaultErrorFunction());
+ },
+ Work.DefaultErrorFunction());
+ }
}
else
{
ZEN_DEBUG("Skipping block {} due to cache reuse", BlockDescriptions[BlockIndex].BlockHash);
}
}
-
+ ZEN_DEBUG("Fetching {} with {} slack (ideal {}) out of {} using {} requests for {} blocks",
+ NiceBytes(AllBlocksFetch),
+ NiceBytes(AllBlocksSlack),
+ NiceBytes(AllBlockChunksSize),
+ NiceBytes(AllBlocksSize),
+ AllBlockRequests,
+ BlocksNeededCount);
ZEN_TRACE_CPU("HandleChunks_Wait");
Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
@@ -4412,13 +4816,13 @@ namespace {
ZEN_ASSERT(ChunkCountToWrite >= ChunkCountWritten.load());
FilteredWrittenBytesPerSecond.Update(WriteToDiskBytes.load());
FilteredDownloadedBytesPerSecond.Update(BytesDownloaded.load());
- std::string Details = fmt::format("{}/{} chunks. {}/{} blocks. {} {}bits/s downloaded. {} {}B/s written",
- ChunkCountWritten.load(),
- ChunkCountToWrite,
- BlocksComplete.load(),
- BlocksNeededCount,
+ std::string Details = fmt::format("{}/{} ({} {}bits/s) downloaded. {}/{} ({} {}B/s) written.",
+ RequestsComplete.load(),
+ TotalRequestCount,
NiceBytes(BytesDownloaded.load()),
NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8),
+ ChunkCountWritten.load(),
+ ChunkCountToWrite,
NiceBytes(WriteToDiskBytes.load()),
NiceNum(FilteredWrittenBytesPerSecond.GetCurrent()));
WriteProgressBar.UpdateState({.Task = "Writing chunks ",
@@ -4437,6 +4841,13 @@ namespace {
}
WriteProgressBar.Finish();
+
+ ZEN_CONSOLE("Downloaded {} ({}bits/s). Wrote {} ({}B/s). Completed in {}",
+ NiceBytes(BytesDownloaded.load()),
+ NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), BytesDownloaded * 8)),
+ NiceBytes(WriteToDiskBytes.load()),
+ NiceNum(GetBytesPerSecond(FilteredWrittenBytesPerSecond.GetElapsedTimeUS(), WriteToDiskBytes.load())),
+ NiceTimeSpanMs(WriteTimer.GetElapsedTimeMs()));
}
for (const auto& SequenceIndexChunksLeftToWriteCounter : SequenceIndexChunksLeftToWriteCounters)
@@ -4455,6 +4866,7 @@ namespace {
});
// Move all files we will reuse to cache folder
+ // TODO: If WipeTargetFolder is false we could check which files are already correct and leave them in place
for (uint32_t LocalPathIndex = 0; LocalPathIndex < LocalContent.Paths.size(); LocalPathIndex++)
{
const IoHash& RawHash = LocalContent.RawHashes[LocalPathIndex];
@@ -5186,6 +5598,7 @@ namespace {
std::span<const std::string> BuildPartNames,
const std::filesystem::path& Path,
bool AllowMultiparts,
+ bool AllowPartialBlockRequests,
bool WipeTargetFolder,
bool PostDownloadVerify)
{
@@ -5202,8 +5615,8 @@ namespace {
CreateDirectories(Path / ZenTempBlockFolderName);
CreateDirectories(Path / ZenTempChunkFolderName); // TODO: Don't clear this - pick up files -> chunks to use
CreateDirectories(Path /
- ZenTempCacheFolderName); // TODO: Don't clear this - pick up files and use as sequences (non .tmp extension) and
- // delete .tmp (maybe?) - chunk them? How do we know the file is worth chunking?
+ ZenTempCacheFolderName); // TODO: Don't clear this - pick up files and use as sequences (non .tmp extension)
+ // and delete .tmp (maybe?) - chunk them? How do we know the file is worth chunking?
std::uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u;
@@ -5311,6 +5724,7 @@ namespace {
RemoteContent,
BlockDescriptions,
LooseChunkHashes,
+ AllowPartialBlockRequests,
WipeTargetFolder,
LocalFolderState);
@@ -5705,6 +6119,12 @@ BuildsCommand::BuildsCommand()
"Allow large attachments to be transfered using multipart protocol. Defaults to true.",
cxxopts::value(m_AllowMultiparts),
"<allowmultipart>");
+ m_DownloadOptions.add_option("",
+ "",
+ "allow-partial-block-requests",
+ "Allow request for partial chunk blocks. Defaults to true.",
+ cxxopts::value(m_AllowPartialBlockRequests),
+ "<allowpartialblockrequests>");
m_DownloadOptions
.add_option("", "", "verify", "Enable post download verify of all tracked files", cxxopts::value(m_PostDownloadVerify), "<verify>");
m_DownloadOptions.parse_positional({"local-path", "build-id", "build-part-name"});
@@ -5728,6 +6148,18 @@ BuildsCommand::BuildsCommand()
AddOutputOptions(m_TestOptions);
m_TestOptions.add_options()("h,help", "Print help");
m_TestOptions.add_option("", "l", "local-path", "Root file system folder used as base", cxxopts::value(m_Path), "<local-path>");
+ m_TestOptions.add_option("",
+ "",
+ "allow-multipart",
+ "Allow large attachments to be transfered using multipart protocol. Defaults to true.",
+ cxxopts::value(m_AllowMultiparts),
+ "<allowmultipart>");
+ m_TestOptions.add_option("",
+ "",
+ "allow-partial-block-requests",
+ "Allow request for partial chunk blocks. Defaults to true.",
+ cxxopts::value(m_AllowPartialBlockRequests),
+ "<allowpartialblockrequests>");
m_TestOptions.parse_positional({"local-path"});
m_TestOptions.positional_help("local-path");
@@ -6037,7 +6469,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
m_Bucket,
GeneratedBuildId ? "Generated " : "",
BuildId);
- CreateDirectories(m_Path / ZenTempStorageFolderName);
Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName);
StorageName = "Cloud DDC";
}
@@ -6175,7 +6606,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
m_Namespace,
m_Bucket,
BuildId);
- CreateDirectories(m_Path / ZenTempStorageFolderName);
Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName);
StorageName = "Cloud DDC";
}
@@ -6190,7 +6620,15 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help()));
}
- DownloadFolder(*Storage, BuildId, BuildPartIds, m_BuildPartNames, m_Path, m_AllowMultiparts, m_Clean, m_PostDownloadVerify);
+ DownloadFolder(*Storage,
+ BuildId,
+ BuildPartIds,
+ m_BuildPartNames,
+ m_Path,
+ m_AllowMultiparts,
+ m_AllowPartialBlockRequests,
+ m_Clean,
+ m_PostDownloadVerify);
if (false)
{
@@ -6275,7 +6713,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
m_Namespace,
m_Bucket,
BuildId);
- CreateDirectories(m_Path / ZenTempStorageFolderName);
Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName);
StorageName = "Cloud DDC";
}
@@ -6335,7 +6772,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
const std::filesystem::path DownloadPath = m_Path.parent_path() / (m_BuildPartName + "_download");
ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}'", BuildId, BuildPartId, m_BuildPartName, DownloadPath);
- DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, true, true);
+ DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, true, true);
if (AbortFlag)
{
ZEN_CONSOLE("Download failed.");
@@ -6347,7 +6784,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
BuildPartId,
m_BuildPartName,
DownloadPath);
- DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false, true);
+ DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed. (identical target)");
@@ -6449,7 +6886,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
BuildPartId,
m_BuildPartName,
DownloadPath);
- DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false, true);
+ DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed. (scrambled target)");
@@ -6487,7 +6924,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (original)", BuildId, BuildPartId, m_BuildPartName, DownloadPath);
- DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false, true);
+ DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed.");
@@ -6495,7 +6932,15 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath);
- DownloadFolder(*Storage, BuildId2, {BuildPartId2}, {}, DownloadPath, m_AllowMultiparts, false, true);
+ DownloadFolder(*Storage,
+ BuildId2,
+ {BuildPartId2},
+ {},
+ DownloadPath,
+ m_AllowMultiparts,
+ m_AllowPartialBlockRequests,
+ false,
+ true);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed.");
@@ -6503,7 +6948,15 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
ZEN_CONSOLE("\nRe-download Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath);
- DownloadFolder(*Storage, BuildId2, {BuildPartId2}, {}, DownloadPath, m_AllowMultiparts, false, true);
+ DownloadFolder(*Storage,
+ BuildId2,
+ {BuildPartId2},
+ {},
+ DownloadPath,
+ m_AllowMultiparts,
+ m_AllowPartialBlockRequests,
+ false,
+ true);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed.");
@@ -6549,7 +7002,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
m_Namespace,
m_Bucket,
BuildId);
- CreateDirectories(m_Path / ZenTempStorageFolderName);
Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName);
StorageName = "Cloud DDC";
}
@@ -6617,7 +7069,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
m_Namespace,
m_Bucket,
BuildId);
- CreateDirectories(m_Path / ZenTempStorageFolderName);
Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName);
StorageName = "Cloud DDC";
}