aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
author Dan Engelbrecht <[email protected]> 2025-03-07 09:58:20 +0100
committer GitHub Enterprise <[email protected]> 2025-03-07 09:58:20 +0100
commit 9b24647facccc9c7848a52f1f4c5e32055bf2f01 (patch)
tree d798889a2b3f6a0d72331890b75e92b0159ce0c3 /src
parent reduced memory churn using fixed_xxx containers (#236) (diff)
download zen-9b24647facccc9c7848a52f1f4c5e32055bf2f01.tar.xz
zen-9b24647facccc9c7848a52f1f4c5e32055bf2f01.zip
partial block fetch (#298)
- Improvement: Do partial requests of blocks if not all of the block is needed
- Improvement: Better progress/statistics on download
- Bugfix: Ensure that temporary folder for Jupiter downloads exists during verify phase
Diffstat (limited to 'src')
-rw-r--r-- src/zen/cmds/builds_cmd.cpp 1099
-rw-r--r-- src/zen/cmds/builds_cmd.h 1
-rw-r--r-- src/zenhttp/httpclient.cpp 161
-rw-r--r-- src/zenutil/filebuildstorage.cpp 17
-rw-r--r-- src/zenutil/include/zenutil/buildstorage.h 5
-rw-r--r-- src/zenutil/include/zenutil/jupiter/jupitersession.h 4
-rw-r--r-- src/zenutil/jupiter/jupiterbuildstorage.cpp 10
-rw-r--r-- src/zenutil/jupiter/jupitersession.cpp 12
8 files changed, 907 insertions, 402 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index eb0650c4d..1c9476b96 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -231,7 +231,10 @@ namespace {
return AuthToken;
}
- CompositeBuffer WriteToTempFileIfNeeded(const CompositeBuffer& Buffer, const std::filesystem::path& TempFolderPath, const IoHash& Hash)
+ CompositeBuffer WriteToTempFileIfNeeded(const CompositeBuffer& Buffer,
+ const std::filesystem::path& TempFolderPath,
+ const IoHash& Hash,
+ const std::string& Suffix = {})
{
// If this is a file based buffer or a compressed buffer with a memory-based header, we don't need to rewrite to disk to save memory
std::span<const SharedBuffer> Segments = Buffer.GetSegments();
@@ -241,7 +244,7 @@ namespace {
{
return Buffer;
}
- std::filesystem::path TempFilePath = (TempFolderPath / Hash.ToHexString()).make_preferred();
+ std::filesystem::path TempFilePath = (TempFolderPath / (Hash.ToHexString() + Suffix)).make_preferred();
return CompositeBuffer(WriteToTempFile(Buffer, TempFilePath));
}
@@ -302,7 +305,7 @@ namespace {
return FilteredPerSecond;
}
- uint64_t GetElapsedTime() const
+ uint64_t GetElapsedTimeUS() const
{
if (StartTimeUS == (uint64_t)-1)
{
@@ -1738,8 +1741,8 @@ namespace {
ProgressBar.Finish();
- GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGeneratedBytesPerSecond.GetElapsedTime();
- UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTime();
+ GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGeneratedBytesPerSecond.GetElapsedTimeUS();
+ UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS();
}
}
@@ -2069,9 +2072,9 @@ namespace {
});
ProgressBar.Finish();
- UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTime();
- GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGenerateBlockBytesPerSecond.GetElapsedTime();
- LooseChunksStats.CompressChunksElapsedWallTimeUS = FilteredCompressedBytesPerSecond.GetElapsedTime();
+ UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS();
+ GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGenerateBlockBytesPerSecond.GetElapsedTimeUS();
+ LooseChunksStats.CompressChunksElapsedWallTimeUS = FilteredCompressedBytesPerSecond.GetElapsedTimeUS();
}
}
@@ -3358,17 +3361,8 @@ namespace {
return ChunkTargetPtrs;
};
- bool WriteBlockToDisk(const std::filesystem::path& CacheFolderPath,
- const ChunkedFolderContent& RemoteContent,
- std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
- const CompositeBuffer& DecompressedBlockBuffer,
- const ChunkedContentLookup& Lookup,
- std::atomic<bool>* RemoteChunkIndexNeedsCopyFromSourceFlags,
- std::atomic<uint32_t>& OutChunksComplete,
- std::atomic<uint64_t>& OutBytesWritten)
+ struct BlockWriteOps
{
- ZEN_TRACE_CPU("WriteBlockToDisk");
-
std::vector<CompositeBuffer> ChunkBuffers;
struct WriteOpData
{
@@ -3376,7 +3370,79 @@ namespace {
size_t ChunkBufferIndex;
};
std::vector<WriteOpData> WriteOps;
+ };
+ // Writes the chunk buffers referenced by Ops.WriteOps into their temporary sequence files,
+ // then updates the per-sequence "chunks left to write" counters and finalizes every
+ // sequence whose counter reaches zero.
+ //
+ // CacheFolderPath   - folder handed to GetTempChunkedSequenceFileName / GetFinalChunkedSequenceFileName.
+ // RemoteContent     - supplies sequence raw hashes, per-sequence chunk counts and raw file sizes.
+ // Lookup            - maps a sequence index to its first path index.
+ // SequenceIndexChunksLeftToWriteCounters
+ //                   - decremented once per write op, after all writes have been issued.
+ // Ops               - write ops plus the chunk buffers they index into. Taken by const reference
+ //                     so the op and buffer vectors are not copied on every call.
+ // OutChunksComplete - increased by the number of chunk buffers unless the operation was aborted.
+ // OutBytesWritten   - increased by each chunk's size as it is written.
+ //
+ // Throws std::runtime_error when a completed sequence file does not hash to its expected raw hash.
+ void WriteBlockChunkOps(const std::filesystem::path&     CacheFolderPath,
+                         const ChunkedFolderContent&      RemoteContent,
+                         const ChunkedContentLookup&      Lookup,
+                         std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+                         const BlockWriteOps&             Ops,
+                         std::atomic<uint32_t>&           OutChunksComplete,
+                         std::atomic<uint64_t>&           OutBytesWritten)
+ {
+     ZEN_TRACE_CPU("WriteBlockChunkOps");
+     {
+         // Scoped so that OpenFileCache is destroyed before the tracking pass below runs.
+         WriteFileCache OpenFileCache;
+         for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps)
+         {
+             if (AbortFlag)
+             {
+                 break;
+             }
+             const CompositeBuffer& Chunk         = Ops.ChunkBuffers[WriteOp.ChunkBufferIndex];
+             const uint32_t         SequenceIndex = WriteOp.Target->SequenceIndex;
+             ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() <=
+                        RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]);
+             ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() > 0);
+             const uint64_t ChunkSize  = Chunk.GetSize();
+             const uint64_t FileOffset = WriteOp.Target->Offset;
+             const uint32_t PathIndex  = Lookup.SequenceIndexFirstPathIndex[SequenceIndex];
+             // The chunk must fit inside the target file.
+             ZEN_ASSERT(FileOffset + ChunkSize <= RemoteContent.RawSizes[PathIndex]);
+
+             OpenFileCache.WriteToFile<CompositeBuffer>(
+                 SequenceIndex,
+                 [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) {
+                     return GetTempChunkedSequenceFileName(CacheFolderPath,
+                                                           RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
+                 },
+                 Chunk,
+                 FileOffset,
+                 RemoteContent.RawSizes[PathIndex]);
+             OutBytesWritten += ChunkSize;
+         }
+     }
+     if (!AbortFlag)
+     {
+         // Write tracking, updating this must be done without any files open (WriteFileCache)
+         for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps)
+         {
+             const uint32_t RemoteSequenceIndex = WriteOp.Target->SequenceIndex;
+             // fetch_sub returns the previous value: 1 means this op was the last outstanding chunk.
+             if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1)
+             {
+                 const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+                 const IoHash  VerifyChunkHash =
+                     IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
+                 if (VerifyChunkHash != SequenceRawHash)
+                 {
+                     throw std::runtime_error(
+                         fmt::format("Written chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, SequenceRawHash));
+                 }
+                 // Verified: move the sequence file from its temporary name to its final name.
+                 std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash),
+                                         GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash));
+             }
+         }
+         OutChunksComplete += gsl::narrow<uint32_t>(Ops.ChunkBuffers.size());
+     }
+ }
+
+ bool GetBlockWriteOps(const ChunkedFolderContent& RemoteContent,
+ const ChunkedContentLookup& Lookup,
+ std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
+ const CompositeBuffer& DecompressedBlockBuffer,
+ BlockWriteOps& OutOps)
+ {
+ ZEN_TRACE_CPU("GetBlockWriteOps");
SharedBuffer BlockBuffer = DecompressedBlockBuffer.Flatten();
uint64_t HeaderSize = 0;
if (IterateChunkBlock(
@@ -3402,91 +3468,207 @@ namespace {
ZEN_ASSERT(Decompressed.GetSize() == RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]);
for (const ChunkedContentLookup::ChunkSequenceLocation* Target : ChunkTargetPtrs)
{
- WriteOps.push_back(WriteOpData{.Target = Target, .ChunkBufferIndex = ChunkBuffers.size()});
+ OutOps.WriteOps.push_back(
+ BlockWriteOps::WriteOpData{.Target = Target, .ChunkBufferIndex = OutOps.ChunkBuffers.size()});
}
- ChunkBuffers.emplace_back(std::move(Decompressed));
+ OutOps.ChunkBuffers.emplace_back(std::move(Decompressed));
}
}
}
},
HeaderSize))
{
- if (!WriteOps.empty())
+ std::sort(OutOps.WriteOps.begin(),
+ OutOps.WriteOps.end(),
+ [](const BlockWriteOps::WriteOpData& Lhs, const BlockWriteOps::WriteOpData& Rhs) {
+ if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
+ {
+ return true;
+ }
+ if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
+ {
+ return false;
+ }
+ return Lhs.Target->Offset < Rhs.Target->Offset;
+ });
+
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ // Validates and decompresses a complete compressed block, collects the write ops for the
+ // chunks in it via GetBlockWriteOps, and writes them to disk via WriteBlockChunkOps.
+ //
+ // Returns true when GetBlockWriteOps succeeded and the ops were handed to WriteBlockChunkOps,
+ // false when GetBlockWriteOps reported failure.
+ //
+ // Throws std::runtime_error when BlockBuffer is not a compressed buffer, when the raw hash in
+ // its header differs from BlockDescription.BlockHash, or when decompression fails.
+ bool WriteBlockToDisk(const std::filesystem::path&     CacheFolderPath,
+                       const ChunkedFolderContent&      RemoteContent,
+                       const ChunkBlockDescription&     BlockDescription,
+                       std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+                       const CompositeBuffer&           BlockBuffer,
+                       const ChunkedContentLookup&      Lookup,
+                       std::span<std::atomic<bool>>     RemoteChunkIndexNeedsCopyFromSourceFlags,
+                       std::atomic<uint32_t>&           OutChunksComplete,
+                       std::atomic<uint64_t>&           OutBytesWritten)
+ {
+     ZEN_TRACE_CPU("WriteBlockToDisk");
+
+     const IoHash& ExpectedBlockHash = BlockDescription.BlockHash;
+
+     // Parse the compressed header; the raw hash and raw size are filled in by FromCompressed.
+     IoHash           HeaderRawHash;
+     uint64_t         HeaderRawSize;
+     CompressedBuffer Compressed = CompressedBuffer::FromCompressed(BlockBuffer, HeaderRawHash, HeaderRawSize);
+     if (!Compressed)
+     {
+         throw std::runtime_error(fmt::format("Block {} is not a compressed buffer", ExpectedBlockHash));
+     }
+     if (HeaderRawHash != ExpectedBlockHash)
+     {
+         throw std::runtime_error(fmt::format("Block {} header has a mismatching raw hash {}", ExpectedBlockHash, HeaderRawHash));
+     }
+
+     CompositeBuffer Decompressed = Compressed.DecompressToComposite();
+     if (!Decompressed)
+     {
+         throw std::runtime_error(fmt::format("Block {} failed to decompress", ExpectedBlockHash));
+     }
+
+     ZEN_ASSERT_SLOW(BlockDescription.BlockHash == IoHash::HashBuffer(Decompressed));
+
+     BlockWriteOps PendingOps;
+     const bool    GotOps = GetBlockWriteOps(RemoteContent,
+                                          Lookup,
+                                          SequenceIndexChunksLeftToWriteCounters,
+                                          RemoteChunkIndexNeedsCopyFromSourceFlags,
+                                          Decompressed,
+                                          PendingOps);
+     if (!GotOps)
+     {
+         return false;
+     }
+
+     WriteBlockChunkOps(CacheFolderPath,
+                        RemoteContent,
+                        Lookup,
+                        SequenceIndexChunksLeftToWriteCounters,
+                        PendingOps,
+                        OutChunksComplete,
+                        OutBytesWritten);
+     return true;
+ }
+
+ bool GetPartialBlockWriteOps(const ChunkedFolderContent& RemoteContent,
+ const ChunkedContentLookup& Lookup,
+ const ChunkBlockDescription& BlockDescription,
+ std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
+ const CompositeBuffer& PartialBlockBuffer,
+ uint32_t FirstIncludedBlockChunkIndex,
+ uint32_t LastIncludedBlockChunkIndex,
+ BlockWriteOps& OutOps)
+ {
+ ZEN_TRACE_CPU("GetPartialBlockWriteOps");
+ uint32_t OffsetInBlock = 0;
+ for (uint32_t ChunkBlockIndex = FirstIncludedBlockChunkIndex; ChunkBlockIndex <= LastIncludedBlockChunkIndex; ChunkBlockIndex++)
+ {
+ const uint32_t ChunkCompressedSize = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex];
+ const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex];
+ if (auto It = Lookup.ChunkHashToChunkIndex.find(ChunkHash); It != Lookup.ChunkHashToChunkIndex.end())
{
- if (!AbortFlag)
+ const uint32_t ChunkIndex = It->second;
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
+ GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, Lookup, ChunkIndex);
+
+ if (!ChunkTargetPtrs.empty())
{
- std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOpData& Lhs, const WriteOpData& Rhs) {
- if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
+ bool NeedsWrite = true;
+ if (RemoteChunkIndexNeedsCopyFromSourceFlags[ChunkIndex].compare_exchange_strong(NeedsWrite, false))
+ {
+ CompositeBuffer Chunk = PartialBlockBuffer.Mid(OffsetInBlock, ChunkCompressedSize);
+ IoHash VerifyChunkHash;
+ uint64_t VerifyRawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(Chunk, VerifyChunkHash, VerifyRawSize);
+ if (!Compressed)
{
- return true;
+ ZEN_ASSERT(false);
}
- if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
+ if (VerifyChunkHash != ChunkHash)
{
- return false;
+ ZEN_ASSERT(false);
}
- return Lhs.Target->Offset < Rhs.Target->Offset;
- });
-
- {
- WriteFileCache OpenFileCache;
- for (const WriteOpData& WriteOp : WriteOps)
+ if (VerifyRawSize != BlockDescription.ChunkRawLengths[ChunkBlockIndex])
{
- if (AbortFlag)
- {
- break;
- }
- const CompositeBuffer& Chunk = ChunkBuffers[WriteOp.ChunkBufferIndex];
- const uint32_t SequenceIndex = WriteOp.Target->SequenceIndex;
- ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() <=
- RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]);
- ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() > 0);
- const uint64_t ChunkSize = Chunk.GetSize();
- const uint64_t FileOffset = WriteOp.Target->Offset;
- const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex];
- ZEN_ASSERT(FileOffset + ChunkSize <= RemoteContent.RawSizes[PathIndex]);
-
- OpenFileCache.WriteToFile<CompositeBuffer>(
- SequenceIndex,
- [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) {
- return GetTempChunkedSequenceFileName(CacheFolderPath,
- RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
- },
- Chunk,
- FileOffset,
- RemoteContent.RawSizes[PathIndex]);
- OutBytesWritten += ChunkSize;
+ ZEN_ASSERT(false);
}
- }
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("WriteBlockToDisk_VerifyHash");
-
- // Write tracking, updating this must be done without any files open (WriteFileCache)
- for (const WriteOpData& WriteOp : WriteOps)
+ CompositeBuffer Decompressed = Compressed.DecompressToComposite();
+ if (!Decompressed)
{
- const uint32_t RemoteSequenceIndex = WriteOp.Target->SequenceIndex;
- if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1)
- {
- const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- const IoHash VerifyChunkHash = IoHash::HashBuffer(
- IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
- if (VerifyChunkHash != SequenceRawHash)
- {
- throw std::runtime_error(fmt::format("Written chunk sequence {} hash does not match expected hash {}",
- VerifyChunkHash,
- SequenceRawHash));
- }
- std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash),
- GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash));
- }
+ throw std::runtime_error(fmt::format("Decompression of build blob {} failed", ChunkHash));
}
+ ZEN_ASSERT_SLOW(ChunkHash == IoHash::HashBuffer(Decompressed));
+ ZEN_ASSERT(Decompressed.GetSize() == RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]);
+ for (const ChunkedContentLookup::ChunkSequenceLocation* Target : ChunkTargetPtrs)
+ {
+ OutOps.WriteOps.push_back(
+ BlockWriteOps::WriteOpData{.Target = Target, .ChunkBufferIndex = OutOps.ChunkBuffers.size()});
+ }
+ OutOps.ChunkBuffers.emplace_back(std::move(Decompressed));
}
- OutChunksComplete += gsl::narrow<uint32_t>(ChunkBuffers.size());
}
}
+
+ OffsetInBlock += ChunkCompressedSize;
+ }
+ std::sort(OutOps.WriteOps.begin(),
+ OutOps.WriteOps.end(),
+ [](const BlockWriteOps::WriteOpData& Lhs, const BlockWriteOps::WriteOpData& Rhs) {
+ if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
+ {
+ return true;
+ }
+ if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
+ {
+ return false;
+ }
+ return Lhs.Target->Offset < Rhs.Target->Offset;
+ });
+ return true;
+ }
+
+ bool WritePartialBlockToDisk(const std::filesystem::path& CacheFolderPath,
+ const ChunkedFolderContent& RemoteContent,
+ const ChunkBlockDescription& BlockDescription,
+ std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ const CompositeBuffer& PartialBlockBuffer,
+ uint32_t FirstIncludedBlockChunkIndex,
+ uint32_t LastIncludedBlockChunkIndex,
+ const ChunkedContentLookup& Lookup,
+ std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
+ std::atomic<uint32_t>& OutChunksComplete,
+ std::atomic<uint64_t>& OutBytesWritten)
+ {
+ ZEN_TRACE_CPU("WritePartialBlockToDisk");
+ BlockWriteOps Ops;
+ if (GetPartialBlockWriteOps(RemoteContent,
+ Lookup,
+ BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ PartialBlockBuffer,
+ FirstIncludedBlockChunkIndex,
+ LastIncludedBlockChunkIndex,
+ Ops))
+ {
+ WriteBlockChunkOps(CacheFolderPath,
+ RemoteContent,
+ Lookup,
+ SequenceIndexChunksLeftToWriteCounters,
+ Ops,
+ OutChunksComplete,
+ OutBytesWritten);
return true;
}
- return false;
+ else
+ {
+ return false;
+ }
}
SharedBuffer Decompress(const CompositeBuffer& CompressedChunk, const IoHash& ChunkHash, const uint64_t ChunkRawSize)
@@ -3571,6 +3753,7 @@ namespace {
CompositeBuffer&& CompressedPart,
std::atomic<uint64_t>& WriteToDiskBytes)
{
+ ZEN_TRACE_CPU("StreamDecompress");
const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash);
TemporaryFile DecompressedTemp;
std::error_code Ec;
@@ -3633,10 +3816,9 @@ namespace {
WorkerThreadPool& NetworkPool,
std::atomic<uint64_t>& WriteToDiskBytes,
std::atomic<uint64_t>& BytesDownloaded,
- std::atomic<uint64_t>& LooseChunksBytes,
- std::atomic<uint64_t>& DownloadedChunks,
- std::atomic<uint32_t>& ChunksComplete,
- std::atomic<uint64_t>& MultipartAttachmentCount)
+ std::atomic<uint64_t>& MultipartAttachmentCount,
+ std::function<void(uint64_t DowloadedBytes)>&& OnDownloadComplete,
+ std::function<void()>&& OnWriteComplete)
{
ZEN_TRACE_CPU("DownloadLargeBlob");
@@ -3665,22 +3847,20 @@ namespace {
Workload,
ChunkHash,
&BytesDownloaded,
- &LooseChunksBytes,
+ OnDownloadComplete = std::move(OnDownloadComplete),
+ OnWriteComplete = std::move(OnWriteComplete),
&WriteToDiskBytes,
- &DownloadedChunks,
- &ChunksComplete,
SequenceIndexChunksLeftToWriteCounters,
ChunkTargetPtrs = std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>(
ChunkTargetPtrs)](uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining) {
BytesDownloaded += Chunk.GetSize();
- LooseChunksBytes += Chunk.GetSize();
if (!AbortFlag.load())
{
Workload->TempFile.Write(Chunk.GetView(), Offset);
if (Chunk.GetSize() == BytesRemaining)
{
- DownloadedChunks++;
+ OnDownloadComplete(Workload->TempFile.FileSize());
Work.ScheduleWork(
WritePool, // GetSyncWorkerPool(),//
@@ -3690,8 +3870,7 @@ namespace {
ChunkHash,
Workload,
Offset,
- BytesRemaining,
- &ChunksComplete,
+ OnWriteComplete = std::move(OnWriteComplete),
&WriteToDiskBytes,
SequenceIndexChunksLeftToWriteCounters,
ChunkTargetPtrs](std::atomic<bool>&) {
@@ -3725,7 +3904,7 @@ namespace {
CompositeBuffer(std::move(CompressedPart)),
WriteToDiskBytes);
NeedHashVerify = false;
- ChunksComplete++;
+ OnWriteComplete();
}
else
{
@@ -3748,7 +3927,7 @@ namespace {
CompositeBuffer(Chunk),
OpenFileCache,
WriteToDiskBytes);
- ChunksComplete++;
+ OnWriteComplete();
}
}
@@ -3760,12 +3939,12 @@ namespace {
const uint32_t RemoteSequenceIndex = Location->SequenceIndex;
if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1)
{
- ZEN_TRACE_CPU("VerifyChunkHash");
-
const IoHash& SequenceRawHash =
RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
if (NeedHashVerify)
{
+ ZEN_TRACE_CPU("VerifyChunkHash");
+
const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(
GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
if (VerifyChunkHash != ChunkHash)
@@ -3816,6 +3995,7 @@ namespace {
const ChunkedFolderContent& RemoteContent,
const std::vector<ChunkBlockDescription>& BlockDescriptions,
const std::vector<IoHash>& LooseChunkHashes,
+ bool AllowPartialBlockRequests,
bool WipeTargetFolder,
FolderContent& OutLocalFolderState)
{
@@ -3967,11 +4147,17 @@ namespace {
}
}
- std::atomic<uint32_t> ChunkCountWritten = 0;
+ uint64_t TotalRequestCount = 0;
+ std::atomic<uint64_t> RequestsComplete = 0;
+ std::atomic<uint32_t> ChunkCountWritten = 0;
+ std::atomic<size_t> TotalPartWriteCount = 0;
+ std::atomic<size_t> WritePartsComplete = 0;
{
ZEN_TRACE_CPU("HandleChunks");
+ Stopwatch WriteTimer;
+
FilteredRate FilteredDownloadedBytesPerSecond;
FilteredRate FilteredWrittenBytesPerSecond;
@@ -4010,6 +4196,8 @@ namespace {
}
else
{
+ TotalRequestCount++;
+ TotalPartWriteCount++;
Work.ScheduleWork(
NetworkPool, // GetSyncWorkerPool(),//
[&, ChunkHash, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) {
@@ -4020,25 +4208,39 @@ namespace {
FilteredDownloadedBytesPerSecond.Start();
if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize)
{
- DownloadLargeBlob(Storage,
- Path / ZenTempChunkFolderName,
- CacheFolderPath,
- RemoteContent,
- RemoteLookup,
- BuildId,
- ChunkHash,
- PreferredMultipartChunkSize,
- ChunkTargetPtrs,
- SequenceIndexChunksLeftToWriteCounters,
- Work,
- WritePool,
- NetworkPool,
- WriteToDiskBytes,
- BytesDownloaded,
- LooseChunksBytes,
- DownloadedChunks,
- ChunkCountWritten,
- MultipartAttachmentCount);
+ DownloadLargeBlob(
+ Storage,
+ Path / ZenTempChunkFolderName,
+ CacheFolderPath,
+ RemoteContent,
+ RemoteLookup,
+ BuildId,
+ ChunkHash,
+ PreferredMultipartChunkSize,
+ ChunkTargetPtrs,
+ SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ WritePool,
+ NetworkPool,
+ WriteToDiskBytes,
+ BytesDownloaded,
+ MultipartAttachmentCount,
+ [&](uint64_t BytesDownloaded) {
+ LooseChunksBytes += BytesDownloaded;
+ RequestsComplete++;
+ if (RequestsComplete == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ },
+ [&]() {
+ ChunkCountWritten++;
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
+ });
}
else
{
@@ -4049,6 +4251,11 @@ namespace {
}
BytesDownloaded += CompressedPart.GetSize();
LooseChunksBytes += CompressedPart.GetSize();
+ RequestsComplete++;
+ if (RequestsComplete == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
CompositeBuffer Payload = WriteToTempFileIfNeeded(CompositeBuffer(std::move(CompressedPart)),
Path / ZenTempChunkFolderName,
ChunkHash);
@@ -4077,6 +4284,11 @@ namespace {
CompositeBuffer(CompressedPart),
WriteToDiskBytes);
ChunkCountWritten++;
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
NeedHashVerify = false;
}
else
@@ -4096,10 +4308,17 @@ namespace {
OpenFileCache,
WriteToDiskBytes);
ChunkCountWritten++;
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
}
}
if (!AbortFlag)
{
+ WritePartsComplete++;
+
// Write tracking, updating this must be done without any files open
// (WriteFileCache)
for (const ChunkedContentLookup::ChunkSequenceLocation* Location :
@@ -4109,12 +4328,12 @@ namespace {
if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(
1) == 1)
{
- ZEN_TRACE_CPU("UpdateFolder_VerifyHash");
-
const IoHash& SequenceRawHash =
RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
if (NeedHashVerify)
{
+ ZEN_TRACE_CPU("UpdateFolder_VerifyHash");
+
const IoHash VerifyChunkHash =
IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(
GetTempChunkedSequenceFileName(CacheFolderPath,
@@ -4155,6 +4374,8 @@ namespace {
break;
}
+ TotalPartWriteCount++;
+
Work.ScheduleWork(
WritePool, // GetSyncWorkerPool(),//
[&, CopyDataIndex](std::atomic<bool>&) {
@@ -4166,245 +4387,428 @@ namespace {
const CacheCopyData& CopyData = CacheCopyDatas[CopyDataIndex];
const uint32_t LocalPathIndex = LocalLookup.SequenceIndexFirstPathIndex[CopyData.LocalSequenceIndex];
const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred();
- if (!CopyData.TargetChunkLocationPtrs.empty())
- {
- uint64_t CacheLocalFileBytesRead = 0;
+ ZEN_ASSERT(!CopyData.TargetChunkLocationPtrs.empty());
- size_t TargetStart = 0;
- const std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> AllTargets(
- CopyData.TargetChunkLocationPtrs);
+ uint64_t CacheLocalFileBytesRead = 0;
- struct WriteOp
- {
- const ChunkedContentLookup::ChunkSequenceLocation* Target;
- uint64_t CacheFileOffset;
- uint64_t ChunkSize;
- };
+ size_t TargetStart = 0;
+ const std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> AllTargets(
+ CopyData.TargetChunkLocationPtrs);
+
+ struct WriteOp
+ {
+ const ChunkedContentLookup::ChunkSequenceLocation* Target;
+ uint64_t CacheFileOffset;
+ uint64_t ChunkSize;
+ };
- std::vector<WriteOp> WriteOps;
- WriteOps.reserve(AllTargets.size());
+ std::vector<WriteOp> WriteOps;
+ WriteOps.reserve(AllTargets.size());
- for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets)
+ for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets)
+ {
+ std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> TargetRange =
+ AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount);
+ for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange)
{
- std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> TargetRange =
- AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount);
- for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange)
- {
- WriteOps.push_back(WriteOp{.Target = Target,
- .CacheFileOffset = ChunkTarget.CacheFileOffset,
- .ChunkSize = ChunkTarget.ChunkRawSize});
- }
- TargetStart += ChunkTarget.TargetChunkLocationCount;
+ WriteOps.push_back(WriteOp{.Target = Target,
+ .CacheFileOffset = ChunkTarget.CacheFileOffset,
+ .ChunkSize = ChunkTarget.ChunkRawSize});
}
+ TargetStart += ChunkTarget.TargetChunkLocationCount;
+ }
- std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) {
- if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
- {
- return true;
- }
- else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
- {
- return false;
- }
- if (Lhs.Target->Offset < Rhs.Target->Offset)
- {
- return true;
- }
+ std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) {
+ if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
+ {
+ return true;
+ }
+ else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
+ {
return false;
- });
+ }
+ if (Lhs.Target->Offset < Rhs.Target->Offset)
+ {
+ return true;
+ }
+ return false;
+ });
- if (!AbortFlag)
+ if (!AbortFlag)
+ {
+ BufferedOpenFile SourceFile(LocalFilePath);
+ WriteFileCache OpenFileCache;
+ for (const WriteOp& Op : WriteOps)
{
- BufferedOpenFile SourceFile(LocalFilePath);
- WriteFileCache OpenFileCache;
- for (const WriteOp& Op : WriteOps)
+ if (AbortFlag)
{
- if (AbortFlag)
- {
- break;
- }
- const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
- ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() <=
- RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]);
- ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() > 0);
- const uint32_t RemotePathIndex = RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex];
- const uint64_t ChunkSize = Op.ChunkSize;
- CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ChunkSize);
-
- ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]);
-
- OpenFileCache.WriteToFile<CompositeBuffer>(
- RemoteSequenceIndex,
- [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) {
- return GetTempChunkedSequenceFileName(
- CacheFolderPath,
- RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
- },
- ChunkSource,
- Op.Target->Offset,
- RemoteContent.RawSizes[RemotePathIndex]);
- WriteToDiskBytes += ChunkSize;
- CacheLocalFileBytesRead += ChunkSize; // TODO: This should be the sum of unique chunk sizes?
+ break;
}
+ const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
+ ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() <=
+ RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]);
+ ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() > 0);
+ const uint32_t RemotePathIndex = RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex];
+ const uint64_t ChunkSize = Op.ChunkSize;
+ CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ChunkSize);
+
+ ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]);
+
+ OpenFileCache.WriteToFile<CompositeBuffer>(
+ RemoteSequenceIndex,
+ [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) {
+ return GetTempChunkedSequenceFileName(
+ CacheFolderPath,
+ RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
+ },
+ ChunkSource,
+ Op.Target->Offset,
+ RemoteContent.RawSizes[RemotePathIndex]);
+ WriteToDiskBytes += ChunkSize;
+ CacheLocalFileBytesRead += ChunkSize; // TODO: This should be the sum of unique chunk sizes?
}
- if (!AbortFlag)
+ }
+ if (!AbortFlag)
+ {
+ // Write tracking, updating this must be done without any files open (WriteFileCache)
+ for (const WriteOp& Op : WriteOps)
{
- if (!AbortFlag)
+ ZEN_TRACE_CPU("UpdateFolder_Copy_VerifyHash");
+
+ const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
+ if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1)
{
- // Write tracking, updating this must be done without any files open (WriteFileCache)
- for (const WriteOp& Op : WriteOps)
+ const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(
+ GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
+ if (VerifyChunkHash != SequenceRawHash)
{
- ZEN_TRACE_CPU("UpdateFolder_Copy_VerifyHash");
-
- const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
- if (SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1) == 1)
- {
- const IoHash& SequenceRawHash =
- RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- const IoHash VerifyChunkHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(
- GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash)));
- if (VerifyChunkHash != SequenceRawHash)
- {
- throw std::runtime_error(
- fmt::format("Written chunk sequence {} hash does not match expected hash {}",
- VerifyChunkHash,
- SequenceRawHash));
- }
-
- ZEN_TRACE_CPU("UpdateFolder_Copy_rename");
- std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash),
- GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash));
- }
+ throw std::runtime_error(
+ fmt::format("Written chunk sequence {} hash does not match expected hash {}",
+ VerifyChunkHash,
+ SequenceRawHash));
}
- }
- ChunkCountWritten += gsl::narrow<uint32_t>(CopyData.ChunkTargets.size());
- ZEN_DEBUG("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), LocalContent.Paths[LocalPathIndex]);
+ ZEN_TRACE_CPU("UpdateFolder_Copy_rename");
+ std::filesystem::rename(GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash),
+ GetFinalChunkedSequenceFileName(CacheFolderPath, SequenceRawHash));
+ }
}
+
+ ChunkCountWritten += gsl::narrow<uint32_t>(CopyData.ChunkTargets.size());
+ ZEN_DEBUG("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), LocalContent.Paths[LocalPathIndex]);
+ }
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
}
}
},
Work.DefaultErrorFunction());
}
- size_t BlockCount = BlockDescriptions.size();
- std::atomic<size_t> BlocksComplete = 0;
-
- auto IsBlockNeeded = [&RemoteContent, &RemoteLookup, &RemoteChunkIndexNeedsCopyFromSourceFlags](
- const ChunkBlockDescription& BlockDescription) -> bool {
- for (const IoHash& ChunkHash : BlockDescription.ChunkRawHashes)
- {
- if (auto It = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); It != RemoteLookup.ChunkHashToChunkIndex.end())
- {
- const uint32_t RemoteChunkIndex = It->second;
- if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex])
- {
- return true;
- }
- }
- }
- return false;
+ size_t BlockCount = BlockDescriptions.size();
+
+ std::vector<bool> ChunkIsPickedUpByBlock(RemoteContent.ChunkedContent.ChunkHashes.size(), false);
+ auto GetNeededChunkBlockIndexes = [&RemoteContent,
+ &RemoteLookup,
+ &RemoteChunkIndexNeedsCopyFromSourceFlags,
+ &ChunkIsPickedUpByBlock](const ChunkBlockDescription& BlockDescription) {
+ std::vector<uint32_t> NeededBlockChunkIndexes;
+ for (uint32_t ChunkBlockIndex = 0; ChunkBlockIndex < BlockDescription.ChunkRawHashes.size(); ChunkBlockIndex++)
+ {
+ const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex];
+ if (auto It = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); It != RemoteLookup.ChunkHashToChunkIndex.end())
+ {
+ const uint32_t RemoteChunkIndex = It->second;
+ if (!ChunkIsPickedUpByBlock[RemoteChunkIndex])
+ {
+ if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex])
+ {
+ ChunkIsPickedUpByBlock[RemoteChunkIndex] = true;
+ NeededBlockChunkIndexes.push_back(ChunkBlockIndex);
+ }
+ }
+ }
+ }
+ return NeededBlockChunkIndexes;
};
- size_t BlocksNeededCount = 0;
+ size_t BlocksNeededCount = 0;
+ uint64_t AllBlocksSize = 0;
+ uint64_t AllBlocksFetch = 0;
+ uint64_t AllBlocksSlack = 0;
+ uint64_t AllBlockRequests = 0;
+ uint64_t AllBlockChunksSize = 0;
for (size_t BlockIndex = 0; BlockIndex < BlockCount; BlockIndex++)
{
if (Work.IsAborted())
{
break;
}
- if (IsBlockNeeded(BlockDescriptions[BlockIndex]))
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+ const std::vector<uint32_t> BlockChunkIndexNeeded = GetNeededChunkBlockIndexes(BlockDescription);
+ if (!BlockChunkIndexNeeded.empty())
{
- BlocksNeededCount++;
- Work.ScheduleWork(
- NetworkPool,
- [&, BlockIndex](std::atomic<bool>&) {
- if (!AbortFlag)
+ bool WantsToDoPartialBlockDownload = BlockChunkIndexNeeded.size() < BlockDescription.ChunkRawHashes.size();
+ bool CanDoPartialBlockDownload = (BlockDescription.HeaderSize > 0) && (BlockDescription.ChunkCompressedLengths.size() ==
+ BlockDescription.ChunkRawHashes.size());
+ if (AllowPartialBlockRequests && WantsToDoPartialBlockDownload && CanDoPartialBlockDownload)
+ {
+ struct BlockRangeDescriptor
+ {
+ uint64_t RangeStart = 0;
+ uint64_t RangeLength = 0;
+ uint32_t ChunkBlockIndexStart = 0;
+ uint32_t ChunkBlockIndexCount = 0;
+ };
+ std::vector<BlockRangeDescriptor> BlockRanges;
+
+ ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_PartialAnalisys");
+
+ uint32_t NeedBlockChunkIndexOffset = 0;
+ uint32_t ChunkBlockIndex = 0;
+ uint32_t CurrentOffset =
+ gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize);
+
+ BlockRangeDescriptor NextRange;
+ while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() &&
+ ChunkBlockIndex < BlockDescription.ChunkRawHashes.size())
+ {
+ const uint32_t ChunkCompressedLength = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex];
+ if (ChunkBlockIndex < BlockChunkIndexNeeded[NeedBlockChunkIndexOffset])
{
- ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_Read");
-
- FilteredDownloadedBytesPerSecond.Start();
- IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockDescriptions[BlockIndex].BlockHash);
- if (!BlockBuffer)
+ if (NextRange.RangeLength > 0)
{
- throw std::runtime_error(fmt::format("Block {} is missing", BlockDescriptions[BlockIndex].BlockHash));
+ BlockRanges.push_back(NextRange);
+ NextRange = {};
}
- BytesDownloaded += BlockBuffer.GetSize();
- BlockBytes += BlockBuffer.GetSize();
- DownloadedBlocks++;
- CompositeBuffer Payload = WriteToTempFileIfNeeded(CompositeBuffer(std::move(BlockBuffer)),
- Path / ZenTempBlockFolderName,
- BlockDescriptions[BlockIndex].BlockHash);
-
- if (!AbortFlag)
+ ChunkBlockIndex++;
+ CurrentOffset += ChunkCompressedLength;
+ }
+ else if (ChunkBlockIndex == BlockChunkIndexNeeded[NeedBlockChunkIndexOffset])
+ {
+ AllBlockChunksSize += ChunkCompressedLength;
+ if (NextRange.RangeLength == 0)
{
- Work.ScheduleWork(
- WritePool,
- [&, BlockIndex, BlockBuffer = std::move(Payload)](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_Write");
-
- FilteredWrittenBytesPerSecond.Start();
- IoHash BlockRawHash;
- uint64_t BlockRawSize;
- CompressedBuffer CompressedBlockBuffer =
- CompressedBuffer::FromCompressed(std::move(BlockBuffer), BlockRawHash, BlockRawSize);
- if (!CompressedBlockBuffer)
- {
- throw std::runtime_error(fmt::format("Block {} is not a compressed buffer",
- BlockDescriptions[BlockIndex].BlockHash));
- }
+ NextRange.RangeStart = CurrentOffset;
+ NextRange.ChunkBlockIndexStart = ChunkBlockIndex;
+ }
+ NextRange.RangeLength += ChunkCompressedLength;
+ NextRange.ChunkBlockIndexCount++;
+ ChunkBlockIndex++;
+ CurrentOffset += ChunkCompressedLength;
+ NeedBlockChunkIndexOffset++;
+ }
+ else
+ {
+ ZEN_ASSERT(false);
+ }
+ }
+ AllBlocksSize += CurrentOffset;
+ if (NextRange.RangeLength > 0)
+ {
+ BlockRanges.push_back(NextRange);
+ NextRange = {};
+ }
- if (BlockRawHash != BlockDescriptions[BlockIndex].BlockHash)
- {
- throw std::runtime_error(fmt::format("Block {} header has a mismatching raw hash {}",
- BlockDescriptions[BlockIndex].BlockHash,
- BlockRawHash));
- }
+ ZEN_ASSERT(!BlockRanges.empty());
+ std::vector<BlockRangeDescriptor> CollapsedBlockRanges;
+ auto It = BlockRanges.begin();
+ CollapsedBlockRanges.push_back(*It++);
+ uint64_t TotalSlack = 0;
+ while (It != BlockRanges.end())
+ {
+ BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back();
+ uint64_t Slack = It->RangeStart - (LastRange.RangeStart + LastRange.RangeLength);
+ uint64_t BothRangeSize = It->RangeLength + LastRange.RangeLength;
+ if (Slack <= Max(BothRangeSize / 8, 64u * 1024u)) // Made up heuristic - we'll see how it pans out
+ {
+ LastRange.ChunkBlockIndexCount =
+ (It->ChunkBlockIndexStart + It->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart;
+ LastRange.RangeLength = (It->RangeStart + It->RangeLength) - LastRange.RangeStart;
+ TotalSlack += Slack;
+ }
+ else
+ {
+ CollapsedBlockRanges.push_back(*It);
+ }
+ ++It;
+ }
+
+ uint64_t TotalFetch = 0;
+ for (const BlockRangeDescriptor& Range : CollapsedBlockRanges)
+ {
+ TotalFetch += Range.RangeLength;
+ }
+
+ AllBlocksFetch += TotalFetch;
+ AllBlocksSlack += TotalSlack;
+ BlocksNeededCount++;
+ AllBlockRequests += CollapsedBlockRanges.size();
+
+ for (size_t BlockRangeIndex = 0; BlockRangeIndex < CollapsedBlockRanges.size(); BlockRangeIndex++)
+ {
+ TotalRequestCount++;
+ TotalPartWriteCount++;
+ const BlockRangeDescriptor BlockRange = CollapsedBlockRanges[BlockRangeIndex];
+ // Partial block schedule
+ Work.ScheduleWork(
+ NetworkPool, // NetworkPool, // GetSyncWorkerPool()
+ [&, BlockIndex, BlockRange](std::atomic<bool>&) {
+ ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_PartialGet");
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+
+ FilteredDownloadedBytesPerSecond.Start();
+ IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId,
+ BlockDescription.BlockHash,
+ BlockRange.RangeStart,
+ BlockRange.RangeLength);
+ if (!BlockBuffer)
+ {
+ throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
+ }
+ BytesDownloaded += BlockBuffer.GetSize();
+ BlockBytes += BlockBuffer.GetSize();
+ DownloadedBlocks++;
+ CompositeBuffer Payload = WriteToTempFileIfNeeded(CompositeBuffer(std::move(BlockBuffer)),
+ Path / ZenTempBlockFolderName,
+ BlockDescription.BlockHash,
+ fmt::format("_{}", BlockRange.RangeStart));
+ RequestsComplete++;
+ if (RequestsComplete == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
- CompositeBuffer DecompressedBlockBuffer = CompressedBlockBuffer.DecompressToComposite();
- if (!DecompressedBlockBuffer)
+ if (!AbortFlag)
+ {
+ Work.ScheduleWork(
+ WritePool, // WritePool, // GetSyncWorkerPool(),
+ [&, BlockIndex, BlockRange, BlockPartialBuffer = std::move(Payload)](std::atomic<bool>&) {
+ if (!AbortFlag)
{
- throw std::runtime_error(fmt::format("Block {} failed to decompress",
- BlockDescriptions[BlockIndex].BlockHash));
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+
+ FilteredWrittenBytesPerSecond.Start();
+
+ if (!WritePartialBlockToDisk(
+ CacheFolderPath,
+ RemoteContent,
+ BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ BlockPartialBuffer,
+ BlockRange.ChunkBlockIndexStart,
+ BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1,
+ RemoteLookup,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ ChunkCountWritten,
+ WriteToDiskBytes))
+ {
+ throw std::runtime_error(
+ fmt::format("Partial block {} is malformed", BlockDescription.BlockHash));
+ }
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
}
+ },
+ [&, BlockIndex](const std::exception& Ex, std::atomic<bool>&) {
+ ZEN_ERROR("Failed writing block {}. Reason: {}",
+ BlockDescriptions[BlockIndex].BlockHash,
+ Ex.what());
+ AbortFlag = true;
+ });
+ }
+ },
+ Work.DefaultErrorFunction());
+ }
+ }
+ else
+ {
+ BlocksNeededCount++;
+ TotalRequestCount++;
+ TotalPartWriteCount++;
+
+ Work.ScheduleWork(
+ NetworkPool, // GetSyncWorkerPool(), // NetworkPool,
+ [&, BlockIndex](std::atomic<bool>&) {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_Get");
+
+ // Full block schedule
- ZEN_ASSERT_SLOW(BlockDescriptions[BlockIndex].BlockHash ==
- IoHash::HashBuffer(DecompressedBlockBuffer));
-
- if (!WriteBlockToDisk(CacheFolderPath,
- RemoteContent,
- SequenceIndexChunksLeftToWriteCounters,
- DecompressedBlockBuffer,
- RemoteLookup,
- RemoteChunkIndexNeedsCopyFromSourceFlags.data(),
- ChunkCountWritten,
- WriteToDiskBytes))
+ FilteredDownloadedBytesPerSecond.Start();
+ IoBuffer BlockBuffer = Storage.GetBuildBlob(BuildId, BlockDescription.BlockHash);
+ if (!BlockBuffer)
+ {
+ throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
+ }
+ BytesDownloaded += BlockBuffer.GetSize();
+ BlockBytes += BlockBuffer.GetSize();
+ DownloadedBlocks++;
+ RequestsComplete++;
+ if (RequestsComplete == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+
+ CompositeBuffer Payload = WriteToTempFileIfNeeded(CompositeBuffer(std::move(BlockBuffer)),
+ Path / ZenTempBlockFolderName,
+ BlockDescription.BlockHash);
+ if (!AbortFlag)
+ {
+ Work.ScheduleWork(
+ WritePool,
+ [&, BlockIndex, BlockBuffer = std::move(Payload)](std::atomic<bool>&) {
+ if (!AbortFlag)
{
- throw std::runtime_error(
- fmt::format("Block {} is malformed", BlockDescriptions[BlockIndex].BlockHash));
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+
+ FilteredWrittenBytesPerSecond.Start();
+ if (!WriteBlockToDisk(CacheFolderPath,
+ RemoteContent,
+ BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ BlockBuffer,
+ RemoteLookup,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ ChunkCountWritten,
+ WriteToDiskBytes))
+ {
+ throw std::runtime_error(
+ fmt::format("Block {} is malformed", BlockDescription.BlockHash));
+ }
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
}
- BlocksComplete++;
- }
- },
- [&, BlockIndex](const std::exception& Ex, std::atomic<bool>&) {
- ZEN_ERROR("Failed writing block {}. Reason: {}",
- BlockDescriptions[BlockIndex].BlockHash,
- Ex.what());
- AbortFlag = true;
- });
+ },
+ Work.DefaultErrorFunction());
+ }
}
- }
- },
- Work.DefaultErrorFunction());
+ },
+ Work.DefaultErrorFunction());
+ }
}
else
{
ZEN_DEBUG("Skipping block {} due to cache reuse", BlockDescriptions[BlockIndex].BlockHash);
}
}
-
+ ZEN_DEBUG("Fetching {} with {} slack (ideal {}) out of {} using {} requests for {} blocks",
+ NiceBytes(AllBlocksFetch),
+ NiceBytes(AllBlocksSlack),
+ NiceBytes(AllBlockChunksSize),
+ NiceBytes(AllBlocksSize),
+ AllBlockRequests,
+ BlocksNeededCount);
ZEN_TRACE_CPU("HandleChunks_Wait");
Work.Wait(UsePlainProgress ? 5000 : 200, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
@@ -4412,13 +4816,13 @@ namespace {
ZEN_ASSERT(ChunkCountToWrite >= ChunkCountWritten.load());
FilteredWrittenBytesPerSecond.Update(WriteToDiskBytes.load());
FilteredDownloadedBytesPerSecond.Update(BytesDownloaded.load());
- std::string Details = fmt::format("{}/{} chunks. {}/{} blocks. {} {}bits/s downloaded. {} {}B/s written",
- ChunkCountWritten.load(),
- ChunkCountToWrite,
- BlocksComplete.load(),
- BlocksNeededCount,
+ std::string Details = fmt::format("{}/{} ({} {}bits/s) downloaded. {}/{} ({} {}B/s) written.",
+ RequestsComplete.load(),
+ TotalRequestCount,
NiceBytes(BytesDownloaded.load()),
NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8),
+ ChunkCountWritten.load(),
+ ChunkCountToWrite,
NiceBytes(WriteToDiskBytes.load()),
NiceNum(FilteredWrittenBytesPerSecond.GetCurrent()));
WriteProgressBar.UpdateState({.Task = "Writing chunks ",
@@ -4437,6 +4841,13 @@ namespace {
}
WriteProgressBar.Finish();
+
+ ZEN_CONSOLE("Downloaded {} ({}bits/s). Wrote {} ({}B/s). Completed in {}",
+ NiceBytes(BytesDownloaded.load()),
+ NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), BytesDownloaded * 8)),
+ NiceBytes(WriteToDiskBytes.load()),
+ NiceNum(GetBytesPerSecond(FilteredWrittenBytesPerSecond.GetElapsedTimeUS(), WriteToDiskBytes.load())),
+ NiceTimeSpanMs(WriteTimer.GetElapsedTimeMs()));
}
for (const auto& SequenceIndexChunksLeftToWriteCounter : SequenceIndexChunksLeftToWriteCounters)
@@ -4455,6 +4866,7 @@ namespace {
});
// Move all files we will reuse to cache folder
+ // TODO: If WipeTargetFolder is false we could check which files are already correct and leave them in place
for (uint32_t LocalPathIndex = 0; LocalPathIndex < LocalContent.Paths.size(); LocalPathIndex++)
{
const IoHash& RawHash = LocalContent.RawHashes[LocalPathIndex];
@@ -5186,6 +5598,7 @@ namespace {
std::span<const std::string> BuildPartNames,
const std::filesystem::path& Path,
bool AllowMultiparts,
+ bool AllowPartialBlockRequests,
bool WipeTargetFolder,
bool PostDownloadVerify)
{
@@ -5202,8 +5615,8 @@ namespace {
CreateDirectories(Path / ZenTempBlockFolderName);
CreateDirectories(Path / ZenTempChunkFolderName); // TODO: Don't clear this - pick up files -> chunks to use
CreateDirectories(Path /
- ZenTempCacheFolderName); // TODO: Don't clear this - pick up files and use as sequences (non .tmp extension) and
- // delete .tmp (maybe?) - chunk them? How do we know the file is worth chunking?
+ ZenTempCacheFolderName); // TODO: Don't clear this - pick up files and use as sequences (non .tmp extension)
+ // and delete .tmp (maybe?) - chunk them? How do we know the file is worth chunking?
std::uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u;
@@ -5311,6 +5724,7 @@ namespace {
RemoteContent,
BlockDescriptions,
LooseChunkHashes,
+ AllowPartialBlockRequests,
WipeTargetFolder,
LocalFolderState);
@@ -5705,6 +6119,12 @@ BuildsCommand::BuildsCommand()
"Allow large attachments to be transfered using multipart protocol. Defaults to true.",
cxxopts::value(m_AllowMultiparts),
"<allowmultipart>");
+ m_DownloadOptions.add_option("",
+ "",
+ "allow-partial-block-requests",
+ "Allow request for partial chunk blocks. Defaults to true.",
+ cxxopts::value(m_AllowPartialBlockRequests),
+ "<allowpartialblockrequests>");
m_DownloadOptions
.add_option("", "", "verify", "Enable post download verify of all tracked files", cxxopts::value(m_PostDownloadVerify), "<verify>");
m_DownloadOptions.parse_positional({"local-path", "build-id", "build-part-name"});
@@ -5728,6 +6148,18 @@ BuildsCommand::BuildsCommand()
AddOutputOptions(m_TestOptions);
m_TestOptions.add_options()("h,help", "Print help");
m_TestOptions.add_option("", "l", "local-path", "Root file system folder used as base", cxxopts::value(m_Path), "<local-path>");
+ m_TestOptions.add_option("",
+ "",
+ "allow-multipart",
+ "Allow large attachments to be transfered using multipart protocol. Defaults to true.",
+ cxxopts::value(m_AllowMultiparts),
+ "<allowmultipart>");
+ m_TestOptions.add_option("",
+ "",
+ "allow-partial-block-requests",
+ "Allow request for partial chunk blocks. Defaults to true.",
+ cxxopts::value(m_AllowPartialBlockRequests),
+ "<allowpartialblockrequests>");
m_TestOptions.parse_positional({"local-path"});
m_TestOptions.positional_help("local-path");
@@ -6037,7 +6469,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
m_Bucket,
GeneratedBuildId ? "Generated " : "",
BuildId);
- CreateDirectories(m_Path / ZenTempStorageFolderName);
Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName);
StorageName = "Cloud DDC";
}
@@ -6175,7 +6606,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
m_Namespace,
m_Bucket,
BuildId);
- CreateDirectories(m_Path / ZenTempStorageFolderName);
Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName);
StorageName = "Cloud DDC";
}
@@ -6190,7 +6620,15 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help()));
}
- DownloadFolder(*Storage, BuildId, BuildPartIds, m_BuildPartNames, m_Path, m_AllowMultiparts, m_Clean, m_PostDownloadVerify);
+ DownloadFolder(*Storage,
+ BuildId,
+ BuildPartIds,
+ m_BuildPartNames,
+ m_Path,
+ m_AllowMultiparts,
+ m_AllowPartialBlockRequests,
+ m_Clean,
+ m_PostDownloadVerify);
if (false)
{
@@ -6275,7 +6713,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
m_Namespace,
m_Bucket,
BuildId);
- CreateDirectories(m_Path / ZenTempStorageFolderName);
Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName);
StorageName = "Cloud DDC";
}
@@ -6335,7 +6772,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
const std::filesystem::path DownloadPath = m_Path.parent_path() / (m_BuildPartName + "_download");
ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}'", BuildId, BuildPartId, m_BuildPartName, DownloadPath);
- DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, true, true);
+ DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, true, true);
if (AbortFlag)
{
ZEN_CONSOLE("Download failed.");
@@ -6347,7 +6784,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
BuildPartId,
m_BuildPartName,
DownloadPath);
- DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false, true);
+ DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed. (identical target)");
@@ -6449,7 +6886,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
BuildPartId,
m_BuildPartName,
DownloadPath);
- DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false, true);
+ DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed. (scrambled target)");
@@ -6487,7 +6924,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (original)", BuildId, BuildPartId, m_BuildPartName, DownloadPath);
- DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, false, true);
+ DownloadFolder(*Storage, BuildId, {BuildPartId}, {}, DownloadPath, m_AllowMultiparts, m_AllowPartialBlockRequests, false, true);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed.");
@@ -6495,7 +6932,15 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath);
- DownloadFolder(*Storage, BuildId2, {BuildPartId2}, {}, DownloadPath, m_AllowMultiparts, false, true);
+ DownloadFolder(*Storage,
+ BuildId2,
+ {BuildPartId2},
+ {},
+ DownloadPath,
+ m_AllowMultiparts,
+ m_AllowPartialBlockRequests,
+ false,
+ true);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed.");
@@ -6503,7 +6948,15 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
ZEN_CONSOLE("\nRe-download Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath);
- DownloadFolder(*Storage, BuildId2, {BuildPartId2}, {}, DownloadPath, m_AllowMultiparts, false, true);
+ DownloadFolder(*Storage,
+ BuildId2,
+ {BuildPartId2},
+ {},
+ DownloadPath,
+ m_AllowMultiparts,
+ m_AllowPartialBlockRequests,
+ false,
+ true);
if (AbortFlag)
{
ZEN_CONSOLE("Re-download failed.");
@@ -6549,7 +7002,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
m_Namespace,
m_Bucket,
BuildId);
- CreateDirectories(m_Path / ZenTempStorageFolderName);
Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName);
StorageName = "Cloud DDC";
}
@@ -6617,7 +7069,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
m_Namespace,
m_Bucket,
BuildId);
- CreateDirectories(m_Path / ZenTempStorageFolderName);
Storage = CreateJupiterBuildStorage(Log(), Http, StorageStats, m_Namespace, m_Bucket, m_Path / ZenTempStorageFolderName);
StorageName = "Cloud DDC";
}
diff --git a/src/zen/cmds/builds_cmd.h b/src/zen/cmds/builds_cmd.h
index c54fb4db1..838a17807 100644
--- a/src/zen/cmds/builds_cmd.h
+++ b/src/zen/cmds/builds_cmd.h
@@ -49,6 +49,7 @@ private:
bool m_Clean = false;
uint8_t m_BlockReuseMinPercentLimit = 85;
bool m_AllowMultiparts = true;
+ bool m_AllowPartialBlockRequests = true;
std::filesystem::path m_ManifestPath;
// Direct access token (may expire)
diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp
index e4c6d243d..30711a432 100644
--- a/src/zenhttp/httpclient.cpp
+++ b/src/zenhttp/httpclient.cpp
@@ -1148,6 +1148,30 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold
return true;
};
+ uint64_t RequestedContentLength = (uint64_t)-1; // Sentinel: no parsable "bytes=<start>-<end>" range was supplied by the caller
+ if (auto RangeIt = AdditionalHeader.Entries.find("Range"); RangeIt != AdditionalHeader.Entries.end())
+ {
+ if (RangeIt->second.starts_with("bytes"))
+ {
+ size_t RangeStartPos = RangeIt->second.find('=', 5);
+ if (RangeStartPos != std::string::npos)
+ {
+ RangeStartPos++;
+ size_t RangeSplitPos = RangeIt->second.find('-', RangeStartPos);
+ if (RangeSplitPos != std::string::npos)
+ {
+ std::optional<size_t> RequestedRangeStart =
+ ParseInt<size_t>(RangeIt->second.substr(RangeStartPos, RangeSplitPos - RangeStartPos));
+ std::optional<size_t> RequestedRangeEnd = ParseInt<size_t>(RangeIt->second.substr(RangeSplitPos + 1)); // End value follows the '-'
+ if (RequestedRangeStart.has_value() && RequestedRangeEnd.has_value() && RequestedRangeEnd.value() >= RequestedRangeStart.value())
+ {
+ RequestedContentLength = RequestedRangeEnd.value() - RequestedRangeStart.value() + 1; // Range end is inclusive
+ }
+ }
+ }
+ }
+ }
+
cpr::Response Response;
{
std::vector<std::pair<std::string, std::string>> ReceivedHeaders;
@@ -1155,10 +1179,10 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold
std::pair<std::string, std::string> Header = GetHeader(header);
if (Header.first == "Content-Length"sv)
{
- std::optional<size_t> ContentSize = ParseInt<size_t>(Header.second);
- if (ContentSize.has_value())
+ std::optional<size_t> ContentLength = ParseInt<size_t>(Header.second);
+ if (ContentLength.has_value())
{
- if (ContentSize.value() > 1024 * 1024)
+ if (ContentLength.value() > 1024 * 1024)
{
PayloadFile = std::make_unique<detail::TempPayloadFile>();
std::error_code Ec = PayloadFile->Open(TempFolderPath);
@@ -1172,7 +1196,7 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold
}
else
{
- PayloadString.reserve(ContentSize.value());
+ PayloadString.reserve(ContentLength.value());
}
}
}
@@ -1218,85 +1242,90 @@ HttpClient::Download(std::string_view Url, const std::filesystem::path& TempFold
auto It = Response.header.find("Content-Length");
if (It != Response.header.end())
{
- std::optional<int64_t> ContentLength = ParseInt<int64_t>(It->second);
- if (ContentLength)
- {
- std::vector<std::pair<std::string, std::string>> ReceivedHeaders;
+ std::vector<std::pair<std::string, std::string>> ReceivedHeaders;
- auto HeaderCallback = [&](std::string header, intptr_t) {
- std::pair<std::string, std::string> Header = GetHeader(header);
- if (!Header.first.empty())
- {
- ReceivedHeaders.emplace_back(std::move(Header));
- }
+ auto HeaderCallback = [&](std::string header, intptr_t) {
+ std::pair<std::string, std::string> Header = GetHeader(header);
+ if (!Header.first.empty())
+ {
+ ReceivedHeaders.emplace_back(std::move(Header));
+ }
- if (Header.first == "Content-Range"sv)
+ if (Header.first == "Content-Range"sv)
+ {
+ if (Header.second.starts_with("bytes "sv))
{
- if (Header.second.starts_with("bytes "sv))
+ size_t RangeStartEnd = Header.second.find('-', 6);
+ if (RangeStartEnd != std::string::npos)
{
- size_t RangeStartEnd = Header.second.find('-', 6);
- if (RangeStartEnd != std::string::npos)
+ const auto Start = ParseInt<uint64_t>(Header.second.substr(6, RangeStartEnd - 6));
+ if (Start)
{
- const auto Start = ParseInt<uint64_t>(Header.second.substr(6, RangeStartEnd - 6));
- if (Start)
+ uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length();
+ if (Start.value() == DownloadedSize)
{
- uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length();
- if (Start.value() == DownloadedSize)
- {
- return 1;
- }
- else if (Start.value() > DownloadedSize)
- {
- return 0;
- }
- if (PayloadFile)
- {
- PayloadFile->ResetWritePos(Start.value());
- }
- else
- {
- PayloadString = PayloadString.substr(0, Start.value());
- }
return 1;
}
+ else if (Start.value() > DownloadedSize)
+ {
+ return 0;
+ }
+ if (PayloadFile)
+ {
+ PayloadFile->ResetWritePos(Start.value());
+ }
+ else
+ {
+ PayloadString = PayloadString.substr(0, Start.value());
+ }
+ return 1;
}
}
- return 0;
}
- return 1;
- };
+ return 0;
+ }
+ return 1;
+ };
- KeyValueMap HeadersWithRange(AdditionalHeader);
- do
- {
- uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length();
+ KeyValueMap HeadersWithRange(AdditionalHeader);
+ do
+ {
+ uint64_t DownloadedSize = PayloadFile ? PayloadFile->GetSize() : PayloadString.length();
- std::string Range = fmt::format("bytes={}-{}", DownloadedSize, DownloadedSize + ContentLength.value() - 1);
- if (auto RangeIt = HeadersWithRange.Entries.find("Range"); RangeIt != HeadersWithRange.Entries.end())
+ uint64_t ContentLength = RequestedContentLength;
+ if (ContentLength == uint64_t(-1))
+ {
+ if (auto ParsedContentLength = ParseInt<int64_t>(It->second); ParsedContentLength.has_value())
{
- if (RangeIt->second == Range)
- {
- // If we didn't make any progress, abort
- break;
- }
+ ContentLength = ParsedContentLength.value();
}
- HeadersWithRange.Entries.insert_or_assign("Range", Range);
-
- Impl::Session Sess = m_Impl->AllocSession(m_BaseUri,
- Url,
- m_ConnectionSettings,
- HeadersWithRange,
- {},
- m_SessionId,
- GetAccessToken());
- Response = Sess.Download(cpr::WriteCallback{DownloadCallback}, cpr::HeaderCallback{HeaderCallback});
- for (const std::pair<std::string, std::string>& H : ReceivedHeaders)
+ }
+
+ std::string Range = fmt::format("bytes={}-{}", DownloadedSize, DownloadedSize + ContentLength - 1);
+ if (auto RangeIt = HeadersWithRange.Entries.find("Range"); RangeIt != HeadersWithRange.Entries.end())
+ {
+ if (RangeIt->second == Range)
{
- Response.header.insert_or_assign(H.first, H.second);
+ // If we didn't make any progress, abort
+ break;
}
- ReceivedHeaders.clear();
- } while (ShouldResume(Response));
- }
+ }
+ HeadersWithRange.Entries.insert_or_assign("Range", Range);
+
+ Impl::Session Sess = m_Impl->AllocSession(m_BaseUri,
+ Url,
+ m_ConnectionSettings,
+ HeadersWithRange,
+ {},
+ m_SessionId,
+ GetAccessToken());
+ Response = Sess.Download(cpr::WriteCallback{DownloadCallback}, cpr::HeaderCallback{HeaderCallback});
+ for (const std::pair<std::string, std::string>& H : ReceivedHeaders)
+ {
+ Response.header.insert_or_assign(H.first, H.second);
+ }
+ ReceivedHeaders.clear();
+ } while (ShouldResume(Response));
}
}
}
diff --git a/src/zenutil/filebuildstorage.cpp b/src/zenutil/filebuildstorage.cpp
index a4bb759e7..e57109006 100644
--- a/src/zenutil/filebuildstorage.cpp
+++ b/src/zenutil/filebuildstorage.cpp
@@ -325,7 +325,7 @@ public:
return {};
}
- virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash) override
+ virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override
{
ZEN_UNUSED(BuildId);
SimulateLatency(0, 0);
@@ -337,10 +337,19 @@ public:
if (std::filesystem::is_regular_file(BlockPath))
{
BasicFile File(BlockPath, BasicFile::Mode::kRead);
- IoBuffer Payload = File.ReadAll();
- ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, CompositeBuffer(SharedBuffer(Payload))));
- m_Stats.TotalBytesRead += Payload.GetSize();
+ IoBuffer Payload;
+ if (RangeOffset != 0 || RangeBytes != (uint64_t)-1)
+ {
+ Payload = IoBuffer(RangeBytes);
+ File.Read(Payload.GetMutableView().GetData(), RangeBytes, RangeOffset);
+ }
+ else
+ {
+ Payload = File.ReadAll();
+ ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, CompositeBuffer(SharedBuffer(Payload))));
+ }
Payload.SetContentType(ZenContentType::kCompressedBinary);
+ m_Stats.TotalBytesRead += Payload.GetSize();
SimulateLatency(0, Payload.GetSize());
return Payload;
}
diff --git a/src/zenutil/include/zenutil/buildstorage.h b/src/zenutil/include/zenutil/buildstorage.h
index 9c236310f..9d2bab170 100644
--- a/src/zenutil/include/zenutil/buildstorage.h
+++ b/src/zenutil/include/zenutil/buildstorage.h
@@ -40,7 +40,10 @@ public:
std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter,
std::function<void(uint64_t, bool)>&& OnSentBytes) = 0;
- virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash) = 0;
+ virtual IoBuffer GetBuildBlob(const Oid& BuildId,
+ const IoHash& RawHash,
+ uint64_t RangeOffset = 0,
+ uint64_t RangeBytes = (uint64_t)-1) = 0;
virtual std::vector<std::function<void()>> GetLargeBuildBlob(
const Oid& BuildId,
const IoHash& RawHash,
diff --git a/src/zenutil/include/zenutil/jupiter/jupitersession.h b/src/zenutil/include/zenutil/jupiter/jupitersession.h
index 852271868..2c5fc73b8 100644
--- a/src/zenutil/include/zenutil/jupiter/jupitersession.h
+++ b/src/zenutil/include/zenutil/jupiter/jupitersession.h
@@ -123,7 +123,9 @@ public:
std::string_view BucketId,
const Oid& BuildId,
const IoHash& Hash,
- std::filesystem::path TempFolderPath);
+ std::filesystem::path TempFolderPath,
+ uint64_t Offset = 0,
+ uint64_t Size = (uint64_t)-1);
JupiterResult PutMultipartBuildBlob(std::string_view Namespace,
std::string_view BucketId,
diff --git a/src/zenutil/jupiter/jupiterbuildstorage.cpp b/src/zenutil/jupiter/jupiterbuildstorage.cpp
index 309885b05..bf89ce785 100644
--- a/src/zenutil/jupiter/jupiterbuildstorage.cpp
+++ b/src/zenutil/jupiter/jupiterbuildstorage.cpp
@@ -217,13 +217,15 @@ public:
return WorkList;
}
- virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash) override
+ virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override
{
ZEN_TRACE_CPU("Jupiter::GetBuildBlob");
- Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- JupiterResult GetBuildBlobResult = m_Session.GetBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, m_TempFolderPath);
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ CreateDirectories(m_TempFolderPath);
+ JupiterResult GetBuildBlobResult =
+ m_Session.GetBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, m_TempFolderPath, RangeOffset, RangeBytes);
AddStatistic(GetBuildBlobResult);
if (!GetBuildBlobResult.Success)
{
diff --git a/src/zenutil/jupiter/jupitersession.cpp b/src/zenutil/jupiter/jupitersession.cpp
index 06ac6ae36..68f214c06 100644
--- a/src/zenutil/jupiter/jupitersession.cpp
+++ b/src/zenutil/jupiter/jupitersession.cpp
@@ -698,11 +698,19 @@ JupiterSession::GetBuildBlob(std::string_view Namespace,
std::string_view BucketId,
const Oid& BuildId,
const IoHash& Hash,
- std::filesystem::path TempFolderPath)
+ std::filesystem::path TempFolderPath,
+ uint64_t Offset,
+ uint64_t Size)
{
+ HttpClient::KeyValueMap Headers; // Left empty (no Range header) when the defaults Offset == 0 and Size == (uint64_t)-1 are used
+ if (Offset != 0 || Size != (uint64_t)-1)
+ {
+ Headers.Entries.insert({"Range", (Size == (uint64_t)-1) ? fmt::format("bytes={}-", Offset) : fmt::format("bytes={}-{}", Offset, Offset + Size - 1)}); // Open-ended range when no size is given; avoids Offset + Size - 1 wrapping around
+ }
HttpClient::Response Response =
m_HttpClient.Download(fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}", Namespace, BucketId, BuildId, Hash.ToHexString()),
- TempFolderPath);
+ TempFolderPath,
+ Headers);
return detail::ConvertResponse(Response, "JupiterSession::GetBuildBlob"sv);
}