diff options
| author | Dan Engelbrecht <[email protected]> | 2025-10-14 18:24:17 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-14 18:24:17 +0200 |
| commit | 74c90983853f5ef0966be35544173d6efd85db17 (patch) | |
| tree | e4fe6faa53697f2b1503e3919527d5ad18260890 /src/zenremotestore | |
| parent | refactor builds cmd part2 (#572) (diff) | |
| download | zen-74c90983853f5ef0966be35544173d6efd85db17.tar.xz zen-74c90983853f5ef0966be35544173d6efd85db17.zip | |
refactor builds cmd part3 (#573)
* move lambdas to member functions
* add BuildsOperationValidateBuildPart
Diffstat (limited to 'src/zenremotestore')
| -rw-r--r-- | src/zenremotestore/builds/buildstorageoperations.cpp | 724 | ||||
| -rw-r--r-- | src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h | 75 |
2 files changed, 620 insertions, 179 deletions
diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp index a4f4237cd..60058c4a4 100644 --- a/src/zenremotestore/builds/buildstorageoperations.cpp +++ b/src/zenremotestore/builds/buildstorageoperations.cpp @@ -544,6 +544,160 @@ namespace { return SB.ToString(); } + void DownloadLargeBlob(BuildStorage& Storage, + const std::filesystem::path& DownloadFolder, + const Oid& BuildId, + const IoHash& ChunkHash, + const std::uint64_t PreferredMultipartChunkSize, + ParallelWork& Work, + WorkerThreadPool& NetworkPool, + DownloadStatistics& DownloadStats, + std::function<void(IoBuffer&& Payload)>&& OnDownloadComplete) + { + ZEN_TRACE_CPU("DownloadLargeBlob"); + + struct WorkloadData + { + TemporaryFile TempFile; + }; + std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>()); + + std::error_code Ec; + Workload->TempFile.CreateTemporary(DownloadFolder, Ec); + if (Ec) + { + throw std::runtime_error( + fmt::format("Failed opening temporary file '{}': {} ({})", Workload->TempFile.GetPath(), Ec.message(), Ec.value())); + } + std::vector<std::function<void()>> WorkItems = Storage.GetLargeBuildBlob( + BuildId, + ChunkHash, + PreferredMultipartChunkSize, + [&Work, Workload, &DownloadStats](uint64_t Offset, const IoBuffer& Chunk) { + DownloadStats.DownloadedChunkByteCount += Chunk.GetSize(); + + if (!Work.IsAborted()) + { + ZEN_TRACE_CPU("Async_DownloadLargeBlob_OnReceive"); + Workload->TempFile.Write(Chunk.GetView(), Offset); + } + }, + [&Work, Workload, &DownloadStats, OnDownloadComplete = std::move(OnDownloadComplete)]() { + DownloadStats.DownloadedChunkCount++; + if (!Work.IsAborted()) + { + ZEN_TRACE_CPU("Async_DownloadLargeBlob_OnComplete"); + + uint64_t PayloadSize = Workload->TempFile.FileSize(); + void* FileHandle = Workload->TempFile.Detach(); + ZEN_ASSERT(FileHandle != nullptr); + IoBuffer Payload(IoBuffer::File, FileHandle, 0, PayloadSize, true); + Payload.SetDeleteOnClose(true); + OnDownloadComplete(std::move(Payload)); + } + }); + if (!WorkItems.empty()) + { + DownloadStats.MultipartAttachmentCount++; + } + for (auto& WorkItem : WorkItems) + { + Work.ScheduleWork(NetworkPool, [WorkItem = std::move(WorkItem)](std::atomic<bool>& AbortFlag) { + if (!AbortFlag) + { + ZEN_TRACE_CPU("Async_DownloadLargeBlob_Work"); + + WorkItem(); + } + }); + } + } + + CompositeBuffer ValidateBlob(std::atomic<bool>& AbortFlag, + IoBuffer&& Payload, + const IoHash& BlobHash, + uint64_t& OutCompressedSize, + uint64_t& OutDecompressedSize) + { + ZEN_TRACE_CPU("ValidateBlob"); + + if (Payload.GetContentType() != ZenContentType::kCompressedBinary) + { + throw std::runtime_error(fmt::format("Blob {} ({} bytes) has unexpected content type '{}'", + BlobHash, + Payload.GetSize(), + ToString(Payload.GetContentType()))); + } + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, RawSize); + if (!Compressed) + { + throw std::runtime_error(fmt::format("Blob {} ({} bytes) compressed header is invalid", BlobHash, Payload.GetSize())); + } + if (RawHash != BlobHash) + { + throw std::runtime_error( + fmt::format("Blob {} ({} bytes) compressed header has a mismatching raw hash {}", BlobHash, Payload.GetSize(), RawHash)); + } + + IoHashStream Hash; + bool CouldDecompress = Compressed.DecompressToStream( + 0, + RawSize, + [&AbortFlag, &Hash](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { + ZEN_UNUSED(SourceOffset, SourceSize, Offset); + if (!AbortFlag) + { + for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) + { + Hash.Append(Segment.GetView()); + } + return true; + } + return false; + }); + + if (AbortFlag) + { + return CompositeBuffer{}; + } + + if (!CouldDecompress) + { + throw std::runtime_error( + fmt::format("Blob {} ({} bytes) failed to decompress - header information mismatch", BlobHash, Payload.GetSize())); + } + IoHash ValidateRawHash = Hash.GetHash(); + if (ValidateRawHash != BlobHash) + { + throw std::runtime_error(fmt::format("Blob {} ({} bytes) decompressed hash {} does not match header information", + BlobHash, + Payload.GetSize(), + ValidateRawHash)); + } + OodleCompressor Compressor; + OodleCompressionLevel CompressionLevel; + uint64_t BlockSize; + if (!Compressed.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) + { + throw std::runtime_error(fmt::format("Blob {} ({} bytes) failed to get compression details", BlobHash, Payload.GetSize())); + } + OutCompressedSize = Payload.GetSize(); + OutDecompressedSize = RawSize; + if (CompressionLevel == OodleCompressionLevel::None) + { + // Only decompress to composite if we need it for block verification + CompositeBuffer DecompressedComposite = Compressed.DecompressToComposite(); + if (!DecompressedComposite) + { + throw std::runtime_error(fmt::format("Blob {} ({} bytes) failed to decompress to composite", BlobHash, Payload.GetSize())); + } + return DecompressedComposite; + } + return CompositeBuffer{}; + } + } // namespace class ReadFileCache @@ -1264,6 +1418,8 @@ BuildsOperationUpdateFolder::BuildsOperationUpdateFolder(BuildOpLogOutput& , m_LooseChunkHashes(LooseChunkHashes) , m_Options(Options) , m_CacheFolderPath(ZenTempCacheFolderPath(m_Options.ZenFolderPath)) +, m_TempDownloadFolderPath(ZenTempDownloadFolderPath(m_Options.ZenFolderPath)) +, m_TempBlockFolderPath(ZenTempBlockFolderPath(m_Options.ZenFolderPath)) { } @@ -1288,6 +1444,10 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) m_LogOutput.SetLogOperationProgress(TaskSteps::ScanExistingData, TaskSteps::StepCount); + CreateDirectories(m_CacheFolderPath); + CreateDirectories(m_TempDownloadFolderPath); + CreateDirectories(m_TempBlockFolderPath); + Stopwatch IndexTimer; if (!m_Options.IsQuiet) @@ -1836,8 +1996,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) { TotalPartWriteCount++; - std::filesystem::path BlockPath = - ZenTempBlockFolderPath(m_Options.ZenFolderPath) / BlockDescription.BlockHash.ToHexString(); + std::filesystem::path BlockPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); if (IsFile(BlockPath)) { CachedChunkBlockIndexes.push_back(BlockIndex); @@ -2279,7 +2438,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) m_Options.LargeAttachmentSize) { DownloadLargeBlob(*m_Storage.BuildStorage, - ZenTempDownloadFolderPath(m_Options.ZenFolderPath), + m_TempDownloadFolderPath, m_BuildId, ChunkHash, m_Options.PreferredMultipartChunkSize, @@ -2469,7 +2628,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) FilteredWrittenBytesPerSecond.Start(); std::filesystem::path BlockChunkPath = - ZenTempBlockFolderPath(m_Options.ZenFolderPath) / BlockDescription.BlockHash.ToHexString(); + m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); IoBuffer BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); if (!BlockBuffer) { @@ -2584,12 +2743,11 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) if (!Ec) { BlockBuffer.SetDeleteOnClose(false); - BlockBuffer = {}; - BlockChunkPath = - ZenTempBlockFolderPath(m_Options.ZenFolderPath) / fmt::format("{}_{:x}_{:x}", - BlockDescription.BlockHash, - BlockRange.RangeStart, - BlockRange.RangeLength); + BlockBuffer = {}; + BlockChunkPath = m_TempBlockFolderPath / fmt::format("{}_{:x}_{:x}", + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength); RenameFile(TempBlobPath, BlockChunkPath, Ec); if (Ec) { @@ -2609,7 +2767,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) ZEN_TRACE_CPU("WriteTempPartialBlock"); // Could not be moved and rather large, lets store it on disk BlockChunkPath = - ZenTempBlockFolderPath(m_Options.ZenFolderPath) / + m_TempBlockFolderPath / fmt::format("{}_{:x}_{:x}", BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength); TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); BlockBuffer = {}; @@ -2773,9 +2931,8 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) if (!Ec) { BlockBuffer.SetDeleteOnClose(false); - BlockBuffer = {}; - BlockChunkPath = - ZenTempBlockFolderPath(m_Options.ZenFolderPath) / BlockDescription.BlockHash.ToHexString(); + BlockBuffer = {}; + BlockChunkPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); RenameFile(TempBlobPath, BlockChunkPath, Ec); if (Ec) { @@ -2794,8 +2951,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) { ZEN_TRACE_CPU("WriteTempFullBlock"); // Could not be moved and rather large, lets store it on disk - BlockChunkPath = - ZenTempBlockFolderPath(m_Options.ZenFolderPath) / BlockDescription.BlockHash.ToHexString(); + BlockChunkPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); BlockBuffer = {}; } @@ -3540,7 +3696,7 @@ BuildsOperationUpdateFolder::ScanTempBlocksFolder(tsl::robin_map<IoHash, uint32_ } DirectoryContent BlockDirContent; - GetDirectoryContent(ZenTempBlockFolderPath(m_Options.ZenFolderPath), + GetDirectoryContent(m_TempBlockFolderPath, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeFileSizes, BlockDirContent); OutCachedBlocksFound.reserve(BlockDirContent.Files.size()); @@ -3792,7 +3948,7 @@ BuildsOperationUpdateFolder::FindDownloadedChunk(const IoHash& ChunkHash) { ZEN_TRACE_CPU("FindDownloadedChunk"); - std::filesystem::path CompressedChunkPath = ZenTempDownloadFolderPath(m_Options.ZenFolderPath) / ChunkHash.ToHexString(); + std::filesystem::path CompressedChunkPath = m_TempDownloadFolderPath / ChunkHash.ToHexString(); if (IsFile(CompressedChunkPath)) { IoBuffer ExistingCompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); @@ -4590,7 +4746,7 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa { Payload.SetDeleteOnClose(false); Payload = {}; - CompressedChunkPath = ZenTempDownloadFolderPath(ZenFolderPath) / ChunkHash.ToHexString(); + CompressedChunkPath = m_TempDownloadFolderPath / ChunkHash.ToHexString(); RenameFile(TempBlobPath, CompressedChunkPath, Ec); if (Ec) { @@ -4609,7 +4765,7 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa { ZEN_TRACE_CPU("WriteTempChunk"); // Could not be moved and rather large, lets store it on disk - CompressedChunkPath = ZenTempDownloadFolderPath(ZenFolderPath) / ChunkHash.ToHexString(); + CompressedChunkPath = m_TempDownloadFolderPath / ChunkHash.ToHexString(); TemporaryFile::SafeWriteFile(CompressedChunkPath, Payload); Payload = {}; } @@ -4929,72 +5085,6 @@ BuildsOperationUploadFolder::Execute() ChunkedFolderContent LocalContent; { - auto IsAcceptedFolder = [this](const std::string_view& RelativePath) -> bool { - for (const std::string& ExcludeFolder : m_Options.ExcludeFolders) - { - if (RelativePath.starts_with(ExcludeFolder)) - { - if (RelativePath.length() == ExcludeFolder.length()) - { - return false; - } - else if (RelativePath[ExcludeFolder.length()] == '/') - { - return false; - } - } - } - return true; - }; - - auto IsAcceptedFile = [this](const std::string_view& RelativePath, uint64_t, uint32_t) -> bool { - for (const std::string& ExcludeExtension : m_Options.ExcludeExtensions) - { - if (RelativePath.ends_with(ExcludeExtension)) - { - return false; - } - } - return true; - }; - - auto ParseManifest = [](const std::filesystem::path& Path, - const std::filesystem::path& ManifestPath) -> std::vector<std::filesystem::path> { - std::vector<std::filesystem::path> AssetPaths; - std::filesystem::path AbsoluteManifestPath = - MakeSafeAbsolutePath(ManifestPath.is_absolute() ? ManifestPath : Path / ManifestPath); - IoBuffer ManifestContent = ReadFile(AbsoluteManifestPath).Flatten(); - std::string_view ManifestString((const char*)ManifestContent.GetView().GetData(), ManifestContent.GetSize()); - std::string_view::size_type Offset = 0; - while (Offset < ManifestContent.GetSize()) - { - size_t PathBreakOffset = ManifestString.find_first_of("\t\r\n", Offset); - if (PathBreakOffset == std::string_view::npos) - { - PathBreakOffset = ManifestContent.GetSize(); - } - std::string_view AssetPath = ManifestString.substr(Offset, PathBreakOffset - Offset); - if (!AssetPath.empty()) - { - AssetPaths.emplace_back(std::filesystem::path(AssetPath)); - } - Offset = PathBreakOffset; - size_t EolOffset = ManifestString.find_first_of("\r\n", Offset); - if (EolOffset == std::string_view::npos) - { - break; - } - Offset = EolOffset; - size_t LineBreakOffset = ManifestString.find_first_not_of("\t\r\n", Offset); - if (LineBreakOffset == std::string_view::npos) - { - break; - } - Offset = LineBreakOffset; - } - return AssetPaths; - }; - Stopwatch ScanTimer; FolderContent Content; if (m_ManifestPath.empty()) @@ -5013,15 +5103,10 @@ BuildsOperationUploadFolder::Execute() Content = GetFolderContent( m_LocalFolderScanStats, m_Path, - std::move(IsAcceptedFolder), - [this, &IsAcceptedFile, &ExcludeAssetPaths](const std::string_view& RelativePath, - uint64_t Size, - uint32_t Attributes) -> bool { - if (RelativePath == m_Options.ZenExcludeManifestName) - { - return false; - } - if (!IsAcceptedFile(RelativePath, Size, Attributes)) + [this](const std::string_view& RelativePath) { return IsAcceptedFolder(RelativePath); }, + [this, &ExcludeAssetPaths](const std::string_view& RelativePath, uint64_t Size, uint32_t Attributes) -> bool { + ZEN_UNUSED(Size, Attributes); + if (!IsAcceptedFile(RelativePath)) { return false; } @@ -5342,7 +5427,7 @@ BuildsOperationUploadFolder::Execute() } for (const ChunkBlockDescription& Block : AllChunkBlockDescriptions) { - for (const IoHash& ChunkHash : Block.ChunkHashes) + for (const IoHash& ChunkHash : Block.ChunkRawHashes) { ChunkHashToAbsoluteChunkIndex.insert({ChunkHash, AbsoluteChunkHashes.size()}); AbsoluteChunkHashes.push_back(ChunkHash); @@ -5638,24 +5723,6 @@ BuildsOperationUploadFolder::Execute() } } - // if (m_PostUploadVerify && !m_AbortFlag) - // { - // //m_LogOutput.SetLogOperationProgress(TaskSteps::Validate, TaskSteps::StepCount); - // ValidateBuildPart(*m_Storage.BuildStorage, m_BuildId, m_BuildPartId, m_BuildPartName, ValidateStats, ValidateDownloadStats); - // std::string ValidateInfo; - // if (m_PostUploadVerify) - // { - // const uint64_t DownloadedCount = ValidateDownloadStats.DownloadedChunkCount + ValidateDownloadStats.DownloadedBlockCount; - // const uint64_t DownloadedByteCount = - // ValidateDownloadStats.DownloadedChunkByteCount + ValidateDownloadStats.DownloadedBlockByteCount; - // ValidateInfo = fmt::format("\n Verified: {:>8} ({}), {}B/sec, {}", - // DownloadedCount, - // NiceBytes(DownloadedByteCount), - // NiceNum(GetBytesPerSecond(ValidateStats.ElapsedWallTimeUS, DownloadedByteCount)), - // NiceTimeSpanMs(ValidateStats.ElapsedWallTimeUS / 1000)); - // } - // } - m_Storage.BuildStorage->PutBuildPartStats( m_BuildId, m_BuildPartId, @@ -5674,74 +5741,78 @@ BuildsOperationUploadFolder::Execute() m_LogOutput.SetLogOperationProgress(TaskSteps::Cleanup, TaskSteps::StepCount); } -void -DownloadLargeBlob(BuildStorage& Storage, - const std::filesystem::path& DownloadFolder, - const Oid& BuildId, - const IoHash& ChunkHash, - const std::uint64_t PreferredMultipartChunkSize, - ParallelWork& Work, - WorkerThreadPool& NetworkPool, - DownloadStatistics& DownloadStats, - std::function<void(IoBuffer&& Payload)>&& OnDownloadComplete) +std::vector<std::filesystem::path> +BuildsOperationUploadFolder::ParseManifest(const std::filesystem::path& Path, const std::filesystem::path& ManifestPath) { - ZEN_TRACE_CPU("DownloadLargeBlob"); - - struct WorkloadData - { - TemporaryFile TempFile; - }; - std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>()); - - std::error_code Ec; - Workload->TempFile.CreateTemporary(DownloadFolder, Ec); - if (Ec) + std::vector<std::filesystem::path> AssetPaths; + std::filesystem::path AbsoluteManifestPath = MakeSafeAbsolutePath(ManifestPath.is_absolute() ? ManifestPath : Path / ManifestPath); + IoBuffer ManifestContent = ReadFile(AbsoluteManifestPath).Flatten(); + std::string_view ManifestString((const char*)ManifestContent.GetView().GetData(), ManifestContent.GetSize()); + std::string_view::size_type Offset = 0; + while (Offset < ManifestContent.GetSize()) { - throw std::runtime_error( - fmt::format("Failed opening temporary file '{}': {} ({})", Workload->TempFile.GetPath(), Ec.message(), Ec.value())); + size_t PathBreakOffset = ManifestString.find_first_of("\t\r\n", Offset); + if (PathBreakOffset == std::string_view::npos) + { + PathBreakOffset = ManifestContent.GetSize(); + } + std::string_view AssetPath = ManifestString.substr(Offset, PathBreakOffset - Offset); + if (!AssetPath.empty()) + { + AssetPaths.emplace_back(std::filesystem::path(AssetPath)); + } + Offset = PathBreakOffset; + size_t EolOffset = ManifestString.find_first_of("\r\n", Offset); + if (EolOffset == std::string_view::npos) + { + break; + } + Offset = EolOffset; + size_t LineBreakOffset = ManifestString.find_first_not_of("\t\r\n", Offset); + if (LineBreakOffset == std::string_view::npos) + { + break; + } + Offset = LineBreakOffset; } - std::vector<std::function<void()>> WorkItems = Storage.GetLargeBuildBlob( - BuildId, - ChunkHash, - PreferredMultipartChunkSize, - [&Work, Workload, &DownloadStats](uint64_t Offset, const IoBuffer& Chunk) { - DownloadStats.DownloadedChunkByteCount += Chunk.GetSize(); + return AssetPaths; +} - if (!Work.IsAborted()) +bool +BuildsOperationUploadFolder::IsAcceptedFolder(const std::string_view& RelativePath) const +{ + for (const std::string& ExcludeFolder : m_Options.ExcludeFolders) + { + if (RelativePath.starts_with(ExcludeFolder)) + { + if (RelativePath.length() == ExcludeFolder.length()) { - ZEN_TRACE_CPU("Async_DownloadLargeBlob_OnReceive"); - Workload->TempFile.Write(Chunk.GetView(), Offset); + return false; } - }, - [&Work, Workload, &DownloadStats, OnDownloadComplete = std::move(OnDownloadComplete)]() { - DownloadStats.DownloadedChunkCount++; - if (!Work.IsAborted()) + else if (RelativePath[ExcludeFolder.length()] == '/') { - ZEN_TRACE_CPU("Async_DownloadLargeBlob_OnComplete"); - - uint64_t PayloadSize = Workload->TempFile.FileSize(); - void* FileHandle = Workload->TempFile.Detach(); - ZEN_ASSERT(FileHandle != nullptr); - IoBuffer Payload(IoBuffer::File, FileHandle, 0, PayloadSize, true); - Payload.SetDeleteOnClose(true); - OnDownloadComplete(std::move(Payload)); + return false; } - }); - if (!WorkItems.empty()) + } + } + return true; +} + +bool +BuildsOperationUploadFolder::IsAcceptedFile(const std::string_view& RelativePath) const +{ + if (RelativePath == m_Options.ZenExcludeManifestName) { - DownloadStats.MultipartAttachmentCount++; + return false; } - for (auto& WorkItem : WorkItems) + for (const std::string& ExcludeExtension : m_Options.ExcludeExtensions) { - Work.ScheduleWork(NetworkPool, [WorkItem = std::move(WorkItem)](std::atomic<bool>& AbortFlag) { - if (!AbortFlag) - { - ZEN_TRACE_CPU("Async_DownloadLargeBlob_Work"); - - WorkItem(); - } - }); + if (RelativePath.ends_with(ExcludeExtension)) + { + return false; + } } + return true; } std::vector<size_t> @@ -7130,4 +7201,317 @@ BuildsOperationUploadFolder::CompressChunk(const ChunkedFolderContent& Content, return std::move(CompressedBlob).GetCompressed(); } +BuildsOperationValidateBuildPart::BuildsOperationValidateBuildPart(BuildOpLogOutput& LogOutput, + BuildStorage& Storage, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag, + WorkerThreadPool& IOWorkerPool, + WorkerThreadPool& NetworkPool, + const Oid& BuildId, + const Oid& BuildPartId, + const std::string_view BuildPartName, + const Options& Options) + +: m_LogOutput(LogOutput) +, m_Storage(Storage) +, m_AbortFlag(AbortFlag) +, m_PauseFlag(PauseFlag) +, m_IOWorkerPool(IOWorkerPool) +, m_NetworkPool(NetworkPool) +, m_BuildId(BuildId) +, m_BuildPartId(BuildPartId) +, m_BuildPartName(BuildPartName) +, m_Options(Options) +{ +} + +void +BuildsOperationValidateBuildPart::Execute() +{ + ZEN_TRACE_CPU("ValidateBuildPart"); + + enum TaskSteps : uint32_t + { + FetchBuild, + FetchBuildPart, + ValidateBlobs, + Cleanup, + StepCount + }; + + auto EndProgress = MakeGuard([&]() { m_LogOutput.SetLogOperationProgress(TaskSteps::StepCount, TaskSteps::StepCount); }); + + Stopwatch Timer; + auto _ = MakeGuard([&]() { + if (!m_Options.IsQuiet) + { + ZEN_CONSOLE("Validated build part {}/{} ('{}') in {}", + m_BuildId, + m_BuildPartId, + m_BuildPartName, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + } + }); + + m_LogOutput.SetLogOperationProgress(TaskSteps::FetchBuild, TaskSteps::StepCount); + + CbObject Build = m_Storage.GetBuild(m_BuildId); + if (!m_BuildPartName.empty()) + { + m_BuildPartId = Build["parts"sv].AsObjectView()[m_BuildPartName].AsObjectId(); + if (m_BuildPartId == Oid::Zero) + { + throw std::runtime_error(fmt::format("Build {} does not have a part named '{}'", m_BuildId, m_BuildPartName)); + } + } + m_ValidateStats.BuildBlobSize = Build.GetSize(); + uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u; + if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0) + { + PreferredMultipartChunkSize = ChunkSize; + } + + m_LogOutput.SetLogOperationProgress(TaskSteps::FetchBuildPart, TaskSteps::StepCount); + + CbObject BuildPart = m_Storage.GetBuildPart(m_BuildId, m_BuildPartId); + m_ValidateStats.BuildPartSize = BuildPart.GetSize(); + if (!m_Options.IsQuiet) + { + ZEN_CONSOLE("Validating build part {}/{} ({})", m_BuildId, m_BuildPartId, NiceBytes(BuildPart.GetSize())); + } + std::vector<IoHash> ChunkAttachments; + for (CbFieldView LooseFileView : BuildPart["chunkAttachments"sv].AsObjectView()["rawHashes"sv]) + { + ChunkAttachments.push_back(LooseFileView.AsBinaryAttachment()); + } + m_ValidateStats.ChunkAttachmentCount = ChunkAttachments.size(); + std::vector<IoHash> BlockAttachments; + for (CbFieldView BlocksView : BuildPart["blockAttachments"sv].AsObjectView()["rawHashes"sv]) + { + BlockAttachments.push_back(BlocksView.AsBinaryAttachment()); + } + m_ValidateStats.BlockAttachmentCount = BlockAttachments.size(); + + std::vector<ChunkBlockDescription> VerifyBlockDescriptions = + ParseChunkBlockDescriptionList(m_Storage.GetBlockMetadatas(m_BuildId, BlockAttachments)); + if (VerifyBlockDescriptions.size() != BlockAttachments.size()) + { + throw std::runtime_error(fmt::format("Uploaded blocks metadata could not all be found, {} blocks metadata is missing", + BlockAttachments.size() - VerifyBlockDescriptions.size())); + } + + ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + const std::filesystem::path TempFolder = ".zen-tmp"; + + CreateDirectories(TempFolder); + auto __ = MakeGuard([&TempFolder]() { + if (CleanDirectory(TempFolder, {})) + { + std::error_code DummyEc; + RemoveDir(TempFolder, DummyEc); + } + }); + + m_LogOutput.SetLogOperationProgress(TaskSteps::ValidateBlobs, TaskSteps::StepCount); + + std::unique_ptr<BuildOpLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Validate Blobs")); + BuildOpLogOutput::ProgressBar& Progress(*ProgressBarPtr); + + uint64_t AttachmentsToVerifyCount = ChunkAttachments.size() + BlockAttachments.size(); + FilteredRate FilteredDownloadedBytesPerSecond; + FilteredRate FilteredVerifiedBytesPerSecond; + + std::atomic<uint64_t> MultipartAttachmentCount = 0; + + for (const IoHash& ChunkAttachment : ChunkAttachments) + { + Work.ScheduleWork( + m_NetworkPool, + [this, + &Work, + AttachmentsToVerifyCount, + &TempFolder, + PreferredMultipartChunkSize, + &FilteredDownloadedBytesPerSecond, + &FilteredVerifiedBytesPerSecond, + ChunkAttachment](std::atomic<bool>&) { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("ValidateBuildPart_GetChunk"); + + FilteredDownloadedBytesPerSecond.Start(); + DownloadLargeBlob( + m_Storage, + TempFolder, + m_BuildId, + ChunkAttachment, + PreferredMultipartChunkSize, + Work, + m_NetworkPool, + m_DownloadStats, + [this, + &Work, + AttachmentsToVerifyCount, + &FilteredDownloadedBytesPerSecond, + &FilteredVerifiedBytesPerSecond, + ChunkHash = ChunkAttachment](IoBuffer&& Payload) { + Payload.SetContentType(ZenContentType::kCompressedBinary); + if (!m_AbortFlag) + { + Work.ScheduleWork( + m_IOWorkerPool, + [this, + AttachmentsToVerifyCount, + &FilteredDownloadedBytesPerSecond, + &FilteredVerifiedBytesPerSecond, + Payload = std::move(Payload), + ChunkHash](std::atomic<bool>&) mutable { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("ValidateBuildPart_Validate"); + + if (m_DownloadStats.DownloadedChunkCount + m_DownloadStats.DownloadedBlockCount == + AttachmentsToVerifyCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + + FilteredVerifiedBytesPerSecond.Start(); + + uint64_t CompressedSize; + uint64_t DecompressedSize; + ValidateBlob(m_AbortFlag, std::move(Payload), ChunkHash, CompressedSize, DecompressedSize); + m_ValidateStats.VerifiedAttachmentCount++; + m_ValidateStats.VerifiedByteCount += DecompressedSize; + if (m_ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount) + { + FilteredVerifiedBytesPerSecond.Stop(); + } + } + }); + } + }); + } + }); + } + + for (const IoHash& BlockAttachment : BlockAttachments) + { + Work.ScheduleWork( + m_NetworkPool, + [this, &Work, AttachmentsToVerifyCount, &FilteredDownloadedBytesPerSecond, &FilteredVerifiedBytesPerSecond, BlockAttachment]( + std::atomic<bool>&) { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("ValidateBuildPart_GetBlock"); + + FilteredDownloadedBytesPerSecond.Start(); + IoBuffer Payload = m_Storage.GetBuildBlob(m_BuildId, BlockAttachment); + m_DownloadStats.DownloadedBlockCount++; + m_DownloadStats.DownloadedBlockByteCount += Payload.GetSize(); + if (m_DownloadStats.DownloadedChunkCount + m_DownloadStats.DownloadedBlockCount == AttachmentsToVerifyCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + if (!Payload) + { + throw std::runtime_error(fmt::format("Block attachment {} could not be found", BlockAttachment)); + } + if (!m_AbortFlag) + { + Work.ScheduleWork(m_IOWorkerPool, + [this, + &FilteredVerifiedBytesPerSecond, + AttachmentsToVerifyCount, + Payload = std::move(Payload), + BlockAttachment](std::atomic<bool>&) mutable { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("ValidateBuildPart_ValidateBlock"); + + FilteredVerifiedBytesPerSecond.Start(); + + uint64_t CompressedSize; + uint64_t DecompressedSize; + ValidateChunkBlock(std::move(Payload), BlockAttachment, CompressedSize, DecompressedSize); + m_ValidateStats.VerifiedAttachmentCount++; + m_ValidateStats.VerifiedByteCount += DecompressedSize; + if (m_ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount) + { + FilteredVerifiedBytesPerSecond.Stop(); + } + } + }); + } + } + }); + } + + Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); + + const uint64_t DownloadedAttachmentCount = m_DownloadStats.DownloadedChunkCount + m_DownloadStats.DownloadedBlockCount; + const uint64_t DownloadedByteCount = m_DownloadStats.DownloadedChunkByteCount + m_DownloadStats.DownloadedBlockByteCount; + + FilteredDownloadedBytesPerSecond.Update(DownloadedByteCount); + FilteredVerifiedBytesPerSecond.Update(m_ValidateStats.VerifiedByteCount); + + std::string Details = fmt::format("Downloaded {}/{} ({}, {}bits/s). Verified {}/{} ({}, {}B/s)", + DownloadedAttachmentCount, + AttachmentsToVerifyCount, + NiceBytes(DownloadedByteCount), + NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8), + m_ValidateStats.VerifiedAttachmentCount.load(), + AttachmentsToVerifyCount, + NiceBytes(m_ValidateStats.VerifiedByteCount.load()), + NiceNum(FilteredVerifiedBytesPerSecond.GetCurrent())); + + Progress.UpdateState( + {.Task = "Validating blobs ", + .Details = Details, + .TotalCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2), + .RemainingCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2 - + (DownloadedAttachmentCount + m_ValidateStats.VerifiedAttachmentCount.load())), + .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, + false); + }); + + Progress.Finish(); + m_ValidateStats.ElapsedWallTimeUS = Timer.GetElapsedTimeUs(); + + m_LogOutput.SetLogOperationProgress(TaskSteps::Cleanup, TaskSteps::StepCount); +} + +CompositeBuffer +ValidateBlob(std::atomic<bool>& AbortFlag, + BuildStorage& Storage, + const Oid& BuildId, + const IoHash& BlobHash, + uint64_t& OutCompressedSize, + uint64_t& OutDecompressedSize) +{ + ZEN_TRACE_CPU("ValidateBlob"); + IoBuffer Payload = Storage.GetBuildBlob(BuildId, BlobHash); + if (!Payload) + { + throw std::runtime_error(fmt::format("Blob {} could not be found", BlobHash)); + } + return ValidateBlob(AbortFlag, std::move(Payload), BlobHash, OutCompressedSize, OutDecompressedSize); +} + +ChunkBlockDescription +BuildsOperationValidateBuildPart::ValidateChunkBlock(IoBuffer&& Payload, + const IoHash& BlobHash, + uint64_t& OutCompressedSize, + uint64_t& OutDecompressedSize) +{ + CompositeBuffer BlockBuffer = ValidateBlob(m_AbortFlag, std::move(Payload), BlobHash, OutCompressedSize, OutDecompressedSize); + if (!BlockBuffer) + { + throw std::runtime_error(fmt::format("Chunk block blob {} is not compressed using 'None' compression level", BlobHash)); + } + return GetChunkBlockDescription(BlockBuffer.Flatten(), BlobHash); +} + } // namespace zen diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h index 8ba32127a..be6720d5d 100644 --- a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h +++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h @@ -374,6 +374,8 @@ private: const std::vector<IoHash>& m_LooseChunkHashes; const Options m_Options; const std::filesystem::path m_CacheFolderPath; + const std::filesystem::path m_TempDownloadFolderPath; + const std::filesystem::path m_TempBlockFolderPath; }; struct FindBlocksStatistics @@ -513,6 +515,11 @@ public: LooseChunksStatistics m_LooseChunksStats; private: + std::vector<std::filesystem::path> ParseManifest(const std::filesystem::path& Path, const std::filesystem::path& ManifestPath); + + bool IsAcceptedFolder(const std::string_view& RelativePath) const; + bool IsAcceptedFile(const std::string_view& RelativePath) const; + std::vector<size_t> FindReuseBlocks(const std::vector<ChunkBlockDescription>& KnownBlocks, std::span<const IoHash> ChunkHashes, std::span<const uint32_t> ChunkIndexes, @@ -611,14 +618,64 @@ private: const Options m_Options; }; -void DownloadLargeBlob(BuildStorage& Storage, - const std::filesystem::path& DownloadFolder, - const Oid& BuildId, - const IoHash& ChunkHash, - const std::uint64_t PreferredMultipartChunkSize, - ParallelWork& Work, - WorkerThreadPool& NetworkPool, - DownloadStatistics& DownloadStats, - std::function<void(IoBuffer&& Payload)>&& OnDownloadComplete); +struct ValidateStatistics +{ + uint64_t BuildBlobSize = 0; + uint64_t BuildPartSize = 0; + uint64_t ChunkAttachmentCount = 0; + uint64_t BlockAttachmentCount = 0; + std::atomic<uint64_t> VerifiedAttachmentCount = 0; + std::atomic<uint64_t> VerifiedByteCount = 0; + uint64_t ElapsedWallTimeUS = 0; +}; + +class BuildsOperationValidateBuildPart +{ +public: + struct Options + { + bool IsQuiet = false; + bool IsVerbose = false; + }; + BuildsOperationValidateBuildPart(BuildOpLogOutput& LogOutput, + BuildStorage& Storage, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag, + WorkerThreadPool& IOWorkerPool, + WorkerThreadPool& NetworkPool, + const Oid& BuildId, + const Oid& BuildPartId, + const std::string_view BuildPartName, + const Options& Options); + + void Execute(); + + ValidateStatistics m_ValidateStats; + DownloadStatistics m_DownloadStats; + +private: + ChunkBlockDescription ValidateChunkBlock(IoBuffer&& Payload, + const IoHash& BlobHash, + uint64_t& OutCompressedSize, + uint64_t& OutDecompressedSize); + + BuildOpLogOutput& m_LogOutput; + BuildStorage& m_Storage; + std::atomic<bool>& m_AbortFlag; + std::atomic<bool>& m_PauseFlag; + WorkerThreadPool& m_IOWorkerPool; + WorkerThreadPool& m_NetworkPool; + const Oid m_BuildId; + Oid m_BuildPartId; + const std::string m_BuildPartName; + const Options m_Options; +}; + +CompositeBuffer ValidateBlob(std::atomic<bool>& AbortFlag, + BuildStorage& Storage, + const Oid& BuildId, + const IoHash& BlobHash, + uint64_t& OutCompressedSize, + uint64_t& OutDecompressedSize); } // namespace zen |