aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-05-20 18:43:41 +0200
committerGitHub Enterprise <[email protected]>2025-05-20 18:43:41 +0200
commit47cba21817608be9232e9d0a55bf01a313d4be2c (patch)
treed6fdc355af5c4b6f4a494b70dbb43ab4db9cf4d7 /src
parentMerge pull request #402 from ue-foundation/zs/fix-oplog-import-oplog-creation (diff)
downloadzen-47cba21817608be9232e9d0a55bf01a313d4be2c.tar.xz
zen-47cba21817608be9232e9d0a55bf01a313d4be2c.zip
use explicit capture for lambdas (#404)
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/builds_cmd.cpp2103
-rw-r--r--src/zenutil/jupiter/jupitersession.cpp47
2 files changed, 1254 insertions, 896 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index 1a570d6da..d998ed3e8 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -1962,41 +1962,131 @@ namespace {
for (const IoHash& ChunkAttachment : ChunkAttachments)
{
- Work.ScheduleWork(NetworkPool, [&, ChunkAttachment](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("ValidateBuildPart_GetChunk");
+ Work.ScheduleWork(NetworkPool,
+ [&Storage,
+ &NetworkPool,
+ &VerifyPool,
+ &Work,
+ &DownloadStats,
+ &ValidateStats,
+ AttachmentsToVerifyCount,
+ &TempFolder,
+ BuildId = Oid(BuildId),
+ PreferredMultipartChunkSize,
+ &FilteredDownloadedBytesPerSecond,
+ &FilteredVerifiedBytesPerSecond,
+ ChunkAttachment](std::atomic<bool>&) {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("ValidateBuildPart_GetChunk");
+
+ FilteredDownloadedBytesPerSecond.Start();
+ DownloadLargeBlob(
+ Storage,
+ TempFolder,
+ BuildId,
+ ChunkAttachment,
+ PreferredMultipartChunkSize,
+ Work,
+ NetworkPool,
+ DownloadStats,
+ [&Work,
+ &VerifyPool,
+ &DownloadStats,
+ &ValidateStats,
+ AttachmentsToVerifyCount,
+ &FilteredDownloadedBytesPerSecond,
+ &FilteredVerifiedBytesPerSecond,
+ ChunkHash = ChunkAttachment](IoBuffer&& Payload) {
+ Payload.SetContentType(ZenContentType::kCompressedBinary);
+ if (!AbortFlag)
+ {
+ Work.ScheduleWork(
+ VerifyPool,
+ [&DownloadStats,
+ &ValidateStats,
+ AttachmentsToVerifyCount,
+ &FilteredDownloadedBytesPerSecond,
+ &FilteredVerifiedBytesPerSecond,
+ Payload = std::move(Payload),
+ ChunkHash](std::atomic<bool>&) mutable {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("ValidateBuildPart_Validate");
+
+ if (DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount ==
+ AttachmentsToVerifyCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+
+ FilteredVerifiedBytesPerSecond.Start();
+
+ uint64_t CompressedSize;
+ uint64_t DecompressedSize;
+ ValidateBlob(std::move(Payload), ChunkHash, CompressedSize, DecompressedSize);
+ ValidateStats.VerifiedAttachmentCount++;
+ ValidateStats.VerifiedByteCount += DecompressedSize;
+ if (ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount)
+ {
+ FilteredVerifiedBytesPerSecond.Stop();
+ }
+ }
+ });
+ }
+ });
+ }
+ });
+ }
- FilteredDownloadedBytesPerSecond.Start();
- DownloadLargeBlob(
- Storage,
- TempFolder,
- BuildId,
- ChunkAttachment,
- PreferredMultipartChunkSize,
- Work,
- NetworkPool,
- DownloadStats,
- [&, ChunkHash = ChunkAttachment](IoBuffer&& Payload) {
- Payload.SetContentType(ZenContentType::kCompressedBinary);
- if (!AbortFlag)
- {
- Work.ScheduleWork(VerifyPool, [&, Payload = std::move(Payload), ChunkHash](std::atomic<bool>&) mutable {
+ for (const IoHash& BlockAttachment : BlockAttachments)
+ {
+ Work.ScheduleWork(
+ NetworkPool,
+ [&Storage,
+ &BuildId,
+ &Work,
+ &VerifyPool,
+ &DownloadStats,
+ &ValidateStats,
+ AttachmentsToVerifyCount,
+ &FilteredDownloadedBytesPerSecond,
+ &FilteredVerifiedBytesPerSecond,
+ BlockAttachment](std::atomic<bool>&) {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("ValidateBuildPart_GetBlock");
+
+ FilteredDownloadedBytesPerSecond.Start();
+ IoBuffer Payload = Storage.GetBuildBlob(BuildId, BlockAttachment);
+ DownloadStats.DownloadedBlockCount++;
+ DownloadStats.DownloadedBlockByteCount += Payload.GetSize();
+ if (DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount == AttachmentsToVerifyCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ if (!Payload)
+ {
+ throw std::runtime_error(fmt::format("Block attachment {} could not be found", BlockAttachment));
+ }
+ if (!AbortFlag)
+ {
+ Work.ScheduleWork(
+ VerifyPool,
+ [&FilteredVerifiedBytesPerSecond,
+ AttachmentsToVerifyCount,
+ &ValidateStats,
+ Payload = std::move(Payload),
+ BlockAttachment](std::atomic<bool>&) mutable {
if (!AbortFlag)
{
- ZEN_TRACE_CPU("ValidateBuildPart_Validate");
-
- if (DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount ==
- AttachmentsToVerifyCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
+ ZEN_TRACE_CPU("ValidateBuildPart_ValidateBlock");
FilteredVerifiedBytesPerSecond.Start();
uint64_t CompressedSize;
uint64_t DecompressedSize;
- ValidateBlob(std::move(Payload), ChunkHash, CompressedSize, DecompressedSize);
+ ValidateChunkBlock(std::move(Payload), BlockAttachment, CompressedSize, DecompressedSize);
ValidateStats.VerifiedAttachmentCount++;
ValidateStats.VerifiedByteCount += DecompressedSize;
if (ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount)
@@ -2005,54 +2095,9 @@ namespace {
}
}
});
- }
- });
- }
- });
- }
-
- for (const IoHash& BlockAttachment : BlockAttachments)
- {
- Work.ScheduleWork(NetworkPool, [&, BlockAttachment](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("ValidateBuildPart_GetBlock");
-
- FilteredDownloadedBytesPerSecond.Start();
- IoBuffer Payload = Storage.GetBuildBlob(BuildId, BlockAttachment);
- DownloadStats.DownloadedBlockCount++;
- DownloadStats.DownloadedBlockByteCount += Payload.GetSize();
- if (DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount == AttachmentsToVerifyCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
- if (!Payload)
- {
- throw std::runtime_error(fmt::format("Block attachment {} could not be found", BlockAttachment));
- }
- if (!AbortFlag)
- {
- Work.ScheduleWork(VerifyPool, [&, Payload = std::move(Payload), BlockAttachment](std::atomic<bool>&) mutable {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("ValidateBuildPart_ValidateBlock");
-
- FilteredVerifiedBytesPerSecond.Start();
-
- uint64_t CompressedSize;
- uint64_t DecompressedSize;
- ValidateChunkBlock(std::move(Payload), BlockAttachment, CompressedSize, DecompressedSize);
- ValidateStats.VerifiedAttachmentCount++;
- ValidateStats.VerifiedByteCount += DecompressedSize;
- if (ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount)
- {
- FilteredVerifiedBytesPerSecond.Stop();
- }
- }
- });
+ }
}
- }
- });
+ });
}
Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) {
@@ -2321,134 +2366,163 @@ namespace {
break;
}
const std::vector<uint32_t>& ChunksInBlock = NewBlockChunks[BlockIndex];
- Work.ScheduleWork(GenerateBlobsPool, [&, BlockIndex](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("GenerateBuildBlocks_Generate");
-
- FilteredGeneratedBytesPerSecond.Start();
- // TODO: Convert ScheduleWork body to function
-
- Stopwatch GenerateTimer;
- CompressedBuffer CompressedBlock =
- GenerateBlock(Path, Content, Lookup, ChunksInBlock, OutBlocks.BlockDescriptions[BlockIndex], DiskStats);
- ZEN_CONSOLE_VERBOSE("Generated block {} ({}) containing {} chunks in {}",
- OutBlocks.BlockDescriptions[BlockIndex].BlockHash,
- NiceBytes(CompressedBlock.GetCompressedSize()),
- OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size(),
- NiceTimeSpanMs(GenerateTimer.GetElapsedTimeMs()));
-
- OutBlocks.BlockSizes[BlockIndex] = CompressedBlock.GetCompressedSize();
+ Work.ScheduleWork(
+ GenerateBlobsPool,
+ [&Storage,
+ BuildId,
+ &Path,
+ &Content,
+ &Lookup,
+ &Work,
+ &UploadBlocksPool,
+ NewBlockCount,
+ ChunksInBlock,
+ &Lock,
+ &OutBlocks,
+ &DiskStats,
+ &GenerateBlocksStats,
+ &UploadStats,
+ &FilteredGeneratedBytesPerSecond,
+ &QueuedPendingBlocksForUpload,
+ &FilteredUploadedBytesPerSecond,
+ BlockIndex](std::atomic<bool>&) {
+ if (!AbortFlag)
{
- CbObjectWriter Writer;
- Writer.AddString("createdBy", "zen");
- OutBlocks.BlockMetaDatas[BlockIndex] = Writer.Save();
- }
- GenerateBlocksStats.GeneratedBlockByteCount += OutBlocks.BlockSizes[BlockIndex];
- GenerateBlocksStats.GeneratedBlockCount++;
+ ZEN_TRACE_CPU("GenerateBuildBlocks_Generate");
+
+ FilteredGeneratedBytesPerSecond.Start();
+ // TODO: Convert ScheduleWork body to function
+
+ Stopwatch GenerateTimer;
+ CompressedBuffer CompressedBlock =
+ GenerateBlock(Path, Content, Lookup, ChunksInBlock, OutBlocks.BlockDescriptions[BlockIndex], DiskStats);
+ ZEN_CONSOLE_VERBOSE("Generated block {} ({}) containing {} chunks in {}",
+ OutBlocks.BlockDescriptions[BlockIndex].BlockHash,
+ NiceBytes(CompressedBlock.GetCompressedSize()),
+ OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size(),
+ NiceTimeSpanMs(GenerateTimer.GetElapsedTimeMs()));
- Lock.WithExclusiveLock([&]() {
- OutBlocks.BlockHashToBlockIndex.insert_or_assign(OutBlocks.BlockDescriptions[BlockIndex].BlockHash, BlockIndex);
- });
+ OutBlocks.BlockSizes[BlockIndex] = CompressedBlock.GetCompressedSize();
+ {
+ CbObjectWriter Writer;
+ Writer.AddString("createdBy", "zen");
+ OutBlocks.BlockMetaDatas[BlockIndex] = Writer.Save();
+ }
+ GenerateBlocksStats.GeneratedBlockByteCount += OutBlocks.BlockSizes[BlockIndex];
+ GenerateBlocksStats.GeneratedBlockCount++;
- {
- std::span<const SharedBuffer> Segments = CompressedBlock.GetCompressed().GetSegments();
- ZEN_ASSERT(Segments.size() >= 2);
- OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]);
- }
+ Lock.WithExclusiveLock([&]() {
+ OutBlocks.BlockHashToBlockIndex.insert_or_assign(OutBlocks.BlockDescriptions[BlockIndex].BlockHash,
+ BlockIndex);
+ });
- if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount)
- {
- FilteredGeneratedBytesPerSecond.Stop();
- }
+ {
+ std::span<const SharedBuffer> Segments = CompressedBlock.GetCompressed().GetSegments();
+ ZEN_ASSERT(Segments.size() >= 2);
+ OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]);
+ }
- if (QueuedPendingBlocksForUpload.load() > 16)
- {
- std::span<const SharedBuffer> Segments = CompressedBlock.GetCompressed().GetSegments();
- ZEN_ASSERT(Segments.size() >= 2);
- OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]);
- }
- else
- {
- if (!AbortFlag)
+ if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount)
{
- QueuedPendingBlocksForUpload++;
+ FilteredGeneratedBytesPerSecond.Stop();
+ }
- Work.ScheduleWork(
- UploadBlocksPool,
- [&, BlockIndex, Payload = std::move(CompressedBlock)](std::atomic<bool>&) mutable {
- auto _ = MakeGuard([&QueuedPendingBlocksForUpload] { QueuedPendingBlocksForUpload--; });
- if (!AbortFlag)
- {
- if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount)
- {
- ZEN_TRACE_CPU("GenerateBuildBlocks_Save");
+ if (QueuedPendingBlocksForUpload.load() > 16)
+ {
+ std::span<const SharedBuffer> Segments = CompressedBlock.GetCompressed().GetSegments();
+ ZEN_ASSERT(Segments.size() >= 2);
+ OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]);
+ }
+ else
+ {
+ if (!AbortFlag)
+ {
+ QueuedPendingBlocksForUpload++;
- FilteredUploadedBytesPerSecond.Stop();
- std::span<const SharedBuffer> Segments = Payload.GetCompressed().GetSegments();
- ZEN_ASSERT(Segments.size() >= 2);
- OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]);
- }
- else
+ Work.ScheduleWork(
+ UploadBlocksPool,
+ [&Storage,
+ BuildId,
+ NewBlockCount,
+ &FilteredUploadedBytesPerSecond,
+ &QueuedPendingBlocksForUpload,
+ &GenerateBlocksStats,
+ &UploadStats,
+ &OutBlocks,
+ BlockIndex,
+ Payload = std::move(CompressedBlock)](std::atomic<bool>&) mutable {
+ auto _ = MakeGuard([&QueuedPendingBlocksForUpload] { QueuedPendingBlocksForUpload--; });
+ if (!AbortFlag)
{
- ZEN_TRACE_CPU("GenerateBuildBlocks_Upload");
+ if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount)
+ {
+ ZEN_TRACE_CPU("GenerateBuildBlocks_Save");
- FilteredUploadedBytesPerSecond.Start();
- // TODO: Convert ScheduleWork body to function
+ FilteredUploadedBytesPerSecond.Stop();
+ std::span<const SharedBuffer> Segments = Payload.GetCompressed().GetSegments();
+ ZEN_ASSERT(Segments.size() >= 2);
+ OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]);
+ }
+ else
+ {
+ ZEN_TRACE_CPU("GenerateBuildBlocks_Upload");
- const CbObject BlockMetaData =
- BuildChunkBlockDescription(OutBlocks.BlockDescriptions[BlockIndex],
- OutBlocks.BlockMetaDatas[BlockIndex]);
+ FilteredUploadedBytesPerSecond.Start();
+ // TODO: Convert ScheduleWork body to function
- const IoHash& BlockHash = OutBlocks.BlockDescriptions[BlockIndex].BlockHash;
- const uint64_t CompressedBlockSize = Payload.GetCompressedSize();
+ const CbObject BlockMetaData =
+ BuildChunkBlockDescription(OutBlocks.BlockDescriptions[BlockIndex],
+ OutBlocks.BlockMetaDatas[BlockIndex]);
- if (Storage.BuildCacheStorage)
- {
- Storage.BuildCacheStorage->PutBuildBlob(BuildId,
- BlockHash,
- ZenContentType::kCompressedBinary,
- Payload.GetCompressed());
- }
+ const IoHash& BlockHash = OutBlocks.BlockDescriptions[BlockIndex].BlockHash;
+ const uint64_t CompressedBlockSize = Payload.GetCompressedSize();
+
+ if (Storage.BuildCacheStorage)
+ {
+ Storage.BuildCacheStorage->PutBuildBlob(BuildId,
+ BlockHash,
+ ZenContentType::kCompressedBinary,
+ Payload.GetCompressed());
+ }
- Storage.BuildStorage->PutBuildBlob(BuildId,
- BlockHash,
- ZenContentType::kCompressedBinary,
- std::move(Payload).GetCompressed());
- UploadStats.BlocksBytes += CompressedBlockSize;
+ Storage.BuildStorage->PutBuildBlob(BuildId,
+ BlockHash,
+ ZenContentType::kCompressedBinary,
+ std::move(Payload).GetCompressed());
+ UploadStats.BlocksBytes += CompressedBlockSize;
- ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks",
- BlockHash,
- NiceBytes(CompressedBlockSize),
- OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size());
+ ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks",
+ BlockHash,
+ NiceBytes(CompressedBlockSize),
+ OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size());
- if (Storage.BuildCacheStorage)
- {
- Storage.BuildCacheStorage->PutBlobMetadatas(BuildId,
- std::vector<IoHash>({BlockHash}),
- std::vector<CbObject>({BlockMetaData}));
- }
+ if (Storage.BuildCacheStorage)
+ {
+ Storage.BuildCacheStorage->PutBlobMetadatas(BuildId,
+ std::vector<IoHash>({BlockHash}),
+ std::vector<CbObject>({BlockMetaData}));
+ }
- Storage.BuildStorage->PutBlockMetadata(BuildId, BlockHash, BlockMetaData);
- ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})",
- BlockHash,
- NiceBytes(BlockMetaData.GetSize()));
+ Storage.BuildStorage->PutBlockMetadata(BuildId, BlockHash, BlockMetaData);
+ ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})",
+ BlockHash,
+ NiceBytes(BlockMetaData.GetSize()));
- OutBlocks.MetaDataHasBeenUploaded[BlockIndex] = true;
+ OutBlocks.MetaDataHasBeenUploaded[BlockIndex] = true;
- UploadStats.BlocksBytes += BlockMetaData.GetSize();
- UploadStats.BlockCount++;
- if (UploadStats.BlockCount == NewBlockCount)
- {
- FilteredUploadedBytesPerSecond.Stop();
+ UploadStats.BlocksBytes += BlockMetaData.GetSize();
+ UploadStats.BlockCount++;
+ if (UploadStats.BlockCount == NewBlockCount)
+ {
+ FilteredUploadedBytesPerSecond.Stop();
+ }
}
}
- }
- });
+ });
+ }
}
}
- }
- });
+ });
}
Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) {
@@ -2558,10 +2632,22 @@ namespace {
const size_t UploadBlockCount = BlockIndexes.size();
const uint32_t UploadChunkCount = gsl::narrow<uint32_t>(LooseChunkOrderIndexes.size());
- auto AsyncUploadBlock = [&](const size_t BlockIndex,
- const IoHash BlockHash,
- CompositeBuffer&& Payload,
- std::atomic<uint64_t>& QueuedPendingInMemoryBlocksForUpload) {
+ auto AsyncUploadBlock = [&Storage,
+ &BuildId,
+ &Work,
+ &ZenFolderPath,
+ &NewBlocks,
+ UploadBlockCount,
+ &UploadedBlockCount,
+ UploadChunkCount,
+ &UploadedChunkCount,
+ &UploadedBlockSize,
+ &UploadStats,
+ &FilteredUploadedBytesPerSecond,
+ &UploadChunkPool](const size_t BlockIndex,
+ const IoHash BlockHash,
+ CompositeBuffer&& Payload,
+ std::atomic<uint64_t>& QueuedPendingInMemoryBlocksForUpload) {
bool IsInMemoryBlock = true;
if (QueuedPendingInMemoryBlocksForUpload.load() > 16)
{
@@ -2576,7 +2662,21 @@ namespace {
Work.ScheduleWork(
UploadChunkPool,
- [&, IsInMemoryBlock, BlockIndex, BlockHash, Payload = std::move(Payload)](std::atomic<bool>&) mutable {
+ [&Storage,
+ &BuildId,
+ &QueuedPendingInMemoryBlocksForUpload,
+ &NewBlocks,
+ UploadBlockCount,
+ &UploadedBlockCount,
+ UploadChunkCount,
+ &UploadedChunkCount,
+ &UploadedBlockSize,
+ &UploadStats,
+ &FilteredUploadedBytesPerSecond,
+ IsInMemoryBlock,
+ BlockIndex,
+ BlockHash,
+ Payload = std::move(Payload)](std::atomic<bool>&) mutable {
auto _ = MakeGuard([IsInMemoryBlock, &QueuedPendingInMemoryBlocksForUpload] {
if (IsInMemoryBlock)
{
@@ -2628,10 +2728,37 @@ namespace {
});
};
- auto AsyncUploadLooseChunk = [&](const IoHash& RawHash, uint64_t RawSize, CompositeBuffer&& Payload) {
+ auto AsyncUploadLooseChunk = [&Storage,
+ BuildId,
+ LargeAttachmentSize,
+ &Work,
+ &UploadChunkPool,
+ &FilteredUploadedBytesPerSecond,
+ &UploadedBlockCount,
+ &UploadedChunkCount,
+ UploadBlockCount,
+ UploadChunkCount,
+ &UploadedCompressedChunkSize,
+ &UploadedRawChunkSize,
+ &UploadStats](const IoHash& RawHash, uint64_t RawSize, CompositeBuffer&& Payload) {
Work.ScheduleWork(
UploadChunkPool,
- [&, RawHash, RawSize, Payload = CompositeBuffer(std::move(Payload))](std::atomic<bool>&) mutable {
+ [&Storage,
+ BuildId,
+ &Work,
+ LargeAttachmentSize,
+ &FilteredUploadedBytesPerSecond,
+ &UploadChunkPool,
+ &UploadedBlockCount,
+ &UploadedChunkCount,
+ UploadBlockCount,
+ UploadChunkCount,
+ &UploadedCompressedChunkSize,
+ &UploadedRawChunkSize,
+ &UploadStats,
+ RawHash,
+ RawSize,
+ Payload = CompositeBuffer(std::move(Payload))](std::atomic<bool>&) mutable {
if (!AbortFlag)
{
ZEN_TRACE_CPU("AsyncUploadLooseChunk");
@@ -2660,7 +2787,16 @@ namespace {
PartPayload.SetContentType(ZenContentType::kBinary);
return PartPayload;
},
- [&, RawSize](uint64_t SentBytes, bool IsComplete) {
+ [RawSize,
+ &UploadStats,
+ &UploadedCompressedChunkSize,
+ &UploadChunkPool,
+ &UploadedBlockCount,
+ UploadBlockCount,
+ &UploadedChunkCount,
+ UploadChunkCount,
+ &FilteredUploadedBytesPerSecond,
+ &UploadedRawChunkSize](uint64_t SentBytes, bool IsComplete) {
UploadStats.ChunksBytes += SentBytes;
UploadedCompressedChunkSize += SentBytes;
if (IsComplete)
@@ -2718,56 +2854,71 @@ namespace {
const IoHash& BlockHash = NewBlocks.BlockDescriptions[BlockIndex].BlockHash;
if (!AbortFlag)
{
- Work.ScheduleWork(ReadChunkPool, [&, BlockIndex](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UploadPartBlobs_GenerateBlock");
+ Work.ScheduleWork(
+ ReadChunkPool,
+ [BlockHash = IoHash(BlockHash),
+ BlockIndex,
+ &FilteredGenerateBlockBytesPerSecond,
+ Path,
+ &Content,
+ &Lookup,
+ &NewBlocks,
+ &NewBlockChunks,
+ &GenerateBlockIndexes,
+ &GeneratedBlockCount,
+ &GeneratedBlockByteCount,
+ &DiskStats,
+ &AsyncUploadBlock,
+ &QueuedPendingInMemoryBlocksForUpload](std::atomic<bool>&) {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("UploadPartBlobs_GenerateBlock");
- FilteredGenerateBlockBytesPerSecond.Start();
+ FilteredGenerateBlockBytesPerSecond.Start();
- Stopwatch GenerateTimer;
- CompositeBuffer Payload;
- if (NewBlocks.BlockHeaders[BlockIndex])
- {
- Payload = RebuildBlock(Path,
- Content,
- Lookup,
- std::move(NewBlocks.BlockHeaders[BlockIndex]),
- NewBlockChunks[BlockIndex],
- DiskStats)
- .GetCompressed();
- }
- else
- {
- ChunkBlockDescription BlockDescription;
- CompressedBuffer CompressedBlock =
- GenerateBlock(Path, Content, Lookup, NewBlockChunks[BlockIndex], BlockDescription, DiskStats);
- if (!CompressedBlock)
+ Stopwatch GenerateTimer;
+ CompositeBuffer Payload;
+ if (NewBlocks.BlockHeaders[BlockIndex])
{
- throw std::runtime_error(fmt::format("Failed generating block {}", BlockHash));
+ Payload = RebuildBlock(Path,
+ Content,
+ Lookup,
+ std::move(NewBlocks.BlockHeaders[BlockIndex]),
+ NewBlockChunks[BlockIndex],
+ DiskStats)
+ .GetCompressed();
+ }
+ else
+ {
+ ChunkBlockDescription BlockDescription;
+ CompressedBuffer CompressedBlock =
+ GenerateBlock(Path, Content, Lookup, NewBlockChunks[BlockIndex], BlockDescription, DiskStats);
+ if (!CompressedBlock)
+ {
+ throw std::runtime_error(fmt::format("Failed generating block {}", BlockHash));
+ }
+ ZEN_ASSERT(BlockDescription.BlockHash == BlockHash);
+ Payload = std::move(CompressedBlock).GetCompressed();
}
- ZEN_ASSERT(BlockDescription.BlockHash == BlockHash);
- Payload = std::move(CompressedBlock).GetCompressed();
- }
- GeneratedBlockByteCount += NewBlocks.BlockSizes[BlockIndex];
- GeneratedBlockCount++;
- if (GeneratedBlockCount == GenerateBlockIndexes.size())
- {
- FilteredGenerateBlockBytesPerSecond.Stop();
- }
- ZEN_CONSOLE_VERBOSE("{} block {} ({}) containing {} chunks in {}",
- NewBlocks.BlockHeaders[BlockIndex] ? "Regenerated" : "Generated",
- NewBlocks.BlockDescriptions[BlockIndex].BlockHash,
- NiceBytes(NewBlocks.BlockSizes[BlockIndex]),
- NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size(),
- NiceTimeSpanMs(GenerateTimer.GetElapsedTimeMs()));
- if (!AbortFlag)
- {
- AsyncUploadBlock(BlockIndex, BlockHash, std::move(Payload), QueuedPendingInMemoryBlocksForUpload);
+ GeneratedBlockByteCount += NewBlocks.BlockSizes[BlockIndex];
+ GeneratedBlockCount++;
+ if (GeneratedBlockCount == GenerateBlockIndexes.size())
+ {
+ FilteredGenerateBlockBytesPerSecond.Stop();
+ }
+ ZEN_CONSOLE_VERBOSE("{} block {} ({}) containing {} chunks in {}",
+ NewBlocks.BlockHeaders[BlockIndex] ? "Regenerated" : "Generated",
+ NewBlocks.BlockDescriptions[BlockIndex].BlockHash,
+ NiceBytes(NewBlocks.BlockSizes[BlockIndex]),
+ NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size(),
+ NiceTimeSpanMs(GenerateTimer.GetElapsedTimeMs()));
+ if (!AbortFlag)
+ {
+ AsyncUploadBlock(BlockIndex, BlockHash, std::move(Payload), QueuedPendingInMemoryBlocksForUpload);
+ }
}
- }
- });
+ });
}
}
@@ -2775,32 +2926,43 @@ namespace {
for (const uint32_t LooseChunkOrderIndex : LooseChunkOrderIndexes)
{
const uint32_t ChunkIndex = LooseChunkIndexes[LooseChunkOrderIndex];
- Work.ScheduleWork(ReadChunkPool, [&, ChunkIndex](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UploadPartBlobs_CompressChunk");
-
- FilteredCompressedBytesPerSecond.Start();
- Stopwatch CompressTimer;
- CompositeBuffer Payload =
- CompressChunk(Path, Content, Lookup, ChunkIndex, ZenTempChunkFolderPath(ZenFolderPath), LooseChunksStats);
- ZEN_CONSOLE_VERBOSE("Compressed chunk {} ({} -> {}) in {}",
- Content.ChunkedContent.ChunkHashes[ChunkIndex],
- NiceBytes(Content.ChunkedContent.ChunkRawSizes[ChunkIndex]),
- NiceBytes(Payload.GetSize()),
- NiceTimeSpanMs(CompressTimer.GetElapsedTimeMs()));
- const uint64_t ChunkRawSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
- UploadStats.ReadFromDiskBytes += ChunkRawSize;
- if (LooseChunksStats.CompressedChunkCount == LooseChunkOrderIndexes.size())
- {
- FilteredCompressedBytesPerSecond.Stop();
- }
+ Work.ScheduleWork(
+ ReadChunkPool,
+ [&Path,
+ &Content,
+ &Lookup,
+ &ZenFolderPath,
+ &LooseChunksStats,
+ &LooseChunkOrderIndexes,
+ &FilteredCompressedBytesPerSecond,
+ &UploadStats,
+ &AsyncUploadLooseChunk,
+ ChunkIndex](std::atomic<bool>&) {
if (!AbortFlag)
{
- AsyncUploadLooseChunk(Content.ChunkedContent.ChunkHashes[ChunkIndex], ChunkRawSize, std::move(Payload));
+ ZEN_TRACE_CPU("UploadPartBlobs_CompressChunk");
+
+ FilteredCompressedBytesPerSecond.Start();
+ Stopwatch CompressTimer;
+ CompositeBuffer Payload =
+ CompressChunk(Path, Content, Lookup, ChunkIndex, ZenTempChunkFolderPath(ZenFolderPath), LooseChunksStats);
+ ZEN_CONSOLE_VERBOSE("Compressed chunk {} ({} -> {}) in {}",
+ Content.ChunkedContent.ChunkHashes[ChunkIndex],
+ NiceBytes(Content.ChunkedContent.ChunkRawSizes[ChunkIndex]),
+ NiceBytes(Payload.GetSize()),
+ NiceTimeSpanMs(CompressTimer.GetElapsedTimeMs()));
+ const uint64_t ChunkRawSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
+ UploadStats.ReadFromDiskBytes += ChunkRawSize;
+ if (LooseChunksStats.CompressedChunkCount == LooseChunkOrderIndexes.size())
+ {
+ FilteredCompressedBytesPerSecond.Stop();
+ }
+ if (!AbortFlag)
+ {
+ AsyncUploadLooseChunk(Content.ChunkedContent.ChunkHashes[ChunkIndex], ChunkRawSize, std::move(Payload));
+ }
}
- }
- });
+ });
}
Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) {
@@ -3649,7 +3811,19 @@ namespace {
PutBuildPartResult.second.size());
IoHash PartHash = PutBuildPartResult.first;
- auto UploadAttachments = [&](std::span<IoHash> RawHashes) {
+ auto UploadAttachments = [&Storage,
+ &BuildId,
+ &Path,
+ &ZenFolderPath,
+ &LocalContent,
+ &LocalLookup,
+ &NewBlockChunks,
+ &NewBlocks,
+ &LooseChunkIndexes,
+ &LargeAttachmentSize,
+ &DiskStats,
+ &UploadStats,
+ &LooseChunksStats](std::span<IoHash> RawHashes) {
if (!AbortFlag)
{
UploadStatistics TempUploadStats;
@@ -4089,7 +4263,8 @@ namespace {
Work.ScheduleWork(
VerifyPool,
- [&, PathIndex](std::atomic<bool>&) {
+ [&Path, &Content, &Lookup, &ErrorLock, &Errors, &VerifyFolderStats, VerifyFileHash, &IsAcceptedFolder, PathIndex](
+ std::atomic<bool>&) {
if (!AbortFlag)
{
ZEN_TRACE_CPU("VerifyFile_work");
@@ -5048,11 +5223,18 @@ namespace {
Work.ScheduleWork(
WritePool,
- [&,
+ [&ZenFolderPath,
+ &RemoteContent,
+ &RemoteLookup,
SequenceIndexChunksLeftToWriteCounters,
+ &Work,
+ &WritePool,
CompressedChunkPath,
RemoteChunkIndex,
TotalPartWriteCount,
+ &DiskStats,
+ &WritePartsComplete,
+ &FilteredWrittenBytesPerSecond,
ChunkTargetPtrs = std::move(ChunkTargetPtrs),
CompressedPart = std::move(Payload)](std::atomic<bool>&) mutable {
ZEN_TRACE_CPU("UpdateFolder_WriteChunk");
@@ -6215,40 +6397,51 @@ namespace {
}
if (!PrimeCacheOnly)
{
- Work.ScheduleWork(WritePool, [&, ScavengeOpIndex](std::atomic<bool>&) mutable {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_WriteScavanged");
+ Work.ScheduleWork(
+ WritePool,
+ [&RemoteContent,
+ &CacheFolderPath,
+ &ScavengedPaths,
+ &ScavengeCopyOperations,
+ &ScavengedContents,
+ &FilteredWrittenBytesPerSecond,
+ ScavengeOpIndex,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ &DiskStats](std::atomic<bool>&) mutable {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("UpdateFolder_WriteScavanged");
- FilteredWrittenBytesPerSecond.Start();
+ FilteredWrittenBytesPerSecond.Start();
- const ScavengeCopyOperation& ScavengeOp = ScavengeCopyOperations[ScavengeOpIndex];
- const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengeOp.ScavengedContentIndex];
- const std::filesystem::path ScavengedPath = ScavengedContent.Paths[ScavengeOp.ScavengedPathIndex];
+ const ScavengeCopyOperation& ScavengeOp = ScavengeCopyOperations[ScavengeOpIndex];
+ const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengeOp.ScavengedContentIndex];
+ const std::filesystem::path ScavengedPath = ScavengedContent.Paths[ScavengeOp.ScavengedPathIndex];
- const std::filesystem::path ScavengedFilePath =
- (ScavengedPaths[ScavengeOp.ScavengedContentIndex] / ScavengedPath).make_preferred();
- ZEN_ASSERT_SLOW(FileSizeFromPath(ScavengedFilePath) == ScavengeOp.RawSize);
+ const std::filesystem::path ScavengedFilePath =
+ (ScavengedPaths[ScavengeOp.ScavengedContentIndex] / ScavengedPath).make_preferred();
+ ZEN_ASSERT_SLOW(FileSizeFromPath(ScavengedFilePath) == ScavengeOp.RawSize);
- const IoHash& RemoteSequenceRawHash =
- RemoteContent.ChunkedContent.SequenceRawHashes[ScavengeOp.RemoteSequenceIndex];
- const std::filesystem::path TempFilePath =
- GetTempChunkedSequenceFileName(CacheFolderPath, RemoteSequenceRawHash);
+ const IoHash& RemoteSequenceRawHash =
+ RemoteContent.ChunkedContent.SequenceRawHashes[ScavengeOp.RemoteSequenceIndex];
+ const std::filesystem::path TempFilePath =
+ GetTempChunkedSequenceFileName(CacheFolderPath, RemoteSequenceRawHash);
- const uint64_t RawSize = ScavengedContent.RawSizes[ScavengeOp.ScavengedContentIndex];
- CopyFile(ScavengedFilePath, TempFilePath, RawSize, DiskStats.WriteCount, DiskStats.WriteByteCount);
+ const uint64_t RawSize = ScavengedContent.RawSizes[ScavengeOp.ScavengedContentIndex];
+ CopyFile(ScavengedFilePath, TempFilePath, RawSize, DiskStats.WriteCount, DiskStats.WriteByteCount);
- const std::filesystem::path CacheFilePath =
- GetFinalChunkedSequenceFileName(CacheFolderPath, RemoteSequenceRawHash);
- RenameFile(TempFilePath, CacheFilePath);
+ const std::filesystem::path CacheFilePath =
+ GetFinalChunkedSequenceFileName(CacheFolderPath, RemoteSequenceRawHash);
+ RenameFile(TempFilePath, CacheFilePath);
- WritePartsComplete++;
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
}
- }
- });
+ });
}
}
@@ -6273,7 +6466,31 @@ namespace {
Work.ScheduleWork(
WritePool,
- [&, RemoteChunkIndex, ChunkTargetPtrs, BuildId, TotalRequestCount, TotalPartWriteCount](std::atomic<bool>&) mutable {
+ [&Storage,
+ &Path,
+ &ZenFolderPath,
+ &RemoteContent,
+ &RemoteLookup,
+ &CacheFolderPath,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &Work,
+ &WritePool,
+ &NetworkPool,
+ PrimeCacheOnly,
+ &ExistsResult,
+ &DiskStats,
+ &DownloadStats,
+ &WriteChunkStats,
+ &WritePartsComplete,
+ RemoteChunkIndex,
+ ChunkTargetPtrs,
+ BuildId = Oid(BuildId),
+ LargeAttachmentSize,
+ PreferredMultipartChunkSize,
+ TotalRequestCount,
+ TotalPartWriteCount,
+ &FilteredDownloadedBytesPerSecond,
+ &FilteredWrittenBytesPerSecond](std::atomic<bool>&) mutable {
if (!AbortFlag)
{
ZEN_TRACE_CPU("UpdateFolder_ReadPreDownloaded");
@@ -6392,7 +6609,7 @@ namespace {
[&Path,
&ZenFolderPath,
&Storage,
- BuildId,
+ BuildId = Oid(BuildId),
&PrimeCacheOnly,
&RemoteContent,
&RemoteLookup,
@@ -6453,50 +6670,67 @@ namespace {
if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize)
{
ZEN_TRACE_CPU("UpdateFolder_GetLargeChunk");
- DownloadLargeBlob(
- *Storage.BuildStorage,
- ZenTempDownloadFolderPath(ZenFolderPath),
- BuildId,
- ChunkHash,
- PreferredMultipartChunkSize,
- Work,
- NetworkPool,
- DownloadStats,
- [&, TotalPartWriteCount, TotalRequestCount, RemoteChunkIndex, ChunkTargetPtrs](
- IoBuffer&& Payload) mutable {
- DownloadStats.RequestsCompleteCount++;
- if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
- if (Payload && Storage.BuildCacheStorage)
- {
- Storage.BuildCacheStorage->PutBuildBlob(
- BuildId,
- ChunkHash,
- ZenContentType::kCompressedBinary,
- CompositeBuffer(SharedBuffer(Payload)));
- }
- if (!PrimeCacheOnly)
- {
- if (!AbortFlag)
- {
- AsyncWriteDownloadedChunk(ZenFolderPath,
- RemoteContent,
- RemoteLookup,
- RemoteChunkIndex,
- std::move(ChunkTargetPtrs),
- Work,
- WritePool,
- std::move(Payload),
- SequenceIndexChunksLeftToWriteCounters,
- WritePartsComplete,
- TotalPartWriteCount,
- FilteredWrittenBytesPerSecond,
- DiskStats);
- }
- }
- });
+ DownloadLargeBlob(*Storage.BuildStorage,
+ ZenTempDownloadFolderPath(ZenFolderPath),
+ BuildId,
+ ChunkHash,
+ PreferredMultipartChunkSize,
+ Work,
+ NetworkPool,
+ DownloadStats,
+ [&Storage,
+ &ZenFolderPath,
+ &RemoteContent,
+ &RemoteLookup,
+ BuildId,
+ PrimeCacheOnly,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &Work,
+ &WritePool,
+ ChunkHash,
+ TotalPartWriteCount,
+ TotalRequestCount,
+ &WritePartsComplete,
+ &FilteredWrittenBytesPerSecond,
+ &FilteredDownloadedBytesPerSecond,
+ &DownloadStats,
+ &DiskStats,
+ RemoteChunkIndex,
+ ChunkTargetPtrs](IoBuffer&& Payload) mutable {
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ if (Payload && Storage.BuildCacheStorage)
+ {
+ Storage.BuildCacheStorage->PutBuildBlob(
+ BuildId,
+ ChunkHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(SharedBuffer(Payload)));
+ }
+ if (!PrimeCacheOnly)
+ {
+ if (!AbortFlag)
+ {
+ AsyncWriteDownloadedChunk(
+ ZenFolderPath,
+ RemoteContent,
+ RemoteLookup,
+ RemoteChunkIndex,
+ std::move(ChunkTargetPtrs),
+ Work,
+ WritePool,
+ std::move(Payload),
+ SequenceIndexChunksLeftToWriteCounters,
+ WritePartsComplete,
+ TotalPartWriteCount,
+ FilteredWrittenBytesPerSecond,
+ DiskStats);
+ }
+ }
+ });
}
else
{
@@ -6559,180 +6793,200 @@ namespace {
break;
}
- Work.ScheduleWork(WritePool, [&, CopyDataIndex](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_CopyLocal");
-
- FilteredWrittenBytesPerSecond.Start();
- const CacheCopyData& CopyData = CacheCopyDatas[CopyDataIndex];
-
- std::filesystem::path SourceFilePath;
-
- if (CopyData.ScavengeSourceIndex == (uint32_t)-1)
- {
- const uint32_t LocalPathIndex = LocalLookup.SequenceIndexFirstPathIndex[CopyData.SourceSequenceIndex];
- SourceFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred();
- }
- else
+ Work.ScheduleWork(
+ WritePool,
+ [&Path,
+ &LocalContent,
+ &RemoteContent,
+ &RemoteLookup,
+ &CacheFolderPath,
+ &LocalLookup,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &Work,
+ &WritePool,
+ &FilteredWrittenBytesPerSecond,
+ &CacheCopyDatas,
+ &ScavengedContents,
+ &ScavengedLookups,
+ &ScavengedPaths,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ &DiskStats,
+ CopyDataIndex](std::atomic<bool>&) {
+ if (!AbortFlag)
{
- const ChunkedFolderContent& ScavengedContent = ScavengedContents[CopyData.ScavengeSourceIndex];
- const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[CopyData.ScavengeSourceIndex];
- const std::filesystem::path ScavengedPath = ScavengedPaths[CopyData.ScavengeSourceIndex];
- const uint32_t ScavengedPathIndex = ScavengedLookup.SequenceIndexFirstPathIndex[CopyData.SourceSequenceIndex];
- SourceFilePath = (ScavengedPath / ScavengedContent.Paths[ScavengedPathIndex]).make_preferred();
- }
- ZEN_ASSERT_SLOW(IsFile(SourceFilePath));
- ZEN_ASSERT(!CopyData.TargetChunkLocationPtrs.empty());
-
- uint64_t CacheLocalFileBytesRead = 0;
+ ZEN_TRACE_CPU("UpdateFolder_CopyLocal");
- size_t TargetStart = 0;
- const std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> AllTargets(
- CopyData.TargetChunkLocationPtrs);
-
- struct WriteOp
- {
- const ChunkedContentLookup::ChunkSequenceLocation* Target = nullptr;
- uint64_t CacheFileOffset = (uint64_t)-1;
- uint32_t ChunkIndex = (uint32_t)-1;
- };
+ FilteredWrittenBytesPerSecond.Start();
+ const CacheCopyData& CopyData = CacheCopyDatas[CopyDataIndex];
- std::vector<WriteOp> WriteOps;
+ std::filesystem::path SourceFilePath;
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("Sort");
- WriteOps.reserve(AllTargets.size());
- for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets)
+ if (CopyData.ScavengeSourceIndex == (uint32_t)-1)
{
- std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> TargetRange =
- AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount);
- for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange)
- {
- WriteOps.push_back(WriteOp{.Target = Target,
- .CacheFileOffset = ChunkTarget.CacheFileOffset,
- .ChunkIndex = ChunkTarget.RemoteChunkIndex});
- }
- TargetStart += ChunkTarget.TargetChunkLocationCount;
+ const uint32_t LocalPathIndex = LocalLookup.SequenceIndexFirstPathIndex[CopyData.SourceSequenceIndex];
+ SourceFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred();
}
+ else
+ {
+ const ChunkedFolderContent& ScavengedContent = ScavengedContents[CopyData.ScavengeSourceIndex];
+ const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[CopyData.ScavengeSourceIndex];
+ const std::filesystem::path ScavengedPath = ScavengedPaths[CopyData.ScavengeSourceIndex];
+ const uint32_t ScavengedPathIndex =
+ ScavengedLookup.SequenceIndexFirstPathIndex[CopyData.SourceSequenceIndex];
+ SourceFilePath = (ScavengedPath / ScavengedContent.Paths[ScavengedPathIndex]).make_preferred();
+ }
+ ZEN_ASSERT_SLOW(IsFile(SourceFilePath));
+ ZEN_ASSERT(!CopyData.TargetChunkLocationPtrs.empty());
- std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) {
- if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
- {
- return true;
- }
- else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
- {
- return false;
- }
- if (Lhs.Target->Offset < Rhs.Target->Offset)
- {
- return true;
- }
- return false;
- });
- }
+ uint64_t CacheLocalFileBytesRead = 0;
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("Write");
+ size_t TargetStart = 0;
+ const std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> AllTargets(
+ CopyData.TargetChunkLocationPtrs);
- tsl::robin_set<uint32_t> ChunkIndexesWritten;
+ struct WriteOp
+ {
+ const ChunkedContentLookup::ChunkSequenceLocation* Target = nullptr;
+ uint64_t CacheFileOffset = (uint64_t)-1;
+ uint32_t ChunkIndex = (uint32_t)-1;
+ };
- BufferedOpenFile SourceFile(SourceFilePath, DiskStats);
- WriteFileCache OpenFileCache(DiskStats);
- for (size_t WriteOpIndex = 0; WriteOpIndex < WriteOps.size();)
+ std::vector<WriteOp> WriteOps;
+
+ if (!AbortFlag)
{
- if (AbortFlag)
+ ZEN_TRACE_CPU("Sort");
+ WriteOps.reserve(AllTargets.size());
+ for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets)
{
- break;
+ std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> TargetRange =
+ AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount);
+ for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange)
+ {
+ WriteOps.push_back(WriteOp{.Target = Target,
+ .CacheFileOffset = ChunkTarget.CacheFileOffset,
+ .ChunkIndex = ChunkTarget.RemoteChunkIndex});
+ }
+ TargetStart += ChunkTarget.TargetChunkLocationCount;
}
- const WriteOp& Op = WriteOps[WriteOpIndex];
-
- const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
- ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() <=
- RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]);
- ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() > 0);
- const uint32_t RemotePathIndex = RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex];
- const uint64_t ChunkSize = RemoteContent.ChunkedContent.ChunkRawSizes[Op.ChunkIndex];
-
- uint64_t ReadLength = ChunkSize;
- size_t WriteCount = 1;
- uint64_t OpSourceEnd = Op.CacheFileOffset + ChunkSize;
- uint64_t OpTargetEnd = Op.Target->Offset + ChunkSize;
- while ((WriteOpIndex + WriteCount) < WriteOps.size())
- {
- const WriteOp& NextOp = WriteOps[WriteOpIndex + WriteCount];
- if (NextOp.Target->SequenceIndex != Op.Target->SequenceIndex)
+
+ std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) {
+ if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
{
- break;
+ return true;
}
- if (NextOp.Target->Offset != OpTargetEnd)
+ else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
{
- break;
+ return false;
}
- if (NextOp.CacheFileOffset != OpSourceEnd)
+ if (Lhs.Target->Offset < Rhs.Target->Offset)
{
- break;
+ return true;
}
- const uint64_t NextChunkLength = RemoteContent.ChunkedContent.ChunkRawSizes[NextOp.ChunkIndex];
- if (ReadLength + NextChunkLength > 512u * 1024u)
+ return false;
+ });
+ }
+
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("Write");
+
+ tsl::robin_set<uint32_t> ChunkIndexesWritten;
+
+ BufferedOpenFile SourceFile(SourceFilePath, DiskStats);
+ WriteFileCache OpenFileCache(DiskStats);
+ for (size_t WriteOpIndex = 0; WriteOpIndex < WriteOps.size();)
+ {
+ if (AbortFlag)
{
break;
}
- ReadLength += NextChunkLength;
- OpSourceEnd += NextChunkLength;
- OpTargetEnd += NextChunkLength;
- WriteCount++;
- }
+ const WriteOp& Op = WriteOps[WriteOpIndex];
+
+ const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
+ ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() <=
+ RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]);
+ ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() > 0);
+ const uint32_t RemotePathIndex = RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex];
+ const uint64_t ChunkSize = RemoteContent.ChunkedContent.ChunkRawSizes[Op.ChunkIndex];
+
+ uint64_t ReadLength = ChunkSize;
+ size_t WriteCount = 1;
+ uint64_t OpSourceEnd = Op.CacheFileOffset + ChunkSize;
+ uint64_t OpTargetEnd = Op.Target->Offset + ChunkSize;
+ while ((WriteOpIndex + WriteCount) < WriteOps.size())
+ {
+ const WriteOp& NextOp = WriteOps[WriteOpIndex + WriteCount];
+ if (NextOp.Target->SequenceIndex != Op.Target->SequenceIndex)
+ {
+ break;
+ }
+ if (NextOp.Target->Offset != OpTargetEnd)
+ {
+ break;
+ }
+ if (NextOp.CacheFileOffset != OpSourceEnd)
+ {
+ break;
+ }
+ const uint64_t NextChunkLength = RemoteContent.ChunkedContent.ChunkRawSizes[NextOp.ChunkIndex];
+ if (ReadLength + NextChunkLength > 512u * 1024u)
+ {
+ break;
+ }
+ ReadLength += NextChunkLength;
+ OpSourceEnd += NextChunkLength;
+ OpTargetEnd += NextChunkLength;
+ WriteCount++;
+ }
- CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ReadLength);
- ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]);
+ CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ReadLength);
+ ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]);
- OpenFileCache.WriteToFile<CompositeBuffer>(
- RemoteSequenceIndex,
- [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) {
- return GetTempChunkedSequenceFileName(
- CacheFolderPath,
- RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
- },
- ChunkSource,
- Op.Target->Offset,
- RemoteContent.RawSizes[RemotePathIndex]);
+ OpenFileCache.WriteToFile<CompositeBuffer>(
+ RemoteSequenceIndex,
+ [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) {
+ return GetTempChunkedSequenceFileName(
+ CacheFolderPath,
+ RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
+ },
+ ChunkSource,
+ Op.Target->Offset,
+ RemoteContent.RawSizes[RemotePathIndex]);
- CacheLocalFileBytesRead += ReadLength; // TODO: This should be the sum of unique chunk sizes?
+ CacheLocalFileBytesRead += ReadLength; // TODO: This should be the sum of unique chunk sizes?
- WriteOpIndex += WriteCount;
+ WriteOpIndex += WriteCount;
+ }
}
- }
- if (!AbortFlag)
- {
- // Write tracking, updating this must be done without any files open (WriteFileCache)
- std::vector<uint32_t> CompletedChunkSequences;
- for (const WriteOp& Op : WriteOps)
+ if (!AbortFlag)
{
- const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
- if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters))
+ // Write tracking, updating this must be done without any files open (WriteFileCache)
+ std::vector<uint32_t> CompletedChunkSequences;
+ for (const WriteOp& Op : WriteOps)
{
- CompletedChunkSequences.push_back(RemoteSequenceIndex);
+ const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
+ if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters))
+ {
+ CompletedChunkSequences.push_back(RemoteSequenceIndex);
+ }
}
+ VerifyAndCompleteChunkSequencesAsync(CacheFolderPath,
+ RemoteContent,
+ RemoteLookup,
+ CompletedChunkSequences,
+ Work,
+ WritePool);
+ ZEN_CONSOLE_VERBOSE("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), SourceFilePath);
+ }
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
}
- VerifyAndCompleteChunkSequencesAsync(CacheFolderPath,
- RemoteContent,
- RemoteLookup,
- CompletedChunkSequences,
- Work,
- WritePool);
- ZEN_CONSOLE_VERBOSE("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), SourceFilePath);
- }
- WritePartsComplete++;
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
}
- }
- });
+ });
}
for (uint32_t BlockIndex : CachedChunkBlockIndexes)
@@ -6743,52 +6997,67 @@ namespace {
break;
}
- Work.ScheduleWork(WritePool, [&, BlockIndex](std::atomic<bool>&) mutable {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_WriteCachedBlock");
-
- const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
- FilteredWrittenBytesPerSecond.Start();
-
- std::filesystem::path BlockChunkPath =
- ZenTempBlockFolderPath(ZenFolderPath) / BlockDescription.BlockHash.ToHexString();
- IoBuffer BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
- if (!BlockBuffer)
- {
- throw std::runtime_error(
- fmt::format("Can not read block {} at {}", BlockDescription.BlockHash, BlockChunkPath));
- }
-
+ Work.ScheduleWork(
+ WritePool,
+ [&ZenFolderPath,
+ &CacheFolderPath,
+ &RemoteContent,
+ &RemoteLookup,
+ &RemoteChunkIndexNeedsCopyFromSourceFlags,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &Work,
+ &WritePool,
+ &BlockDescriptions,
+ &FilteredWrittenBytesPerSecond,
+ &DiskStats,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ BlockIndex](std::atomic<bool>&) mutable {
if (!AbortFlag)
{
- if (!WriteBlockToDisk(CacheFolderPath,
- RemoteContent,
- BlockDescription,
- SequenceIndexChunksLeftToWriteCounters,
- Work,
- WritePool,
- CompositeBuffer(std::move(BlockBuffer)),
- RemoteLookup,
- RemoteChunkIndexNeedsCopyFromSourceFlags,
- DiskStats))
+ ZEN_TRACE_CPU("UpdateFolder_WriteCachedBlock");
+
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+ FilteredWrittenBytesPerSecond.Start();
+
+ std::filesystem::path BlockChunkPath =
+ ZenTempBlockFolderPath(ZenFolderPath) / BlockDescription.BlockHash.ToHexString();
+ IoBuffer BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
+ if (!BlockBuffer)
{
- std::error_code DummyEc;
- RemoveFile(BlockChunkPath, DummyEc);
- throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash));
+ throw std::runtime_error(
+ fmt::format("Can not read block {} at {}", BlockDescription.BlockHash, BlockChunkPath));
}
- TryRemoveFile(BlockChunkPath);
+ if (!AbortFlag)
+ {
+ if (!WriteBlockToDisk(CacheFolderPath,
+ RemoteContent,
+ BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ WritePool,
+ CompositeBuffer(std::move(BlockBuffer)),
+ RemoteLookup,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ DiskStats))
+ {
+ std::error_code DummyEc;
+ RemoveFile(BlockChunkPath, DummyEc);
+ throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash));
+ }
- WritePartsComplete++;
+ TryRemoveFile(BlockChunkPath);
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
+ WritePartsComplete++;
+
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
}
}
- }
- });
+ });
}
for (size_t BlockRangeIndex = 0; BlockRangeIndex < BlockRangeWorks.size(); BlockRangeIndex++)
@@ -6801,214 +7070,66 @@ namespace {
const BlockRangeDescriptor BlockRange = BlockRangeWorks[BlockRangeIndex];
ZEN_ASSERT(BlockRange.BlockIndex != (uint32_t)-1);
const uint32_t BlockIndex = BlockRange.BlockIndex;
- Work.ScheduleWork(NetworkPool, [&, BlockIndex, BlockRange](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_GetPartialBlock");
-
- const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
-
- FilteredDownloadedBytesPerSecond.Start();
- IoBuffer BlockBuffer;
- if (Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash))
- {
- BlockBuffer = Storage.BuildCacheStorage->GetBuildBlob(BuildId,
- BlockDescription.BlockHash,
- BlockRange.RangeStart,
- BlockRange.RangeLength);
- }
- if (!BlockBuffer)
- {
- BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId,
- BlockDescription.BlockHash,
- BlockRange.RangeStart,
- BlockRange.RangeLength);
- }
- if (!BlockBuffer)
- {
- throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
- }
+ Work.ScheduleWork(
+ NetworkPool,
+ [&Storage,
+ &ZenFolderPath,
+ BuildId,
+ &RemoteLookup,
+ &BlockDescriptions,
+ &RemoteChunkIndexNeedsCopyFromSourceFlags,
+ &CacheFolderPath,
+ &RemoteContent,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &ExistsResult,
+ &FilteredDownloadedBytesPerSecond,
+ TotalRequestCount,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ &FilteredWrittenBytesPerSecond,
+ &DiskStats,
+ &DownloadStats,
+ &Work,
+ &WritePool,
+ BlockIndex,
+ BlockRange](std::atomic<bool>&) {
if (!AbortFlag)
{
- uint64_t BlockSize = BlockBuffer.GetSize();
- DownloadStats.DownloadedBlockCount++;
- DownloadStats.DownloadedBlockByteCount += BlockSize;
- DownloadStats.RequestsCompleteCount++;
- if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
+ ZEN_TRACE_CPU("UpdateFolder_GetPartialBlock");
- std::filesystem::path BlockChunkPath;
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
- // Check if the dowloaded block is file based and we can move it directly without rewriting it
+ FilteredDownloadedBytesPerSecond.Start();
+ IoBuffer BlockBuffer;
+ if (Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash))
{
- IoBufferFileReference FileRef;
- if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) &&
- (FileRef.FileChunkSize == BlockSize))
- {
- ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock");
-
- std::error_code Ec;
- std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
- if (!Ec)
- {
- BlockBuffer.SetDeleteOnClose(false);
- BlockBuffer = {};
- BlockChunkPath = ZenTempBlockFolderPath(ZenFolderPath) / fmt::format("{}_{:x}_{:x}",
- BlockDescription.BlockHash,
- BlockRange.RangeStart,
- BlockRange.RangeLength);
- RenameFile(TempBlobPath, BlockChunkPath, Ec);
- if (Ec)
- {
- BlockChunkPath = std::filesystem::path{};
-
- // Re-open the temp file again
- BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
- BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
- BlockBuffer.SetDeleteOnClose(true);
- }
- }
- }
+ BlockBuffer = Storage.BuildCacheStorage->GetBuildBlob(BuildId,
+ BlockDescription.BlockHash,
+ BlockRange.RangeStart,
+ BlockRange.RangeLength);
}
-
- if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u))
+ if (!BlockBuffer)
{
- ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock");
- // Could not be moved and rather large, lets store it on disk
- BlockChunkPath =
- ZenTempBlockFolderPath(ZenFolderPath) /
- fmt::format("{}_{:x}_{:x}", BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength);
- TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
- BlockBuffer = {};
+ BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId,
+ BlockDescription.BlockHash,
+ BlockRange.RangeStart,
+ BlockRange.RangeLength);
}
-
- if (!AbortFlag)
- {
- Work.ScheduleWork(
- WritePool,
- [&, BlockIndex, BlockRange, BlockChunkPath, BlockPartialBuffer = std::move(BlockBuffer)](
- std::atomic<bool>&) mutable {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_WritePartialBlock");
-
- const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
-
- if (BlockChunkPath.empty())
- {
- ZEN_ASSERT(BlockPartialBuffer);
- }
- else
- {
- ZEN_ASSERT(!BlockPartialBuffer);
- BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
- if (!BlockPartialBuffer)
- {
- throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}",
- BlockDescription.BlockHash,
- BlockChunkPath));
- }
- }
-
- FilteredWrittenBytesPerSecond.Start();
-
- if (!WritePartialBlockToDisk(
- CacheFolderPath,
- RemoteContent,
- BlockDescription,
- SequenceIndexChunksLeftToWriteCounters,
- Work,
- WritePool,
- CompositeBuffer(std::move(BlockPartialBuffer)),
- BlockRange.ChunkBlockIndexStart,
- BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1,
- RemoteLookup,
- RemoteChunkIndexNeedsCopyFromSourceFlags,
- DiskStats))
- {
- std::error_code DummyEc;
- RemoveFile(BlockChunkPath, DummyEc);
- throw std::runtime_error(
- fmt::format("Partial block {} is malformed", BlockDescription.BlockHash));
- }
-
- if (!BlockChunkPath.empty())
- {
- TryRemoveFile(BlockChunkPath);
- }
-
- WritePartsComplete++;
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
- }
- }
- });
- }
- }
- }
- });
- }
-
- for (uint32_t BlockIndex : FullBlockWorks)
- {
- if (AbortFlag)
- {
- break;
- }
-
- if (PrimeCacheOnly && ExistsResult.ExistingBlobs.contains(BlockDescriptions[BlockIndex].BlockHash))
- {
- DownloadStats.RequestsCompleteCount++;
- continue;
- }
-
- Work.ScheduleWork(NetworkPool, [&, BlockIndex](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_GetFullBlock");
-
- const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
-
- FilteredDownloadedBytesPerSecond.Start();
-
- IoBuffer BlockBuffer;
- const bool ExistsInCache =
- Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash);
- if (ExistsInCache)
- {
- BlockBuffer = Storage.BuildCacheStorage->GetBuildBlob(BuildId, BlockDescription.BlockHash);
- }
- if (!BlockBuffer)
- {
- BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId, BlockDescription.BlockHash);
- if (BlockBuffer && Storage.BuildCacheStorage)
+ if (!BlockBuffer)
{
- Storage.BuildCacheStorage->PutBuildBlob(BuildId,
- BlockDescription.BlockHash,
- BlockBuffer.GetContentType(),
- CompositeBuffer(SharedBuffer(BlockBuffer)));
+ throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
}
- }
- if (!BlockBuffer)
- {
- throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
- }
- if (!AbortFlag)
- {
- uint64_t BlockSize = BlockBuffer.GetSize();
- DownloadStats.DownloadedBlockCount++;
- DownloadStats.DownloadedBlockByteCount += BlockSize;
- DownloadStats.RequestsCompleteCount++;
- if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ if (!AbortFlag)
{
- FilteredDownloadedBytesPerSecond.Stop();
- }
+ uint64_t BlockSize = BlockBuffer.GetSize();
+ DownloadStats.DownloadedBlockCount++;
+ DownloadStats.DownloadedBlockByteCount += BlockSize;
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
- if (!PrimeCacheOnly)
- {
std::filesystem::path BlockChunkPath;
// Check if the dowloaded block is file based and we can move it directly without rewriting it
@@ -7018,14 +7139,17 @@ namespace {
(FileRef.FileChunkSize == BlockSize))
{
ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock");
+
std::error_code Ec;
std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
if (!Ec)
{
BlockBuffer.SetDeleteOnClose(false);
- BlockBuffer = {};
- BlockChunkPath =
- ZenTempBlockFolderPath(ZenFolderPath) / BlockDescription.BlockHash.ToHexString();
+ BlockBuffer = {};
+ BlockChunkPath = ZenTempBlockFolderPath(ZenFolderPath) / fmt::format("{}_{:x}_{:x}",
+ BlockDescription.BlockHash,
+ BlockRange.RangeStart,
+ BlockRange.RangeLength);
RenameFile(TempBlobPath, BlockChunkPath, Ec);
if (Ec)
{
@@ -7044,7 +7168,10 @@ namespace {
{
ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock");
// Could not be moved and rather large, lets store it on disk
- BlockChunkPath = ZenTempBlockFolderPath(ZenFolderPath) / BlockDescription.BlockHash.ToHexString();
+ BlockChunkPath = ZenTempBlockFolderPath(ZenFolderPath) / fmt::format("{}_{:x}_{:x}",
+ BlockDescription.BlockHash,
+ BlockRange.RangeStart,
+ BlockRange.RangeLength);
TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
BlockBuffer = {};
}
@@ -7053,60 +7180,64 @@ namespace {
{
Work.ScheduleWork(
WritePool,
- [&Work,
- &WritePool,
+ [&CacheFolderPath,
&RemoteContent,
&RemoteLookup,
- CacheFolderPath,
+ &BlockDescriptions,
&RemoteChunkIndexNeedsCopyFromSourceFlags,
&SequenceIndexChunksLeftToWriteCounters,
- BlockIndex,
- &BlockDescriptions,
- &WriteChunkStats,
- &DiskStats,
&WritePartsComplete,
+ &Work,
TotalPartWriteCount,
+ &WritePool,
+ &DiskStats,
&FilteredWrittenBytesPerSecond,
+ BlockIndex,
+ BlockRange,
BlockChunkPath,
- BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable {
+ BlockPartialBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable {
if (!AbortFlag)
{
- ZEN_TRACE_CPU("UpdateFolder_WriteFullBlock");
+ ZEN_TRACE_CPU("UpdateFolder_WritePartialBlock");
const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
if (BlockChunkPath.empty())
{
- ZEN_ASSERT(BlockBuffer);
+ ZEN_ASSERT(BlockPartialBuffer);
}
else
{
- ZEN_ASSERT(!BlockBuffer);
- BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
- if (!BlockBuffer)
+ ZEN_ASSERT(!BlockPartialBuffer);
+ BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
+ if (!BlockPartialBuffer)
{
- throw std::runtime_error(fmt::format("Could not open dowloaded block {} from {}",
+ throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}",
BlockDescription.BlockHash,
BlockChunkPath));
}
}
FilteredWrittenBytesPerSecond.Start();
- if (!WriteBlockToDisk(CacheFolderPath,
- RemoteContent,
- BlockDescription,
- SequenceIndexChunksLeftToWriteCounters,
- Work,
- WritePool,
- CompositeBuffer(std::move(BlockBuffer)),
- RemoteLookup,
- RemoteChunkIndexNeedsCopyFromSourceFlags,
- DiskStats))
+
+ if (!WritePartialBlockToDisk(
+ CacheFolderPath,
+ RemoteContent,
+ BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ WritePool,
+ CompositeBuffer(std::move(BlockPartialBuffer)),
+ BlockRange.ChunkBlockIndexStart,
+ BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1,
+ RemoteLookup,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ DiskStats))
{
std::error_code DummyEc;
RemoveFile(BlockChunkPath, DummyEc);
throw std::runtime_error(
- fmt::format("Block {} is malformed", BlockDescription.BlockHash));
+ fmt::format("Partial block {} is malformed", BlockDescription.BlockHash));
}
if (!BlockChunkPath.empty())
@@ -7115,7 +7246,6 @@ namespace {
}
WritePartsComplete++;
-
if (WritePartsComplete == TotalPartWriteCount)
{
FilteredWrittenBytesPerSecond.Stop();
@@ -7125,8 +7255,208 @@ namespace {
}
}
}
- }
- });
+ });
+ }
+
+ for (uint32_t BlockIndex : FullBlockWorks)
+ {
+ if (AbortFlag)
+ {
+ break;
+ }
+
+ if (PrimeCacheOnly && ExistsResult.ExistingBlobs.contains(BlockDescriptions[BlockIndex].BlockHash))
+ {
+ DownloadStats.RequestsCompleteCount++;
+ continue;
+ }
+
+ Work.ScheduleWork(
+ NetworkPool,
+ [&Storage,
+ &ZenFolderPath,
+ BuildId,
+ PrimeCacheOnly,
+ &BlockDescriptions,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ &FilteredWrittenBytesPerSecond,
+ &ExistsResult,
+ &Work,
+ &WritePool,
+ &RemoteContent,
+ &RemoteLookup,
+ &CacheFolderPath,
+ &RemoteChunkIndexNeedsCopyFromSourceFlags,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &FilteredDownloadedBytesPerSecond,
+ &WriteChunkStats,
+ &DiskStats,
+ &DownloadStats,
+ TotalRequestCount,
+ BlockIndex](std::atomic<bool>&) {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("UpdateFolder_GetFullBlock");
+
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+
+ FilteredDownloadedBytesPerSecond.Start();
+
+ IoBuffer BlockBuffer;
+ const bool ExistsInCache =
+ Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash);
+ if (ExistsInCache)
+ {
+ BlockBuffer = Storage.BuildCacheStorage->GetBuildBlob(BuildId, BlockDescription.BlockHash);
+ }
+ if (!BlockBuffer)
+ {
+ BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId, BlockDescription.BlockHash);
+ if (BlockBuffer && Storage.BuildCacheStorage)
+ {
+ Storage.BuildCacheStorage->PutBuildBlob(BuildId,
+ BlockDescription.BlockHash,
+ BlockBuffer.GetContentType(),
+ CompositeBuffer(SharedBuffer(BlockBuffer)));
+ }
+ }
+ if (!BlockBuffer)
+ {
+ throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
+ }
+ if (!AbortFlag)
+ {
+ uint64_t BlockSize = BlockBuffer.GetSize();
+ DownloadStats.DownloadedBlockCount++;
+ DownloadStats.DownloadedBlockByteCount += BlockSize;
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+
+ if (!PrimeCacheOnly)
+ {
+ std::filesystem::path BlockChunkPath;
+
+ // Check if the dowloaded block is file based and we can move it directly without rewriting it
+ {
+ IoBufferFileReference FileRef;
+ if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) &&
+ (FileRef.FileChunkSize == BlockSize))
+ {
+ ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock");
+ std::error_code Ec;
+ std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
+ if (!Ec)
+ {
+ BlockBuffer.SetDeleteOnClose(false);
+ BlockBuffer = {};
+ BlockChunkPath =
+ ZenTempBlockFolderPath(ZenFolderPath) / BlockDescription.BlockHash.ToHexString();
+ RenameFile(TempBlobPath, BlockChunkPath, Ec);
+ if (Ec)
+ {
+ BlockChunkPath = std::filesystem::path{};
+
+ // Re-open the temp file again
+ BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
+ BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
+ BlockBuffer.SetDeleteOnClose(true);
+ }
+ }
+ }
+ }
+
+ if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u))
+ {
+ ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock");
+ // Could not be moved and rather large, lets store it on disk
+ BlockChunkPath = ZenTempBlockFolderPath(ZenFolderPath) / BlockDescription.BlockHash.ToHexString();
+ TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
+ BlockBuffer = {};
+ }
+
+ if (!AbortFlag)
+ {
+ Work.ScheduleWork(WritePool,
+ [&Work,
+ &WritePool,
+ &RemoteContent,
+ &RemoteLookup,
+ CacheFolderPath,
+ &RemoteChunkIndexNeedsCopyFromSourceFlags,
+ &SequenceIndexChunksLeftToWriteCounters,
+ BlockIndex,
+ &BlockDescriptions,
+ &WriteChunkStats,
+ &DiskStats,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ &FilteredWrittenBytesPerSecond,
+ BlockChunkPath,
+ BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("UpdateFolder_WriteFullBlock");
+
+ const ChunkBlockDescription& BlockDescription =
+ BlockDescriptions[BlockIndex];
+
+ if (BlockChunkPath.empty())
+ {
+ ZEN_ASSERT(BlockBuffer);
+ }
+ else
+ {
+ ZEN_ASSERT(!BlockBuffer);
+ BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
+ if (!BlockBuffer)
+ {
+ throw std::runtime_error(
+ fmt::format("Could not open dowloaded block {} from {}",
+ BlockDescription.BlockHash,
+ BlockChunkPath));
+ }
+ }
+
+ FilteredWrittenBytesPerSecond.Start();
+ if (!WriteBlockToDisk(CacheFolderPath,
+ RemoteContent,
+ BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ WritePool,
+ CompositeBuffer(std::move(BlockBuffer)),
+ RemoteLookup,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ DiskStats))
+ {
+ std::error_code DummyEc;
+ RemoveFile(BlockChunkPath, DummyEc);
+ throw std::runtime_error(
+ fmt::format("Block {} is malformed", BlockDescription.BlockHash));
+ }
+
+ if (!BlockChunkPath.empty())
+ {
+ TryRemoveFile(BlockChunkPath);
+ }
+
+ WritePartsComplete++;
+
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
+ }
+ });
+ }
+ }
+ }
+ }
+ });
}
{
@@ -7328,20 +7658,22 @@ namespace {
{
break;
}
- Work.ScheduleWork(WritePool, [&, LocalPathIndex](std::atomic<bool>&) {
- ZEN_TRACE_CPU("UpdateFolder_AsyncCopyToCache");
- if (!AbortFlag)
- {
- const IoHash& RawHash = LocalContent.RawHashes[LocalPathIndex];
- const std::filesystem::path& LocalPath = LocalContent.Paths[LocalPathIndex];
- const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash);
- ZEN_ASSERT_SLOW(!IsFileWithRetry(CacheFilePath));
- const std::filesystem::path LocalFilePath = (Path / LocalPath).make_preferred();
- RenameFileWithRetry(LocalFilePath, CacheFilePath);
- CachedCount++;
- CachedByteCount += LocalContent.RawSizes[LocalPathIndex];
- }
- });
+ Work.ScheduleWork(
+ WritePool,
+ [&Path, &LocalContent, &CacheFolderPath, &CachedCount, &CachedByteCount, LocalPathIndex](std::atomic<bool>&) {
+ ZEN_TRACE_CPU("UpdateFolder_AsyncCopyToCache");
+ if (!AbortFlag)
+ {
+ const IoHash& RawHash = LocalContent.RawHashes[LocalPathIndex];
+ const std::filesystem::path& LocalPath = LocalContent.Paths[LocalPathIndex];
+ const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash);
+ ZEN_ASSERT_SLOW(!IsFileWithRetry(CacheFilePath));
+ const std::filesystem::path LocalFilePath = (Path / LocalPath).make_preferred();
+ RenameFileWithRetry(LocalFilePath, CacheFilePath);
+ CachedCount++;
+ CachedByteCount += LocalContent.RawSizes[LocalPathIndex];
+ }
+ });
}
{
@@ -7420,7 +7752,7 @@ namespace {
{
break;
}
- Work.ScheduleWork(WritePool, [&, LocalPathIndex](std::atomic<bool>&) {
+ Work.ScheduleWork(WritePool, [&Path, &LocalContent, &DeletedCount, LocalPathIndex](std::atomic<bool>&) {
if (!AbortFlag)
{
const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred();
@@ -7471,164 +7803,183 @@ namespace {
TargetCount++;
}
- Work.ScheduleWork(WritePool, [&, BaseTargetOffset = TargetOffset, TargetCount](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("FinalizeTree_Work");
+ Work.ScheduleWork(
+ WritePool,
+ [&Path,
+ &LocalContent,
+ &SequenceHashToLocalPathIndex,
+ &RemoteContent,
+ &RemoteLookup,
+ &CacheFolderPath,
+ &Targets,
+ &RemotePathIndexToLocalPathIndex,
+ &RebuildFolderStateStats,
+ &OutLocalFolderState,
+ BaseTargetOffset = TargetOffset,
+ TargetCount,
+ &TargetsComplete](std::atomic<bool>&) {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("FinalizeTree_Work");
- size_t TargetOffset = BaseTargetOffset;
- const IoHash& RawHash = Targets[TargetOffset].RawHash;
+ size_t TargetOffset = BaseTargetOffset;
+ const IoHash& RawHash = Targets[TargetOffset].RawHash;
- if (RawHash == IoHash::Zero)
- {
- ZEN_TRACE_CPU("ZeroSize");
- while (TargetOffset < (BaseTargetOffset + TargetCount))
+ if (RawHash == IoHash::Zero)
{
- const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex;
- ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash);
- const std::filesystem::path& TargetPath = RemoteContent.Paths[RemotePathIndex];
- std::filesystem::path TargetFilePath = (Path / TargetPath).make_preferred();
- if (!RemotePathIndexToLocalPathIndex[RemotePathIndex])
+ ZEN_TRACE_CPU("ZeroSize");
+ while (TargetOffset < (BaseTargetOffset + TargetCount))
{
- if (IsFileWithRetry(TargetFilePath))
+ const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex;
+ ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash);
+ const std::filesystem::path& TargetPath = RemoteContent.Paths[RemotePathIndex];
+ std::filesystem::path TargetFilePath = (Path / TargetPath).make_preferred();
+ if (!RemotePathIndexToLocalPathIndex[RemotePathIndex])
{
- SetFileReadOnlyWithRetry(TargetFilePath, false);
- }
- else
- {
- CreateDirectories(TargetFilePath.parent_path());
+ if (IsFileWithRetry(TargetFilePath))
+ {
+ SetFileReadOnlyWithRetry(TargetFilePath, false);
+ }
+ else
+ {
+ CreateDirectories(TargetFilePath.parent_path());
+ }
+ BasicFile OutputFile;
+ OutputFile.Open(TargetFilePath, BasicFile::Mode::kTruncate);
}
- BasicFile OutputFile;
- OutputFile.Open(TargetFilePath, BasicFile::Mode::kTruncate);
+ OutLocalFolderState.Paths[RemotePathIndex] = TargetPath;
+ OutLocalFolderState.RawSizes[RemotePathIndex] = RemoteContent.RawSizes[RemotePathIndex];
+
+ OutLocalFolderState.Attributes[RemotePathIndex] =
+ RemoteContent.Attributes.empty()
+ ? GetNativeFileAttributes(TargetFilePath)
+ : SetNativeFileAttributes(TargetFilePath,
+ RemoteContent.Platform,
+ RemoteContent.Attributes[RemotePathIndex]);
+ OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath);
+
+ TargetOffset++;
+ TargetsComplete++;
}
- OutLocalFolderState.Paths[RemotePathIndex] = TargetPath;
- OutLocalFolderState.RawSizes[RemotePathIndex] = RemoteContent.RawSizes[RemotePathIndex];
-
- OutLocalFolderState.Attributes[RemotePathIndex] =
- RemoteContent.Attributes.empty() ? GetNativeFileAttributes(TargetFilePath)
- : SetNativeFileAttributes(TargetFilePath,
- RemoteContent.Platform,
- RemoteContent.Attributes[RemotePathIndex]);
- OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath);
-
- TargetOffset++;
- TargetsComplete++;
- }
- }
- else
- {
- ZEN_TRACE_CPU("Files");
- ZEN_ASSERT(RemoteLookup.RawHashToSequenceIndex.contains(RawHash));
- const uint32_t FirstRemotePathIndex = Targets[TargetOffset].RemotePathIndex;
- const std::filesystem::path& FirstTargetPath = RemoteContent.Paths[FirstRemotePathIndex];
- std::filesystem::path FirstTargetFilePath = (Path / FirstTargetPath).make_preferred();
-
- if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(FirstRemotePathIndex);
- InPlaceIt != RemotePathIndexToLocalPathIndex.end())
- {
- ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath));
}
else
{
- if (IsFileWithRetry(FirstTargetFilePath))
- {
- SetFileReadOnlyWithRetry(FirstTargetFilePath, false);
- }
- else
- {
- CreateDirectories(FirstTargetFilePath.parent_path());
- }
+ ZEN_TRACE_CPU("Files");
+ ZEN_ASSERT(RemoteLookup.RawHashToSequenceIndex.contains(RawHash));
+ const uint32_t FirstRemotePathIndex = Targets[TargetOffset].RemotePathIndex;
+ const std::filesystem::path& FirstTargetPath = RemoteContent.Paths[FirstRemotePathIndex];
+ std::filesystem::path FirstTargetFilePath = (Path / FirstTargetPath).make_preferred();
- if (auto InplaceIt = SequenceHashToLocalPathIndex.find(RawHash);
- InplaceIt != SequenceHashToLocalPathIndex.end())
+ if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(FirstRemotePathIndex);
+ InPlaceIt != RemotePathIndexToLocalPathIndex.end())
{
- ZEN_TRACE_CPU("Copy");
- const uint32_t LocalPathIndex = InplaceIt->second;
- const std::filesystem::path& SourcePath = LocalContent.Paths[LocalPathIndex];
- std::filesystem::path SourceFilePath = (Path / SourcePath).make_preferred();
- ZEN_ASSERT_SLOW(IsFileWithRetry(SourceFilePath));
-
- ZEN_DEBUG("Copying from '{}' -> '{}'", SourceFilePath, FirstTargetFilePath);
- const uint64_t RawSize = LocalContent.RawSizes[LocalPathIndex];
- std::atomic<uint64_t> WriteCount;
- std::atomic<uint64_t> WriteByteCount;
- CopyFile(SourceFilePath, FirstTargetFilePath, RawSize, WriteCount, WriteByteCount);
- RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++;
+ ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath));
}
else
{
- ZEN_TRACE_CPU("Rename");
- const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash);
- ZEN_ASSERT_SLOW(IsFileWithRetry(CacheFilePath));
+ if (IsFileWithRetry(FirstTargetFilePath))
+ {
+ SetFileReadOnlyWithRetry(FirstTargetFilePath, false);
+ }
+ else
+ {
+ CreateDirectories(FirstTargetFilePath.parent_path());
+ }
- RenameFileWithRetry(CacheFilePath, FirstTargetFilePath);
+ if (auto InplaceIt = SequenceHashToLocalPathIndex.find(RawHash);
+ InplaceIt != SequenceHashToLocalPathIndex.end())
+ {
+ ZEN_TRACE_CPU("Copy");
+ const uint32_t LocalPathIndex = InplaceIt->second;
+ const std::filesystem::path& SourcePath = LocalContent.Paths[LocalPathIndex];
+ std::filesystem::path SourceFilePath = (Path / SourcePath).make_preferred();
+ ZEN_ASSERT_SLOW(IsFileWithRetry(SourceFilePath));
+
+ ZEN_DEBUG("Copying from '{}' -> '{}'", SourceFilePath, FirstTargetFilePath);
+ const uint64_t RawSize = LocalContent.RawSizes[LocalPathIndex];
+ std::atomic<uint64_t> WriteCount;
+ std::atomic<uint64_t> WriteByteCount;
+ CopyFile(SourceFilePath, FirstTargetFilePath, RawSize, WriteCount, WriteByteCount);
+ RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++;
+ }
+ else
+ {
+ ZEN_TRACE_CPU("Rename");
+ const std::filesystem::path CacheFilePath =
+ GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash);
+ ZEN_ASSERT_SLOW(IsFileWithRetry(CacheFilePath));
- RebuildFolderStateStats.FinalizeTreeFilesMovedCount++;
- }
- }
+ RenameFileWithRetry(CacheFilePath, FirstTargetFilePath);
- OutLocalFolderState.Paths[FirstRemotePathIndex] = FirstTargetPath;
- OutLocalFolderState.RawSizes[FirstRemotePathIndex] = RemoteContent.RawSizes[FirstRemotePathIndex];
+ RebuildFolderStateStats.FinalizeTreeFilesMovedCount++;
+ }
+ }
- OutLocalFolderState.Attributes[FirstRemotePathIndex] =
- RemoteContent.Attributes.empty() ? GetNativeFileAttributes(FirstTargetFilePath)
- : SetNativeFileAttributes(FirstTargetFilePath,
- RemoteContent.Platform,
- RemoteContent.Attributes[FirstRemotePathIndex]);
- OutLocalFolderState.ModificationTicks[FirstRemotePathIndex] = GetModificationTickFromPath(FirstTargetFilePath);
+ OutLocalFolderState.Paths[FirstRemotePathIndex] = FirstTargetPath;
+ OutLocalFolderState.RawSizes[FirstRemotePathIndex] = RemoteContent.RawSizes[FirstRemotePathIndex];
- TargetOffset++;
- TargetsComplete++;
+ OutLocalFolderState.Attributes[FirstRemotePathIndex] =
+ RemoteContent.Attributes.empty()
+ ? GetNativeFileAttributes(FirstTargetFilePath)
+ : SetNativeFileAttributes(FirstTargetFilePath,
+ RemoteContent.Platform,
+ RemoteContent.Attributes[FirstRemotePathIndex]);
+ OutLocalFolderState.ModificationTicks[FirstRemotePathIndex] =
+ GetModificationTickFromPath(FirstTargetFilePath);
- while (TargetOffset < (BaseTargetOffset + TargetCount))
- {
- const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex;
- ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash);
- const std::filesystem::path& TargetPath = RemoteContent.Paths[RemotePathIndex];
- std::filesystem::path TargetFilePath = (Path / TargetPath).make_preferred();
+ TargetOffset++;
+ TargetsComplete++;
- if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(RemotePathIndex);
- InPlaceIt != RemotePathIndexToLocalPathIndex.end())
- {
- ZEN_ASSERT_SLOW(IsFileWithRetry(TargetFilePath));
- }
- else
+ while (TargetOffset < (BaseTargetOffset + TargetCount))
{
- ZEN_TRACE_CPU("Copy");
- if (IsFileWithRetry(TargetFilePath))
+ const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex;
+ ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash);
+ const std::filesystem::path& TargetPath = RemoteContent.Paths[RemotePathIndex];
+ std::filesystem::path TargetFilePath = (Path / TargetPath).make_preferred();
+
+ if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(RemotePathIndex);
+ InPlaceIt != RemotePathIndexToLocalPathIndex.end())
{
- SetFileReadOnlyWithRetry(TargetFilePath, false);
+ ZEN_ASSERT_SLOW(IsFileWithRetry(TargetFilePath));
}
else
{
- CreateDirectories(TargetFilePath.parent_path());
- }
+ ZEN_TRACE_CPU("Copy");
+ if (IsFileWithRetry(TargetFilePath))
+ {
+ SetFileReadOnlyWithRetry(TargetFilePath, false);
+ }
+ else
+ {
+ CreateDirectories(TargetFilePath.parent_path());
+ }
- ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath));
- ZEN_DEBUG("Copying from '{}' -> '{}'", FirstTargetFilePath, TargetFilePath);
- const uint64_t RawSize = RemoteContent.RawSizes[RemotePathIndex];
- std::atomic<uint64_t> WriteCount;
- std::atomic<uint64_t> WriteByteCount;
- CopyFile(FirstTargetFilePath, TargetFilePath, RawSize, WriteCount, WriteByteCount);
- RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++;
- }
+ ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath));
+ ZEN_DEBUG("Copying from '{}' -> '{}'", FirstTargetFilePath, TargetFilePath);
+ const uint64_t RawSize = RemoteContent.RawSizes[RemotePathIndex];
+ std::atomic<uint64_t> WriteCount;
+ std::atomic<uint64_t> WriteByteCount;
+ CopyFile(FirstTargetFilePath, TargetFilePath, RawSize, WriteCount, WriteByteCount);
+ RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++;
+ }
- OutLocalFolderState.Paths[RemotePathIndex] = TargetPath;
- OutLocalFolderState.RawSizes[RemotePathIndex] = RemoteContent.RawSizes[RemotePathIndex];
+ OutLocalFolderState.Paths[RemotePathIndex] = TargetPath;
+ OutLocalFolderState.RawSizes[RemotePathIndex] = RemoteContent.RawSizes[RemotePathIndex];
- OutLocalFolderState.Attributes[RemotePathIndex] =
- RemoteContent.Attributes.empty() ? GetNativeFileAttributes(TargetFilePath)
- : SetNativeFileAttributes(TargetFilePath,
- RemoteContent.Platform,
- RemoteContent.Attributes[RemotePathIndex]);
- OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath);
+ OutLocalFolderState.Attributes[RemotePathIndex] =
+ RemoteContent.Attributes.empty()
+ ? GetNativeFileAttributes(TargetFilePath)
+ : SetNativeFileAttributes(TargetFilePath,
+ RemoteContent.Platform,
+ RemoteContent.Attributes[RemotePathIndex]);
+ OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath);
- TargetOffset++;
- TargetsComplete++;
+ TargetOffset++;
+ TargetsComplete++;
+ }
}
}
- }
- });
+ });
TargetOffset += TargetCount;
}
diff --git a/src/zenutil/jupiter/jupitersession.cpp b/src/zenutil/jupiter/jupitersession.cpp
index de138f994..01a703a1b 100644
--- a/src/zenutil/jupiter/jupitersession.cpp
+++ b/src/zenutil/jupiter/jupitersession.cpp
@@ -697,28 +697,35 @@ JupiterSession::GetMultipartBuildBlob(std::string_view Namespace,
while (Offset < TotalSize)
{
uint64_t PartSize = Min(ChunkSize, TotalSize - Offset);
- OutWorkItems.emplace_back(
- [this, Namespace, BucketId, BuildId, Hash, TotalSize, Workload, Offset, PartSize]() -> JupiterResult {
- std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}",
- Namespace,
- BucketId,
- BuildId,
- Hash.ToHexString(),
- m_AllowRedirect ? "true"sv : "false"sv);
- HttpClient::Response Response = m_HttpClient.Get(
- RequestUrl,
- HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", Offset, Offset + PartSize - 1)}}));
- if (Response.IsSuccess())
+ OutWorkItems.emplace_back([this,
+ Namespace = std::string(Namespace),
+ BucketId = std::string(BucketId),
+ BuildId = Oid(BuildId),
+ Hash = IoHash(Hash),
+ TotalSize,
+ Workload,
+ Offset,
+ PartSize]() -> JupiterResult {
+ std::string RequestUrl = fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect={}",
+ Namespace,
+ BucketId,
+ BuildId,
+ Hash.ToHexString(),
+ m_AllowRedirect ? "true"sv : "false"sv);
+ HttpClient::Response Response = m_HttpClient.Get(
+ RequestUrl,
+ HttpClient::KeyValueMap({{"Range", fmt::format("bytes={}-{}", Offset, Offset + PartSize - 1)}}));
+ if (Response.IsSuccess())
+ {
+ Workload->OnReceive(Offset, Response.ResponsePayload);
+ uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Response.ResponsePayload.GetSize());
+ if (ByteRemaning == Response.ResponsePayload.GetSize())
{
- Workload->OnReceive(Offset, Response.ResponsePayload);
- uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Response.ResponsePayload.GetSize());
- if (ByteRemaning == Response.ResponsePayload.GetSize())
- {
- Workload->OnComplete();
- }
+ Workload->OnComplete();
}
- return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv);
- });
+ }
+ return detail::ConvertResponse(Response, "JupiterSession::GetMultipartBuildBlob"sv);
+ });
Offset += PartSize;
}
}