// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include #include #include #include namespace zen { using namespace std::literals; BuildsOperationValidateBuildPart::BuildsOperationValidateBuildPart(LoggerRef Log, ProgressBase& Progress, BuildStorageBase& Storage, std::atomic& AbortFlag, std::atomic& PauseFlag, WorkerThreadPool& IOWorkerPool, WorkerThreadPool& NetworkPool, const Oid& BuildId, const Oid& BuildPartId, const std::string_view BuildPartName, const Options& Options) : m_Log(Log) , m_Progress(Progress) , m_Storage(Storage) , m_AbortFlag(AbortFlag) , m_PauseFlag(PauseFlag) , m_IOWorkerPool(IOWorkerPool) , m_NetworkPool(NetworkPool) , m_BuildId(BuildId) , m_BuildPartId(BuildPartId) , m_BuildPartName(BuildPartName) , m_Options(Options) { } void BuildsOperationValidateBuildPart::Execute() { ZEN_TRACE_CPU("ValidateBuildPart"); try { auto EndProgress = MakeGuard([&]() { m_Progress.SetLogOperationProgress((uint32_t)TaskSteps::StepCount, (uint32_t)TaskSteps::StepCount); }); Stopwatch Timer; auto _ = MakeGuard([&]() { if (!m_Options.IsQuiet) { ZEN_INFO("Validated build part {}/{} ('{}') in {}", m_BuildId, m_BuildPartId, m_BuildPartName, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } }); m_Progress.SetLogOperationProgress((uint32_t)TaskSteps::FetchBuild, (uint32_t)TaskSteps::StepCount); ResolvedBuildPart Resolved = ResolveBuildPart(); ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); const std::filesystem::path& TempFolder = m_Options.TempFolder; ZEN_ASSERT(!TempFolder.empty()); CleanAndRemoveDirectory(m_IOWorkerPool, m_AbortFlag, m_PauseFlag, TempFolder); CreateDirectories(TempFolder); auto __ = MakeGuard([this, TempFolder]() { CleanAndRemoveDirectory(m_IOWorkerPool, m_AbortFlag, m_PauseFlag, TempFolder); }); m_Progress.SetLogOperationProgress((uint32_t)TaskSteps::ValidateBlobs, (uint32_t)TaskSteps::StepCount); std::unique_ptr ProgressBar = m_Progress.CreateProgressBar("Validate Blobs"); const uint64_t AttachmentsToVerifyCount = Resolved.ChunkAttachments.size() + Resolved.BlockAttachments.size(); FilteredRate FilteredDownloadedBytesPerSecond; FilteredRate FilteredVerifiedBytesPerSecond; ValidateBlobsContext Context{.Work = Work, .AttachmentsToVerifyCount = AttachmentsToVerifyCount, .FilteredDownloadedBytesPerSecond = FilteredDownloadedBytesPerSecond, .FilteredVerifiedBytesPerSecond = FilteredVerifiedBytesPerSecond}; ScheduleChunkAttachmentValidation(Context, Resolved.ChunkAttachments, TempFolder, Resolved.PreferredMultipartChunkSize); ScheduleBlockAttachmentValidation(Context, Resolved.BlockAttachments); Work.Wait(m_Progress.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { ZEN_UNUSED(PendingWork); const uint64_t DownloadedAttachmentCount = m_DownloadStats.DownloadedChunkCount + m_DownloadStats.DownloadedBlockCount; const uint64_t DownloadedByteCount = m_DownloadStats.DownloadedChunkByteCount + m_DownloadStats.DownloadedBlockByteCount; FilteredDownloadedBytesPerSecond.Update(DownloadedByteCount); FilteredVerifiedBytesPerSecond.Update(m_ValidateStats.VerifiedByteCount); std::string Details = fmt::format("Downloaded {}/{} ({}, {}bits/s). Verified {}/{} ({}, {}B/s)", DownloadedAttachmentCount, AttachmentsToVerifyCount, NiceBytes(DownloadedByteCount), NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8), m_ValidateStats.VerifiedAttachmentCount.load(), AttachmentsToVerifyCount, NiceBytes(m_ValidateStats.VerifiedByteCount.load()), NiceNum(FilteredVerifiedBytesPerSecond.GetCurrent())); ProgressBar->UpdateState( {.Task = "Validating blobs ", .Details = Details, .TotalCount = gsl::narrow(AttachmentsToVerifyCount * 2), .RemainingCount = gsl::narrow(AttachmentsToVerifyCount * 2 - (DownloadedAttachmentCount + m_ValidateStats.VerifiedAttachmentCount.load())), .Status = ProgressBase::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); ProgressBar->Finish(); m_ValidateStats.ElapsedWallTimeUS = Timer.GetElapsedTimeUs(); m_Progress.SetLogOperationProgress((uint32_t)TaskSteps::Cleanup, (uint32_t)TaskSteps::StepCount); } catch (const std::exception&) { m_AbortFlag = true; throw; } } BuildsOperationValidateBuildPart::ResolvedBuildPart BuildsOperationValidateBuildPart::ResolveBuildPart() { ResolvedBuildPart Result; Result.PreferredMultipartChunkSize = 32u * 1024u * 1024u; CbObject Build = m_Storage.GetBuild(m_BuildId); if (!m_BuildPartName.empty()) { m_BuildPartId = Build["parts"sv].AsObjectView()[m_BuildPartName].AsObjectId(); if (m_BuildPartId == Oid::Zero) { throw std::runtime_error(fmt::format("Build {} does not have a part named '{}'", m_BuildId, m_BuildPartName)); } } m_ValidateStats.BuildBlobSize = Build.GetSize(); if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0) { Result.PreferredMultipartChunkSize = ChunkSize; } m_Progress.SetLogOperationProgress((uint32_t)TaskSteps::FetchBuildPart, (uint32_t)TaskSteps::StepCount); CbObject BuildPart = m_Storage.GetBuildPart(m_BuildId, m_BuildPartId); m_ValidateStats.BuildPartSize = BuildPart.GetSize(); if (!m_Options.IsQuiet) { ZEN_INFO("Validating build part {}/{} ({})", m_BuildId, m_BuildPartId, NiceBytes(BuildPart.GetSize())); } if (const CbObjectView ChunkAttachmentsView = BuildPart["chunkAttachments"sv].AsObjectView()) { for (CbFieldView LooseFileView : ChunkAttachmentsView["rawHashes"sv]) { Result.ChunkAttachments.push_back(LooseFileView.AsBinaryAttachment()); } } m_ValidateStats.ChunkAttachmentCount = Result.ChunkAttachments.size(); if (const CbObjectView BlockAttachmentsView = BuildPart["blockAttachments"sv].AsObjectView()) { for (CbFieldView BlocksView : BlockAttachmentsView["rawHashes"sv]) { Result.BlockAttachments.push_back(BlocksView.AsBinaryAttachment()); } } m_ValidateStats.BlockAttachmentCount = Result.BlockAttachments.size(); std::vector VerifyBlockDescriptions = ParseChunkBlockDescriptionList(m_Storage.GetBlockMetadatas(m_BuildId, Result.BlockAttachments)); if (VerifyBlockDescriptions.size() != Result.BlockAttachments.size()) { throw std::runtime_error(fmt::format("Uploaded blocks metadata could not all be found, {} blocks metadata is missing", Result.BlockAttachments.size() - VerifyBlockDescriptions.size())); } return Result; } void BuildsOperationValidateBuildPart::ScheduleChunkAttachmentValidation(ValidateBlobsContext& Context, std::span ChunkAttachments, const std::filesystem::path& TempFolder, uint64_t PreferredMultipartChunkSize) { for (const IoHash& ChunkAttachment : ChunkAttachments) { Context.Work.ScheduleWork( m_NetworkPool, [this, &Context, &TempFolder, PreferredMultipartChunkSize, ChunkAttachment = IoHash(ChunkAttachment)](std::atomic&) { if (!m_AbortFlag) { ZEN_TRACE_CPU("ValidateBuildPart_GetChunk"); Context.FilteredDownloadedBytesPerSecond.Start(); DownloadLargeBlob( m_Storage, TempFolder, m_BuildId, ChunkAttachment, PreferredMultipartChunkSize, Context.Work, m_NetworkPool, m_DownloadStats.DownloadedChunkByteCount, m_DownloadStats.MultipartAttachmentCount, [this, &Context, ChunkHash = IoHash(ChunkAttachment)](IoBuffer&& Payload) { m_DownloadStats.DownloadedChunkCount++; Payload.SetContentType(ZenContentType::kCompressedBinary); if (!m_AbortFlag) { Context.Work.ScheduleWork( m_IOWorkerPool, [this, &Context, Payload = IoBuffer(std::move(Payload)), ChunkHash](std::atomic&) mutable { if (!m_AbortFlag) { ValidateDownloadedChunk(Context, ChunkHash, std::move(Payload)); } }); } }); } }); } } void BuildsOperationValidateBuildPart::ScheduleBlockAttachmentValidation(ValidateBlobsContext& Context, std::span BlockAttachments) { for (const IoHash& BlockAttachment : BlockAttachments) { Context.Work.ScheduleWork(m_NetworkPool, [this, &Context, BlockAttachment = IoHash(BlockAttachment)](std::atomic&) { if (!m_AbortFlag) { ZEN_TRACE_CPU("ValidateBuildPart_GetBlock"); Context.FilteredDownloadedBytesPerSecond.Start(); IoBuffer Payload = m_Storage.GetBuildBlob(m_BuildId, BlockAttachment); m_DownloadStats.DownloadedBlockCount++; m_DownloadStats.DownloadedBlockByteCount += Payload.GetSize(); if (m_DownloadStats.DownloadedChunkCount + m_DownloadStats.DownloadedBlockCount == Context.AttachmentsToVerifyCount) { Context.FilteredDownloadedBytesPerSecond.Stop(); } if (!Payload) { throw std::runtime_error(fmt::format("Block attachment {} could not be found", BlockAttachment)); } if (!m_AbortFlag) { Context.Work.ScheduleWork(m_IOWorkerPool, [this, &Context, Payload = std::move(Payload), BlockAttachment](std::atomic&) mutable { if (!m_AbortFlag) { ValidateDownloadedBlock(Context, BlockAttachment, std::move(Payload)); } }); } } }); } } void BuildsOperationValidateBuildPart::ValidateDownloadedChunk(ValidateBlobsContext& Context, const IoHash& ChunkHash, IoBuffer Payload) { ZEN_TRACE_CPU("ValidateBuildPart_Validate"); if (m_DownloadStats.DownloadedChunkCount + m_DownloadStats.DownloadedBlockCount == Context.AttachmentsToVerifyCount) { Context.FilteredDownloadedBytesPerSecond.Stop(); } Context.FilteredVerifiedBytesPerSecond.Start(); uint64_t CompressedSize; uint64_t DecompressedSize; ValidateBlob(m_AbortFlag, std::move(Payload), ChunkHash, CompressedSize, DecompressedSize); m_ValidateStats.VerifiedAttachmentCount++; m_ValidateStats.VerifiedByteCount += DecompressedSize; if (m_ValidateStats.VerifiedAttachmentCount.load() == Context.AttachmentsToVerifyCount) { Context.FilteredVerifiedBytesPerSecond.Stop(); } } void BuildsOperationValidateBuildPart::ValidateDownloadedBlock(ValidateBlobsContext& Context, const IoHash& BlockAttachment, IoBuffer Payload) { ZEN_TRACE_CPU("ValidateBuildPart_ValidateBlock"); Context.FilteredVerifiedBytesPerSecond.Start(); uint64_t CompressedSize; uint64_t DecompressedSize; ValidateChunkBlock(std::move(Payload), BlockAttachment, CompressedSize, DecompressedSize); m_ValidateStats.VerifiedAttachmentCount++; m_ValidateStats.VerifiedByteCount += DecompressedSize; if (m_ValidateStats.VerifiedAttachmentCount.load() == Context.AttachmentsToVerifyCount) { Context.FilteredVerifiedBytesPerSecond.Stop(); } } ChunkBlockDescription BuildsOperationValidateBuildPart::ValidateChunkBlock(IoBuffer&& Payload, const IoHash& BlobHash, uint64_t& OutCompressedSize, uint64_t& OutDecompressedSize) { CompositeBuffer BlockBuffer = ValidateBlob(m_AbortFlag, std::move(Payload), BlobHash, OutCompressedSize, OutDecompressedSize); if (!BlockBuffer) { throw std::runtime_error(fmt::format("Chunk block blob {} is not compressed using 'None' compression level", BlobHash)); } return GetChunkBlockDescription(BlockBuffer.Flatten(), BlobHash); } void ValidateBuildPart(LoggerRef Log, ProgressBase& Progress, std::atomic& AbortFlag, std::atomic& PauseFlag, bool IsQuiet, bool IsVerbose, TransferThreadWorkers& Workers, BuildStorageBase& Storage, const std::filesystem::path& TempFolder, const Oid& BuildId, const Oid& BuildPartId, std::string_view BuildPartName) { ZEN_TRACE_CPU("ValidateBuildPart"); Progress.SetLogOperationName("Validate Part"); BuildsOperationValidateBuildPart ValidateOp( Log, Progress, Storage, AbortFlag, PauseFlag, Workers.GetIOWorkerPool(), Workers.GetNetworkPool(), BuildId, BuildPartId, BuildPartName, BuildsOperationValidateBuildPart::Options{.TempFolder = TempFolder, .IsQuiet = IsQuiet, .IsVerbose = IsVerbose}); ValidateOp.Execute(); const uint64_t DownloadedCount = ValidateOp.m_DownloadStats.DownloadedChunkCount + ValidateOp.m_DownloadStats.DownloadedBlockCount; const uint64_t DownloadedByteCount = ValidateOp.m_DownloadStats.DownloadedChunkByteCount + ValidateOp.m_DownloadStats.DownloadedBlockByteCount; ZEN_CONSOLE("Verified: {:>8} ({}), {}B/sec, {}", DownloadedCount, NiceBytes(DownloadedByteCount), NiceNum(GetBytesPerSecond(ValidateOp.m_ValidateStats.ElapsedWallTimeUS, DownloadedByteCount)), NiceTimeSpanMs(ValidateOp.m_ValidateStats.ElapsedWallTimeUS / 1000)); } } // namespace zen