aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-03-12 10:51:57 +0100
committerGitHub Enterprise <[email protected]>2025-03-12 10:51:57 +0100
commitfb09d861fd76e459ac86bec388bd406aaca8e681 (patch)
tree2710efa3a2492cf12886f447163fd8b4a939c196 /src
parentasync find blocks (#300) (diff)
downloadzen-fb09d861fd76e459ac86bec388bd406aaca8e681.tar.xz
zen-fb09d861fd76e459ac86bec388bd406aaca8e681.zip
improved block gen logic (#302)
- Improvement: Reduced memory usage during upload and part upload validation - Improvement: Reduced I/O usage during upload and download - Improvement: Faster block regeneration when uploading in response to PutBuild/FinalizeBuild - Improvement: More trace scopes for build upload operations - Bugfix: Fixed crash during download when trying to write outside a file range
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/builds_cmd.cpp1007
-rw-r--r--src/zen/zen.cpp3
-rw-r--r--src/zenutil/chunkedcontent.cpp15
-rw-r--r--src/zenutil/chunkedfile.cpp6
-rw-r--r--src/zenutil/chunkingcontroller.cpp4
-rw-r--r--src/zenutil/filebuildstorage.cpp17
-rw-r--r--src/zenutil/include/zenutil/chunkedcontent.h4
7 files changed, 599 insertions, 457 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index e03175256..baa46dda8 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -191,15 +191,34 @@ namespace {
return SB.ToString();
}
- void CleanDirectory(const std::filesystem::path& Path, std::span<const std::string_view> ExcludeDirectories)
+ bool CleanDirectory(const std::filesystem::path& Path, std::span<const std::string_view> ExcludeDirectories)
{
ZEN_TRACE_CPU("CleanDirectory");
+ bool CleanWipe = true;
+
DirectoryContent LocalDirectoryContent;
GetDirectoryContent(Path, DirectoryContentFlags::IncludeDirs | DirectoryContentFlags::IncludeFiles, LocalDirectoryContent);
for (const std::filesystem::path& LocalFilePath : LocalDirectoryContent.Files)
{
- std::filesystem::remove(LocalFilePath);
+ try
+ {
+ std::filesystem::remove(LocalFilePath);
+ }
+ catch (const std::exception&)
+ {
+ // DeleteOnClose files may be a bit slow in getting cleaned up, so pause amd retry one time
+ Sleep(200);
+ try
+ {
+ std::filesystem::remove(LocalFilePath);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Failed removing file {}. Reason: {}", LocalFilePath, Ex.what());
+ CleanWipe = false;
+ }
+ }
}
for (const std::filesystem::path& LocalDirPath : LocalDirectoryContent.Directories)
@@ -215,10 +234,28 @@ namespace {
}
if (!Leave)
{
- zen::CleanDirectory(LocalDirPath);
- std::filesystem::remove(LocalDirPath);
+ try
+ {
+ zen::CleanDirectory(LocalDirPath);
+ std::filesystem::remove(LocalDirPath);
+ }
+ catch (const std::exception&)
+ {
+ Sleep(200);
+ try
+ {
+ zen::CleanDirectory(LocalDirPath);
+ std::filesystem::remove(LocalDirPath);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Failed removing directory {}. Reason: {}", LocalDirPath, Ex.what());
+ CleanWipe = false;
+ }
+ }
}
}
+ return CleanWipe;
}
std::string ReadAccessTokenFromFile(const std::filesystem::path& Path)
@@ -1188,6 +1225,7 @@ namespace {
uint64_t& OutCompressedSize,
uint64_t& OutDecompressedSize)
{
+ ZEN_TRACE_CPU("ValidateBlob");
IoBuffer Payload = Storage.GetBuildBlob(BuildId, BlobHash);
if (!Payload)
{
@@ -1214,6 +1252,7 @@ namespace {
const IoHash& ChunkHash,
ReadFileCache& OpenFileCache)
{
+ ZEN_TRACE_CPU("FetchChunk");
auto It = Lookup.ChunkHashToChunkIndex.find(ChunkHash);
ZEN_ASSERT(It != Lookup.ChunkHashToChunkIndex.end());
uint32_t ChunkIndex = It->second;
@@ -1233,6 +1272,7 @@ namespace {
ChunkBlockDescription& OutBlockDescription,
DiskStatistics& DiskStats)
{
+ ZEN_TRACE_CPU("GenerateBlock");
ReadFileCache OpenFileCache(DiskStats, Path, Content, Lookup, 4);
std::vector<std::pair<IoHash, FetchChunkFunc>> BlockContent;
@@ -1255,6 +1295,103 @@ namespace {
return GenerateChunkBlock(std::move(BlockContent), OutBlockDescription);
};
+ CompressedBuffer RebuildBlock(const std::filesystem::path& Path,
+ const ChunkedFolderContent& Content,
+ const ChunkedContentLookup& Lookup,
+ CompositeBuffer&& HeaderBuffer,
+ const std::vector<uint32_t>& ChunksInBlock,
+ DiskStatistics& DiskStats)
+ {
+ ZEN_TRACE_CPU("RebuildBlock");
+ ReadFileCache OpenFileCache(DiskStats, Path, Content, Lookup, 4);
+
+ std::vector<SharedBuffer> ResultBuffers;
+ ResultBuffers.reserve(HeaderBuffer.GetSegments().size() + ChunksInBlock.size());
+ ResultBuffers.insert(ResultBuffers.end(), HeaderBuffer.GetSegments().begin(), HeaderBuffer.GetSegments().end());
+ for (uint32_t ChunkIndex : ChunksInBlock)
+ {
+ std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkLocations = GetChunkSequenceLocations(Lookup, ChunkIndex);
+ ZEN_ASSERT(!ChunkLocations.empty());
+ CompositeBuffer Chunk = OpenFileCache.GetRange(ChunkLocations[0].SequenceIndex,
+ ChunkLocations[0].Offset,
+ Content.ChunkedContent.ChunkRawSizes[ChunkIndex]);
+ ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == Content.ChunkedContent.ChunkHashes[ChunkIndex]);
+ CompositeBuffer CompressedChunk =
+ CompressedBuffer::Compress(std::move(Chunk), OodleCompressor::Mermaid, OodleCompressionLevel::None).GetCompressed();
+ ResultBuffers.insert(ResultBuffers.end(), CompressedChunk.GetSegments().begin(), CompressedChunk.GetSegments().end());
+ }
+ return CompressedBuffer::FromCompressedNoValidate(CompositeBuffer(std::move(ResultBuffers)));
+ };
+
+ void DownloadLargeBlob(BuildStorage& Storage,
+ const std::filesystem::path& DownloadFolder,
+ const Oid& BuildId,
+ const IoHash& ChunkHash,
+ const std::uint64_t PreferredMultipartChunkSize,
+ ParallellWork& Work,
+ WorkerThreadPool& NetworkPool,
+ std::atomic<uint64_t>& BytesDownloaded,
+ std::atomic<uint64_t>& MultipartAttachmentCount,
+ std::function<void(IoBuffer&& Payload)>&& OnDownloadComplete)
+ {
+ ZEN_TRACE_CPU("DownloadLargeBlob");
+
+ struct WorkloadData
+ {
+ TemporaryFile TempFile;
+ };
+ std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>());
+
+ std::error_code Ec;
+ Workload->TempFile.CreateTemporary(DownloadFolder, Ec);
+ if (Ec)
+ {
+ throw std::runtime_error(
+ fmt::format("Failed opening temporary file '{}': {} ({})", Workload->TempFile.GetPath(), Ec.message(), Ec.value()));
+ }
+ std::vector<std::function<void()>> WorkItems = Storage.GetLargeBuildBlob(
+ BuildId,
+ ChunkHash,
+ PreferredMultipartChunkSize,
+ [Workload, &BytesDownloaded, OnDownloadComplete = std::move(OnDownloadComplete)](uint64_t Offset,
+ const IoBuffer& Chunk,
+ uint64_t BytesRemaining) {
+ BytesDownloaded += Chunk.GetSize();
+
+ if (!AbortFlag.load())
+ {
+ ZEN_TRACE_CPU("DownloadLargeBlob_Save");
+ Workload->TempFile.Write(Chunk.GetView(), Offset);
+ if (Chunk.GetSize() == BytesRemaining)
+ {
+ uint64_t PayloadSize = Workload->TempFile.FileSize();
+ void* FileHandle = Workload->TempFile.Detach();
+ ZEN_ASSERT(FileHandle != nullptr);
+ IoBuffer Payload(IoBuffer::File, FileHandle, 0, PayloadSize, true);
+ Payload.SetDeleteOnClose(true);
+ OnDownloadComplete(std::move(Payload));
+ }
+ }
+ });
+ if (!WorkItems.empty())
+ {
+ MultipartAttachmentCount++;
+ }
+ for (auto& WorkItem : WorkItems)
+ {
+ Work.ScheduleWork(
+ NetworkPool, // GetSyncWorkerPool(),//
+ [WorkItem = std::move(WorkItem)](std::atomic<bool>&) {
+ ZEN_TRACE_CPU("DownloadLargeBlob_Work");
+ if (!AbortFlag)
+ {
+ WorkItem();
+ }
+ },
+ Work.DefaultErrorFunction());
+ }
+ }
+
void ValidateBuildPart(BuildStorage& Storage, const Oid& BuildId, Oid BuildPartId, const std::string_view BuildPartName)
{
Stopwatch Timer;
@@ -1274,6 +1411,11 @@ namespace {
throw std::runtime_error(fmt::format("Build {} does not have a part named '{}'", BuildId, BuildPartName));
}
}
+ uint64_t PreferredMultipartChunkSize = DefaultPreferredMultipartChunkSize;
+ if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0)
+ {
+ PreferredMultipartChunkSize = ChunkSize;
+ }
CbObject BuildPart = Storage.GetBuildPart(BuildId, BuildPartId);
ZEN_CONSOLE("Validating build part {}/{} ({})", BuildId, BuildPartId, NiceBytes(BuildPart.GetSize()));
std::vector<IoHash> ChunkAttachments;
@@ -1295,9 +1437,20 @@ namespace {
}
WorkerThreadPool& NetworkPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
+ WorkerThreadPool& ReadPool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
WorkerThreadPool& VerifyPool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); //
ParallellWork Work(AbortFlag);
+ const std::filesystem::path TempFolder = ".zen-tmp";
+
+ CreateDirectories(TempFolder);
+ auto __ = MakeGuard([&TempFolder]() {
+ if (CleanDirectory(TempFolder, {}))
+ {
+ std::filesystem::remove(TempFolder);
+ }
+ });
+
ProgressBar ProgressBar(UsePlainProgress);
uint64_t AttachmentsToVerifyCount = ChunkAttachments.size() + BlockAttachments.size();
@@ -1308,51 +1461,58 @@ namespace {
FilteredRate FilteredDownloadedBytesPerSecond;
FilteredRate FilteredVerifiedBytesPerSecond;
+ std::atomic<uint64_t> MultipartAttachmentCount = 0;
+
for (const IoHash& ChunkAttachment : ChunkAttachments)
{
Work.ScheduleWork(
- NetworkPool,
+ ReadPool,
[&, ChunkAttachment](std::atomic<bool>&) {
if (!AbortFlag)
{
- FilteredDownloadedBytesPerSecond.Start();
- IoBuffer Payload = Storage.GetBuildBlob(BuildId, ChunkAttachment);
- DownloadedAttachmentCount++;
- DownloadedByteCount += Payload.GetSize();
- if (DownloadedAttachmentCount.load() == AttachmentsToVerifyCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
- if (!Payload)
- {
- throw std::runtime_error(fmt::format("Chunk attachment {} could not be found", ChunkAttachment));
- }
- if (!AbortFlag)
- {
- Work.ScheduleWork(
- VerifyPool,
- [&, Payload = std::move(Payload), ChunkAttachment](std::atomic<bool>&) mutable {
- if (!AbortFlag)
- {
- FilteredVerifiedBytesPerSecond.Start();
+ ZEN_TRACE_CPU("ValidateBuildPart_GetChunk");
- uint64_t CompressedSize;
- uint64_t DecompressedSize;
- ValidateBlob(std::move(Payload), ChunkAttachment, CompressedSize, DecompressedSize);
- ZEN_CONSOLE_VERBOSE("Chunk attachment {} ({} -> {}) is valid",
- ChunkAttachment,
- NiceBytes(CompressedSize),
- NiceBytes(DecompressedSize));
- VerifiedAttachmentCount++;
- VerifiedByteCount += DecompressedSize;
- if (VerifiedAttachmentCount.load() == AttachmentsToVerifyCount)
- {
- FilteredVerifiedBytesPerSecond.Stop();
- }
- }
- },
- Work.DefaultErrorFunction());
- }
+ FilteredDownloadedBytesPerSecond.Start();
+ DownloadLargeBlob(Storage,
+ TempFolder,
+ BuildId,
+ ChunkAttachment,
+ PreferredMultipartChunkSize,
+ Work,
+ NetworkPool,
+ DownloadedByteCount,
+ MultipartAttachmentCount,
+ [&, ChunkHash = ChunkAttachment](IoBuffer&& Payload) {
+ Payload.SetContentType(ZenContentType::kCompressedBinary);
+ if (!AbortFlag)
+ {
+ Work.ScheduleWork(
+ VerifyPool,
+ [&, Payload = std::move(Payload), ChunkHash](std::atomic<bool>&) mutable {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("ValidateBuildPart_Validate");
+
+ FilteredVerifiedBytesPerSecond.Start();
+
+ uint64_t CompressedSize;
+ uint64_t DecompressedSize;
+ ValidateBlob(std::move(Payload), ChunkHash, CompressedSize, DecompressedSize);
+ ZEN_CONSOLE_VERBOSE("Chunk attachment {} ({} -> {}) is valid",
+ ChunkHash,
+ NiceBytes(CompressedSize),
+ NiceBytes(DecompressedSize));
+ VerifiedAttachmentCount++;
+ VerifiedByteCount += DecompressedSize;
+ if (VerifiedAttachmentCount.load() == AttachmentsToVerifyCount)
+ {
+ FilteredVerifiedBytesPerSecond.Stop();
+ }
+ }
+ },
+ Work.DefaultErrorFunction());
+ }
+ });
}
},
Work.DefaultErrorFunction());
@@ -1365,6 +1525,8 @@ namespace {
[&, BlockAttachment](std::atomic<bool>&) {
if (!AbortFlag)
{
+ ZEN_TRACE_CPU("ValidateBuildPart_GetBlock");
+
FilteredDownloadedBytesPerSecond.Start();
IoBuffer Payload = Storage.GetBuildBlob(BuildId, BlockAttachment);
DownloadedAttachmentCount++;
@@ -1384,6 +1546,8 @@ namespace {
[&, Payload = std::move(Payload), BlockAttachment](std::atomic<bool>&) mutable {
if (!AbortFlag)
{
+ ZEN_TRACE_CPU("ValidateBuildPart_ValidateBlock");
+
FilteredVerifiedBytesPerSecond.Start();
uint64_t CompressedSize;
@@ -1442,6 +1606,7 @@ namespace {
std::vector<uint32_t>& ChunkIndexes,
std::vector<std::vector<uint32_t>>& OutBlocks)
{
+ ZEN_TRACE_CPU("ArrangeChunksIntoBlocks");
std::sort(ChunkIndexes.begin(), ChunkIndexes.end(), [&Content, &Lookup](uint32_t Lhs, uint32_t Rhs) {
const ChunkedContentLookup::ChunkSequenceLocation& LhsLocation = GetChunkSequenceLocations(Lookup, Lhs)[0];
const ChunkedContentLookup::ChunkSequenceLocation& RhsLocation = GetChunkSequenceLocations(Lookup, Rhs)[0];
@@ -1535,6 +1700,7 @@ namespace {
uint32_t ChunkIndex,
const std::filesystem::path& TempFolderPath)
{
+ ZEN_TRACE_CPU("CompressChunk");
ZEN_ASSERT(!TempFolderPath.empty());
const IoHash& ChunkHash = Content.ChunkedContent.ChunkHashes[ChunkIndex];
const uint64_t ChunkSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
@@ -1608,7 +1774,7 @@ namespace {
{
std::vector<ChunkBlockDescription> BlockDescriptions;
std::vector<uint64_t> BlockSizes;
- std::vector<CompositeBuffer> BlockBuffers;
+ std::vector<CompositeBuffer> BlockHeaders;
std::vector<CbObject> BlockMetaDatas;
std::vector<bool> MetaDataHasBeenUploaded;
tsl::robin_map<IoHash, size_t, IoHash::Hasher> BlockHashToBlockIndex;
@@ -1625,6 +1791,7 @@ namespace {
UploadStatistics& UploadStats,
GenerateBlocksStatistics& GenerateBlocksStats)
{
+ ZEN_TRACE_CPU("GenerateBuildBlocks");
const std::size_t NewBlockCount = NewBlockChunks.size();
if (NewBlockCount > 0)
{
@@ -1632,22 +1799,23 @@ namespace {
OutBlocks.BlockDescriptions.resize(NewBlockCount);
OutBlocks.BlockSizes.resize(NewBlockCount);
- OutBlocks.BlockBuffers.resize(NewBlockCount);
OutBlocks.BlockMetaDatas.resize(NewBlockCount);
+ OutBlocks.BlockHeaders.resize(NewBlockCount);
OutBlocks.MetaDataHasBeenUploaded.resize(NewBlockCount, false);
OutBlocks.BlockHashToBlockIndex.reserve(NewBlockCount);
RwLock Lock;
- WorkerThreadPool& GenerateBlobsPool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool();//
- WorkerThreadPool& UploadBlocksPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool();//
+ WorkerThreadPool& GenerateBlobsPool =
+ GetMediumWorkerPool(EWorkloadType::Burst); // GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool();//
+ WorkerThreadPool& UploadBlocksPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool();//
FilteredRate FilteredGeneratedBytesPerSecond;
FilteredRate FilteredUploadedBytesPerSecond;
ParallellWork Work(AbortFlag);
- std::atomic<uint32_t> PendingUploadCount(0);
+ std::atomic<uint64_t> QueuedPendingBlocksForUpload = 0;
for (size_t BlockIndex = 0; BlockIndex < NewBlockCount; BlockIndex++)
{
@@ -1661,6 +1829,8 @@ namespace {
[&, BlockIndex](std::atomic<bool>&) {
if (!AbortFlag)
{
+ ZEN_TRACE_CPU("GenerateBuildBlocks_Generate");
+
FilteredGeneratedBytesPerSecond.Start();
// TODO: Convert ScheduleWork body to function
@@ -1672,14 +1842,6 @@ namespace {
OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size());
OutBlocks.BlockSizes[BlockIndex] = CompressedBlock.GetCompressedSize();
-
- if (!IsBufferDiskBased(CompressedBlock.GetCompressed()))
- {
- IoBuffer TempPayload = WriteToTempFile(std::move(CompressedBlock).GetCompressed(),
- Path / ZenTempBlockFolderName,
- OutBlocks.BlockDescriptions[BlockIndex].BlockHash);
- CompressedBlock = CompressedBuffer::FromCompressedNoValidate(std::move(TempPayload));
- }
{
CbObjectWriter Writer;
Writer.AddString("createdBy", "zen");
@@ -1693,66 +1855,88 @@ namespace {
BlockIndex);
});
+ {
+ std::span<const SharedBuffer> Segments = CompressedBlock.GetCompressed().GetSegments();
+ ZEN_ASSERT(Segments.size() >= 2);
+ OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]);
+ }
+
if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount)
{
FilteredGeneratedBytesPerSecond.Stop();
}
- if (!AbortFlag)
+ if (QueuedPendingBlocksForUpload.load() > 16)
{
- PendingUploadCount++;
- Work.ScheduleWork(
- UploadBlocksPool,
- [&, BlockIndex, Payload = std::move(CompressedBlock).GetCompressed()](std::atomic<bool>&) mutable {
- if (!AbortFlag)
- {
- if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount)
- {
- FilteredUploadedBytesPerSecond.Stop();
- OutBlocks.BlockBuffers[BlockIndex] = std::move(Payload);
- }
- else
+ std::span<const SharedBuffer> Segments = CompressedBlock.GetCompressed().GetSegments();
+ ZEN_ASSERT(Segments.size() >= 2);
+ OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]);
+ }
+ else
+ {
+ if (!AbortFlag)
+ {
+ QueuedPendingBlocksForUpload++;
+
+ Work.ScheduleWork(
+ UploadBlocksPool,
+ [&, BlockIndex, Payload = std::move(CompressedBlock)](std::atomic<bool>&) mutable {
+ auto _ = MakeGuard([&QueuedPendingBlocksForUpload] { QueuedPendingBlocksForUpload--; });
+ if (!AbortFlag)
{
- FilteredUploadedBytesPerSecond.Start();
- // TODO: Convert ScheduleWork body to function
-
- PendingUploadCount--;
-
- const CbObject BlockMetaData =
- BuildChunkBlockDescription(OutBlocks.BlockDescriptions[BlockIndex],
- OutBlocks.BlockMetaDatas[BlockIndex]);
-
- const IoHash& BlockHash = OutBlocks.BlockDescriptions[BlockIndex].BlockHash;
- const uint64_t CompressedBlockSize = Payload.GetSize();
- Storage.PutBuildBlob(BuildId,
- BlockHash,
- ZenContentType::kCompressedBinary,
- std::move(Payload));
- UploadStats.BlocksBytes += CompressedBlockSize;
- ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks",
- OutBlocks.BlockDescriptions[BlockIndex].BlockHash,
- NiceBytes(CompressedBlockSize),
- OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size());
-
- Storage.PutBlockMetadata(BuildId,
- OutBlocks.BlockDescriptions[BlockIndex].BlockHash,
- BlockMetaData);
- ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})",
- OutBlocks.BlockDescriptions[BlockIndex].BlockHash,
- NiceBytes(BlockMetaData.GetSize()));
-
- OutBlocks.MetaDataHasBeenUploaded[BlockIndex] = true;
-
- UploadStats.BlocksBytes += BlockMetaData.GetSize();
- UploadStats.BlockCount++;
- if (UploadStats.BlockCount == NewBlockCount)
+ if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount)
{
+ ZEN_TRACE_CPU("GenerateBuildBlocks_Save");
+
FilteredUploadedBytesPerSecond.Stop();
+ std::span<const SharedBuffer> Segments = Payload.GetCompressed().GetSegments();
+ ZEN_ASSERT(Segments.size() >= 2);
+ OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]);
+ }
+ else
+ {
+ ZEN_TRACE_CPU("GenerateBuildBlocks_Upload");
+
+ FilteredUploadedBytesPerSecond.Start();
+ // TODO: Convert ScheduleWork body to function
+
+ const CbObject BlockMetaData =
+ BuildChunkBlockDescription(OutBlocks.BlockDescriptions[BlockIndex],
+ OutBlocks.BlockMetaDatas[BlockIndex]);
+
+ const IoHash& BlockHash = OutBlocks.BlockDescriptions[BlockIndex].BlockHash;
+ const uint64_t CompressedBlockSize = Payload.GetCompressedSize();
+
+ Storage.PutBuildBlob(BuildId,
+ BlockHash,
+ ZenContentType::kCompressedBinary,
+ std::move(Payload).GetCompressed());
+ UploadStats.BlocksBytes += CompressedBlockSize;
+ ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks",
+ OutBlocks.BlockDescriptions[BlockIndex].BlockHash,
+ NiceBytes(CompressedBlockSize),
+ OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size());
+
+ Storage.PutBlockMetadata(BuildId,
+ OutBlocks.BlockDescriptions[BlockIndex].BlockHash,
+ BlockMetaData);
+ ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})",
+ OutBlocks.BlockDescriptions[BlockIndex].BlockHash,
+ NiceBytes(BlockMetaData.GetSize()));
+
+ OutBlocks.MetaDataHasBeenUploaded[BlockIndex] = true;
+
+ UploadStats.BlocksBytes += BlockMetaData.GetSize();
+ UploadStats.BlockCount++;
+ if (UploadStats.BlockCount == NewBlockCount)
+ {
+ FilteredUploadedBytesPerSecond.Stop();
+ }
}
}
- }
- },
- Work.DefaultErrorFunction());
+ },
+ Work.DefaultErrorFunction());
+ }
}
}
},
@@ -1783,6 +1967,8 @@ namespace {
false);
});
+ ZEN_ASSERT(AbortFlag || QueuedPendingBlocksForUpload.load() == 0);
+
ProgressBar.Finish();
GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGeneratedBytesPerSecond.GetElapsedTimeUS();
@@ -1805,6 +1991,7 @@ namespace {
GenerateBlocksStatistics& GenerateBlocksStats,
LooseChunksStatistics& LooseChunksStats)
{
+ ZEN_TRACE_CPU("UploadPartBlobs");
{
ProgressBar ProgressBar(UsePlainProgress);
@@ -1858,12 +2045,37 @@ namespace {
const size_t UploadBlockCount = BlockIndexes.size();
const uint32_t UploadChunkCount = gsl::narrow<uint32_t>(LooseChunkOrderIndexes.size());
- auto AsyncUploadBlock = [&](const size_t BlockIndex, const IoHash BlockHash, CompositeBuffer&& Payload) {
+ auto AsyncUploadBlock = [&](const size_t BlockIndex,
+ const IoHash BlockHash,
+ CompositeBuffer&& Payload,
+ std::atomic<uint64_t>& QueuedPendingInMemoryBlocksForUpload) {
+ bool IsInMemoryBlock = true;
+ if (QueuedPendingInMemoryBlocksForUpload.load() > 16)
+ {
+ ZEN_TRACE_CPU("AsyncUploadBlock_WriteTempBlock");
+ Payload = CompositeBuffer(WriteToTempFile(std::move(Payload), Path / ZenTempBlockFolderName, BlockHash));
+ IsInMemoryBlock = false;
+ }
+ else
+ {
+ QueuedPendingInMemoryBlocksForUpload++;
+ }
+
Work.ScheduleWork(
UploadChunkPool,
- [&, BlockIndex, BlockHash, Payload = std::move(Payload)](std::atomic<bool>&) mutable {
+ [&, IsInMemoryBlock, BlockIndex, BlockHash, Payload = std::move(Payload)](std::atomic<bool>&) mutable {
+ auto _ = MakeGuard([IsInMemoryBlock, &QueuedPendingInMemoryBlocksForUpload] {
+ if (IsInMemoryBlock)
+ {
+ QueuedPendingInMemoryBlocksForUpload--;
+ }
+ });
if (!AbortFlag)
{
+ ZEN_TRACE_CPU("AsyncUploadBlock");
+
+ const uint64_t PayloadSize = Payload.GetSize();
+
FilteredUploadedBytesPerSecond.Start();
const CbObject BlockMetaData =
BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]);
@@ -1871,10 +2083,10 @@ namespace {
Storage.PutBuildBlob(BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload);
ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks",
NewBlocks.BlockDescriptions[BlockIndex].BlockHash,
- NiceBytes(Payload.GetSize()),
+ NiceBytes(PayloadSize),
NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size());
- UploadedBlockSize += Payload.GetSize();
- UploadStats.BlocksBytes += Payload.GetSize();
+ UploadedBlockSize += PayloadSize;
+ UploadStats.BlocksBytes += PayloadSize;
Storage.PutBlockMetadata(BuildId, BlockHash, BlockMetaData);
ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})",
@@ -1902,9 +2114,13 @@ namespace {
[&, RawHash, RawSize, Payload = CompositeBuffer(std::move(Payload))](std::atomic<bool>&) mutable {
if (!AbortFlag)
{
+ ZEN_TRACE_CPU("AsyncUploadLooseChunk");
+
const uint64_t PayloadSize = Payload.GetSize();
+ ;
if (PayloadSize >= LargeAttachmentSize)
{
+ ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart");
UploadStats.MultipartAttachmentCount++;
std::vector<std::function<void()>> MultipartWork = Storage.PutLargeBuildBlob(
BuildId,
@@ -1938,6 +2154,7 @@ namespace {
Work.ScheduleWork(
UploadChunkPool,
[Work = std::move(WorkPart)](std::atomic<bool>&) {
+ ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart_Work");
if (!AbortFlag)
{
Work();
@@ -1949,6 +2166,7 @@ namespace {
}
else
{
+ ZEN_TRACE_CPU("AsyncUploadLooseChunk_Singlepart");
Storage.PutBuildBlob(BuildId, RawHash, ZenContentType::kCompressedBinary, Payload);
ZEN_CONSOLE_VERBOSE("Uploaded chunk {} ({})", RawHash, NiceBytes(PayloadSize));
UploadStats.ChunksBytes += Payload.GetSize();
@@ -1971,26 +2189,10 @@ namespace {
std::atomic<uint64_t> GeneratedBlockCount = 0;
std::atomic<uint64_t> GeneratedBlockByteCount = 0;
- // Start upload of any pre-built blocks
- for (const size_t BlockIndex : BlockIndexes)
- {
- if (CompositeBuffer BlockPayload = std::move(NewBlocks.BlockBuffers[BlockIndex]); BlockPayload)
- {
- const IoHash& BlockHash = NewBlocks.BlockDescriptions[BlockIndex].BlockHash;
- if (!AbortFlag)
- {
- AsyncUploadBlock(BlockIndex, BlockHash, std::move(BlockPayload));
- }
- // GeneratedBlockCount++;
- }
- else
- {
- GenerateBlockIndexes.push_back(BlockIndex);
- }
- }
-
std::vector<uint32_t> CompressLooseChunkOrderIndexes;
+ std::atomic<uint64_t> QueuedPendingInMemoryBlocksForUpload = 0;
+
// Start upload of any pre-compressed loose chunks
for (const uint32_t LooseChunkOrderIndex : LooseChunkOrderIndexes)
{
@@ -1998,31 +2200,43 @@ namespace {
}
// Start generation of any non-prebuilt blocks and schedule upload
- for (const size_t BlockIndex : GenerateBlockIndexes)
+ for (const size_t BlockIndex : BlockIndexes)
{
const IoHash& BlockHash = NewBlocks.BlockDescriptions[BlockIndex].BlockHash;
if (!AbortFlag)
{
Work.ScheduleWork(
- ReadChunkPool,
+ ReadChunkPool, // GetSyncWorkerPool()
[&, BlockIndex](std::atomic<bool>&) {
if (!AbortFlag)
{
+ ZEN_TRACE_CPU("UploadPartBlobs_GenerateBlock");
+
FilteredGenerateBlockBytesPerSecond.Start();
- ChunkBlockDescription BlockDescription;
- CompressedBuffer CompressedBlock =
- GenerateBlock(Path, Content, Lookup, NewBlockChunks[BlockIndex], BlockDescription, DiskStats);
- if (!CompressedBlock)
+
+ CompositeBuffer Payload;
+ if (NewBlocks.BlockHeaders[BlockIndex])
{
- throw std::runtime_error(fmt::format("Failed generating block {}", BlockHash));
+ Payload = RebuildBlock(Path,
+ Content,
+ Lookup,
+ std::move(NewBlocks.BlockHeaders[BlockIndex]),
+ NewBlockChunks[BlockIndex],
+ DiskStats)
+ .GetCompressed();
+ }
+ else
+ {
+ ChunkBlockDescription BlockDescription;
+ CompressedBuffer CompressedBlock =
+ GenerateBlock(Path, Content, Lookup, NewBlockChunks[BlockIndex], BlockDescription, DiskStats);
+ if (!CompressedBlock)
+ {
+ throw std::runtime_error(fmt::format("Failed generating block {}", BlockHash));
+ }
+ ZEN_ASSERT(BlockDescription.BlockHash == BlockHash);
+ Payload = std::move(CompressedBlock).GetCompressed();
}
- ZEN_ASSERT(BlockDescription.BlockHash == BlockHash);
-
- CompositeBuffer Payload = IsBufferDiskBased(CompressedBlock.GetCompressed())
- ? std::move(CompressedBlock).GetCompressed()
- : CompositeBuffer(WriteToTempFile(std::move(CompressedBlock).GetCompressed(),
- Path / ZenTempBlockFolderName,
- BlockDescription.BlockHash));
GenerateBlocksStats.GeneratedBlockByteCount += NewBlocks.BlockSizes[BlockIndex];
GenerateBlocksStats.GeneratedBlockCount++;
@@ -2034,11 +2248,11 @@ namespace {
}
if (!AbortFlag)
{
- AsyncUploadBlock(BlockIndex, BlockHash, std::move(Payload));
+ AsyncUploadBlock(BlockIndex, BlockHash, std::move(Payload), QueuedPendingInMemoryBlocksForUpload);
}
ZEN_CONSOLE_VERBOSE("Regenerated block {} ({}) containing {} chunks",
NewBlocks.BlockDescriptions[BlockIndex].BlockHash,
- NiceBytes(CompressedBlock.GetCompressedSize()),
+ NiceBytes(NewBlocks.BlockSizes[BlockIndex]),
NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size());
}
},
@@ -2059,6 +2273,8 @@ namespace {
[&, ChunkIndex](std::atomic<bool>&) {
if (!AbortFlag)
{
+ ZEN_TRACE_CPU("UploadPartBlobs_CompressChunk");
+
FilteredCompressedBytesPerSecond.Start();
CompositeBuffer Payload = CompressChunk(Path, Content, Lookup, ChunkIndex, Path / ZenTempChunkFolderName);
ZEN_CONSOLE_VERBOSE("Compressed chunk {} ({} -> {})",
@@ -2117,6 +2333,8 @@ namespace {
false);
});
+ ZEN_ASSERT(AbortFlag || QueuedPendingInMemoryBlocksForUpload.load() == 0);
+
ProgressBar.Finish();
UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS();
GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGenerateBlockBytesPerSecond.GetElapsedTimeUS();
@@ -2131,6 +2349,8 @@ namespace {
std::vector<uint32_t>& OutUnusedChunkIndexes,
FindBlocksStatistics& FindBlocksStats)
{
+ ZEN_TRACE_CPU("FindReuseBlocks");
+
// Find all blocks with a usage level higher than MinPercentLimit
// Pick out the blocks with usage higher or equal to MinPercentLimit
// Sort them with highest size usage - most usage first
@@ -2303,8 +2523,10 @@ namespace {
CreateDirectories(ZenTempFolder);
CleanDirectory(ZenTempFolder, {});
auto _ = MakeGuard([&]() {
- CleanDirectory(ZenTempFolder, {});
- std::filesystem::remove(ZenTempFolder);
+ if (CleanDirectory(ZenTempFolder, {}))
+ {
+ std::filesystem::remove(ZenTempFolder);
+ }
});
CreateDirectories(Path / ZenTempBlockFolderName);
CreateDirectories(Path / ZenTempChunkFolderName);
@@ -2327,10 +2549,14 @@ namespace {
GetSmallWorkerPool(EWorkloadType::Burst)
.EnqueueTask(std::packaged_task<PrepareBuildResult()>{
[&Storage, BuildId, &MetaData, CreateBuild, AllowMultiparts, IgnoreExistingBlocks, &FindBlocksStats] {
+ ZEN_TRACE_CPU("PrepareBuild");
+
PrepareBuildResult Result;
Stopwatch Timer;
if (CreateBuild)
{
+ ZEN_TRACE_CPU("CreateBuild");
+
Stopwatch PutBuildTimer;
CbObject PutBuildResult = Storage.PutBuild(BuildId, MetaData);
Result.PrepareBuildTimeMs = PutBuildTimer.GetElapsedTimeMs();
@@ -2339,6 +2565,7 @@ namespace {
}
else
{
+ ZEN_TRACE_CPU("PutBuild");
Stopwatch GetBuildTimer;
CbObject Build = Storage.GetBuild(BuildId);
Result.PrepareBuildTimeMs = GetBuildTimer.GetElapsedTimeMs();
@@ -2356,6 +2583,7 @@ namespace {
if (!IgnoreExistingBlocks)
{
+ ZEN_TRACE_CPU("FindBlocks");
Stopwatch KnownBlocksTimer;
Result.KnownBlocks = Storage.FindBlocks(BuildId);
FindBlocksStats.FindBlockTimeMS = KnownBlocksTimer.GetElapsedTimeMs();
@@ -2523,11 +2751,8 @@ namespace {
ChunkerParameters = ChunkParametersWriter.Save();
}
- std::uint64_t TotalRawSize = 0;
- for (uint64_t RawSize : Content.RawSizes)
- {
- TotalRawSize += RawSize;
- }
+ std::uint64_t TotalRawSize = std::accumulate(Content.RawSizes.begin(), Content.RawSizes.end(), std::uint64_t(0));
+
{
ProgressBar ProgressBar(UsePlainProgress);
FilteredRate FilteredBytesHashed;
@@ -2956,9 +3181,9 @@ namespace {
ForceUploadChunkHashes.push_back(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]);
}
- for (size_t BlockIndex = 0; BlockIndex < NewBlocks.BlockBuffers.size(); BlockIndex++)
+ for (size_t BlockIndex = 0; BlockIndex < NewBlocks.BlockHeaders.size(); BlockIndex++)
{
- if (NewBlocks.BlockBuffers[BlockIndex])
+ if (NewBlocks.BlockHeaders[BlockIndex])
{
// Block was not uploaded during generation
ForceUploadChunkHashes.push_back(NewBlocks.BlockDescriptions[BlockIndex].BlockHash);
@@ -3457,8 +3682,8 @@ namespace {
std::vector<CompositeBuffer> ChunkBuffers;
struct WriteOpData
{
- const ChunkedContentLookup::ChunkSequenceLocation* Target;
- size_t ChunkBufferIndex;
+ const ChunkedContentLookup::ChunkSequenceLocation* Target = nullptr;
+ size_t ChunkBufferIndex = (size_t)-1;
};
std::vector<WriteOpData> WriteOps;
};
@@ -3964,155 +4189,129 @@ namespace {
}
}
- void DownloadLargeBlob(BuildStorage& Storage,
- const std::filesystem::path& Path,
- const ChunkedFolderContent& RemoteContent,
- const ChunkedContentLookup& RemoteLookup,
- const Oid& BuildId,
- const IoHash& ChunkHash,
- const std::uint64_t PreferredMultipartChunkSize,
- const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs,
- std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
- ParallellWork& Work,
- WorkerThreadPool& WritePool,
- WorkerThreadPool& NetworkPool,
- std::atomic<uint64_t>& WriteToDiskBytes,
- std::atomic<uint64_t>& BytesDownloaded,
- std::atomic<uint64_t>& MultipartAttachmentCount,
- std::function<void(uint64_t DowloadedBytes)>&& OnDownloadComplete,
- std::function<void()>&& OnWriteStart,
- std::function<void()>&& OnWriteComplete)
+ void AsyncWriteDownloadedChunk(const std::filesystem::path& Path,
+ const ChunkedFolderContent& RemoteContent,
+ const ChunkedContentLookup& RemoteLookup,
+ uint32_t RemoteChunkIndex,
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>&& ChunkTargetPtrs,
+ ParallellWork& Work,
+ WorkerThreadPool& WritePool,
+ IoBuffer&& Payload,
+ std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ std::atomic<uint64_t>& WriteToDiskBytes,
+ std::atomic<uint32_t>& ChunkCountWritten,
+ std::atomic<uint64_t>& WritePartsComplete,
+ std::atomic<uint64_t>& TotalPartWriteCount,
+ std::atomic<uint64_t>& LooseChunksBytes,
+ FilteredRate& FilteredWrittenBytesPerSecond)
{
- ZEN_TRACE_CPU("DownloadLargeBlob");
+ ZEN_TRACE_CPU("AsyncWriteDownloadedChunk");
- struct WorkloadData
+ const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+
+ uint64_t Size = Payload.GetSize();
+ LooseChunksBytes += Size;
+
+ std::filesystem::path CompressedChunkPath;
+
+ // Check if the dowloaded chunk is file based and we can move it directly without rewriting it
{
- TemporaryFile TempFile;
- };
- std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>());
+ IoBufferFileReference FileRef;
+ if (Payload.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == Size))
+ {
+ ZEN_TRACE_CPU("MoveTempChunk");
+ std::error_code Ec;
+ std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
+ if (!Ec)
+ {
+ Payload.SetDeleteOnClose(false);
+ Payload = {};
+ CompressedChunkPath = Path / ZenTempDownloadFolderName / ChunkHash.ToHexString();
+ std::filesystem::rename(TempBlobPath, CompressedChunkPath, Ec);
+ if (Ec)
+ {
+ CompressedChunkPath = std::filesystem::path{};
- std::filesystem::path DownloadFolder = Path / ZenTempDownloadFolderName;
- std::filesystem::path TargetFolder = Path / ZenTempCacheFolderName;
+ // Re-open the temp file again
+ BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
+ Payload = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, Size, true);
+ Payload.SetDeleteOnClose(true);
+ }
+ }
+ }
+ }
- std::error_code Ec;
- Workload->TempFile.CreateTemporary(DownloadFolder, Ec);
- if (Ec)
+ if (CompressedChunkPath.empty() && (Size > 512u * 1024u))
{
- throw std::runtime_error(
- fmt::format("Failed opening temporary file '{}': {} ({})", Workload->TempFile.GetPath(), Ec.message(), Ec.value()));
+ ZEN_TRACE_CPU("WriteTempChunk");
+ // Could not be moved and rather large, lets store it on disk
+ CompressedChunkPath = Path / ZenTempDownloadFolderName / ChunkHash.ToHexString();
+ TemporaryFile::SafeWriteFile(CompressedChunkPath, Payload);
+ Payload = {};
}
- std::vector<std::function<void()>> WorkItems = Storage.GetLargeBuildBlob(
- BuildId,
- ChunkHash,
- PreferredMultipartChunkSize,
- [DownloadFolder,
- TargetFolder,
- &RemoteContent,
- &RemoteLookup,
- &Work,
- &WritePool,
- Workload,
- ChunkHash,
- &BytesDownloaded,
- OnDownloadComplete = std::move(OnDownloadComplete),
- OnWriteComplete = std::move(OnWriteComplete),
- OnWriteStart = std::move(OnWriteStart),
- &WriteToDiskBytes,
+
+ Work.ScheduleWork(
+ WritePool, // GetSyncWorkerPool(),//
+ [&,
SequenceIndexChunksLeftToWriteCounters,
- ChunkTargetPtrs = std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>(
- ChunkTargetPtrs)](uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining) {
- BytesDownloaded += Chunk.GetSize();
+ CompressedChunkPath,
+ RemoteChunkIndex,
+ ChunkTargetPtrs = std::move(ChunkTargetPtrs),
+ CompressedPart = std::move(Payload)](std::atomic<bool>&) mutable {
+ ZEN_TRACE_CPU("UpdateFolder_WriteChunk");
- if (!AbortFlag.load())
+ if (!AbortFlag)
{
- ZEN_TRACE_CPU("DownloadLargeBlob_Save");
- Workload->TempFile.Write(Chunk.GetView(), Offset);
- if (Chunk.GetSize() == BytesRemaining)
+ FilteredWrittenBytesPerSecond.Start();
+
+ const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+ if (CompressedChunkPath.empty())
{
- OnDownloadComplete(Workload->TempFile.FileSize());
-
- Work.ScheduleWork(
- WritePool, // GetSyncWorkerPool(),//
- [DownloadFolder,
- TargetFolder,
- &RemoteContent,
- &RemoteLookup,
- ChunkHash,
- Workload,
- Offset,
- OnWriteComplete = std::move(OnWriteComplete),
- OnWriteStart = std::move(OnWriteStart),
- &WriteToDiskBytes,
- SequenceIndexChunksLeftToWriteCounters,
- ChunkTargetPtrs](std::atomic<bool>&) {
- ZEN_TRACE_CPU("DownloadLargeBlob_Write");
+ ZEN_ASSERT(CompressedPart);
+ }
+ else
+ {
+ ZEN_ASSERT(!CompressedPart);
+ CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath);
+ if (!CompressedPart)
+ {
+ throw std::runtime_error(
+ fmt::format("Could not open dowloaded compressed chunk {} from {}", ChunkHash, CompressedChunkPath));
+ }
+ }
- if (!AbortFlag)
- {
- const std::filesystem::path CompressedChunkPath = DownloadFolder / ChunkHash.ToHexString();
- std::error_code Ec;
- Workload->TempFile.MoveTemporaryIntoPlace(CompressedChunkPath, Ec);
- if (Ec)
- {
- throw std::runtime_error(fmt::format("Failed moving downloaded chunk {} file to {}. Reason: {}",
- ChunkHash,
- CompressedChunkPath,
- Ec.message()));
- }
+ std::filesystem::path TargetFolder = Path / ZenTempCacheFolderName;
- IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath);
- if (!CompressedPart)
- {
- throw std::runtime_error(fmt::format("Could not open dowloaded compressed chunk {} from {}",
- ChunkHash,
- CompressedChunkPath));
- }
+ bool NeedHashVerify = WriteCompressedChunk(TargetFolder,
+ RemoteContent,
+ RemoteLookup,
+ ChunkHash,
+ ChunkTargetPtrs,
+ std::move(CompressedPart),
+ WriteToDiskBytes);
- OnWriteStart();
+ if (!AbortFlag)
+ {
+ ChunkCountWritten++;
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
- bool NeedHashVerify = WriteCompressedChunk(TargetFolder,
- RemoteContent,
- RemoteLookup,
- ChunkHash,
- ChunkTargetPtrs,
- std::move(CompressedPart),
- WriteToDiskBytes);
+ std::filesystem::remove(CompressedChunkPath);
- if (!AbortFlag)
- {
- std::filesystem::remove(CompressedChunkPath);
-
- CompleteChunkTargets(TargetFolder,
- RemoteContent,
- ChunkHash,
- ChunkTargetPtrs,
- SequenceIndexChunksLeftToWriteCounters,
- NeedHashVerify);
- }
- }
- },
- Work.DefaultErrorFunction());
+ CompleteChunkTargets(TargetFolder,
+ RemoteContent,
+ ChunkHash,
+ ChunkTargetPtrs,
+ SequenceIndexChunksLeftToWriteCounters,
+ NeedHashVerify);
}
}
- });
- if (!WorkItems.empty())
- {
- MultipartAttachmentCount++;
- }
- for (auto& WorkItem : WorkItems)
- {
- Work.ScheduleWork(
- NetworkPool, // GetSyncWorkerPool(),//
- [WorkItem = std::move(WorkItem)](std::atomic<bool>&) {
- ZEN_TRACE_CPU("DownloadLargeBlob_Work");
- if (!AbortFlag)
- {
- WorkItem();
- }
- },
- Work.DefaultErrorFunction());
- }
- }
+ },
+ Work.DefaultErrorFunction());
+ };
void UpdateFolder(BuildStorage& Storage,
const Oid& BuildId,
@@ -4291,13 +4490,13 @@ namespace {
// Pick up all chunks in current local state
struct CacheCopyData
{
- uint32_t LocalSequenceIndex;
+ uint32_t LocalSequenceIndex = (uint32_t)-1;
std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> TargetChunkLocationPtrs;
struct ChunkTarget
{
- uint32_t TargetChunkLocationCount;
- uint64_t ChunkRawSize;
- uint64_t CacheFileOffset;
+ uint32_t TargetChunkLocationCount = (uint32_t)-1;
+ uint64_t ChunkRawSize = (uint64_t)-1;
+ uint64_t CacheFileOffset = (uint64_t)-1;
};
std::vector<ChunkTarget> ChunkTargets;
};
@@ -4419,8 +4618,8 @@ namespace {
uint64_t TotalRequestCount = 0;
std::atomic<uint64_t> RequestsComplete = 0;
std::atomic<uint32_t> ChunkCountWritten = 0;
- std::atomic<size_t> TotalPartWriteCount = 0;
- std::atomic<size_t> WritePartsComplete = 0;
+ std::atomic<uint64_t> TotalPartWriteCount = 0;
+ std::atomic<uint64_t> WritePartsComplete = 0;
{
ZEN_TRACE_CPU("WriteChunks");
@@ -4441,7 +4640,7 @@ namespace {
struct LooseChunkHashWorkData
{
std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs;
- uint32_t RemoteChunkIndex;
+ uint32_t RemoteChunkIndex = (uint32_t)-1;
};
std::vector<LooseChunkHashWorkData> LooseChunkHashWorks;
@@ -4687,9 +4886,9 @@ namespace {
struct WriteOp
{
- const ChunkedContentLookup::ChunkSequenceLocation* Target;
- uint64_t CacheFileOffset;
- uint64_t ChunkSize;
+ const ChunkedContentLookup::ChunkSequenceLocation* Target = nullptr;
+ uint64_t CacheFileOffset = (uint64_t)-1;
+ uint64_t ChunkSize = (uint64_t)-1;
};
std::vector<WriteOp> WriteOps;
@@ -4821,10 +5020,11 @@ namespace {
const uint32_t RemoteChunkIndex = LooseChunkHashWork.RemoteChunkIndex;
Work.ScheduleWork(
- NetworkPool, // NetworkPool, // GetSyncWorkerPool(),//
- [&, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) {
+ WritePool, // NetworkPool, // GetSyncWorkerPool(),//
+ [&, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) mutable {
if (!AbortFlag)
{
+ ZEN_TRACE_CPU("UpdateFolder_ReadPreDownloaded");
std::filesystem::path ExistingCompressedChunkPath;
{
const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
@@ -4925,39 +5125,37 @@ namespace {
if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize)
{
ZEN_TRACE_CPU("UpdateFolder_GetLargeChunk");
- DownloadLargeBlob(
- Storage,
- Path,
- RemoteContent,
- RemoteLookup,
- BuildId,
- ChunkHash,
- PreferredMultipartChunkSize,
- ChunkTargetPtrs,
- SequenceIndexChunksLeftToWriteCounters,
- Work,
- WritePool,
- NetworkPool,
- WriteToDiskBytes,
- BytesDownloaded,
- MultipartAttachmentCount,
- [&](uint64_t BytesDownloaded) {
- LooseChunksBytes += BytesDownloaded;
- RequestsComplete++;
- if (RequestsComplete == TotalRequestCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
- },
- [&]() { FilteredWrittenBytesPerSecond.Start(); },
- [&]() {
- ChunkCountWritten++;
- WritePartsComplete++;
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
- }
- });
+ DownloadLargeBlob(Storage,
+ Path / ZenTempDownloadFolderName,
+ BuildId,
+ ChunkHash,
+ PreferredMultipartChunkSize,
+ Work,
+ NetworkPool,
+ BytesDownloaded,
+ MultipartAttachmentCount,
+ [&, RemoteChunkIndex, ChunkTargetPtrs](IoBuffer&& Payload) mutable {
+ RequestsComplete++;
+ if (RequestsComplete == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ AsyncWriteDownloadedChunk(Path,
+ RemoteContent,
+ RemoteLookup,
+ RemoteChunkIndex,
+ std::move(ChunkTargetPtrs),
+ Work,
+ WritePool,
+ std::move(Payload),
+ SequenceIndexChunksLeftToWriteCounters,
+ WriteToDiskBytes,
+ ChunkCountWritten,
+ WritePartsComplete,
+ TotalPartWriteCount,
+ LooseChunksBytes,
+ FilteredWrittenBytesPerSecond);
+ });
}
else
{
@@ -4970,129 +5168,27 @@ namespace {
}
uint64_t BlobSize = BuildBlob.GetSize();
BytesDownloaded += BlobSize;
- LooseChunksBytes += BlobSize;
+
RequestsComplete++;
if (RequestsComplete == TotalRequestCount)
{
FilteredDownloadedBytesPerSecond.Stop();
}
-
- std::filesystem::path CompressedChunkPath;
-
- // Check if the dowloaded file is file based and we can move it directly without rewriting it
- {
- IoBufferFileReference FileRef;
- if (BuildBlob.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) &&
- (FileRef.FileChunkSize == BlobSize))
- {
- ZEN_TRACE_CPU("UpdateFolder_MoveTempChunk");
- std::error_code Ec;
- std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
- if (!Ec)
- {
- BuildBlob.SetDeleteOnClose(false);
- BuildBlob = {};
- CompressedChunkPath = Path / ZenTempDownloadFolderName / ChunkHash.ToHexString();
- std::filesystem::rename(TempBlobPath, CompressedChunkPath, Ec);
- if (Ec)
- {
- CompressedChunkPath = std::filesystem::path{};
-
- // Re-open the temp file again
- BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
- BuildBlob = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlobSize, true);
- BuildBlob.SetDeleteOnClose(true);
- }
- }
- }
- }
-
- if (CompressedChunkPath.empty() && (BlobSize > 512u * 1024u))
- {
- ZEN_TRACE_CPU("UpdateFolder_WriteTempChunk");
- // Could not be moved and rather large, lets store it on disk
- CompressedChunkPath = Path / ZenTempDownloadFolderName / ChunkHash.ToHexString();
- TemporaryFile::SafeWriteFile(CompressedChunkPath, BuildBlob);
- BuildBlob = {};
- }
- DownloadedChunks++;
-
- if (!AbortFlag)
- {
- Work.ScheduleWork(
- WritePool, // WritePool, GetSyncWorkerPool()
- [&Path,
- &RemoteContent,
- &RemoteLookup,
- &CacheFolderPath,
- &SequenceIndexChunksLeftToWriteCounters,
- &WriteToDiskBytes,
- &ChunkCountWritten,
- &WritePartsComplete,
- &TotalPartWriteCount,
- &FilteredWrittenBytesPerSecond,
- RemoteChunkIndex,
- ChunkTargetPtrs,
- CompressedChunkPath,
- CompressedPart = std::move(BuildBlob)](std::atomic<bool>&) mutable {
- if (!AbortFlag)
- {
- ZEN_TRACE_CPU("UpdateFolder_WriteChunk");
-
- FilteredWrittenBytesPerSecond.Start();
-
- const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
- if (CompressedChunkPath.empty())
- {
- ZEN_ASSERT(CompressedPart);
- }
- else
- {
- ZEN_ASSERT(!CompressedPart);
- CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath);
- if (!CompressedPart)
- {
- throw std::runtime_error(
- fmt::format("Could not open dowloaded compressed chunk {} from {}",
- ChunkHash,
- CompressedChunkPath));
- }
- }
-
- std::filesystem::path TargetFolder = Path / ZenTempCacheFolderName;
- bool NeedHashVerify = WriteCompressedChunk(TargetFolder,
- RemoteContent,
- RemoteLookup,
- ChunkHash,
- ChunkTargetPtrs,
- std::move(CompressedPart),
- WriteToDiskBytes);
-
- if (!AbortFlag)
- {
- ChunkCountWritten++;
- WritePartsComplete++;
- if (WritePartsComplete == TotalPartWriteCount)
- {
- FilteredWrittenBytesPerSecond.Stop();
- }
-
- if (!CompressedChunkPath.empty())
- {
- std::filesystem::remove(CompressedChunkPath);
- }
-
- CompleteChunkTargets(TargetFolder,
- RemoteContent,
- ChunkHash,
- ChunkTargetPtrs,
- SequenceIndexChunksLeftToWriteCounters,
- NeedHashVerify);
- }
- }
- },
- Work.DefaultErrorFunction());
- }
+ AsyncWriteDownloadedChunk(Path,
+ RemoteContent,
+ RemoteLookup,
+ RemoteChunkIndex,
+ std::move(ChunkTargetPtrs),
+ Work,
+ WritePool,
+ std::move(BuildBlob),
+ SequenceIndexChunksLeftToWriteCounters,
+ WriteToDiskBytes,
+ ChunkCountWritten,
+ WritePartsComplete,
+ TotalPartWriteCount,
+ LooseChunksBytes,
+ FilteredWrittenBytesPerSecond);
}
}
}
@@ -5375,7 +5471,7 @@ namespace {
WritePool, // WritePool, GetSyncWorkerPool()
[&RemoteContent,
&RemoteLookup,
- &CacheFolderPath,
+ CacheFolderPath,
&RemoteChunkIndexNeedsCopyFromSourceFlags,
&SequenceIndexChunksLeftToWriteCounters,
BlockIndex,
@@ -5550,7 +5646,10 @@ namespace {
// Clean target folder
ZEN_CONSOLE("Wiping {}", Path);
- CleanDirectory(Path, DefaultExcludeFolders);
+ if (!CleanDirectory(Path, DefaultExcludeFolders))
+ {
+ ZEN_WARN("Some files in {} could not be removed", Path);
+ }
}
else
{
@@ -6415,8 +6514,10 @@ namespace {
ZEN_CONSOLE("Downloaded build in {}.", NiceTimeSpanMs(DownloadTimer.GetElapsedTimeMs()));
}
}
- CleanDirectory(ZenTempFolder, {});
- std::filesystem::remove(ZenTempFolder);
+ if (CleanDirectory(ZenTempFolder, {}))
+ {
+ std::filesystem::remove(ZenTempFolder);
+ }
}
void DiffFolders(const std::filesystem::path& BasePath, const std::filesystem::path& ComparePath, bool OnlyChunked)
@@ -7445,7 +7546,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
m_StoragePath = GetRunningExecutablePath().parent_path() / ".tmpstore";
CreateDirectories(m_StoragePath);
- CleanDirectory(m_StoragePath);
+ CleanDirectory(m_StoragePath, {});
}
auto _ = MakeGuard([&]() {
if (m_BuildsUrl.empty() && m_StoragePath.empty())
diff --git a/src/zen/zen.cpp b/src/zen/zen.cpp
index 4e6161e86..0fcf9d871 100644
--- a/src/zen/zen.cpp
+++ b/src/zen/zen.cpp
@@ -577,12 +577,15 @@ main(int argc, char** argv)
GlobalOptions.PassthroughArgs = PassthroughArgs;
GlobalOptions.PassthroughArgV = PassthroughArgV;
+ std::string MemoryOptions;
+
std::string SubCommand = "<None>";
cxxopts::Options Options("zen", "Zen management tool");
Options.add_options()("d, debug", "Enable debugging", cxxopts::value<bool>(GlobalOptions.IsDebug));
Options.add_options()("v, verbose", "Enable verbose logging", cxxopts::value<bool>(GlobalOptions.IsVerbose));
+ Options.add_options()("malloc", "Configure memory allocator subsystem", cxxopts::value(MemoryOptions)->default_value("mimalloc"));
Options.add_options()("help", "Show command line help");
Options.add_options()("c, command", "Sub command", cxxopts::value<std::string>(SubCommand));
diff --git a/src/zenutil/chunkedcontent.cpp b/src/zenutil/chunkedcontent.cpp
index 4ca89d996..bb1ee5183 100644
--- a/src/zenutil/chunkedcontent.cpp
+++ b/src/zenutil/chunkedcontent.cpp
@@ -96,6 +96,8 @@ namespace {
uint32_t PathIndex,
std::atomic<bool>& AbortFlag)
{
+ ZEN_TRACE_CPU("ChunkFolderContent");
+
const uint64_t RawSize = OutChunkedContent.RawSizes[PathIndex];
const std::filesystem::path& Path = OutChunkedContent.Paths[PathIndex];
@@ -136,6 +138,8 @@ namespace {
}
else
{
+ ZEN_TRACE_CPU("HashOnly");
+
IoBuffer Buffer = IoBufferBuilder::MakeFromFile((FolderPath / Path).make_preferred());
const IoHash Hash = IoHash::HashBuffer(Buffer, &Stats.BytesHashed);
@@ -228,6 +232,7 @@ FolderContent::operator==(const FolderContent& Rhs) const
bool
FolderContent::AreKnownFilesEqual(const FolderContent& Rhs) const
{
+ ZEN_TRACE_CPU("FolderContent::AreKnownFilesEqual");
tsl::robin_map<std::string, size_t> RhsPathToIndex;
const size_t RhsPathCount = Rhs.Paths.size();
RhsPathToIndex.reserve(RhsPathCount);
@@ -259,6 +264,7 @@ FolderContent::AreKnownFilesEqual(const FolderContent& Rhs) const
void
FolderContent::UpdateState(const FolderContent& Rhs, std::vector<uint32_t>& OutPathIndexesOufOfDate)
{
+ ZEN_TRACE_CPU("FolderContent::UpdateState");
tsl::robin_map<std::string, uint32_t> RhsPathToIndex;
const uint32_t RhsPathCount = gsl::narrow<uint32_t>(Rhs.Paths.size());
RhsPathToIndex.reserve(RhsPathCount);
@@ -297,6 +303,7 @@ FolderContent::UpdateState(const FolderContent& Rhs, std::vector<uint32_t>& OutP
FolderContent
GetUpdatedContent(const FolderContent& Old, const FolderContent& New, std::vector<std::filesystem::path>& OutDeletedPathIndexes)
{
+ ZEN_TRACE_CPU("FolderContent::GetUpdatedContent");
FolderContent Result = {.Platform = Old.Platform};
tsl::robin_map<std::string, uint32_t> NewPathToIndex;
const uint32_t NewPathCount = gsl::narrow<uint32_t>(New.Paths.size());
@@ -332,6 +339,7 @@ GetUpdatedContent(const FolderContent& Old, const FolderContent& New, std::vecto
void
SaveFolderContentToCompactBinary(const FolderContent& Content, CbWriter& Output)
{
+ ZEN_TRACE_CPU("SaveFolderContentToCompactBinary");
Output.AddString("platform"sv, ToString(Content.Platform));
compactbinary_helpers::WriteArray(Content.Paths, "paths"sv, Output);
compactbinary_helpers::WriteArray(Content.RawSizes, "rawSizes"sv, Output);
@@ -342,6 +350,7 @@ SaveFolderContentToCompactBinary(const FolderContent& Content, CbWriter& Output)
FolderContent
LoadFolderContentToCompactBinary(CbObjectView Input)
{
+ ZEN_TRACE_CPU("LoadFolderContentToCompactBinary");
FolderContent Content;
Content.Platform = FromString(Input["platform"sv].AsString(), GetSourceCurrentPlatform());
compactbinary_helpers::ReadArray("paths"sv, Input, Content.Paths);
@@ -494,6 +503,7 @@ GetFolderContent(GetFolderContentStatistics& Stats,
void
SaveChunkedFolderContentToCompactBinary(const ChunkedFolderContent& Content, CbWriter& Output)
{
+ ZEN_TRACE_CPU("SaveChunkedFolderContentToCompactBinary");
Output.AddString("platform"sv, ToString(Content.Platform));
compactbinary_helpers::WriteArray(Content.Paths, "paths"sv, Output);
compactbinary_helpers::WriteArray(Content.RawSizes, "rawSizes"sv, Output);
@@ -512,6 +522,7 @@ SaveChunkedFolderContentToCompactBinary(const ChunkedFolderContent& Content, CbW
ChunkedFolderContent
LoadChunkedFolderContentToCompactBinary(CbObjectView Input)
{
+ ZEN_TRACE_CPU("LoadChunkedFolderContentToCompactBinary");
ChunkedFolderContent Content;
Content.Platform = FromString(Input["platform"sv].AsString(), GetSourceCurrentPlatform());
compactbinary_helpers::ReadArray("paths"sv, Input, Content.Paths);
@@ -788,7 +799,7 @@ BuildChunkedContentLookup(const ChunkedFolderContent& Content)
struct ChunkLocationReference
{
- uint32_t ChunkIndex;
+ uint32_t ChunkIndex = (uint32_t)-1;
ChunkedContentLookup::ChunkSequenceLocation Location;
};
@@ -853,7 +864,7 @@ BuildChunkedContentLookup(const ChunkedFolderContent& Content)
{
Result.ChunkHashToChunkIndex.insert({Content.ChunkedContent.ChunkHashes[ChunkIndex], ChunkIndex});
uint32_t Count = 0;
- while (Locations[RangeOffset + Count].ChunkIndex == ChunkIndex)
+ while ((RangeOffset + Count < Locations.size()) && (Locations[RangeOffset + Count].ChunkIndex == ChunkIndex))
{
Result.ChunkSequenceLocations.push_back(Locations[RangeOffset + Count].Location);
Count++;
diff --git a/src/zenutil/chunkedfile.cpp b/src/zenutil/chunkedfile.cpp
index 4f9344039..a2c041ffd 100644
--- a/src/zenutil/chunkedfile.cpp
+++ b/src/zenutil/chunkedfile.cpp
@@ -3,6 +3,7 @@
#include <zenutil/chunkedfile.h>
#include <zencore/basicfile.h>
+#include <zencore/trace.h>
#include "chunking.h"
@@ -33,6 +34,7 @@ namespace {
IoBuffer
SerializeChunkedInfo(const ChunkedInfo& Info)
{
+ ZEN_TRACE_CPU("SerializeChunkedInfo");
size_t HeaderSize = RoundUp(sizeof(ChunkedHeader), 16) + RoundUp(sizeof(uint32_t) * Info.ChunkSequence.size(), 16) +
RoundUp(sizeof(IoHash) * Info.ChunkHashes.size(), 16);
IoBuffer HeaderData(HeaderSize);
@@ -65,6 +67,7 @@ SerializeChunkedInfo(const ChunkedInfo& Info)
ChunkedInfo
DeserializeChunkedInfo(IoBuffer& Buffer)
{
+ ZEN_TRACE_CPU("DeserializeChunkedInfo");
MemoryView View = Buffer.GetView();
ChunkedHeader Header;
{
@@ -99,6 +102,7 @@ DeserializeChunkedInfo(IoBuffer& Buffer)
void
Reconstruct(const ChunkedInfo& Info, const std::filesystem::path& TargetPath, std::function<IoBuffer(const IoHash& ChunkHash)> GetChunk)
{
+ ZEN_TRACE_CPU("Reconstruct");
BasicFile Reconstructed;
Reconstructed.Open(TargetPath, BasicFile::Mode::kTruncate);
BasicFileWriter ReconstructedWriter(Reconstructed, 64 * 1024);
@@ -119,6 +123,8 @@ ChunkData(BasicFile& RawData,
std::atomic<uint64_t>* BytesProcessed,
std::atomic<bool>* AbortFlag)
{
+ ZEN_TRACE_CPU("ChunkData");
+
ChunkedInfoWithSource Result;
tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> FoundChunks;
diff --git a/src/zenutil/chunkingcontroller.cpp b/src/zenutil/chunkingcontroller.cpp
index 017d12433..2a7057a46 100644
--- a/src/zenutil/chunkingcontroller.cpp
+++ b/src/zenutil/chunkingcontroller.cpp
@@ -4,6 +4,7 @@
#include <zencore/basicfile.h>
#include <zencore/compactbinarybuilder.h>
+#include <zencore/trace.h>
ZEN_THIRD_PARTY_INCLUDES_START
#include <tsl/robin_map.h>
@@ -61,6 +62,7 @@ public:
std::atomic<uint64_t>& BytesProcessed,
std::atomic<bool>& AbortFlag) const override
{
+ ZEN_TRACE_CPU("BasicChunkingController::ProcessFile");
const bool ExcludeFromChunking =
std::find(m_ChunkExcludeExtensions.begin(), m_ChunkExcludeExtensions.end(), InputPath.extension()) !=
m_ChunkExcludeExtensions.end();
@@ -136,6 +138,7 @@ public:
std::atomic<uint64_t>& BytesProcessed,
std::atomic<bool>& AbortFlag) const override
{
+ ZEN_TRACE_CPU("ChunkingControllerWithFixedChunking::ProcessFile");
if (RawSize < m_ChunkFileSizeLimit)
{
return false;
@@ -145,6 +148,7 @@ public:
if (FixedChunking)
{
+ ZEN_TRACE_CPU("FixedChunking");
IoHashStream FullHash;
IoBuffer Source = IoBufferBuilder::MakeFromFile(InputPath);
uint64_t Offset = 0;
diff --git a/src/zenutil/filebuildstorage.cpp b/src/zenutil/filebuildstorage.cpp
index e57109006..47a4e1cc4 100644
--- a/src/zenutil/filebuildstorage.cpp
+++ b/src/zenutil/filebuildstorage.cpp
@@ -8,6 +8,7 @@
#include <zencore/fmtutils.h>
#include <zencore/scopeguard.h>
#include <zencore/timer.h>
+#include <zencore/trace.h>
namespace zen {
@@ -36,6 +37,7 @@ public:
virtual CbObject ListBuilds(CbObject Query) override
{
+ ZEN_TRACE_CPU("FileBuildStorage::ListBuilds");
ZEN_UNUSED(Query);
SimulateLatency(Query.GetSize(), 0);
@@ -72,6 +74,7 @@ public:
virtual CbObject PutBuild(const Oid& BuildId, const CbObject& MetaData) override
{
+ ZEN_TRACE_CPU("FileBuildStorage::PutBuild");
SimulateLatency(MetaData.GetSize(), 0);
Stopwatch ExecutionTimer;
@@ -93,6 +96,7 @@ public:
virtual CbObject GetBuild(const Oid& BuildId) override
{
+ ZEN_TRACE_CPU("FileBuildStorage::GetBuild");
SimulateLatency(0, 0);
Stopwatch ExecutionTimer;
auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
@@ -105,6 +109,7 @@ public:
virtual void FinalizeBuild(const Oid& BuildId) override
{
+ ZEN_TRACE_CPU("FileBuildStorage::FinalizeBuild");
SimulateLatency(0, 0);
Stopwatch ExecutionTimer;
auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
@@ -119,6 +124,7 @@ public:
std::string_view PartName,
const CbObject& MetaData) override
{
+ ZEN_TRACE_CPU("FileBuildStorage::PutBuildPart");
SimulateLatency(MetaData.GetSize(), 0);
Stopwatch ExecutionTimer;
auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
@@ -164,6 +170,7 @@ public:
virtual CbObject GetBuildPart(const Oid& BuildId, const Oid& BuildPartId) override
{
+ ZEN_TRACE_CPU("FileBuildStorage::GetBuildPart");
SimulateLatency(0, 0);
Stopwatch ExecutionTimer;
@@ -186,6 +193,7 @@ public:
virtual std::vector<IoHash> FinalizeBuildPart(const Oid& BuildId, const Oid& BuildPartId, const IoHash& PartHash) override
{
+ ZEN_TRACE_CPU("FileBuildStorage::FinalizeBuildPart");
SimulateLatency(0, 0);
Stopwatch ExecutionTimer;
@@ -215,6 +223,7 @@ public:
ZenContentType ContentType,
const CompositeBuffer& Payload) override
{
+ ZEN_TRACE_CPU("FileBuildStorage::PutBuildBlob");
ZEN_UNUSED(BuildId);
ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary);
SimulateLatency(Payload.GetSize(), 0);
@@ -242,6 +251,7 @@ public:
std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter,
std::function<void(uint64_t, bool)>&& OnSentBytes) override
{
+ ZEN_TRACE_CPU("FileBuildStorage::PutLargeBuildBlob");
ZEN_UNUSED(BuildId);
ZEN_UNUSED(ContentType);
SimulateLatency(0, 0);
@@ -281,6 +291,7 @@ public:
uint64_t Size = Min(32u * 1024u * 1024u, PayloadSize - Offset);
WorkItems.push_back([this, RawHash, BlockPath, Workload, Offset, Size]() {
+ ZEN_TRACE_CPU("FileBuildStorage::PutLargeBuildBlob_Work");
IoBuffer PartPayload = Workload->Transmitter(Offset, Size);
SimulateLatency(PartPayload.GetSize(), 0);
@@ -327,6 +338,7 @@ public:
virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override
{
+ ZEN_TRACE_CPU("FileBuildStorage::GetBuildBlob");
ZEN_UNUSED(BuildId);
SimulateLatency(0, 0);
Stopwatch ExecutionTimer;
@@ -363,6 +375,7 @@ public:
uint64_t ChunkSize,
std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver) override
{
+ ZEN_TRACE_CPU("FileBuildStorage::GetLargeBuildBlob");
ZEN_UNUSED(BuildId);
SimulateLatency(0, 0);
Stopwatch ExecutionTimer;
@@ -392,6 +405,7 @@ public:
{
uint64_t Size = Min(ChunkSize, BlobSize - Offset);
WorkItems.push_back([this, BlockPath, Workload, Offset, Size]() {
+ ZEN_TRACE_CPU("FileBuildStorage::GetLargeBuildBlob_Work");
SimulateLatency(0, 0);
IoBuffer PartPayload(Size);
Workload->BlobFile.Read(PartPayload.GetMutableView().GetData(), Size, Offset);
@@ -411,6 +425,7 @@ public:
virtual void PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) override
{
+ ZEN_TRACE_CPU("FileBuildStorage::PutBlockMetadata");
ZEN_UNUSED(BuildId);
SimulateLatency(MetaData.GetSize(), 0);
@@ -429,6 +444,7 @@ public:
virtual std::vector<ChunkBlockDescription> FindBlocks(const Oid& BuildId) override
{
+ ZEN_TRACE_CPU("FileBuildStorage::FindBlocks");
ZEN_UNUSED(BuildId);
SimulateLatency(0, 0);
Stopwatch ExecutionTimer;
@@ -461,6 +477,7 @@ public:
virtual std::vector<ChunkBlockDescription> GetBlockMetadata(const Oid& BuildId, std::span<const IoHash> BlockHashes) override
{
+ ZEN_TRACE_CPU("FileBuildStorage::GetBlockMetadata");
ZEN_UNUSED(BuildId);
SimulateLatency(0, 0);
Stopwatch ExecutionTimer;
diff --git a/src/zenutil/include/zenutil/chunkedcontent.h b/src/zenutil/include/zenutil/chunkedcontent.h
index 309341550..57b55cb8e 100644
--- a/src/zenutil/include/zenutil/chunkedcontent.h
+++ b/src/zenutil/include/zenutil/chunkedcontent.h
@@ -124,8 +124,8 @@ struct ChunkedContentLookup
{
struct ChunkSequenceLocation
{
- uint32_t SequenceIndex;
- uint64_t Offset;
+ uint32_t SequenceIndex = (uint32_t)-1;
+ uint64_t Offset = (uint64_t)-1;
};
tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkHashToChunkIndex;
tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> RawHashToSequenceIndex;