diff options
| author | Stefan Boberg <[email protected]> | 2026-04-23 18:16:57 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2026-04-23 18:16:57 +0200 |
| commit | 0232b991cd7d8e3a2114ea30e4591dd3e7b65c36 (patch) | |
| tree | 94730e7594fd09ae1fa820391ce311f6daf13905 /src/zenremotestore/builds/buildvalidatebuildpart.cpp | |
| parent | Fix forward declaration order for s_GotSigWinch and SigWinchHandler (diff) | |
| parent | trace: declare Region event name fields as AnsiString (#1012) (diff) | |
| download | archived-zen-sb/zen-help.tar.xz archived-zen-sb/zen-help.zip | |
Merge branch 'main' into sb/zen-helpsb/zen-help
- Combine HelpCommand (this branch) with HistoryCommand (main) in zen CLI dispatcher
- Keep filter-aware TuiPickOne rewrite; adopt main's ASCII arrow glyphs in doc comment
Diffstat (limited to 'src/zenremotestore/builds/buildvalidatebuildpart.cpp')
| -rw-r--r-- | src/zenremotestore/builds/buildvalidatebuildpart.cpp | 374 |
1 files changed, 374 insertions, 0 deletions
diff --git a/src/zenremotestore/builds/buildvalidatebuildpart.cpp b/src/zenremotestore/builds/buildvalidatebuildpart.cpp new file mode 100644 index 000000000..d06502683 --- /dev/null +++ b/src/zenremotestore/builds/buildvalidatebuildpart.cpp @@ -0,0 +1,374 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenremotestore/builds/buildvalidatebuildpart.h> + +#include <zencore/fmtutils.h> +#include <zencore/parallelwork.h> +#include <zencore/scopeguard.h> +#include <zencore/trace.h> +#include <zenremotestore/builds/builduploadfolder.h> +#include <zenremotestore/transferthreadworkers.h> +#include <zenutil/filesystemutils.h> +#include <zenutil/filteredrate.h> +#include <zenutil/progress.h> + +namespace zen { + +using namespace std::literals; + +BuildsOperationValidateBuildPart::BuildsOperationValidateBuildPart(LoggerRef Log, + ProgressBase& Progress, + BuildStorageBase& Storage, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag, + WorkerThreadPool& IOWorkerPool, + WorkerThreadPool& NetworkPool, + const Oid& BuildId, + const Oid& BuildPartId, + const std::string_view BuildPartName, + const Options& Options) + +: m_Log(Log) +, m_Progress(Progress) +, m_Storage(Storage) +, m_AbortFlag(AbortFlag) +, m_PauseFlag(PauseFlag) +, m_IOWorkerPool(IOWorkerPool) +, m_NetworkPool(NetworkPool) +, m_BuildId(BuildId) +, m_BuildPartId(BuildPartId) +, m_BuildPartName(BuildPartName) +, m_Options(Options) +{ +} + +void +BuildsOperationValidateBuildPart::Execute() +{ + ZEN_TRACE_CPU("ValidateBuildPart"); + try + { + auto EndProgress = + MakeGuard([&]() { m_Progress.SetLogOperationProgress((uint32_t)TaskSteps::StepCount, (uint32_t)TaskSteps::StepCount); }); + + Stopwatch Timer; + auto _ = MakeGuard([&]() { + if (!m_Options.IsQuiet) + { + ZEN_INFO("Validated build part {}/{} ('{}') in {}", + m_BuildId, + m_BuildPartId, + m_BuildPartName, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + } + }); + + m_Progress.SetLogOperationProgress((uint32_t)TaskSteps::FetchBuild, (uint32_t)TaskSteps::StepCount); + + ResolvedBuildPart Resolved = ResolveBuildPart(); + + ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + const std::filesystem::path& TempFolder = m_Options.TempFolder; + ZEN_ASSERT(!TempFolder.empty()); + + CleanAndRemoveDirectory(m_IOWorkerPool, m_AbortFlag, m_PauseFlag, TempFolder); + CreateDirectories(TempFolder); + auto __ = MakeGuard([this, TempFolder]() { CleanAndRemoveDirectory(m_IOWorkerPool, m_AbortFlag, m_PauseFlag, TempFolder); }); + + m_Progress.SetLogOperationProgress((uint32_t)TaskSteps::ValidateBlobs, (uint32_t)TaskSteps::StepCount); + + std::unique_ptr<ProgressBase::ProgressBar> ProgressBar = m_Progress.CreateProgressBar("Validate Blobs"); + + const uint64_t AttachmentsToVerifyCount = Resolved.ChunkAttachments.size() + Resolved.BlockAttachments.size(); + FilteredRate FilteredDownloadedBytesPerSecond; + FilteredRate FilteredVerifiedBytesPerSecond; + + ValidateBlobsContext Context{.Work = Work, + .AttachmentsToVerifyCount = AttachmentsToVerifyCount, + .FilteredDownloadedBytesPerSecond = FilteredDownloadedBytesPerSecond, + .FilteredVerifiedBytesPerSecond = FilteredVerifiedBytesPerSecond}; + + ScheduleChunkAttachmentValidation(Context, Resolved.ChunkAttachments, TempFolder, Resolved.PreferredMultipartChunkSize); + ScheduleBlockAttachmentValidation(Context, Resolved.BlockAttachments); + + Work.Wait(m_Progress.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); + + const uint64_t DownloadedAttachmentCount = m_DownloadStats.DownloadedChunkCount + m_DownloadStats.DownloadedBlockCount; + const uint64_t DownloadedByteCount = m_DownloadStats.DownloadedChunkByteCount + m_DownloadStats.DownloadedBlockByteCount; + + FilteredDownloadedBytesPerSecond.Update(DownloadedByteCount); + FilteredVerifiedBytesPerSecond.Update(m_ValidateStats.VerifiedByteCount); + + std::string Details = fmt::format("Downloaded {}/{} ({}, {}bits/s). Verified {}/{} ({}, {}B/s)", + DownloadedAttachmentCount, + AttachmentsToVerifyCount, + NiceBytes(DownloadedByteCount), + NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8), + m_ValidateStats.VerifiedAttachmentCount.load(), + AttachmentsToVerifyCount, + NiceBytes(m_ValidateStats.VerifiedByteCount.load()), + NiceNum(FilteredVerifiedBytesPerSecond.GetCurrent())); + + ProgressBar->UpdateState( + {.Task = "Validating blobs ", + .Details = Details, + .TotalCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2), + .RemainingCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2 - + (DownloadedAttachmentCount + m_ValidateStats.VerifiedAttachmentCount.load())), + .Status = ProgressBase::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, + false); + }); + + ProgressBar->Finish(); + m_ValidateStats.ElapsedWallTimeUS = Timer.GetElapsedTimeUs(); + + m_Progress.SetLogOperationProgress((uint32_t)TaskSteps::Cleanup, (uint32_t)TaskSteps::StepCount); + } + catch (const std::exception&) + { + m_AbortFlag = true; + throw; + } +} + +BuildsOperationValidateBuildPart::ResolvedBuildPart +BuildsOperationValidateBuildPart::ResolveBuildPart() +{ + ResolvedBuildPart Result; + Result.PreferredMultipartChunkSize = 32u * 1024u * 1024u; + + CbObject Build = m_Storage.GetBuild(m_BuildId); + if (!m_BuildPartName.empty()) + { + m_BuildPartId = Build["parts"sv].AsObjectView()[m_BuildPartName].AsObjectId(); + if (m_BuildPartId == Oid::Zero) + { + throw std::runtime_error(fmt::format("Build {} does not have a part named '{}'", m_BuildId, m_BuildPartName)); + } + } + m_ValidateStats.BuildBlobSize = Build.GetSize(); + if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0) + { + Result.PreferredMultipartChunkSize = ChunkSize; + } + + m_Progress.SetLogOperationProgress((uint32_t)TaskSteps::FetchBuildPart, (uint32_t)TaskSteps::StepCount); + + CbObject BuildPart = m_Storage.GetBuildPart(m_BuildId, m_BuildPartId); + m_ValidateStats.BuildPartSize = BuildPart.GetSize(); + if (!m_Options.IsQuiet) + { + ZEN_INFO("Validating build part {}/{} ({})", m_BuildId, m_BuildPartId, NiceBytes(BuildPart.GetSize())); + } + if (const CbObjectView ChunkAttachmentsView = BuildPart["chunkAttachments"sv].AsObjectView()) + { + for (CbFieldView LooseFileView : ChunkAttachmentsView["rawHashes"sv]) + { + Result.ChunkAttachments.push_back(LooseFileView.AsBinaryAttachment()); + } + } + m_ValidateStats.ChunkAttachmentCount = Result.ChunkAttachments.size(); + if (const CbObjectView BlockAttachmentsView = BuildPart["blockAttachments"sv].AsObjectView()) + { + for (CbFieldView BlocksView : BlockAttachmentsView["rawHashes"sv]) + { + Result.BlockAttachments.push_back(BlocksView.AsBinaryAttachment()); + } + } + m_ValidateStats.BlockAttachmentCount = Result.BlockAttachments.size(); + + std::vector<ChunkBlockDescription> VerifyBlockDescriptions = + ParseChunkBlockDescriptionList(m_Storage.GetBlockMetadatas(m_BuildId, Result.BlockAttachments)); + if (VerifyBlockDescriptions.size() != Result.BlockAttachments.size()) + { + throw std::runtime_error(fmt::format("Uploaded blocks metadata could not all be found, {} blocks metadata is missing", + Result.BlockAttachments.size() - VerifyBlockDescriptions.size())); + } + + return Result; +} + +void +BuildsOperationValidateBuildPart::ScheduleChunkAttachmentValidation(ValidateBlobsContext& Context, + std::span<const IoHash> ChunkAttachments, + const std::filesystem::path& TempFolder, + uint64_t PreferredMultipartChunkSize) +{ + for (const IoHash& ChunkAttachment : ChunkAttachments) + { + Context.Work.ScheduleWork( + m_NetworkPool, + [this, &Context, &TempFolder, PreferredMultipartChunkSize, ChunkAttachment = IoHash(ChunkAttachment)](std::atomic<bool>&) { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("ValidateBuildPart_GetChunk"); + + Context.FilteredDownloadedBytesPerSecond.Start(); + DownloadLargeBlob( + m_Storage, + TempFolder, + m_BuildId, + ChunkAttachment, + PreferredMultipartChunkSize, + Context.Work, + m_NetworkPool, + m_DownloadStats.DownloadedChunkByteCount, + m_DownloadStats.MultipartAttachmentCount, + [this, &Context, ChunkHash = IoHash(ChunkAttachment)](IoBuffer&& Payload) { + m_DownloadStats.DownloadedChunkCount++; + Payload.SetContentType(ZenContentType::kCompressedBinary); + if (!m_AbortFlag) + { + Context.Work.ScheduleWork( + m_IOWorkerPool, + [this, &Context, Payload = IoBuffer(std::move(Payload)), ChunkHash](std::atomic<bool>&) mutable { + if (!m_AbortFlag) + { + ValidateDownloadedChunk(Context, ChunkHash, std::move(Payload)); + } + }); + } + }); + } + }); + } +} + +void +BuildsOperationValidateBuildPart::ScheduleBlockAttachmentValidation(ValidateBlobsContext& Context, std::span<const IoHash> BlockAttachments) +{ + for (const IoHash& BlockAttachment : BlockAttachments) + { + Context.Work.ScheduleWork(m_NetworkPool, [this, &Context, BlockAttachment = IoHash(BlockAttachment)](std::atomic<bool>&) { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("ValidateBuildPart_GetBlock"); + + Context.FilteredDownloadedBytesPerSecond.Start(); + IoBuffer Payload = m_Storage.GetBuildBlob(m_BuildId, BlockAttachment); + m_DownloadStats.DownloadedBlockCount++; + m_DownloadStats.DownloadedBlockByteCount += Payload.GetSize(); + if (m_DownloadStats.DownloadedChunkCount + m_DownloadStats.DownloadedBlockCount == Context.AttachmentsToVerifyCount) + { + Context.FilteredDownloadedBytesPerSecond.Stop(); + } + if (!Payload) + { + throw std::runtime_error(fmt::format("Block attachment {} could not be found", BlockAttachment)); + } + if (!m_AbortFlag) + { + Context.Work.ScheduleWork(m_IOWorkerPool, + [this, &Context, Payload = std::move(Payload), BlockAttachment](std::atomic<bool>&) mutable { + if (!m_AbortFlag) + { + ValidateDownloadedBlock(Context, BlockAttachment, std::move(Payload)); + } + }); + } + } + }); + } +} + +void +BuildsOperationValidateBuildPart::ValidateDownloadedChunk(ValidateBlobsContext& Context, const IoHash& ChunkHash, IoBuffer Payload) +{ + ZEN_TRACE_CPU("ValidateBuildPart_Validate"); + + if (m_DownloadStats.DownloadedChunkCount + m_DownloadStats.DownloadedBlockCount == Context.AttachmentsToVerifyCount) + { + Context.FilteredDownloadedBytesPerSecond.Stop(); + } + + Context.FilteredVerifiedBytesPerSecond.Start(); + + uint64_t CompressedSize; + uint64_t DecompressedSize; + ValidateBlob(m_AbortFlag, std::move(Payload), ChunkHash, CompressedSize, DecompressedSize); + m_ValidateStats.VerifiedAttachmentCount++; + m_ValidateStats.VerifiedByteCount += DecompressedSize; + if (m_ValidateStats.VerifiedAttachmentCount.load() == Context.AttachmentsToVerifyCount) + { + Context.FilteredVerifiedBytesPerSecond.Stop(); + } +} + +void +BuildsOperationValidateBuildPart::ValidateDownloadedBlock(ValidateBlobsContext& Context, const IoHash& BlockAttachment, IoBuffer Payload) +{ + ZEN_TRACE_CPU("ValidateBuildPart_ValidateBlock"); + + Context.FilteredVerifiedBytesPerSecond.Start(); + + uint64_t CompressedSize; + uint64_t DecompressedSize; + ValidateChunkBlock(std::move(Payload), BlockAttachment, CompressedSize, DecompressedSize); + m_ValidateStats.VerifiedAttachmentCount++; + m_ValidateStats.VerifiedByteCount += DecompressedSize; + if (m_ValidateStats.VerifiedAttachmentCount.load() == Context.AttachmentsToVerifyCount) + { + Context.FilteredVerifiedBytesPerSecond.Stop(); + } +} + +ChunkBlockDescription +BuildsOperationValidateBuildPart::ValidateChunkBlock(IoBuffer&& Payload, + const IoHash& BlobHash, + uint64_t& OutCompressedSize, + uint64_t& OutDecompressedSize) +{ + CompositeBuffer BlockBuffer = ValidateBlob(m_AbortFlag, std::move(Payload), BlobHash, OutCompressedSize, OutDecompressedSize); + if (!BlockBuffer) + { + throw std::runtime_error(fmt::format("Chunk block blob {} is not compressed using 'None' compression level", BlobHash)); + } + return GetChunkBlockDescription(BlockBuffer.Flatten(), BlobHash); +} + +void +ValidateBuildPart(LoggerRef Log, + ProgressBase& Progress, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag, + bool IsQuiet, + bool IsVerbose, + TransferThreadWorkers& Workers, + BuildStorageBase& Storage, + const std::filesystem::path& TempFolder, + const Oid& BuildId, + const Oid& BuildPartId, + std::string_view BuildPartName) +{ + ZEN_TRACE_CPU("ValidateBuildPart"); + + Progress.SetLogOperationName("Validate Part"); + + BuildsOperationValidateBuildPart ValidateOp( + Log, + Progress, + Storage, + AbortFlag, + PauseFlag, + Workers.GetIOWorkerPool(), + Workers.GetNetworkPool(), + BuildId, + BuildPartId, + BuildPartName, + BuildsOperationValidateBuildPart::Options{.TempFolder = TempFolder, .IsQuiet = IsQuiet, .IsVerbose = IsVerbose}); + + ValidateOp.Execute(); + + const uint64_t DownloadedCount = ValidateOp.m_DownloadStats.DownloadedChunkCount + ValidateOp.m_DownloadStats.DownloadedBlockCount; + const uint64_t DownloadedByteCount = + ValidateOp.m_DownloadStats.DownloadedChunkByteCount + ValidateOp.m_DownloadStats.DownloadedBlockByteCount; + ZEN_CONSOLE("Verified: {:>8} ({}), {}B/sec, {}", + DownloadedCount, + NiceBytes(DownloadedByteCount), + NiceNum(GetBytesPerSecond(ValidateOp.m_ValidateStats.ElapsedWallTimeUS, DownloadedByteCount)), + NiceTimeSpanMs(ValidateOp.m_ValidateStats.ElapsedWallTimeUS / 1000)); +} + +} // namespace zen |