diff options
| author | Dan Engelbrecht <[email protected]> | 2025-03-12 10:51:57 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-03-12 10:51:57 +0100 |
| commit | fb09d861fd76e459ac86bec388bd406aaca8e681 (patch) | |
| tree | 2710efa3a2492cf12886f447163fd8b4a939c196 /src | |
| parent | async find blocks (#300) (diff) | |
| download | zen-fb09d861fd76e459ac86bec388bd406aaca8e681.tar.xz zen-fb09d861fd76e459ac86bec388bd406aaca8e681.zip | |
improved block gen logic (#302)
- Improvement: Reduced memory usage during upload and part upload validation
- Improvement: Reduced I/O usage during upload and download
- Improvement: Faster block regeneration when uploading in response to PutBuild/FinalizeBuild
- Improvement: More trace scopes for build upload operations
- Bugfix: Fixed crash during download when trying to write outside a file range
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 1007 | ||||
| -rw-r--r-- | src/zen/zen.cpp | 3 | ||||
| -rw-r--r-- | src/zenutil/chunkedcontent.cpp | 15 | ||||
| -rw-r--r-- | src/zenutil/chunkedfile.cpp | 6 | ||||
| -rw-r--r-- | src/zenutil/chunkingcontroller.cpp | 4 | ||||
| -rw-r--r-- | src/zenutil/filebuildstorage.cpp | 17 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/chunkedcontent.h | 4 |
7 files changed, 599 insertions, 457 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index e03175256..baa46dda8 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -191,15 +191,34 @@ namespace { return SB.ToString(); } - void CleanDirectory(const std::filesystem::path& Path, std::span<const std::string_view> ExcludeDirectories) + bool CleanDirectory(const std::filesystem::path& Path, std::span<const std::string_view> ExcludeDirectories) { ZEN_TRACE_CPU("CleanDirectory"); + bool CleanWipe = true; + DirectoryContent LocalDirectoryContent; GetDirectoryContent(Path, DirectoryContentFlags::IncludeDirs | DirectoryContentFlags::IncludeFiles, LocalDirectoryContent); for (const std::filesystem::path& LocalFilePath : LocalDirectoryContent.Files) { - std::filesystem::remove(LocalFilePath); + try + { + std::filesystem::remove(LocalFilePath); + } + catch (const std::exception&) + { + // DeleteOnClose files may be a bit slow in getting cleaned up, so pause amd retry one time + Sleep(200); + try + { + std::filesystem::remove(LocalFilePath); + } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed removing file {}. Reason: {}", LocalFilePath, Ex.what()); + CleanWipe = false; + } + } } for (const std::filesystem::path& LocalDirPath : LocalDirectoryContent.Directories) @@ -215,10 +234,28 @@ namespace { } if (!Leave) { - zen::CleanDirectory(LocalDirPath); - std::filesystem::remove(LocalDirPath); + try + { + zen::CleanDirectory(LocalDirPath); + std::filesystem::remove(LocalDirPath); + } + catch (const std::exception&) + { + Sleep(200); + try + { + zen::CleanDirectory(LocalDirPath); + std::filesystem::remove(LocalDirPath); + } + catch (const std::exception& Ex) + { + ZEN_WARN("Failed removing directory {}. Reason: {}", LocalDirPath, Ex.what()); + CleanWipe = false; + } + } } } + return CleanWipe; } std::string ReadAccessTokenFromFile(const std::filesystem::path& Path) @@ -1188,6 +1225,7 @@ namespace { uint64_t& OutCompressedSize, uint64_t& OutDecompressedSize) { + ZEN_TRACE_CPU("ValidateBlob"); IoBuffer Payload = Storage.GetBuildBlob(BuildId, BlobHash); if (!Payload) { @@ -1214,6 +1252,7 @@ namespace { const IoHash& ChunkHash, ReadFileCache& OpenFileCache) { + ZEN_TRACE_CPU("FetchChunk"); auto It = Lookup.ChunkHashToChunkIndex.find(ChunkHash); ZEN_ASSERT(It != Lookup.ChunkHashToChunkIndex.end()); uint32_t ChunkIndex = It->second; @@ -1233,6 +1272,7 @@ namespace { ChunkBlockDescription& OutBlockDescription, DiskStatistics& DiskStats) { + ZEN_TRACE_CPU("GenerateBlock"); ReadFileCache OpenFileCache(DiskStats, Path, Content, Lookup, 4); std::vector<std::pair<IoHash, FetchChunkFunc>> BlockContent; @@ -1255,6 +1295,103 @@ namespace { return GenerateChunkBlock(std::move(BlockContent), OutBlockDescription); }; + CompressedBuffer RebuildBlock(const std::filesystem::path& Path, + const ChunkedFolderContent& Content, + const ChunkedContentLookup& Lookup, + CompositeBuffer&& HeaderBuffer, + const std::vector<uint32_t>& ChunksInBlock, + DiskStatistics& DiskStats) + { + ZEN_TRACE_CPU("RebuildBlock"); + ReadFileCache OpenFileCache(DiskStats, Path, Content, Lookup, 4); + + std::vector<SharedBuffer> ResultBuffers; + ResultBuffers.reserve(HeaderBuffer.GetSegments().size() + ChunksInBlock.size()); + ResultBuffers.insert(ResultBuffers.end(), HeaderBuffer.GetSegments().begin(), HeaderBuffer.GetSegments().end()); + for (uint32_t ChunkIndex : ChunksInBlock) + { + std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkLocations = GetChunkSequenceLocations(Lookup, ChunkIndex); + ZEN_ASSERT(!ChunkLocations.empty()); + CompositeBuffer Chunk = OpenFileCache.GetRange(ChunkLocations[0].SequenceIndex, + ChunkLocations[0].Offset, + Content.ChunkedContent.ChunkRawSizes[ChunkIndex]); + ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == Content.ChunkedContent.ChunkHashes[ChunkIndex]); + CompositeBuffer CompressedChunk = + CompressedBuffer::Compress(std::move(Chunk), OodleCompressor::Mermaid, OodleCompressionLevel::None).GetCompressed(); + ResultBuffers.insert(ResultBuffers.end(), CompressedChunk.GetSegments().begin(), CompressedChunk.GetSegments().end()); + } + return CompressedBuffer::FromCompressedNoValidate(CompositeBuffer(std::move(ResultBuffers))); + }; + + void DownloadLargeBlob(BuildStorage& Storage, + const std::filesystem::path& DownloadFolder, + const Oid& BuildId, + const IoHash& ChunkHash, + const std::uint64_t PreferredMultipartChunkSize, + ParallellWork& Work, + WorkerThreadPool& NetworkPool, + std::atomic<uint64_t>& BytesDownloaded, + std::atomic<uint64_t>& MultipartAttachmentCount, + std::function<void(IoBuffer&& Payload)>&& OnDownloadComplete) + { + ZEN_TRACE_CPU("DownloadLargeBlob"); + + struct WorkloadData + { + TemporaryFile TempFile; + }; + std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>()); + + std::error_code Ec; + Workload->TempFile.CreateTemporary(DownloadFolder, Ec); + if (Ec) + { + throw std::runtime_error( + fmt::format("Failed opening temporary file '{}': {} ({})", Workload->TempFile.GetPath(), Ec.message(), Ec.value())); + } + std::vector<std::function<void()>> WorkItems = Storage.GetLargeBuildBlob( + BuildId, + ChunkHash, + PreferredMultipartChunkSize, + [Workload, &BytesDownloaded, OnDownloadComplete = std::move(OnDownloadComplete)](uint64_t Offset, + const IoBuffer& Chunk, + uint64_t BytesRemaining) { + BytesDownloaded += Chunk.GetSize(); + + if (!AbortFlag.load()) + { + ZEN_TRACE_CPU("DownloadLargeBlob_Save"); + Workload->TempFile.Write(Chunk.GetView(), Offset); + if (Chunk.GetSize() == BytesRemaining) + { + uint64_t PayloadSize = Workload->TempFile.FileSize(); + void* FileHandle = Workload->TempFile.Detach(); + ZEN_ASSERT(FileHandle != nullptr); + IoBuffer Payload(IoBuffer::File, FileHandle, 0, PayloadSize, true); + Payload.SetDeleteOnClose(true); + OnDownloadComplete(std::move(Payload)); + } + } + }); + if (!WorkItems.empty()) + { + MultipartAttachmentCount++; + } + for (auto& WorkItem : WorkItems) + { + Work.ScheduleWork( + NetworkPool, // GetSyncWorkerPool(),// + [WorkItem = std::move(WorkItem)](std::atomic<bool>&) { + ZEN_TRACE_CPU("DownloadLargeBlob_Work"); + if (!AbortFlag) + { + WorkItem(); + } + }, + Work.DefaultErrorFunction()); + } + } + void ValidateBuildPart(BuildStorage& Storage, const Oid& BuildId, Oid BuildPartId, const std::string_view BuildPartName) { Stopwatch Timer; @@ -1274,6 +1411,11 @@ namespace { throw std::runtime_error(fmt::format("Build {} does not have a part named '{}'", BuildId, BuildPartName)); } } + uint64_t PreferredMultipartChunkSize = DefaultPreferredMultipartChunkSize; + if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0) + { + PreferredMultipartChunkSize = ChunkSize; + } CbObject BuildPart = Storage.GetBuildPart(BuildId, BuildPartId); ZEN_CONSOLE("Validating build part {}/{} ({})", BuildId, BuildPartId, NiceBytes(BuildPart.GetSize())); std::vector<IoHash> ChunkAttachments; @@ -1295,9 +1437,20 @@ namespace { } WorkerThreadPool& NetworkPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // + WorkerThreadPool& ReadPool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // WorkerThreadPool& VerifyPool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool(); // ParallellWork Work(AbortFlag); + const std::filesystem::path TempFolder = ".zen-tmp"; + + CreateDirectories(TempFolder); + auto __ = MakeGuard([&TempFolder]() { + if (CleanDirectory(TempFolder, {})) + { + std::filesystem::remove(TempFolder); + } + }); + ProgressBar ProgressBar(UsePlainProgress); uint64_t AttachmentsToVerifyCount = ChunkAttachments.size() + BlockAttachments.size(); @@ -1308,51 +1461,58 @@ namespace { FilteredRate FilteredDownloadedBytesPerSecond; FilteredRate FilteredVerifiedBytesPerSecond; + std::atomic<uint64_t> MultipartAttachmentCount = 0; + for (const IoHash& ChunkAttachment : ChunkAttachments) { Work.ScheduleWork( - NetworkPool, + ReadPool, [&, ChunkAttachment](std::atomic<bool>&) { if (!AbortFlag) { - FilteredDownloadedBytesPerSecond.Start(); - IoBuffer Payload = Storage.GetBuildBlob(BuildId, ChunkAttachment); - DownloadedAttachmentCount++; - DownloadedByteCount += Payload.GetSize(); - if (DownloadedAttachmentCount.load() == AttachmentsToVerifyCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - if (!Payload) - { - throw std::runtime_error(fmt::format("Chunk attachment {} could not be found", ChunkAttachment)); - } - if (!AbortFlag) - { - Work.ScheduleWork( - VerifyPool, - [&, Payload = std::move(Payload), ChunkAttachment](std::atomic<bool>&) mutable { - if (!AbortFlag) - { - FilteredVerifiedBytesPerSecond.Start(); + ZEN_TRACE_CPU("ValidateBuildPart_GetChunk"); - uint64_t CompressedSize; - uint64_t DecompressedSize; - ValidateBlob(std::move(Payload), ChunkAttachment, CompressedSize, DecompressedSize); - ZEN_CONSOLE_VERBOSE("Chunk attachment {} ({} -> {}) is valid", - ChunkAttachment, - NiceBytes(CompressedSize), - NiceBytes(DecompressedSize)); - VerifiedAttachmentCount++; - VerifiedByteCount += DecompressedSize; - if (VerifiedAttachmentCount.load() == AttachmentsToVerifyCount) - { - FilteredVerifiedBytesPerSecond.Stop(); - } - } - }, - Work.DefaultErrorFunction()); - } + FilteredDownloadedBytesPerSecond.Start(); + DownloadLargeBlob(Storage, + TempFolder, + BuildId, + ChunkAttachment, + PreferredMultipartChunkSize, + Work, + NetworkPool, + DownloadedByteCount, + MultipartAttachmentCount, + [&, ChunkHash = ChunkAttachment](IoBuffer&& Payload) { + Payload.SetContentType(ZenContentType::kCompressedBinary); + if (!AbortFlag) + { + Work.ScheduleWork( + VerifyPool, + [&, Payload = std::move(Payload), ChunkHash](std::atomic<bool>&) mutable { + if (!AbortFlag) + { + ZEN_TRACE_CPU("ValidateBuildPart_Validate"); + + FilteredVerifiedBytesPerSecond.Start(); + + uint64_t CompressedSize; + uint64_t DecompressedSize; + ValidateBlob(std::move(Payload), ChunkHash, CompressedSize, DecompressedSize); + ZEN_CONSOLE_VERBOSE("Chunk attachment {} ({} -> {}) is valid", + ChunkHash, + NiceBytes(CompressedSize), + NiceBytes(DecompressedSize)); + VerifiedAttachmentCount++; + VerifiedByteCount += DecompressedSize; + if (VerifiedAttachmentCount.load() == AttachmentsToVerifyCount) + { + FilteredVerifiedBytesPerSecond.Stop(); + } + } + }, + Work.DefaultErrorFunction()); + } + }); } }, Work.DefaultErrorFunction()); @@ -1365,6 +1525,8 @@ namespace { [&, BlockAttachment](std::atomic<bool>&) { if (!AbortFlag) { + ZEN_TRACE_CPU("ValidateBuildPart_GetBlock"); + FilteredDownloadedBytesPerSecond.Start(); IoBuffer Payload = Storage.GetBuildBlob(BuildId, BlockAttachment); DownloadedAttachmentCount++; @@ -1384,6 +1546,8 @@ namespace { [&, Payload = std::move(Payload), BlockAttachment](std::atomic<bool>&) mutable { if (!AbortFlag) { + ZEN_TRACE_CPU("ValidateBuildPart_ValidateBlock"); + FilteredVerifiedBytesPerSecond.Start(); uint64_t CompressedSize; @@ -1442,6 +1606,7 @@ namespace { std::vector<uint32_t>& ChunkIndexes, std::vector<std::vector<uint32_t>>& OutBlocks) { + ZEN_TRACE_CPU("ArrangeChunksIntoBlocks"); std::sort(ChunkIndexes.begin(), ChunkIndexes.end(), [&Content, &Lookup](uint32_t Lhs, uint32_t Rhs) { const ChunkedContentLookup::ChunkSequenceLocation& LhsLocation = GetChunkSequenceLocations(Lookup, Lhs)[0]; const ChunkedContentLookup::ChunkSequenceLocation& RhsLocation = GetChunkSequenceLocations(Lookup, Rhs)[0]; @@ -1535,6 +1700,7 @@ namespace { uint32_t ChunkIndex, const std::filesystem::path& TempFolderPath) { + ZEN_TRACE_CPU("CompressChunk"); ZEN_ASSERT(!TempFolderPath.empty()); const IoHash& ChunkHash = Content.ChunkedContent.ChunkHashes[ChunkIndex]; const uint64_t ChunkSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; @@ -1608,7 +1774,7 @@ namespace { { std::vector<ChunkBlockDescription> BlockDescriptions; std::vector<uint64_t> BlockSizes; - std::vector<CompositeBuffer> BlockBuffers; + std::vector<CompositeBuffer> BlockHeaders; std::vector<CbObject> BlockMetaDatas; std::vector<bool> MetaDataHasBeenUploaded; tsl::robin_map<IoHash, size_t, IoHash::Hasher> BlockHashToBlockIndex; @@ -1625,6 +1791,7 @@ namespace { UploadStatistics& UploadStats, GenerateBlocksStatistics& GenerateBlocksStats) { + ZEN_TRACE_CPU("GenerateBuildBlocks"); const std::size_t NewBlockCount = NewBlockChunks.size(); if (NewBlockCount > 0) { @@ -1632,22 +1799,23 @@ namespace { OutBlocks.BlockDescriptions.resize(NewBlockCount); OutBlocks.BlockSizes.resize(NewBlockCount); - OutBlocks.BlockBuffers.resize(NewBlockCount); OutBlocks.BlockMetaDatas.resize(NewBlockCount); + OutBlocks.BlockHeaders.resize(NewBlockCount); OutBlocks.MetaDataHasBeenUploaded.resize(NewBlockCount, false); OutBlocks.BlockHashToBlockIndex.reserve(NewBlockCount); RwLock Lock; - WorkerThreadPool& GenerateBlobsPool = GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool();// - WorkerThreadPool& UploadBlocksPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool();// + WorkerThreadPool& GenerateBlobsPool = + GetMediumWorkerPool(EWorkloadType::Burst); // GetMediumWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool();// + WorkerThreadPool& UploadBlocksPool = GetSmallWorkerPool(EWorkloadType::Burst); // GetSyncWorkerPool();// FilteredRate FilteredGeneratedBytesPerSecond; FilteredRate FilteredUploadedBytesPerSecond; ParallellWork Work(AbortFlag); - std::atomic<uint32_t> PendingUploadCount(0); + std::atomic<uint64_t> QueuedPendingBlocksForUpload = 0; for (size_t BlockIndex = 0; BlockIndex < NewBlockCount; BlockIndex++) { @@ -1661,6 +1829,8 @@ namespace { [&, BlockIndex](std::atomic<bool>&) { if (!AbortFlag) { + ZEN_TRACE_CPU("GenerateBuildBlocks_Generate"); + FilteredGeneratedBytesPerSecond.Start(); // TODO: Convert ScheduleWork body to function @@ -1672,14 +1842,6 @@ namespace { OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); OutBlocks.BlockSizes[BlockIndex] = CompressedBlock.GetCompressedSize(); - - if (!IsBufferDiskBased(CompressedBlock.GetCompressed())) - { - IoBuffer TempPayload = WriteToTempFile(std::move(CompressedBlock).GetCompressed(), - Path / ZenTempBlockFolderName, - OutBlocks.BlockDescriptions[BlockIndex].BlockHash); - CompressedBlock = CompressedBuffer::FromCompressedNoValidate(std::move(TempPayload)); - } { CbObjectWriter Writer; Writer.AddString("createdBy", "zen"); @@ -1693,66 +1855,88 @@ namespace { BlockIndex); }); + { + std::span<const SharedBuffer> Segments = CompressedBlock.GetCompressed().GetSegments(); + ZEN_ASSERT(Segments.size() >= 2); + OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]); + } + if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount) { FilteredGeneratedBytesPerSecond.Stop(); } - if (!AbortFlag) + if (QueuedPendingBlocksForUpload.load() > 16) { - PendingUploadCount++; - Work.ScheduleWork( - UploadBlocksPool, - [&, BlockIndex, Payload = std::move(CompressedBlock).GetCompressed()](std::atomic<bool>&) mutable { - if (!AbortFlag) - { - if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount) - { - FilteredUploadedBytesPerSecond.Stop(); - OutBlocks.BlockBuffers[BlockIndex] = std::move(Payload); - } - else + std::span<const SharedBuffer> Segments = CompressedBlock.GetCompressed().GetSegments(); + ZEN_ASSERT(Segments.size() >= 2); + OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]); + } + else + { + if (!AbortFlag) + { + QueuedPendingBlocksForUpload++; + + Work.ScheduleWork( + UploadBlocksPool, + [&, BlockIndex, Payload = std::move(CompressedBlock)](std::atomic<bool>&) mutable { + auto _ = MakeGuard([&QueuedPendingBlocksForUpload] { QueuedPendingBlocksForUpload--; }); + if (!AbortFlag) { - FilteredUploadedBytesPerSecond.Start(); - // TODO: Convert ScheduleWork body to function - - PendingUploadCount--; - - const CbObject BlockMetaData = - BuildChunkBlockDescription(OutBlocks.BlockDescriptions[BlockIndex], - OutBlocks.BlockMetaDatas[BlockIndex]); - - const IoHash& BlockHash = OutBlocks.BlockDescriptions[BlockIndex].BlockHash; - const uint64_t CompressedBlockSize = Payload.GetSize(); - Storage.PutBuildBlob(BuildId, - BlockHash, - ZenContentType::kCompressedBinary, - std::move(Payload)); - UploadStats.BlocksBytes += CompressedBlockSize; - ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks", - OutBlocks.BlockDescriptions[BlockIndex].BlockHash, - NiceBytes(CompressedBlockSize), - OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); - - Storage.PutBlockMetadata(BuildId, - OutBlocks.BlockDescriptions[BlockIndex].BlockHash, - BlockMetaData); - ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})", - OutBlocks.BlockDescriptions[BlockIndex].BlockHash, - NiceBytes(BlockMetaData.GetSize())); - - OutBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; - - UploadStats.BlocksBytes += BlockMetaData.GetSize(); - UploadStats.BlockCount++; - if (UploadStats.BlockCount == NewBlockCount) + if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount) { + ZEN_TRACE_CPU("GenerateBuildBlocks_Save"); + FilteredUploadedBytesPerSecond.Stop(); + std::span<const SharedBuffer> Segments = Payload.GetCompressed().GetSegments(); + ZEN_ASSERT(Segments.size() >= 2); + OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]); + } + else + { + ZEN_TRACE_CPU("GenerateBuildBlocks_Upload"); + + FilteredUploadedBytesPerSecond.Start(); + // TODO: Convert ScheduleWork body to function + + const CbObject BlockMetaData = + BuildChunkBlockDescription(OutBlocks.BlockDescriptions[BlockIndex], + OutBlocks.BlockMetaDatas[BlockIndex]); + + const IoHash& BlockHash = OutBlocks.BlockDescriptions[BlockIndex].BlockHash; + const uint64_t CompressedBlockSize = Payload.GetCompressedSize(); + + Storage.PutBuildBlob(BuildId, + BlockHash, + ZenContentType::kCompressedBinary, + std::move(Payload).GetCompressed()); + UploadStats.BlocksBytes += CompressedBlockSize; + ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks", + OutBlocks.BlockDescriptions[BlockIndex].BlockHash, + NiceBytes(CompressedBlockSize), + OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); + + Storage.PutBlockMetadata(BuildId, + OutBlocks.BlockDescriptions[BlockIndex].BlockHash, + BlockMetaData); + ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})", + OutBlocks.BlockDescriptions[BlockIndex].BlockHash, + NiceBytes(BlockMetaData.GetSize())); + + OutBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; + + UploadStats.BlocksBytes += BlockMetaData.GetSize(); + UploadStats.BlockCount++; + if (UploadStats.BlockCount == NewBlockCount) + { + FilteredUploadedBytesPerSecond.Stop(); + } } } - } - }, - Work.DefaultErrorFunction()); + }, + Work.DefaultErrorFunction()); + } } } }, @@ -1783,6 +1967,8 @@ namespace { false); }); + ZEN_ASSERT(AbortFlag || QueuedPendingBlocksForUpload.load() == 0); + ProgressBar.Finish(); GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGeneratedBytesPerSecond.GetElapsedTimeUS(); @@ -1805,6 +1991,7 @@ namespace { GenerateBlocksStatistics& GenerateBlocksStats, LooseChunksStatistics& LooseChunksStats) { + ZEN_TRACE_CPU("UploadPartBlobs"); { ProgressBar ProgressBar(UsePlainProgress); @@ -1858,12 +2045,37 @@ namespace { const size_t UploadBlockCount = BlockIndexes.size(); const uint32_t UploadChunkCount = gsl::narrow<uint32_t>(LooseChunkOrderIndexes.size()); - auto AsyncUploadBlock = [&](const size_t BlockIndex, const IoHash BlockHash, CompositeBuffer&& Payload) { + auto AsyncUploadBlock = [&](const size_t BlockIndex, + const IoHash BlockHash, + CompositeBuffer&& Payload, + std::atomic<uint64_t>& QueuedPendingInMemoryBlocksForUpload) { + bool IsInMemoryBlock = true; + if (QueuedPendingInMemoryBlocksForUpload.load() > 16) + { + ZEN_TRACE_CPU("AsyncUploadBlock_WriteTempBlock"); + Payload = CompositeBuffer(WriteToTempFile(std::move(Payload), Path / ZenTempBlockFolderName, BlockHash)); + IsInMemoryBlock = false; + } + else + { + QueuedPendingInMemoryBlocksForUpload++; + } + Work.ScheduleWork( UploadChunkPool, - [&, BlockIndex, BlockHash, Payload = std::move(Payload)](std::atomic<bool>&) mutable { + [&, IsInMemoryBlock, BlockIndex, BlockHash, Payload = std::move(Payload)](std::atomic<bool>&) mutable { + auto _ = MakeGuard([IsInMemoryBlock, &QueuedPendingInMemoryBlocksForUpload] { + if (IsInMemoryBlock) + { + QueuedPendingInMemoryBlocksForUpload--; + } + }); if (!AbortFlag) { + ZEN_TRACE_CPU("AsyncUploadBlock"); + + const uint64_t PayloadSize = Payload.GetSize(); + FilteredUploadedBytesPerSecond.Start(); const CbObject BlockMetaData = BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]); @@ -1871,10 +2083,10 @@ namespace { Storage.PutBuildBlob(BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload); ZEN_CONSOLE_VERBOSE("Uploaded block {} ({}) containing {} chunks", NewBlocks.BlockDescriptions[BlockIndex].BlockHash, - NiceBytes(Payload.GetSize()), + NiceBytes(PayloadSize), NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); - UploadedBlockSize += Payload.GetSize(); - UploadStats.BlocksBytes += Payload.GetSize(); + UploadedBlockSize += PayloadSize; + UploadStats.BlocksBytes += PayloadSize; Storage.PutBlockMetadata(BuildId, BlockHash, BlockMetaData); ZEN_CONSOLE_VERBOSE("Uploaded block {} metadata ({})", @@ -1902,9 +2114,13 @@ namespace { [&, RawHash, RawSize, Payload = CompositeBuffer(std::move(Payload))](std::atomic<bool>&) mutable { if (!AbortFlag) { + ZEN_TRACE_CPU("AsyncUploadLooseChunk"); + const uint64_t PayloadSize = Payload.GetSize(); + ; if (PayloadSize >= LargeAttachmentSize) { + ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart"); UploadStats.MultipartAttachmentCount++; std::vector<std::function<void()>> MultipartWork = Storage.PutLargeBuildBlob( BuildId, @@ -1938,6 +2154,7 @@ namespace { Work.ScheduleWork( UploadChunkPool, [Work = std::move(WorkPart)](std::atomic<bool>&) { + ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart_Work"); if (!AbortFlag) { Work(); @@ -1949,6 +2166,7 @@ namespace { } else { + ZEN_TRACE_CPU("AsyncUploadLooseChunk_Singlepart"); Storage.PutBuildBlob(BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); ZEN_CONSOLE_VERBOSE("Uploaded chunk {} ({})", RawHash, NiceBytes(PayloadSize)); UploadStats.ChunksBytes += Payload.GetSize(); @@ -1971,26 +2189,10 @@ namespace { std::atomic<uint64_t> GeneratedBlockCount = 0; std::atomic<uint64_t> GeneratedBlockByteCount = 0; - // Start upload of any pre-built blocks - for (const size_t BlockIndex : BlockIndexes) - { - if (CompositeBuffer BlockPayload = std::move(NewBlocks.BlockBuffers[BlockIndex]); BlockPayload) - { - const IoHash& BlockHash = NewBlocks.BlockDescriptions[BlockIndex].BlockHash; - if (!AbortFlag) - { - AsyncUploadBlock(BlockIndex, BlockHash, std::move(BlockPayload)); - } - // GeneratedBlockCount++; - } - else - { - GenerateBlockIndexes.push_back(BlockIndex); - } - } - std::vector<uint32_t> CompressLooseChunkOrderIndexes; + std::atomic<uint64_t> QueuedPendingInMemoryBlocksForUpload = 0; + // Start upload of any pre-compressed loose chunks for (const uint32_t LooseChunkOrderIndex : LooseChunkOrderIndexes) { @@ -1998,31 +2200,43 @@ namespace { } // Start generation of any non-prebuilt blocks and schedule upload - for (const size_t BlockIndex : GenerateBlockIndexes) + for (const size_t BlockIndex : BlockIndexes) { const IoHash& BlockHash = NewBlocks.BlockDescriptions[BlockIndex].BlockHash; if (!AbortFlag) { Work.ScheduleWork( - ReadChunkPool, + ReadChunkPool, // GetSyncWorkerPool() [&, BlockIndex](std::atomic<bool>&) { if (!AbortFlag) { + ZEN_TRACE_CPU("UploadPartBlobs_GenerateBlock"); + FilteredGenerateBlockBytesPerSecond.Start(); - ChunkBlockDescription BlockDescription; - CompressedBuffer CompressedBlock = - GenerateBlock(Path, Content, Lookup, NewBlockChunks[BlockIndex], BlockDescription, DiskStats); - if (!CompressedBlock) + + CompositeBuffer Payload; + if (NewBlocks.BlockHeaders[BlockIndex]) { - throw std::runtime_error(fmt::format("Failed generating block {}", BlockHash)); + Payload = RebuildBlock(Path, + Content, + Lookup, + std::move(NewBlocks.BlockHeaders[BlockIndex]), + NewBlockChunks[BlockIndex], + DiskStats) + .GetCompressed(); + } + else + { + ChunkBlockDescription BlockDescription; + CompressedBuffer CompressedBlock = + GenerateBlock(Path, Content, Lookup, NewBlockChunks[BlockIndex], BlockDescription, DiskStats); + if (!CompressedBlock) + { + throw std::runtime_error(fmt::format("Failed generating block {}", BlockHash)); + } + ZEN_ASSERT(BlockDescription.BlockHash == BlockHash); + Payload = std::move(CompressedBlock).GetCompressed(); } - ZEN_ASSERT(BlockDescription.BlockHash == BlockHash); - - CompositeBuffer Payload = IsBufferDiskBased(CompressedBlock.GetCompressed()) - ? std::move(CompressedBlock).GetCompressed() - : CompositeBuffer(WriteToTempFile(std::move(CompressedBlock).GetCompressed(), - Path / ZenTempBlockFolderName, - BlockDescription.BlockHash)); GenerateBlocksStats.GeneratedBlockByteCount += NewBlocks.BlockSizes[BlockIndex]; GenerateBlocksStats.GeneratedBlockCount++; @@ -2034,11 +2248,11 @@ namespace { } if (!AbortFlag) { - AsyncUploadBlock(BlockIndex, BlockHash, std::move(Payload)); + AsyncUploadBlock(BlockIndex, BlockHash, std::move(Payload), QueuedPendingInMemoryBlocksForUpload); } ZEN_CONSOLE_VERBOSE("Regenerated block {} ({}) containing {} chunks", NewBlocks.BlockDescriptions[BlockIndex].BlockHash, - NiceBytes(CompressedBlock.GetCompressedSize()), + NiceBytes(NewBlocks.BlockSizes[BlockIndex]), NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); } }, @@ -2059,6 +2273,8 @@ namespace { [&, ChunkIndex](std::atomic<bool>&) { if (!AbortFlag) { + ZEN_TRACE_CPU("UploadPartBlobs_CompressChunk"); + FilteredCompressedBytesPerSecond.Start(); CompositeBuffer Payload = CompressChunk(Path, Content, Lookup, ChunkIndex, Path / ZenTempChunkFolderName); ZEN_CONSOLE_VERBOSE("Compressed chunk {} ({} -> {})", @@ -2117,6 +2333,8 @@ namespace { false); }); + ZEN_ASSERT(AbortFlag || QueuedPendingInMemoryBlocksForUpload.load() == 0); + ProgressBar.Finish(); UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS(); GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGenerateBlockBytesPerSecond.GetElapsedTimeUS(); @@ -2131,6 +2349,8 @@ namespace { std::vector<uint32_t>& OutUnusedChunkIndexes, FindBlocksStatistics& FindBlocksStats) { + ZEN_TRACE_CPU("FindReuseBlocks"); + // Find all blocks with a usage level higher than MinPercentLimit // Pick out the blocks with usage higher or equal to MinPercentLimit // Sort them with highest size usage - most usage first @@ -2303,8 +2523,10 @@ namespace { CreateDirectories(ZenTempFolder); CleanDirectory(ZenTempFolder, {}); auto _ = MakeGuard([&]() { - CleanDirectory(ZenTempFolder, {}); - std::filesystem::remove(ZenTempFolder); + if (CleanDirectory(ZenTempFolder, {})) + { + std::filesystem::remove(ZenTempFolder); + } }); CreateDirectories(Path / ZenTempBlockFolderName); CreateDirectories(Path / ZenTempChunkFolderName); @@ -2327,10 +2549,14 @@ namespace { GetSmallWorkerPool(EWorkloadType::Burst) .EnqueueTask(std::packaged_task<PrepareBuildResult()>{ [&Storage, BuildId, &MetaData, CreateBuild, AllowMultiparts, IgnoreExistingBlocks, &FindBlocksStats] { + ZEN_TRACE_CPU("PrepareBuild"); + PrepareBuildResult Result; Stopwatch Timer; if (CreateBuild) { + ZEN_TRACE_CPU("CreateBuild"); + Stopwatch PutBuildTimer; CbObject PutBuildResult = Storage.PutBuild(BuildId, MetaData); Result.PrepareBuildTimeMs = PutBuildTimer.GetElapsedTimeMs(); @@ -2339,6 +2565,7 @@ namespace { } else { + ZEN_TRACE_CPU("PutBuild"); Stopwatch GetBuildTimer; CbObject Build = Storage.GetBuild(BuildId); Result.PrepareBuildTimeMs = GetBuildTimer.GetElapsedTimeMs(); @@ -2356,6 +2583,7 @@ namespace { if (!IgnoreExistingBlocks) { + ZEN_TRACE_CPU("FindBlocks"); Stopwatch KnownBlocksTimer; Result.KnownBlocks = Storage.FindBlocks(BuildId); FindBlocksStats.FindBlockTimeMS = KnownBlocksTimer.GetElapsedTimeMs(); @@ -2523,11 +2751,8 @@ namespace { ChunkerParameters = ChunkParametersWriter.Save(); } - std::uint64_t TotalRawSize = 0; - for (uint64_t RawSize : Content.RawSizes) - { - TotalRawSize += RawSize; - } + std::uint64_t TotalRawSize = std::accumulate(Content.RawSizes.begin(), Content.RawSizes.end(), std::uint64_t(0)); + { ProgressBar ProgressBar(UsePlainProgress); FilteredRate FilteredBytesHashed; @@ -2956,9 +3181,9 @@ namespace { ForceUploadChunkHashes.push_back(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]); } - for (size_t BlockIndex = 0; BlockIndex < NewBlocks.BlockBuffers.size(); BlockIndex++) + for (size_t BlockIndex = 0; BlockIndex < NewBlocks.BlockHeaders.size(); BlockIndex++) { - if (NewBlocks.BlockBuffers[BlockIndex]) + if (NewBlocks.BlockHeaders[BlockIndex]) { // Block was not uploaded during generation ForceUploadChunkHashes.push_back(NewBlocks.BlockDescriptions[BlockIndex].BlockHash); @@ -3457,8 +3682,8 @@ namespace { std::vector<CompositeBuffer> ChunkBuffers; struct WriteOpData { - const ChunkedContentLookup::ChunkSequenceLocation* Target; - size_t ChunkBufferIndex; + const ChunkedContentLookup::ChunkSequenceLocation* Target = nullptr; + size_t ChunkBufferIndex = (size_t)-1; }; std::vector<WriteOpData> WriteOps; }; @@ -3964,155 +4189,129 @@ namespace { } } - void DownloadLargeBlob(BuildStorage& Storage, - const std::filesystem::path& Path, - const ChunkedFolderContent& RemoteContent, - const ChunkedContentLookup& RemoteLookup, - const Oid& BuildId, - const IoHash& ChunkHash, - const std::uint64_t PreferredMultipartChunkSize, - const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs, - std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, - ParallellWork& Work, - WorkerThreadPool& WritePool, - WorkerThreadPool& NetworkPool, - std::atomic<uint64_t>& WriteToDiskBytes, - std::atomic<uint64_t>& BytesDownloaded, - std::atomic<uint64_t>& MultipartAttachmentCount, - std::function<void(uint64_t DowloadedBytes)>&& OnDownloadComplete, - std::function<void()>&& OnWriteStart, - std::function<void()>&& OnWriteComplete) + void AsyncWriteDownloadedChunk(const std::filesystem::path& Path, + const ChunkedFolderContent& RemoteContent, + const ChunkedContentLookup& RemoteLookup, + uint32_t RemoteChunkIndex, + std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>&& ChunkTargetPtrs, + ParallellWork& Work, + WorkerThreadPool& WritePool, + IoBuffer&& Payload, + std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, + std::atomic<uint64_t>& WriteToDiskBytes, + std::atomic<uint32_t>& ChunkCountWritten, + std::atomic<uint64_t>& WritePartsComplete, + std::atomic<uint64_t>& TotalPartWriteCount, + std::atomic<uint64_t>& LooseChunksBytes, + FilteredRate& FilteredWrittenBytesPerSecond) { - ZEN_TRACE_CPU("DownloadLargeBlob"); + ZEN_TRACE_CPU("AsyncWriteDownloadedChunk"); - struct WorkloadData + const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + + uint64_t Size = Payload.GetSize(); + LooseChunksBytes += Size; + + std::filesystem::path CompressedChunkPath; + + // Check if the dowloaded chunk is file based and we can move it directly without rewriting it { - TemporaryFile TempFile; - }; - std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>()); + IoBufferFileReference FileRef; + if (Payload.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == Size)) + { + ZEN_TRACE_CPU("MoveTempChunk"); + std::error_code Ec; + std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); + if (!Ec) + { + Payload.SetDeleteOnClose(false); + Payload = {}; + CompressedChunkPath = Path / ZenTempDownloadFolderName / ChunkHash.ToHexString(); + std::filesystem::rename(TempBlobPath, CompressedChunkPath, Ec); + if (Ec) + { + CompressedChunkPath = std::filesystem::path{}; - std::filesystem::path DownloadFolder = Path / ZenTempDownloadFolderName; - std::filesystem::path TargetFolder = Path / ZenTempCacheFolderName; + // Re-open the temp file again + BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); + Payload = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, Size, true); + Payload.SetDeleteOnClose(true); + } + } + } + } - std::error_code Ec; - Workload->TempFile.CreateTemporary(DownloadFolder, Ec); - if (Ec) + if (CompressedChunkPath.empty() && (Size > 512u * 1024u)) { - throw std::runtime_error( - fmt::format("Failed opening temporary file '{}': {} ({})", Workload->TempFile.GetPath(), Ec.message(), Ec.value())); + ZEN_TRACE_CPU("WriteTempChunk"); + // Could not be moved and rather large, lets store it on disk + CompressedChunkPath = Path / ZenTempDownloadFolderName / ChunkHash.ToHexString(); + TemporaryFile::SafeWriteFile(CompressedChunkPath, Payload); + Payload = {}; } - std::vector<std::function<void()>> WorkItems = Storage.GetLargeBuildBlob( - BuildId, - ChunkHash, - PreferredMultipartChunkSize, - [DownloadFolder, - TargetFolder, - &RemoteContent, - &RemoteLookup, - &Work, - &WritePool, - Workload, - ChunkHash, - &BytesDownloaded, - OnDownloadComplete = std::move(OnDownloadComplete), - OnWriteComplete = std::move(OnWriteComplete), - OnWriteStart = std::move(OnWriteStart), - &WriteToDiskBytes, + + Work.ScheduleWork( + WritePool, // GetSyncWorkerPool(),// + [&, SequenceIndexChunksLeftToWriteCounters, - ChunkTargetPtrs = std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>( - ChunkTargetPtrs)](uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining) { - BytesDownloaded += Chunk.GetSize(); + CompressedChunkPath, + RemoteChunkIndex, + ChunkTargetPtrs = std::move(ChunkTargetPtrs), + CompressedPart = std::move(Payload)](std::atomic<bool>&) mutable { + ZEN_TRACE_CPU("UpdateFolder_WriteChunk"); - if (!AbortFlag.load()) + if (!AbortFlag) { - ZEN_TRACE_CPU("DownloadLargeBlob_Save"); - Workload->TempFile.Write(Chunk.GetView(), Offset); - if (Chunk.GetSize() == BytesRemaining) + FilteredWrittenBytesPerSecond.Start(); + + const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; + if (CompressedChunkPath.empty()) { - OnDownloadComplete(Workload->TempFile.FileSize()); - - Work.ScheduleWork( - WritePool, // GetSyncWorkerPool(),// - [DownloadFolder, - TargetFolder, - &RemoteContent, - &RemoteLookup, - ChunkHash, - Workload, - Offset, - OnWriteComplete = std::move(OnWriteComplete), - OnWriteStart = std::move(OnWriteStart), - &WriteToDiskBytes, - SequenceIndexChunksLeftToWriteCounters, - ChunkTargetPtrs](std::atomic<bool>&) { - ZEN_TRACE_CPU("DownloadLargeBlob_Write"); + ZEN_ASSERT(CompressedPart); + } + else + { + ZEN_ASSERT(!CompressedPart); + CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); + if (!CompressedPart) + { + throw std::runtime_error( + fmt::format("Could not open dowloaded compressed chunk {} from {}", ChunkHash, CompressedChunkPath)); + } + } - if (!AbortFlag) - { - const std::filesystem::path CompressedChunkPath = DownloadFolder / ChunkHash.ToHexString(); - std::error_code Ec; - Workload->TempFile.MoveTemporaryIntoPlace(CompressedChunkPath, Ec); - if (Ec) - { - throw std::runtime_error(fmt::format("Failed moving downloaded chunk {} file to {}. Reason: {}", - ChunkHash, - CompressedChunkPath, - Ec.message())); - } + std::filesystem::path TargetFolder = Path / ZenTempCacheFolderName; - IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); - if (!CompressedPart) - { - throw std::runtime_error(fmt::format("Could not open dowloaded compressed chunk {} from {}", - ChunkHash, - CompressedChunkPath)); - } + bool NeedHashVerify = WriteCompressedChunk(TargetFolder, + RemoteContent, + RemoteLookup, + ChunkHash, + ChunkTargetPtrs, + std::move(CompressedPart), + WriteToDiskBytes); - OnWriteStart(); + if (!AbortFlag) + { + ChunkCountWritten++; + WritePartsComplete++; + if (WritePartsComplete == TotalPartWriteCount) + { + FilteredWrittenBytesPerSecond.Stop(); + } - bool NeedHashVerify = WriteCompressedChunk(TargetFolder, - RemoteContent, - RemoteLookup, - ChunkHash, - ChunkTargetPtrs, - std::move(CompressedPart), - WriteToDiskBytes); + std::filesystem::remove(CompressedChunkPath); - if (!AbortFlag) - { - std::filesystem::remove(CompressedChunkPath); - - CompleteChunkTargets(TargetFolder, - RemoteContent, - ChunkHash, - ChunkTargetPtrs, - SequenceIndexChunksLeftToWriteCounters, - NeedHashVerify); - } - } - }, - Work.DefaultErrorFunction()); + CompleteChunkTargets(TargetFolder, + RemoteContent, + ChunkHash, + ChunkTargetPtrs, + SequenceIndexChunksLeftToWriteCounters, + NeedHashVerify); } } - }); - if (!WorkItems.empty()) - { - MultipartAttachmentCount++; - } - for (auto& WorkItem : WorkItems) - { - Work.ScheduleWork( - NetworkPool, // GetSyncWorkerPool(),// - [WorkItem = std::move(WorkItem)](std::atomic<bool>&) { - ZEN_TRACE_CPU("DownloadLargeBlob_Work"); - if (!AbortFlag) - { - WorkItem(); - } - }, - Work.DefaultErrorFunction()); - } - } + }, + Work.DefaultErrorFunction()); + }; void UpdateFolder(BuildStorage& Storage, const Oid& BuildId, @@ -4291,13 +4490,13 @@ namespace { // Pick up all chunks in current local state struct CacheCopyData { - uint32_t LocalSequenceIndex; + uint32_t LocalSequenceIndex = (uint32_t)-1; std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> TargetChunkLocationPtrs; struct ChunkTarget { - uint32_t TargetChunkLocationCount; - uint64_t ChunkRawSize; - uint64_t CacheFileOffset; + uint32_t TargetChunkLocationCount = (uint32_t)-1; + uint64_t ChunkRawSize = (uint64_t)-1; + uint64_t CacheFileOffset = (uint64_t)-1; }; std::vector<ChunkTarget> ChunkTargets; }; @@ -4419,8 +4618,8 @@ namespace { uint64_t TotalRequestCount = 0; std::atomic<uint64_t> RequestsComplete = 0; std::atomic<uint32_t> ChunkCountWritten = 0; - std::atomic<size_t> TotalPartWriteCount = 0; - std::atomic<size_t> WritePartsComplete = 0; + std::atomic<uint64_t> TotalPartWriteCount = 0; + std::atomic<uint64_t> WritePartsComplete = 0; { ZEN_TRACE_CPU("WriteChunks"); @@ -4441,7 +4640,7 @@ namespace { struct LooseChunkHashWorkData { std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs; - uint32_t RemoteChunkIndex; + uint32_t RemoteChunkIndex = (uint32_t)-1; }; std::vector<LooseChunkHashWorkData> LooseChunkHashWorks; @@ -4687,9 +4886,9 @@ namespace { struct WriteOp { - const ChunkedContentLookup::ChunkSequenceLocation* Target; - uint64_t CacheFileOffset; - uint64_t ChunkSize; + const ChunkedContentLookup::ChunkSequenceLocation* Target = nullptr; + uint64_t CacheFileOffset = (uint64_t)-1; + uint64_t ChunkSize = (uint64_t)-1; }; std::vector<WriteOp> WriteOps; @@ -4821,10 +5020,11 @@ namespace { const uint32_t RemoteChunkIndex = LooseChunkHashWork.RemoteChunkIndex; Work.ScheduleWork( - NetworkPool, // NetworkPool, // GetSyncWorkerPool(),// - [&, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) { + WritePool, // NetworkPool, // GetSyncWorkerPool(),// + [&, RemoteChunkIndex, ChunkTargetPtrs](std::atomic<bool>&) mutable { if (!AbortFlag) { + ZEN_TRACE_CPU("UpdateFolder_ReadPreDownloaded"); std::filesystem::path ExistingCompressedChunkPath; { const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; @@ -4925,39 +5125,37 @@ namespace { if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= LargeAttachmentSize) { ZEN_TRACE_CPU("UpdateFolder_GetLargeChunk"); - DownloadLargeBlob( - Storage, - Path, - RemoteContent, - RemoteLookup, - BuildId, - ChunkHash, - PreferredMultipartChunkSize, - ChunkTargetPtrs, - SequenceIndexChunksLeftToWriteCounters, - Work, - WritePool, - NetworkPool, - WriteToDiskBytes, - BytesDownloaded, - MultipartAttachmentCount, - [&](uint64_t BytesDownloaded) { - LooseChunksBytes += BytesDownloaded; - RequestsComplete++; - if (RequestsComplete == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - }, - [&]() { FilteredWrittenBytesPerSecond.Start(); }, - [&]() { - ChunkCountWritten++; - WritePartsComplete++; - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); - } - }); + DownloadLargeBlob(Storage, + Path / ZenTempDownloadFolderName, + BuildId, + ChunkHash, + PreferredMultipartChunkSize, + Work, + NetworkPool, + BytesDownloaded, + MultipartAttachmentCount, + [&, RemoteChunkIndex, ChunkTargetPtrs](IoBuffer&& Payload) mutable { + RequestsComplete++; + if (RequestsComplete == TotalRequestCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + AsyncWriteDownloadedChunk(Path, + RemoteContent, + RemoteLookup, + RemoteChunkIndex, + std::move(ChunkTargetPtrs), + Work, + WritePool, + std::move(Payload), + SequenceIndexChunksLeftToWriteCounters, + WriteToDiskBytes, + ChunkCountWritten, + WritePartsComplete, + TotalPartWriteCount, + LooseChunksBytes, + FilteredWrittenBytesPerSecond); + }); } else { @@ -4970,129 +5168,27 @@ namespace { } uint64_t BlobSize = BuildBlob.GetSize(); BytesDownloaded += BlobSize; - LooseChunksBytes += BlobSize; + RequestsComplete++; if (RequestsComplete == TotalRequestCount) { FilteredDownloadedBytesPerSecond.Stop(); } - - std::filesystem::path CompressedChunkPath; - - // Check if the dowloaded file is file based and we can move it directly without rewriting it - { - IoBufferFileReference FileRef; - if (BuildBlob.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && - (FileRef.FileChunkSize == BlobSize)) - { - ZEN_TRACE_CPU("UpdateFolder_MoveTempChunk"); - std::error_code Ec; - std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); - if (!Ec) - { - BuildBlob.SetDeleteOnClose(false); - BuildBlob = {}; - CompressedChunkPath = Path / ZenTempDownloadFolderName / ChunkHash.ToHexString(); - std::filesystem::rename(TempBlobPath, CompressedChunkPath, Ec); - if (Ec) - { - CompressedChunkPath = std::filesystem::path{}; - - // Re-open the temp file again - BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); - BuildBlob = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlobSize, true); - BuildBlob.SetDeleteOnClose(true); - } - } - } - } - - if (CompressedChunkPath.empty() && (BlobSize > 512u * 1024u)) - { - ZEN_TRACE_CPU("UpdateFolder_WriteTempChunk"); - // Could not be moved and rather large, lets store it on disk - CompressedChunkPath = Path / ZenTempDownloadFolderName / ChunkHash.ToHexString(); - TemporaryFile::SafeWriteFile(CompressedChunkPath, BuildBlob); - BuildBlob = {}; - } - DownloadedChunks++; - - if (!AbortFlag) - { - Work.ScheduleWork( - WritePool, // WritePool, GetSyncWorkerPool() - [&Path, - &RemoteContent, - &RemoteLookup, - &CacheFolderPath, - &SequenceIndexChunksLeftToWriteCounters, - &WriteToDiskBytes, - &ChunkCountWritten, - &WritePartsComplete, - &TotalPartWriteCount, - &FilteredWrittenBytesPerSecond, - RemoteChunkIndex, - ChunkTargetPtrs, - CompressedChunkPath, - CompressedPart = std::move(BuildBlob)](std::atomic<bool>&) mutable { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_WriteChunk"); - - FilteredWrittenBytesPerSecond.Start(); - - const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; - if (CompressedChunkPath.empty()) - { - ZEN_ASSERT(CompressedPart); - } - else - { - ZEN_ASSERT(!CompressedPart); - CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); - if (!CompressedPart) - { - throw std::runtime_error( - fmt::format("Could not open dowloaded compressed chunk {} from {}", - ChunkHash, - CompressedChunkPath)); - } - } - - std::filesystem::path TargetFolder = Path / ZenTempCacheFolderName; - bool NeedHashVerify = WriteCompressedChunk(TargetFolder, - RemoteContent, - RemoteLookup, - ChunkHash, - ChunkTargetPtrs, - std::move(CompressedPart), - WriteToDiskBytes); - - if (!AbortFlag) - { - ChunkCountWritten++; - WritePartsComplete++; - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); - } - - if (!CompressedChunkPath.empty()) - { - std::filesystem::remove(CompressedChunkPath); - } - - CompleteChunkTargets(TargetFolder, - RemoteContent, - ChunkHash, - ChunkTargetPtrs, - SequenceIndexChunksLeftToWriteCounters, - NeedHashVerify); - } - } - }, - Work.DefaultErrorFunction()); - } + AsyncWriteDownloadedChunk(Path, + RemoteContent, + RemoteLookup, + RemoteChunkIndex, + std::move(ChunkTargetPtrs), + Work, + WritePool, + std::move(BuildBlob), + SequenceIndexChunksLeftToWriteCounters, + WriteToDiskBytes, + ChunkCountWritten, + WritePartsComplete, + TotalPartWriteCount, + LooseChunksBytes, + FilteredWrittenBytesPerSecond); } } } @@ -5375,7 +5471,7 @@ namespace { WritePool, // WritePool, GetSyncWorkerPool() [&RemoteContent, &RemoteLookup, - &CacheFolderPath, + CacheFolderPath, &RemoteChunkIndexNeedsCopyFromSourceFlags, &SequenceIndexChunksLeftToWriteCounters, BlockIndex, @@ -5550,7 +5646,10 @@ namespace { // Clean target folder ZEN_CONSOLE("Wiping {}", Path); - CleanDirectory(Path, DefaultExcludeFolders); + if (!CleanDirectory(Path, DefaultExcludeFolders)) + { + ZEN_WARN("Some files in {} could not be removed", Path); + } } else { @@ -6415,8 +6514,10 @@ namespace { ZEN_CONSOLE("Downloaded build in {}.", NiceTimeSpanMs(DownloadTimer.GetElapsedTimeMs())); } } - CleanDirectory(ZenTempFolder, {}); - std::filesystem::remove(ZenTempFolder); + if (CleanDirectory(ZenTempFolder, {})) + { + std::filesystem::remove(ZenTempFolder); + } } void DiffFolders(const std::filesystem::path& BasePath, const std::filesystem::path& ComparePath, bool OnlyChunked) @@ -7445,7 +7546,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { m_StoragePath = GetRunningExecutablePath().parent_path() / ".tmpstore"; CreateDirectories(m_StoragePath); - CleanDirectory(m_StoragePath); + CleanDirectory(m_StoragePath, {}); } auto _ = MakeGuard([&]() { if (m_BuildsUrl.empty() && m_StoragePath.empty()) diff --git a/src/zen/zen.cpp b/src/zen/zen.cpp index 4e6161e86..0fcf9d871 100644 --- a/src/zen/zen.cpp +++ b/src/zen/zen.cpp @@ -577,12 +577,15 @@ main(int argc, char** argv) GlobalOptions.PassthroughArgs = PassthroughArgs; GlobalOptions.PassthroughArgV = PassthroughArgV; + std::string MemoryOptions; + std::string SubCommand = "<None>"; cxxopts::Options Options("zen", "Zen management tool"); Options.add_options()("d, debug", "Enable debugging", cxxopts::value<bool>(GlobalOptions.IsDebug)); Options.add_options()("v, verbose", "Enable verbose logging", cxxopts::value<bool>(GlobalOptions.IsVerbose)); + Options.add_options()("malloc", "Configure memory allocator subsystem", cxxopts::value(MemoryOptions)->default_value("mimalloc")); Options.add_options()("help", "Show command line help"); Options.add_options()("c, command", "Sub command", cxxopts::value<std::string>(SubCommand)); diff --git a/src/zenutil/chunkedcontent.cpp b/src/zenutil/chunkedcontent.cpp index 4ca89d996..bb1ee5183 100644 --- a/src/zenutil/chunkedcontent.cpp +++ b/src/zenutil/chunkedcontent.cpp @@ -96,6 +96,8 @@ namespace { uint32_t PathIndex, std::atomic<bool>& AbortFlag) { + ZEN_TRACE_CPU("ChunkFolderContent"); + const uint64_t RawSize = OutChunkedContent.RawSizes[PathIndex]; const std::filesystem::path& Path = OutChunkedContent.Paths[PathIndex]; @@ -136,6 +138,8 @@ namespace { } else { + ZEN_TRACE_CPU("HashOnly"); + IoBuffer Buffer = IoBufferBuilder::MakeFromFile((FolderPath / Path).make_preferred()); const IoHash Hash = IoHash::HashBuffer(Buffer, &Stats.BytesHashed); @@ -228,6 +232,7 @@ FolderContent::operator==(const FolderContent& Rhs) const bool FolderContent::AreKnownFilesEqual(const FolderContent& Rhs) const { + ZEN_TRACE_CPU("FolderContent::AreKnownFilesEqual"); tsl::robin_map<std::string, size_t> RhsPathToIndex; const size_t RhsPathCount = Rhs.Paths.size(); RhsPathToIndex.reserve(RhsPathCount); @@ -259,6 +264,7 @@ FolderContent::AreKnownFilesEqual(const FolderContent& Rhs) const void FolderContent::UpdateState(const FolderContent& Rhs, std::vector<uint32_t>& OutPathIndexesOufOfDate) { + ZEN_TRACE_CPU("FolderContent::UpdateState"); tsl::robin_map<std::string, uint32_t> RhsPathToIndex; const uint32_t RhsPathCount = gsl::narrow<uint32_t>(Rhs.Paths.size()); RhsPathToIndex.reserve(RhsPathCount); @@ -297,6 +303,7 @@ FolderContent::UpdateState(const FolderContent& Rhs, std::vector<uint32_t>& OutP FolderContent GetUpdatedContent(const FolderContent& Old, const FolderContent& New, std::vector<std::filesystem::path>& OutDeletedPathIndexes) { + ZEN_TRACE_CPU("FolderContent::GetUpdatedContent"); FolderContent Result = {.Platform = Old.Platform}; tsl::robin_map<std::string, uint32_t> NewPathToIndex; const uint32_t NewPathCount = gsl::narrow<uint32_t>(New.Paths.size()); @@ -332,6 +339,7 @@ GetUpdatedContent(const FolderContent& Old, const FolderContent& New, std::vecto void SaveFolderContentToCompactBinary(const FolderContent& Content, CbWriter& Output) { + ZEN_TRACE_CPU("SaveFolderContentToCompactBinary"); Output.AddString("platform"sv, ToString(Content.Platform)); compactbinary_helpers::WriteArray(Content.Paths, "paths"sv, Output); compactbinary_helpers::WriteArray(Content.RawSizes, "rawSizes"sv, Output); @@ -342,6 +350,7 @@ SaveFolderContentToCompactBinary(const FolderContent& Content, CbWriter& Output) FolderContent LoadFolderContentToCompactBinary(CbObjectView Input) { + ZEN_TRACE_CPU("LoadFolderContentToCompactBinary"); FolderContent Content; Content.Platform = FromString(Input["platform"sv].AsString(), GetSourceCurrentPlatform()); compactbinary_helpers::ReadArray("paths"sv, Input, Content.Paths); @@ -494,6 +503,7 @@ GetFolderContent(GetFolderContentStatistics& Stats, void SaveChunkedFolderContentToCompactBinary(const ChunkedFolderContent& Content, CbWriter& Output) { + ZEN_TRACE_CPU("SaveChunkedFolderContentToCompactBinary"); Output.AddString("platform"sv, ToString(Content.Platform)); compactbinary_helpers::WriteArray(Content.Paths, "paths"sv, Output); compactbinary_helpers::WriteArray(Content.RawSizes, "rawSizes"sv, Output); @@ -512,6 +522,7 @@ SaveChunkedFolderContentToCompactBinary(const ChunkedFolderContent& Content, CbW ChunkedFolderContent LoadChunkedFolderContentToCompactBinary(CbObjectView Input) { + ZEN_TRACE_CPU("LoadChunkedFolderContentToCompactBinary"); ChunkedFolderContent Content; Content.Platform = FromString(Input["platform"sv].AsString(), GetSourceCurrentPlatform()); compactbinary_helpers::ReadArray("paths"sv, Input, Content.Paths); @@ -788,7 +799,7 @@ BuildChunkedContentLookup(const ChunkedFolderContent& Content) struct ChunkLocationReference { - uint32_t ChunkIndex; + uint32_t ChunkIndex = (uint32_t)-1; ChunkedContentLookup::ChunkSequenceLocation Location; }; @@ -853,7 +864,7 @@ BuildChunkedContentLookup(const ChunkedFolderContent& Content) { Result.ChunkHashToChunkIndex.insert({Content.ChunkedContent.ChunkHashes[ChunkIndex], ChunkIndex}); uint32_t Count = 0; - while (Locations[RangeOffset + Count].ChunkIndex == ChunkIndex) + while ((RangeOffset + Count < Locations.size()) && (Locations[RangeOffset + Count].ChunkIndex == ChunkIndex)) { Result.ChunkSequenceLocations.push_back(Locations[RangeOffset + Count].Location); Count++; diff --git a/src/zenutil/chunkedfile.cpp b/src/zenutil/chunkedfile.cpp index 4f9344039..a2c041ffd 100644 --- a/src/zenutil/chunkedfile.cpp +++ b/src/zenutil/chunkedfile.cpp @@ -3,6 +3,7 @@ #include <zenutil/chunkedfile.h> #include <zencore/basicfile.h> +#include <zencore/trace.h> #include "chunking.h" @@ -33,6 +34,7 @@ namespace { IoBuffer SerializeChunkedInfo(const ChunkedInfo& Info) { + ZEN_TRACE_CPU("SerializeChunkedInfo"); size_t HeaderSize = RoundUp(sizeof(ChunkedHeader), 16) + RoundUp(sizeof(uint32_t) * Info.ChunkSequence.size(), 16) + RoundUp(sizeof(IoHash) * Info.ChunkHashes.size(), 16); IoBuffer HeaderData(HeaderSize); @@ -65,6 +67,7 @@ SerializeChunkedInfo(const ChunkedInfo& Info) ChunkedInfo DeserializeChunkedInfo(IoBuffer& Buffer) { + ZEN_TRACE_CPU("DeserializeChunkedInfo"); MemoryView View = Buffer.GetView(); ChunkedHeader Header; { @@ -99,6 +102,7 @@ DeserializeChunkedInfo(IoBuffer& Buffer) void Reconstruct(const ChunkedInfo& Info, const std::filesystem::path& TargetPath, std::function<IoBuffer(const IoHash& ChunkHash)> GetChunk) { + ZEN_TRACE_CPU("Reconstruct"); BasicFile Reconstructed; Reconstructed.Open(TargetPath, BasicFile::Mode::kTruncate); BasicFileWriter ReconstructedWriter(Reconstructed, 64 * 1024); @@ -119,6 +123,8 @@ ChunkData(BasicFile& RawData, std::atomic<uint64_t>* BytesProcessed, std::atomic<bool>* AbortFlag) { + ZEN_TRACE_CPU("ChunkData"); + ChunkedInfoWithSource Result; tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> FoundChunks; diff --git a/src/zenutil/chunkingcontroller.cpp b/src/zenutil/chunkingcontroller.cpp index 017d12433..2a7057a46 100644 --- a/src/zenutil/chunkingcontroller.cpp +++ b/src/zenutil/chunkingcontroller.cpp @@ -4,6 +4,7 @@ #include <zencore/basicfile.h> #include <zencore/compactbinarybuilder.h> +#include <zencore/trace.h> ZEN_THIRD_PARTY_INCLUDES_START #include <tsl/robin_map.h> @@ -61,6 +62,7 @@ public: std::atomic<uint64_t>& BytesProcessed, std::atomic<bool>& AbortFlag) const override { + ZEN_TRACE_CPU("BasicChunkingController::ProcessFile"); const bool ExcludeFromChunking = std::find(m_ChunkExcludeExtensions.begin(), m_ChunkExcludeExtensions.end(), InputPath.extension()) != m_ChunkExcludeExtensions.end(); @@ -136,6 +138,7 @@ public: std::atomic<uint64_t>& BytesProcessed, std::atomic<bool>& AbortFlag) const override { + ZEN_TRACE_CPU("ChunkingControllerWithFixedChunking::ProcessFile"); if (RawSize < m_ChunkFileSizeLimit) { return false; @@ -145,6 +148,7 @@ public: if (FixedChunking) { + ZEN_TRACE_CPU("FixedChunking"); IoHashStream FullHash; IoBuffer Source = IoBufferBuilder::MakeFromFile(InputPath); uint64_t Offset = 0; diff --git a/src/zenutil/filebuildstorage.cpp b/src/zenutil/filebuildstorage.cpp index e57109006..47a4e1cc4 100644 --- a/src/zenutil/filebuildstorage.cpp +++ b/src/zenutil/filebuildstorage.cpp @@ -8,6 +8,7 @@ #include <zencore/fmtutils.h> #include <zencore/scopeguard.h> #include <zencore/timer.h> +#include <zencore/trace.h> namespace zen { @@ -36,6 +37,7 @@ public: virtual CbObject ListBuilds(CbObject Query) override { + ZEN_TRACE_CPU("FileBuildStorage::ListBuilds"); ZEN_UNUSED(Query); SimulateLatency(Query.GetSize(), 0); @@ -72,6 +74,7 @@ public: virtual CbObject PutBuild(const Oid& BuildId, const CbObject& MetaData) override { + ZEN_TRACE_CPU("FileBuildStorage::PutBuild"); SimulateLatency(MetaData.GetSize(), 0); Stopwatch ExecutionTimer; @@ -93,6 +96,7 @@ public: virtual CbObject GetBuild(const Oid& BuildId) override { + ZEN_TRACE_CPU("FileBuildStorage::GetBuild"); SimulateLatency(0, 0); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); @@ -105,6 +109,7 @@ public: virtual void FinalizeBuild(const Oid& BuildId) override { + ZEN_TRACE_CPU("FileBuildStorage::FinalizeBuild"); SimulateLatency(0, 0); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); @@ -119,6 +124,7 @@ public: std::string_view PartName, const CbObject& MetaData) override { + ZEN_TRACE_CPU("FileBuildStorage::PutBuildPart"); SimulateLatency(MetaData.GetSize(), 0); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); @@ -164,6 +170,7 @@ public: virtual CbObject GetBuildPart(const Oid& BuildId, const Oid& BuildPartId) override { + ZEN_TRACE_CPU("FileBuildStorage::GetBuildPart"); SimulateLatency(0, 0); Stopwatch ExecutionTimer; @@ -186,6 +193,7 @@ public: virtual std::vector<IoHash> FinalizeBuildPart(const Oid& BuildId, const Oid& BuildPartId, const IoHash& PartHash) override { + ZEN_TRACE_CPU("FileBuildStorage::FinalizeBuildPart"); SimulateLatency(0, 0); Stopwatch ExecutionTimer; @@ -215,6 +223,7 @@ public: ZenContentType ContentType, const CompositeBuffer& Payload) override { + ZEN_TRACE_CPU("FileBuildStorage::PutBuildBlob"); ZEN_UNUSED(BuildId); ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary); SimulateLatency(Payload.GetSize(), 0); @@ -242,6 +251,7 @@ public: std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter, std::function<void(uint64_t, bool)>&& OnSentBytes) override { + ZEN_TRACE_CPU("FileBuildStorage::PutLargeBuildBlob"); ZEN_UNUSED(BuildId); ZEN_UNUSED(ContentType); SimulateLatency(0, 0); @@ -281,6 +291,7 @@ public: uint64_t Size = Min(32u * 1024u * 1024u, PayloadSize - Offset); WorkItems.push_back([this, RawHash, BlockPath, Workload, Offset, Size]() { + ZEN_TRACE_CPU("FileBuildStorage::PutLargeBuildBlob_Work"); IoBuffer PartPayload = Workload->Transmitter(Offset, Size); SimulateLatency(PartPayload.GetSize(), 0); @@ -327,6 +338,7 @@ public: virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override { + ZEN_TRACE_CPU("FileBuildStorage::GetBuildBlob"); ZEN_UNUSED(BuildId); SimulateLatency(0, 0); Stopwatch ExecutionTimer; @@ -363,6 +375,7 @@ public: uint64_t ChunkSize, std::function<void(uint64_t Offset, const IoBuffer& Chunk, uint64_t BytesRemaining)>&& Receiver) override { + ZEN_TRACE_CPU("FileBuildStorage::GetLargeBuildBlob"); ZEN_UNUSED(BuildId); SimulateLatency(0, 0); Stopwatch ExecutionTimer; @@ -392,6 +405,7 @@ public: { uint64_t Size = Min(ChunkSize, BlobSize - Offset); WorkItems.push_back([this, BlockPath, Workload, Offset, Size]() { + ZEN_TRACE_CPU("FileBuildStorage::GetLargeBuildBlob_Work"); SimulateLatency(0, 0); IoBuffer PartPayload(Size); Workload->BlobFile.Read(PartPayload.GetMutableView().GetData(), Size, Offset); @@ -411,6 +425,7 @@ public: virtual void PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) override { + ZEN_TRACE_CPU("FileBuildStorage::PutBlockMetadata"); ZEN_UNUSED(BuildId); SimulateLatency(MetaData.GetSize(), 0); @@ -429,6 +444,7 @@ public: virtual std::vector<ChunkBlockDescription> FindBlocks(const Oid& BuildId) override { + ZEN_TRACE_CPU("FileBuildStorage::FindBlocks"); ZEN_UNUSED(BuildId); SimulateLatency(0, 0); Stopwatch ExecutionTimer; @@ -461,6 +477,7 @@ public: virtual std::vector<ChunkBlockDescription> GetBlockMetadata(const Oid& BuildId, std::span<const IoHash> BlockHashes) override { + ZEN_TRACE_CPU("FileBuildStorage::GetBlockMetadata"); ZEN_UNUSED(BuildId); SimulateLatency(0, 0); Stopwatch ExecutionTimer; diff --git a/src/zenutil/include/zenutil/chunkedcontent.h b/src/zenutil/include/zenutil/chunkedcontent.h index 309341550..57b55cb8e 100644 --- a/src/zenutil/include/zenutil/chunkedcontent.h +++ b/src/zenutil/include/zenutil/chunkedcontent.h @@ -124,8 +124,8 @@ struct ChunkedContentLookup { struct ChunkSequenceLocation { - uint32_t SequenceIndex; - uint64_t Offset; + uint32_t SequenceIndex = (uint32_t)-1; + uint64_t Offset = (uint64_t)-1; }; tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkHashToChunkIndex; tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> RawHashToSequenceIndex; |