diff options
| author | Dan Engelbrecht <[email protected]> | 2025-10-14 18:24:17 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-14 18:24:17 +0200 |
| commit | 74c90983853f5ef0966be35544173d6efd85db17 (patch) | |
| tree | e4fe6faa53697f2b1503e3919527d5ad18260890 /src | |
| parent | refactor builds cmd part2 (#572) (diff) | |
| download | zen-74c90983853f5ef0966be35544173d6efd85db17.tar.xz zen-74c90983853f5ef0966be35544173d6efd85db17.zip | |
refactor builds cmd part3 (#573)
* move lambdas to member functions
* add BuildsOperationValidateBuildPart
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 511 | ||||
| -rw-r--r-- | src/zenremotestore/builds/buildstorageoperations.cpp | 724 | ||||
| -rw-r--r-- | src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h | 75 |
3 files changed, 649 insertions, 661 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index b60844ea1..6a0a1a0da 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -66,46 +66,6 @@ namespace zen { using namespace std::literals; namespace { - namespace zenutil { - -#if ZEN_PLATFORM_WINDOWS - class SecurityAttributes - { - public: - inline SECURITY_ATTRIBUTES* Attributes() { return &m_Attributes; } - - protected: - SECURITY_ATTRIBUTES m_Attributes{}; - SECURITY_DESCRIPTOR m_Sd{}; - }; - - // Security attributes which allows any user access - - class AnyUserSecurityAttributes : public SecurityAttributes - { - public: - AnyUserSecurityAttributes() - { - m_Attributes.nLength = sizeof m_Attributes; - m_Attributes.bInheritHandle = false; // Disable inheritance - - const BOOL Success = InitializeSecurityDescriptor(&m_Sd, SECURITY_DESCRIPTOR_REVISION); - - if (Success) - { - if (!SetSecurityDescriptorDacl(&m_Sd, TRUE, (PACL)NULL, FALSE)) - { - ThrowLastError("SetSecurityDescriptorDacl failed"); - } - - m_Attributes.lpSecurityDescriptor = &m_Sd; - } - } - }; -#endif // ZEN_PLATFORM_WINDOWS - - } // namespace zenutil - static std::atomic<bool> AbortFlag = false; static std::atomic<bool> PauseFlag = false; @@ -283,14 +243,6 @@ namespace { // std::filesystem::path ZenStateFileJsonPath(const std::filesystem::path& ZenFolderPath) { return ZenFolderPath / "current_state.json"; // } - std::filesystem::path ZenTempCacheFolderPath(const std::filesystem::path& ZenFolderPath) - { - return ZenTempFolderPath(ZenFolderPath) / "cache"; // Decompressed and verified data - chunks & sequences - } - std::filesystem::path ZenTempBlockFolderPath(const std::filesystem::path& ZenFolderPath) - { - return ZenTempFolderPath(ZenFolderPath) / "blocks"; // Temp storage for whole and partial blocks - } std::filesystem::path UploadTempDirectory(const std::filesystem::path& Path) { const std::u8string LocalPathString = Path.generic_u8string(); @@ -298,11 +250,6 @@ namespace { return std::filesystem::temp_directory_path() / fmt::format("zen_{}", PathHash); } - std::filesystem::path ZenTempDownloadFolderPath(const std::filesystem::path& ZenFolderPath) - { - return ZenTempFolderPath(ZenFolderPath) / "download"; // Temp storage for decompressed and validated chunks - } - const std::string ZenExcludeManifestName = ".zen_exclude_manifest.txt"; const std::string UnsyncFolderName = ".unsync"; @@ -1159,406 +1106,35 @@ namespace { TemporaryFile::SafeWriteFile(WritePath, JsonPayload); } - CompositeBuffer ValidateBlob(IoBuffer&& Payload, const IoHash& BlobHash, uint64_t& OutCompressedSize, uint64_t& OutDecompressedSize) - { - ZEN_TRACE_CPU("ValidateBlob"); - - if (Payload.GetContentType() != ZenContentType::kCompressedBinary) - { - throw std::runtime_error(fmt::format("Blob {} ({} bytes) has unexpected content type '{}'", - BlobHash, - Payload.GetSize(), - ToString(Payload.GetContentType()))); - } - IoHash RawHash; - uint64_t RawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, RawSize); - if (!Compressed) - { - throw std::runtime_error(fmt::format("Blob {} ({} bytes) compressed header is invalid", BlobHash, Payload.GetSize())); - } - if (RawHash != BlobHash) - { - throw std::runtime_error( - fmt::format("Blob {} ({} bytes) compressed header has a mismatching raw hash {}", BlobHash, Payload.GetSize(), RawHash)); - } - - IoHashStream Hash; - bool CouldDecompress = Compressed.DecompressToStream( - 0, - RawSize, - [&Hash](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { - ZEN_UNUSED(SourceOffset, SourceSize, Offset); - if (!AbortFlag) - { - for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) - { - Hash.Append(Segment.GetView()); - } - return true; - } - return false; - }); - - if (AbortFlag) - { - return CompositeBuffer{}; - } - - if (!CouldDecompress) - { - throw std::runtime_error( - fmt::format("Blob {} ({} bytes) failed to decompress - header information mismatch", BlobHash, Payload.GetSize())); - } - IoHash ValidateRawHash = Hash.GetHash(); - if (ValidateRawHash != BlobHash) - { - throw std::runtime_error(fmt::format("Blob {} ({} bytes) decompressed hash {} does not match header information", - BlobHash, - Payload.GetSize(), - ValidateRawHash)); - } - OodleCompressor Compressor; - OodleCompressionLevel CompressionLevel; - uint64_t BlockSize; - if (!Compressed.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) - { - throw std::runtime_error(fmt::format("Blob {} ({} bytes) failed to get compression details", BlobHash, Payload.GetSize())); - } - OutCompressedSize = Payload.GetSize(); - OutDecompressedSize = RawSize; - if (CompressionLevel == OodleCompressionLevel::None) - { - // Only decompress to composite if we need it for block verification - CompositeBuffer DecompressedComposite = Compressed.DecompressToComposite(); - if (!DecompressedComposite) - { - throw std::runtime_error(fmt::format("Blob {} ({} bytes) failed to decompress to composite", BlobHash, Payload.GetSize())); - } - return DecompressedComposite; - } - return CompositeBuffer{}; - } - - CompositeBuffer ValidateBlob(BuildStorage& Storage, - const Oid& BuildId, - const IoHash& BlobHash, - uint64_t& OutCompressedSize, - uint64_t& OutDecompressedSize) - { - ZEN_TRACE_CPU("ValidateBlob"); - IoBuffer Payload = Storage.GetBuildBlob(BuildId, BlobHash); - if (!Payload) - { - throw std::runtime_error(fmt::format("Blob {} could not be found", BlobHash)); - } - return ValidateBlob(std::move(Payload), BlobHash, OutCompressedSize, OutDecompressedSize); - } - - ChunkBlockDescription ValidateChunkBlock(IoBuffer&& Payload, - const IoHash& BlobHash, - uint64_t& OutCompressedSize, - uint64_t& OutDecompressedSize) - { - CompositeBuffer BlockBuffer = ValidateBlob(std::move(Payload), BlobHash, OutCompressedSize, OutDecompressedSize); - if (!BlockBuffer) - { - throw std::runtime_error(fmt::format("Chunk block blob {} is not compressed using 'None' compression level", BlobHash)); - } - return GetChunkBlockDescription(BlockBuffer.Flatten(), BlobHash); - } - - struct ValidateStatistics - { - uint64_t BuildBlobSize = 0; - uint64_t BuildPartSize = 0; - uint64_t ChunkAttachmentCount = 0; - uint64_t BlockAttachmentCount = 0; - std::atomic<uint64_t> VerifiedAttachmentCount = 0; - std::atomic<uint64_t> VerifiedByteCount = 0; - uint64_t ElapsedWallTimeUS = 0; - }; - - void ValidateBuildPart(BuildStorage& Storage, - const Oid& BuildId, - Oid BuildPartId, - const std::string_view BuildPartName, - ValidateStatistics& ValidateStats, - DownloadStatistics& DownloadStats) + void ValidateBuildPart(BuildStorage& Storage, const Oid& BuildId, Oid BuildPartId, const std::string_view BuildPartName) { ZEN_TRACE_CPU("ValidateBuildPart"); ProgressBar::SetLogOperationName(ProgressMode, "Validate Part"); - enum TaskSteps : uint32_t - { - FetchBuild, - FetchBuildPart, - ValidateBlobs, - Cleanup, - StepCount - }; - - auto EndProgress = - MakeGuard([&]() { ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::StepCount, TaskSteps::StepCount); }); - - Stopwatch Timer; - auto _ = MakeGuard([&]() { - if (!IsQuiet) - { - ZEN_CONSOLE("Validated build part {}/{} ('{}') in {}", - BuildId, - BuildPartId, - BuildPartName, - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - } - }); - - ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::FetchBuild, TaskSteps::StepCount); - - CbObject Build = Storage.GetBuild(BuildId); - if (!BuildPartName.empty()) - { - BuildPartId = Build["parts"sv].AsObjectView()[BuildPartName].AsObjectId(); - if (BuildPartId == Oid::Zero) - { - throw std::runtime_error(fmt::format("Build {} does not have a part named '{}'", BuildId, BuildPartName)); - } - } - ValidateStats.BuildBlobSize = Build.GetSize(); - uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u; - if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0) - { - PreferredMultipartChunkSize = ChunkSize; - } - - ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::FetchBuildPart, TaskSteps::StepCount); - - CbObject BuildPart = Storage.GetBuildPart(BuildId, BuildPartId); - ValidateStats.BuildPartSize = BuildPart.GetSize(); - if (!IsQuiet) - { - ZEN_CONSOLE("Validating build part {}/{} ({})", BuildId, BuildPartId, NiceBytes(BuildPart.GetSize())); - } - std::vector<IoHash> ChunkAttachments; - for (CbFieldView LooseFileView : BuildPart["chunkAttachments"sv].AsObjectView()["rawHashes"sv]) - { - ChunkAttachments.push_back(LooseFileView.AsBinaryAttachment()); - } - ValidateStats.ChunkAttachmentCount = ChunkAttachments.size(); - std::vector<IoHash> BlockAttachments; - for (CbFieldView BlocksView : BuildPart["blockAttachments"sv].AsObjectView()["rawHashes"sv]) - { - BlockAttachments.push_back(BlocksView.AsBinaryAttachment()); - } - ValidateStats.BlockAttachmentCount = BlockAttachments.size(); - - std::vector<ChunkBlockDescription> VerifyBlockDescriptions = - ParseChunkBlockDescriptionList(Storage.GetBlockMetadatas(BuildId, BlockAttachments)); - if (VerifyBlockDescriptions.size() != BlockAttachments.size()) - { - throw std::runtime_error(fmt::format("Uploaded blocks metadata could not all be found, {} blocks metadata is missing", - BlockAttachments.size() - VerifyBlockDescriptions.size())); - } - - WorkerThreadPool& NetworkPool = GetNetworkPool(); - WorkerThreadPool& VerifyPool = GetIOWorkerPool(); - ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - - const std::filesystem::path TempFolder = ".zen-tmp"; - - CreateDirectories(TempFolder); - auto __ = MakeGuard([&TempFolder]() { - if (CleanDirectory(TempFolder, {})) - { - std::error_code DummyEc; - RemoveDir(TempFolder, DummyEc); - } - }); - - ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::ValidateBlobs, TaskSteps::StepCount); - - ProgressBar ProgressBar(ProgressMode, "Validate Blobs"); - - uint64_t AttachmentsToVerifyCount = ChunkAttachments.size() + BlockAttachments.size(); - FilteredRate FilteredDownloadedBytesPerSecond; - FilteredRate FilteredVerifiedBytesPerSecond; - - std::atomic<uint64_t> MultipartAttachmentCount = 0; - - for (const IoHash& ChunkAttachment : ChunkAttachments) - { - Work.ScheduleWork(NetworkPool, - [&Storage, - &NetworkPool, - &VerifyPool, - &Work, - &DownloadStats, - &ValidateStats, - AttachmentsToVerifyCount, - &TempFolder, - BuildId = Oid(BuildId), - PreferredMultipartChunkSize, - &FilteredDownloadedBytesPerSecond, - &FilteredVerifiedBytesPerSecond, - ChunkAttachment](std::atomic<bool>&) { - if (!AbortFlag) - { - ZEN_TRACE_CPU("ValidateBuildPart_GetChunk"); - - FilteredDownloadedBytesPerSecond.Start(); - DownloadLargeBlob( - Storage, - TempFolder, - BuildId, - ChunkAttachment, - PreferredMultipartChunkSize, - Work, - NetworkPool, - DownloadStats, - [&Work, - &VerifyPool, - &DownloadStats, - &ValidateStats, - AttachmentsToVerifyCount, - &FilteredDownloadedBytesPerSecond, - &FilteredVerifiedBytesPerSecond, - ChunkHash = ChunkAttachment](IoBuffer&& Payload) { - Payload.SetContentType(ZenContentType::kCompressedBinary); - if (!AbortFlag) - { - Work.ScheduleWork( - VerifyPool, - [&DownloadStats, - &ValidateStats, - AttachmentsToVerifyCount, - &FilteredDownloadedBytesPerSecond, - &FilteredVerifiedBytesPerSecond, - Payload = std::move(Payload), - ChunkHash](std::atomic<bool>&) mutable { - if (!AbortFlag) - { - ZEN_TRACE_CPU("ValidateBuildPart_Validate"); - - if (DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount == - AttachmentsToVerifyCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - - FilteredVerifiedBytesPerSecond.Start(); - - uint64_t CompressedSize; - uint64_t DecompressedSize; - ValidateBlob(std::move(Payload), ChunkHash, CompressedSize, DecompressedSize); - ValidateStats.VerifiedAttachmentCount++; - ValidateStats.VerifiedByteCount += DecompressedSize; - if (ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount) - { - FilteredVerifiedBytesPerSecond.Stop(); - } - } - }); - } - }); - } - }); - } - - for (const IoHash& BlockAttachment : BlockAttachments) - { - Work.ScheduleWork( - NetworkPool, - [&Storage, - &BuildId, - &Work, - &VerifyPool, - &DownloadStats, - &ValidateStats, - AttachmentsToVerifyCount, - &FilteredDownloadedBytesPerSecond, - &FilteredVerifiedBytesPerSecond, - BlockAttachment](std::atomic<bool>&) { - if (!AbortFlag) - { - ZEN_TRACE_CPU("ValidateBuildPart_GetBlock"); - - FilteredDownloadedBytesPerSecond.Start(); - IoBuffer Payload = Storage.GetBuildBlob(BuildId, BlockAttachment); - DownloadStats.DownloadedBlockCount++; - DownloadStats.DownloadedBlockByteCount += Payload.GetSize(); - if (DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount == AttachmentsToVerifyCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - if (!Payload) - { - throw std::runtime_error(fmt::format("Block attachment {} could not be found", BlockAttachment)); - } - if (!AbortFlag) - { - Work.ScheduleWork( - VerifyPool, - [&FilteredVerifiedBytesPerSecond, - AttachmentsToVerifyCount, - &ValidateStats, - Payload = std::move(Payload), - BlockAttachment](std::atomic<bool>&) mutable { - if (!AbortFlag) - { - ZEN_TRACE_CPU("ValidateBuildPart_ValidateBlock"); - - FilteredVerifiedBytesPerSecond.Start(); - - uint64_t CompressedSize; - uint64_t DecompressedSize; - ValidateChunkBlock(std::move(Payload), BlockAttachment, CompressedSize, DecompressedSize); - ValidateStats.VerifiedAttachmentCount++; - ValidateStats.VerifiedByteCount += DecompressedSize; - if (ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount) - { - FilteredVerifiedBytesPerSecond.Stop(); - } - } - }); - } - } - }); - } - - Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(PendingWork); - - const uint64_t DownloadedAttachmentCount = DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount; - const uint64_t DownloadedByteCount = DownloadStats.DownloadedChunkByteCount + DownloadStats.DownloadedBlockByteCount; - - FilteredDownloadedBytesPerSecond.Update(DownloadedByteCount); - FilteredVerifiedBytesPerSecond.Update(ValidateStats.VerifiedByteCount); - - std::string Details = fmt::format("Downloaded {}/{} ({}, {}bits/s). Verified {}/{} ({}, {}B/s)", - DownloadedAttachmentCount, - AttachmentsToVerifyCount, - NiceBytes(DownloadedByteCount), - NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8), - ValidateStats.VerifiedAttachmentCount.load(), - AttachmentsToVerifyCount, - NiceBytes(ValidateStats.VerifiedByteCount.load()), - NiceNum(FilteredVerifiedBytesPerSecond.GetCurrent())); - - ProgressBar.UpdateState( - {.Task = "Validating blobs ", - .Details = Details, - .TotalCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2), - .RemainingCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2 - - (DownloadedAttachmentCount + ValidateStats.VerifiedAttachmentCount.load())), - .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, - false); - }); - - ProgressBar.Finish(); - ValidateStats.ElapsedWallTimeUS = Timer.GetElapsedTimeUs(); - - ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::Cleanup, TaskSteps::StepCount); + ConsoleOpLogOutput Output(ProgressMode); + + BuildsOperationValidateBuildPart ValidateOp(Output, + Storage, + AbortFlag, + PauseFlag, + GetIOWorkerPool(), + GetNetworkPool(), + BuildId, + BuildPartId, + BuildPartName, + BuildsOperationValidateBuildPart::Options{.IsQuiet = IsQuiet, .IsVerbose = IsVerbose}); + + ValidateOp.Execute(); + + const uint64_t DownloadedCount = ValidateOp.m_DownloadStats.DownloadedChunkCount + ValidateOp.m_DownloadStats.DownloadedBlockCount; + const uint64_t DownloadedByteCount = + ValidateOp.m_DownloadStats.DownloadedChunkByteCount + ValidateOp.m_DownloadStats.DownloadedBlockByteCount; + ZEN_CONSOLE("Verified: {:>8} ({}), {}B/sec, {}", + DownloadedCount, + NiceBytes(DownloadedByteCount), + NiceNum(GetBytesPerSecond(ValidateOp.m_ValidateStats.ElapsedWallTimeUS, DownloadedByteCount)), + NiceTimeSpanMs(ValidateOp.m_ValidateStats.ElapsedWallTimeUS / 1000)); } void UploadFolder(StorageInstance& Storage, @@ -3096,10 +2672,6 @@ namespace { const std::filesystem::path ZenTempFolder = ZenTempFolderPath(Options.ZenFolderPath); CreateDirectories(ZenTempFolder); - CreateDirectories(ZenTempBlockFolderPath(Options.ZenFolderPath)); - CreateDirectories(ZenTempCacheFolderPath(Options.ZenFolderPath)); - CreateDirectories(ZenTempDownloadFolderPath(Options.ZenFolderPath)); - std::uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u; std::vector<std::pair<Oid, std::string>> AllBuildParts = @@ -4976,22 +4548,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { if (m_PostUploadVerify) { - ValidateStatistics ValidateStats; - DownloadStatistics ValidateDownloadStats; - ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName, ValidateStats, ValidateDownloadStats); - std::string ValidateInfo; - if (m_PostUploadVerify) - { - const uint64_t DownloadedCount = - ValidateDownloadStats.DownloadedChunkCount + ValidateDownloadStats.DownloadedBlockCount; - const uint64_t DownloadedByteCount = - ValidateDownloadStats.DownloadedChunkByteCount + ValidateDownloadStats.DownloadedBlockByteCount; - ValidateInfo = fmt::format("Verified: {:>8} ({}), {}B/sec, {}", - DownloadedCount, - NiceBytes(DownloadedByteCount), - NiceNum(GetBytesPerSecond(ValidateStats.ElapsedWallTimeUS, DownloadedByteCount)), - NiceTimeSpanMs(ValidateStats.ElapsedWallTimeUS / 1000)); - } + ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName); } } @@ -5208,7 +4765,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) uint64_t CompressedSize; uint64_t DecompressedSize; - ValidateBlob(*Storage.BuildStorage, BuildId, BlobHash, CompressedSize, DecompressedSize); + ValidateBlob(AbortFlag, *Storage.BuildStorage, BuildId, BlobHash, CompressedSize, DecompressedSize); if (AbortFlag) { throw std::runtime_error("Fetch blob aborted"); @@ -5267,9 +4824,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) const Oid BuildPartId = m_BuildPartName.empty() ? Oid::Zero : ParseBuildPartId(); - ValidateStatistics ValidateStats; - DownloadStatistics DownloadStats; - ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName, ValidateStats, DownloadStats); + ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName); if (AbortFlag) { @@ -5485,11 +5040,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) throw std::runtime_error("Test aborted. (Upload build)"); } - { - ValidateStatistics ValidateStats; - DownloadStatistics ValidateDownloadStats; - ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName, ValidateStats, ValidateDownloadStats); - } + ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName); ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}'", BuildId, BuildPartId, m_BuildPartName, DownloadPath); DownloadFolder(Storage, @@ -5676,11 +5227,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) throw std::runtime_error("Test aborted. (Upload scrambled)"); } - { - ValidateStatistics ValidateStats; - DownloadStatistics ValidateDownloadStats; - ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName, ValidateStats, ValidateDownloadStats); - } + ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName); ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (original)", BuildId, BuildPartId, m_BuildPartName, DownloadPath); DownloadFolder(Storage, diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp index a4f4237cd..60058c4a4 100644 --- a/src/zenremotestore/builds/buildstorageoperations.cpp +++ b/src/zenremotestore/builds/buildstorageoperations.cpp @@ -544,6 +544,160 @@ namespace { return SB.ToString(); } + void DownloadLargeBlob(BuildStorage& Storage, + const std::filesystem::path& DownloadFolder, + const Oid& BuildId, + const IoHash& ChunkHash, + const std::uint64_t PreferredMultipartChunkSize, + ParallelWork& Work, + WorkerThreadPool& NetworkPool, + DownloadStatistics& DownloadStats, + std::function<void(IoBuffer&& Payload)>&& OnDownloadComplete) + { + ZEN_TRACE_CPU("DownloadLargeBlob"); + + struct WorkloadData + { + TemporaryFile TempFile; + }; + std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>()); + + std::error_code Ec; + Workload->TempFile.CreateTemporary(DownloadFolder, Ec); + if (Ec) + { + throw std::runtime_error( + fmt::format("Failed opening temporary file '{}': {} ({})", Workload->TempFile.GetPath(), Ec.message(), Ec.value())); + } + std::vector<std::function<void()>> WorkItems = Storage.GetLargeBuildBlob( + BuildId, + ChunkHash, + PreferredMultipartChunkSize, + [&Work, Workload, &DownloadStats](uint64_t Offset, const IoBuffer& Chunk) { + DownloadStats.DownloadedChunkByteCount += Chunk.GetSize(); + + if (!Work.IsAborted()) + { + ZEN_TRACE_CPU("Async_DownloadLargeBlob_OnReceive"); + Workload->TempFile.Write(Chunk.GetView(), Offset); + } + }, + [&Work, Workload, &DownloadStats, OnDownloadComplete = std::move(OnDownloadComplete)]() { + DownloadStats.DownloadedChunkCount++; + if (!Work.IsAborted()) + { + ZEN_TRACE_CPU("Async_DownloadLargeBlob_OnComplete"); + + uint64_t PayloadSize = Workload->TempFile.FileSize(); + void* FileHandle = Workload->TempFile.Detach(); + ZEN_ASSERT(FileHandle != nullptr); + IoBuffer Payload(IoBuffer::File, FileHandle, 0, PayloadSize, true); + Payload.SetDeleteOnClose(true); + OnDownloadComplete(std::move(Payload)); + } + }); + if (!WorkItems.empty()) + { + DownloadStats.MultipartAttachmentCount++; + } + for (auto& WorkItem : WorkItems) + { + Work.ScheduleWork(NetworkPool, [WorkItem = std::move(WorkItem)](std::atomic<bool>& AbortFlag) { + if (!AbortFlag) + { + ZEN_TRACE_CPU("Async_DownloadLargeBlob_Work"); + + WorkItem(); + } + }); + } + } + + CompositeBuffer ValidateBlob(std::atomic<bool>& AbortFlag, + IoBuffer&& Payload, + const IoHash& BlobHash, + uint64_t& OutCompressedSize, + uint64_t& OutDecompressedSize) + { + ZEN_TRACE_CPU("ValidateBlob"); + + if (Payload.GetContentType() != ZenContentType::kCompressedBinary) + { + throw std::runtime_error(fmt::format("Blob {} ({} bytes) has unexpected content type '{}'", + BlobHash, + Payload.GetSize(), + ToString(Payload.GetContentType()))); + } + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, RawSize); + if (!Compressed) + { + throw std::runtime_error(fmt::format("Blob {} ({} bytes) compressed header is invalid", BlobHash, Payload.GetSize())); + } + if (RawHash != BlobHash) + { + throw std::runtime_error( + fmt::format("Blob {} ({} bytes) compressed header has a mismatching raw hash {}", BlobHash, Payload.GetSize(), RawHash)); + } + + IoHashStream Hash; + bool CouldDecompress = Compressed.DecompressToStream( + 0, + RawSize, + [&AbortFlag, &Hash](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { + ZEN_UNUSED(SourceOffset, SourceSize, Offset); + if (!AbortFlag) + { + for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) + { + Hash.Append(Segment.GetView()); + } + return true; + } + return false; + }); + + if (AbortFlag) + { + return CompositeBuffer{}; + } + + if (!CouldDecompress) + { + throw std::runtime_error( + fmt::format("Blob {} ({} bytes) failed to decompress - header information mismatch", BlobHash, Payload.GetSize())); + } + IoHash ValidateRawHash = Hash.GetHash(); + if (ValidateRawHash != BlobHash) + { + throw std::runtime_error(fmt::format("Blob {} ({} bytes) decompressed hash {} does not match header information", + BlobHash, + Payload.GetSize(), + ValidateRawHash)); + } + OodleCompressor Compressor; + OodleCompressionLevel CompressionLevel; + uint64_t BlockSize; + if (!Compressed.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) + { + throw std::runtime_error(fmt::format("Blob {} ({} bytes) failed to get compression details", BlobHash, Payload.GetSize())); + } + OutCompressedSize = Payload.GetSize(); + OutDecompressedSize = RawSize; + if (CompressionLevel == OodleCompressionLevel::None) + { + // Only decompress to composite if we need it for block verification + CompositeBuffer DecompressedComposite = Compressed.DecompressToComposite(); + if (!DecompressedComposite) + { + throw std::runtime_error(fmt::format("Blob {} ({} bytes) failed to decompress to composite", BlobHash, Payload.GetSize())); + } + return DecompressedComposite; + } + return CompositeBuffer{}; + } + } // namespace class ReadFileCache @@ -1264,6 +1418,8 @@ BuildsOperationUpdateFolder::BuildsOperationUpdateFolder(BuildOpLogOutput& , m_LooseChunkHashes(LooseChunkHashes) , m_Options(Options) , m_CacheFolderPath(ZenTempCacheFolderPath(m_Options.ZenFolderPath)) +, m_TempDownloadFolderPath(ZenTempDownloadFolderPath(m_Options.ZenFolderPath)) +, m_TempBlockFolderPath(ZenTempBlockFolderPath(m_Options.ZenFolderPath)) { } @@ -1288,6 +1444,10 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) m_LogOutput.SetLogOperationProgress(TaskSteps::ScanExistingData, TaskSteps::StepCount); + CreateDirectories(m_CacheFolderPath); + CreateDirectories(m_TempDownloadFolderPath); + CreateDirectories(m_TempBlockFolderPath); + Stopwatch IndexTimer; if (!m_Options.IsQuiet) @@ -1836,8 +1996,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) { TotalPartWriteCount++; - std::filesystem::path BlockPath = - ZenTempBlockFolderPath(m_Options.ZenFolderPath) / BlockDescription.BlockHash.ToHexString(); + std::filesystem::path BlockPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); if (IsFile(BlockPath)) { CachedChunkBlockIndexes.push_back(BlockIndex); @@ -2279,7 +2438,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) m_Options.LargeAttachmentSize) { DownloadLargeBlob(*m_Storage.BuildStorage, - ZenTempDownloadFolderPath(m_Options.ZenFolderPath), + m_TempDownloadFolderPath, m_BuildId, ChunkHash, m_Options.PreferredMultipartChunkSize, @@ -2469,7 +2628,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) FilteredWrittenBytesPerSecond.Start(); std::filesystem::path BlockChunkPath = - ZenTempBlockFolderPath(m_Options.ZenFolderPath) / BlockDescription.BlockHash.ToHexString(); + m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); IoBuffer BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); if (!BlockBuffer) { @@ -2584,12 +2743,11 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) if (!Ec) { BlockBuffer.SetDeleteOnClose(false); - BlockBuffer = {}; - BlockChunkPath = - ZenTempBlockFolderPath(m_Options.ZenFolderPath) / fmt::format("{}_{:x}_{:x}", - BlockDescription.BlockHash, - BlockRange.RangeStart, - BlockRange.RangeLength); + BlockBuffer = {}; + BlockChunkPath = m_TempBlockFolderPath / fmt::format("{}_{:x}_{:x}", + BlockDescription.BlockHash, + BlockRange.RangeStart, + BlockRange.RangeLength); RenameFile(TempBlobPath, BlockChunkPath, Ec); if (Ec) { @@ -2609,7 +2767,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) ZEN_TRACE_CPU("WriteTempPartialBlock"); // Could not be moved and rather large, lets store it on disk BlockChunkPath = - ZenTempBlockFolderPath(m_Options.ZenFolderPath) / + m_TempBlockFolderPath / fmt::format("{}_{:x}_{:x}", BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength); TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); BlockBuffer = {}; @@ -2773,9 +2931,8 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) if (!Ec) { BlockBuffer.SetDeleteOnClose(false); - BlockBuffer = {}; - BlockChunkPath = - ZenTempBlockFolderPath(m_Options.ZenFolderPath) / BlockDescription.BlockHash.ToHexString(); + BlockBuffer = {}; + BlockChunkPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); RenameFile(TempBlobPath, BlockChunkPath, Ec); if (Ec) { @@ -2794,8 +2951,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) { ZEN_TRACE_CPU("WriteTempFullBlock"); // Could not be moved and rather large, lets store it on disk - BlockChunkPath = - ZenTempBlockFolderPath(m_Options.ZenFolderPath) / BlockDescription.BlockHash.ToHexString(); + BlockChunkPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); BlockBuffer = {}; } @@ -3540,7 +3696,7 @@ BuildsOperationUpdateFolder::ScanTempBlocksFolder(tsl::robin_map<IoHash, uint32_ } DirectoryContent BlockDirContent; - GetDirectoryContent(ZenTempBlockFolderPath(m_Options.ZenFolderPath), + GetDirectoryContent(m_TempBlockFolderPath, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeFileSizes, BlockDirContent); OutCachedBlocksFound.reserve(BlockDirContent.Files.size()); @@ -3792,7 +3948,7 @@ BuildsOperationUpdateFolder::FindDownloadedChunk(const IoHash& ChunkHash) { ZEN_TRACE_CPU("FindDownloadedChunk"); - std::filesystem::path CompressedChunkPath = ZenTempDownloadFolderPath(m_Options.ZenFolderPath) / ChunkHash.ToHexString(); + std::filesystem::path CompressedChunkPath = m_TempDownloadFolderPath / ChunkHash.ToHexString(); if (IsFile(CompressedChunkPath)) { IoBuffer ExistingCompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); @@ -4590,7 +4746,7 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa { Payload.SetDeleteOnClose(false); Payload = {}; - CompressedChunkPath = ZenTempDownloadFolderPath(ZenFolderPath) / ChunkHash.ToHexString(); + CompressedChunkPath = m_TempDownloadFolderPath / ChunkHash.ToHexString(); RenameFile(TempBlobPath, CompressedChunkPath, Ec); if (Ec) { @@ -4609,7 +4765,7 @@ BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::pa { ZEN_TRACE_CPU("WriteTempChunk"); // Could not be moved and rather large, lets store it on disk - CompressedChunkPath = ZenTempDownloadFolderPath(ZenFolderPath) / ChunkHash.ToHexString(); + CompressedChunkPath = m_TempDownloadFolderPath / ChunkHash.ToHexString(); TemporaryFile::SafeWriteFile(CompressedChunkPath, Payload); Payload = {}; } @@ -4929,72 +5085,6 @@ BuildsOperationUploadFolder::Execute() ChunkedFolderContent LocalContent; { - auto IsAcceptedFolder = [this](const std::string_view& RelativePath) -> bool { - for (const std::string& ExcludeFolder : m_Options.ExcludeFolders) - { - if (RelativePath.starts_with(ExcludeFolder)) - { - if (RelativePath.length() == ExcludeFolder.length()) - { - return false; - } - else if (RelativePath[ExcludeFolder.length()] == '/') - { - return false; - } - } - } - return true; - }; - - auto IsAcceptedFile = [this](const std::string_view& RelativePath, uint64_t, uint32_t) -> bool { - for (const std::string& ExcludeExtension : m_Options.ExcludeExtensions) - { - if (RelativePath.ends_with(ExcludeExtension)) - { - return false; - } - } - return true; - }; - - auto ParseManifest = [](const std::filesystem::path& Path, - const std::filesystem::path& ManifestPath) -> std::vector<std::filesystem::path> { - std::vector<std::filesystem::path> AssetPaths; - std::filesystem::path AbsoluteManifestPath = - MakeSafeAbsolutePath(ManifestPath.is_absolute() ? ManifestPath : Path / ManifestPath); - IoBuffer ManifestContent = ReadFile(AbsoluteManifestPath).Flatten(); - std::string_view ManifestString((const char*)ManifestContent.GetView().GetData(), ManifestContent.GetSize()); - std::string_view::size_type Offset = 0; - while (Offset < ManifestContent.GetSize()) - { - size_t PathBreakOffset = ManifestString.find_first_of("\t\r\n", Offset); - if (PathBreakOffset == std::string_view::npos) - { - PathBreakOffset = ManifestContent.GetSize(); - } - std::string_view AssetPath = ManifestString.substr(Offset, PathBreakOffset - Offset); - if (!AssetPath.empty()) - { - AssetPaths.emplace_back(std::filesystem::path(AssetPath)); - } - Offset = PathBreakOffset; - size_t EolOffset = ManifestString.find_first_of("\r\n", Offset); - if (EolOffset == std::string_view::npos) - { - break; - } - Offset = EolOffset; - size_t LineBreakOffset = ManifestString.find_first_not_of("\t\r\n", Offset); - if (LineBreakOffset == std::string_view::npos) - { - break; - } - Offset = LineBreakOffset; - } - return AssetPaths; - }; - Stopwatch ScanTimer; FolderContent Content; if (m_ManifestPath.empty()) @@ -5013,15 +5103,10 @@ BuildsOperationUploadFolder::Execute() Content = GetFolderContent( m_LocalFolderScanStats, m_Path, - std::move(IsAcceptedFolder), - [this, &IsAcceptedFile, &ExcludeAssetPaths](const std::string_view& RelativePath, - uint64_t Size, - uint32_t Attributes) -> bool { - if (RelativePath == m_Options.ZenExcludeManifestName) - { - return false; - } - if (!IsAcceptedFile(RelativePath, Size, Attributes)) + [this](const std::string_view& RelativePath) { return IsAcceptedFolder(RelativePath); }, + [this, &ExcludeAssetPaths](const std::string_view& RelativePath, uint64_t Size, uint32_t Attributes) -> bool { + ZEN_UNUSED(Size, Attributes); + if (!IsAcceptedFile(RelativePath)) { return false; } @@ -5342,7 +5427,7 @@ BuildsOperationUploadFolder::Execute() } for (const ChunkBlockDescription& Block : AllChunkBlockDescriptions) { - for (const IoHash& ChunkHash : Block.ChunkHashes) + for (const IoHash& ChunkHash : Block.ChunkRawHashes) { ChunkHashToAbsoluteChunkIndex.insert({ChunkHash, AbsoluteChunkHashes.size()}); AbsoluteChunkHashes.push_back(ChunkHash); @@ -5638,24 +5723,6 @@ BuildsOperationUploadFolder::Execute() } } - // if (m_PostUploadVerify && !m_AbortFlag) - // { - // //m_LogOutput.SetLogOperationProgress(TaskSteps::Validate, TaskSteps::StepCount); - // ValidateBuildPart(*m_Storage.BuildStorage, m_BuildId, m_BuildPartId, m_BuildPartName, ValidateStats, ValidateDownloadStats); - // std::string ValidateInfo; - // if (m_PostUploadVerify) - // { - // const uint64_t DownloadedCount = ValidateDownloadStats.DownloadedChunkCount + ValidateDownloadStats.DownloadedBlockCount; - // const uint64_t DownloadedByteCount = - // ValidateDownloadStats.DownloadedChunkByteCount + ValidateDownloadStats.DownloadedBlockByteCount; - // ValidateInfo = fmt::format("\n Verified: {:>8} ({}), {}B/sec, {}", - // DownloadedCount, - // NiceBytes(DownloadedByteCount), - // NiceNum(GetBytesPerSecond(ValidateStats.ElapsedWallTimeUS, DownloadedByteCount)), - // NiceTimeSpanMs(ValidateStats.ElapsedWallTimeUS / 1000)); - // } - // } - m_Storage.BuildStorage->PutBuildPartStats( m_BuildId, m_BuildPartId, @@ -5674,74 +5741,78 @@ BuildsOperationUploadFolder::Execute() m_LogOutput.SetLogOperationProgress(TaskSteps::Cleanup, TaskSteps::StepCount); } -void -DownloadLargeBlob(BuildStorage& Storage, - const std::filesystem::path& DownloadFolder, - const Oid& BuildId, - const IoHash& ChunkHash, - const std::uint64_t PreferredMultipartChunkSize, - ParallelWork& Work, - WorkerThreadPool& NetworkPool, - DownloadStatistics& DownloadStats, - std::function<void(IoBuffer&& Payload)>&& OnDownloadComplete) +std::vector<std::filesystem::path> +BuildsOperationUploadFolder::ParseManifest(const std::filesystem::path& Path, const std::filesystem::path& ManifestPath) { - ZEN_TRACE_CPU("DownloadLargeBlob"); - - struct WorkloadData - { - TemporaryFile TempFile; - }; - std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>()); - - std::error_code Ec; - Workload->TempFile.CreateTemporary(DownloadFolder, Ec); - if (Ec) + std::vector<std::filesystem::path> AssetPaths; + std::filesystem::path AbsoluteManifestPath = MakeSafeAbsolutePath(ManifestPath.is_absolute() ? ManifestPath : Path / ManifestPath); + IoBuffer ManifestContent = ReadFile(AbsoluteManifestPath).Flatten(); + std::string_view ManifestString((const char*)ManifestContent.GetView().GetData(), ManifestContent.GetSize()); + std::string_view::size_type Offset = 0; + while (Offset < ManifestContent.GetSize()) { - throw std::runtime_error( - fmt::format("Failed opening temporary file '{}': {} ({})", Workload->TempFile.GetPath(), Ec.message(), Ec.value())); + size_t PathBreakOffset = ManifestString.find_first_of("\t\r\n", Offset); + if (PathBreakOffset == std::string_view::npos) + { + PathBreakOffset = ManifestContent.GetSize(); + } + std::string_view AssetPath = ManifestString.substr(Offset, PathBreakOffset - Offset); + if (!AssetPath.empty()) + { + AssetPaths.emplace_back(std::filesystem::path(AssetPath)); + } + Offset = PathBreakOffset; + size_t EolOffset = ManifestString.find_first_of("\r\n", Offset); + if (EolOffset == std::string_view::npos) + { + break; + } + Offset = EolOffset; + size_t LineBreakOffset = ManifestString.find_first_not_of("\t\r\n", Offset); + if (LineBreakOffset == std::string_view::npos) + { + break; + } + Offset = LineBreakOffset; } - std::vector<std::function<void()>> WorkItems = Storage.GetLargeBuildBlob( - BuildId, - ChunkHash, - PreferredMultipartChunkSize, - [&Work, Workload, &DownloadStats](uint64_t Offset, const IoBuffer& Chunk) { - DownloadStats.DownloadedChunkByteCount += Chunk.GetSize(); + return AssetPaths; +} - if (!Work.IsAborted()) +bool +BuildsOperationUploadFolder::IsAcceptedFolder(const std::string_view& RelativePath) const +{ + for (const std::string& ExcludeFolder : m_Options.ExcludeFolders) + { + if (RelativePath.starts_with(ExcludeFolder)) + { + if (RelativePath.length() == ExcludeFolder.length()) { - ZEN_TRACE_CPU("Async_DownloadLargeBlob_OnReceive"); - Workload->TempFile.Write(Chunk.GetView(), Offset); + return false; } - }, - [&Work, Workload, &DownloadStats, OnDownloadComplete = std::move(OnDownloadComplete)]() { - DownloadStats.DownloadedChunkCount++; - if (!Work.IsAborted()) + else if (RelativePath[ExcludeFolder.length()] == '/') { - ZEN_TRACE_CPU("Async_DownloadLargeBlob_OnComplete"); - - uint64_t PayloadSize = Workload->TempFile.FileSize(); - void* FileHandle = Workload->TempFile.Detach(); - ZEN_ASSERT(FileHandle != nullptr); - IoBuffer Payload(IoBuffer::File, FileHandle, 0, PayloadSize, true); - Payload.SetDeleteOnClose(true); - OnDownloadComplete(std::move(Payload)); + return false; } - }); - if (!WorkItems.empty()) + } + } + return true; +} + +bool +BuildsOperationUploadFolder::IsAcceptedFile(const std::string_view& RelativePath) const +{ + if (RelativePath == m_Options.ZenExcludeManifestName) { - DownloadStats.MultipartAttachmentCount++; + return false; } - for (auto& WorkItem : WorkItems) + for (const std::string& ExcludeExtension : m_Options.ExcludeExtensions) { - Work.ScheduleWork(NetworkPool, [WorkItem = std::move(WorkItem)](std::atomic<bool>& AbortFlag) { - if (!AbortFlag) - { - ZEN_TRACE_CPU("Async_DownloadLargeBlob_Work"); - - WorkItem(); - } - }); + if (RelativePath.ends_with(ExcludeExtension)) + { + return false; + } } + return true; } std::vector<size_t> @@ -7130,4 +7201,317 @@ BuildsOperationUploadFolder::CompressChunk(const ChunkedFolderContent& Content, return std::move(CompressedBlob).GetCompressed(); } +BuildsOperationValidateBuildPart::BuildsOperationValidateBuildPart(BuildOpLogOutput& LogOutput, + BuildStorage& Storage, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag, + WorkerThreadPool& IOWorkerPool, + WorkerThreadPool& NetworkPool, + const Oid& BuildId, + const Oid& BuildPartId, + const std::string_view BuildPartName, + const Options& Options) + +: m_LogOutput(LogOutput) +, m_Storage(Storage) +, m_AbortFlag(AbortFlag) +, m_PauseFlag(PauseFlag) +, m_IOWorkerPool(IOWorkerPool) +, m_NetworkPool(NetworkPool) +, m_BuildId(BuildId) +, m_BuildPartId(BuildPartId) +, m_BuildPartName(BuildPartName) +, m_Options(Options) +{ +} + +void +BuildsOperationValidateBuildPart::Execute() +{ + ZEN_TRACE_CPU("ValidateBuildPart"); + + enum TaskSteps : uint32_t + { + FetchBuild, + FetchBuildPart, + ValidateBlobs, + Cleanup, + StepCount + }; + + auto EndProgress = MakeGuard([&]() { m_LogOutput.SetLogOperationProgress(TaskSteps::StepCount, TaskSteps::StepCount); }); + + Stopwatch Timer; + auto _ = MakeGuard([&]() { + if (!m_Options.IsQuiet) + { + ZEN_CONSOLE("Validated build part {}/{} ('{}') in {}", + m_BuildId, + m_BuildPartId, + m_BuildPartName, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + } + }); + + m_LogOutput.SetLogOperationProgress(TaskSteps::FetchBuild, TaskSteps::StepCount); + + CbObject Build = m_Storage.GetBuild(m_BuildId); + if (!m_BuildPartName.empty()) + { + m_BuildPartId = Build["parts"sv].AsObjectView()[m_BuildPartName].AsObjectId(); + if (m_BuildPartId == Oid::Zero) + { + throw std::runtime_error(fmt::format("Build {} does not have a part named '{}'", m_BuildId, m_BuildPartName)); + } + } + m_ValidateStats.BuildBlobSize = Build.GetSize(); + uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u; + if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0) + { + PreferredMultipartChunkSize = ChunkSize; + } + + m_LogOutput.SetLogOperationProgress(TaskSteps::FetchBuildPart, TaskSteps::StepCount); + + CbObject BuildPart = m_Storage.GetBuildPart(m_BuildId, m_BuildPartId); + m_ValidateStats.BuildPartSize = BuildPart.GetSize(); + if (!m_Options.IsQuiet) + { + ZEN_CONSOLE("Validating build part {}/{} ({})", m_BuildId, m_BuildPartId, NiceBytes(BuildPart.GetSize())); + } + std::vector<IoHash> ChunkAttachments; + for (CbFieldView LooseFileView : BuildPart["chunkAttachments"sv].AsObjectView()["rawHashes"sv]) + { + ChunkAttachments.push_back(LooseFileView.AsBinaryAttachment()); + } + m_ValidateStats.ChunkAttachmentCount = ChunkAttachments.size(); + std::vector<IoHash> BlockAttachments; + for (CbFieldView BlocksView : BuildPart["blockAttachments"sv].AsObjectView()["rawHashes"sv]) + { + BlockAttachments.push_back(BlocksView.AsBinaryAttachment()); + } + m_ValidateStats.BlockAttachmentCount = BlockAttachments.size(); + + std::vector<ChunkBlockDescription> VerifyBlockDescriptions = + ParseChunkBlockDescriptionList(m_Storage.GetBlockMetadatas(m_BuildId, BlockAttachments)); + if (VerifyBlockDescriptions.size() != BlockAttachments.size()) + { + throw std::runtime_error(fmt::format("Uploaded blocks metadata could not all be found, {} blocks metadata is missing", + BlockAttachments.size() - VerifyBlockDescriptions.size())); + } + + ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + const std::filesystem::path TempFolder = ".zen-tmp"; + + CreateDirectories(TempFolder); + auto __ = MakeGuard([&TempFolder]() { + if (CleanDirectory(TempFolder, {})) + { + std::error_code DummyEc; + RemoveDir(TempFolder, DummyEc); + } + }); + + m_LogOutput.SetLogOperationProgress(TaskSteps::ValidateBlobs, TaskSteps::StepCount); + + std::unique_ptr<BuildOpLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Validate Blobs")); + BuildOpLogOutput::ProgressBar& Progress(*ProgressBarPtr); + + uint64_t AttachmentsToVerifyCount = ChunkAttachments.size() + BlockAttachments.size(); + FilteredRate FilteredDownloadedBytesPerSecond; + FilteredRate FilteredVerifiedBytesPerSecond; + + std::atomic<uint64_t> MultipartAttachmentCount = 0; + + for (const IoHash& ChunkAttachment : ChunkAttachments) + { + Work.ScheduleWork( + m_NetworkPool, + [this, + &Work, + AttachmentsToVerifyCount, + &TempFolder, + PreferredMultipartChunkSize, + &FilteredDownloadedBytesPerSecond, + &FilteredVerifiedBytesPerSecond, + ChunkAttachment](std::atomic<bool>&) { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("ValidateBuildPart_GetChunk"); + + FilteredDownloadedBytesPerSecond.Start(); + DownloadLargeBlob( + m_Storage, + TempFolder, + m_BuildId, + ChunkAttachment, + PreferredMultipartChunkSize, + Work, + m_NetworkPool, + m_DownloadStats, + [this, + &Work, + AttachmentsToVerifyCount, + &FilteredDownloadedBytesPerSecond, + &FilteredVerifiedBytesPerSecond, + ChunkHash = ChunkAttachment](IoBuffer&& Payload) { + Payload.SetContentType(ZenContentType::kCompressedBinary); + if (!m_AbortFlag) + { + Work.ScheduleWork( + m_IOWorkerPool, + [this, + AttachmentsToVerifyCount, + &FilteredDownloadedBytesPerSecond, + &FilteredVerifiedBytesPerSecond, + Payload = std::move(Payload), + ChunkHash](std::atomic<bool>&) mutable { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("ValidateBuildPart_Validate"); + + if (m_DownloadStats.DownloadedChunkCount + m_DownloadStats.DownloadedBlockCount == + AttachmentsToVerifyCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + + FilteredVerifiedBytesPerSecond.Start(); + + uint64_t CompressedSize; + uint64_t DecompressedSize; + ValidateBlob(m_AbortFlag, std::move(Payload), ChunkHash, CompressedSize, DecompressedSize); + m_ValidateStats.VerifiedAttachmentCount++; + m_ValidateStats.VerifiedByteCount += DecompressedSize; + if (m_ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount) + { + FilteredVerifiedBytesPerSecond.Stop(); + } + } + }); + } + }); + } + }); + } + + for (const IoHash& BlockAttachment : BlockAttachments) + { + Work.ScheduleWork( + m_NetworkPool, + [this, &Work, AttachmentsToVerifyCount, &FilteredDownloadedBytesPerSecond, &FilteredVerifiedBytesPerSecond, BlockAttachment]( + std::atomic<bool>&) { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("ValidateBuildPart_GetBlock"); + + FilteredDownloadedBytesPerSecond.Start(); + IoBuffer Payload = m_Storage.GetBuildBlob(m_BuildId, BlockAttachment); + m_DownloadStats.DownloadedBlockCount++; + m_DownloadStats.DownloadedBlockByteCount += Payload.GetSize(); + if (m_DownloadStats.DownloadedChunkCount + m_DownloadStats.DownloadedBlockCount == AttachmentsToVerifyCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + if (!Payload) + { + throw std::runtime_error(fmt::format("Block attachment {} could not be found", BlockAttachment)); + } + if (!m_AbortFlag) + { + Work.ScheduleWork(m_IOWorkerPool, + [this, + &FilteredVerifiedBytesPerSecond, + AttachmentsToVerifyCount, + Payload = std::move(Payload), + BlockAttachment](std::atomic<bool>&) mutable { + if (!m_AbortFlag) + { + ZEN_TRACE_CPU("ValidateBuildPart_ValidateBlock"); + + FilteredVerifiedBytesPerSecond.Start(); + + uint64_t CompressedSize; + uint64_t DecompressedSize; + ValidateChunkBlock(std::move(Payload), BlockAttachment, CompressedSize, DecompressedSize); + m_ValidateStats.VerifiedAttachmentCount++; + m_ValidateStats.VerifiedByteCount += DecompressedSize; + if (m_ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount) + { + FilteredVerifiedBytesPerSecond.Stop(); + } + } + }); + } + } + }); + } + + Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); + + const uint64_t DownloadedAttachmentCount = m_DownloadStats.DownloadedChunkCount + m_DownloadStats.DownloadedBlockCount; + const uint64_t DownloadedByteCount = m_DownloadStats.DownloadedChunkByteCount + m_DownloadStats.DownloadedBlockByteCount; + + FilteredDownloadedBytesPerSecond.Update(DownloadedByteCount); + FilteredVerifiedBytesPerSecond.Update(m_ValidateStats.VerifiedByteCount); + + std::string Details = fmt::format("Downloaded {}/{} ({}, {}bits/s). Verified {}/{} ({}, {}B/s)", + DownloadedAttachmentCount, + AttachmentsToVerifyCount, + NiceBytes(DownloadedByteCount), + NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8), + m_ValidateStats.VerifiedAttachmentCount.load(), + AttachmentsToVerifyCount, + NiceBytes(m_ValidateStats.VerifiedByteCount.load()), + NiceNum(FilteredVerifiedBytesPerSecond.GetCurrent())); + + Progress.UpdateState( + {.Task = "Validating blobs ", + .Details = Details, + .TotalCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2), + .RemainingCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2 - + (DownloadedAttachmentCount + m_ValidateStats.VerifiedAttachmentCount.load())), + .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, + false); + }); + + Progress.Finish(); + m_ValidateStats.ElapsedWallTimeUS = Timer.GetElapsedTimeUs(); + + m_LogOutput.SetLogOperationProgress(TaskSteps::Cleanup, TaskSteps::StepCount); +} + +CompositeBuffer +ValidateBlob(std::atomic<bool>& AbortFlag, + BuildStorage& Storage, + const Oid& BuildId, + const IoHash& BlobHash, + uint64_t& OutCompressedSize, + uint64_t& OutDecompressedSize) +{ + ZEN_TRACE_CPU("ValidateBlob"); + IoBuffer Payload = Storage.GetBuildBlob(BuildId, BlobHash); + if (!Payload) + { + throw std::runtime_error(fmt::format("Blob {} could not be found", BlobHash)); + } + return ValidateBlob(AbortFlag, std::move(Payload), BlobHash, OutCompressedSize, OutDecompressedSize); +} + +ChunkBlockDescription +BuildsOperationValidateBuildPart::ValidateChunkBlock(IoBuffer&& Payload, + const IoHash& BlobHash, + uint64_t& OutCompressedSize, + uint64_t& OutDecompressedSize) +{ + CompositeBuffer BlockBuffer = ValidateBlob(m_AbortFlag, std::move(Payload), BlobHash, OutCompressedSize, OutDecompressedSize); + if (!BlockBuffer) + { + throw std::runtime_error(fmt::format("Chunk block blob {} is not compressed using 'None' compression level", BlobHash)); + } + return GetChunkBlockDescription(BlockBuffer.Flatten(), BlobHash); +} + } // namespace zen diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h index 8ba32127a..be6720d5d 100644 --- a/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h +++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageoperations.h @@ -374,6 +374,8 @@ private: const std::vector<IoHash>& m_LooseChunkHashes; const Options m_Options; const std::filesystem::path m_CacheFolderPath; + const std::filesystem::path m_TempDownloadFolderPath; + const std::filesystem::path m_TempBlockFolderPath; }; struct FindBlocksStatistics @@ -513,6 +515,11 @@ public: LooseChunksStatistics m_LooseChunksStats; private: + std::vector<std::filesystem::path> ParseManifest(const std::filesystem::path& Path, const std::filesystem::path& ManifestPath); + + bool IsAcceptedFolder(const std::string_view& RelativePath) const; + bool IsAcceptedFile(const std::string_view& RelativePath) const; + std::vector<size_t> FindReuseBlocks(const std::vector<ChunkBlockDescription>& KnownBlocks, std::span<const IoHash> ChunkHashes, std::span<const uint32_t> ChunkIndexes, @@ -611,14 +618,64 @@ private: const Options m_Options; }; -void DownloadLargeBlob(BuildStorage& Storage, - const std::filesystem::path& DownloadFolder, - const Oid& BuildId, - const IoHash& ChunkHash, - const std::uint64_t PreferredMultipartChunkSize, - ParallelWork& Work, - WorkerThreadPool& NetworkPool, - DownloadStatistics& DownloadStats, - std::function<void(IoBuffer&& Payload)>&& OnDownloadComplete); +struct ValidateStatistics +{ + uint64_t BuildBlobSize = 0; + uint64_t BuildPartSize = 0; + uint64_t ChunkAttachmentCount = 0; + uint64_t BlockAttachmentCount = 0; + std::atomic<uint64_t> VerifiedAttachmentCount = 0; + std::atomic<uint64_t> VerifiedByteCount = 0; + uint64_t ElapsedWallTimeUS = 0; +}; + +class BuildsOperationValidateBuildPart +{ +public: + struct Options + { + bool IsQuiet = false; + bool IsVerbose = false; + }; + BuildsOperationValidateBuildPart(BuildOpLogOutput& LogOutput, + BuildStorage& Storage, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag, + WorkerThreadPool& IOWorkerPool, + WorkerThreadPool& NetworkPool, + const Oid& BuildId, + const Oid& BuildPartId, + const std::string_view BuildPartName, + const Options& Options); + + void Execute(); + + ValidateStatistics m_ValidateStats; + DownloadStatistics m_DownloadStats; + +private: + ChunkBlockDescription ValidateChunkBlock(IoBuffer&& Payload, + const IoHash& BlobHash, + uint64_t& OutCompressedSize, + uint64_t& OutDecompressedSize); + + BuildOpLogOutput& m_LogOutput; + BuildStorage& m_Storage; + std::atomic<bool>& m_AbortFlag; + std::atomic<bool>& m_PauseFlag; + WorkerThreadPool& m_IOWorkerPool; + WorkerThreadPool& m_NetworkPool; + const Oid m_BuildId; + Oid m_BuildPartId; + const std::string m_BuildPartName; + const Options m_Options; +}; + +CompositeBuffer ValidateBlob(std::atomic<bool>& AbortFlag, + BuildStorage& Storage, + const Oid& BuildId, + const IoHash& BlobHash, + uint64_t& OutCompressedSize, + uint64_t& OutDecompressedSize); } // namespace zen |