aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-05-16 19:51:36 +0200
committerGitHub Enterprise <[email protected]>2025-05-16 19:51:36 +0200
commit4e2efa1051e3eb86ab48d92b3f6ad5896cda5d81 (patch)
tree1fbab083b3fe8919a36caa2d925c933f696a5791 /src
parentvalidate custom fields (#399) (diff)
downloadzen-4e2efa1051e3eb86ab48d92b3f6ad5896cda5d81.tar.xz
zen-4e2efa1051e3eb86ab48d92b3f6ad5896cda5d81.zip
parallel work handle dispatch exception (#400)
- Bugfix: Wait for async threads if dispatching of work using ParallellWork throws exception
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/builds_cmd.cpp2361
-rw-r--r--src/zen/cmds/wipe_cmd.cpp4
-rw-r--r--src/zencore/filesystem.cpp5
-rw-r--r--src/zenutil/buildstoragecache.cpp4
-rw-r--r--src/zenutil/chunkedcontent.cpp52
-rw-r--r--src/zenutil/include/zenutil/buildstoragecache.h2
-rw-r--r--src/zenutil/include/zenutil/chunkedcontent.h4
-rw-r--r--src/zenutil/include/zenutil/parallellwork.h119
-rw-r--r--src/zenutil/include/zenutil/parallelwork.h71
-rw-r--r--src/zenutil/parallelwork.cpp192
-rw-r--r--src/zenutil/zenutil.cpp2
11 files changed, 1444 insertions, 1372 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index 117d0b291..815bb7597 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -30,7 +30,7 @@
#include <zenutil/filebuildstorage.h>
#include <zenutil/jupiter/jupiterbuildstorage.h>
#include <zenutil/jupiter/jupitersession.h>
-#include <zenutil/parallellwork.h>
+#include <zenutil/parallelwork.h>
#include <zenutil/workerpools.h>
#include <zenutil/zenserverprocess.h>
@@ -356,7 +356,7 @@ namespace {
std::atomic<uint64_t> DiscoveredItemCount = 0;
std::atomic<uint64_t> DeletedItemCount = 0;
std::atomic<uint64_t> DeletedByteCount = 0;
- ParallellWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag);
struct AsyncVisitor : public GetDirectoryContentVisitor
{
@@ -1749,7 +1749,7 @@ namespace {
const Oid& BuildId,
const IoHash& ChunkHash,
const std::uint64_t PreferredMultipartChunkSize,
- ParallellWork& Work,
+ ParallelWork& Work,
WorkerThreadPool& NetworkPool,
DownloadStatistics& DownloadStats,
std::function<void(IoBuffer&& Payload)>&& OnDownloadComplete)
@@ -1800,16 +1800,13 @@ namespace {
}
for (auto& WorkItem : WorkItems)
{
- Work.ScheduleWork(
- NetworkPool,
- [WorkItem = std::move(WorkItem)](std::atomic<bool>&) {
- ZEN_TRACE_CPU("DownloadLargeBlob_Work");
- if (!AbortFlag)
- {
- WorkItem();
- }
- },
- Work.DefaultErrorFunction());
+ Work.ScheduleWork(NetworkPool, [WorkItem = std::move(WorkItem)](std::atomic<bool>&) {
+ ZEN_TRACE_CPU("DownloadLargeBlob_Work");
+ if (!AbortFlag)
+ {
+ WorkItem();
+ }
+ });
}
}
@@ -1902,7 +1899,7 @@ namespace {
WorkerThreadPool& NetworkPool = GetNetworkPool();
WorkerThreadPool& VerifyPool = GetIOWorkerPool();
- ParallellWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag);
const std::filesystem::path TempFolder = ".zen-tmp";
@@ -1927,95 +1924,41 @@ namespace {
for (const IoHash& ChunkAttachment : ChunkAttachments)
{
- Work.ScheduleWork(
- NetworkPool,
- [&, ChunkAttachment](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("ValidateBuildPart_GetChunk");
-
- FilteredDownloadedBytesPerSecond.Start();
- DownloadLargeBlob(Storage,
- TempFolder,
- BuildId,
- ChunkAttachment,
- PreferredMultipartChunkSize,
- Work,
- NetworkPool,
- DownloadStats,
- [&, ChunkHash = ChunkAttachment](IoBuffer&& Payload) {
- Payload.SetContentType(ZenContentType::kCompressedBinary);
- if (!AbortFlag)
- {
- Work.ScheduleWork(
- VerifyPool,
- [&, Payload = std::move(Payload), ChunkHash](std::atomic<bool>&) mutable {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("ValidateBuildPart_Validate");
-
- if (DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount ==
- AttachmentsToVerifyCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
-
- FilteredVerifiedBytesPerSecond.Start();
-
- uint64_t CompressedSize;
- uint64_t DecompressedSize;
- ValidateBlob(std::move(Payload), ChunkHash, CompressedSize, DecompressedSize);
- ValidateStats.VerifiedAttachmentCount++;
- ValidateStats.VerifiedByteCount += DecompressedSize;
- if (ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount)
- {
- FilteredVerifiedBytesPerSecond.Stop();
- }
- }
- },
- Work.DefaultErrorFunction());
- }
- });
- }
- },
- Work.DefaultErrorFunction());
- }
-
- for (const IoHash& BlockAttachment : BlockAttachments)
- {
- Work.ScheduleWork(
- NetworkPool,
- [&, BlockAttachment](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("ValidateBuildPart_GetBlock");
+ Work.ScheduleWork(NetworkPool, [&, ChunkAttachment](std::atomic<bool>&) {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("ValidateBuildPart_GetChunk");
- FilteredDownloadedBytesPerSecond.Start();
- IoBuffer Payload = Storage.GetBuildBlob(BuildId, BlockAttachment);
- DownloadStats.DownloadedBlockCount++;
- DownloadStats.DownloadedBlockByteCount += Payload.GetSize();
- if (DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount == AttachmentsToVerifyCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
- if (!Payload)
- {
- throw std::runtime_error(fmt::format("Block attachment {} could not be found", BlockAttachment));
- }
- if (!AbortFlag)
- {
- Work.ScheduleWork(
- VerifyPool,
- [&, Payload = std::move(Payload), BlockAttachment](std::atomic<bool>&) mutable {
+ FilteredDownloadedBytesPerSecond.Start();
+ DownloadLargeBlob(
+ Storage,
+ TempFolder,
+ BuildId,
+ ChunkAttachment,
+ PreferredMultipartChunkSize,
+ Work,
+ NetworkPool,
+ DownloadStats,
+ [&, ChunkHash = ChunkAttachment](IoBuffer&& Payload) {
+ Payload.SetContentType(ZenContentType::kCompressedBinary);
+ if (!AbortFlag)
+ {
+ Work.ScheduleWork(VerifyPool, [&, Payload = std::move(Payload), ChunkHash](std::atomic<bool>&) mutable {
if (!AbortFlag)
{
- ZEN_TRACE_CPU("ValidateBuildPart_ValidateBlock");
+ ZEN_TRACE_CPU("ValidateBuildPart_Validate");
+
+ if (DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount ==
+ AttachmentsToVerifyCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
FilteredVerifiedBytesPerSecond.Start();
uint64_t CompressedSize;
uint64_t DecompressedSize;
- ValidateChunkBlock(std::move(Payload), BlockAttachment, CompressedSize, DecompressedSize);
+ ValidateBlob(std::move(Payload), ChunkHash, CompressedSize, DecompressedSize);
ValidateStats.VerifiedAttachmentCount++;
ValidateStats.VerifiedByteCount += DecompressedSize;
if (ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount)
@@ -2023,12 +1966,55 @@ namespace {
FilteredVerifiedBytesPerSecond.Stop();
}
}
- },
- Work.DefaultErrorFunction());
- }
+ });
+ }
+ });
+ }
+ });
+ }
+
+ for (const IoHash& BlockAttachment : BlockAttachments)
+ {
+ Work.ScheduleWork(NetworkPool, [&, BlockAttachment](std::atomic<bool>&) {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("ValidateBuildPart_GetBlock");
+
+ FilteredDownloadedBytesPerSecond.Start();
+ IoBuffer Payload = Storage.GetBuildBlob(BuildId, BlockAttachment);
+ DownloadStats.DownloadedBlockCount++;
+ DownloadStats.DownloadedBlockByteCount += Payload.GetSize();
+ if (DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount == AttachmentsToVerifyCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
}
- },
- Work.DefaultErrorFunction());
+ if (!Payload)
+ {
+ throw std::runtime_error(fmt::format("Block attachment {} could not be found", BlockAttachment));
+ }
+ if (!AbortFlag)
+ {
+ Work.ScheduleWork(VerifyPool, [&, Payload = std::move(Payload), BlockAttachment](std::atomic<bool>&) mutable {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("ValidateBuildPart_ValidateBlock");
+
+ FilteredVerifiedBytesPerSecond.Start();
+
+ uint64_t CompressedSize;
+ uint64_t DecompressedSize;
+ ValidateChunkBlock(std::move(Payload), BlockAttachment, CompressedSize, DecompressedSize);
+ ValidateStats.VerifiedAttachmentCount++;
+ ValidateStats.VerifiedByteCount += DecompressedSize;
+ if (ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount)
+ {
+ FilteredVerifiedBytesPerSecond.Stop();
+ }
+ }
+ });
+ }
+ }
+ });
}
Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) {
@@ -2286,7 +2272,7 @@ namespace {
FilteredRate FilteredGeneratedBytesPerSecond;
FilteredRate FilteredUploadedBytesPerSecond;
- ParallellWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag);
std::atomic<uint64_t> QueuedPendingBlocksForUpload = 0;
@@ -2297,139 +2283,134 @@ namespace {
break;
}
const std::vector<uint32_t>& ChunksInBlock = NewBlockChunks[BlockIndex];
- Work.ScheduleWork(
- GenerateBlobsPool,
- [&, BlockIndex](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("GenerateBuildBlocks_Generate");
-
- FilteredGeneratedBytesPerSecond.Start();
- // TODO: Convert ScheduleWork body to function
-
- Stopwatch GenerateTimer;
- CompressedBuffer CompressedBlock =
- GenerateBlock(Path, Content, Lookup, ChunksInBlock, OutBlocks.BlockDescriptions[BlockIndex], DiskStats);
- ZEN_CONSOLE_VERBOSE("Generated block {} ({}) containing {} chunks in {}",
- OutBlocks.BlockDescriptions[BlockIndex].BlockHash,
- NiceBytes(CompressedBlock.GetCompressedSize()),
- OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size(),
- NiceTimeSpanMs(GenerateTimer.GetElapsedTimeMs()));
+ Work.ScheduleWork(GenerateBlobsPool, [&, BlockIndex](std::atomic<bool>&) {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("GenerateBuildBlocks_Generate");
- OutBlocks.BlockSizes[BlockIndex] = CompressedBlock.GetCompressedSize();
- {
- CbObjectWriter Writer;
- Writer.AddString("createdBy", "zen");
- OutBlocks.BlockMetaDatas[BlockIndex] = Writer.Save();
- }
- GenerateBlocksStats.GeneratedBlockByteCount += OutBlocks.BlockSizes[BlockIndex];
- GenerateBlocksStats.GeneratedBlockCount++;
+ FilteredGeneratedBytesPerSecond.Start();
+ // TODO: Convert ScheduleWork body to function
- Lock.WithExclusiveLock([&]() {
- OutBlocks.BlockHashToBlockIndex.insert_or_assign(OutBlocks.BlockDescriptions[BlockIndex].BlockHash,
- BlockIndex);
- });
+ Stopwatch GenerateTimer;
+ CompressedBuffer CompressedBlock =
+ GenerateBlock(Path, Content, Lookup, ChunksInBlock, OutBlocks.BlockDescriptions[BlockIndex], DiskStats);
+ ZEN_CONSOLE_VERBOSE("Generated block {} ({}) containing {} chunks in {}",
+ OutBlocks.BlockDescriptions[BlockIndex].BlockHash,
+ NiceBytes(CompressedBlock.GetCompressedSize()),
+ OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size(),
+ NiceTimeSpanMs(GenerateTimer.GetElapsedTimeMs()));
- {
- std::span<const SharedBuffer> Segments = CompressedBlock.GetCompressed().GetSegments();
- ZEN_ASSERT(Segments.size() >= 2);
- OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]);
- }
+ OutBlocks.BlockSizes[BlockIndex] = CompressedBlock.GetCompressedSize();
+ {
+ CbObjectWriter Writer;
+ Writer.AddString("createdBy", "zen");
+ OutBlocks.BlockMetaDatas[BlockIndex] = Writer.Save();
+ }
+ GenerateBlocksStats.GeneratedBlockByteCount += OutBlocks.BlockSizes[BlockIndex];
+ GenerateBlocksStats.GeneratedBlockCount++;
- if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount)
- {
- FilteredGeneratedBytesPerSecond.Stop();
- }
+ Lock.WithExclusiveLock([&]() {
+ OutBlocks.BlockHashToBlockIndex.insert_or_assign(OutBlocks.BlockDescriptions[BlockIndex].BlockHash, BlockIndex);
+ });
- if (QueuedPendingBlocksForUpload.load() > 16)
- {
- std::span<const SharedBuffer> Segments = CompressedBlock.GetCompressed().GetSegments();
- ZEN_ASSERT(Segments.size() >= 2);
- OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]);
- }
- else
+ {
+ std::span<const SharedBuffer> Segments = CompressedBlock.GetCompressed().GetSegments();
+ ZEN_ASSERT(Segments.size() >= 2);
+ OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]);
+ }
+
+ if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount)
+ {
+ FilteredGeneratedBytesPerSecond.Stop();
+ }
+
+ if (QueuedPendingBlocksForUpload.load() > 16)
+ {
+ std::span<const SharedBuffer> Segments = CompressedBlock.GetCompressed().GetSegments();
+ ZEN_ASSERT(Segments.size() >= 2);
+ OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]);
+ }
+ else
+ {
+ if (!AbortFlag)
{
- if (!AbortFlag)
- {
- QueuedPendingBlocksForUpload++;
+ QueuedPendingBlocksForUpload++;
- Work.ScheduleWork(
- UploadBlocksPool,
- [&, BlockIndex, Payload = std::move(CompressedBlock)](std::atomic<bool>&) mutable {
- auto _ = MakeGuard([&QueuedPendingBlocksForUpload] { QueuedPendingBlocksForUpload--; });
- if (!AbortFlag)
+ Work.ScheduleWork(
+ UploadBlocksPool,
+ [&, BlockIndex, Payload = std::move(CompressedBlock)](std::atomic<bool>&) mutable {
+ auto _ = MakeGuard([&QueuedPendingBlocksForUpload] { QueuedPendingBlocksForUpload--; });
+ if (!AbortFlag)
+ {
+ if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount)
{
- if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount)
- {
- ZEN_TRACE_CPU("GenerateBuildBlocks_Save");
+ ZEN_TRACE_CPU("GenerateBuildBlocks_Save");
- FilteredUploadedBytesPerSecond.Stop();
- std::span<const SharedBuffer> Segments = Payload.GetCompressed().GetSegments();
- ZEN_ASSERT(Segments.size() >= 2);
- OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]);
- }
- else
- {
- ZEN_TRACE_CPU("GenerateBuildBlocks_Upload");
+ FilteredUploadedBytesPerSecond.Stop();
+ std::span<const SharedBuffer> Segments = Payload.GetCompressed().GetSegments();
+ ZEN_ASSERT(Segments.size() >= 2);
+ OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]);
+ }
+ else
+ {
+ ZEN_TRACE_CPU("GenerateBuildBlocks_Upload");
- FilteredUploadedBytesPerSecond.Start();
- // TODO: Convert ScheduleWork body to function
+ FilteredUploadedBytesPerSecond.Start();
+ // TODO: Convert ScheduleWork body to function
- const CbObject BlockMetaData =
- BuildChunkBlockDescription(OutBlocks.BlockDescriptions[BlockIndex],
- OutBlocks.BlockMetaDatas[BlockIndex]);
+ const CbObject BlockMetaData =
+ BuildChunkBlockDescription(OutBlocks.BlockDescriptions[BlockIndex],
+ OutBlocks.BlockMetaDatas[BlockIndex]);
- const IoHash& BlockHash = OutBlocks.BlockDescriptions[BlockIndex].BlockHash;
- const uint64_t CompressedBlockSize = Payload.GetCompressedSize();
+ const IoHash& BlockHash = OutBlocks.BlockDescriptions[BlockIndex].BlockHash;
+ const uint64_t CompressedBlockSize = Payload.GetCompressedSize();
- if (Storage.BuildCacheStorage)
- {
- Storage.BuildCacheStorage->PutBuildBlob(BuildId,
- BlockHash,
- ZenContentType::kCompressedBinary,
- Payload.GetCompressed());
- }
+ if (Storage.BuildCacheStorage)
+ {
+ Storage.BuildCacheStorage->PutBuildBlob(BuildId,
+ BlockHash,
+ ZenContentType::kCompressedBinary,
+ Payload.GetCompressed());
+ }
- Storage.BuildStorage->PutBuildBlob(BuildId,
- BlockHash,
- ZenContentType::kCompressedBinary,
- std::move(Payload).GetCompressed());
- UploadStats.BlocksBytes += CompressedBlockSize;
+ Storage.BuildStorage->PutBuildBlob(BuildId,
+ BlockHash,
+ ZenContentType::kCompressedBinary,
+ std::move(Payload).GetCompressed());
+ UploadStats.BlocksBytes += CompressedBlockSize;
- ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks",
- BlockHash,
- NiceBytes(CompressedBlockSize),
- OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size());
+ ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks",
+ BlockHash,
+ NiceBytes(CompressedBlockSize),
+ OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size());
- if (Storage.BuildCacheStorage)
- {
- Storage.BuildCacheStorage->PutBlobMetadatas(BuildId,
- std::vector<IoHash>({BlockHash}),
- std::vector<CbObject>({BlockMetaData}));
- }
+ if (Storage.BuildCacheStorage)
+ {
+ Storage.BuildCacheStorage->PutBlobMetadatas(BuildId,
+ std::vector<IoHash>({BlockHash}),
+ std::vector<CbObject>({BlockMetaData}));
+ }
- Storage.BuildStorage->PutBlockMetadata(BuildId, BlockHash, BlockMetaData);
- ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})",
- BlockHash,
- NiceBytes(BlockMetaData.GetSize()));
+ Storage.BuildStorage->PutBlockMetadata(BuildId, BlockHash, BlockMetaData);
+ ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})",
+ BlockHash,
+ NiceBytes(BlockMetaData.GetSize()));
- OutBlocks.MetaDataHasBeenUploaded[BlockIndex] = true;
+ OutBlocks.MetaDataHasBeenUploaded[BlockIndex] = true;
- UploadStats.BlocksBytes += BlockMetaData.GetSize();
- UploadStats.BlockCount++;
- if (UploadStats.BlockCount == NewBlockCount)
- {
- FilteredUploadedBytesPerSecond.Stop();
- }
+ UploadStats.BlocksBytes += BlockMetaData.GetSize();
+ UploadStats.BlockCount++;
+ if (UploadStats.BlockCount == NewBlockCount)
+ {
+ FilteredUploadedBytesPerSecond.Stop();
}
}
- },
- Work.DefaultErrorFunction());
- }
+ }
+ });
}
}
- },
- Work.DefaultErrorFunction());
+ }
+ });
}
Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) {
@@ -2491,7 +2472,7 @@ namespace {
FilteredRate FilteredCompressedBytesPerSecond;
FilteredRate FilteredUploadedBytesPerSecond;
- ParallellWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag);
std::atomic<size_t> UploadedBlockSize = 0;
std::atomic<size_t> UploadedBlockCount = 0;
@@ -2606,8 +2587,7 @@ namespace {
FilteredUploadedBytesPerSecond.Stop();
}
}
- },
- Work.DefaultErrorFunction());
+ });
};
auto AsyncUploadLooseChunk = [&](const IoHash& RawHash, uint64_t RawSize, CompositeBuffer&& Payload) {
@@ -2658,16 +2638,13 @@ namespace {
});
for (auto& WorkPart : MultipartWork)
{
- Work.ScheduleWork(
- UploadChunkPool,
- [Work = std::move(WorkPart)](std::atomic<bool>&) {
- ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart_Work");
- if (!AbortFlag)
- {
- Work();
- }
- },
- Work.DefaultErrorFunction());
+ Work.ScheduleWork(UploadChunkPool, [Work = std::move(WorkPart)](std::atomic<bool>&) {
+ ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart_Work");
+ if (!AbortFlag)
+ {
+ Work();
+ }
+ });
}
ZEN_CONSOLE_VERBOSE("Uploaded multipart chunk {} ({})", RawHash, NiceBytes(PayloadSize));
}
@@ -2687,8 +2664,7 @@ namespace {
}
}
}
- },
- Work.DefaultErrorFunction());
+ });
};
std::vector<size_t> GenerateBlockIndexes;
@@ -2704,59 +2680,56 @@ namespace {
const IoHash& BlockHash = NewBlocks.BlockDescriptions[BlockIndex].BlockHash;
if (!AbortFlag)
{
- Work.ScheduleWork(
- ReadChunkPool,
- [&, BlockIndex](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UploadPartBlobs_GenerateBlock");
+ Work.ScheduleWork(ReadChunkPool, [&, BlockIndex](std::atomic<bool>&) {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("UploadPartBlobs_GenerateBlock");
- FilteredGenerateBlockBytesPerSecond.Start();
+ FilteredGenerateBlockBytesPerSecond.Start();
- Stopwatch GenerateTimer;
- CompositeBuffer Payload;
- if (NewBlocks.BlockHeaders[BlockIndex])
- {
- Payload = RebuildBlock(Path,
- Content,
- Lookup,
- std::move(NewBlocks.BlockHeaders[BlockIndex]),
- NewBlockChunks[BlockIndex],
- DiskStats)
- .GetCompressed();
- }
- else
+ Stopwatch GenerateTimer;
+ CompositeBuffer Payload;
+ if (NewBlocks.BlockHeaders[BlockIndex])
+ {
+ Payload = RebuildBlock(Path,
+ Content,
+ Lookup,
+ std::move(NewBlocks.BlockHeaders[BlockIndex]),
+ NewBlockChunks[BlockIndex],
+ DiskStats)
+ .GetCompressed();
+ }
+ else
+ {
+ ChunkBlockDescription BlockDescription;
+ CompressedBuffer CompressedBlock =
+ GenerateBlock(Path, Content, Lookup, NewBlockChunks[BlockIndex], BlockDescription, DiskStats);
+ if (!CompressedBlock)
{
- ChunkBlockDescription BlockDescription;
- CompressedBuffer CompressedBlock =
- GenerateBlock(Path, Content, Lookup, NewBlockChunks[BlockIndex], BlockDescription, DiskStats);
- if (!CompressedBlock)
- {
- throw std::runtime_error(fmt::format("Failed generating block {}", BlockHash));
- }
- ZEN_ASSERT(BlockDescription.BlockHash == BlockHash);
- Payload = std::move(CompressedBlock).GetCompressed();
+ throw std::runtime_error(fmt::format("Failed generating block {}", BlockHash));
}
+ ZEN_ASSERT(BlockDescription.BlockHash == BlockHash);
+ Payload = std::move(CompressedBlock).GetCompressed();
+ }
- GeneratedBlockByteCount += NewBlocks.BlockSizes[BlockIndex];
- GeneratedBlockCount++;
- if (GeneratedBlockCount == GenerateBlockIndexes.size())
- {
- FilteredGenerateBlockBytesPerSecond.Stop();
- }
- ZEN_CONSOLE_VERBOSE("{} block {} ({}) containing {} chunks in {}",
- NewBlocks.BlockHeaders[BlockIndex] ? "Regenerated" : "Generated",
- NewBlocks.BlockDescriptions[BlockIndex].BlockHash,
- NiceBytes(NewBlocks.BlockSizes[BlockIndex]),
- NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size(),
- NiceTimeSpanMs(GenerateTimer.GetElapsedTimeMs()));
- if (!AbortFlag)
- {
- AsyncUploadBlock(BlockIndex, BlockHash, std::move(Payload), QueuedPendingInMemoryBlocksForUpload);
- }
+ GeneratedBlockByteCount += NewBlocks.BlockSizes[BlockIndex];
+ GeneratedBlockCount++;
+ if (GeneratedBlockCount == GenerateBlockIndexes.size())
+ {
+ FilteredGenerateBlockBytesPerSecond.Stop();
}
- },
- Work.DefaultErrorFunction());
+ ZEN_CONSOLE_VERBOSE("{} block {} ({}) containing {} chunks in {}",
+ NewBlocks.BlockHeaders[BlockIndex] ? "Regenerated" : "Generated",
+ NewBlocks.BlockDescriptions[BlockIndex].BlockHash,
+ NiceBytes(NewBlocks.BlockSizes[BlockIndex]),
+ NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size(),
+ NiceTimeSpanMs(GenerateTimer.GetElapsedTimeMs()));
+ if (!AbortFlag)
+ {
+ AsyncUploadBlock(BlockIndex, BlockHash, std::move(Payload), QueuedPendingInMemoryBlocksForUpload);
+ }
+ }
+ });
}
}
@@ -2764,35 +2737,32 @@ namespace {
for (const uint32_t LooseChunkOrderIndex : LooseChunkOrderIndexes)
{
const uint32_t ChunkIndex = LooseChunkIndexes[LooseChunkOrderIndex];
- Work.ScheduleWork(
- ReadChunkPool,
- [&, ChunkIndex](std::atomic<bool>&) {
+ Work.ScheduleWork(ReadChunkPool, [&, ChunkIndex](std::atomic<bool>&) {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("UploadPartBlobs_CompressChunk");
+
+ FilteredCompressedBytesPerSecond.Start();
+ Stopwatch CompressTimer;
+ CompositeBuffer Payload =
+ CompressChunk(Path, Content, Lookup, ChunkIndex, ZenTempChunkFolderPath(ZenFolderPath), LooseChunksStats);
+ ZEN_CONSOLE_VERBOSE("Compressed chunk {} ({} -> {}) in {}",
+ Content.ChunkedContent.ChunkHashes[ChunkIndex],
+ NiceBytes(Content.ChunkedContent.ChunkRawSizes[ChunkIndex]),
+ NiceBytes(Payload.GetSize()),
+ NiceTimeSpanMs(CompressTimer.GetElapsedTimeMs()));
+ const uint64_t ChunkRawSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
+ UploadStats.ReadFromDiskBytes += ChunkRawSize;
+ if (LooseChunksStats.CompressedChunkCount == LooseChunkOrderIndexes.size())
+ {
+ FilteredCompressedBytesPerSecond.Stop();
+ }
if (!AbortFlag)
{
- ZEN_TRACE_CPU("UploadPartBlobs_CompressChunk");
-
- FilteredCompressedBytesPerSecond.Start();
- Stopwatch CompressTimer;
- CompositeBuffer Payload =
- CompressChunk(Path, Content, Lookup, ChunkIndex, ZenTempChunkFolderPath(ZenFolderPath), LooseChunksStats);
- ZEN_CONSOLE_VERBOSE("Compressed chunk {} ({} -> {}) in {}",
- Content.ChunkedContent.ChunkHashes[ChunkIndex],
- NiceBytes(Content.ChunkedContent.ChunkRawSizes[ChunkIndex]),
- NiceBytes(Payload.GetSize()),
- NiceTimeSpanMs(CompressTimer.GetElapsedTimeMs()));
- const uint64_t ChunkRawSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
- UploadStats.ReadFromDiskBytes += ChunkRawSize;
- if (LooseChunksStats.CompressedChunkCount == LooseChunkOrderIndexes.size())
- {
- FilteredCompressedBytesPerSecond.Stop();
- }
- if (!AbortFlag)
- {
- AsyncUploadLooseChunk(Content.ChunkedContent.ChunkHashes[ChunkIndex], ChunkRawSize, std::move(Payload));
- }
+ AsyncUploadLooseChunk(Content.ChunkedContent.ChunkHashes[ChunkIndex], ChunkRawSize, std::move(Payload));
}
- },
- Work.DefaultErrorFunction());
+ }
+ });
}
Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) {
@@ -4045,7 +4015,7 @@ namespace {
WorkerThreadPool& VerifyPool = GetIOWorkerPool();
- ParallellWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag);
const uint32_t PathCount = gsl::narrow<uint32_t>(Content.Paths.size());
@@ -4173,11 +4143,20 @@ namespace {
VerifyFolderStats.FilesVerified++;
}
},
- [&, PathIndex](const std::exception& Ex, std::atomic<bool>&) {
+ [&, PathIndex](std::exception_ptr Ex, std::atomic<bool>&) {
+ std::string Description;
+ try
+ {
+ std::rethrow_exception(Ex);
+ }
+ catch (const std::exception& Ex)
+ {
+ Description = Ex.what();
+ }
ErrorLock.WithExclusiveLock([&]() {
Errors.push_back(fmt::format("Failed verifying file '{}'. Reason: {}",
(Path / Content.Paths[PathIndex]).make_preferred(),
- Ex.what()));
+ Description));
});
VerifyFolderStats.FilesFailed++;
});
@@ -4405,7 +4384,7 @@ namespace {
const ChunkedFolderContent& RemoteContent,
const ChunkedContentLookup& Lookup,
std::span<const uint32_t> RemoteSequenceIndexes,
- ParallellWork& Work,
+ ParallelWork& Work,
WorkerThreadPool& VerifyPool)
{
if (RemoteSequenceIndexes.empty())
@@ -4416,21 +4395,18 @@ namespace {
for (uint32_t RemoteSequenceIndexOffset = 1; RemoteSequenceIndexOffset < RemoteSequenceIndexes.size(); RemoteSequenceIndexOffset++)
{
const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset];
- Work.ScheduleWork(
- VerifyPool,
- [&RemoteContent, &Lookup, TargetFolder, RemoteSequenceIndex](std::atomic<bool>&) {
+ Work.ScheduleWork(VerifyPool, [&RemoteContent, &Lookup, TargetFolder, RemoteSequenceIndex](std::atomic<bool>&) {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("VerifyAndCompleteChunkSequenceAsync");
+ VerifySequence(TargetFolder, RemoteContent, Lookup, RemoteSequenceIndex);
if (!AbortFlag)
{
- ZEN_TRACE_CPU("VerifyAndCompleteChunkSequenceAsync");
- VerifySequence(TargetFolder, RemoteContent, Lookup, RemoteSequenceIndex);
- if (!AbortFlag)
- {
- const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
- FinalizeChunkSequence(TargetFolder, SequenceRawHash);
- }
+ const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ FinalizeChunkSequence(TargetFolder, SequenceRawHash);
}
- },
- Work.DefaultErrorFunction());
+ }
+ });
}
const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[0];
@@ -4480,7 +4456,7 @@ namespace {
const ChunkedContentLookup& Lookup,
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
const BlockWriteOps& Ops,
- ParallellWork& Work,
+ ParallelWork& Work,
WorkerThreadPool& VerifyPool,
DiskStatistics& DiskStats)
{
@@ -4672,7 +4648,7 @@ namespace {
const ChunkedFolderContent& RemoteContent,
const ChunkBlockDescription& BlockDescription,
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
- ParallellWork& Work,
+ ParallelWork& Work,
WorkerThreadPool& VerifyPool,
CompositeBuffer&& BlockBuffer,
const ChunkedContentLookup& Lookup,
@@ -4745,7 +4721,7 @@ namespace {
const ChunkedFolderContent& RemoteContent,
const ChunkBlockDescription& BlockDescription,
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
- ParallellWork& Work,
+ ParallelWork& Work,
WorkerThreadPool& VerifyPool,
CompositeBuffer&& PartialBlockBuffer,
uint32_t FirstIncludedBlockChunkIndex,
@@ -4978,7 +4954,7 @@ namespace {
const ChunkedContentLookup& RemoteLookup,
uint32_t RemoteChunkIndex,
std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>&& ChunkTargetPtrs,
- ParallellWork& Work,
+ ParallelWork& Work,
WorkerThreadPool& WritePool,
IoBuffer&& Payload,
std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
@@ -5098,8 +5074,7 @@ namespace {
}
}
}
- },
- Work.DefaultErrorFunction());
+ });
};
bool ReadStateFile(const std::filesystem::path& StateFilePath,
@@ -5167,36 +5142,35 @@ namespace {
ProgressBar ProgressBar(ProgressMode, "Check Files");
- ParallellWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag);
std::atomic<uint64_t> CompletedPathCount = 0;
uint32_t PathIndex = 0;
while (PathIndex < PathCount)
{
uint32_t PathRangeCount = Min(128u, PathCount - PathIndex);
- Work.ScheduleWork(
- GetIOWorkerPool(),
- [PathIndex, PathRangeCount, &PathsToCheck, &Path, &Result, &CompletedPathCount, &LocalFolderScanStats](
- std::atomic<bool>&) {
- for (uint32_t PathRangeIndex = PathIndex; PathRangeIndex < PathIndex + PathRangeCount; PathRangeIndex++)
- {
- const std::filesystem::path& FilePath = PathsToCheck[PathRangeIndex];
- std::filesystem::path LocalFilePath = (Path / FilePath).make_preferred();
- if (TryGetFileProperties(LocalFilePath,
- Result.RawSizes[PathRangeIndex],
- Result.ModificationTicks[PathRangeIndex],
- Result.Attributes[PathRangeIndex]))
- {
- Result.Paths[PathRangeIndex] = std::move(FilePath);
- LocalFolderScanStats.FoundFileCount++;
- LocalFolderScanStats.FoundFileByteCount += Result.RawSizes[PathRangeIndex];
- LocalFolderScanStats.AcceptedFileCount++;
- LocalFolderScanStats.AcceptedFileByteCount += Result.RawSizes[PathRangeIndex];
- }
- CompletedPathCount++;
- }
- },
- Work.DefaultErrorFunction());
+ Work.ScheduleWork(GetIOWorkerPool(),
+ [PathIndex, PathRangeCount, &PathsToCheck, &Path, &Result, &CompletedPathCount, &LocalFolderScanStats](
+ std::atomic<bool>&) {
+ for (uint32_t PathRangeIndex = PathIndex; PathRangeIndex < PathIndex + PathRangeCount;
+ PathRangeIndex++)
+ {
+ const std::filesystem::path& FilePath = PathsToCheck[PathRangeIndex];
+ std::filesystem::path LocalFilePath = (Path / FilePath).make_preferred();
+ if (TryGetFileProperties(LocalFilePath,
+ Result.RawSizes[PathRangeIndex],
+ Result.ModificationTicks[PathRangeIndex],
+ Result.Attributes[PathRangeIndex]))
+ {
+ Result.Paths[PathRangeIndex] = std::move(FilePath);
+ LocalFolderScanStats.FoundFileCount++;
+ LocalFolderScanStats.FoundFileByteCount += Result.RawSizes[PathRangeIndex];
+ LocalFolderScanStats.AcceptedFileCount++;
+ LocalFolderScanStats.AcceptedFileByteCount += Result.RawSizes[PathRangeIndex];
+ }
+ CompletedPathCount++;
+ }
+ });
PathIndex += PathRangeCount;
}
Work.Wait(200, [&](bool, ptrdiff_t) {
@@ -5831,8 +5805,8 @@ namespace {
WorkerThreadPool& NetworkPool = GetNetworkPool();
WorkerThreadPool& WritePool = GetIOWorkerPool();
- ProgressBar WriteProgressBar(ProgressMode, PrimeCacheOnly ? "Downloading" : "Writing");
- ParallellWork Work(AbortFlag);
+ ProgressBar WriteProgressBar(ProgressMode, PrimeCacheOnly ? "Downloading" : "Writing");
+ ParallelWork Work(AbortFlag);
struct LooseChunkHashWorkData
{
@@ -6199,41 +6173,38 @@ namespace {
}
if (!PrimeCacheOnly)
{
- Work.ScheduleWork(
- WritePool,
- [&, ScavengeOpIndex](std::atomic<bool>&) mutable {
- if (!AbortFlag)
- {
- const ScavengeCopyOperation& ScavengeOp = ScavengeCopyOperations[ScavengeOpIndex];
- const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengeOp.ScavengedContentIndex];
- const std::filesystem::path ScavengedPath = ScavengedContent.Paths[ScavengeOp.ScavengedPathIndex];
+ Work.ScheduleWork(WritePool, [&, ScavengeOpIndex](std::atomic<bool>&) mutable {
+ if (!AbortFlag)
+ {
+ const ScavengeCopyOperation& ScavengeOp = ScavengeCopyOperations[ScavengeOpIndex];
+ const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengeOp.ScavengedContentIndex];
+ const std::filesystem::path ScavengedPath = ScavengedContent.Paths[ScavengeOp.ScavengedPathIndex];
- const std::filesystem::path ScavengedFilePath =
- (ScavengedPaths[ScavengeOp.ScavengedContentIndex] / ScavengedPath).make_preferred();
- ZEN_ASSERT_SLOW(FileSizeFromPath(ScavengedFilePath) == ScavengeOp.RawSize);
+ const std::filesystem::path ScavengedFilePath =
+ (ScavengedPaths[ScavengeOp.ScavengedContentIndex] / ScavengedPath).make_preferred();
+ ZEN_ASSERT_SLOW(FileSizeFromPath(ScavengedFilePath) == ScavengeOp.RawSize);
- const IoHash& RemoteSequenceRawHash =
- RemoteContent.ChunkedContent.SequenceRawHashes[ScavengeOp.RemoteSequenceIndex];
- const std::filesystem::path TempFilePath =
- GetTempChunkedSequenceFileName(CacheFolderPath, RemoteSequenceRawHash);
+ const IoHash& RemoteSequenceRawHash =
+ RemoteContent.ChunkedContent.SequenceRawHashes[ScavengeOp.RemoteSequenceIndex];
+ const std::filesystem::path TempFilePath =
+ GetTempChunkedSequenceFileName(CacheFolderPath, RemoteSequenceRawHash);
- CopyFile(ScavengedFilePath, TempFilePath, {.EnableClone = false});
+ CopyFile(ScavengedFilePath, TempFilePath, {.EnableClone = false});
- DiskStats.WriteCount++;
- DiskStats.WriteByteCount += ScavengeOp.RawSize;
+ DiskStats.WriteCount++;
+ DiskStats.WriteByteCount += ScavengeOp.RawSize;
- const std::filesystem::path CacheFilePath =
- GetFinalChunkedSequenceFileName(CacheFolderPath, RemoteSequenceRawHash);
- RenameFile(TempFilePath, CacheFilePath);
+ const std::filesystem::path CacheFilePath =
+ GetFinalChunkedSequenceFileName(CacheFolderPath, RemoteSequenceRawHash);
+ RenameFile(TempFilePath, CacheFilePath);
- WritePartsComplete++;
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
- }
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
}
- },
- Work.DefaultErrorFunction());
+ }
+ });
}
}
@@ -6256,287 +6227,279 @@ namespace {
continue;
}
- Work.ScheduleWork(
- WritePool,
- [&, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) mutable {
- if (!AbortFlag)
+ Work.ScheduleWork(WritePool, [&, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) mutable {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("UpdateFolder_ReadPreDownloaded");
+ std::filesystem::path ExistingCompressedChunkPath;
+ if (!PrimeCacheOnly)
{
- ZEN_TRACE_CPU("UpdateFolder_ReadPreDownloaded");
- std::filesystem::path ExistingCompressedChunkPath;
- if (!PrimeCacheOnly)
+ const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+ std::filesystem::path CompressedChunkPath = ZenTempDownloadFolderPath(ZenFolderPath) / ChunkHash.ToHexString();
+ if (IsFile(CompressedChunkPath))
{
- const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
- std::filesystem::path CompressedChunkPath =
- ZenTempDownloadFolderPath(ZenFolderPath) / ChunkHash.ToHexString();
- if (IsFile(CompressedChunkPath))
+ IoBuffer ExistingCompressedPart = IoBufferBuilder::MakeFromFile(ExistingCompressedChunkPath);
+ if (ExistingCompressedPart)
{
- IoBuffer ExistingCompressedPart = IoBufferBuilder::MakeFromFile(ExistingCompressedChunkPath);
- if (ExistingCompressedPart)
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (CompressedBuffer::ValidateCompressedHeader(ExistingCompressedPart, RawHash, RawSize))
{
- IoHash RawHash;
- uint64_t RawSize;
- if (CompressedBuffer::ValidateCompressedHeader(ExistingCompressedPart, RawHash, RawSize))
- {
- DownloadStats.RequestsCompleteCount++;
- if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
- ExistingCompressedChunkPath = std::move(CompressedChunkPath);
- }
- else
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
{
- std::error_code DummyEc;
- RemoveFile(CompressedChunkPath, DummyEc);
+ FilteredDownloadedBytesPerSecond.Stop();
}
+ ExistingCompressedChunkPath = std::move(CompressedChunkPath);
+ }
+ else
+ {
+ std::error_code DummyEc;
+ RemoveFile(CompressedChunkPath, DummyEc);
}
}
}
- if (!AbortFlag)
+ }
+ if (!AbortFlag)
+ {
+ if (!ExistingCompressedChunkPath.empty())
{
- if (!ExistingCompressedChunkPath.empty())
- {
- Work.ScheduleWork(
- WritePool,
- [&Path,
- &ZenFolderPath,
- &RemoteContent,
- &RemoteLookup,
- &CacheFolderPath,
- &SequenceIndexChunksLeftToWriteCounters,
- &Work,
- &WritePool,
- &DiskStats,
- &WriteChunkStats,
- &WritePartsComplete,
- TotalPartWriteCount,
- &FilteredWrittenBytesPerSecond,
- RemoteChunkIndex,
- ChunkTargetPtrs,
- CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>&) mutable {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_WritePreDownloaded");
+ Work.ScheduleWork(
+ WritePool,
+ [&Path,
+ &ZenFolderPath,
+ &RemoteContent,
+ &RemoteLookup,
+ &CacheFolderPath,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &Work,
+ &WritePool,
+ &DiskStats,
+ &WriteChunkStats,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ &FilteredWrittenBytesPerSecond,
+ RemoteChunkIndex,
+ ChunkTargetPtrs,
+ CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>&) mutable {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("UpdateFolder_WritePreDownloaded");
- FilteredWrittenBytesPerSecond.Start();
+ FilteredWrittenBytesPerSecond.Start();
- const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+ const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
- IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath);
- if (!CompressedPart)
- {
- throw std::runtime_error(
- fmt::format("Could not open dowloaded compressed chunk {} from {}",
- ChunkHash,
- CompressedChunkPath));
- }
+ IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath);
+ if (!CompressedPart)
+ {
+ throw std::runtime_error(fmt::format("Could not open dowloaded compressed chunk {} from {}",
+ ChunkHash,
+ CompressedChunkPath));
+ }
- std::filesystem::path TargetFolder = ZenTempCacheFolderPath(ZenFolderPath);
- bool NeedHashVerify = WriteCompressedChunk(TargetFolder,
- RemoteContent,
- RemoteLookup,
- ChunkHash,
- ChunkTargetPtrs,
- std::move(CompressedPart),
- DiskStats);
- WritePartsComplete++;
+ std::filesystem::path TargetFolder = ZenTempCacheFolderPath(ZenFolderPath);
+ bool NeedHashVerify = WriteCompressedChunk(TargetFolder,
+ RemoteContent,
+ RemoteLookup,
+ ChunkHash,
+ ChunkTargetPtrs,
+ std::move(CompressedPart),
+ DiskStats);
+ WritePartsComplete++;
- if (!AbortFlag)
+ if (!AbortFlag)
+ {
+ if (WritePartsComplete == TotalPartWriteCount)
{
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
- }
+ FilteredWrittenBytesPerSecond.Stop();
+ }
- RemoveFileWithRetry(CompressedChunkPath);
+ RemoveFileWithRetry(CompressedChunkPath);
- std::vector<uint32_t> CompletedSequences =
- CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
- if (NeedHashVerify)
- {
- VerifyAndCompleteChunkSequencesAsync(TargetFolder,
- RemoteContent,
- RemoteLookup,
- CompletedSequences,
- Work,
- WritePool);
- }
- else
- {
- FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences);
- }
+ std::vector<uint32_t> CompletedSequences =
+ CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
+ if (NeedHashVerify)
+ {
+ VerifyAndCompleteChunkSequencesAsync(TargetFolder,
+ RemoteContent,
+ RemoteLookup,
+ CompletedSequences,
+ Work,
+ WritePool);
+ }
+ else
+ {
+ FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences);
}
}
- },
- Work.DefaultErrorFunction());
- }
- else
- {
- Work.ScheduleWork(
- NetworkPool,
- [&Path,
- &ZenFolderPath,
- &Storage,
- BuildId,
- &PrimeCacheOnly,
- &RemoteContent,
- &RemoteLookup,
- &ExistsResult,
- &SequenceIndexChunksLeftToWriteCounters,
- &Work,
- &WritePool,
- &NetworkPool,
- &DiskStats,
- &WriteChunkStats,
- &WritePartsComplete,
- TotalPartWriteCount,
- TotalRequestCount,
- &FilteredDownloadedBytesPerSecond,
- &FilteredWrittenBytesPerSecond,
- LargeAttachmentSize,
- PreferredMultipartChunkSize,
- RemoteChunkIndex,
- ChunkTargetPtrs,
- &DownloadStats](std::atomic<bool>&) mutable {
- if (!AbortFlag)
+ }
+ });
+ }
+ else
+ {
+ Work.ScheduleWork(
+ NetworkPool,
+ [&Path,
+ &ZenFolderPath,
+ &Storage,
+ BuildId,
+ &PrimeCacheOnly,
+ &RemoteContent,
+ &RemoteLookup,
+ &ExistsResult,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &Work,
+ &WritePool,
+ &NetworkPool,
+ &DiskStats,
+ &WriteChunkStats,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ TotalRequestCount,
+ &FilteredDownloadedBytesPerSecond,
+ &FilteredWrittenBytesPerSecond,
+ LargeAttachmentSize,
+ PreferredMultipartChunkSize,
+ RemoteChunkIndex,
+ ChunkTargetPtrs,
+ &DownloadStats](std::atomic<bool>&) mutable {
+ if (!AbortFlag)
+ {
+ const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+ FilteredDownloadedBytesPerSecond.Start();
+ IoBuffer BuildBlob;
+ const bool ExistsInCache =
+ Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash);
+ if (ExistsInCache)
{
- const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
- FilteredDownloadedBytesPerSecond.Start();
- IoBuffer BuildBlob;
- const bool ExistsInCache =
- Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash);
- if (ExistsInCache)
+ BuildBlob = Storage.BuildCacheStorage->GetBuildBlob(BuildId, ChunkHash);
+ }
+ if (BuildBlob)
+ {
+ uint64_t BlobSize = BuildBlob.GetSize();
+ DownloadStats.DownloadedChunkCount++;
+ DownloadStats.DownloadedChunkByteCount += BlobSize;
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
{
- BuildBlob = Storage.BuildCacheStorage->GetBuildBlob(BuildId, ChunkHash);
+ FilteredDownloadedBytesPerSecond.Stop();
}
- if (BuildBlob)
+ AsyncWriteDownloadedChunk(ZenFolderPath,
+ RemoteContent,
+ RemoteLookup,
+ RemoteChunkIndex,
+ std::move(ChunkTargetPtrs),
+ Work,
+ WritePool,
+ std::move(BuildBlob),
+ SequenceIndexChunksLeftToWriteCounters,
+ WritePartsComplete,
+ TotalPartWriteCount,
+ FilteredWrittenBytesPerSecond,
+ DiskStats);
+ }
+ else
+ {
+ if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize)
{
- uint64_t BlobSize = BuildBlob.GetSize();
- DownloadStats.DownloadedChunkCount++;
- DownloadStats.DownloadedChunkByteCount += BlobSize;
- DownloadStats.RequestsCompleteCount++;
- if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
- AsyncWriteDownloadedChunk(ZenFolderPath,
- RemoteContent,
- RemoteLookup,
- RemoteChunkIndex,
- std::move(ChunkTargetPtrs),
- Work,
- WritePool,
- std::move(BuildBlob),
- SequenceIndexChunksLeftToWriteCounters,
- WritePartsComplete,
- TotalPartWriteCount,
- FilteredWrittenBytesPerSecond,
- DiskStats);
+ ZEN_TRACE_CPU("UpdateFolder_GetLargeChunk");
+ DownloadLargeBlob(
+ *Storage.BuildStorage,
+ ZenTempDownloadFolderPath(ZenFolderPath),
+ BuildId,
+ ChunkHash,
+ PreferredMultipartChunkSize,
+ Work,
+ NetworkPool,
+ DownloadStats,
+ [&, TotalPartWriteCount, TotalRequestCount, RemoteChunkIndex, ChunkTargetPtrs](
+ IoBuffer&& Payload) mutable {
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ if (Payload && Storage.BuildCacheStorage)
+ {
+ Storage.BuildCacheStorage->PutBuildBlob(
+ BuildId,
+ ChunkHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(SharedBuffer(Payload)));
+ }
+ if (!PrimeCacheOnly)
+ {
+ if (!AbortFlag)
+ {
+ AsyncWriteDownloadedChunk(ZenFolderPath,
+ RemoteContent,
+ RemoteLookup,
+ RemoteChunkIndex,
+ std::move(ChunkTargetPtrs),
+ Work,
+ WritePool,
+ std::move(Payload),
+ SequenceIndexChunksLeftToWriteCounters,
+ WritePartsComplete,
+ TotalPartWriteCount,
+ FilteredWrittenBytesPerSecond,
+ DiskStats);
+ }
+ }
+ });
}
else
{
- if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize)
+ ZEN_TRACE_CPU("UpdateFolder_GetChunk");
+ BuildBlob = Storage.BuildStorage->GetBuildBlob(BuildId, ChunkHash);
+ if (BuildBlob && Storage.BuildCacheStorage)
{
- ZEN_TRACE_CPU("UpdateFolder_GetLargeChunk");
- DownloadLargeBlob(
- *Storage.BuildStorage,
- ZenTempDownloadFolderPath(ZenFolderPath),
- BuildId,
- ChunkHash,
- PreferredMultipartChunkSize,
- Work,
- NetworkPool,
- DownloadStats,
- [&, TotalPartWriteCount, TotalRequestCount, RemoteChunkIndex, ChunkTargetPtrs](
- IoBuffer&& Payload) mutable {
- DownloadStats.RequestsCompleteCount++;
- if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
- if (Payload && Storage.BuildCacheStorage)
- {
- Storage.BuildCacheStorage->PutBuildBlob(
- BuildId,
- ChunkHash,
- ZenContentType::kCompressedBinary,
- CompositeBuffer(SharedBuffer(Payload)));
- }
- if (!PrimeCacheOnly)
- {
- if (!AbortFlag)
- {
- AsyncWriteDownloadedChunk(ZenFolderPath,
- RemoteContent,
- RemoteLookup,
- RemoteChunkIndex,
- std::move(ChunkTargetPtrs),
- Work,
- WritePool,
- std::move(Payload),
- SequenceIndexChunksLeftToWriteCounters,
- WritePartsComplete,
- TotalPartWriteCount,
- FilteredWrittenBytesPerSecond,
- DiskStats);
- }
- }
- });
+ Storage.BuildCacheStorage->PutBuildBlob(BuildId,
+ ChunkHash,
+ BuildBlob.GetContentType(),
+ CompositeBuffer(SharedBuffer(BuildBlob)));
}
- else
+ if (!BuildBlob)
{
- ZEN_TRACE_CPU("UpdateFolder_GetChunk");
- BuildBlob = Storage.BuildStorage->GetBuildBlob(BuildId, ChunkHash);
- if (BuildBlob && Storage.BuildCacheStorage)
- {
- Storage.BuildCacheStorage->PutBuildBlob(
- BuildId,
- ChunkHash,
- BuildBlob.GetContentType(),
- CompositeBuffer(SharedBuffer(BuildBlob)));
- }
- if (!BuildBlob)
- {
- throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash));
- }
- if (!PrimeCacheOnly)
+ throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash));
+ }
+ if (!PrimeCacheOnly)
+ {
+ if (!AbortFlag)
{
- if (!AbortFlag)
+ uint64_t BlobSize = BuildBlob.GetSize();
+ DownloadStats.DownloadedChunkCount++;
+ DownloadStats.DownloadedChunkByteCount += BlobSize;
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
{
- uint64_t BlobSize = BuildBlob.GetSize();
- DownloadStats.DownloadedChunkCount++;
- DownloadStats.DownloadedChunkByteCount += BlobSize;
- DownloadStats.RequestsCompleteCount++;
- if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
- AsyncWriteDownloadedChunk(ZenFolderPath,
- RemoteContent,
- RemoteLookup,
- RemoteChunkIndex,
- std::move(ChunkTargetPtrs),
- Work,
- WritePool,
- std::move(BuildBlob),
- SequenceIndexChunksLeftToWriteCounters,
- WritePartsComplete,
- TotalPartWriteCount,
- FilteredWrittenBytesPerSecond,
- DiskStats);
+ FilteredDownloadedBytesPerSecond.Stop();
}
+ AsyncWriteDownloadedChunk(ZenFolderPath,
+ RemoteContent,
+ RemoteLookup,
+ RemoteChunkIndex,
+ std::move(ChunkTargetPtrs),
+ Work,
+ WritePool,
+ std::move(BuildBlob),
+ SequenceIndexChunksLeftToWriteCounters,
+ WritePartsComplete,
+ TotalPartWriteCount,
+ FilteredWrittenBytesPerSecond,
+ DiskStats);
}
}
}
}
- },
- Work.DefaultErrorFunction());
- }
+ }
+ });
}
}
- },
- Work.DefaultErrorFunction());
+ }
+ });
}
for (size_t CopyDataIndex = 0; CopyDataIndex < CacheCopyDatas.size(); CopyDataIndex++)
@@ -6547,184 +6510,180 @@ namespace {
break;
}
- Work.ScheduleWork(
- WritePool,
- [&, CopyDataIndex](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_CopyLocal");
+ Work.ScheduleWork(WritePool, [&, CopyDataIndex](std::atomic<bool>&) {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("UpdateFolder_CopyLocal");
- FilteredWrittenBytesPerSecond.Start();
- const CacheCopyData& CopyData = CacheCopyDatas[CopyDataIndex];
+ FilteredWrittenBytesPerSecond.Start();
+ const CacheCopyData& CopyData = CacheCopyDatas[CopyDataIndex];
- std::filesystem::path SourceFilePath;
+ std::filesystem::path SourceFilePath;
- if (CopyData.ScavengeSourceIndex == (uint32_t)-1)
- {
- const uint32_t LocalPathIndex = LocalLookup.SequenceIndexFirstPathIndex[CopyData.SourceSequenceIndex];
- SourceFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred();
- }
- else
- {
- const ChunkedFolderContent& ScavengedContent = ScavengedContents[CopyData.ScavengeSourceIndex];
- const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[CopyData.ScavengeSourceIndex];
- const std::filesystem::path ScavengedPath = ScavengedPaths[CopyData.ScavengeSourceIndex];
- const uint32_t ScavengedPathIndex =
- ScavengedLookup.SequenceIndexFirstPathIndex[CopyData.SourceSequenceIndex];
- SourceFilePath = (ScavengedPath / ScavengedContent.Paths[ScavengedPathIndex]).make_preferred();
- }
- ZEN_ASSERT_SLOW(IsFile(SourceFilePath));
- ZEN_ASSERT(!CopyData.TargetChunkLocationPtrs.empty());
+ if (CopyData.ScavengeSourceIndex == (uint32_t)-1)
+ {
+ const uint32_t LocalPathIndex = LocalLookup.SequenceIndexFirstPathIndex[CopyData.SourceSequenceIndex];
+ SourceFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred();
+ }
+ else
+ {
+ const ChunkedFolderContent& ScavengedContent = ScavengedContents[CopyData.ScavengeSourceIndex];
+ const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[CopyData.ScavengeSourceIndex];
+ const std::filesystem::path ScavengedPath = ScavengedPaths[CopyData.ScavengeSourceIndex];
+ const uint32_t ScavengedPathIndex = ScavengedLookup.SequenceIndexFirstPathIndex[CopyData.SourceSequenceIndex];
+ SourceFilePath = (ScavengedPath / ScavengedContent.Paths[ScavengedPathIndex]).make_preferred();
+ }
+ ZEN_ASSERT_SLOW(IsFile(SourceFilePath));
+ ZEN_ASSERT(!CopyData.TargetChunkLocationPtrs.empty());
- uint64_t CacheLocalFileBytesRead = 0;
+ uint64_t CacheLocalFileBytesRead = 0;
- size_t TargetStart = 0;
- const std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> AllTargets(
- CopyData.TargetChunkLocationPtrs);
+ size_t TargetStart = 0;
+ const std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> AllTargets(
+ CopyData.TargetChunkLocationPtrs);
- struct WriteOp
- {
- const ChunkedContentLookup::ChunkSequenceLocation* Target = nullptr;
- uint64_t CacheFileOffset = (uint64_t)-1;
- uint32_t ChunkIndex = (uint32_t)-1;
- };
+ struct WriteOp
+ {
+ const ChunkedContentLookup::ChunkSequenceLocation* Target = nullptr;
+ uint64_t CacheFileOffset = (uint64_t)-1;
+ uint32_t ChunkIndex = (uint32_t)-1;
+ };
- std::vector<WriteOp> WriteOps;
+ std::vector<WriteOp> WriteOps;
- if (!AbortFlag)
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("Sort");
+ WriteOps.reserve(AllTargets.size());
+ for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets)
{
- ZEN_TRACE_CPU("Sort");
- WriteOps.reserve(AllTargets.size());
- for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets)
+ std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> TargetRange =
+ AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount);
+ for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange)
{
- std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> TargetRange =
- AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount);
- for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange)
- {
- WriteOps.push_back(WriteOp{.Target = Target,
- .CacheFileOffset = ChunkTarget.CacheFileOffset,
- .ChunkIndex = ChunkTarget.RemoteChunkIndex});
- }
- TargetStart += ChunkTarget.TargetChunkLocationCount;
+ WriteOps.push_back(WriteOp{.Target = Target,
+ .CacheFileOffset = ChunkTarget.CacheFileOffset,
+ .ChunkIndex = ChunkTarget.RemoteChunkIndex});
}
+ TargetStart += ChunkTarget.TargetChunkLocationCount;
+ }
- std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) {
- if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
- {
- return true;
- }
- else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
- {
- return false;
- }
- if (Lhs.Target->Offset < Rhs.Target->Offset)
- {
- return true;
- }
+ std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) {
+ if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
+ {
+ return true;
+ }
+ else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
+ {
return false;
- });
- }
+ }
+ if (Lhs.Target->Offset < Rhs.Target->Offset)
+ {
+ return true;
+ }
+ return false;
+ });
+ }
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("Write");
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("Write");
- tsl::robin_set<uint32_t> ChunkIndexesWritten;
+ tsl::robin_set<uint32_t> ChunkIndexesWritten;
- BufferedOpenFile SourceFile(SourceFilePath, DiskStats);
- WriteFileCache OpenFileCache(DiskStats);
- for (size_t WriteOpIndex = 0; WriteOpIndex < WriteOps.size();)
+ BufferedOpenFile SourceFile(SourceFilePath, DiskStats);
+ WriteFileCache OpenFileCache(DiskStats);
+ for (size_t WriteOpIndex = 0; WriteOpIndex < WriteOps.size();)
+ {
+ if (AbortFlag)
{
- if (AbortFlag)
+ break;
+ }
+ const WriteOp& Op = WriteOps[WriteOpIndex];
+
+ const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
+ ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() <=
+ RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]);
+ ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() > 0);
+ const uint32_t RemotePathIndex = RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex];
+ const uint64_t ChunkSize = RemoteContent.ChunkedContent.ChunkRawSizes[Op.ChunkIndex];
+
+ uint64_t ReadLength = ChunkSize;
+ size_t WriteCount = 1;
+ uint64_t OpSourceEnd = Op.CacheFileOffset + ChunkSize;
+ uint64_t OpTargetEnd = Op.Target->Offset + ChunkSize;
+ while ((WriteOpIndex + WriteCount) < WriteOps.size())
+ {
+ const WriteOp& NextOp = WriteOps[WriteOpIndex + WriteCount];
+ if (NextOp.Target->SequenceIndex != Op.Target->SequenceIndex)
{
break;
}
- const WriteOp& Op = WriteOps[WriteOpIndex];
-
- const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
- ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() <=
- RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]);
- ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() > 0);
- const uint32_t RemotePathIndex = RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex];
- const uint64_t ChunkSize = RemoteContent.ChunkedContent.ChunkRawSizes[Op.ChunkIndex];
-
- uint64_t ReadLength = ChunkSize;
- size_t WriteCount = 1;
- uint64_t OpSourceEnd = Op.CacheFileOffset + ChunkSize;
- uint64_t OpTargetEnd = Op.Target->Offset + ChunkSize;
- while ((WriteOpIndex + WriteCount) < WriteOps.size())
+ if (NextOp.Target->Offset != OpTargetEnd)
{
- const WriteOp& NextOp = WriteOps[WriteOpIndex + WriteCount];
- if (NextOp.Target->SequenceIndex != Op.Target->SequenceIndex)
- {
- break;
- }
- if (NextOp.Target->Offset != OpTargetEnd)
- {
- break;
- }
- if (NextOp.CacheFileOffset != OpSourceEnd)
- {
- break;
- }
- const uint64_t NextChunkLength = RemoteContent.ChunkedContent.ChunkRawSizes[NextOp.ChunkIndex];
- if (ReadLength + NextChunkLength > 512u * 1024u)
- {
- break;
- }
- ReadLength += NextChunkLength;
- OpSourceEnd += NextChunkLength;
- OpTargetEnd += NextChunkLength;
- WriteCount++;
+ break;
+ }
+ if (NextOp.CacheFileOffset != OpSourceEnd)
+ {
+ break;
+ }
+ const uint64_t NextChunkLength = RemoteContent.ChunkedContent.ChunkRawSizes[NextOp.ChunkIndex];
+ if (ReadLength + NextChunkLength > 512u * 1024u)
+ {
+ break;
}
+ ReadLength += NextChunkLength;
+ OpSourceEnd += NextChunkLength;
+ OpTargetEnd += NextChunkLength;
+ WriteCount++;
+ }
- CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ReadLength);
- ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]);
+ CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ReadLength);
+ ZEN_ASSERT(Op.Target->Offset + ChunkSource.GetSize() <= RemoteContent.RawSizes[RemotePathIndex]);
- OpenFileCache.WriteToFile<CompositeBuffer>(
- RemoteSequenceIndex,
- [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) {
- return GetTempChunkedSequenceFileName(
- CacheFolderPath,
- RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
- },
- ChunkSource,
- Op.Target->Offset,
- RemoteContent.RawSizes[RemotePathIndex]);
+ OpenFileCache.WriteToFile<CompositeBuffer>(
+ RemoteSequenceIndex,
+ [&CacheFolderPath, &RemoteContent](uint32_t SequenceIndex) {
+ return GetTempChunkedSequenceFileName(
+ CacheFolderPath,
+ RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
+ },
+ ChunkSource,
+ Op.Target->Offset,
+ RemoteContent.RawSizes[RemotePathIndex]);
- CacheLocalFileBytesRead += ReadLength; // TODO: This should be the sum of unique chunk sizes?
+ CacheLocalFileBytesRead += ReadLength; // TODO: This should be the sum of unique chunk sizes?
- WriteOpIndex += WriteCount;
- }
+ WriteOpIndex += WriteCount;
}
- if (!AbortFlag)
+ }
+ if (!AbortFlag)
+ {
+ // Write tracking, updating this must be done without any files open (WriteFileCache)
+ std::vector<uint32_t> CompletedChunkSequences;
+ for (const WriteOp& Op : WriteOps)
{
- // Write tracking, updating this must be done without any files open (WriteFileCache)
- std::vector<uint32_t> CompletedChunkSequences;
- for (const WriteOp& Op : WriteOps)
+ const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
+ if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters))
{
- const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
- if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters))
- {
- CompletedChunkSequences.push_back(RemoteSequenceIndex);
- }
+ CompletedChunkSequences.push_back(RemoteSequenceIndex);
}
- VerifyAndCompleteChunkSequencesAsync(CacheFolderPath,
- RemoteContent,
- RemoteLookup,
- CompletedChunkSequences,
- Work,
- WritePool);
- ZEN_CONSOLE_VERBOSE("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), SourceFilePath);
- }
- WritePartsComplete++;
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
}
+ VerifyAndCompleteChunkSequencesAsync(CacheFolderPath,
+ RemoteContent,
+ RemoteLookup,
+ CompletedChunkSequences,
+ Work,
+ WritePool);
+ ZEN_CONSOLE_VERBOSE("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), SourceFilePath);
}
- },
- Work.DefaultErrorFunction());
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
+ }
+ });
}
for (uint32_t BlockIndex : CachedChunkBlockIndexes)
@@ -6735,52 +6694,49 @@ namespace {
break;
}
- Work.ScheduleWork(
- WritePool,
- [&, BlockIndex](std::atomic<bool>&) mutable {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_WriteCachedBlock");
+ Work.ScheduleWork(WritePool, [&, BlockIndex](std::atomic<bool>&) mutable {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("UpdateFolder_WriteCachedBlock");
- const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
- FilteredWrittenBytesPerSecond.Start();
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+ FilteredWrittenBytesPerSecond.Start();
- std::filesystem::path BlockChunkPath =
- ZenTempBlockFolderPath(ZenFolderPath) / BlockDescription.BlockHash.ToHexString();
- IoBuffer BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
- if (!BlockBuffer)
+ std::filesystem::path BlockChunkPath =
+ ZenTempBlockFolderPath(ZenFolderPath) / BlockDescription.BlockHash.ToHexString();
+ IoBuffer BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
+ if (!BlockBuffer)
+ {
+ throw std::runtime_error(
+ fmt::format("Can not read block {} at {}", BlockDescription.BlockHash, BlockChunkPath));
+ }
+
+ if (!AbortFlag)
+ {
+ if (!WriteBlockToDisk(CacheFolderPath,
+ RemoteContent,
+ BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ WritePool,
+ CompositeBuffer(std::move(BlockBuffer)),
+ RemoteLookup,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ DiskStats))
{
- throw std::runtime_error(
- fmt::format("Can not read block {} at {}", BlockDescription.BlockHash, BlockChunkPath));
+ std::error_code DummyEc;
+ RemoveFile(BlockChunkPath, DummyEc);
+ throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash));
}
-
- if (!AbortFlag)
+ WritePartsComplete++;
+ RemoveFileWithRetry(BlockChunkPath);
+ if (WritePartsComplete == TotalPartWriteCount)
{
- if (!WriteBlockToDisk(CacheFolderPath,
- RemoteContent,
- BlockDescription,
- SequenceIndexChunksLeftToWriteCounters,
- Work,
- WritePool,
- CompositeBuffer(std::move(BlockBuffer)),
- RemoteLookup,
- RemoteChunkIndexNeedsCopyFromSourceFlags,
- DiskStats))
- {
- std::error_code DummyEc;
- RemoveFile(BlockChunkPath, DummyEc);
- throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash));
- }
- WritePartsComplete++;
- RemoveFileWithRetry(BlockChunkPath);
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
- }
+ FilteredWrittenBytesPerSecond.Stop();
}
}
- },
- Work.DefaultErrorFunction());
+ }
+ });
}
for (size_t BlockRangeIndex = 0; BlockRangeIndex < BlockRangeWorks.size(); BlockRangeIndex++)
@@ -6793,46 +6749,214 @@ namespace {
const BlockRangeDescriptor BlockRange = BlockRangeWorks[BlockRangeIndex];
ZEN_ASSERT(BlockRange.BlockIndex != (uint32_t)-1);
const uint32_t BlockIndex = BlockRange.BlockIndex;
- Work.ScheduleWork(
- NetworkPool,
- [&, BlockIndex, BlockRange](std::atomic<bool>&) {
+ Work.ScheduleWork(NetworkPool, [&, BlockIndex, BlockRange](std::atomic<bool>&) {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("UpdateFolder_GetPartialBlock");
+
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+
+ FilteredDownloadedBytesPerSecond.Start();
+ IoBuffer BlockBuffer;
+ if (Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash))
+ {
+ BlockBuffer = Storage.BuildCacheStorage->GetBuildBlob(BuildId,
+ BlockDescription.BlockHash,
+ BlockRange.RangeStart,
+ BlockRange.RangeLength);
+ }
+ if (!BlockBuffer)
+ {
+ BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId,
+ BlockDescription.BlockHash,
+ BlockRange.RangeStart,
+ BlockRange.RangeLength);
+ }
+ if (!BlockBuffer)
+ {
+ throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
+ }
if (!AbortFlag)
{
- ZEN_TRACE_CPU("UpdateFolder_GetPartialBlock");
+ uint64_t BlockSize = BlockBuffer.GetSize();
+ DownloadStats.DownloadedBlockCount++;
+ DownloadStats.DownloadedBlockByteCount += BlockSize;
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
- const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+ std::filesystem::path BlockChunkPath;
- FilteredDownloadedBytesPerSecond.Start();
- IoBuffer BlockBuffer;
- if (Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash))
+ // Check if the dowloaded block is file based and we can move it directly without rewriting it
{
- BlockBuffer = Storage.BuildCacheStorage->GetBuildBlob(BuildId,
- BlockDescription.BlockHash,
- BlockRange.RangeStart,
- BlockRange.RangeLength);
+ IoBufferFileReference FileRef;
+ if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) &&
+ (FileRef.FileChunkSize == BlockSize))
+ {
+ ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock");
+
+ std::error_code Ec;
+ std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
+ if (!Ec)
+ {
+ BlockBuffer.SetDeleteOnClose(false);
+ BlockBuffer = {};
+ BlockChunkPath = ZenTempBlockFolderPath(ZenFolderPath) / fmt::format("{}_{:x}_{:x}",
+ BlockDescription.BlockHash,
+ BlockRange.RangeStart,
+ BlockRange.RangeLength);
+ RenameFile(TempBlobPath, BlockChunkPath, Ec);
+ if (Ec)
+ {
+ BlockChunkPath = std::filesystem::path{};
+
+ // Re-open the temp file again
+ BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
+ BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
+ BlockBuffer.SetDeleteOnClose(true);
+ }
+ }
+ }
}
- if (!BlockBuffer)
+
+ if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u))
{
- BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId,
- BlockDescription.BlockHash,
- BlockRange.RangeStart,
- BlockRange.RangeLength);
+ ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock");
+ // Could not be moved and rather large, lets store it on disk
+ BlockChunkPath =
+ ZenTempBlockFolderPath(ZenFolderPath) /
+ fmt::format("{}_{:x}_{:x}", BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength);
+ TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
+ BlockBuffer = {};
}
- if (!BlockBuffer)
+
+ if (!AbortFlag)
{
- throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
+ Work.ScheduleWork(
+ WritePool,
+ [&, BlockIndex, BlockRange, BlockChunkPath, BlockPartialBuffer = std::move(BlockBuffer)](
+ std::atomic<bool>&) mutable {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("UpdateFolder_WritePartialBlock");
+
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+
+ if (BlockChunkPath.empty())
+ {
+ ZEN_ASSERT(BlockPartialBuffer);
+ }
+ else
+ {
+ ZEN_ASSERT(!BlockPartialBuffer);
+ BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
+ if (!BlockPartialBuffer)
+ {
+ throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}",
+ BlockDescription.BlockHash,
+ BlockChunkPath));
+ }
+ }
+
+ FilteredWrittenBytesPerSecond.Start();
+
+ if (!WritePartialBlockToDisk(
+ CacheFolderPath,
+ RemoteContent,
+ BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ WritePool,
+ CompositeBuffer(std::move(BlockPartialBuffer)),
+ BlockRange.ChunkBlockIndexStart,
+ BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1,
+ RemoteLookup,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ DiskStats))
+ {
+ std::error_code DummyEc;
+ RemoveFile(BlockChunkPath, DummyEc);
+ throw std::runtime_error(
+ fmt::format("Partial block {} is malformed", BlockDescription.BlockHash));
+ }
+
+ if (!BlockChunkPath.empty())
+ {
+ RemoveFileWithRetry(BlockChunkPath);
+ }
+
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
+ }
+ });
}
- if (!AbortFlag)
+ }
+ }
+ });
+ }
+
+ for (uint32_t BlockIndex : FullBlockWorks)
+ {
+ if (AbortFlag)
+ {
+ break;
+ }
+
+ if (PrimeCacheOnly && ExistsResult.ExistingBlobs.contains(BlockDescriptions[BlockIndex].BlockHash))
+ {
+ DownloadStats.RequestsCompleteCount++;
+ continue;
+ }
+
+ Work.ScheduleWork(NetworkPool, [&, BlockIndex](std::atomic<bool>&) {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("UpdateFolder_GetFullBlock");
+
+ const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
+
+ FilteredDownloadedBytesPerSecond.Start();
+
+ IoBuffer BlockBuffer;
+ const bool ExistsInCache =
+ Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash);
+ if (ExistsInCache)
+ {
+ BlockBuffer = Storage.BuildCacheStorage->GetBuildBlob(BuildId, BlockDescription.BlockHash);
+ }
+ if (!BlockBuffer)
+ {
+ BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId, BlockDescription.BlockHash);
+ if (BlockBuffer && Storage.BuildCacheStorage)
{
- uint64_t BlockSize = BlockBuffer.GetSize();
- DownloadStats.DownloadedBlockCount++;
- DownloadStats.DownloadedBlockByteCount += BlockSize;
- DownloadStats.RequestsCompleteCount++;
- if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
+ Storage.BuildCacheStorage->PutBuildBlob(BuildId,
+ BlockDescription.BlockHash,
+ BlockBuffer.GetContentType(),
+ CompositeBuffer(SharedBuffer(BlockBuffer)));
+ }
+ }
+ if (!BlockBuffer)
+ {
+ throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
+ }
+ if (!AbortFlag)
+ {
+ uint64_t BlockSize = BlockBuffer.GetSize();
+ DownloadStats.DownloadedBlockCount++;
+ DownloadStats.DownloadedBlockByteCount += BlockSize;
+ DownloadStats.RequestsCompleteCount++;
+ if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ if (!PrimeCacheOnly)
+ {
std::filesystem::path BlockChunkPath;
// Check if the dowloaded block is file based and we can move it directly without rewriting it
@@ -6842,17 +6966,14 @@ namespace {
(FileRef.FileChunkSize == BlockSize))
{
ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock");
-
std::error_code Ec;
std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
if (!Ec)
{
BlockBuffer.SetDeleteOnClose(false);
- BlockBuffer = {};
- BlockChunkPath = ZenTempBlockFolderPath(ZenFolderPath) / fmt::format("{}_{:x}_{:x}",
- BlockDescription.BlockHash,
- BlockRange.RangeStart,
- BlockRange.RangeLength);
+ BlockBuffer = {};
+ BlockChunkPath =
+ ZenTempBlockFolderPath(ZenFolderPath) / BlockDescription.BlockHash.ToHexString();
RenameFile(TempBlobPath, BlockChunkPath, Ec);
if (Ec)
{
@@ -6871,10 +6992,7 @@ namespace {
{
ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock");
// Could not be moved and rather large, lets store it on disk
- BlockChunkPath = ZenTempBlockFolderPath(ZenFolderPath) / fmt::format("{}_{:x}_{:x}",
- BlockDescription.BlockHash,
- BlockRange.RangeStart,
- BlockRange.RangeLength);
+ BlockChunkPath = ZenTempBlockFolderPath(ZenFolderPath) / BlockDescription.BlockHash.ToHexString();
TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
BlockBuffer = {};
}
@@ -6883,50 +7001,60 @@ namespace {
{
Work.ScheduleWork(
WritePool,
- [&, BlockIndex, BlockRange, BlockChunkPath, BlockPartialBuffer = std::move(BlockBuffer)](
- std::atomic<bool>&) mutable {
+ [&Work,
+ &WritePool,
+ &RemoteContent,
+ &RemoteLookup,
+ CacheFolderPath,
+ &RemoteChunkIndexNeedsCopyFromSourceFlags,
+ &SequenceIndexChunksLeftToWriteCounters,
+ BlockIndex,
+ &BlockDescriptions,
+ &WriteChunkStats,
+ &DiskStats,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ &FilteredWrittenBytesPerSecond,
+ BlockChunkPath,
+ BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable {
if (!AbortFlag)
{
- ZEN_TRACE_CPU("UpdateFolder_WritePartialBlock");
+ ZEN_TRACE_CPU("UpdateFolder_WriteFullBlock");
const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
if (BlockChunkPath.empty())
{
- ZEN_ASSERT(BlockPartialBuffer);
+ ZEN_ASSERT(BlockBuffer);
}
else
{
- ZEN_ASSERT(!BlockPartialBuffer);
- BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
- if (!BlockPartialBuffer)
+ ZEN_ASSERT(!BlockBuffer);
+ BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
+ if (!BlockBuffer)
{
- throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}",
+ throw std::runtime_error(fmt::format("Could not open dowloaded block {} from {}",
BlockDescription.BlockHash,
BlockChunkPath));
}
}
FilteredWrittenBytesPerSecond.Start();
-
- if (!WritePartialBlockToDisk(
- CacheFolderPath,
- RemoteContent,
- BlockDescription,
- SequenceIndexChunksLeftToWriteCounters,
- Work,
- WritePool,
- CompositeBuffer(std::move(BlockPartialBuffer)),
- BlockRange.ChunkBlockIndexStart,
- BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1,
- RemoteLookup,
- RemoteChunkIndexNeedsCopyFromSourceFlags,
- DiskStats))
+ if (!WriteBlockToDisk(CacheFolderPath,
+ RemoteContent,
+ BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ WritePool,
+ CompositeBuffer(std::move(BlockBuffer)),
+ RemoteLookup,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ DiskStats))
{
std::error_code DummyEc;
RemoveFile(BlockChunkPath, DummyEc);
throw std::runtime_error(
- fmt::format("Partial block {} is malformed", BlockDescription.BlockHash));
+ fmt::format("Block {} is malformed", BlockDescription.BlockHash));
}
if (!BlockChunkPath.empty())
@@ -6935,200 +7063,18 @@ namespace {
}
WritePartsComplete++;
+
if (WritePartsComplete == TotalPartWriteCount)
{
FilteredWrittenBytesPerSecond.Stop();
}
}
- },
- Work.DefaultErrorFunction());
- }
- }
- }
- },
- Work.DefaultErrorFunction());
- }
-
- for (uint32_t BlockIndex : FullBlockWorks)
- {
- if (AbortFlag)
- {
- break;
- }
-
- if (PrimeCacheOnly && ExistsResult.ExistingBlobs.contains(BlockDescriptions[BlockIndex].BlockHash))
- {
- DownloadStats.RequestsCompleteCount++;
- continue;
- }
-
- Work.ScheduleWork(
- NetworkPool,
- [&, BlockIndex](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_GetFullBlock");
-
- const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
-
- FilteredDownloadedBytesPerSecond.Start();
-
- IoBuffer BlockBuffer;
- const bool ExistsInCache =
- Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash);
- if (ExistsInCache)
- {
- BlockBuffer = Storage.BuildCacheStorage->GetBuildBlob(BuildId, BlockDescription.BlockHash);
- }
- if (!BlockBuffer)
- {
- BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId, BlockDescription.BlockHash);
- if (BlockBuffer && Storage.BuildCacheStorage)
- {
- Storage.BuildCacheStorage->PutBuildBlob(BuildId,
- BlockDescription.BlockHash,
- BlockBuffer.GetContentType(),
- CompositeBuffer(SharedBuffer(BlockBuffer)));
- }
- }
- if (!BlockBuffer)
- {
- throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
- }
- if (!AbortFlag)
- {
- uint64_t BlockSize = BlockBuffer.GetSize();
- DownloadStats.DownloadedBlockCount++;
- DownloadStats.DownloadedBlockByteCount += BlockSize;
- DownloadStats.RequestsCompleteCount++;
- if (DownloadStats.RequestsCompleteCount == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
-
- if (!PrimeCacheOnly)
- {
- std::filesystem::path BlockChunkPath;
-
- // Check if the dowloaded block is file based and we can move it directly without rewriting it
- {
- IoBufferFileReference FileRef;
- if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) &&
- (FileRef.FileChunkSize == BlockSize))
- {
- ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock");
- std::error_code Ec;
- std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
- if (!Ec)
- {
- BlockBuffer.SetDeleteOnClose(false);
- BlockBuffer = {};
- BlockChunkPath =
- ZenTempBlockFolderPath(ZenFolderPath) / BlockDescription.BlockHash.ToHexString();
- RenameFile(TempBlobPath, BlockChunkPath, Ec);
- if (Ec)
- {
- BlockChunkPath = std::filesystem::path{};
-
- // Re-open the temp file again
- BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
- BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
- BlockBuffer.SetDeleteOnClose(true);
- }
- }
- }
- }
-
- if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u))
- {
- ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock");
- // Could not be moved and rather large, lets store it on disk
- BlockChunkPath = ZenTempBlockFolderPath(ZenFolderPath) / BlockDescription.BlockHash.ToHexString();
- TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
- BlockBuffer = {};
- }
-
- if (!AbortFlag)
- {
- Work.ScheduleWork(
- WritePool,
- [&Work,
- &WritePool,
- &RemoteContent,
- &RemoteLookup,
- CacheFolderPath,
- &RemoteChunkIndexNeedsCopyFromSourceFlags,
- &SequenceIndexChunksLeftToWriteCounters,
- BlockIndex,
- &BlockDescriptions,
- &WriteChunkStats,
- &DiskStats,
- &WritePartsComplete,
- TotalPartWriteCount,
- &FilteredWrittenBytesPerSecond,
- BlockChunkPath,
- BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_WriteFullBlock");
-
- const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex];
-
- if (BlockChunkPath.empty())
- {
- ZEN_ASSERT(BlockBuffer);
- }
- else
- {
- ZEN_ASSERT(!BlockBuffer);
- BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
- if (!BlockBuffer)
- {
- throw std::runtime_error(
- fmt::format("Could not open dowloaded block {} from {}",
- BlockDescription.BlockHash,
- BlockChunkPath));
- }
- }
-
- FilteredWrittenBytesPerSecond.Start();
- if (!WriteBlockToDisk(CacheFolderPath,
- RemoteContent,
- BlockDescription,
- SequenceIndexChunksLeftToWriteCounters,
- Work,
- WritePool,
- CompositeBuffer(std::move(BlockBuffer)),
- RemoteLookup,
- RemoteChunkIndexNeedsCopyFromSourceFlags,
- DiskStats))
- {
- std::error_code DummyEc;
- RemoveFile(BlockChunkPath, DummyEc);
- throw std::runtime_error(
- fmt::format("Block {} is malformed", BlockDescription.BlockHash));
- }
-
- if (!BlockChunkPath.empty())
- {
- RemoveFileWithRetry(BlockChunkPath);
- }
-
- WritePartsComplete++;
-
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
- }
- }
- },
- Work.DefaultErrorFunction());
- }
+ });
}
}
}
- },
- Work.DefaultErrorFunction());
+ }
+ });
}
{
@@ -7321,8 +7267,8 @@ namespace {
WorkerThreadPool& WritePool = GetIOWorkerPool();
- ProgressBar CacheLocalProgressBar(ProgressMode, "Cache Local Data");
- ParallellWork Work(AbortFlag);
+ ProgressBar CacheLocalProgressBar(ProgressMode, "Cache Local Data");
+ ParallelWork Work(AbortFlag);
for (uint32_t LocalPathIndex : FilesToCache)
{
@@ -7330,23 +7276,20 @@ namespace {
{
break;
}
- Work.ScheduleWork(
- WritePool,
- [&, LocalPathIndex](std::atomic<bool>&) {
- ZEN_TRACE_CPU("UpdateFolder_AsyncCopyToCache");
- if (!AbortFlag)
- {
- const IoHash& RawHash = LocalContent.RawHashes[LocalPathIndex];
- const std::filesystem::path& LocalPath = LocalContent.Paths[LocalPathIndex];
- const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash);
- ZEN_ASSERT_SLOW(!IsFileWithRetry(CacheFilePath));
- const std::filesystem::path LocalFilePath = (Path / LocalPath).make_preferred();
- RenameFileWithRetry(LocalFilePath, CacheFilePath);
- CachedCount++;
- CachedByteCount += LocalContent.RawSizes[LocalPathIndex];
- }
- },
- Work.DefaultErrorFunction());
+ Work.ScheduleWork(WritePool, [&, LocalPathIndex](std::atomic<bool>&) {
+ ZEN_TRACE_CPU("UpdateFolder_AsyncCopyToCache");
+ if (!AbortFlag)
+ {
+ const IoHash& RawHash = LocalContent.RawHashes[LocalPathIndex];
+ const std::filesystem::path& LocalPath = LocalContent.Paths[LocalPathIndex];
+ const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash);
+ ZEN_ASSERT_SLOW(!IsFileWithRetry(CacheFilePath));
+ const std::filesystem::path LocalFilePath = (Path / LocalPath).make_preferred();
+ RenameFileWithRetry(LocalFilePath, CacheFilePath);
+ CachedCount++;
+ CachedByteCount += LocalContent.RawSizes[LocalPathIndex];
+ }
+ });
}
{
@@ -7409,8 +7352,8 @@ namespace {
WorkerThreadPool& WritePool = GetIOWorkerPool();
- ProgressBar RebuildProgressBar(ProgressMode, "Rebuild State");
- ParallellWork Work(AbortFlag);
+ ProgressBar RebuildProgressBar(ProgressMode, "Rebuild State");
+ ParallelWork Work(AbortFlag);
OutLocalFolderState.Paths.resize(RemoteContent.Paths.size());
OutLocalFolderState.RawSizes.resize(RemoteContent.Paths.size());
@@ -7425,18 +7368,15 @@ namespace {
{
break;
}
- Work.ScheduleWork(
- WritePool,
- [&, LocalPathIndex](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred();
- SetFileReadOnlyWithRetry(LocalFilePath, false);
- RemoveFileWithRetry(LocalFilePath);
- DeletedCount++;
- }
- },
- Work.DefaultErrorFunction());
+ Work.ScheduleWork(WritePool, [&, LocalPathIndex](std::atomic<bool>&) {
+ if (!AbortFlag)
+ {
+ const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred();
+ SetFileReadOnlyWithRetry(LocalFilePath, false);
+ RemoveFileWithRetry(LocalFilePath);
+ DeletedCount++;
+ }
+ });
}
std::atomic<uint64_t> TargetsComplete = 0;
@@ -7479,166 +7419,158 @@ namespace {
TargetCount++;
}
- Work.ScheduleWork(
- WritePool,
- [&, BaseTargetOffset = TargetOffset, TargetCount](std::atomic<bool>&) {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("FinalizeTree_Work");
+ Work.ScheduleWork(WritePool, [&, BaseTargetOffset = TargetOffset, TargetCount](std::atomic<bool>&) {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("FinalizeTree_Work");
- size_t TargetOffset = BaseTargetOffset;
- const IoHash& RawHash = Targets[TargetOffset].RawHash;
+ size_t TargetOffset = BaseTargetOffset;
+ const IoHash& RawHash = Targets[TargetOffset].RawHash;
- if (RawHash == IoHash::Zero)
+ if (RawHash == IoHash::Zero)
+ {
+ ZEN_TRACE_CPU("ZeroSize");
+ while (TargetOffset < (BaseTargetOffset + TargetCount))
{
- ZEN_TRACE_CPU("ZeroSize");
- while (TargetOffset < (BaseTargetOffset + TargetCount))
+ const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex;
+ ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash);
+ const std::filesystem::path& TargetPath = RemoteContent.Paths[RemotePathIndex];
+ std::filesystem::path TargetFilePath = (Path / TargetPath).make_preferred();
+ if (!RemotePathIndexToLocalPathIndex[RemotePathIndex])
{
- const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex;
- ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash);
- const std::filesystem::path& TargetPath = RemoteContent.Paths[RemotePathIndex];
- std::filesystem::path TargetFilePath = (Path / TargetPath).make_preferred();
- if (!RemotePathIndexToLocalPathIndex[RemotePathIndex])
+ if (IsFileWithRetry(TargetFilePath))
{
- if (IsFileWithRetry(TargetFilePath))
- {
- SetFileReadOnlyWithRetry(TargetFilePath, false);
- }
- else
- {
- CreateDirectories(TargetFilePath.parent_path());
- }
- BasicFile OutputFile;
- OutputFile.Open(TargetFilePath, BasicFile::Mode::kTruncate);
+ SetFileReadOnlyWithRetry(TargetFilePath, false);
+ }
+ else
+ {
+ CreateDirectories(TargetFilePath.parent_path());
}
- OutLocalFolderState.Paths[RemotePathIndex] = TargetPath;
- OutLocalFolderState.RawSizes[RemotePathIndex] = RemoteContent.RawSizes[RemotePathIndex];
-
- OutLocalFolderState.Attributes[RemotePathIndex] =
- RemoteContent.Attributes.empty()
- ? GetNativeFileAttributes(TargetFilePath)
- : SetNativeFileAttributes(TargetFilePath,
- RemoteContent.Platform,
- RemoteContent.Attributes[RemotePathIndex]);
- OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath);
-
- TargetOffset++;
- TargetsComplete++;
+ BasicFile OutputFile;
+ OutputFile.Open(TargetFilePath, BasicFile::Mode::kTruncate);
}
+ OutLocalFolderState.Paths[RemotePathIndex] = TargetPath;
+ OutLocalFolderState.RawSizes[RemotePathIndex] = RemoteContent.RawSizes[RemotePathIndex];
+
+ OutLocalFolderState.Attributes[RemotePathIndex] =
+ RemoteContent.Attributes.empty() ? GetNativeFileAttributes(TargetFilePath)
+ : SetNativeFileAttributes(TargetFilePath,
+ RemoteContent.Platform,
+ RemoteContent.Attributes[RemotePathIndex]);
+ OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath);
+
+ TargetOffset++;
+ TargetsComplete++;
+ }
+ }
+ else
+ {
+ ZEN_TRACE_CPU("Files");
+ ZEN_ASSERT(RemoteLookup.RawHashToSequenceIndex.contains(RawHash));
+ const uint32_t FirstRemotePathIndex = Targets[TargetOffset].RemotePathIndex;
+ const std::filesystem::path& FirstTargetPath = RemoteContent.Paths[FirstRemotePathIndex];
+ std::filesystem::path FirstTargetFilePath = (Path / FirstTargetPath).make_preferred();
+
+ if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(FirstRemotePathIndex);
+ InPlaceIt != RemotePathIndexToLocalPathIndex.end())
+ {
+ ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath));
}
else
{
- ZEN_TRACE_CPU("Files");
- ZEN_ASSERT(RemoteLookup.RawHashToSequenceIndex.contains(RawHash));
- const uint32_t FirstRemotePathIndex = Targets[TargetOffset].RemotePathIndex;
- const std::filesystem::path& FirstTargetPath = RemoteContent.Paths[FirstRemotePathIndex];
- std::filesystem::path FirstTargetFilePath = (Path / FirstTargetPath).make_preferred();
-
- if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(FirstRemotePathIndex);
- InPlaceIt != RemotePathIndexToLocalPathIndex.end())
+ if (IsFileWithRetry(FirstTargetFilePath))
{
- ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath));
+ SetFileReadOnlyWithRetry(FirstTargetFilePath, false);
}
else
{
- if (IsFileWithRetry(FirstTargetFilePath))
- {
- SetFileReadOnlyWithRetry(FirstTargetFilePath, false);
- }
- else
- {
- CreateDirectories(FirstTargetFilePath.parent_path());
- }
+ CreateDirectories(FirstTargetFilePath.parent_path());
+ }
- if (auto InplaceIt = SequenceHashToLocalPathIndex.find(RawHash);
- InplaceIt != SequenceHashToLocalPathIndex.end())
- {
- ZEN_TRACE_CPU("Copy");
- const uint32_t LocalPathIndex = InplaceIt->second;
- const std::filesystem::path& SourcePath = LocalContent.Paths[LocalPathIndex];
- std::filesystem::path SourceFilePath = (Path / SourcePath).make_preferred();
- ZEN_ASSERT_SLOW(IsFileWithRetry(SourceFilePath));
-
- ZEN_DEBUG("Copying from '{}' -> '{}'", SourceFilePath, FirstTargetFilePath);
- CopyFile(SourceFilePath, FirstTargetFilePath, {.EnableClone = false});
- RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++;
- }
- else
- {
- ZEN_TRACE_CPU("Rename");
- const std::filesystem::path CacheFilePath =
- GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash);
- ZEN_ASSERT_SLOW(IsFileWithRetry(CacheFilePath));
+ if (auto InplaceIt = SequenceHashToLocalPathIndex.find(RawHash);
+ InplaceIt != SequenceHashToLocalPathIndex.end())
+ {
+ ZEN_TRACE_CPU("Copy");
+ const uint32_t LocalPathIndex = InplaceIt->second;
+ const std::filesystem::path& SourcePath = LocalContent.Paths[LocalPathIndex];
+ std::filesystem::path SourceFilePath = (Path / SourcePath).make_preferred();
+ ZEN_ASSERT_SLOW(IsFileWithRetry(SourceFilePath));
+
+ ZEN_DEBUG("Copying from '{}' -> '{}'", SourceFilePath, FirstTargetFilePath);
+ CopyFile(SourceFilePath, FirstTargetFilePath, {.EnableClone = false});
+ RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++;
+ }
+ else
+ {
+ ZEN_TRACE_CPU("Rename");
+ const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash);
+ ZEN_ASSERT_SLOW(IsFileWithRetry(CacheFilePath));
- RenameFileWithRetry(CacheFilePath, FirstTargetFilePath);
+ RenameFileWithRetry(CacheFilePath, FirstTargetFilePath);
- RebuildFolderStateStats.FinalizeTreeFilesMovedCount++;
- }
+ RebuildFolderStateStats.FinalizeTreeFilesMovedCount++;
}
+ }
- OutLocalFolderState.Paths[FirstRemotePathIndex] = FirstTargetPath;
- OutLocalFolderState.RawSizes[FirstRemotePathIndex] = RemoteContent.RawSizes[FirstRemotePathIndex];
+ OutLocalFolderState.Paths[FirstRemotePathIndex] = FirstTargetPath;
+ OutLocalFolderState.RawSizes[FirstRemotePathIndex] = RemoteContent.RawSizes[FirstRemotePathIndex];
- OutLocalFolderState.Attributes[FirstRemotePathIndex] =
- RemoteContent.Attributes.empty()
- ? GetNativeFileAttributes(FirstTargetFilePath)
- : SetNativeFileAttributes(FirstTargetFilePath,
- RemoteContent.Platform,
- RemoteContent.Attributes[FirstRemotePathIndex]);
- OutLocalFolderState.ModificationTicks[FirstRemotePathIndex] =
- GetModificationTickFromPath(FirstTargetFilePath);
+ OutLocalFolderState.Attributes[FirstRemotePathIndex] =
+ RemoteContent.Attributes.empty() ? GetNativeFileAttributes(FirstTargetFilePath)
+ : SetNativeFileAttributes(FirstTargetFilePath,
+ RemoteContent.Platform,
+ RemoteContent.Attributes[FirstRemotePathIndex]);
+ OutLocalFolderState.ModificationTicks[FirstRemotePathIndex] = GetModificationTickFromPath(FirstTargetFilePath);
- TargetOffset++;
- TargetsComplete++;
+ TargetOffset++;
+ TargetsComplete++;
- while (TargetOffset < (BaseTargetOffset + TargetCount))
- {
- const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex;
- ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash);
- const std::filesystem::path& TargetPath = RemoteContent.Paths[RemotePathIndex];
- std::filesystem::path TargetFilePath = (Path / TargetPath).make_preferred();
+ while (TargetOffset < (BaseTargetOffset + TargetCount))
+ {
+ const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex;
+ ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash);
+ const std::filesystem::path& TargetPath = RemoteContent.Paths[RemotePathIndex];
+ std::filesystem::path TargetFilePath = (Path / TargetPath).make_preferred();
- if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(RemotePathIndex);
- InPlaceIt != RemotePathIndexToLocalPathIndex.end())
+ if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(RemotePathIndex);
+ InPlaceIt != RemotePathIndexToLocalPathIndex.end())
+ {
+ ZEN_ASSERT_SLOW(IsFileWithRetry(TargetFilePath));
+ }
+ else
+ {
+ ZEN_TRACE_CPU("Copy");
+ if (IsFileWithRetry(TargetFilePath))
{
- ZEN_ASSERT_SLOW(IsFileWithRetry(TargetFilePath));
+ SetFileReadOnlyWithRetry(TargetFilePath, false);
}
else
{
- ZEN_TRACE_CPU("Copy");
- if (IsFileWithRetry(TargetFilePath))
- {
- SetFileReadOnlyWithRetry(TargetFilePath, false);
- }
- else
- {
- CreateDirectories(TargetFilePath.parent_path());
- }
-
- ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath));
- ZEN_DEBUG("Copying from '{}' -> '{}'", FirstTargetFilePath, TargetFilePath);
- CopyFile(FirstTargetFilePath, TargetFilePath, {.EnableClone = false});
- RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++;
+ CreateDirectories(TargetFilePath.parent_path());
}
- OutLocalFolderState.Paths[RemotePathIndex] = TargetPath;
- OutLocalFolderState.RawSizes[RemotePathIndex] = RemoteContent.RawSizes[RemotePathIndex];
+ ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath));
+ ZEN_DEBUG("Copying from '{}' -> '{}'", FirstTargetFilePath, TargetFilePath);
+ CopyFile(FirstTargetFilePath, TargetFilePath, {.EnableClone = false});
+ RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++;
+ }
- OutLocalFolderState.Attributes[RemotePathIndex] =
- RemoteContent.Attributes.empty()
- ? GetNativeFileAttributes(TargetFilePath)
- : SetNativeFileAttributes(TargetFilePath,
- RemoteContent.Platform,
- RemoteContent.Attributes[RemotePathIndex]);
- OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath);
+ OutLocalFolderState.Paths[RemotePathIndex] = TargetPath;
+ OutLocalFolderState.RawSizes[RemotePathIndex] = RemoteContent.RawSizes[RemotePathIndex];
- TargetOffset++;
- TargetsComplete++;
- }
+ OutLocalFolderState.Attributes[RemotePathIndex] =
+ RemoteContent.Attributes.empty() ? GetNativeFileAttributes(TargetFilePath)
+ : SetNativeFileAttributes(TargetFilePath,
+ RemoteContent.Platform,
+ RemoteContent.Attributes[RemotePathIndex]);
+ OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath);
+
+ TargetOffset++;
+ TargetsComplete++;
}
}
- },
- Work.DefaultErrorFunction());
+ }
+ });
TargetOffset += TargetCount;
}
@@ -10470,7 +10402,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
return true;
};
- ParallellWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag);
uint32_t Randomizer = 0;
auto FileSizeIt = DownloadContent.FileSizes.begin();
@@ -10512,8 +10444,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
SetFileReadOnly(FilePath, true);
}
}
- },
- Work.DefaultErrorFunction());
+ });
}
}
break;
@@ -10677,14 +10608,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
return 0;
}
}
- catch (const ParallellWorkException& Ex)
- {
- for (const std::string& Error : Ex.m_Errors)
- {
- ZEN_ERROR("{}", Error);
- }
- return 3;
- }
catch (const std::exception& Ex)
{
ZEN_ERROR("{}", Ex.what());
diff --git a/src/zen/cmds/wipe_cmd.cpp b/src/zen/cmds/wipe_cmd.cpp
index 269f95417..9dfdca0a1 100644
--- a/src/zen/cmds/wipe_cmd.cpp
+++ b/src/zen/cmds/wipe_cmd.cpp
@@ -8,7 +8,7 @@
#include <zencore/string.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
-#include <zenutil/parallellwork.h>
+#include <zenutil/parallelwork.h>
#include <zenutil/workerpools.h>
#include <signal.h>
@@ -248,7 +248,7 @@ namespace {
return Added;
};
- ParallellWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag);
struct AsyncVisitor : public GetDirectoryContentVisitor
{
diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp
index c1df6d53e..f71397922 100644
--- a/src/zencore/filesystem.cpp
+++ b/src/zencore/filesystem.cpp
@@ -2322,12 +2322,17 @@ GetDirectoryContent(const std::filesystem::path& RootDir,
RelativeRoot = RelativeRoot / DirectoryName]() {
ZEN_ASSERT(Visitor);
auto _ = MakeGuard([&]() { PendingWorkCount->CountDown(); });
+ try
{
MultithreadedVisitor SubVisitor(*WorkerPool, *PendingWorkCount, RelativeRoot, Flags, Visitor);
FileSystemTraversal Traversal;
Traversal.TraverseFileSystem(Path, SubVisitor);
Visitor->AsyncVisitDirectory(SubVisitor.RelativeRoot, std::move(SubVisitor.Content));
}
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed scheduling work to scan subfolder '{}'. Reason: '{}'", Path / RelativeRoot, Ex.what());
+ }
});
}
catch (const std::exception Ex)
diff --git a/src/zenutil/buildstoragecache.cpp b/src/zenutil/buildstoragecache.cpp
index f273ac699..88238effd 100644
--- a/src/zenutil/buildstoragecache.cpp
+++ b/src/zenutil/buildstoragecache.cpp
@@ -338,7 +338,7 @@ public:
return {};
}
- virtual void Flush(int32_t UpdateInteralMS, std::function<bool(intptr_t Remaining)>&& UpdateCallback) override
+ virtual void Flush(int32_t UpdateIntervalMS, std::function<bool(intptr_t Remaining)>&& UpdateCallback) override
{
if (IsFlushed)
{
@@ -358,7 +358,7 @@ public:
intptr_t Remaining = m_PendingBackgroundWorkCount.Remaining();
if (UpdateCallback(Remaining))
{
- if (m_PendingBackgroundWorkCount.Wait(UpdateInteralMS))
+ if (m_PendingBackgroundWorkCount.Wait(UpdateIntervalMS))
{
UpdateCallback(0);
return;
diff --git a/src/zenutil/chunkedcontent.cpp b/src/zenutil/chunkedcontent.cpp
index 17b348f8d..ae129324e 100644
--- a/src/zenutil/chunkedcontent.cpp
+++ b/src/zenutil/chunkedcontent.cpp
@@ -11,7 +11,7 @@
#include <zenutil/chunkedfile.h>
#include <zenutil/chunkingcontroller.h>
-#include <zenutil/parallellwork.h>
+#include <zenutil/parallelwork.h>
#include <zenutil/workerpools.h>
ZEN_THIRD_PARTY_INCLUDES_START
@@ -378,7 +378,7 @@ GetFolderContent(GetFolderContentStatistics& Stats,
std::function<bool(const std::string_view& RelativePath)>&& AcceptDirectory,
std::function<bool(std::string_view RelativePath, uint64_t Size, uint32_t Attributes)>&& AcceptFile,
WorkerThreadPool& WorkerPool,
- int32_t UpdateInteralMS,
+ int32_t UpdateIntervalMS,
std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback,
std::atomic<bool>& AbortFlag)
{
@@ -467,7 +467,7 @@ GetFolderContent(GetFolderContentStatistics& Stats,
WorkerPool,
PendingWork);
PendingWork.CountDown();
- while (!PendingWork.Wait(UpdateInteralMS))
+ while (!PendingWork.Wait(UpdateIntervalMS))
{
UpdateCallback(AbortFlag.load(), PendingWork.Remaining());
}
@@ -731,7 +731,7 @@ ChunkFolderContent(ChunkingStatistics& Stats,
const std::filesystem::path& RootPath,
const FolderContent& Content,
const ChunkingController& InChunkingController,
- int32_t UpdateInteralMS,
+ int32_t UpdateIntervalMS,
std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback,
std::atomic<bool>& AbortFlag)
{
@@ -772,7 +772,7 @@ ChunkFolderContent(ChunkingStatistics& Stats,
RwLock Lock;
- ParallellWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag);
for (uint32_t PathIndex : Order)
{
@@ -780,28 +780,26 @@ ChunkFolderContent(ChunkingStatistics& Stats,
{
break;
}
- Work.ScheduleWork(
- WorkerPool, // GetSyncWorkerPool()
- [&, PathIndex](std::atomic<bool>& AbortFlag) {
- if (!AbortFlag)
- {
- IoHash RawHash = HashOneFile(Stats,
- InChunkingController,
- Result,
- ChunkHashToChunkIndex,
- RawHashToSequenceRawHashIndex,
- Lock,
- RootPath,
- PathIndex,
- AbortFlag);
- Lock.WithExclusiveLock([&]() { Result.RawHashes[PathIndex] = RawHash; });
- Stats.FilesProcessed++;
- }
- },
- Work.DefaultErrorFunction());
- }
-
- Work.Wait(UpdateInteralMS, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
+ Work.ScheduleWork(WorkerPool, // GetSyncWorkerPool()
+ [&, PathIndex](std::atomic<bool>& AbortFlag) {
+ if (!AbortFlag)
+ {
+ IoHash RawHash = HashOneFile(Stats,
+ InChunkingController,
+ Result,
+ ChunkHashToChunkIndex,
+ RawHashToSequenceRawHashIndex,
+ Lock,
+ RootPath,
+ PathIndex,
+ AbortFlag);
+ Lock.WithExclusiveLock([&]() { Result.RawHashes[PathIndex] = RawHash; });
+ Stats.FilesProcessed++;
+ }
+ });
+ }
+
+ Work.Wait(UpdateIntervalMS, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
ZEN_UNUSED(IsAborted);
ZEN_UNUSED(PendingWork);
UpdateCallback(Work.IsAborted(), Work.PendingWork().Remaining());
diff --git a/src/zenutil/include/zenutil/buildstoragecache.h b/src/zenutil/include/zenutil/buildstoragecache.h
index cab35328d..e1fb73fd4 100644
--- a/src/zenutil/include/zenutil/buildstoragecache.h
+++ b/src/zenutil/include/zenutil/buildstoragecache.h
@@ -44,7 +44,7 @@ public:
virtual std::vector<BlobExistsResult> BlobsExists(const Oid& BuildId, std::span<const IoHash> BlobHashes) = 0;
virtual void Flush(
- int32_t UpdateInteralMS,
+ int32_t UpdateIntervalMS,
std::function<bool(intptr_t Remaining)>&& UpdateCallback = [](intptr_t) { return true; }) = 0;
};
diff --git a/src/zenutil/include/zenutil/chunkedcontent.h b/src/zenutil/include/zenutil/chunkedcontent.h
index 57b55cb8e..d33869be2 100644
--- a/src/zenutil/include/zenutil/chunkedcontent.h
+++ b/src/zenutil/include/zenutil/chunkedcontent.h
@@ -67,7 +67,7 @@ FolderContent GetFolderContent(GetFolderContentStatistics& Stats,
std::function<bool(const std::string_view& RelativePath)>&& AcceptDirectory,
std::function<bool(std::string_view RelativePath, uint64_t Size, uint32_t Attributes)>&& AcceptFile,
WorkerThreadPool& WorkerPool,
- int32_t UpdateInteralMS,
+ int32_t UpdateIntervalMS,
std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback,
std::atomic<bool>& AbortFlag);
@@ -116,7 +116,7 @@ ChunkedFolderContent ChunkFolderContent(ChunkingStatistics& Stats,
const std::filesystem::path& RootPath,
const FolderContent& Content,
const ChunkingController& InChunkingController,
- int32_t UpdateInteralMS,
+ int32_t UpdateIntervalMS,
std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback,
std::atomic<bool>& AbortFlag);
diff --git a/src/zenutil/include/zenutil/parallellwork.h b/src/zenutil/include/zenutil/parallellwork.h
deleted file mode 100644
index 8ea77c65d..000000000
--- a/src/zenutil/include/zenutil/parallellwork.h
+++ /dev/null
@@ -1,119 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <zencore/except.h>
-#include <zencore/fmtutils.h>
-#include <zencore/thread.h>
-#include <zencore/workthreadpool.h>
-
-#include <atomic>
-
-class ParallellWorkException : public std::runtime_error
-{
-public:
- explicit ParallellWorkException(std::vector<std::string>&& Errors) : std::runtime_error(Errors.front()), m_Errors(std::move(Errors)) {}
-
- const std::vector<std::string> m_Errors;
-};
-
-namespace zen {
-
-class ParallellWork
-{
-public:
- ParallellWork(std::atomic<bool>& AbortFlag) : m_AbortFlag(AbortFlag), m_PendingWork(1) {}
-
- ~ParallellWork()
- {
- // Make sure to call Wait before destroying
- ZEN_ASSERT(m_PendingWork.Remaining() == 0);
- }
-
- std::function<void(const std::exception& Ex, std::atomic<bool>& AbortFlag)> DefaultErrorFunction()
- {
- return [&](const std::exception& Ex, std::atomic<bool>& AbortFlag) {
- m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex.what()); });
- AbortFlag = true;
- };
- }
-
- void ScheduleWork(WorkerThreadPool& WorkerPool,
- std::function<void(std::atomic<bool>& AbortFlag)>&& Work,
- std::function<void(const std::exception& Ex, std::atomic<bool>& AbortFlag)>&& OnError)
- {
- m_PendingWork.AddCount(1);
- try
- {
- WorkerPool.ScheduleWork([this, Work = std::move(Work), OnError = std::move(OnError)] {
- try
- {
- Work(m_AbortFlag);
- }
- catch (const AssertException& AssertEx)
- {
- OnError(
- std::runtime_error(fmt::format("Caught assert exception while handling request: {}", AssertEx.FullDescription())),
- m_AbortFlag);
- }
- catch (const std::system_error& SystemError)
- {
- if (IsOOM(SystemError.code()))
- {
- OnError(std::runtime_error(fmt::format("Out of memory. Reason: {}", SystemError.what())), m_AbortFlag);
- }
- else if (IsOOD(SystemError.code()))
- {
- OnError(std::runtime_error(fmt::format("Out of disk. Reason: {}", SystemError.what())), m_AbortFlag);
- }
- else
- {
- OnError(std::runtime_error(fmt::format("System error. Reason: {}", SystemError.what())), m_AbortFlag);
- }
- }
- catch (const std::exception& Ex)
- {
- OnError(Ex, m_AbortFlag);
- }
- m_PendingWork.CountDown();
- });
- }
- catch (const std::exception&)
- {
- m_PendingWork.CountDown();
- throw;
- }
- }
-
- void Abort() { m_AbortFlag = true; }
-
- bool IsAborted() const { return m_AbortFlag.load(); }
-
- void Wait(int32_t UpdateInteralMS, std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback)
- {
- ZEN_ASSERT(m_PendingWork.Remaining() > 0);
- m_PendingWork.CountDown();
- while (!m_PendingWork.Wait(UpdateInteralMS))
- {
- UpdateCallback(m_AbortFlag.load(), m_PendingWork.Remaining());
- }
- if (m_Errors.size() == 1)
- {
- throw std::runtime_error(m_Errors.front());
- }
- else if (m_Errors.size() > 1)
- {
- throw ParallellWorkException(std::move(m_Errors));
- }
- }
- Latch& PendingWork() { return m_PendingWork; }
-
-private:
- std::atomic<bool>& m_AbortFlag;
- Latch m_PendingWork;
-
- RwLock m_ErrorLock;
- std::vector<std::string> m_Errors;
-};
-
-} // namespace zen
diff --git a/src/zenutil/include/zenutil/parallelwork.h b/src/zenutil/include/zenutil/parallelwork.h
new file mode 100644
index 000000000..08e730b28
--- /dev/null
+++ b/src/zenutil/include/zenutil/parallelwork.h
@@ -0,0 +1,71 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/thread.h>
+#include <zencore/workthreadpool.h>
+
+#include <atomic>
+
+namespace zen {
+
+class ParallelWork
+{
+public:
+ ParallelWork(std::atomic<bool>& AbortFlag);
+
+ ~ParallelWork();
+
+ typedef std::function<void(std::atomic<bool>& AbortFlag)> WorkCallback;
+ typedef std::function<void(std::exception_ptr Ex, std::atomic<bool>& AbortFlag)> ExceptionCallback;
+ typedef std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)> UpdateCallback;
+
+ void ScheduleWork(WorkerThreadPool& WorkerPool, WorkCallback&& Work, ExceptionCallback&& OnError = {})
+ {
+ m_PendingWork.AddCount(1);
+ try
+ {
+ WorkerPool.ScheduleWork([this, Work = std::move(Work), OnError = OnError ? std::move(OnError) : DefaultErrorFunction()] {
+ try
+ {
+ Work(m_AbortFlag);
+ }
+ catch (...)
+ {
+ OnError(std::current_exception(), m_AbortFlag);
+ }
+ m_PendingWork.CountDown();
+ });
+ }
+ catch (const std::exception&)
+ {
+ m_PendingWork.CountDown();
+ throw;
+ }
+ }
+
+ void Abort() { m_AbortFlag = true; }
+
+ bool IsAborted() const { return m_AbortFlag.load(); }
+
+ void Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback);
+
+ void Wait();
+
+ Latch& PendingWork() { return m_PendingWork; }
+
+private:
+ ExceptionCallback DefaultErrorFunction();
+ void RethrowErrors();
+
+ std::atomic<bool>& m_AbortFlag;
+ bool m_DispatchComplete = false;
+ Latch m_PendingWork;
+
+ RwLock m_ErrorLock;
+ std::vector<std::exception_ptr> m_Errors;
+};
+
+void parallellwork_forcelink();
+
+} // namespace zen
diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp
new file mode 100644
index 000000000..516d70e28
--- /dev/null
+++ b/src/zenutil/parallelwork.cpp
@@ -0,0 +1,192 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenutil/parallelwork.h>
+
+#include <zencore/except.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+
+#include <typeinfo>
+
+#if ZEN_WITH_TESTS
+# include <zencore/testing.h>
+#endif // ZEN_WITH_TESTS
+
+namespace zen {
+
+ParallelWork::ParallelWork(std::atomic<bool>& AbortFlag) : m_AbortFlag(AbortFlag), m_PendingWork(1)
+{
+}
+
+ParallelWork::~ParallelWork()
+{
+ try
+ {
+ if (!m_DispatchComplete)
+ {
+ ZEN_ASSERT(m_PendingWork.Remaining() > 0);
+ ZEN_WARN(
+ "ParallelWork disposed without explicit wait for completion, likely caused by an exception, waiting for dispatched threads "
+ "to complete");
+ m_PendingWork.CountDown();
+ }
+ m_AbortFlag.store(true);
+ m_PendingWork.Wait();
+ ZEN_ASSERT(m_PendingWork.Remaining() == 0);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Exception in ~ParallelWork: {}", Ex.what());
+ }
+}
+
+ParallelWork::ExceptionCallback
+ParallelWork::DefaultErrorFunction()
+{
+ return [&](std::exception_ptr Ex, std::atomic<bool>& AbortFlag) {
+ m_ErrorLock.WithExclusiveLock([&]() { m_Errors.push_back(Ex); });
+ AbortFlag = true;
+ };
+}
+
+void
+ParallelWork::Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback)
+{
+ ZEN_ASSERT(!m_DispatchComplete);
+ m_DispatchComplete = true;
+
+ ZEN_ASSERT(m_PendingWork.Remaining() > 0);
+ m_PendingWork.CountDown();
+
+ while (!m_PendingWork.Wait(UpdateIntervalMS))
+ {
+ UpdateCallback(m_AbortFlag.load(), m_PendingWork.Remaining());
+ }
+
+ RethrowErrors();
+}
+
+void
+ParallelWork::Wait()
+{
+ ZEN_ASSERT(!m_DispatchComplete);
+ m_DispatchComplete = true;
+
+ ZEN_ASSERT(m_PendingWork.Remaining() > 0);
+ m_PendingWork.CountDown();
+ m_PendingWork.Wait();
+
+ RethrowErrors();
+}
+
+void
+ParallelWork::RethrowErrors()
+{
+ if (!m_Errors.empty())
+ {
+ if (m_Errors.size() > 1)
+ {
+ ZEN_INFO("Multiple exceptions throwm during ParallelWork execution, dropping the following exceptions:");
+ auto It = m_Errors.begin() + 1;
+ while (It != m_Errors.end())
+ {
+ try
+ {
+ std::rethrow_exception(*It);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_INFO(" {}", Ex.what());
+ }
+ It++;
+ }
+ }
+ std::exception_ptr Ex = m_Errors.front();
+ m_Errors.clear();
+ std::rethrow_exception(Ex);
+ }
+}
+
+#if ZEN_WITH_TESTS
+
+TEST_CASE("parallellwork.nowork")
+{
+ std::atomic<bool> AbortFlag;
+ ParallelWork Work(AbortFlag);
+ Work.Wait();
+}
+
+TEST_CASE("parallellwork.basic")
+{
+ WorkerThreadPool WorkerPool(2);
+
+ std::atomic<bool> AbortFlag;
+ ParallelWork Work(AbortFlag);
+ for (uint32_t I = 0; I < 5; I++)
+ {
+ Work.ScheduleWork(WorkerPool, [](std::atomic<bool>& AbortFlag) { CHECK(!AbortFlag); });
+ }
+ Work.Wait();
+}
+
+TEST_CASE("parallellwork.throws_in_work")
+{
+ WorkerThreadPool WorkerPool(2);
+
+ std::atomic<bool> AbortFlag;
+ ParallelWork Work(AbortFlag);
+ for (uint32_t I = 0; I < 10; I++)
+ {
+ Work.ScheduleWork(WorkerPool, [I](std::atomic<bool>& AbortFlag) {
+ ZEN_UNUSED(AbortFlag);
+ if (I > 3)
+ {
+ throw std::runtime_error("We throw in async thread");
+ }
+ else
+ {
+ Sleep(10);
+ }
+ });
+ }
+ CHECK_THROWS_WITH(Work.Wait(), "We throw in async thread");
+}
+
+TEST_CASE("parallellwork.throws_in_dispatch")
+{
+ WorkerThreadPool WorkerPool(2);
+ std::atomic<uint32_t> ExecutedCount;
+ try
+ {
+ std::atomic<bool> AbortFlag;
+ ParallelWork Work(AbortFlag);
+ for (uint32_t I = 0; I < 5; I++)
+ {
+ Work.ScheduleWork(WorkerPool, [I, &ExecutedCount](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag.load())
+ {
+ return;
+ }
+ ExecutedCount++;
+ });
+ if (I == 3)
+ {
+ throw std::runtime_error("We throw in dispatcher thread");
+ }
+ }
+ CHECK(false);
+ }
+ catch (const std::runtime_error& Ex)
+ {
+ CHECK_EQ("We throw in dispatcher thread", std::string(Ex.what()));
+ CHECK_LE(ExecutedCount.load(), 4);
+ }
+}
+
+void
+parallellwork_forcelink()
+{
+}
+#endif // ZEN_WITH_TESTS
+
+} // namespace zen
diff --git a/src/zenutil/zenutil.cpp b/src/zenutil/zenutil.cpp
index aff9156f4..fe23b00c1 100644
--- a/src/zenutil/zenutil.cpp
+++ b/src/zenutil/zenutil.cpp
@@ -8,6 +8,7 @@
# include <zenutil/cache/rpcrecording.h>
# include <zenutil/chunkedfile.h>
# include <zenutil/commandlineoptions.h>
+# include <zenutil/parallelwork.h>
namespace zen {
@@ -19,6 +20,7 @@ zenutil_forcelinktests()
cacherequests_forcelink();
chunkedfile_forcelink();
commandlineoptions_forcelink();
+ parallellwork_forcelink();
}
} // namespace zen