// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include #include ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_WITH_TESTS # include # include # include #endif // ZEN_WITH_TESTS namespace zen { using namespace std::literals; namespace { std::filesystem::path ZenTempCacheFolderPath(const std::filesystem::path& ZenFolderPath) { return ZenTempFolderPath(ZenFolderPath) / "cache"; // Decompressed and verified data - chunks & sequences } std::filesystem::path ZenTempBlockFolderPath(const std::filesystem::path& ZenFolderPath) { return ZenTempFolderPath(ZenFolderPath) / "blocks"; // Temp storage for whole and partial blocks } std::filesystem::path ZenTempDownloadFolderPath(const std::filesystem::path& ZenFolderPath) { return ZenTempFolderPath(ZenFolderPath) / "download"; // Temp storage for decompressed and validated chunks } uint64_t GetBytesPerSecond(uint64_t ElapsedWallTimeUS, uint64_t Count) { if (ElapsedWallTimeUS == 0) { return 0; } return Count * 1000000 / ElapsedWallTimeUS; } std::filesystem::path GetTempChunkedSequenceFileName(const std::filesystem::path& CacheFolderPath, const IoHash& RawHash) { return CacheFolderPath / (RawHash.ToHexString() + ".tmp"); } std::filesystem::path GetFinalChunkedSequenceFileName(const std::filesystem::path& CacheFolderPath, const IoHash& RawHash) { return CacheFolderPath / RawHash.ToHexString(); } bool CleanDirectory(OperationLogOutput& OperationLogOutput, WorkerThreadPool& IOWorkerPool, std::atomic& AbortFlag, std::atomic& PauseFlag, bool IsQuiet, const std::filesystem::path& Path, std::span ExcludeDirectories) { ZEN_TRACE_CPU("CleanDirectory"); Stopwatch Timer; std::unique_ptr ProgressBarPtr(OperationLogOutput.CreateProgressBar("Clean Folder")); OperationLogOutput::ProgressBar& 
Progress(*ProgressBarPtr); CleanDirectoryResult Result = CleanDirectory( IOWorkerPool, AbortFlag, PauseFlag, Path, ExcludeDirectories, [&](const std::string_view Details, uint64_t TotalCount, uint64_t RemainingCount, bool IsPaused, bool IsAborted) { Progress.UpdateState({.Task = "Cleaning folder ", .Details = std::string(Details), .TotalCount = TotalCount, .RemainingCount = RemainingCount, .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }, OperationLogOutput.GetProgressUpdateDelayMS()); Progress.Finish(); if (AbortFlag) { return false; } uint64_t ElapsedTimeMs = Timer.GetElapsedTimeMs(); if (!Result.FailedRemovePaths.empty()) { ExtendableStringBuilder<512> SB; for (size_t FailedPathIndex = 0; FailedPathIndex < Result.FailedRemovePaths.size(); FailedPathIndex++) { SB << fmt::format("\n '{}': ({}) {}", Result.FailedRemovePaths[FailedPathIndex].first, Result.FailedRemovePaths[FailedPathIndex].second.value(), Result.FailedRemovePaths[FailedPathIndex].second.message()); } ZEN_OPERATION_LOG_WARN(OperationLogOutput, "Clean failed to remove files from '{}': {}", Path, SB.ToView()); } if (ElapsedTimeMs >= 200 && !IsQuiet) { ZEN_OPERATION_LOG_INFO(OperationLogOutput, "Wiped folder '{}' {} ({}) in {}", Path, Result.FoundCount, NiceBytes(Result.DeletedByteCount), NiceTimeSpanMs(ElapsedTimeMs)); } return Result.FailedRemovePaths.empty(); } bool IsExtensionHashCompressable(const tsl::robin_set& NonCompressableExtensionHashes, const uint32_t PathHash) { return !NonCompressableExtensionHashes.contains(PathHash); } bool IsChunkCompressable(const tsl::robin_set& NonCompressableExtensionHashes, const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, uint32_t ChunkIndex) { ZEN_UNUSED(Content); const uint32_t ChunkLocationCount = Lookup.ChunkSequenceLocationCounts[ChunkIndex]; if (ChunkLocationCount == 0) { return false; } const size_t ChunkLocationOffset = Lookup.ChunkSequenceLocationOffset[ChunkIndex]; const uint32_t 
SequenceIndex = Lookup.ChunkSequenceLocations[ChunkLocationOffset].SequenceIndex; const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex]; const uint32_t ExtensionHash = Lookup.PathExtensionHash[PathIndex]; const bool IsCompressable = IsExtensionHashCompressable(NonCompressableExtensionHashes, ExtensionHash); return IsCompressable; } template std::string FormatArray(std::span Items, std::string_view Prefix) { ExtendableStringBuilder<512> SB; for (const T& Item : Items) { SB.Append(fmt::format("{}{}", Prefix, Item)); } return SB.ToString(); } void DownloadLargeBlob(BuildStorageBase& Storage, const std::filesystem::path& DownloadFolder, const Oid& BuildId, const IoHash& ChunkHash, const std::uint64_t PreferredMultipartChunkSize, ParallelWork& Work, WorkerThreadPool& NetworkPool, std::atomic& DownloadedChunkByteCount, std::atomic& MultipartAttachmentCount, std::function&& OnDownloadComplete) { ZEN_TRACE_CPU("DownloadLargeBlob"); struct WorkloadData { TemporaryFile TempFile; }; std::shared_ptr Workload(std::make_shared()); std::error_code Ec; Workload->TempFile.CreateTemporary(DownloadFolder, Ec); if (Ec) { throw std::runtime_error( fmt::format("Failed opening temporary file '{}', reason: ({}) {}", Workload->TempFile.GetPath(), Ec.message(), Ec.value())); } std::vector> WorkItems = Storage.GetLargeBuildBlob( BuildId, ChunkHash, PreferredMultipartChunkSize, [&Work, Workload, &DownloadedChunkByteCount](uint64_t Offset, const IoBuffer& Chunk) { DownloadedChunkByteCount += Chunk.GetSize(); if (!Work.IsAborted()) { ZEN_TRACE_CPU("Async_DownloadLargeBlob_OnReceive"); Workload->TempFile.Write(Chunk.GetView(), Offset); } }, [&Work, Workload, &DownloadedChunkByteCount, OnDownloadComplete = std::move(OnDownloadComplete)]() { if (!Work.IsAborted()) { ZEN_TRACE_CPU("Async_DownloadLargeBlob_OnComplete"); uint64_t PayloadSize = Workload->TempFile.FileSize(); void* FileHandle = Workload->TempFile.Detach(); ZEN_ASSERT(FileHandle != nullptr); IoBuffer 
Payload(IoBuffer::File, FileHandle, 0, PayloadSize, true); Payload.SetDeleteOnClose(true); OnDownloadComplete(std::move(Payload)); } }); if (!WorkItems.empty()) { MultipartAttachmentCount++; } for (auto& WorkItem : WorkItems) { Work.ScheduleWork(NetworkPool, [WorkItem = std::move(WorkItem)](std::atomic& AbortFlag) { if (!AbortFlag) { ZEN_TRACE_CPU("Async_DownloadLargeBlob_Work"); WorkItem(); } }); } } CompositeBuffer ValidateBlob(std::atomic& AbortFlag, IoBuffer&& Payload, const IoHash& BlobHash, uint64_t& OutCompressedSize, uint64_t& OutDecompressedSize) { ZEN_TRACE_CPU("ValidateBlob"); if (Payload.GetContentType() != ZenContentType::kCompressedBinary) { throw std::runtime_error(fmt::format("Blob {} ({} bytes) has unexpected content type '{}'", BlobHash, Payload.GetSize(), ToString(Payload.GetContentType()))); } IoHash RawHash; uint64_t RawSize; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, RawSize); if (!Compressed) { throw std::runtime_error(fmt::format("Blob {} ({} bytes) compressed header is invalid", BlobHash, Payload.GetSize())); } if (RawHash != BlobHash) { throw std::runtime_error( fmt::format("Blob {} ({} bytes) compressed header has a mismatching raw hash {}", BlobHash, Payload.GetSize(), RawHash)); } IoHashStream Hash; bool CouldDecompress = Compressed.DecompressToStream( 0, RawSize, [&AbortFlag, &Hash](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { ZEN_UNUSED(SourceOffset, SourceSize, Offset); if (!AbortFlag) { for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) { Hash.Append(Segment.GetView()); } return true; } return false; }); if (AbortFlag) { return CompositeBuffer{}; } if (!CouldDecompress) { throw std::runtime_error( fmt::format("Blob {} ({} bytes) failed to decompress - header information mismatch", BlobHash, Payload.GetSize())); } IoHash ValidateRawHash = Hash.GetHash(); if (ValidateRawHash != BlobHash) { throw 
std::runtime_error(fmt::format("Blob {} ({} bytes) decompressed hash {} does not match header information", BlobHash, Payload.GetSize(), ValidateRawHash)); } OodleCompressor Compressor; OodleCompressionLevel CompressionLevel; uint64_t BlockSize; if (!Compressed.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) { throw std::runtime_error(fmt::format("Blob {} ({} bytes) failed to get compression details", BlobHash, Payload.GetSize())); } OutCompressedSize = Payload.GetSize(); OutDecompressedSize = RawSize; if (CompressionLevel == OodleCompressionLevel::None) { // Only decompress to composite if we need it for block verification CompositeBuffer DecompressedComposite = Compressed.DecompressToComposite(); if (!DecompressedComposite) { throw std::runtime_error(fmt::format("Blob {} ({} bytes) failed to decompress to composite", BlobHash, Payload.GetSize())); } return DecompressedComposite; } return CompositeBuffer{}; } } // namespace bool IsSingleFileChunk(const ChunkedFolderContent& RemoteContent, const std::vector Locations) { if (Locations.size() == 1) { const uint32_t FirstSequenceIndex = Locations[0]->SequenceIndex; if (RemoteContent.ChunkedContent.ChunkCounts[FirstSequenceIndex] == 1) { ZEN_ASSERT_SLOW(Locations[0]->Offset == 0); return true; } } return false; } IoBuffer MakeBufferMemoryBased(const CompositeBuffer& PartialBlockBuffer) { ZEN_TRACE_CPU("MakeBufferMemoryBased"); IoBuffer BlockMemoryBuffer; std::span Segments = PartialBlockBuffer.GetSegments(); if (Segments.size() == 1) { IoBufferFileReference FileRef = {}; if (PartialBlockBuffer.GetSegments().front().AsIoBuffer().GetFileReference(FileRef)) { BlockMemoryBuffer = UniqueBuffer::Alloc(FileRef.FileChunkSize).MoveToShared().AsIoBuffer(); BasicFile Reader; Reader.Attach(FileRef.FileHandle); auto _ = MakeGuard([&Reader]() { Reader.Detach(); }); MutableMemoryView ReadMem = BlockMemoryBuffer.GetMutableView(); Reader.Read(ReadMem.GetData(), FileRef.FileChunkSize, FileRef.FileChunkOffset); return 
BlockMemoryBuffer; } else { return PartialBlockBuffer.GetSegments().front().AsIoBuffer(); } } else { // Not a homogenous memory buffer, read all to memory BlockMemoryBuffer = UniqueBuffer::Alloc(PartialBlockBuffer.GetSize()).MoveToShared().AsIoBuffer(); MutableMemoryView ReadMem = BlockMemoryBuffer.GetMutableView(); for (const SharedBuffer& Segment : Segments) { IoBufferFileReference FileRef = {}; if (Segment.AsIoBuffer().GetFileReference(FileRef)) { BasicFile Reader; Reader.Attach(FileRef.FileHandle); auto _ = MakeGuard([&Reader]() { Reader.Detach(); }); Reader.Read(ReadMem.GetData(), FileRef.FileChunkSize, FileRef.FileChunkOffset); ReadMem = ReadMem.Mid(FileRef.FileChunkSize); } else { ReadMem = ReadMem.CopyFrom(Segment.AsIoBuffer().GetView()); } } return BlockMemoryBuffer; } } class FilteredRate { public: FilteredRate() {} void Start() { if (StartTimeUS == (uint64_t)-1) { uint64_t Expected = (uint64_t)-1; if (StartTimeUS.compare_exchange_weak(Expected, Timer.GetElapsedTimeUs())) { LastTimeUS = StartTimeUS.load(); } } } void Stop() { if (EndTimeUS == (uint64_t)-1) { uint64_t Expected = (uint64_t)-1; EndTimeUS.compare_exchange_weak(Expected, Timer.GetElapsedTimeUs()); } } void Update(uint64_t Count) { if (LastTimeUS == (uint64_t)-1) { return; } uint64_t TimeUS = Timer.GetElapsedTimeUs(); uint64_t TimeDeltaUS = TimeUS - LastTimeUS; if (TimeDeltaUS >= 2000000) { uint64_t Delta = Count - LastCount; uint64_t PerSecond = (Delta * 1000000) / TimeDeltaUS; LastPerSecond = PerSecond; LastCount = Count; FilteredPerSecond = (PerSecond + (LastPerSecond * 7)) / 8; LastTimeUS = TimeUS; } } uint64_t GetCurrent() const // If Stopped - return total count / total time { if (LastTimeUS == (uint64_t)-1) { return 0; } return FilteredPerSecond; } uint64_t GetElapsedTimeUS() const { if (StartTimeUS == (uint64_t)-1) { return 0; } if (EndTimeUS == (uint64_t)-1) { return 0; } uint64_t TimeDeltaUS = EndTimeUS - StartTimeUS; return TimeDeltaUS; } bool IsActive() const { return (StartTimeUS 
!= (uint64_t)-1) && (EndTimeUS == (uint64_t)-1); } private: Stopwatch Timer; std::atomic StartTimeUS = (uint64_t)-1; std::atomic EndTimeUS = (uint64_t)-1; std::atomic LastTimeUS = (uint64_t)-1; uint64_t LastCount = 0; uint64_t LastPerSecond = 0; uint64_t FilteredPerSecond = 0; }; std::filesystem::path ZenStateFilePath(const std::filesystem::path& ZenFolderPath) { return ZenFolderPath / "current_state.cbo"; } std::filesystem::path ZenTempFolderPath(const std::filesystem::path& ZenFolderPath) { return ZenFolderPath / "tmp"; } ////////////////////// BuildsOperationUpdateFolder BuildsOperationUpdateFolder::BuildsOperationUpdateFolder(OperationLogOutput& OperationLogOutput, StorageInstance& Storage, std::atomic& AbortFlag, std::atomic& PauseFlag, WorkerThreadPool& IOWorkerPool, WorkerThreadPool& NetworkPool, const Oid& BuildId, const std::filesystem::path& Path, const ChunkedFolderContent& LocalContent, const ChunkedContentLookup& LocalLookup, const ChunkedFolderContent& RemoteContent, const ChunkedContentLookup& RemoteLookup, const std::vector& BlockDescriptions, const std::vector& LooseChunkHashes, const Options& Options) : m_LogOutput(OperationLogOutput) , m_Storage(Storage) , m_AbortFlag(AbortFlag) , m_PauseFlag(PauseFlag) , m_IOWorkerPool(IOWorkerPool) , m_NetworkPool(NetworkPool) , m_BuildId(BuildId) , m_Path(Path) , m_LocalContent(LocalContent) , m_LocalLookup(LocalLookup) , m_RemoteContent(RemoteContent) , m_RemoteLookup(RemoteLookup) , m_BlockDescriptions(BlockDescriptions) , m_LooseChunkHashes(LooseChunkHashes) , m_Options(Options) , m_CacheFolderPath(ZenTempCacheFolderPath(m_Options.ZenFolderPath)) , m_TempDownloadFolderPath(ZenTempDownloadFolderPath(m_Options.ZenFolderPath)) , m_TempBlockFolderPath(ZenTempBlockFolderPath(m_Options.ZenFolderPath)) { } void BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) { ZEN_TRACE_CPU("BuildsOperationUpdateFolder::Execute"); try { enum class TaskSteps : uint32_t { ScanExistingData, WriteChunks, 
PrepareTarget, FinalizeTarget, Cleanup, StepCount }; auto EndProgress = MakeGuard([&]() { m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::StepCount, (uint32_t)TaskSteps::StepCount); }); ZEN_ASSERT((!m_Options.PrimeCacheOnly) || (m_Options.PrimeCacheOnly && (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Off))); m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::ScanExistingData, (uint32_t)TaskSteps::StepCount); CreateDirectories(m_CacheFolderPath); CreateDirectories(m_TempDownloadFolderPath); CreateDirectories(m_TempBlockFolderPath); Stopwatch CacheMappingTimer; std::vector> SequenceIndexChunksLeftToWriteCounters(m_RemoteContent.ChunkedContent.SequenceRawHashes.size()); std::vector RemoteChunkIndexNeedsCopyFromLocalFileFlags(m_RemoteContent.ChunkedContent.ChunkHashes.size()); std::vector> RemoteChunkIndexNeedsCopyFromSourceFlags(m_RemoteContent.ChunkedContent.ChunkHashes.size()); tsl::robin_map CachedChunkHashesFound; tsl::robin_map CachedSequenceHashesFound; if (!m_Options.PrimeCacheOnly) { ScanCacheFolder(CachedChunkHashesFound, CachedSequenceHashesFound); } tsl::robin_map CachedBlocksFound; if (!m_Options.PrimeCacheOnly) { ScanTempBlocksFolder(CachedBlocksFound); } tsl::robin_map SequenceIndexesLeftToFindToRemoteIndex; if (!m_Options.PrimeCacheOnly && m_Options.EnableTargetFolderScavenging) { // Pick up all whole files we can use from current local state ZEN_TRACE_CPU("GetLocalSequences"); Stopwatch LocalTimer; std::vector MissingSequenceIndexes = ScanTargetFolder(CachedChunkHashesFound, CachedSequenceHashesFound); for (uint32_t RemoteSequenceIndex : MissingSequenceIndexes) { // We must write the sequence const uint32_t ChunkCount = m_RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]; const IoHash& RemoteSequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = ChunkCount; 
SequenceIndexesLeftToFindToRemoteIndex.insert({RemoteSequenceRawHash, RemoteSequenceIndex}); } } else { for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < m_RemoteContent.ChunkedContent.SequenceRawHashes.size(); RemoteSequenceIndex++) { const uint32_t ChunkCount = m_RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]; SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = ChunkCount; } } std::vector ScavengedContents; std::vector ScavengedLookups; std::vector ScavengedPaths; std::vector ScavengedSequenceCopyOperations; uint64_t ScavengedPathsCount = 0; if (!m_Options.PrimeCacheOnly && m_Options.EnableOtherDownloadsScavenging) { ZEN_TRACE_CPU("GetScavengedSequences"); Stopwatch ScavengeTimer; if (!SequenceIndexesLeftToFindToRemoteIndex.empty()) { std::vector ScavengeSources = FindScavengeSources(); const size_t ScavengePathCount = ScavengeSources.size(); ScavengedContents.resize(ScavengePathCount); ScavengedLookups.resize(ScavengePathCount); ScavengedPaths.resize(ScavengePathCount); std::unique_ptr ProgressBarPtr(m_LogOutput.CreateProgressBar("Scavenging")); OperationLogOutput::ProgressBar& ScavengeProgressBar(*ProgressBarPtr); ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); std::atomic PathsFound(0); std::atomic ChunksFound(0); std::atomic PathsScavenged(0); for (size_t ScavengeIndex = 0; ScavengeIndex < ScavengePathCount; ScavengeIndex++) { Work.ScheduleWork(m_IOWorkerPool, [this, &ScavengeSources, &ScavengedContents, &ScavengedPaths, &ScavengedLookups, &PathsFound, &ChunksFound, &PathsScavenged, ScavengeIndex](std::atomic&) { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_FindScavengeContent"); const ScavengeSource& Source = ScavengeSources[ScavengeIndex]; ChunkedFolderContent& ScavengedLocalContent = ScavengedContents[ScavengeIndex]; ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengeIndex]; if (FindScavengeContent(Source, ScavengedLocalContent, ScavengedLookup)) { 
ScavengedPaths[ScavengeIndex] = Source.Path; PathsFound += ScavengedLocalContent.Paths.size(); ChunksFound += ScavengedLocalContent.ChunkedContent.ChunkHashes.size(); } else { ScavengedPaths[ScavengeIndex].clear(); } PathsScavenged++; } }); } { ZEN_TRACE_CPU("ScavengeScan_Wait"); Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { ZEN_UNUSED(PendingWork); std::string Details = fmt::format("{}/{} scanned. {} paths and {} chunks found for scavenging", PathsScavenged.load(), ScavengePathCount, PathsFound.load(), ChunksFound.load()); ScavengeProgressBar.UpdateState( {.Task = "Scavenging ", .Details = Details, .TotalCount = ScavengePathCount, .RemainingCount = ScavengePathCount - PathsScavenged.load(), .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); } ScavengeProgressBar.Finish(); if (m_AbortFlag) { return; } for (uint32_t ScavengedContentIndex = 0; ScavengedContentIndex < ScavengedContents.size() && (!SequenceIndexesLeftToFindToRemoteIndex.empty()); ScavengedContentIndex++) { const std::filesystem::path& ScavengePath = ScavengedPaths[ScavengedContentIndex]; if (!ScavengePath.empty()) { const ChunkedFolderContent& ScavengedLocalContent = ScavengedContents[ScavengedContentIndex]; const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengedContentIndex]; for (uint32_t ScavengedSequenceIndex = 0; ScavengedSequenceIndex < ScavengedLocalContent.ChunkedContent.SequenceRawHashes.size(); ScavengedSequenceIndex++) { const IoHash& SequenceRawHash = ScavengedLocalContent.ChunkedContent.SequenceRawHashes[ScavengedSequenceIndex]; if (auto It = SequenceIndexesLeftToFindToRemoteIndex.find(SequenceRawHash); It != SequenceIndexesLeftToFindToRemoteIndex.end()) { const uint32_t RemoteSequenceIndex = It->second; const uint64_t RawSize = m_RemoteContent.RawSizes[m_RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]]; ZEN_ASSERT(RawSize > 0); const 
uint32_t ScavengedPathIndex = ScavengedLookup.SequenceIndexFirstPathIndex[ScavengedSequenceIndex]; ZEN_ASSERT_SLOW(IsFile((ScavengePath / ScavengedLocalContent.Paths[ScavengedPathIndex]).make_preferred())); ScavengedSequenceCopyOperations.push_back({.ScavengedContentIndex = ScavengedContentIndex, .ScavengedPathIndex = ScavengedPathIndex, .RemoteSequenceIndex = RemoteSequenceIndex, .RawSize = RawSize}); SequenceIndexesLeftToFindToRemoteIndex.erase(SequenceRawHash); SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = 0; m_CacheMappingStats.ScavengedPathsMatchingSequencesCount++; m_CacheMappingStats.ScavengedPathsMatchingSequencesByteCount += RawSize; } } ScavengedPathsCount++; } } } m_CacheMappingStats.ScavengeElapsedWallTimeUs += ScavengeTimer.GetElapsedTimeUs(); } uint32_t RemainingChunkCount = 0; for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < m_RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++) { uint64_t ChunkWriteCount = GetChunkWriteCount(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); if (ChunkWriteCount > 0) { RemainingChunkCount++; } } // Pick up all chunks in current local state tsl::robin_map RawHashToCopyChunkDataIndex; std::vector CopyChunkDatas; if (!m_Options.PrimeCacheOnly && m_Options.EnableTargetFolderScavenging) { ZEN_TRACE_CPU("GetLocalChunks"); Stopwatch LocalTimer; ScavengeSourceForChunks(RemainingChunkCount, RemoteChunkIndexNeedsCopyFromLocalFileFlags, RawHashToCopyChunkDataIndex, SequenceIndexChunksLeftToWriteCounters, m_LocalContent, m_LocalLookup, CopyChunkDatas, uint32_t(-1), m_CacheMappingStats.LocalChunkMatchingRemoteCount, m_CacheMappingStats.LocalChunkMatchingRemoteByteCount); m_CacheMappingStats.LocalScanElapsedWallTimeUs += LocalTimer.GetElapsedTimeUs(); } if (!m_Options.PrimeCacheOnly && m_Options.EnableOtherDownloadsScavenging) { ZEN_TRACE_CPU("GetScavengeChunks"); Stopwatch ScavengeTimer; for (uint32_t ScavengedContentIndex = 0; ScavengedContentIndex < ScavengedContents.size() && 
(RemainingChunkCount > 0); ScavengedContentIndex++) { const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengedContentIndex]; const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengedContentIndex]; ScavengeSourceForChunks(RemainingChunkCount, RemoteChunkIndexNeedsCopyFromLocalFileFlags, RawHashToCopyChunkDataIndex, SequenceIndexChunksLeftToWriteCounters, ScavengedContent, ScavengedLookup, CopyChunkDatas, ScavengedContentIndex, m_CacheMappingStats.ScavengedChunkMatchingRemoteCount, m_CacheMappingStats.ScavengedChunkMatchingRemoteByteCount); } m_CacheMappingStats.ScavengeElapsedWallTimeUs += ScavengeTimer.GetElapsedTimeUs(); } if (!m_Options.IsQuiet) { if (m_CacheMappingStats.CacheSequenceHashesCount > 0 || m_CacheMappingStats.CacheChunkCount > 0 || m_CacheMappingStats.CacheBlockCount > 0) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Download cache: Found {} ({}) chunk sequences, {} ({}) chunks, {} ({}) blocks in {}", m_CacheMappingStats.CacheSequenceHashesCount, NiceBytes(m_CacheMappingStats.CacheSequenceHashesByteCount), m_CacheMappingStats.CacheChunkCount, NiceBytes(m_CacheMappingStats.CacheChunkByteCount), m_CacheMappingStats.CacheBlockCount, NiceBytes(m_CacheMappingStats.CacheBlocksByteCount), NiceTimeSpanMs(m_CacheMappingStats.CacheScanElapsedWallTimeUs / 1000)); } if (m_CacheMappingStats.LocalPathsMatchingSequencesCount > 0 || m_CacheMappingStats.LocalChunkMatchingRemoteCount > 0) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Local state : Found {} ({}) chunk sequences, {} ({}) chunks in {}", m_CacheMappingStats.LocalPathsMatchingSequencesCount, NiceBytes(m_CacheMappingStats.LocalPathsMatchingSequencesByteCount), m_CacheMappingStats.LocalChunkMatchingRemoteCount, NiceBytes(m_CacheMappingStats.LocalChunkMatchingRemoteByteCount), NiceTimeSpanMs(m_CacheMappingStats.LocalScanElapsedWallTimeUs / 1000)); } if (m_CacheMappingStats.ScavengedPathsMatchingSequencesCount > 0 || m_CacheMappingStats.ScavengedChunkMatchingRemoteCount > 0) { 
ZEN_OPERATION_LOG_INFO(m_LogOutput, "Scavenge of {} paths, found {} ({}) chunk sequences, {} ({}) chunks in {}", ScavengedPathsCount, m_CacheMappingStats.ScavengedPathsMatchingSequencesCount, NiceBytes(m_CacheMappingStats.ScavengedPathsMatchingSequencesByteCount), m_CacheMappingStats.ScavengedChunkMatchingRemoteCount, NiceBytes(m_CacheMappingStats.ScavengedChunkMatchingRemoteByteCount), NiceTimeSpanMs(m_CacheMappingStats.ScavengeElapsedWallTimeUs / 1000)); } } uint64_t BytesToWrite = 0; for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < m_RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++) { uint64_t ChunkWriteCount = GetChunkWriteCount(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); if (ChunkWriteCount > 0) { BytesToWrite += m_RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] * ChunkWriteCount; if (!RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) { RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex] = true; } } } for (const ScavengedSequenceCopyOperation& ScavengeCopyOp : ScavengedSequenceCopyOperations) { BytesToWrite += ScavengeCopyOp.RawSize; } uint64_t BytesToValidate = m_Options.ValidateCompletedSequences ? 
BytesToWrite : 0; uint64_t TotalRequestCount = 0; uint64_t TotalPartWriteCount = 0; std::atomic WritePartsComplete = 0; tsl::robin_map RemotePathToRemoteIndex; RemotePathToRemoteIndex.reserve(m_RemoteContent.Paths.size()); for (uint32_t RemotePathIndex = 0; RemotePathIndex < m_RemoteContent.Paths.size(); RemotePathIndex++) { RemotePathToRemoteIndex.insert({m_RemoteContent.Paths[RemotePathIndex].generic_string(), RemotePathIndex}); } CheckRequiredDiskSpace(RemotePathToRemoteIndex); BlobsExistsResult ExistsResult; { ChunkBlockAnalyser BlockAnalyser(m_LogOutput, m_BlockDescriptions, ChunkBlockAnalyser::Options{.IsQuiet = m_Options.IsQuiet, .IsVerbose = m_Options.IsVerbose, .HostLatencySec = m_Storage.BuildStorageLatencySec, .HostHighSpeedLatencySec = m_Storage.CacheLatencySec}); std::vector NeededBlocks = BlockAnalyser.GetNeeded( m_RemoteLookup.ChunkHashToChunkIndex, [&](uint32_t RemoteChunkIndex) -> bool { return RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex]; }); std::vector FetchBlockIndexes; std::vector CachedChunkBlockIndexes; { ZEN_TRACE_CPU("BlockCacheFileExists"); for (const ChunkBlockAnalyser::NeededBlock& NeededBlock : NeededBlocks) { if (m_Options.PrimeCacheOnly) { FetchBlockIndexes.push_back(NeededBlock.BlockIndex); } else { const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[NeededBlock.BlockIndex]; bool UsingCachedBlock = false; if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end()) { TotalPartWriteCount++; std::filesystem::path BlockPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); if (IsFile(BlockPath)) { CachedChunkBlockIndexes.push_back(NeededBlock.BlockIndex); UsingCachedBlock = true; } } if (!UsingCachedBlock) { FetchBlockIndexes.push_back(NeededBlock.BlockIndex); } } } } std::vector NeededLooseChunkIndexes; { NeededLooseChunkIndexes.reserve(m_LooseChunkHashes.size()); for (uint32_t LooseChunkIndex = 0; LooseChunkIndex < m_LooseChunkHashes.size(); 
LooseChunkIndex++) { const IoHash& ChunkHash = m_LooseChunkHashes[LooseChunkIndex]; auto RemoteChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); ZEN_ASSERT(RemoteChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end()); const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second; if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) { if (m_Options.IsVerbose) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Skipping chunk {} due to cache reuse", m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]); } continue; } bool NeedsCopy = true; if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex].compare_exchange_strong(NeedsCopy, false)) { uint64_t WriteCount = GetChunkWriteCount(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); if (WriteCount == 0) { if (m_Options.IsVerbose) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Skipping chunk {} due to cache reuse", m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]); } } else { NeededLooseChunkIndexes.push_back(LooseChunkIndex); } } } } if (m_Storage.BuildCacheStorage) { ZEN_TRACE_CPU("BlobCacheExistCheck"); Stopwatch Timer; std::vector BlobHashes; BlobHashes.reserve(NeededLooseChunkIndexes.size() + FetchBlockIndexes.size()); for (const uint32_t LooseChunkIndex : NeededLooseChunkIndexes) { BlobHashes.push_back(m_LooseChunkHashes[LooseChunkIndex]); } for (uint32_t BlockIndex : FetchBlockIndexes) { BlobHashes.push_back(m_BlockDescriptions[BlockIndex].BlockHash); } const std::vector CacheExistsResult = m_Storage.BuildCacheStorage->BlobsExists(m_BuildId, BlobHashes); if (CacheExistsResult.size() == BlobHashes.size()) { ExistsResult.ExistingBlobs.reserve(CacheExistsResult.size()); for (size_t BlobIndex = 0; BlobIndex < BlobHashes.size(); BlobIndex++) { if (CacheExistsResult[BlobIndex].HasBody) { ExistsResult.ExistingBlobs.insert(BlobHashes[BlobIndex]); } } } ExistsResult.ElapsedTimeMs = Timer.GetElapsedTimeMs(); if (!ExistsResult.ExistingBlobs.empty() && !m_Options.IsQuiet) { 
ZEN_OPERATION_LOG_INFO(m_LogOutput, "Remote cache : Found {} out of {} needed blobs in {}", ExistsResult.ExistingBlobs.size(), BlobHashes.size(), NiceTimeSpanMs(ExistsResult.ElapsedTimeMs)); } } std::vector BlockPartialDownloadModes; if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Off) { BlockPartialDownloadModes.resize(m_BlockDescriptions.size(), ChunkBlockAnalyser::EPartialBlockDownloadMode::Off); } else { BlockPartialDownloadModes.reserve(m_BlockDescriptions.size()); for (uint32_t BlockIndex = 0; BlockIndex < m_BlockDescriptions.size(); BlockIndex++) { const bool BlockExistInCache = ExistsResult.ExistingBlobs.contains(m_BlockDescriptions[BlockIndex].BlockHash); if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::All) { BlockPartialDownloadModes.push_back(BlockExistInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange); } else if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::ZenCacheOnly) { BlockPartialDownloadModes.push_back(BlockExistInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed : ChunkBlockAnalyser::EPartialBlockDownloadMode::Off); } else if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Mixed) { BlockPartialDownloadModes.push_back(BlockExistInCache ? 
ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed : ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange); } } } ZEN_ASSERT(BlockPartialDownloadModes.size() == m_BlockDescriptions.size()); ChunkBlockAnalyser::BlockResult PartialBlocks = BlockAnalyser.CalculatePartialBlockDownloads(NeededBlocks, BlockPartialDownloadModes); struct LooseChunkHashWorkData { std::vector ChunkTargetPtrs; uint32_t RemoteChunkIndex = (uint32_t)-1; }; TotalRequestCount += NeededLooseChunkIndexes.size(); TotalPartWriteCount += NeededLooseChunkIndexes.size(); TotalRequestCount += PartialBlocks.BlockRanges.size(); TotalPartWriteCount += PartialBlocks.BlockRanges.size(); TotalRequestCount += PartialBlocks.FullBlockIndexes.size(); TotalPartWriteCount += PartialBlocks.FullBlockIndexes.size(); std::vector LooseChunkHashWorks; for (uint32_t LooseChunkIndex : NeededLooseChunkIndexes) { const IoHash& ChunkHash = m_LooseChunkHashes[LooseChunkIndex]; auto RemoteChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); ZEN_ASSERT(RemoteChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end()); const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second; std::vector ChunkTargetPtrs = GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); ZEN_ASSERT(!ChunkTargetPtrs.empty()); LooseChunkHashWorks.push_back( LooseChunkHashWorkData{.ChunkTargetPtrs = ChunkTargetPtrs, .RemoteChunkIndex = RemoteChunkIndex}); } ZEN_TRACE_CPU("WriteChunks"); m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::WriteChunks, (uint32_t)TaskSteps::StepCount); Stopwatch WriteTimer; FilteredRate FilteredDownloadedBytesPerSecond; FilteredRate FilteredWrittenBytesPerSecond; std::unique_ptr WriteProgressBarPtr( m_LogOutput.CreateProgressBar(m_Options.PrimeCacheOnly ? 
"Downloading" : "Writing")); OperationLogOutput::ProgressBar& WriteProgressBar(*WriteProgressBarPtr); ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); TotalPartWriteCount += CopyChunkDatas.size(); TotalPartWriteCount += ScavengedSequenceCopyOperations.size(); BufferedWriteFileCache WriteCache; for (uint32_t ScavengeOpIndex = 0; ScavengeOpIndex < ScavengedSequenceCopyOperations.size(); ScavengeOpIndex++) { if (m_AbortFlag) { break; } if (!m_Options.PrimeCacheOnly) { Work.ScheduleWork( m_IOWorkerPool, [this, &ScavengedPaths, &ScavengedSequenceCopyOperations, &ScavengedContents, &FilteredWrittenBytesPerSecond, ScavengeOpIndex, &WritePartsComplete, TotalPartWriteCount](std::atomic&) mutable { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_WriteScavenged"); FilteredWrittenBytesPerSecond.Start(); const ScavengedSequenceCopyOperation& ScavengeOp = ScavengedSequenceCopyOperations[ScavengeOpIndex]; const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengeOp.ScavengedContentIndex]; const std::filesystem::path& ScavengeRootPath = ScavengedPaths[ScavengeOp.ScavengedContentIndex]; WriteScavengedSequenceToCache(ScavengeRootPath, ScavengedContent, ScavengeOp); WritePartsComplete++; if (WritePartsComplete == TotalPartWriteCount) { FilteredWrittenBytesPerSecond.Stop(); } } }); } } for (uint32_t LooseChunkHashWorkIndex = 0; LooseChunkHashWorkIndex < LooseChunkHashWorks.size(); LooseChunkHashWorkIndex++) { if (m_AbortFlag) { break; } if (m_Options.PrimeCacheOnly) { const uint32_t RemoteChunkIndex = LooseChunkHashWorks[LooseChunkHashWorkIndex].RemoteChunkIndex; if (ExistsResult.ExistingBlobs.contains(m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex])) { m_DownloadStats.RequestsCompleteCount++; continue; } } Work.ScheduleWork( m_IOWorkerPool, [this, &SequenceIndexChunksLeftToWriteCounters, &Work, &ExistsResult, &WritePartsComplete, &LooseChunkHashWorks, LooseChunkHashWorkIndex, TotalRequestCount, TotalPartWriteCount, &WriteCache, 
&FilteredDownloadedBytesPerSecond, &FilteredWrittenBytesPerSecond](std::atomic&) mutable { ZEN_TRACE_CPU("Async_ReadPreDownloadedChunk"); if (!m_AbortFlag) { LooseChunkHashWorkData& LooseChunkHashWork = LooseChunkHashWorks[LooseChunkHashWorkIndex]; const uint32_t RemoteChunkIndex = LooseChunkHashWorks[LooseChunkHashWorkIndex].RemoteChunkIndex; WriteLooseChunk(RemoteChunkIndex, ExistsResult, SequenceIndexChunksLeftToWriteCounters, WritePartsComplete, std::move(LooseChunkHashWork.ChunkTargetPtrs), WriteCache, Work, TotalRequestCount, TotalPartWriteCount, FilteredDownloadedBytesPerSecond, FilteredWrittenBytesPerSecond); } }, WorkerThreadPool::EMode::EnableBacklog); } std::unique_ptr CloneQuery; if (m_Options.AllowFileClone) { CloneQuery = GetCloneQueryInterface(m_CacheFolderPath); } for (size_t CopyDataIndex = 0; CopyDataIndex < CopyChunkDatas.size(); CopyDataIndex++) { ZEN_ASSERT(!m_Options.PrimeCacheOnly); if (m_AbortFlag) { break; } Work.ScheduleWork(m_IOWorkerPool, [this, &CloneQuery, &SequenceIndexChunksLeftToWriteCounters, &WriteCache, &Work, &FilteredWrittenBytesPerSecond, &CopyChunkDatas, &ScavengedContents, &ScavengedLookups, &ScavengedPaths, &WritePartsComplete, TotalPartWriteCount, CopyDataIndex](std::atomic&) { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_CopyLocal"); FilteredWrittenBytesPerSecond.Start(); const CopyChunkData& CopyData = CopyChunkDatas[CopyDataIndex]; std::vector WrittenSequenceIndexes = WriteLocalChunkToCache(CloneQuery.get(), CopyData, ScavengedContents, ScavengedLookups, ScavengedPaths, WriteCache); WritePartsComplete++; if (!m_AbortFlag) { if (WritePartsComplete == TotalPartWriteCount) { FilteredWrittenBytesPerSecond.Stop(); } // Write tracking, updating this must be done without any files open std::vector CompletedChunkSequences; for (uint32_t RemoteSequenceIndex : WrittenSequenceIndexes) { if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters)) { CompletedChunkSequences.push_back(RemoteSequenceIndex); } } 
WriteCache.Close(CompletedChunkSequences); VerifyAndCompleteChunkSequencesAsync(CompletedChunkSequences, Work); } } }); } for (uint32_t BlockIndex : CachedChunkBlockIndexes) { ZEN_ASSERT(!m_Options.PrimeCacheOnly); if (m_AbortFlag) { break; } Work.ScheduleWork( m_IOWorkerPool, [this, &RemoteChunkIndexNeedsCopyFromSourceFlags, &SequenceIndexChunksLeftToWriteCounters, &WriteCache, &Work, &FilteredWrittenBytesPerSecond, &WritePartsComplete, TotalPartWriteCount, BlockIndex](std::atomic&) mutable { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_WriteCachedBlock"); const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; FilteredWrittenBytesPerSecond.Start(); std::filesystem::path BlockChunkPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); IoBuffer BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); if (!BlockBuffer) { throw std::runtime_error( fmt::format("Can not read block {} at {}", BlockDescription.BlockHash, BlockChunkPath)); } if (!m_AbortFlag) { if (!WriteChunksBlockToCache(BlockDescription, SequenceIndexChunksLeftToWriteCounters, Work, CompositeBuffer(std::move(BlockBuffer)), RemoteChunkIndexNeedsCopyFromSourceFlags, WriteCache)) { std::error_code DummyEc; RemoveFile(BlockChunkPath, DummyEc); throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash)); } std::error_code Ec = TryRemoveFile(BlockChunkPath); if (Ec) { ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Failed removing file '{}', reason: ({}) {}", BlockChunkPath, Ec.value(), Ec.message()); } WritePartsComplete++; if (WritePartsComplete == TotalPartWriteCount) { FilteredWrittenBytesPerSecond.Stop(); } } } }); } for (size_t BlockRangeIndex = 0; BlockRangeIndex < PartialBlocks.BlockRanges.size();) { ZEN_ASSERT(!m_Options.PrimeCacheOnly); if (m_AbortFlag) { break; } size_t RangeCount = 1; size_t RangesLeft = PartialBlocks.BlockRanges.size() - BlockRangeIndex; const ChunkBlockAnalyser::BlockRangeDescriptor& CurrentBlockRange = 
PartialBlocks.BlockRanges[BlockRangeIndex]; while (RangeCount < RangesLeft && CurrentBlockRange.BlockIndex == PartialBlocks.BlockRanges[BlockRangeIndex + RangeCount].BlockIndex) { RangeCount++; } Work.ScheduleWork( m_NetworkPool, [this, &RemoteChunkIndexNeedsCopyFromSourceFlags, &SequenceIndexChunksLeftToWriteCounters, &ExistsResult, &WriteCache, &FilteredDownloadedBytesPerSecond, TotalRequestCount, &WritePartsComplete, TotalPartWriteCount, &FilteredWrittenBytesPerSecond, &Work, &PartialBlocks, BlockRangeStartIndex = BlockRangeIndex, RangeCount](std::atomic&) { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_GetPartialBlockRanges"); FilteredDownloadedBytesPerSecond.Start(); for (size_t BlockRangeIndex = BlockRangeStartIndex; BlockRangeIndex < BlockRangeStartIndex + RangeCount; BlockRangeIndex++) { ZEN_TRACE_CPU("GetPartialBlock"); const ChunkBlockAnalyser::BlockRangeDescriptor& BlockRange = PartialBlocks.BlockRanges[BlockRangeIndex]; DownloadPartialBlock( BlockRange, ExistsResult, [this, &RemoteChunkIndexNeedsCopyFromSourceFlags, &SequenceIndexChunksLeftToWriteCounters, &WritePartsComplete, &WriteCache, &Work, TotalRequestCount, TotalPartWriteCount, &FilteredDownloadedBytesPerSecond, &FilteredWrittenBytesPerSecond, &BlockRange](IoBuffer&& InMemoryBuffer, const std::filesystem::path& OnDiskPath) { if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) { FilteredDownloadedBytesPerSecond.Stop(); } if (!m_AbortFlag) { Work.ScheduleWork( m_IOWorkerPool, [this, &RemoteChunkIndexNeedsCopyFromSourceFlags, &SequenceIndexChunksLeftToWriteCounters, &WritePartsComplete, &WriteCache, &Work, TotalPartWriteCount, &FilteredWrittenBytesPerSecond, &BlockRange, BlockChunkPath = std::filesystem::path(OnDiskPath), BlockPartialBuffer = std::move(InMemoryBuffer)](std::atomic&) mutable { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_WritePartialBlock"); const uint32_t BlockIndex = BlockRange.BlockIndex; const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; if 
(BlockChunkPath.empty()) { ZEN_ASSERT(BlockPartialBuffer); } else { ZEN_ASSERT(!BlockPartialBuffer); BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); if (!BlockPartialBuffer) { throw std::runtime_error( fmt::format("Could not open downloaded block {} from {}", BlockDescription.BlockHash, BlockChunkPath)); } } FilteredWrittenBytesPerSecond.Start(); if (!WritePartialBlockChunksToCache( BlockDescription, SequenceIndexChunksLeftToWriteCounters, Work, CompositeBuffer(std::move(BlockPartialBuffer)), BlockRange.ChunkBlockIndexStart, BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1, RemoteChunkIndexNeedsCopyFromSourceFlags, WriteCache)) { std::error_code DummyEc; RemoveFile(BlockChunkPath, DummyEc); throw std::runtime_error( fmt::format("Partial block {} is malformed", BlockDescription.BlockHash)); } std::error_code Ec = TryRemoveFile(BlockChunkPath); if (Ec) { ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Failed removing file '{}', reason: ({}) {}", BlockChunkPath, Ec.value(), Ec.message()); } WritePartsComplete++; if (WritePartsComplete == TotalPartWriteCount) { FilteredWrittenBytesPerSecond.Stop(); } } }, OnDiskPath.empty() ? 
WorkerThreadPool::EMode::DisableBacklog : WorkerThreadPool::EMode::EnableBacklog); } }); } } }); BlockRangeIndex += RangeCount; } for (uint32_t BlockIndex : PartialBlocks.FullBlockIndexes) { if (m_AbortFlag) { break; } if (m_Options.PrimeCacheOnly && ExistsResult.ExistingBlobs.contains(m_BlockDescriptions[BlockIndex].BlockHash)) { m_DownloadStats.RequestsCompleteCount++; continue; } Work.ScheduleWork( m_NetworkPool, [this, &WritePartsComplete, TotalPartWriteCount, &FilteredWrittenBytesPerSecond, &ExistsResult, &Work, &WriteCache, &RemoteChunkIndexNeedsCopyFromSourceFlags, &SequenceIndexChunksLeftToWriteCounters, &FilteredDownloadedBytesPerSecond, TotalRequestCount, BlockIndex](std::atomic&) { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_GetFullBlock"); const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; FilteredDownloadedBytesPerSecond.Start(); IoBuffer BlockBuffer; const bool ExistsInCache = m_Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash); if (ExistsInCache) { BlockBuffer = m_Storage.BuildCacheStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash); } if (!BlockBuffer) { BlockBuffer = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash); if (BlockBuffer && m_Storage.BuildCacheStorage && m_Options.PopulateCache) { m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, BlockDescription.BlockHash, ZenContentType::kCompressedBinary, CompositeBuffer(SharedBuffer(BlockBuffer))); } } if (!BlockBuffer) { throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); } if (!m_AbortFlag) { uint64_t BlockSize = BlockBuffer.GetSize(); m_DownloadStats.DownloadedBlockCount++; m_DownloadStats.DownloadedBlockByteCount += BlockSize; m_DownloadStats.RequestsCompleteCount++; if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) { FilteredDownloadedBytesPerSecond.Stop(); } if (!m_Options.PrimeCacheOnly) { std::filesystem::path BlockChunkPath; // Check 
if the dowloaded block is file based and we can move it directly without rewriting it { IoBufferFileReference FileRef; if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == BlockSize)) { ZEN_TRACE_CPU("MoveTempFullBlock"); std::error_code Ec; std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); if (!Ec) { BlockBuffer.SetDeleteOnClose(false); BlockBuffer = {}; BlockChunkPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); RenameFile(TempBlobPath, BlockChunkPath, Ec); if (Ec) { BlockChunkPath = std::filesystem::path{}; // Re-open the temp file again BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); BlockBuffer.SetDeleteOnClose(true); } } } } if (BlockChunkPath.empty() && (BlockSize > m_Options.MaximumInMemoryPayloadSize)) { ZEN_TRACE_CPU("WriteTempFullBlock"); // Could not be moved and rather large, lets store it on disk BlockChunkPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); BlockBuffer = {}; } if (!m_AbortFlag) { Work.ScheduleWork( m_IOWorkerPool, [this, &Work, &RemoteChunkIndexNeedsCopyFromSourceFlags, &SequenceIndexChunksLeftToWriteCounters, BlockIndex, &WriteCache, &WritePartsComplete, TotalPartWriteCount, &FilteredWrittenBytesPerSecond, BlockChunkPath, BlockBuffer = std::move(BlockBuffer)](std::atomic&) mutable { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_WriteFullBlock"); const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; if (BlockChunkPath.empty()) { ZEN_ASSERT(BlockBuffer); } else { ZEN_ASSERT(!BlockBuffer); BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); if (!BlockBuffer) { throw std::runtime_error( fmt::format("Could not open dowloaded block {} from {}", BlockDescription.BlockHash, BlockChunkPath)); } } FilteredWrittenBytesPerSecond.Start(); if 
(!WriteChunksBlockToCache(BlockDescription, SequenceIndexChunksLeftToWriteCounters, Work, CompositeBuffer(std::move(BlockBuffer)), RemoteChunkIndexNeedsCopyFromSourceFlags, WriteCache)) { std::error_code DummyEc; RemoveFile(BlockChunkPath, DummyEc); throw std::runtime_error( fmt::format("Block {} is malformed", BlockDescription.BlockHash)); } if (!BlockChunkPath.empty()) { std::error_code Ec = TryRemoveFile(BlockChunkPath); if (Ec) { ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Failed removing file '{}', reason: ({}) {}", BlockChunkPath, Ec.value(), Ec.message()); } } WritePartsComplete++; if (WritePartsComplete == TotalPartWriteCount) { FilteredWrittenBytesPerSecond.Stop(); } } }, BlockChunkPath.empty() ? WorkerThreadPool::EMode::DisableBacklog : WorkerThreadPool::EMode::EnableBacklog); } } } } }); } { ZEN_TRACE_CPU("WriteChunks_Wait"); Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { ZEN_UNUSED(PendingWork); uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() + m_DownloadStats.DownloadedBlockByteCount.load() + +m_DownloadStats.DownloadedPartialBlockByteCount.load(); FilteredWrittenBytesPerSecond.Update(m_DiskStats.WriteByteCount.load()); FilteredDownloadedBytesPerSecond.Update(DownloadedBytes); std::string DownloadRateString = (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) ? "" : fmt::format(" {}bits/s", NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8)); std::string CloneDetails; if (m_DiskStats.CloneCount.load() > 0) { CloneDetails = fmt::format(" ({} cloned)", NiceBytes(m_DiskStats.CloneByteCount.load())); } std::string WriteDetails = m_Options.PrimeCacheOnly ? 
"" : fmt::format(" {}/{} ({}B/s) written{}", NiceBytes(m_WrittenChunkByteCount.load()), NiceBytes(BytesToWrite), NiceNum(FilteredWrittenBytesPerSecond.GetCurrent()), CloneDetails); std::string Details = fmt::format("{}/{} ({}{}) downloaded.{}", m_DownloadStats.RequestsCompleteCount.load(), TotalRequestCount, NiceBytes(DownloadedBytes), DownloadRateString, WriteDetails); std::string Task; if (m_Options.PrimeCacheOnly) { Task = "Downloading "; } else if ((m_WrittenChunkByteCount < BytesToWrite) || (BytesToValidate == 0)) { Task = "Writing chunks "; } else { Task = "Verifying chunks "; } WriteProgressBar.UpdateState( {.Task = Task, .Details = Details, .TotalCount = m_Options.PrimeCacheOnly ? TotalRequestCount : (BytesToWrite + BytesToValidate), .RemainingCount = m_Options.PrimeCacheOnly ? (TotalRequestCount - m_DownloadStats.RequestsCompleteCount.load()) : ((BytesToWrite + BytesToValidate) - (m_WrittenChunkByteCount.load() + m_ValidatedChunkByteCount.load())), .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); } CloneQuery.reset(); FilteredWrittenBytesPerSecond.Stop(); FilteredDownloadedBytesPerSecond.Stop(); WriteProgressBar.Finish(); if (m_AbortFlag) { return; } if (!m_Options.PrimeCacheOnly) { uint32_t RawSequencesMissingWriteCount = 0; for (uint32_t SequenceIndex = 0; SequenceIndex < SequenceIndexChunksLeftToWriteCounters.size(); SequenceIndex++) { const auto& SequenceIndexChunksLeftToWriteCounter = SequenceIndexChunksLeftToWriteCounters[SequenceIndex]; if (SequenceIndexChunksLeftToWriteCounter.load() != 0) { RawSequencesMissingWriteCount++; const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; const std::filesystem::path& IncompletePath = m_RemoteContent.Paths[PathIndex]; ZEN_ASSERT(!IncompletePath.empty()); const uint32_t ExpectedSequenceCount = m_RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]; if (!m_Options.IsQuiet) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "{}: Max count 
{}, Current count {}", IncompletePath, ExpectedSequenceCount, SequenceIndexChunksLeftToWriteCounter.load()); } ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounter.load() <= ExpectedSequenceCount); } } ZEN_ASSERT(RawSequencesMissingWriteCount == 0); ZEN_ASSERT(m_WrittenChunkByteCount == BytesToWrite); ZEN_ASSERT(m_ValidatedChunkByteCount == BytesToValidate); } const uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() + m_DownloadStats.DownloadedBlockByteCount.load() + m_DownloadStats.DownloadedPartialBlockByteCount.load(); if (!m_Options.IsQuiet) { std::string CloneDetails; if (m_DiskStats.CloneCount.load() > 0) { CloneDetails = fmt::format(" ({} cloned)", NiceBytes(m_DiskStats.CloneByteCount.load())); } ZEN_OPERATION_LOG_INFO( m_LogOutput, "Downloaded {} ({}bits/s) in {}. Wrote {} ({}B/s){} in {}. Completed in {}", NiceBytes(DownloadedBytes), NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), DownloadedBytes * 8)), NiceTimeSpanMs(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS() / 1000), NiceBytes(m_WrittenChunkByteCount.load()), NiceNum(GetBytesPerSecond(FilteredWrittenBytesPerSecond.GetElapsedTimeUS(), m_DiskStats.WriteByteCount.load())), CloneDetails, NiceTimeSpanMs(FilteredWrittenBytesPerSecond.GetElapsedTimeUS() / 1000), NiceTimeSpanMs(WriteTimer.GetElapsedTimeMs())); } m_WriteChunkStats.WriteChunksElapsedWallTimeUs = WriteTimer.GetElapsedTimeUs(); m_WriteChunkStats.DownloadTimeUs = FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(); m_WriteChunkStats.WriteTimeUs = FilteredWrittenBytesPerSecond.GetElapsedTimeUS(); } if (m_Options.PrimeCacheOnly) { return; } m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::PrepareTarget, (uint32_t)TaskSteps::StepCount); tsl::robin_map RemotePathIndexToLocalPathIndex; RemotePathIndexToLocalPathIndex.reserve(m_RemoteContent.Paths.size()); tsl::robin_map SequenceHashToLocalPathIndex; std::vector RemoveLocalPathIndexes; if (m_AbortFlag) { return; } { 
ZEN_TRACE_CPU("PrepareTarget"); tsl::robin_set CachedRemoteSequences; std::vector FilesToCache; uint64_t MatchCount = 0; uint64_t PathMismatchCount = 0; uint64_t HashMismatchCount = 0; std::atomic CachedCount = 0; std::atomic CachedByteCount = 0; uint64_t SkippedCount = 0; uint64_t DeleteCount = 0; for (uint32_t LocalPathIndex = 0; LocalPathIndex < m_LocalContent.Paths.size(); LocalPathIndex++) { if (m_AbortFlag) { break; } const IoHash& RawHash = m_LocalContent.RawHashes[LocalPathIndex]; const std::filesystem::path& LocalPath = m_LocalContent.Paths[LocalPathIndex]; ZEN_ASSERT_SLOW(IsFile((m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred())); if (m_Options.EnableTargetFolderScavenging) { if (!m_Options.WipeTargetFolder) { // Check if it is already in the correct place if (auto RemotePathIt = RemotePathToRemoteIndex.find(LocalPath.generic_string()); RemotePathIt != RemotePathToRemoteIndex.end()) { const uint32_t RemotePathIndex = RemotePathIt->second; if (m_RemoteContent.RawHashes[RemotePathIndex] == RawHash) { // It is already in it's correct place RemotePathIndexToLocalPathIndex[RemotePathIndex] = LocalPathIndex; SequenceHashToLocalPathIndex.insert({RawHash, LocalPathIndex}); MatchCount++; continue; } else { HashMismatchCount++; } } else { PathMismatchCount++; } } // Do we need it? 
if (m_RemoteLookup.RawHashToSequenceIndex.contains(RawHash)) { if (!CachedRemoteSequences.contains(RawHash)) { // We need it, make sure we move it to the cache FilesToCache.push_back(LocalPathIndex); CachedRemoteSequences.insert(RawHash); continue; } else { SkippedCount++; } } } if (!m_Options.WipeTargetFolder) { // Explicitly delete the unneeded local file RemoveLocalPathIndexes.push_back(LocalPathIndex); DeleteCount++; } } if (m_AbortFlag) { return; } { ZEN_TRACE_CPU("CopyToCache"); Stopwatch Timer; std::unique_ptr CacheLocalProgressBarPtr( m_LogOutput.CreateProgressBar("Cache Local Data")); OperationLogOutput::ProgressBar& CacheLocalProgressBar(*CacheLocalProgressBarPtr); ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); for (uint32_t LocalPathIndex : FilesToCache) { if (m_AbortFlag) { break; } Work.ScheduleWork(m_IOWorkerPool, [this, &CachedCount, &CachedByteCount, LocalPathIndex](std::atomic&) { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_CopyToCache"); const IoHash& RawHash = m_LocalContent.RawHashes[LocalPathIndex]; const std::filesystem::path& LocalPath = m_LocalContent.Paths[LocalPathIndex]; const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RawHash); ZEN_ASSERT_SLOW(!IsFileWithRetry(CacheFilePath)); const std::filesystem::path LocalFilePath = (m_Path / LocalPath).make_preferred(); std::error_code Ec = RenameFileWithRetry(LocalFilePath, CacheFilePath); if (Ec) { ZEN_OPERATION_LOG_WARN(m_LogOutput, "Failed to move file from '{}' to '{}', reason: ({}) {}, retrying...", LocalFilePath, CacheFilePath, Ec.value(), Ec.message()); Ec = RenameFileWithRetry(LocalFilePath, CacheFilePath); if (Ec) { throw std::system_error(std::error_code(Ec.value(), std::system_category()), fmt::format("Failed to file from '{}' to '{}', reason: ({}) {}", LocalFilePath, CacheFilePath, Ec.value(), Ec.message())); } } CachedCount++; CachedByteCount += m_LocalContent.RawSizes[LocalPathIndex]; } }); } { 
ZEN_TRACE_CPU("CopyToCache_Wait"); Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { ZEN_UNUSED(PendingWork); const uint64_t WorkTotal = FilesToCache.size(); const uint64_t WorkComplete = CachedCount.load(); std::string Details = fmt::format("{}/{} ({}) files", WorkComplete, WorkTotal, NiceBytes(CachedByteCount)); CacheLocalProgressBar.UpdateState( {.Task = "Caching local ", .Details = Details, .TotalCount = gsl::narrow(WorkTotal), .RemainingCount = gsl::narrow(WorkTotal - WorkComplete), .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); } CacheLocalProgressBar.Finish(); if (m_AbortFlag) { return; } ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Local state prep: Match: {}, PathMismatch: {}, HashMismatch: {}, Cached: {} ({}), Skipped: {}, " "Delete: {}", MatchCount, PathMismatchCount, HashMismatchCount, CachedCount.load(), NiceBytes(CachedByteCount.load()), SkippedCount, DeleteCount); } } m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::FinalizeTarget, (uint32_t)TaskSteps::StepCount); if (m_Options.WipeTargetFolder) { ZEN_TRACE_CPU("WipeTarget"); Stopwatch Timer; // Clean target folder if (!CleanDirectory(m_LogOutput, m_IOWorkerPool, m_AbortFlag, m_PauseFlag, m_Options.IsQuiet, m_Path, m_Options.ExcludeFolders)) { ZEN_OPERATION_LOG_WARN(m_LogOutput, "Some files in {} could not be removed", m_Path); } m_RebuildFolderStateStats.CleanFolderElapsedWallTimeUs = Timer.GetElapsedTimeUs(); } if (m_AbortFlag) { return; } { ZEN_TRACE_CPU("FinalizeTree"); Stopwatch Timer; std::unique_ptr RebuildProgressBarPtr(m_LogOutput.CreateProgressBar("Rebuild State")); OperationLogOutput::ProgressBar& RebuildProgressBar(*RebuildProgressBarPtr); ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); OutLocalFolderState.Paths.resize(m_RemoteContent.Paths.size()); OutLocalFolderState.RawSizes.resize(m_RemoteContent.Paths.size()); 
OutLocalFolderState.Attributes.resize(m_RemoteContent.Paths.size()); OutLocalFolderState.ModificationTicks.resize(m_RemoteContent.Paths.size()); std::atomic DeletedCount = 0; for (uint32_t LocalPathIndex : RemoveLocalPathIndexes) { if (m_AbortFlag) { break; } Work.ScheduleWork(m_IOWorkerPool, [this, &DeletedCount, LocalPathIndex](std::atomic&) { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_RemoveFile"); const std::filesystem::path LocalFilePath = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred(); SetFileReadOnlyWithRetry(LocalFilePath, false); RemoveFileWithRetry(LocalFilePath); DeletedCount++; } }); } std::atomic TargetsComplete = 0; struct FinalizeTarget { IoHash RawHash; uint32_t RemotePathIndex; }; std::vector Targets; Targets.reserve(m_RemoteContent.Paths.size()); for (uint32_t RemotePathIndex = 0; RemotePathIndex < m_RemoteContent.Paths.size(); RemotePathIndex++) { Targets.push_back( FinalizeTarget{.RawHash = m_RemoteContent.RawHashes[RemotePathIndex], .RemotePathIndex = RemotePathIndex}); } std::sort(Targets.begin(), Targets.end(), [](const FinalizeTarget& Lhs, const FinalizeTarget& Rhs) { if (Lhs.RawHash < Rhs.RawHash) { return true; } else if (Lhs.RawHash > Rhs.RawHash) { return false; } return Lhs.RemotePathIndex < Rhs.RemotePathIndex; }); size_t TargetOffset = 0; while (TargetOffset < Targets.size()) { if (m_AbortFlag) { break; } size_t TargetCount = 1; while ((TargetOffset + TargetCount) < Targets.size() && (Targets[TargetOffset + TargetCount].RawHash == Targets[TargetOffset].RawHash)) { TargetCount++; } Work.ScheduleWork( m_IOWorkerPool, [this, &SequenceHashToLocalPathIndex, &Targets, &RemotePathIndexToLocalPathIndex, &OutLocalFolderState, BaseTargetOffset = TargetOffset, TargetCount, &TargetsComplete](std::atomic&) { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_FinalizeChunkSequence"); size_t TargetOffset = BaseTargetOffset; const IoHash& RawHash = Targets[TargetOffset].RawHash; if (RawHash == IoHash::Zero) { ZEN_TRACE_CPU("CreateEmptyFiles"); 
while (TargetOffset < (BaseTargetOffset + TargetCount)) { const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex; ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash); const std::filesystem::path& TargetPath = m_RemoteContent.Paths[RemotePathIndex]; std::filesystem::path TargetFilePath = (m_Path / TargetPath).make_preferred(); if (!RemotePathIndexToLocalPathIndex[RemotePathIndex]) { if (IsFileWithRetry(TargetFilePath)) { SetFileReadOnlyWithRetry(TargetFilePath, false); } else { CreateDirectories(TargetFilePath.parent_path()); } BasicFile OutputFile; OutputFile.Open(TargetFilePath, BasicFile::Mode::kTruncate); } OutLocalFolderState.Paths[RemotePathIndex] = TargetPath; OutLocalFolderState.RawSizes[RemotePathIndex] = m_RemoteContent.RawSizes[RemotePathIndex]; OutLocalFolderState.Attributes[RemotePathIndex] = m_RemoteContent.Attributes.empty() ? GetNativeFileAttributes(TargetFilePath) : SetNativeFileAttributes(TargetFilePath, m_RemoteContent.Platform, m_RemoteContent.Attributes[RemotePathIndex]); OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath); TargetOffset++; TargetsComplete++; } } else { ZEN_TRACE_CPU("FinalizeFile"); ZEN_ASSERT(m_RemoteLookup.RawHashToSequenceIndex.contains(RawHash)); const uint32_t FirstRemotePathIndex = Targets[TargetOffset].RemotePathIndex; const std::filesystem::path& FirstTargetPath = m_RemoteContent.Paths[FirstRemotePathIndex]; std::filesystem::path FirstTargetFilePath = (m_Path / FirstTargetPath).make_preferred(); if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(FirstRemotePathIndex); InPlaceIt != RemotePathIndexToLocalPathIndex.end()) { ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath)); } else { if (IsFileWithRetry(FirstTargetFilePath)) { SetFileReadOnlyWithRetry(FirstTargetFilePath, false); } else { CreateDirectories(FirstTargetFilePath.parent_path()); } if (auto InplaceIt = SequenceHashToLocalPathIndex.find(RawHash); InplaceIt != 
SequenceHashToLocalPathIndex.end()) { ZEN_TRACE_CPU("Copy"); const uint32_t LocalPathIndex = InplaceIt->second; const std::filesystem::path& SourcePath = m_LocalContent.Paths[LocalPathIndex]; std::filesystem::path SourceFilePath = (m_Path / SourcePath).make_preferred(); ZEN_ASSERT_SLOW(IsFileWithRetry(SourceFilePath)); ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Copying from '{}' -> '{}'", SourceFilePath, FirstTargetFilePath); const uint64_t RawSize = m_LocalContent.RawSizes[LocalPathIndex]; FastCopyFile(m_Options.AllowFileClone, m_Options.UseSparseFiles, SourceFilePath, FirstTargetFilePath, RawSize, m_DiskStats.WriteCount, m_DiskStats.WriteByteCount, m_DiskStats.CloneCount, m_DiskStats.CloneByteCount); m_RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++; } else { ZEN_TRACE_CPU("Rename"); const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RawHash); ZEN_ASSERT_SLOW(IsFileWithRetry(CacheFilePath)); std::error_code Ec = RenameFileWithRetry(CacheFilePath, FirstTargetFilePath); if (Ec) { ZEN_OPERATION_LOG_WARN(m_LogOutput, "Failed to move file from '{}' to '{}', reason: ({}) {}, retrying...", CacheFilePath, FirstTargetFilePath, Ec.value(), Ec.message()); Ec = RenameFileWithRetry(CacheFilePath, FirstTargetFilePath); if (Ec) { throw std::system_error( std::error_code(Ec.value(), std::system_category()), fmt::format("Failed to move file from '{}' to '{}', reason: ({}) {}", CacheFilePath, FirstTargetFilePath, Ec.value(), Ec.message())); } } m_RebuildFolderStateStats.FinalizeTreeFilesMovedCount++; } } OutLocalFolderState.Paths[FirstRemotePathIndex] = FirstTargetPath; OutLocalFolderState.RawSizes[FirstRemotePathIndex] = m_RemoteContent.RawSizes[FirstRemotePathIndex]; OutLocalFolderState.Attributes[FirstRemotePathIndex] = m_RemoteContent.Attributes.empty() ? 
GetNativeFileAttributes(FirstTargetFilePath) : SetNativeFileAttributes(FirstTargetFilePath, m_RemoteContent.Platform, m_RemoteContent.Attributes[FirstRemotePathIndex]); OutLocalFolderState.ModificationTicks[FirstRemotePathIndex] = GetModificationTickFromPath(FirstTargetFilePath); TargetOffset++; TargetsComplete++; while (TargetOffset < (BaseTargetOffset + TargetCount)) { const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex; ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash); const std::filesystem::path& TargetPath = m_RemoteContent.Paths[RemotePathIndex]; std::filesystem::path TargetFilePath = (m_Path / TargetPath).make_preferred(); if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(RemotePathIndex); InPlaceIt != RemotePathIndexToLocalPathIndex.end()) { ZEN_ASSERT_SLOW(IsFileWithRetry(TargetFilePath)); } else { ZEN_TRACE_CPU("Copy"); if (IsFileWithRetry(TargetFilePath)) { SetFileReadOnlyWithRetry(TargetFilePath, false); } else { CreateDirectories(TargetFilePath.parent_path()); } ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath)); ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Copying from '{}' -> '{}'", FirstTargetFilePath, TargetFilePath); const uint64_t RawSize = m_RemoteContent.RawSizes[RemotePathIndex]; FastCopyFile(m_Options.AllowFileClone, m_Options.UseSparseFiles, FirstTargetFilePath, TargetFilePath, RawSize, m_DiskStats.WriteCount, m_DiskStats.WriteByteCount, m_DiskStats.CloneCount, m_DiskStats.CloneByteCount); m_RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++; } OutLocalFolderState.Paths[RemotePathIndex] = TargetPath; OutLocalFolderState.RawSizes[RemotePathIndex] = m_RemoteContent.RawSizes[RemotePathIndex]; OutLocalFolderState.Attributes[RemotePathIndex] = m_RemoteContent.Attributes.empty() ? 
GetNativeFileAttributes(TargetFilePath) : SetNativeFileAttributes(TargetFilePath, m_RemoteContent.Platform, m_RemoteContent.Attributes[RemotePathIndex]); OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath); TargetOffset++; TargetsComplete++; } } } }); TargetOffset += TargetCount; } { ZEN_TRACE_CPU("FinalizeTree_Wait"); Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { ZEN_UNUSED(PendingWork); const uint64_t WorkTotal = Targets.size() + RemoveLocalPathIndexes.size(); const uint64_t WorkComplete = TargetsComplete.load() + DeletedCount.load(); std::string Details = fmt::format("{}/{} files", WorkComplete, WorkTotal); RebuildProgressBar.UpdateState({.Task = "Rebuilding state ", .Details = Details, .TotalCount = gsl::narrow(WorkTotal), .RemainingCount = gsl::narrow(WorkTotal - WorkComplete), .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); } m_RebuildFolderStateStats.FinalizeTreeElapsedWallTimeUs = Timer.GetElapsedTimeUs(); RebuildProgressBar.Finish(); } m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::Cleanup, (uint32_t)TaskSteps::StepCount); } catch (const std::exception&) { m_AbortFlag = true; throw; } } void BuildsOperationUpdateFolder::ScanCacheFolder(tsl::robin_map& OutCachedChunkHashesFound, tsl::robin_map& OutCachedSequenceHashesFound) { ZEN_TRACE_CPU("ScanCacheFolder"); Stopwatch CacheTimer; DirectoryContent CacheDirContent; GetDirectoryContent(m_CacheFolderPath, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeFileSizes, CacheDirContent); for (size_t Index = 0; Index < CacheDirContent.Files.size(); Index++) { if (m_Options.EnableTargetFolderScavenging) { IoHash FileHash; if (IoHash::TryParse(CacheDirContent.Files[Index].filename().string(), FileHash)) { if (auto ChunkIt = m_RemoteLookup.ChunkHashToChunkIndex.find(FileHash); ChunkIt != 
m_RemoteLookup.ChunkHashToChunkIndex.end()) { const uint32_t ChunkIndex = ChunkIt->second; const uint64_t ChunkSize = m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; if (ChunkSize == CacheDirContent.FileSizes[Index]) { OutCachedChunkHashesFound.insert({FileHash, ChunkIndex}); m_CacheMappingStats.CacheChunkCount++; m_CacheMappingStats.CacheChunkByteCount += ChunkSize; continue; } } else if (auto SequenceIt = m_RemoteLookup.RawHashToSequenceIndex.find(FileHash); SequenceIt != m_RemoteLookup.RawHashToSequenceIndex.end()) { const uint32_t SequenceIndex = SequenceIt->second; const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; const uint64_t SequenceSize = m_RemoteContent.RawSizes[PathIndex]; if (SequenceSize == CacheDirContent.FileSizes[Index]) { OutCachedSequenceHashesFound.insert({FileHash, SequenceIndex}); m_CacheMappingStats.CacheSequenceHashesCount++; m_CacheMappingStats.CacheSequenceHashesByteCount += SequenceSize; const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); ZEN_ASSERT_SLOW(IsFile(CacheFilePath)); continue; } } } } std::error_code Ec = TryRemoveFile(CacheDirContent.Files[Index]); if (Ec) { ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Failed removing file '{}', reason: ({}) {}", CacheDirContent.Files[Index], Ec.value(), Ec.message()); } } m_CacheMappingStats.CacheScanElapsedWallTimeUs += CacheTimer.GetElapsedTimeUs(); } void BuildsOperationUpdateFolder::ScanTempBlocksFolder(tsl::robin_map& OutCachedBlocksFound) { ZEN_TRACE_CPU("ScanTempBlocksFolder"); Stopwatch CacheTimer; tsl::robin_map AllBlockSizes; AllBlockSizes.reserve(m_BlockDescriptions.size()); for (uint32_t BlockIndex = 0; BlockIndex < m_BlockDescriptions.size(); BlockIndex++) { const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; AllBlockSizes.insert({BlockDescription.BlockHash, BlockIndex}); } DirectoryContent 
BlockDirContent; GetDirectoryContent(m_TempBlockFolderPath, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeFileSizes, BlockDirContent); OutCachedBlocksFound.reserve(BlockDirContent.Files.size()); for (size_t Index = 0; Index < BlockDirContent.Files.size(); Index++) { if (m_Options.EnableTargetFolderScavenging) { IoHash FileHash; if (IoHash::TryParse(BlockDirContent.Files[Index].filename().string(), FileHash)) { if (auto BlockIt = AllBlockSizes.find(FileHash); BlockIt != AllBlockSizes.end()) { const uint32_t BlockIndex = BlockIt->second; const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; uint64_t BlockSize = CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize; for (uint64_t ChunkSize : BlockDescription.ChunkCompressedLengths) { BlockSize += ChunkSize; } if (BlockSize == BlockDirContent.FileSizes[Index]) { OutCachedBlocksFound.insert({FileHash, BlockIndex}); m_CacheMappingStats.CacheBlockCount++; m_CacheMappingStats.CacheBlocksByteCount += BlockSize; continue; } } } } std::error_code Ec = TryRemoveFile(BlockDirContent.Files[Index]); if (Ec) { ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Failed removing file '{}', reason: ({}) {}", BlockDirContent.Files[Index], Ec.value(), Ec.message()); } } m_CacheMappingStats.CacheScanElapsedWallTimeUs += CacheTimer.GetElapsedTimeUs(); } std::vector BuildsOperationUpdateFolder::FindScavengeSources() { ZEN_TRACE_CPU("FindScavengeSources"); const bool TargetPathExists = IsDir(m_Path); std::vector StatePaths = GetDownloadedStatePaths(m_Options.SystemRootDir); std::vector Result; for (const std::filesystem::path& EntryPath : StatePaths) { if (IsFile(EntryPath)) { bool DeleteEntry = false; try { BuildsDownloadInfo Info = ReadDownloadedInfoFile(EntryPath); const bool LocalPathExists = !Info.LocalPath.empty() && IsDir(Info.LocalPath); const bool LocalStateFileExists = IsFile(Info.StateFilePath); if (LocalPathExists && LocalStateFileExists) { if (TargetPathExists && 
std::filesystem::equivalent(Info.LocalPath, m_Path)) { DeleteEntry = true; } else { Result.push_back({.StateFilePath = std::move(Info.StateFilePath), .Path = std::move(Info.LocalPath)}); } } else { DeleteEntry = true; } } catch (const std::exception& Ex) { ZEN_OPERATION_LOG_WARN(m_LogOutput, "{}", Ex.what()); DeleteEntry = true; } if (DeleteEntry) { std::error_code DummyEc; std::filesystem::remove(EntryPath, DummyEc); } } } return Result; } std::vector BuildsOperationUpdateFolder::ScanTargetFolder(const tsl::robin_map& CachedChunkHashesFound, const tsl::robin_map& CachedSequenceHashesFound) { ZEN_TRACE_CPU("ScanTargetFolder"); Stopwatch LocalTimer; std::vector MissingSequenceIndexes; for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < m_RemoteContent.ChunkedContent.SequenceRawHashes.size(); RemoteSequenceIndex++) { const IoHash& RemoteSequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; const uint32_t RemotePathIndex = GetFirstPathIndexForSeqeuenceIndex(m_RemoteLookup, RemoteSequenceIndex); const uint64_t RemoteRawSize = m_RemoteContent.RawSizes[RemotePathIndex]; if (auto CacheSequenceIt = CachedSequenceHashesFound.find(RemoteSequenceRawHash); CacheSequenceIt != CachedSequenceHashesFound.end()) { const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RemoteSequenceRawHash); ZEN_ASSERT_SLOW(IsFile(CacheFilePath)); if (m_Options.IsVerbose) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Found sequence {} at {} ({})", RemoteSequenceRawHash, CacheFilePath, NiceBytes(RemoteRawSize)); } } else if (auto CacheChunkIt = CachedChunkHashesFound.find(RemoteSequenceRawHash); CacheChunkIt != CachedChunkHashesFound.end()) { const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RemoteSequenceRawHash); ZEN_ASSERT_SLOW(IsFile(CacheFilePath)); if (m_Options.IsVerbose) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Found chunk {} at {} ({})", RemoteSequenceRawHash, 
CacheFilePath, NiceBytes(RemoteRawSize)); } } else if (auto It = m_LocalLookup.RawHashToSequenceIndex.find(RemoteSequenceRawHash); It != m_LocalLookup.RawHashToSequenceIndex.end()) { const uint32_t LocalSequenceIndex = It->second; const uint32_t LocalPathIndex = GetFirstPathIndexForSeqeuenceIndex(m_LocalLookup, LocalSequenceIndex); const std::filesystem::path LocalFilePath = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred(); ZEN_ASSERT_SLOW(IsFile(LocalFilePath)); m_CacheMappingStats.LocalPathsMatchingSequencesCount++; m_CacheMappingStats.LocalPathsMatchingSequencesByteCount += RemoteRawSize; if (m_Options.IsVerbose) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Found sequence {} at {} ({})", RemoteSequenceRawHash, LocalFilePath, NiceBytes(RemoteRawSize)); } } else { MissingSequenceIndexes.push_back(RemoteSequenceIndex); } } m_CacheMappingStats.LocalScanElapsedWallTimeUs += LocalTimer.GetElapsedTimeUs(); return MissingSequenceIndexes; } bool BuildsOperationUpdateFolder::FindScavengeContent(const ScavengeSource& Source, ChunkedFolderContent& OutScavengedLocalContent, ChunkedContentLookup& OutScavengedLookup) { ZEN_TRACE_CPU("FindScavengeContent"); FolderContent LocalFolderState; try { BuildSaveState SavedState = ReadBuildSaveStateFile(Source.StateFilePath); OutScavengedLocalContent = std::move(SavedState.State.ChunkedContent); LocalFolderState = std::move(SavedState.FolderState); } catch (const std::exception& Ex) { ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Skipping invalid build state at '{}', reason: {}", Source.StateFilePath, Ex.what()); return false; } tsl::robin_set PathIndexesToScavenge; PathIndexesToScavenge.reserve(OutScavengedLocalContent.Paths.size()); std::vector ChunkOrderOffsets = BuildChunkOrderOffset(OutScavengedLocalContent.ChunkedContent.ChunkCounts); { tsl::robin_map RawHashToPathIndex; RawHashToPathIndex.reserve(OutScavengedLocalContent.Paths.size()); for (uint32_t ScavengedPathIndex = 0; ScavengedPathIndex < 
OutScavengedLocalContent.RawHashes.size(); ScavengedPathIndex++) { if (!RawHashToPathIndex.contains(OutScavengedLocalContent.RawHashes[ScavengedPathIndex])) { RawHashToPathIndex.insert_or_assign(OutScavengedLocalContent.RawHashes[ScavengedPathIndex], ScavengedPathIndex); } } for (uint32_t ScavengeSequenceIndex = 0; ScavengeSequenceIndex < OutScavengedLocalContent.ChunkedContent.SequenceRawHashes.size(); ScavengeSequenceIndex++) { const IoHash& SequenceHash = OutScavengedLocalContent.ChunkedContent.SequenceRawHashes[ScavengeSequenceIndex]; if (auto It = RawHashToPathIndex.find(SequenceHash); It != RawHashToPathIndex.end()) { uint32_t PathIndex = It->second; if (!PathIndexesToScavenge.contains(PathIndex)) { if (m_RemoteLookup.RawHashToSequenceIndex.contains(SequenceHash)) { PathIndexesToScavenge.insert(PathIndex); } else { uint32_t ChunkOrderIndexStart = ChunkOrderOffsets[ScavengeSequenceIndex]; const uint32_t ChunkCount = OutScavengedLocalContent.ChunkedContent.ChunkCounts[ScavengeSequenceIndex]; for (uint32_t ChunkOrderIndex = 0; ChunkOrderIndex < ChunkCount; ChunkOrderIndex++) { const uint32_t ChunkIndex = OutScavengedLocalContent.ChunkedContent.ChunkOrders[ChunkOrderIndexStart + ChunkOrderIndex]; const IoHash& ChunkHash = OutScavengedLocalContent.ChunkedContent.ChunkHashes[ChunkIndex]; if (m_RemoteLookup.ChunkHashToChunkIndex.contains(ChunkHash)) { PathIndexesToScavenge.insert(PathIndex); break; } } } } } else { ZEN_OPERATION_LOG_WARN(m_LogOutput, "Scavenged state file at '{}' for '{}' is invalid, skipping scavenging for sequence {}", Source.StateFilePath, Source.Path, SequenceHash); } } } if (PathIndexesToScavenge.empty()) { OutScavengedLocalContent = {}; return false; } std::vector PathsToScavenge; PathsToScavenge.reserve(PathIndexesToScavenge.size()); for (uint32_t ScavengedStatePathIndex : PathIndexesToScavenge) { PathsToScavenge.push_back(OutScavengedLocalContent.Paths[ScavengedStatePathIndex]); } FolderContent ValidFolderContent = 
GetValidFolderContent(m_IOWorkerPool, m_ScavengedFolderScanStats, Source.Path, PathsToScavenge, {}, 0, m_AbortFlag, m_PauseFlag); if (!LocalFolderState.AreKnownFilesEqual(ValidFolderContent)) { std::vector DeletedPaths; FolderContent UpdatedContent = GetUpdatedContent(LocalFolderState, ValidFolderContent, DeletedPaths); // If the files are modified since the state was saved we ignore the files since we don't // want to incur the cost of scanning/hashing scavenged files DeletedPaths.insert(DeletedPaths.end(), UpdatedContent.Paths.begin(), UpdatedContent.Paths.end()); if (!DeletedPaths.empty()) { OutScavengedLocalContent = DeletePathsFromChunkedContent(OutScavengedLocalContent, BuildHashLookup(OutScavengedLocalContent.ChunkedContent.SequenceRawHashes), ChunkOrderOffsets, DeletedPaths); } } if (OutScavengedLocalContent.Paths.empty()) { OutScavengedLocalContent = {}; return false; } OutScavengedLookup = BuildChunkedContentLookup(OutScavengedLocalContent); return true; } void BuildsOperationUpdateFolder::ScavengeSourceForChunks(uint32_t& InOutRemainingChunkCount, std::vector& InOutRemoteChunkIndexNeedsCopyFromLocalFileFlags, tsl::robin_map& InOutRawHashToCopyChunkDataIndex, const std::vector>& SequenceIndexChunksLeftToWriteCounters, const ChunkedFolderContent& ScavengedContent, const ChunkedContentLookup& ScavengedLookup, std::vector& InOutCopyChunkDatas, uint32_t ScavengedContentIndex, uint64_t& InOutChunkMatchingRemoteCount, uint64_t& InOutChunkMatchingRemoteByteCount) { for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < m_RemoteContent.ChunkedContent.ChunkHashes.size() && (InOutRemainingChunkCount > 0); RemoteChunkIndex++) { if (!InOutRemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) { const IoHash& RemoteChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; if (auto It = ScavengedLookup.ChunkHashToChunkIndex.find(RemoteChunkHash); It != ScavengedLookup.ChunkHashToChunkIndex.end()) { std::vector ChunkTargetPtrs = 
GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); if (!ChunkTargetPtrs.empty()) { const uint32_t ScavengedChunkIndex = It->second; const uint64_t ScavengedChunkRawSize = ScavengedContent.ChunkedContent.ChunkRawSizes[ScavengedChunkIndex]; const size_t ChunkSequenceLocationOffset = ScavengedLookup.ChunkSequenceLocationOffset[ScavengedChunkIndex]; const ChunkedContentLookup::ChunkSequenceLocation& ScavengeLocation = ScavengedLookup.ChunkSequenceLocations[ChunkSequenceLocationOffset]; const IoHash& ScavengedSequenceRawHash = ScavengedContent.ChunkedContent.SequenceRawHashes[ScavengeLocation.SequenceIndex]; CopyChunkData::ChunkTarget Target = {.TargetChunkLocationCount = gsl::narrow(ChunkTargetPtrs.size()), .RemoteChunkIndex = RemoteChunkIndex, .CacheFileOffset = ScavengeLocation.Offset}; if (auto CopySourceIt = InOutRawHashToCopyChunkDataIndex.find(ScavengedSequenceRawHash); CopySourceIt != InOutRawHashToCopyChunkDataIndex.end()) { CopyChunkData& Data = InOutCopyChunkDatas[CopySourceIt->second]; if (Data.TargetChunkLocationPtrs.size() > 1024) { InOutRawHashToCopyChunkDataIndex.insert_or_assign(ScavengedSequenceRawHash, InOutCopyChunkDatas.size()); InOutCopyChunkDatas.push_back(CopyChunkData{.ScavengeSourceIndex = ScavengedContentIndex, .SourceSequenceIndex = ScavengeLocation.SequenceIndex, .TargetChunkLocationPtrs = ChunkTargetPtrs, .ChunkTargets = std::vector{Target}}); } else { Data.TargetChunkLocationPtrs.insert(Data.TargetChunkLocationPtrs.end(), ChunkTargetPtrs.begin(), ChunkTargetPtrs.end()); Data.ChunkTargets.push_back(Target); } } else { InOutRawHashToCopyChunkDataIndex.insert_or_assign(ScavengedSequenceRawHash, InOutCopyChunkDatas.size()); InOutCopyChunkDatas.push_back(CopyChunkData{.ScavengeSourceIndex = ScavengedContentIndex, .SourceSequenceIndex = ScavengeLocation.SequenceIndex, .TargetChunkLocationPtrs = ChunkTargetPtrs, .ChunkTargets = std::vector{Target}}); } InOutChunkMatchingRemoteCount++; 
InOutChunkMatchingRemoteByteCount += ScavengedChunkRawSize; InOutRemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex] = true; InOutRemainingChunkCount--; } } } } } std::filesystem::path BuildsOperationUpdateFolder::FindDownloadedChunk(const IoHash& ChunkHash) { ZEN_TRACE_CPU("FindDownloadedChunk"); std::filesystem::path CompressedChunkPath = m_TempDownloadFolderPath / ChunkHash.ToHexString(); if (IsFile(CompressedChunkPath)) { IoBuffer ExistingCompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); if (ExistingCompressedPart) { IoHash RawHash; uint64_t RawSize; if (CompressedBuffer::ValidateCompressedHeader(ExistingCompressedPart, RawHash, RawSize, /*OutOptionalTotalCompressedSize*/ nullptr)) { return CompressedChunkPath; } else { std::error_code DummyEc; RemoveFile(CompressedChunkPath, DummyEc); } } } return {}; } std::vector BuildsOperationUpdateFolder::GetRemainingChunkTargets(std::span> SequenceIndexChunksLeftToWriteCounters, uint32_t ChunkIndex) { ZEN_TRACE_CPU("GetRemainingChunkTargets"); std::span ChunkSources = GetChunkSequenceLocations(m_RemoteLookup, ChunkIndex); std::vector ChunkTargetPtrs; if (!ChunkSources.empty()) { ChunkTargetPtrs.reserve(ChunkSources.size()); for (const ChunkedContentLookup::ChunkSequenceLocation& Source : ChunkSources) { if (SequenceIndexChunksLeftToWriteCounters[Source.SequenceIndex].load() > 0) { ChunkTargetPtrs.push_back(&Source); } } } return ChunkTargetPtrs; }; uint64_t BuildsOperationUpdateFolder::GetChunkWriteCount(std::span> SequenceIndexChunksLeftToWriteCounters, uint32_t ChunkIndex) { ZEN_TRACE_CPU("GetChunkWriteCount"); uint64_t WriteCount = 0; std::span ChunkSources = GetChunkSequenceLocations(m_RemoteLookup, ChunkIndex); for (const ChunkedContentLookup::ChunkSequenceLocation& Source : ChunkSources) { if (SequenceIndexChunksLeftToWriteCounters[Source.SequenceIndex].load() > 0) { WriteCount++; } } return WriteCount; }; void BuildsOperationUpdateFolder::CheckRequiredDiskSpace(const tsl::robin_map& 
RemotePathToRemoteIndex) { tsl::robin_set ExistingRemotePaths; if (m_Options.EnableTargetFolderScavenging) { for (uint32_t LocalPathIndex = 0; LocalPathIndex < m_LocalContent.Paths.size(); LocalPathIndex++) { const IoHash& RawHash = m_LocalContent.RawHashes[LocalPathIndex]; const std::filesystem::path& LocalPath = m_LocalContent.Paths[LocalPathIndex]; if (auto RemotePathIt = RemotePathToRemoteIndex.find(LocalPath.generic_string()); RemotePathIt != RemotePathToRemoteIndex.end()) { const uint32_t RemotePathIndex = RemotePathIt->second; if (m_RemoteContent.RawHashes[RemotePathIndex] == RawHash) { ExistingRemotePaths.insert(RemotePathIndex); } } } } uint64_t RequiredSpace = 0; for (uint32_t RemotePathIndex = 0; RemotePathIndex < m_RemoteContent.Paths.size(); RemotePathIndex++) { if (!ExistingRemotePaths.contains(RemotePathIndex)) { RequiredSpace += m_RemoteContent.RawSizes[RemotePathIndex]; } } std::error_code Ec; DiskSpace Space = DiskSpaceInfo(m_Path, Ec); if (Ec) { throw std::runtime_error(fmt::format("Get free disk space for target path '{}' FAILED, reason: {}", m_Path, Ec.message())); } if (Space.Free < (RequiredSpace + 16u * 1024u * 1024u)) { throw std::runtime_error( fmt::format("Not enough free space for target path '{}', {} of free space is needed", m_Path, RequiredSpace)); } } void BuildsOperationUpdateFolder::WriteScavengedSequenceToCache(const std::filesystem::path& ScavengeRootPath, const ChunkedFolderContent& ScavengedContent, const ScavengedSequenceCopyOperation& ScavengeOp) { ZEN_TRACE_CPU("WriteScavengedSequenceToCache"); const std::filesystem::path ScavengedPath = ScavengedContent.Paths[ScavengeOp.ScavengedPathIndex]; const std::filesystem::path ScavengedFilePath = (ScavengeRootPath / ScavengedPath).make_preferred(); ZEN_ASSERT_SLOW(FileSizeFromPath(ScavengedFilePath) == ScavengeOp.RawSize); const IoHash& RemoteSequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[ScavengeOp.RemoteSequenceIndex]; const std::filesystem::path TempFilePath = 
GetTempChunkedSequenceFileName(m_CacheFolderPath, RemoteSequenceRawHash);
    // Copy (or clone, when AllowFileClone permits) the scavenged file to the temporary cache name, then rename
    // it to the final cache name, so the final name only appears after the copy has completed
    const uint64_t RawSize = ScavengedContent.RawSizes[ScavengeOp.ScavengedPathIndex];
    FastCopyFile(m_Options.AllowFileClone, m_Options.UseSparseFiles, ScavengedFilePath, TempFilePath, RawSize, m_DiskStats.WriteCount, m_DiskStats.WriteByteCount, m_DiskStats.CloneCount, m_DiskStats.CloneByteCount);
    const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RemoteSequenceRawHash);
    RenameFile(TempFilePath, CacheFilePath);
    m_WrittenChunkByteCount += RawSize;
    // No hashing is done in this function; the bytes are only added to the validated counter when validation is enabled
    if (m_Options.ValidateCompletedSequences)
    {
        m_ValidatedChunkByteCount += RawSize;
    }
}

// Writes one loose (non-block) remote chunk to all of its remaining cache targets (ChunkTargetPtrs).
// Unless PrimeCacheOnly is set, a compressed copy already present in the temp download folder is reused
// (FindDownloadedChunk) and written from disk on the IO worker pool, after which the temp file is removed.
// Otherwise a task is scheduled on the network pool that fetches the chunk with DownloadBuildBlob and passes
// the payload to AsyncWriteDownloadedChunk. Nothing is scheduled if m_AbortFlag is set.
void BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkIndex,
                                                  const BlobsExistsResult& ExistsResult,
                                                  std::span> SequenceIndexChunksLeftToWriteCounters,
                                                  std::atomic& WritePartsComplete,
                                                  std::vector&& ChunkTargetPtrs,
                                                  BufferedWriteFileCache& WriteCache,
                                                  ParallelWork& Work,
                                                  uint64_t TotalRequestCount,
                                                  uint64_t TotalPartWriteCount,
                                                  FilteredRate& FilteredDownloadedBytesPerSecond,
                                                  FilteredRate& FilteredWrittenBytesPerSecond)
{
    std::filesystem::path ExistingCompressedChunkPath;
    if (!m_Options.PrimeCacheOnly)
    {
        const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
        ExistingCompressedChunkPath = FindDownloadedChunk(ChunkHash);
        if (!ExistingCompressedChunkPath.empty())
        {
            // A reused download counts as a completed request; stop the download rate once all requests are done
            m_DownloadStats.RequestsCompleteCount++;
            if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
            {
                FilteredDownloadedBytesPerSecond.Stop();
            }
        }
    }
    if (!m_AbortFlag)
    {
        if (!ExistingCompressedChunkPath.empty())
        {
            // Write the already downloaded chunk from disk on the IO pool
            Work.ScheduleWork(
                m_IOWorkerPool,
                [this,
                 SequenceIndexChunksLeftToWriteCounters,
                 &WriteCache,
                 &Work,
                 &WritePartsComplete,
                 TotalPartWriteCount,
                 &FilteredWrittenBytesPerSecond,
                 RemoteChunkIndex,
                 ChunkTargetPtrs = std::move(ChunkTargetPtrs),
                 CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic& AbortFlag) mutable {
                    if (!AbortFlag)
                    {
                        ZEN_TRACE_CPU("Async_WritePreDownloadedChunk");
                        FilteredWrittenBytesPerSecond.Start();
                        const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
                        IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath);
                        if (!CompressedPart)
                        {
                            throw std::runtime_error(
                                fmt::format("Could not open dowloaded compressed chunk {} from {}", ChunkHash, CompressedChunkPath));
                        }
                        // The return value tells whether the written sequences still need their hash verified
                        bool NeedHashVerify = WriteCompressedChunkToCache(ChunkHash, ChunkTargetPtrs, WriteCache, std::move(CompressedPart));
                        WritePartsComplete++;
                        if (!AbortFlag)
                        {
                            if (WritePartsComplete == TotalPartWriteCount)
                            {
                                FilteredWrittenBytesPerSecond.Stop();
                            }
                            // The temp download is no longer needed; failing to remove it is only logged
                            std::error_code Ec = TryRemoveFile(CompressedChunkPath);
                            if (Ec)
                            {
                                ZEN_OPERATION_LOG_DEBUG(m_LogOutput,
                                                        "Failed removing file '{}', reason: ({}) {}",
                                                        CompressedChunkPath,
                                                        Ec.value(),
                                                        Ec.message());
                            }
                            // Mark the targets as written and close/finish the sequences that became complete
                            std::vector CompletedSequences = CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
                            WriteCache.Close(CompletedSequences);
                            if (NeedHashVerify)
                            {
                                VerifyAndCompleteChunkSequencesAsync(CompletedSequences, Work);
                            }
                            else
                            {
                                FinalizeChunkSequences(CompletedSequences);
                            }
                        }
                    }
                });
        }
        else
        {
            // Fetch the chunk on the network pool, then hand the payload to AsyncWriteDownloadedChunk
            Work.ScheduleWork(m_NetworkPool,
                              [this,
                               &ExistsResult,
                               SequenceIndexChunksLeftToWriteCounters,
                               &WriteCache,
                               &Work,
                               &WritePartsComplete,
                               TotalPartWriteCount,
                               TotalRequestCount,
                               &FilteredDownloadedBytesPerSecond,
                               &FilteredWrittenBytesPerSecond,
                               RemoteChunkIndex,
                               ChunkTargetPtrs = std::vector(
                                   std::move(ChunkTargetPtrs))](std::atomic&) mutable {
                                  if (!m_AbortFlag)
                                  {
                                      ZEN_TRACE_CPU("Async_DownloadChunk");
                                      FilteredDownloadedBytesPerSecond.Start();
                                      DownloadBuildBlob(RemoteChunkIndex,
                                                        ExistsResult,
                                                        Work,
                                                        [this,
                                                         &ExistsResult,
                                                         SequenceIndexChunksLeftToWriteCounters,
                                                         &WriteCache,
                                                         &Work,
                                                         &WritePartsComplete,
                                                         TotalPartWriteCount,
                                                         TotalRequestCount,
                                                         RemoteChunkIndex,
                                                         &FilteredDownloadedBytesPerSecond,
                                                         &FilteredWrittenBytesPerSecond,
                                                         ChunkTargetPtrs = std::move(ChunkTargetPtrs)](IoBuffer&& Payload) mutable {
                                                            if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
                                                            {
FilteredDownloadedBytesPerSecond.Stop(); } IoBufferFileReference FileRef; bool EnableBacklog = Payload.GetFileReference(FileRef); AsyncWriteDownloadedChunk(m_Options.ZenFolderPath, RemoteChunkIndex, std::move(ChunkTargetPtrs), WriteCache, Work, std::move(Payload), SequenceIndexChunksLeftToWriteCounters, WritePartsComplete, TotalPartWriteCount, FilteredWrittenBytesPerSecond, EnableBacklog); }); } }); } } } void BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkIndex, const BlobsExistsResult& ExistsResult, ParallelWork& Work, std::function&& OnDownloaded) { const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; // FilteredDownloadedBytesPerSecond.Start(); IoBuffer BuildBlob; const bool ExistsInCache = m_Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash); if (ExistsInCache) { BuildBlob = m_Storage.BuildCacheStorage->GetBuildBlob(m_BuildId, ChunkHash); } if (BuildBlob) { uint64_t BlobSize = BuildBlob.GetSize(); m_DownloadStats.DownloadedChunkCount++; m_DownloadStats.DownloadedChunkByteCount += BlobSize; m_DownloadStats.RequestsCompleteCount++; OnDownloaded(std::move(BuildBlob)); } else { if (m_RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= m_Options.LargeAttachmentSize) { DownloadLargeBlob( *m_Storage.BuildStorage, m_TempDownloadFolderPath, m_BuildId, ChunkHash, m_Options.PreferredMultipartChunkSize, Work, m_NetworkPool, m_DownloadStats.DownloadedChunkByteCount, m_DownloadStats.MultipartAttachmentCount, [this, &Work, ChunkHash, RemoteChunkIndex, OnDownloaded = std::move(OnDownloaded)](IoBuffer&& Payload) mutable { m_DownloadStats.DownloadedChunkCount++; m_DownloadStats.RequestsCompleteCount++; if (Payload && m_Storage.BuildCacheStorage && m_Options.PopulateCache) { m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, ChunkHash, ZenContentType::kCompressedBinary, CompositeBuffer(SharedBuffer(Payload))); } OnDownloaded(std::move(Payload)); }); } else { BuildBlob = 
m_Storage.BuildStorage->GetBuildBlob(m_BuildId, ChunkHash); if (BuildBlob && m_Storage.BuildCacheStorage && m_Options.PopulateCache) { m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, ChunkHash, ZenContentType::kCompressedBinary, CompositeBuffer(SharedBuffer(BuildBlob))); } if (!BuildBlob) { throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash)); } if (!m_Options.PrimeCacheOnly) { if (!m_AbortFlag) { uint64_t BlobSize = BuildBlob.GetSize(); m_DownloadStats.DownloadedChunkCount++; m_DownloadStats.DownloadedChunkByteCount += BlobSize; m_DownloadStats.RequestsCompleteCount++; OnDownloaded(std::move(BuildBlob)); } } } } } void BuildsOperationUpdateFolder::DownloadPartialBlock( const ChunkBlockAnalyser::BlockRangeDescriptor BlockRange, const BlobsExistsResult& ExistsResult, std::function&& OnDownloaded) { const uint32_t BlockIndex = BlockRange.BlockIndex; const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; IoBuffer BlockBuffer; if (m_Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash)) { BlockBuffer = m_Storage.BuildCacheStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength); } if (!BlockBuffer) { BlockBuffer = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength); } if (!BlockBuffer) { throw std::runtime_error(fmt::format("Block {} is missing when fetching range {} -> {}", BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeStart + BlockRange.RangeLength)); } if (!m_AbortFlag) { uint64_t BlockSize = BlockBuffer.GetSize(); m_DownloadStats.DownloadedBlockCount++; m_DownloadStats.DownloadedBlockByteCount += BlockSize; m_DownloadStats.RequestsCompleteCount++; std::filesystem::path BlockChunkPath; // Check if the dowloaded block is file based and we can move it directly without rewriting it { IoBufferFileReference FileRef; if 
(BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == BlockSize)) { ZEN_TRACE_CPU("MoveTempPartialBlock"); std::error_code Ec; std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); if (!Ec) { BlockBuffer.SetDeleteOnClose(false); BlockBuffer = {}; BlockChunkPath = m_TempBlockFolderPath / fmt::format("{}_{:x}_{:x}", BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength); RenameFile(TempBlobPath, BlockChunkPath, Ec); if (Ec) { BlockChunkPath = std::filesystem::path{}; // Re-open the temp file again BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); BlockBuffer.SetDeleteOnClose(true); } } } } if (BlockChunkPath.empty() && (BlockSize > m_Options.MaximumInMemoryPayloadSize)) { ZEN_TRACE_CPU("WriteTempPartialBlock"); // Could not be moved and rather large, lets store it on disk BlockChunkPath = m_TempBlockFolderPath / fmt::format("{}_{:x}_{:x}", BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength); TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); BlockBuffer = {}; } if (!m_AbortFlag) { OnDownloaded(std::move(BlockBuffer), std::move(BlockChunkPath)); } } } std::vector BuildsOperationUpdateFolder::WriteLocalChunkToCache(CloneQueryInterface* CloneQuery, const CopyChunkData& CopyData, const std::vector& ScavengedContents, const std::vector& ScavengedLookups, const std::vector& ScavengedPaths, BufferedWriteFileCache& WriteCache) { ZEN_TRACE_CPU("WriteLocalChunkToCache"); std::filesystem::path SourceFilePath; if (CopyData.ScavengeSourceIndex == (uint32_t)-1) { const uint32_t LocalPathIndex = m_LocalLookup.SequenceIndexFirstPathIndex[CopyData.SourceSequenceIndex]; SourceFilePath = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred(); } else { const ChunkedFolderContent& ScavengedContent = ScavengedContents[CopyData.ScavengeSourceIndex]; const 
ChunkedContentLookup& ScavengedLookup = ScavengedLookups[CopyData.ScavengeSourceIndex];
        const std::filesystem::path ScavengedPath = ScavengedPaths[CopyData.ScavengeSourceIndex];
        const uint32_t ScavengedPathIndex = ScavengedLookup.SequenceIndexFirstPathIndex[CopyData.SourceSequenceIndex];
        SourceFilePath = (ScavengedPath / ScavengedContent.Paths[ScavengedPathIndex]).make_preferred();
    }
    ZEN_ASSERT_SLOW(IsFile(SourceFilePath));
    ZEN_ASSERT(!CopyData.TargetChunkLocationPtrs.empty());

    uint64_t CacheLocalFileBytesRead = 0;

    // CopyData.TargetChunkLocationPtrs holds the target lists of all CopyData.ChunkTargets back to back;
    // each ChunkTarget owns the next TargetChunkLocationCount pointers, tracked with TargetStart
    size_t TargetStart = 0;
    const std::span AllTargets(CopyData.TargetChunkLocationPtrs);

    // A single copy of one chunk from the source file (at CacheFileOffset) to one target location
    struct WriteOp
    {
        const ChunkedContentLookup::ChunkSequenceLocation* Target = nullptr;
        uint64_t CacheFileOffset = (uint64_t)-1;
        uint32_t ChunkIndex = (uint32_t)-1;
    };

    std::vector WriteOps;

    if (!m_AbortFlag)
    {
        ZEN_TRACE_CPU("Sort");
        // Flatten every chunk's target list into individual write operations
        WriteOps.reserve(AllTargets.size());
        for (const CopyChunkData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets)
        {
            std::span TargetRange = AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount);
            for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange)
            {
                WriteOps.push_back(
                    WriteOp{.Target = Target, .CacheFileOffset = ChunkTarget.CacheFileOffset, .ChunkIndex = ChunkTarget.RemoteChunkIndex});
            }
            TargetStart += ChunkTarget.TargetChunkLocationCount;
        }
        // Order by target sequence, then by offset inside it, so contiguous writes end up adjacent
        std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) {
            if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
            {
                return true;
            }
            else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
            {
                return false;
            }
            if (Lhs.Target->Offset < Rhs.Target->Offset)
            {
                return true;
            }
            return false;
        });
    }

    if (!m_AbortFlag)
    {
        ZEN_TRACE_CPU("Write");

        // NOTE(review): ChunkIndexesWritten is not referenced anywhere else in this function
        tsl::robin_set ChunkIndexesWritten;

        BufferedOpenFile SourceFile(SourceFilePath, m_DiskStats.OpenReadCount, m_DiskStats.CurrentOpenFileCount, m_DiskStats.ReadCount, m_DiskStats.ReadByteCount);
        bool CanCloneSource = CloneQuery && CloneQuery->CanClone(SourceFile.Handle());

        BufferedWriteFileCache::Local LocalWriter(WriteCache);

        for (size_t WriteOpIndex = 0; WriteOpIndex < WriteOps.size();)
        {
            if (m_AbortFlag)
            {
                break;
            }
            const WriteOp& Op = WriteOps[WriteOpIndex];

            const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
            const uint32_t RemotePathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex];
            const uint64_t TargetSize = m_RemoteContent.RawSizes[RemotePathIndex];
            const uint64_t ChunkSize = m_RemoteContent.ChunkedContent.ChunkRawSizes[Op.ChunkIndex];

            // Merge the following ops into one read/write while they continue contiguously in both the source
            // and the same target sequence, and the merged length stays within BufferedOpenFile::BlockSize
            uint64_t ReadLength = ChunkSize;
            size_t WriteCount = 1;
            uint64_t OpSourceEnd = Op.CacheFileOffset + ChunkSize;
            uint64_t OpTargetEnd = Op.Target->Offset + ChunkSize;
            while ((WriteOpIndex + WriteCount) < WriteOps.size())
            {
                const WriteOp& NextOp = WriteOps[WriteOpIndex + WriteCount];
                if (NextOp.Target->SequenceIndex != Op.Target->SequenceIndex)
                {
                    break;
                }
                if (NextOp.Target->Offset != OpTargetEnd)
                {
                    break;
                }
                if (NextOp.CacheFileOffset != OpSourceEnd)
                {
                    break;
                }
                const uint64_t NextChunkLength = m_RemoteContent.ChunkedContent.ChunkRawSizes[NextOp.ChunkIndex];
                if (ReadLength + NextChunkLength > BufferedOpenFile::BlockSize)
                {
                    break;
                }
                ReadLength += NextChunkLength;
                OpSourceEnd += NextChunkLength;
                OpTargetEnd += NextChunkLength;
                WriteCount++;
            }

            {
                bool DidClone = false;

                if (CanCloneSource)
                {
                    // The clone covers ClonableBytes starting PreBytes into the merged range; the leading PreBytes
                    // and trailing PostBytes are written with regular writes after a successful clone
                    uint64_t PreBytes = 0;
                    uint64_t PostBytes = 0;
                    uint64_t ClonableBytes = CloneQuery->GetClonableRange(Op.CacheFileOffset, Op.Target->Offset, ReadLength, PreBytes, PostBytes);
                    if (ClonableBytes > 0)
                    {
                        // We need to open the file...
                        // (cloning requires a handle to the target temp file, so get or create this sequence's writer)
                        BufferedWriteFileCache::Local::Writer* Writer = LocalWriter.GetWriter(RemoteSequenceIndex);
                        if (!Writer)
                        {
                            Writer = LocalWriter.PutWriter(RemoteSequenceIndex, std::make_unique());

                            Writer->File = std::make_unique();

                            const std::filesystem::path FileName = GetTempChunkedSequenceFileName(m_CacheFolderPath, m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]);
                            Writer->File->Open(FileName, BasicFile::Mode::kWrite);
                            if (m_Options.UseSparseFiles)
                            {
                                PrepareFileForScatteredWrite(Writer->File->Handle(), TargetSize);
                            }
                        }
                        DidClone = CloneQuery->TryClone(SourceFile.Handle(), Writer->File->Handle(), Op.CacheFileOffset + PreBytes, Op.Target->Offset + PreBytes, ClonableBytes, TargetSize);
                        if (DidClone)
                        {
                            m_DiskStats.WriteCount++;
                            m_DiskStats.WriteByteCount += ClonableBytes;

                            m_DiskStats.CloneCount++;
                            m_DiskStats.CloneByteCount += ClonableBytes;

                            m_WrittenChunkByteCount += ClonableBytes;

                            if (PreBytes > 0)
                            {
                                CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, PreBytes);
                                const uint64_t FileOffset = Op.Target->Offset;

                                WriteSequenceChunkToCache(LocalWriter, ChunkSource, RemoteSequenceIndex, FileOffset, RemotePathIndex);
                            }
                            if (PostBytes > 0)
                            {
                                CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset + ReadLength - PostBytes, PostBytes);
                                const uint64_t FileOffset = Op.Target->Offset + ReadLength - PostBytes;

                                WriteSequenceChunkToCache(LocalWriter, ChunkSource, RemoteSequenceIndex, FileOffset, RemotePathIndex);
                            }
                        }
                    }
                }

                // No clone (not supported, nothing clonable, or TryClone failed): copy the whole merged range
                if (!DidClone)
                {
                    CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ReadLength);

                    const uint64_t FileOffset = Op.Target->Offset;

                    WriteSequenceChunkToCache(LocalWriter, ChunkSource, RemoteSequenceIndex, FileOffset, RemotePathIndex);
                }
            }

            CacheLocalFileBytesRead += ReadLength; // TODO: This should be the sum of unique chunk sizes?
WriteOpIndex += WriteCount; } } if (m_Options.IsVerbose) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), SourceFilePath); } std::vector Result; Result.reserve(WriteOps.size()); for (const WriteOp& Op : WriteOps) { Result.push_back(Op.Target->SequenceIndex); } return Result; } bool BuildsOperationUpdateFolder::WriteCompressedChunkToCache( const IoHash& ChunkHash, const std::vector& ChunkTargetPtrs, BufferedWriteFileCache& WriteCache, IoBuffer&& CompressedPart) { ZEN_TRACE_CPU("WriteCompressedChunkToCache"); auto ChunkHashToChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); ZEN_ASSERT(ChunkHashToChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end()); if (IsSingleFileChunk(m_RemoteContent, ChunkTargetPtrs)) { const std::uint32_t SequenceIndex = ChunkTargetPtrs.front()->SequenceIndex; const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]; StreamDecompress(SequenceRawHash, CompositeBuffer(std::move(CompressedPart))); return false; } else { IoHash RawHash; uint64_t RawSize; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompositeBuffer(std::move(CompressedPart)), RawHash, RawSize); if (!Compressed) { throw std::runtime_error(fmt::format("Failed to parse header of compressed large blob {}", ChunkHash)); } if (RawHash != ChunkHash) { throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, ChunkHash)); } BufferedWriteFileCache::Local LocalWriter(WriteCache); IoHashStream Hash; bool CouldDecompress = Compressed.DecompressToStream( 0, (uint64_t)-1, [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { ZEN_UNUSED(SourceOffset); ZEN_TRACE_CPU("Async_StreamDecompress_Write"); m_DiskStats.ReadByteCount += SourceSize; if (!m_AbortFlag) { for (const ChunkedContentLookup::ChunkSequenceLocation* TargetPtr : ChunkTargetPtrs) { const auto& Target = *TargetPtr; const 
uint64_t FileOffset = Target.Offset + Offset; const uint32_t SequenceIndex = Target.SequenceIndex; const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; WriteSequenceChunkToCache(LocalWriter, RangeBuffer, SequenceIndex, FileOffset, PathIndex); } return true; } return false; }); if (m_AbortFlag) { return false; } if (!CouldDecompress) { throw std::runtime_error(fmt::format("Failed to decompress large chunk {}", ChunkHash)); } return true; } } void BuildsOperationUpdateFolder::StreamDecompress(const IoHash& SequenceRawHash, CompositeBuffer&& CompressedPart) { ZEN_TRACE_CPU("StreamDecompress"); const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash); TemporaryFile DecompressedTemp; std::error_code Ec; DecompressedTemp.CreateTemporary(TempChunkSequenceFileName.parent_path(), Ec); if (Ec) { throw std::runtime_error(fmt::format("Failed creating temporary file for decompressing large blob {}, reason: ({}) {}", SequenceRawHash, Ec.value(), Ec.message())); } IoHash RawHash; uint64_t RawSize; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompressedPart, RawHash, RawSize); if (!Compressed) { throw std::runtime_error(fmt::format("Failed to parse header of compressed large blob {}", SequenceRawHash)); } if (RawHash != SequenceRawHash) { throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, SequenceRawHash)); } PrepareFileForScatteredWrite(DecompressedTemp.Handle(), RawSize); IoHashStream Hash; bool CouldDecompress = Compressed.DecompressToStream(0, (uint64_t)-1, [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { ZEN_UNUSED(SourceOffset); ZEN_TRACE_CPU("StreamDecompress_Write"); m_DiskStats.ReadCount++; m_DiskStats.ReadByteCount += SourceSize; if (!m_AbortFlag) { for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) { if (m_Options.ValidateCompletedSequences) { 
Hash.Append(Segment.GetView()); m_ValidatedChunkByteCount += Segment.GetSize(); } DecompressedTemp.Write(Segment, Offset); Offset += Segment.GetSize(); m_DiskStats.WriteByteCount += Segment.GetSize(); m_DiskStats.WriteCount++; m_WrittenChunkByteCount += Segment.GetSize(); } return true; } return false; }); if (m_AbortFlag) { return; } if (!CouldDecompress) { throw std::runtime_error(fmt::format("Failed to decompress large blob {}", SequenceRawHash)); } if (m_Options.ValidateCompletedSequences) { const IoHash VerifyHash = Hash.GetHash(); if (VerifyHash != SequenceRawHash) { throw std::runtime_error( fmt::format("Decompressed blob payload hash {} does not match expected hash {}", VerifyHash, SequenceRawHash)); } } DecompressedTemp.MoveTemporaryIntoPlace(TempChunkSequenceFileName, Ec); if (Ec) { throw std::runtime_error(fmt::format("Failed moving temporary file for decompressing large blob {}, reason: ({}) {}", SequenceRawHash, Ec.value(), Ec.message())); } // WriteChunkStats.ChunkCountWritten++; } void BuildsOperationUpdateFolder::WriteSequenceChunkToCache(BufferedWriteFileCache::Local& LocalWriter, const CompositeBuffer& Chunk, const uint32_t SequenceIndex, const uint64_t FileOffset, const uint32_t PathIndex) { ZEN_TRACE_CPU("WriteSequenceChunkToCache"); const uint64_t SequenceSize = m_RemoteContent.RawSizes[PathIndex]; auto OpenFile = [&](BasicFile& File) { const std::filesystem::path FileName = GetTempChunkedSequenceFileName(m_CacheFolderPath, m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); File.Open(FileName, BasicFile::Mode::kWrite); if (m_Options.UseSparseFiles) { PrepareFileForScatteredWrite(File.Handle(), SequenceSize); } }; const uint64_t ChunkSize = Chunk.GetSize(); ZEN_ASSERT(FileOffset + ChunkSize <= SequenceSize); if (ChunkSize == SequenceSize) { BasicFile SingleChunkFile; OpenFile(SingleChunkFile); m_DiskStats.CurrentOpenFileCount++; auto _ = MakeGuard([this]() { m_DiskStats.CurrentOpenFileCount--; }); SingleChunkFile.Write(Chunk, 
FileOffset); } else { const uint64_t MaxWriterBufferSize = 256u * 1025u; BufferedWriteFileCache::Local::Writer* Writer = LocalWriter.GetWriter(SequenceIndex); if (Writer) { if ((!Writer->Writer) && (ChunkSize < MaxWriterBufferSize)) { Writer->Writer = std::make_unique(*Writer->File, Min(SequenceSize, MaxWriterBufferSize)); } Writer->Write(Chunk, FileOffset); } else { Writer = LocalWriter.PutWriter(SequenceIndex, std::make_unique()); Writer->File = std::make_unique(); OpenFile(*Writer->File); if (ChunkSize < MaxWriterBufferSize) { Writer->Writer = std::make_unique(*Writer->File, Min(SequenceSize, MaxWriterBufferSize)); } Writer->Write(Chunk, FileOffset); } } m_DiskStats.WriteCount++; m_DiskStats.WriteByteCount += ChunkSize; m_WrittenChunkByteCount += ChunkSize; } bool BuildsOperationUpdateFolder::GetBlockWriteOps(const IoHash& BlockRawHash, std::span ChunkRawHashes, std::span ChunkCompressedLengths, std::span> SequenceIndexChunksLeftToWriteCounters, std::span> RemoteChunkIndexNeedsCopyFromSourceFlags, const MemoryView BlockView, uint32_t FirstIncludedBlockChunkIndex, uint32_t LastIncludedBlockChunkIndex, BlockWriteOps& OutOps) { ZEN_TRACE_CPU("GetBlockWriteOps"); uint32_t OffsetInBlock = 0; for (uint32_t ChunkBlockIndex = FirstIncludedBlockChunkIndex; ChunkBlockIndex <= LastIncludedBlockChunkIndex; ChunkBlockIndex++) { const uint32_t ChunkCompressedSize = ChunkCompressedLengths[ChunkBlockIndex]; const IoHash& ChunkHash = ChunkRawHashes[ChunkBlockIndex]; if (auto It = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); It != m_RemoteLookup.ChunkHashToChunkIndex.end()) { const uint32_t ChunkIndex = It->second; std::vector ChunkTargetPtrs = GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, ChunkIndex); if (!ChunkTargetPtrs.empty()) { bool NeedsWrite = true; if (RemoteChunkIndexNeedsCopyFromSourceFlags[ChunkIndex].compare_exchange_strong(NeedsWrite, false)) { MemoryView ChunkMemoryView = BlockView.Mid(OffsetInBlock, ChunkCompressedSize); IoHash 
VerifyChunkHash;
                    uint64_t VerifyChunkSize;
                    CompressedBuffer CompressedChunk =
                        CompressedBuffer::FromCompressed(SharedBuffer::MakeView(ChunkMemoryView), VerifyChunkHash, VerifyChunkSize);
                    // Validate the chunk's compressed header against the remote content description before using it
                    if (!CompressedChunk)
                    {
                        throw std::runtime_error(fmt::format("Chunk {} at {}, size {} in block {} is not a valid compressed buffer",
                                                             ChunkHash,
                                                             OffsetInBlock,
                                                             ChunkCompressedSize,
                                                             BlockRawHash));
                    }
                    if (VerifyChunkHash != ChunkHash)
                    {
                        throw std::runtime_error(fmt::format("Chunk {} at {}, size {} in block {} has a mismatching content hash {}",
                                                             ChunkHash,
                                                             OffsetInBlock,
                                                             ChunkCompressedSize,
                                                             BlockRawHash,
                                                             VerifyChunkHash));
                    }
                    if (VerifyChunkSize != m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex])
                    {
                        throw std::runtime_error(
                            fmt::format("Chunk {} at {}, size {} in block {} has a mismatching raw size {}, expected {}",
                                        ChunkHash,
                                        OffsetInBlock,
                                        ChunkCompressedSize,
                                        BlockRawHash,
                                        VerifyChunkSize,
                                        m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]));
                    }

                    OodleCompressor ChunkCompressor;
                    OodleCompressionLevel ChunkCompressionLevel;
                    uint64_t ChunkBlockSize;

                    bool GetCompressParametersSuccess =
                        CompressedChunk.TryGetCompressParameters(ChunkCompressor, ChunkCompressionLevel, ChunkBlockSize);
                    ZEN_ASSERT(GetCompressParametersSuccess);

                    IoBuffer Decompressed;
                    if (ChunkCompressionLevel == OodleCompressionLevel::None)
                    {
                        // Stored uncompressed: the raw bytes follow the header directly, so wrap that part of the block
                        // memory instead of decompressing. NOTE(review): presumably IoBuffer::Wrap neither copies nor owns
                        // the memory, so these ops are only valid while the caller's block buffer is alive.
                        MemoryView ChunkDecompressedMemoryView = ChunkMemoryView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder());
                        Decompressed =
                            IoBuffer(IoBuffer::Wrap, ChunkDecompressedMemoryView.GetData(), ChunkDecompressedMemoryView.GetSize());
                    }
                    else
                    {
                        Decompressed = CompressedChunk.Decompress().AsIoBuffer();
                    }

                    if (Decompressed.GetSize() != m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex])
                    {
                        throw std::runtime_error(fmt::format("Chunk {} at {}, size {} in block {} decompressed to size {}, expected {}",
                                                             ChunkHash,
                                                             OffsetInBlock,
                                                             ChunkCompressedSize,
                                                             BlockRawHash,
                                                             Decompressed.GetSize(),
                                                             m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]));
                    }

                    ZEN_ASSERT_SLOW(ChunkHash == IoHash::HashBuffer(Decompressed));

                    // One write op per target location; all of them share the single decompressed buffer
                    for (const ChunkedContentLookup::ChunkSequenceLocation* Target : ChunkTargetPtrs)
                    {
                        OutOps.WriteOps.push_back(
                            BlockWriteOps::WriteOpData{.Target = Target, .ChunkBufferIndex = OutOps.ChunkBuffers.size()});
                    }
                    OutOps.ChunkBuffers.emplace_back(std::move(Decompressed));
                }
            }
        }

        // Chunks are stored back to back, so the next one starts right after this one
        OffsetInBlock += ChunkCompressedSize;
    }
    {
        ZEN_TRACE_CPU("Sort");
        // Order by sequence, then by ascending offset within the sequence, so writes to each file are grouped
        std::sort(OutOps.WriteOps.begin(),
                  OutOps.WriteOps.end(),
                  [](const BlockWriteOps::WriteOpData& Lhs, const BlockWriteOps::WriteOpData& Rhs) {
                      if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
                      {
                          return true;
                      }
                      if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
                      {
                          return false;
                      }
                      return Lhs.Target->Offset < Rhs.Target->Offset;
                  });
    }

    return true;
}

// Writes every op in Ops to the temporary sequence files.
//
// Unless the work was aborted, it then counts each op against its sequence's remaining-chunk counter.
// Sequences whose counter reaches zero have their cached writers closed and are passed to
// VerifyAndCompleteChunkSequencesAsync.
void BuildsOperationUpdateFolder::WriteBlockChunkOpsToCache(std::span> SequenceIndexChunksLeftToWriteCounters,
                                                            const BlockWriteOps& Ops,
                                                            BufferedWriteFileCache& WriteCache,
                                                            ParallelWork& Work)
{
    ZEN_TRACE_CPU("WriteBlockChunkOpsToCache");

    {
        // Scoped so LocalWriter is destroyed before the completion tracking below runs
        BufferedWriteFileCache::Local LocalWriter(WriteCache);
        for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps)
        {
            if (Work.IsAborted())
            {
                break;
            }
            const CompositeBuffer& Chunk = Ops.ChunkBuffers[WriteOp.ChunkBufferIndex];
            const uint32_t SequenceIndex = WriteOp.Target->SequenceIndex;
            ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() <= m_RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]);
            ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() > 0);
            const uint64_t FileOffset = WriteOp.Target->Offset;
            const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex];

            WriteSequenceChunkToCache(LocalWriter, Chunk, SequenceIndex, FileOffset, PathIndex);
        }
    }
    if (!Work.IsAborted())
    {
        // Write tracking, updating this must be done without any files open (BufferedWriteFileCache::Local)
        std::vector CompletedChunkSequences;
        for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps)
        {
            const uint32_t RemoteSequenceIndex = WriteOp.Target->SequenceIndex;
            if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters))
            {
                CompletedChunkSequences.push_back(RemoteSequenceIndex);
            }
        }
        WriteCache.Close(CompletedChunkSequences);
        VerifyAndCompleteChunkSequencesAsync(CompletedChunkSequences, Work);
    }
}

// Writes every still-needed chunk of a complete downloaded block into the cache.
//
// The block payload starts with a none-encoder compressed buffer header, followed by the block header
// and then the chunk data. When the description has no HeaderSize or no chunk lengths (legacy
// descriptions), both are read from the block header in the payload instead.
//
// Returns the result of GetBlockWriteOps; the ops are only written when it returns true.
bool BuildsOperationUpdateFolder::WriteChunksBlockToCache(const ChunkBlockDescription& BlockDescription,
                                                          std::span> SequenceIndexChunksLeftToWriteCounters,
                                                          ParallelWork& Work,
                                                          CompositeBuffer&& BlockBuffer,
                                                          std::span> RemoteChunkIndexNeedsCopyFromSourceFlags,
                                                          BufferedWriteFileCache& WriteCache)
{
    ZEN_TRACE_CPU("WriteChunksBlockToCache");

    // Work on a memory-based view of the block (BlockBuffer may presumably be file backed)
    IoBuffer BlockMemoryBuffer = MakeBufferMemoryBased(BlockBuffer);
    const MemoryView BlockView = BlockMemoryBuffer.GetView();

    BlockWriteOps Ops;
    if ((BlockDescription.HeaderSize == 0) || BlockDescription.ChunkCompressedLengths.empty())
    {
        ZEN_TRACE_CPU("WriteChunksBlockToCache_Legacy");

        // Legacy description: parse the chunk lengths (and the header size) from the block itself
        uint64_t HeaderSize;
        const std::vector ChunkCompressedLengths =
            ReadChunkBlockHeader(BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder()), HeaderSize);

        if (GetBlockWriteOps(BlockDescription.BlockHash,
                             BlockDescription.ChunkRawHashes,
                             ChunkCompressedLengths,
                             SequenceIndexChunksLeftToWriteCounters,
                             RemoteChunkIndexNeedsCopyFromSourceFlags,
                             BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder() + HeaderSize),
                             0,
                             gsl::narrow(BlockDescription.ChunkRawHashes.size() - 1),
                             Ops))
        {
            WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work);
            return true;
        }
        return false;
    }

    if (GetBlockWriteOps(BlockDescription.BlockHash,
                         BlockDescription.ChunkRawHashes,
                         BlockDescription.ChunkCompressedLengths,
                         SequenceIndexChunksLeftToWriteCounters,
                         RemoteChunkIndexNeedsCopyFromSourceFlags,
                         BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize),
                         0,
                         gsl::narrow(BlockDescription.ChunkRawHashes.size() - 1),
                         Ops))
    {
        WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work);
return true; } return false; } bool BuildsOperationUpdateFolder::WritePartialBlockChunksToCache(const ChunkBlockDescription& BlockDescription, std::span> SequenceIndexChunksLeftToWriteCounters, ParallelWork& Work, CompositeBuffer&& PartialBlockBuffer, uint32_t FirstIncludedBlockChunkIndex, uint32_t LastIncludedBlockChunkIndex, std::span> RemoteChunkIndexNeedsCopyFromSourceFlags, BufferedWriteFileCache& WriteCache) { ZEN_TRACE_CPU("WritePartialBlockChunksToCache"); IoBuffer BlockMemoryBuffer = MakeBufferMemoryBased(PartialBlockBuffer); const MemoryView BlockView = BlockMemoryBuffer.GetView(); BlockWriteOps Ops; if (GetBlockWriteOps(BlockDescription.BlockHash, BlockDescription.ChunkRawHashes, BlockDescription.ChunkCompressedLengths, SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndexNeedsCopyFromSourceFlags, BlockView, FirstIncludedBlockChunkIndex, LastIncludedBlockChunkIndex, Ops)) { WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work); return true; } else { return false; } } void BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::path& ZenFolderPath, uint32_t RemoteChunkIndex, std::vector&& ChunkTargetPtrs, BufferedWriteFileCache& WriteCache, ParallelWork& Work, IoBuffer&& Payload, std::span> SequenceIndexChunksLeftToWriteCounters, std::atomic& WritePartsComplete, const uint64_t TotalPartWriteCount, FilteredRate& FilteredWrittenBytesPerSecond, bool EnableBacklog) { ZEN_TRACE_CPU("AsyncWriteDownloadedChunk"); const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; const uint64_t Size = Payload.GetSize(); std::filesystem::path CompressedChunkPath; // Check if the dowloaded chunk is file based and we can move it directly without rewriting it { IoBufferFileReference FileRef; if (Payload.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == Size)) { ZEN_TRACE_CPU("MoveTempChunk"); std::error_code Ec; std::filesystem::path TempBlobPath = 
PathFromHandle(FileRef.FileHandle, Ec); if (!Ec) { Payload.SetDeleteOnClose(false); Payload = {}; CompressedChunkPath = m_TempDownloadFolderPath / ChunkHash.ToHexString(); RenameFile(TempBlobPath, CompressedChunkPath, Ec); if (Ec) { CompressedChunkPath = std::filesystem::path{}; // Re-open the temp file again BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); Payload = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, Size, true); Payload.SetDeleteOnClose(true); } } } } if (CompressedChunkPath.empty() && (Size > m_Options.MaximumInMemoryPayloadSize)) { ZEN_TRACE_CPU("WriteTempChunk"); // Could not be moved and rather large, lets store it on disk CompressedChunkPath = m_TempDownloadFolderPath / ChunkHash.ToHexString(); TemporaryFile::SafeWriteFile(CompressedChunkPath, Payload); Payload = {}; } Work.ScheduleWork( m_IOWorkerPool, [&ZenFolderPath, this, SequenceIndexChunksLeftToWriteCounters, &Work, CompressedChunkPath, RemoteChunkIndex, TotalPartWriteCount, &WriteCache, &WritePartsComplete, &FilteredWrittenBytesPerSecond, ChunkTargetPtrs = std::move(ChunkTargetPtrs), CompressedPart = IoBuffer(std::move(Payload))](std::atomic&) mutable { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_WriteChunk"); FilteredWrittenBytesPerSecond.Start(); const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; if (CompressedChunkPath.empty()) { ZEN_ASSERT(CompressedPart); } else { ZEN_ASSERT(!CompressedPart); CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); if (!CompressedPart) { throw std::runtime_error( fmt::format("Could not open dowloaded compressed chunk {} from {}", ChunkHash, CompressedChunkPath)); } } bool NeedHashVerify = WriteCompressedChunkToCache(ChunkHash, ChunkTargetPtrs, WriteCache, std::move(CompressedPart)); if (!m_AbortFlag) { WritePartsComplete++; if (WritePartsComplete == TotalPartWriteCount) { FilteredWrittenBytesPerSecond.Stop(); } if (!CompressedChunkPath.empty()) { std::error_code Ec = 
TryRemoveFile(CompressedChunkPath); if (Ec) { ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Failed removing file '{}', reason: ({}) {}", CompressedChunkPath, Ec.value(), Ec.message()); } } std::vector CompletedSequences = CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); WriteCache.Close(CompletedSequences); if (NeedHashVerify) { VerifyAndCompleteChunkSequencesAsync(CompletedSequences, Work); } else { FinalizeChunkSequences(CompletedSequences); } } } }, EnableBacklog ? WorkerThreadPool::EMode::EnableBacklog : WorkerThreadPool::EMode::DisableBacklog); } void BuildsOperationUpdateFolder::VerifyAndCompleteChunkSequencesAsync(std::span RemoteSequenceIndexes, ParallelWork& Work) { if (RemoteSequenceIndexes.empty()) { return; } ZEN_TRACE_CPU("VerifyAndCompleteChunkSequence"); if (m_Options.ValidateCompletedSequences) { for (uint32_t RemoteSequenceIndexOffset = 1; RemoteSequenceIndexOffset < RemoteSequenceIndexes.size(); RemoteSequenceIndexOffset++) { const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset]; Work.ScheduleWork(m_IOWorkerPool, [this, RemoteSequenceIndex](std::atomic&) { if (!m_AbortFlag) { ZEN_TRACE_CPU("Async_VerifyAndFinalizeSequence"); VerifySequence(RemoteSequenceIndex); if (!m_AbortFlag) { const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; FinalizeChunkSequence(SequenceRawHash); } } }); } const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[0]; VerifySequence(RemoteSequenceIndex); const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; FinalizeChunkSequence(SequenceRawHash); } else { for (uint32_t RemoteSequenceIndexOffset = 0; RemoteSequenceIndexOffset < RemoteSequenceIndexes.size(); RemoteSequenceIndexOffset++) { const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset]; const IoHash& SequenceRawHash = 
m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
            FinalizeChunkSequence(SequenceRawHash);
        }
    }
}

// Counts one written chunk against sequence RemoteSequenceIndex by atomically decrementing its
// remaining-chunk counter. Returns true for the call that takes the counter from 1 to 0, i.e. when the
// last outstanding chunk of the sequence has been written. Asserts that the counter was not already zero.
bool BuildsOperationUpdateFolder::CompleteSequenceChunk(uint32_t RemoteSequenceIndex,
                                                        std::span> SequenceIndexChunksLeftToWriteCounters)
{
    uint32_t PreviousValue = SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1);
    ZEN_ASSERT(PreviousValue >= 1);
    // Also catches a counter that had already wrapped around below zero
    ZEN_ASSERT(PreviousValue != (uint32_t)-1);
    return PreviousValue == 1;
}

// Calls CompleteSequenceChunk for every target location of a written chunk and returns the indexes of
// the sequences that became complete as a result.
std::vector BuildsOperationUpdateFolder::CompleteChunkTargets(const std::vector& ChunkTargetPtrs,
                                                              std::span> SequenceIndexChunksLeftToWriteCounters)
{
    ZEN_TRACE_CPU("CompleteChunkTargets");

    std::vector CompletedSequenceIndexes;
    for (const ChunkedContentLookup::ChunkSequenceLocation* Location : ChunkTargetPtrs)
    {
        const uint32_t RemoteSequenceIndex = Location->SequenceIndex;
        if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters))
        {
            CompletedSequenceIndexes.push_back(RemoteSequenceIndex);
        }
    }
    return CompletedSequenceIndexes;
}

// Renames the temporary cache file of a sequence to its final cache file name.
// Throws std::system_error if the rename fails.
void BuildsOperationUpdateFolder::FinalizeChunkSequence(const IoHash& SequenceRawHash)
{
    ZEN_TRACE_CPU("FinalizeChunkSequence");

    ZEN_ASSERT_SLOW(!IsFile(GetFinalChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash)));
    std::error_code Ec;
    RenameFile(GetTempChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash),
               GetFinalChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash),
               Ec);
    if (Ec)
    {
        throw std::system_error(Ec);
    }
}

// Finalizes (renames into place) every sequence in RemoteSequenceIndexes, in order, on the calling thread.
void BuildsOperationUpdateFolder::FinalizeChunkSequences(std::span RemoteSequenceIndexes)
{
    ZEN_TRACE_CPU("FinalizeChunkSequences");

    for (uint32_t SequenceIndex : RemoteSequenceIndexes)
    {
        FinalizeChunkSequence(m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
    }
}

// Re-reads the temporary cache file of a sequence and checks its size and hash against the remote
// content. Must only be called with ValidateCompletedSequences enabled. The hashed bytes are presumably
// accumulated into m_ValidatedChunkByteCount by IoHash::HashBuffer.
// Throws std::runtime_error on a size or hash mismatch.
void BuildsOperationUpdateFolder::VerifySequence(uint32_t RemoteSequenceIndex)
{
    ZEN_TRACE_CPU("VerifySequence");

    ZEN_ASSERT(m_Options.ValidateCompletedSequences);

    const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
    {
        ZEN_TRACE_CPU("HashSequence");
        const std::uint32_t RemotePathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex];
        const uint64_t ExpectedSize = m_RemoteContent.RawSizes[RemotePathIndex];
        IoBuffer VerifyBuffer = IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash));
        const uint64_t VerifySize = VerifyBuffer.GetSize();
        if (VerifySize != ExpectedSize)
        {
            throw std::runtime_error(fmt::format("Written chunk sequence {} size {} does not match expected size {}",
                                                 SequenceRawHash,
                                                 VerifySize,
                                                 ExpectedSize));
        }

        const IoHash VerifyChunkHash = IoHash::HashBuffer(std::move(VerifyBuffer), &m_ValidatedChunkByteCount);
        if (VerifyChunkHash != SequenceRawHash)
        {
            throw std::runtime_error(
                fmt::format("Written chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, SequenceRawHash));
        }
    }
}

////////////////////// BuildsOperationUploadFolder

// Stores the collaborators and options. It also precomputes HashStringAsLowerDjb2 of every
// NonCompressableExtensions entry (presumably for case-insensitive extension lookups).
BuildsOperationUploadFolder::BuildsOperationUploadFolder(OperationLogOutput& OperationLogOutput,
                                                         StorageInstance& Storage,
                                                         std::atomic& AbortFlag,
                                                         std::atomic& PauseFlag,
                                                         WorkerThreadPool& IOWorkerPool,
                                                         WorkerThreadPool& NetworkPool,
                                                         const Oid& BuildId,
                                                         const std::filesystem::path& Path,
                                                         bool CreateBuild,
                                                         const CbObject& MetaData,
                                                         const Options& Options)
: m_LogOutput(OperationLogOutput)
, m_Storage(Storage)
, m_AbortFlag(AbortFlag)
, m_PauseFlag(PauseFlag)
, m_IOWorkerPool(IOWorkerPool)
, m_NetworkPool(NetworkPool)
, m_BuildId(BuildId)
, m_Path(Path)
, m_CreateBuild(CreateBuild)
, m_MetaData(MetaData)
, m_Options(Options)
{
    m_NonCompressableExtensionHashes.reserve(Options.NonCompressableExtensions.size());
    for (const std::string& Extension : Options.NonCompressableExtensions)
    {
        m_NonCompressableExtensionHashes.insert(HashStringAsLowerDjb2(Extension));
    }
}

// Prepares the remote build and times each step.
//  - If m_CreateBuild, it creates the build with m_MetaData; otherwise it fetches the existing build.
//  - It takes the preferred multipart chunk size from the server's "chunkSize" field when that is non-zero.
//  - Unless IgnoreExistingBlocks is set, it looks up already known blocks.
BuildsOperationUploadFolder::PrepareBuildResult BuildsOperationUploadFolder::PrepareBuild()
{
    ZEN_TRACE_CPU("PrepareBuild");

    PrepareBuildResult
Result; Result.PreferredMultipartChunkSize = m_Options.PreferredMultipartChunkSize; Stopwatch Timer; if (m_CreateBuild) { ZEN_TRACE_CPU("CreateBuild"); Stopwatch PutBuildTimer; CbObject PutBuildResult = m_Storage.BuildStorage->PutBuild(m_BuildId, m_MetaData); Result.PrepareBuildTimeMs = PutBuildTimer.GetElapsedTimeMs(); if (auto ChunkSize = PutBuildResult["chunkSize"sv].AsUInt64(); ChunkSize != 0) { Result.PreferredMultipartChunkSize = ChunkSize; } Result.PayloadSize = m_MetaData.GetSize(); } else { ZEN_TRACE_CPU("PutBuild"); Stopwatch GetBuildTimer; CbObject Build = m_Storage.BuildStorage->GetBuild(m_BuildId); Result.PrepareBuildTimeMs = GetBuildTimer.GetElapsedTimeMs(); Result.PayloadSize = Build.GetSize(); if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0) { Result.PreferredMultipartChunkSize = ChunkSize; } else if (m_Options.AllowMultiparts) { ZEN_OPERATION_LOG_WARN(m_LogOutput, "PreferredMultipartChunkSize is unknown. Defaulting to '{}'", NiceBytes(Result.PreferredMultipartChunkSize)); } } if (!m_Options.IgnoreExistingBlocks) { ZEN_TRACE_CPU("FindBlocks"); Stopwatch KnownBlocksTimer; CbObject BlockDescriptionList = m_Storage.BuildStorage->FindBlocks(m_BuildId, m_Options.FindBlockMaxCount); if (BlockDescriptionList) { Result.KnownBlocks = ParseChunkBlockDescriptionList(BlockDescriptionList); } Result.FindBlocksTimeMs = KnownBlocksTimer.GetElapsedTimeMs(); } Result.ElapsedTimeMs = Timer.GetElapsedTimeMs(); return Result; } std::vector BuildsOperationUploadFolder::ReadFolder() { std::vector UploadParts; std::filesystem::path ExcludeManifestPath = m_Path / m_Options.ZenExcludeManifestName; tsl::robin_set ExcludeAssetPaths; if (IsFile(ExcludeManifestPath)) { std::filesystem::path AbsoluteExcludeManifestPath = MakeSafeAbsolutePath(ExcludeManifestPath.is_absolute() ? 
ExcludeManifestPath : m_Path / ExcludeManifestPath); BuildManifest Manifest = ParseBuildManifest(AbsoluteExcludeManifestPath); const std::vector& AssetPaths = Manifest.Parts.front().Files; ExcludeAssetPaths.reserve(AssetPaths.size()); for (const std::filesystem::path& AssetPath : AssetPaths) { ExcludeAssetPaths.insert(AssetPath.generic_string()); } } UploadParts.resize(1); UploadPart& Part = UploadParts.front(); GetFolderContentStatistics& LocalFolderScanStats = Part.LocalFolderScanStats; Part.Content = GetFolderContent( Part.LocalFolderScanStats, m_Path, [this](const std::string_view& RelativePath) { return IsAcceptedFolder(RelativePath); }, [this, &ExcludeAssetPaths](const std::string_view& RelativePath, uint64_t Size, uint32_t Attributes) -> bool { ZEN_UNUSED(Size, Attributes); if (!IsAcceptedFile(RelativePath)) { return false; } if (ExcludeAssetPaths.contains(std::filesystem::path(RelativePath).generic_string())) { return false; } return true; }, m_IOWorkerPool, m_LogOutput.GetProgressUpdateDelayMS(), [&](bool, std::ptrdiff_t) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), m_Path); }, m_AbortFlag); Part.TotalRawSize = std::accumulate(Part.Content.RawSizes.begin(), Part.Content.RawSizes.end(), std::uint64_t(0)); return UploadParts; } std::vector BuildsOperationUploadFolder::ReadManifestParts(const std::filesystem::path& ManifestPath) { std::vector UploadParts; Stopwatch ManifestParseTimer; std::filesystem::path AbsoluteManifestPath = MakeSafeAbsolutePath(ManifestPath.is_absolute() ? 
ManifestPath : m_Path / ManifestPath);

    BuildManifest Manifest = ParseBuildManifest(AbsoluteManifestPath);
    if (Manifest.Parts.empty())
    {
        throw std::runtime_error(fmt::format("Manifest file at '{}' is invalid", ManifestPath));
    }

    UploadParts.resize(Manifest.Parts.size());
    for (size_t PartIndex = 0; PartIndex < Manifest.Parts.size(); PartIndex++)
    {
        BuildManifest::Part& PartManifest = Manifest.Parts[PartIndex];
        // A relative manifest path names a file inside m_Path; include the manifest itself in the part
        if (ManifestPath.is_relative())
        {
            PartManifest.Files.push_back(ManifestPath);
        }

        UploadPart& Part = UploadParts[PartIndex];
        FolderContent& Content = Part.Content;

        GetFolderContentStatistics& LocalFolderScanStats = Part.LocalFolderScanStats;

        const std::vector& AssetPaths = PartManifest.Files;
        Content = GetValidFolderContent(
            m_IOWorkerPool,
            LocalFolderScanStats,
            m_Path,
            AssetPaths,
            [](uint64_t PathCount, uint64_t CompletedPathCount) { ZEN_UNUSED(PathCount, CompletedPathCount); },
            1000,
            m_AbortFlag,
            m_PauseFlag);

        // Some listed files were not returned by the scan: report each missing path
        if (Content.Paths.size() != AssetPaths.size())
        {
            const tsl::robin_set FoundPaths(Content.Paths.begin(), Content.Paths.end());
            ExtendableStringBuilder<1024> SB;
            for (const std::filesystem::path& AssetPath : AssetPaths)
            {
                if (!FoundPaths.contains(AssetPath))
                {
                    SB << "\n " << AssetPath.generic_string();
                }
            }
            throw std::runtime_error(
                fmt::format("Manifest file at '{}' references files that does not exist{}", ManifestPath, SB.ToView()));
        }

        Part.PartId = PartManifest.PartId;
        Part.PartName = PartManifest.PartName;
        Part.TotalRawSize = std::accumulate(Part.Content.RawSizes.begin(), Part.Content.RawSizes.end(), std::uint64_t(0));
    }

    return UploadParts;
}

// Uploads the contents of m_Path (or the parts described by the manifest at ManifestPath when it is not
// empty) to build m_BuildId. Returns (part id, part name) for every uploaded part, or an empty result
// if the operation is aborted.
//  - A single part without id/name falls back to BuildPartId (a new id when that is zero) and
//    BuildPartName.
//  - A multi-part manifest must provide both id and name for every part.
//  - The build is finalized afterwards when m_CreateBuild is set.
//  - On an exception, m_AbortFlag is set before rethrowing.
std::vector> BuildsOperationUploadFolder::Execute(const Oid& BuildPartId,
                                                  const std::string_view BuildPartName,
                                                  const std::filesystem::path& ManifestPath,
                                                  ChunkingController& ChunkController,
                                                  ChunkingCache& ChunkCache)
{
    ZEN_TRACE_CPU("BuildsOperationUploadFolder::Execute");
    try
    {
        Stopwatch ReadPartsTimer;
        std::vector UploadParts = ManifestPath.empty() ? ReadFolder() : ReadManifestParts(ManifestPath);

        for (UploadPart& Part : UploadParts)
        {
            if (Part.PartId == Oid::Zero)
            {
                if (UploadParts.size() != 1)
                {
                    throw std::runtime_error(fmt::format("Multi part upload manifest '{}' must contains build part id", ManifestPath));
                }

                if (BuildPartId == Oid::Zero)
                {
                    Part.PartId = Oid::NewOid();
                }
                else
                {
                    Part.PartId = BuildPartId;
                }
            }
            if (Part.PartName.empty())
            {
                if (UploadParts.size() != 1)
                {
                    throw std::runtime_error(fmt::format("Multi part upload manifest '{}' must contains build part name", ManifestPath));
                }
                if (BuildPartName.empty())
                {
                    throw std::runtime_error("Build part name must be set");
                }
                Part.PartName = std::string(BuildPartName);
            }
        }

        if (!m_Options.IsQuiet)
        {
            ZEN_OPERATION_LOG_INFO(m_LogOutput,
                                   "Reading {} parts took {}",
                                   UploadParts.size(),
                                   NiceTimeSpanMs(ReadPartsTimer.GetElapsedTimeMs()));
        }

        // Progress steps: prepare build, PartTaskSteps::StepCount steps per part, finalize build, cleanup
        const uint32_t PartsUploadStepCount = gsl::narrow(uint32_t(PartTaskSteps::StepCount) * UploadParts.size());

        const uint32_t PrepareBuildStep = 0;
        const uint32_t UploadPartsStep = 1;
        const uint32_t FinalizeBuildStep = UploadPartsStep + PartsUploadStepCount;
        const uint32_t CleanupStep = FinalizeBuildStep + 1;
        const uint32_t StepCount = CleanupStep + 1;

        auto EndProgress = MakeGuard([&]() { m_LogOutput.SetLogOperationProgress(StepCount, StepCount); });

        Stopwatch ProcessTimer;

        // Start from an empty temp directory and remove it again when this scope is left
        CleanAndRemoveDirectory(m_IOWorkerPool, m_AbortFlag, m_PauseFlag, m_Options.TempDir);
        CreateDirectories(m_Options.TempDir);
        auto _ = MakeGuard([&]() { CleanAndRemoveDirectory(m_IOWorkerPool, m_AbortFlag, m_PauseFlag, m_Options.TempDir); });

        m_LogOutput.SetLogOperationProgress(PrepareBuildStep, StepCount);

        // PrepareBuild is queued on the network pool; its result is exposed through m_PrepBuildResultFuture
        // (presumably consumed by UploadBuildPart)
        m_PrepBuildResultFuture = m_NetworkPool.EnqueueTask(std::packaged_task{[this] { return PrepareBuild(); }},
                                                            WorkerThreadPool::EMode::EnableBacklog);

        for (uint32_t PartIndex = 0; PartIndex < UploadParts.size(); PartIndex++)
        {
            const uint32_t PartStepOffset = UploadPartsStep + (PartIndex * uint32_t(PartTaskSteps::StepCount));

            const UploadPart& Part = UploadParts[PartIndex];
            UploadBuildPart(ChunkController, ChunkCache, PartIndex, Part, PartStepOffset, StepCount);
            if (m_AbortFlag)
            {
                return {};
            }
        }

        m_LogOutput.SetLogOperationProgress(FinalizeBuildStep, StepCount);

        if (m_CreateBuild && !m_AbortFlag)
        {
            Stopwatch FinalizeBuildTimer;
            m_Storage.BuildStorage->FinalizeBuild(m_BuildId);
            if (!m_Options.IsQuiet)
            {
                ZEN_OPERATION_LOG_INFO(m_LogOutput, "FinalizeBuild took {}", NiceTimeSpanMs(FinalizeBuildTimer.GetElapsedTimeMs()));
            }
        }

        m_LogOutput.SetLogOperationProgress(CleanupStep, StepCount);

        std::vector> Result;
        Result.reserve(UploadParts.size());
        for (UploadPart& Part : UploadParts)
        {
            Result.push_back(std::make_pair(Part.PartId, Part.PartName));
        }
        return Result;
    }
    catch (const std::exception&)
    {
        // Tell any outstanding work to stop before propagating the error
        m_AbortFlag = true;
        throw;
    }
}

// Returns false if RelativePath equals one of m_Options.ExcludeFolders or lies beneath one
// (the excluded prefix is followed by '/'); true otherwise.
bool BuildsOperationUploadFolder::IsAcceptedFolder(const std::string_view& RelativePath) const
{
    for (const std::string& ExcludeFolder : m_Options.ExcludeFolders)
    {
        if (RelativePath.starts_with(ExcludeFolder))
        {
            if (RelativePath.length() == ExcludeFolder.length())
            {
                return false;
            }
            else if (RelativePath[ExcludeFolder.length()] == '/')
            {
                return false;
            }
        }
    }
    return true;
}

// Returns false for the exclude manifest itself and for any path ending with one of
// m_Options.ExcludeExtensions (plain, case-sensitive suffix match); true otherwise.
bool BuildsOperationUploadFolder::IsAcceptedFile(const std::string_view& RelativePath) const
{
    if (RelativePath == m_Options.ZenExcludeManifestName)
    {
        return false;
    }
    for (const std::string& ExcludeExtension : m_Options.ExcludeExtensions)
    {
        if (RelativePath.ends_with(ExcludeExtension))
        {
            return false;
        }
    }
    return true;
}

// Sorts ChunkIndexes by the (sequence index, offset) of each chunk's first location and groups them into
// blocks appended to OutBlocks.
//  - A block is closed when the next chunk would push it past MaxBlockSize raw bytes, or when it holds
//    more than MaxChunksPerBlock chunks.
//  - When a block is closed, it is cut earlier at a change of source sequence if that keeps the block at
//    or above 15/16 of MaxBlockSize.
void BuildsOperationUploadFolder::ArrangeChunksIntoBlocks(const ChunkedFolderContent& Content,
                                                          const ChunkedContentLookup& Lookup,
                                                          std::vector& ChunkIndexes,
                                                          std::vector>& OutBlocks)
{
    ZEN_TRACE_CPU("ArrangeChunksIntoBlocks");
    std::sort(ChunkIndexes.begin(), ChunkIndexes.end(), [&Content, &Lookup](uint32_t Lhs, uint32_t Rhs) {
        const ChunkedContentLookup::ChunkSequenceLocation& LhsLocation = GetChunkSequenceLocations(Lookup, Lhs)[0];
        const ChunkedContentLookup::ChunkSequenceLocation& RhsLocation =
GetChunkSequenceLocations(Lookup, Rhs)[0]; if (LhsLocation.SequenceIndex < RhsLocation.SequenceIndex) { return true; } else if (LhsLocation.SequenceIndex > RhsLocation.SequenceIndex) { return false; } return LhsLocation.Offset < RhsLocation.Offset; }); uint64_t MaxBlockSizeLowThreshold = m_Options.BlockParameters.MaxBlockSize - (m_Options.BlockParameters.MaxBlockSize / 16); uint64_t BlockSize = 0; uint32_t ChunkIndexStart = 0; for (uint32_t ChunkIndexOffset = 0; ChunkIndexOffset < ChunkIndexes.size();) { const uint32_t ChunkIndex = ChunkIndexes[ChunkIndexOffset]; const uint64_t ChunkSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; if (((BlockSize + ChunkSize) > m_Options.BlockParameters.MaxBlockSize) || (ChunkIndexOffset - ChunkIndexStart) > m_Options.BlockParameters.MaxChunksPerBlock) { // Within the span of MaxBlockSizeLowThreshold and MaxBlockSize, see if there is a break // between source paths for chunks. Break the block at the last such break if any. ZEN_ASSERT(ChunkIndexOffset > ChunkIndexStart); const uint32_t ChunkSequenceIndex = Lookup.ChunkSequenceLocations[Lookup.ChunkSequenceLocationOffset[ChunkIndex]].SequenceIndex; uint64_t ScanBlockSize = BlockSize; uint32_t ScanChunkIndexOffset = ChunkIndexOffset - 1; while (ScanChunkIndexOffset > (ChunkIndexStart + 2)) { const uint32_t TestChunkIndex = ChunkIndexes[ScanChunkIndexOffset]; const uint64_t TestChunkSize = Content.ChunkedContent.ChunkRawSizes[TestChunkIndex]; if ((ScanBlockSize - TestChunkSize) < MaxBlockSizeLowThreshold) { break; } const uint32_t TestSequenceIndex = Lookup.ChunkSequenceLocations[Lookup.ChunkSequenceLocationOffset[TestChunkIndex]].SequenceIndex; if (ChunkSequenceIndex != TestSequenceIndex) { ChunkIndexOffset = ScanChunkIndexOffset + 1; break; } ScanBlockSize -= TestChunkSize; ScanChunkIndexOffset--; } std::vector ChunksInBlock; ChunksInBlock.reserve(ChunkIndexOffset - ChunkIndexStart); for (uint32_t AddIndexOffset = ChunkIndexStart; AddIndexOffset < ChunkIndexOffset; 
AddIndexOffset++) { const uint32_t AddChunkIndex = ChunkIndexes[AddIndexOffset]; ChunksInBlock.push_back(AddChunkIndex); } OutBlocks.emplace_back(std::move(ChunksInBlock)); BlockSize = 0; ChunkIndexStart = ChunkIndexOffset; } else { ChunkIndexOffset++; BlockSize += ChunkSize; } } if (ChunkIndexStart < ChunkIndexes.size()) { std::vector ChunksInBlock; ChunksInBlock.reserve(ChunkIndexes.size() - ChunkIndexStart); for (uint32_t AddIndexOffset = ChunkIndexStart; AddIndexOffset < ChunkIndexes.size(); AddIndexOffset++) { const uint32_t AddChunkIndex = ChunkIndexes[AddIndexOffset]; ChunksInBlock.push_back(AddChunkIndex); } OutBlocks.emplace_back(std::move(ChunksInBlock)); } } void BuildsOperationUploadFolder::GenerateBuildBlocks(const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, const std::vector>& NewBlockChunks, GeneratedBlocks& OutBlocks, GenerateBlocksStatistics& GenerateBlocksStats, UploadStatistics& UploadStats) { ZEN_TRACE_CPU("GenerateBuildBlocks"); const std::size_t NewBlockCount = NewBlockChunks.size(); if (NewBlockCount > 0) { std::unique_ptr ProgressBarPtr(m_LogOutput.CreateProgressBar("Generate Blocks")); OperationLogOutput::ProgressBar& Progress(*ProgressBarPtr); OutBlocks.BlockDescriptions.resize(NewBlockCount); OutBlocks.BlockSizes.resize(NewBlockCount); OutBlocks.BlockMetaDatas.resize(NewBlockCount); OutBlocks.BlockHeaders.resize(NewBlockCount); OutBlocks.MetaDataHasBeenUploaded.resize(NewBlockCount, 0); OutBlocks.BlockHashToBlockIndex.reserve(NewBlockCount); RwLock Lock; WorkerThreadPool& GenerateBlobsPool = m_IOWorkerPool; WorkerThreadPool& UploadBlocksPool = m_NetworkPool; FilteredRate FilteredGeneratedBytesPerSecond; FilteredRate FilteredUploadedBytesPerSecond; ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); std::atomic QueuedPendingBlocksForUpload = 0; for (size_t BlockIndex = 0; BlockIndex < NewBlockCount; BlockIndex++) { if (Work.IsAborted()) { break; } const std::vector& ChunksInBlock = 
NewBlockChunks[BlockIndex]; Work.ScheduleWork( GenerateBlobsPool, [this, &Content, &Lookup, &Work, &UploadBlocksPool, NewBlockCount, ChunksInBlock, &Lock, &OutBlocks, &GenerateBlocksStats, &UploadStats, &FilteredGeneratedBytesPerSecond, &QueuedPendingBlocksForUpload, &FilteredUploadedBytesPerSecond, BlockIndex](std::atomic&) { if (!m_AbortFlag) { ZEN_TRACE_CPU("GenerateBuildBlocks_Generate"); FilteredGeneratedBytesPerSecond.Start(); Stopwatch GenerateTimer; CompressedBuffer CompressedBlock = GenerateBlock(Content, Lookup, ChunksInBlock, OutBlocks.BlockDescriptions[BlockIndex]); if (m_Options.IsVerbose) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Generated block {} ({}) containing {} chunks in {}", OutBlocks.BlockDescriptions[BlockIndex].BlockHash, NiceBytes(CompressedBlock.GetCompressedSize()), OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size(), NiceTimeSpanMs(GenerateTimer.GetElapsedTimeMs())); } OutBlocks.BlockSizes[BlockIndex] = CompressedBlock.GetCompressedSize(); { CbObjectWriter Writer; Writer.AddString("createdBy", "zen"); OutBlocks.BlockMetaDatas[BlockIndex] = Writer.Save(); } GenerateBlocksStats.GeneratedBlockByteCount += OutBlocks.BlockSizes[BlockIndex]; GenerateBlocksStats.GeneratedBlockCount++; Lock.WithExclusiveLock([&]() { OutBlocks.BlockHashToBlockIndex.insert_or_assign(OutBlocks.BlockDescriptions[BlockIndex].BlockHash, BlockIndex); }); { std::span Segments = CompressedBlock.GetCompressed().GetSegments(); ZEN_ASSERT(Segments.size() >= 2); OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]); } if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount) { FilteredGeneratedBytesPerSecond.Stop(); } if (QueuedPendingBlocksForUpload.load() > 16) { std::span Segments = CompressedBlock.GetCompressed().GetSegments(); ZEN_ASSERT(Segments.size() >= 2); OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]); } else { if (!m_AbortFlag) { QueuedPendingBlocksForUpload++; Work.ScheduleWork( UploadBlocksPool, 
[this, NewBlockCount, &GenerateBlocksStats, &UploadStats, &FilteredUploadedBytesPerSecond, &QueuedPendingBlocksForUpload, &OutBlocks, BlockIndex, Payload = std::move(CompressedBlock)](std::atomic&) mutable { auto _ = MakeGuard([&QueuedPendingBlocksForUpload] { QueuedPendingBlocksForUpload--; }); if (!m_AbortFlag) { if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount) { ZEN_TRACE_CPU("GenerateBuildBlocks_Save"); FilteredUploadedBytesPerSecond.Stop(); std::span Segments = Payload.GetCompressed().GetSegments(); ZEN_ASSERT(Segments.size() >= 2); OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]); } else { ZEN_TRACE_CPU("GenerateBuildBlocks_Upload"); FilteredUploadedBytesPerSecond.Start(); const CbObject BlockMetaData = BuildChunkBlockDescription(OutBlocks.BlockDescriptions[BlockIndex], OutBlocks.BlockMetaDatas[BlockIndex]); const IoHash& BlockHash = OutBlocks.BlockDescriptions[BlockIndex].BlockHash; const uint64_t CompressedBlockSize = Payload.GetCompressedSize(); if (m_Storage.BuildCacheStorage && m_Options.PopulateCache) { m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload.GetCompressed()); } m_Storage.BuildStorage->PutBuildBlob(m_BuildId, BlockHash, ZenContentType::kCompressedBinary, std::move(Payload).GetCompressed()); UploadStats.BlocksBytes += CompressedBlockSize; if (m_Options.IsVerbose) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Uploaded block {} ({}) containing {} chunks", BlockHash, NiceBytes(CompressedBlockSize), OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); } if (m_Storage.BuildCacheStorage && m_Options.PopulateCache) { m_Storage.BuildCacheStorage->PutBlobMetadatas(m_BuildId, std::vector({BlockHash}), std::vector({BlockMetaData})); } bool MetadataSucceeded = m_Storage.BuildStorage->PutBlockMetadata(m_BuildId, BlockHash, BlockMetaData); if (MetadataSucceeded) { if (m_Options.IsVerbose) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Uploaded block {} metadata 
({})", BlockHash, NiceBytes(BlockMetaData.GetSize())); } OutBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; UploadStats.BlocksBytes += BlockMetaData.GetSize(); } UploadStats.BlockCount++; if (UploadStats.BlockCount == NewBlockCount) { FilteredUploadedBytesPerSecond.Stop(); } } } }); } } } }); } Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { ZEN_UNUSED(PendingWork); FilteredGeneratedBytesPerSecond.Update(GenerateBlocksStats.GeneratedBlockByteCount.load()); FilteredUploadedBytesPerSecond.Update(UploadStats.BlocksBytes.load()); std::string Details = fmt::format("Generated {}/{} ({}, {}B/s). Uploaded {}/{} ({}, {}bits/s)", GenerateBlocksStats.GeneratedBlockCount.load(), NewBlockCount, NiceBytes(GenerateBlocksStats.GeneratedBlockByteCount.load()), NiceNum(FilteredGeneratedBytesPerSecond.GetCurrent()), UploadStats.BlockCount.load(), NewBlockCount, NiceBytes(UploadStats.BlocksBytes.load()), NiceNum(FilteredUploadedBytesPerSecond.GetCurrent() * 8)); Progress.UpdateState({.Task = "Generating blocks", .Details = Details, .TotalCount = gsl::narrow(NewBlockCount), .RemainingCount = gsl::narrow(NewBlockCount - GenerateBlocksStats.GeneratedBlockCount.load()), .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); ZEN_ASSERT(m_AbortFlag || QueuedPendingBlocksForUpload.load() == 0); Progress.Finish(); GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGeneratedBytesPerSecond.GetElapsedTimeUS(); UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS(); } } std::vector BuildsOperationUploadFolder::CalculateAbsoluteChunkOrders( const std::span LocalChunkHashes, const std::span LocalChunkOrder, const tsl::robin_map& ChunkHashToLocalChunkIndex, const std::span& LooseChunkIndexes, const std::span& BlockDescriptions) { ZEN_TRACE_CPU("CalculateAbsoluteChunkOrders"); std::vector TmpAbsoluteChunkHashes; if 
(m_Options.DoExtraContentValidation) { TmpAbsoluteChunkHashes.reserve(LocalChunkHashes.size()); } std::vector LocalChunkIndexToAbsoluteChunkIndex; LocalChunkIndexToAbsoluteChunkIndex.resize(LocalChunkHashes.size(), (uint32_t)-1); std::uint32_t AbsoluteChunkCount = 0; for (uint32_t ChunkIndex : LooseChunkIndexes) { LocalChunkIndexToAbsoluteChunkIndex[ChunkIndex] = AbsoluteChunkCount; if (m_Options.DoExtraContentValidation) { TmpAbsoluteChunkHashes.push_back(LocalChunkHashes[ChunkIndex]); } AbsoluteChunkCount++; } for (const ChunkBlockDescription& Block : BlockDescriptions) { for (const IoHash& ChunkHash : Block.ChunkRawHashes) { if (auto It = ChunkHashToLocalChunkIndex.find(ChunkHash); It != ChunkHashToLocalChunkIndex.end()) { const uint32_t LocalChunkIndex = It->second; ZEN_ASSERT_SLOW(LocalChunkHashes[LocalChunkIndex] == ChunkHash); LocalChunkIndexToAbsoluteChunkIndex[LocalChunkIndex] = AbsoluteChunkCount; } if (m_Options.DoExtraContentValidation) { TmpAbsoluteChunkHashes.push_back(ChunkHash); } AbsoluteChunkCount++; } } std::vector AbsoluteChunkOrder; AbsoluteChunkOrder.reserve(LocalChunkHashes.size()); for (const uint32_t LocalChunkIndex : LocalChunkOrder) { const uint32_t AbsoluteChunkIndex = LocalChunkIndexToAbsoluteChunkIndex[LocalChunkIndex]; if (m_Options.DoExtraContentValidation) { ZEN_ASSERT(LocalChunkHashes[LocalChunkIndex] == TmpAbsoluteChunkHashes[AbsoluteChunkIndex]); } AbsoluteChunkOrder.push_back(AbsoluteChunkIndex); } if (m_Options.DoExtraContentValidation) { uint32_t OrderIndex = 0; while (OrderIndex < LocalChunkOrder.size()) { const uint32_t LocalChunkIndex = LocalChunkOrder[OrderIndex]; const IoHash& LocalChunkHash = LocalChunkHashes[LocalChunkIndex]; const uint32_t AbsoluteChunkIndex = AbsoluteChunkOrder[OrderIndex]; const IoHash& AbsoluteChunkHash = TmpAbsoluteChunkHashes[AbsoluteChunkIndex]; ZEN_ASSERT(LocalChunkHash == AbsoluteChunkHash); OrderIndex++; } } return AbsoluteChunkOrder; } CompositeBuffer 
BuildsOperationUploadFolder::FetchChunk(const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, const IoHash& ChunkHash, ReadFileCache& OpenFileCache) { ZEN_TRACE_CPU("FetchChunk"); auto It = Lookup.ChunkHashToChunkIndex.find(ChunkHash); ZEN_ASSERT(It != Lookup.ChunkHashToChunkIndex.end()); uint32_t ChunkIndex = It->second; std::span ChunkLocations = GetChunkSequenceLocations(Lookup, ChunkIndex); ZEN_ASSERT(!ChunkLocations.empty()); CompositeBuffer Chunk = OpenFileCache.GetRange(ChunkLocations[0].SequenceIndex, ChunkLocations[0].Offset, Content.ChunkedContent.ChunkRawSizes[ChunkIndex]); if (!Chunk) { throw std::runtime_error(fmt::format("Unable to read chunk at {}, size {} from '{}'", ChunkLocations[0].Offset, Content.ChunkedContent.ChunkRawSizes[ChunkIndex], Content.Paths[Lookup.SequenceIndexFirstPathIndex[ChunkLocations[0].SequenceIndex]])); } ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == ChunkHash); return Chunk; }; CompressedBuffer BuildsOperationUploadFolder::GenerateBlock(const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, const std::vector& ChunksInBlock, ChunkBlockDescription& OutBlockDescription) { ZEN_TRACE_CPU("GenerateBlock"); ReadFileCache OpenFileCache(m_DiskStats.OpenReadCount, m_DiskStats.CurrentOpenFileCount, m_DiskStats.ReadCount, m_DiskStats.ReadByteCount, m_Path, Content, Lookup, 4); std::vector> BlockContent; BlockContent.reserve(ChunksInBlock.size()); for (uint32_t ChunkIndex : ChunksInBlock) { BlockContent.emplace_back(std::make_pair( Content.ChunkedContent.ChunkHashes[ChunkIndex], [this, &Content, &Lookup, &OpenFileCache, ChunkIndex](const IoHash& ChunkHash) -> std::pair { CompositeBuffer Chunk = FetchChunk(Content, Lookup, ChunkHash, OpenFileCache); ZEN_ASSERT(Chunk); uint64_t RawSize = Chunk.GetSize(); const bool ShouldCompressChunk = RawSize >= m_Options.MinimumSizeForCompressInBlock && IsChunkCompressable(m_NonCompressableExtensionHashes, Content, Lookup, ChunkIndex); const OodleCompressionLevel 
CompressionLevel = ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None; return {RawSize, CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid, CompressionLevel)}; })); } return GenerateChunkBlock(std::move(BlockContent), OutBlockDescription); }; CompressedBuffer BuildsOperationUploadFolder::RebuildBlock(const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, CompositeBuffer&& HeaderBuffer, const std::vector& ChunksInBlock) { ZEN_TRACE_CPU("RebuildBlock"); ReadFileCache OpenFileCache(m_DiskStats.OpenReadCount, m_DiskStats.CurrentOpenFileCount, m_DiskStats.ReadCount, m_DiskStats.ReadByteCount, m_Path, Content, Lookup, 4); std::vector ResultBuffers; ResultBuffers.reserve(HeaderBuffer.GetSegments().size() + ChunksInBlock.size()); ResultBuffers.insert(ResultBuffers.end(), HeaderBuffer.GetSegments().begin(), HeaderBuffer.GetSegments().end()); for (uint32_t ChunkIndex : ChunksInBlock) { std::span ChunkLocations = GetChunkSequenceLocations(Lookup, ChunkIndex); ZEN_ASSERT(!ChunkLocations.empty()); CompositeBuffer Chunk = OpenFileCache.GetRange(ChunkLocations[0].SequenceIndex, ChunkLocations[0].Offset, Content.ChunkedContent.ChunkRawSizes[ChunkIndex]); ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == Content.ChunkedContent.ChunkHashes[ChunkIndex]); const uint64_t RawSize = Chunk.GetSize(); const bool ShouldCompressChunk = RawSize >= m_Options.MinimumSizeForCompressInBlock && IsChunkCompressable(m_NonCompressableExtensionHashes, Content, Lookup, ChunkIndex); const OodleCompressionLevel CompressionLevel = ShouldCompressChunk ? 
OodleCompressionLevel::VeryFast : OodleCompressionLevel::None; CompositeBuffer CompressedChunk = CompressedBuffer::Compress(std::move(Chunk), OodleCompressor::Mermaid, CompressionLevel).GetCompressed(); ResultBuffers.insert(ResultBuffers.end(), CompressedChunk.GetSegments().begin(), CompressedChunk.GetSegments().end()); } return CompressedBuffer::FromCompressedNoValidate(CompositeBuffer(std::move(ResultBuffers))); }; void BuildsOperationUploadFolder::UploadBuildPart(ChunkingController& ChunkController, ChunkingCache& ChunkCache, uint32_t PartIndex, const UploadPart& Part, uint32_t PartStepOffset, uint32_t StepCount) { Stopwatch UploadTimer; ChunkingStatistics ChunkingStats; FindBlocksStatistics FindBlocksStats; ReuseBlocksStatistics ReuseBlocksStats; UploadStatistics UploadStats; GenerateBlocksStatistics GenerateBlocksStats; LooseChunksStatistics LooseChunksStats; ChunkedFolderContent LocalContent; m_LogOutput.SetLogOperationProgress(PartStepOffset + (uint32_t)PartTaskSteps::ChunkPartContent, StepCount); Stopwatch ScanTimer; { std::unique_ptr ProgressBarPtr(m_LogOutput.CreateProgressBar("Scan Folder")); OperationLogOutput::ProgressBar& Progress(*ProgressBarPtr); FilteredRate FilteredBytesHashed; FilteredBytesHashed.Start(); LocalContent = ChunkFolderContent( ChunkingStats, m_IOWorkerPool, m_Path, Part.Content, ChunkController, ChunkCache, m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t) { FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load()); std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found", ChunkingStats.FilesProcessed.load(), Part.Content.Paths.size(), NiceBytes(ChunkingStats.BytesHashed.load()), NiceBytes(Part.TotalRawSize), NiceNum(FilteredBytesHashed.GetCurrent()), ChunkingStats.UniqueChunksFound.load(), NiceBytes(ChunkingStats.UniqueBytesFound.load())); Progress.UpdateState({.Task = "Scanning files ", .Details = Details, .TotalCount = Part.TotalRawSize, .RemainingCount = 
Part.TotalRawSize - ChunkingStats.BytesHashed.load(), .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }, m_AbortFlag, m_PauseFlag); FilteredBytesHashed.Stop(); Progress.Finish(); if (m_AbortFlag) { return; } } if (!m_Options.IsQuiet) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Found {} ({}) files divided into {} ({}) unique chunks in '{}' in {}. Average hash rate {}B/sec", Part.Content.Paths.size(), NiceBytes(Part.TotalRawSize), ChunkingStats.UniqueChunksFound.load(), NiceBytes(ChunkingStats.UniqueBytesFound.load()), m_Path, NiceTimeSpanMs(ScanTimer.GetElapsedTimeMs()), NiceNum(GetBytesPerSecond(ChunkingStats.ElapsedWallTimeUS, ChunkingStats.BytesHashed))); } const ChunkedContentLookup LocalLookup = BuildChunkedContentLookup(LocalContent); std::vector ReuseBlockIndexes; std::vector NewBlockChunkIndexes; if (PartIndex == 0) { const PrepareBuildResult PrepBuildResult = m_PrepBuildResultFuture.get(); m_FindBlocksStats.FindBlockTimeMS = PrepBuildResult.ElapsedTimeMs; m_FindBlocksStats.FoundBlockCount = PrepBuildResult.KnownBlocks.size(); if (!m_Options.IsQuiet) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Build prepare took {}. {} took {}, payload size {}{}", NiceTimeSpanMs(PrepBuildResult.ElapsedTimeMs), m_CreateBuild ? "PutBuild" : "GetBuild", NiceTimeSpanMs(PrepBuildResult.PrepareBuildTimeMs), NiceBytes(PrepBuildResult.PayloadSize), m_Options.IgnoreExistingBlocks ? "" : fmt::format(". Found {} blocks in {}", PrepBuildResult.KnownBlocks.size(), NiceTimeSpanMs(PrepBuildResult.FindBlocksTimeMs))); } m_PreferredMultipartChunkSize = PrepBuildResult.PreferredMultipartChunkSize; m_LargeAttachmentSize = m_Options.AllowMultiparts ? 
m_PreferredMultipartChunkSize * 4u : (std::uint64_t)-1; m_KnownBlocks = std::move(PrepBuildResult.KnownBlocks); } ZEN_ASSERT(m_PreferredMultipartChunkSize != 0); ZEN_ASSERT(m_LargeAttachmentSize != 0); m_LogOutput.SetLogOperationProgress(PartStepOffset + (uint32_t)PartTaskSteps::CalculateDelta, StepCount); Stopwatch BlockArrangeTimer; std::vector LooseChunkIndexes; { bool EnableBlocks = true; std::vector BlockChunkIndexes; for (uint32_t ChunkIndex = 0; ChunkIndex < LocalContent.ChunkedContent.ChunkHashes.size(); ChunkIndex++) { const uint64_t ChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; if (!EnableBlocks || ChunkRawSize == 0 || ChunkRawSize > m_Options.BlockParameters.MaxChunkEmbedSize) { LooseChunkIndexes.push_back(ChunkIndex); LooseChunksStats.ChunkByteCount += ChunkRawSize; } else { BlockChunkIndexes.push_back(ChunkIndex); FindBlocksStats.PotentialChunkByteCount += ChunkRawSize; } } FindBlocksStats.PotentialChunkCount += BlockChunkIndexes.size(); LooseChunksStats.ChunkCount = LooseChunkIndexes.size(); if (m_Options.IgnoreExistingBlocks) { if (!m_Options.IsQuiet) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Ignoring any existing blocks in store"); } NewBlockChunkIndexes = std::move(BlockChunkIndexes); } else { ReuseBlockIndexes = FindReuseBlocks(m_LogOutput, m_Options.BlockReuseMinPercentLimit, m_Options.IsVerbose, ReuseBlocksStats, m_KnownBlocks, LocalContent.ChunkedContent.ChunkHashes, BlockChunkIndexes, NewBlockChunkIndexes); FindBlocksStats.AcceptedBlockCount += ReuseBlockIndexes.size(); for (const ChunkBlockDescription& Description : m_KnownBlocks) { for (uint32_t ChunkRawLength : Description.ChunkRawLengths) { FindBlocksStats.FoundBlockByteCount += ChunkRawLength; } FindBlocksStats.FoundBlockChunkCount += Description.ChunkRawHashes.size(); } } } std::vector> NewBlockChunks; ArrangeChunksIntoBlocks(LocalContent, LocalLookup, NewBlockChunkIndexes, NewBlockChunks); FindBlocksStats.NewBlocksCount += NewBlockChunks.size(); for (uint32_t 
ChunkIndex : NewBlockChunkIndexes) { FindBlocksStats.NewBlocksChunkByteCount += LocalContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; } FindBlocksStats.NewBlocksChunkCount += NewBlockChunkIndexes.size(); const double AcceptedByteCountPercent = FindBlocksStats.PotentialChunkByteCount > 0 ? (100.0 * ReuseBlocksStats.AcceptedRawByteCount / FindBlocksStats.PotentialChunkByteCount) : 0.0; const double AcceptedReduntantByteCountPercent = ReuseBlocksStats.AcceptedByteCount > 0 ? (100.0 * ReuseBlocksStats.AcceptedReduntantByteCount) / (ReuseBlocksStats.AcceptedByteCount + ReuseBlocksStats.AcceptedReduntantByteCount) : 0.0; if (!m_Options.IsQuiet) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Found {} chunks in {} ({}) blocks eligible for reuse in {}\n" " Reusing {} ({}) matching chunks in {} blocks ({:.1f}%)\n" " Accepting {} ({}) redundant chunks ({:.1f}%)\n" " Rejected {} ({}) chunks in {} blocks\n" " Arranged {} ({}) chunks in {} new blocks\n" " Keeping {} ({}) chunks as loose chunks\n" " Discovery completed in {}", FindBlocksStats.FoundBlockChunkCount, FindBlocksStats.FoundBlockCount, NiceBytes(FindBlocksStats.FoundBlockByteCount), NiceTimeSpanMs(FindBlocksStats.FindBlockTimeMS), ReuseBlocksStats.AcceptedChunkCount, NiceBytes(ReuseBlocksStats.AcceptedRawByteCount), FindBlocksStats.AcceptedBlockCount, AcceptedByteCountPercent, ReuseBlocksStats.AcceptedReduntantChunkCount, NiceBytes(ReuseBlocksStats.AcceptedReduntantByteCount), AcceptedReduntantByteCountPercent, ReuseBlocksStats.RejectedChunkCount, NiceBytes(ReuseBlocksStats.RejectedByteCount), ReuseBlocksStats.RejectedBlockCount, FindBlocksStats.NewBlocksChunkCount, NiceBytes(FindBlocksStats.NewBlocksChunkByteCount), FindBlocksStats.NewBlocksCount, LooseChunksStats.ChunkCount, NiceBytes(LooseChunksStats.ChunkByteCount), NiceTimeSpanMs(BlockArrangeTimer.GetElapsedTimeMs())); } m_LogOutput.SetLogOperationProgress(PartStepOffset + (uint32_t)PartTaskSteps::GenerateBlocks, StepCount); GeneratedBlocks NewBlocks; if 
(!NewBlockChunks.empty()) { Stopwatch GenerateBuildBlocksTimer; auto __ = MakeGuard([&]() { uint64_t BlockGenerateTimeUs = GenerateBuildBlocksTimer.GetElapsedTimeUs(); if (!m_Options.IsQuiet) { ZEN_OPERATION_LOG_INFO( m_LogOutput, "Generated {} ({}) and uploaded {} ({}) blocks in {}. Generate speed: {}B/sec. Transfer speed {}bits/sec.", GenerateBlocksStats.GeneratedBlockCount.load(), NiceBytes(GenerateBlocksStats.GeneratedBlockByteCount), UploadStats.BlockCount.load(), NiceBytes(UploadStats.BlocksBytes.load()), NiceTimeSpanMs(BlockGenerateTimeUs / 1000), NiceNum(GetBytesPerSecond(GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS, GenerateBlocksStats.GeneratedBlockByteCount)), NiceNum(GetBytesPerSecond(UploadStats.ElapsedWallTimeUS, UploadStats.BlocksBytes * 8))); } }); GenerateBuildBlocks(LocalContent, LocalLookup, NewBlockChunks, NewBlocks, GenerateBlocksStats, UploadStats); } m_LogOutput.SetLogOperationProgress(PartStepOffset + (uint32_t)PartTaskSteps::BuildPartManifest, StepCount); CbObject PartManifest; { CbObjectWriter PartManifestWriter; Stopwatch ManifestGenerationTimer; auto __ = MakeGuard([&]() { if (!m_Options.IsQuiet) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Generated build part manifest in {} ({})", NiceTimeSpanMs(ManifestGenerationTimer.GetElapsedTimeMs()), NiceBytes(PartManifestWriter.GetSaveSize())); } }); PartManifestWriter.BeginObject("chunker"sv); { PartManifestWriter.AddString("name"sv, ChunkController.GetName()); PartManifestWriter.AddObject("parameters"sv, ChunkController.GetParameters()); } PartManifestWriter.EndObject(); // chunker std::vector AllChunkBlockHashes; std::vector AllChunkBlockDescriptions; AllChunkBlockHashes.reserve(ReuseBlockIndexes.size() + NewBlocks.BlockDescriptions.size()); AllChunkBlockDescriptions.reserve(ReuseBlockIndexes.size() + NewBlocks.BlockDescriptions.size()); for (size_t ReuseBlockIndex : ReuseBlockIndexes) { AllChunkBlockDescriptions.push_back(m_KnownBlocks[ReuseBlockIndex]); 
AllChunkBlockHashes.push_back(m_KnownBlocks[ReuseBlockIndex].BlockHash); } AllChunkBlockDescriptions.insert(AllChunkBlockDescriptions.end(), NewBlocks.BlockDescriptions.begin(), NewBlocks.BlockDescriptions.end()); for (const ChunkBlockDescription& BlockDescription : NewBlocks.BlockDescriptions) { AllChunkBlockHashes.push_back(BlockDescription.BlockHash); } std::vector AbsoluteChunkHashes; if (m_Options.DoExtraContentValidation) { tsl::robin_map ChunkHashToAbsoluteChunkIndex; AbsoluteChunkHashes.reserve(LocalContent.ChunkedContent.ChunkHashes.size()); for (uint32_t ChunkIndex : LooseChunkIndexes) { ChunkHashToAbsoluteChunkIndex.insert({LocalContent.ChunkedContent.ChunkHashes[ChunkIndex], AbsoluteChunkHashes.size()}); AbsoluteChunkHashes.push_back(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]); } for (const ChunkBlockDescription& Block : AllChunkBlockDescriptions) { for (const IoHash& ChunkHash : Block.ChunkRawHashes) { ChunkHashToAbsoluteChunkIndex.insert({ChunkHash, AbsoluteChunkHashes.size()}); AbsoluteChunkHashes.push_back(ChunkHash); } } for (const IoHash& ChunkHash : LocalContent.ChunkedContent.ChunkHashes) { ZEN_ASSERT(AbsoluteChunkHashes[ChunkHashToAbsoluteChunkIndex.at(ChunkHash)] == ChunkHash); ZEN_ASSERT(LocalContent.ChunkedContent.ChunkHashes[LocalLookup.ChunkHashToChunkIndex.at(ChunkHash)] == ChunkHash); } for (const uint32_t ChunkIndex : LocalContent.ChunkedContent.ChunkOrders) { ZEN_ASSERT(AbsoluteChunkHashes[ChunkHashToAbsoluteChunkIndex.at(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex])] == LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]); ZEN_ASSERT(LocalLookup.ChunkHashToChunkIndex.at(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]) == ChunkIndex); } } std::vector AbsoluteChunkOrders = CalculateAbsoluteChunkOrders(LocalContent.ChunkedContent.ChunkHashes, LocalContent.ChunkedContent.ChunkOrders, LocalLookup.ChunkHashToChunkIndex, LooseChunkIndexes, AllChunkBlockDescriptions); if (m_Options.DoExtraContentValidation) { for (uint32_t 
ChunkOrderIndex = 0; ChunkOrderIndex < LocalContent.ChunkedContent.ChunkOrders.size(); ChunkOrderIndex++) { uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[ChunkOrderIndex]; uint32_t AbsoluteChunkIndex = AbsoluteChunkOrders[ChunkOrderIndex]; const IoHash& LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; const IoHash& AbsoluteChunkHash = AbsoluteChunkHashes[AbsoluteChunkIndex]; ZEN_ASSERT(LocalChunkHash == AbsoluteChunkHash); } } WriteBuildContentToCompactBinary(PartManifestWriter, LocalContent.Platform, LocalContent.Paths, LocalContent.RawHashes, LocalContent.RawSizes, LocalContent.Attributes, LocalContent.ChunkedContent.SequenceRawHashes, LocalContent.ChunkedContent.ChunkCounts, LocalContent.ChunkedContent.ChunkHashes, LocalContent.ChunkedContent.ChunkRawSizes, AbsoluteChunkOrders, LooseChunkIndexes, AllChunkBlockHashes); if (m_Options.DoExtraContentValidation) { ChunkedFolderContent VerifyFolderContent; std::vector OutAbsoluteChunkOrders; std::vector OutLooseChunkHashes; std::vector OutLooseChunkRawSizes; std::vector OutBlockRawHashes; ReadBuildContentFromCompactBinary(PartManifestWriter.Save(), VerifyFolderContent.Platform, VerifyFolderContent.Paths, VerifyFolderContent.RawHashes, VerifyFolderContent.RawSizes, VerifyFolderContent.Attributes, VerifyFolderContent.ChunkedContent.SequenceRawHashes, VerifyFolderContent.ChunkedContent.ChunkCounts, OutAbsoluteChunkOrders, OutLooseChunkHashes, OutLooseChunkRawSizes, OutBlockRawHashes); ZEN_ASSERT(OutBlockRawHashes == AllChunkBlockHashes); for (uint32_t OrderIndex = 0; OrderIndex < OutAbsoluteChunkOrders.size(); OrderIndex++) { uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[OrderIndex]; const IoHash LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; uint32_t VerifyChunkIndex = OutAbsoluteChunkOrders[OrderIndex]; const IoHash VerifyChunkHash = AbsoluteChunkHashes[VerifyChunkIndex]; ZEN_ASSERT(LocalChunkHash == VerifyChunkHash); } 
CalculateLocalChunkOrders(OutAbsoluteChunkOrders, OutLooseChunkHashes, OutLooseChunkRawSizes, AllChunkBlockDescriptions, VerifyFolderContent.ChunkedContent.ChunkHashes, VerifyFolderContent.ChunkedContent.ChunkRawSizes, VerifyFolderContent.ChunkedContent.ChunkOrders, m_Options.DoExtraContentValidation); ZEN_ASSERT(LocalContent.Paths == VerifyFolderContent.Paths); ZEN_ASSERT(LocalContent.RawHashes == VerifyFolderContent.RawHashes); ZEN_ASSERT(LocalContent.RawSizes == VerifyFolderContent.RawSizes); ZEN_ASSERT(LocalContent.Attributes == VerifyFolderContent.Attributes); ZEN_ASSERT(LocalContent.ChunkedContent.SequenceRawHashes == VerifyFolderContent.ChunkedContent.SequenceRawHashes); ZEN_ASSERT(LocalContent.ChunkedContent.ChunkCounts == VerifyFolderContent.ChunkedContent.ChunkCounts); for (uint32_t OrderIndex = 0; OrderIndex < LocalContent.ChunkedContent.ChunkOrders.size(); OrderIndex++) { uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[OrderIndex]; const IoHash LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; uint64_t LocalChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[LocalChunkIndex]; uint32_t VerifyChunkIndex = VerifyFolderContent.ChunkedContent.ChunkOrders[OrderIndex]; const IoHash VerifyChunkHash = VerifyFolderContent.ChunkedContent.ChunkHashes[VerifyChunkIndex]; uint64_t VerifyChunkRawSize = VerifyFolderContent.ChunkedContent.ChunkRawSizes[VerifyChunkIndex]; ZEN_ASSERT(LocalChunkHash == VerifyChunkHash); ZEN_ASSERT(LocalChunkRawSize == VerifyChunkRawSize); } } PartManifest = PartManifestWriter.Save(); } m_LogOutput.SetLogOperationProgress(PartStepOffset + (uint32_t)PartTaskSteps::UploadBuildPart, StepCount); Stopwatch PutBuildPartResultTimer; std::pair> PutBuildPartResult = m_Storage.BuildStorage->PutBuildPart(m_BuildId, Part.PartId, Part.PartName, PartManifest); if (!m_Options.IsQuiet) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "PutBuildPart took {}, payload size {}. 
{} attachments are needed.", NiceTimeSpanMs(PutBuildPartResultTimer.GetElapsedTimeMs()), NiceBytes(PartManifest.GetSize()), PutBuildPartResult.second.size()); } IoHash PartHash = PutBuildPartResult.first; auto UploadAttachments = [this, &LooseChunksStats, &UploadStats, &LocalContent, &LocalLookup, &NewBlockChunks, &NewBlocks, &LooseChunkIndexes]( std::span RawHashes, std::vector& OutUnknownChunks) { if (!m_AbortFlag) { UploadStatistics TempUploadStats; LooseChunksStatistics TempLooseChunksStats; Stopwatch TempUploadTimer; auto __ = MakeGuard([&]() { if (!m_Options.IsQuiet) { uint64_t TempChunkUploadTimeUs = TempUploadTimer.GetElapsedTimeUs(); ZEN_OPERATION_LOG_INFO( m_LogOutput, "Uploaded {} ({}) blocks. " "Compressed {} ({} {}B/s) and uploaded {} ({}) chunks. " "Transferred {} ({}bits/s) in {}", TempUploadStats.BlockCount.load(), NiceBytes(TempUploadStats.BlocksBytes), TempLooseChunksStats.CompressedChunkCount.load(), NiceBytes(TempLooseChunksStats.CompressedChunkBytes.load()), NiceNum(GetBytesPerSecond(TempLooseChunksStats.CompressChunksElapsedWallTimeUS, TempLooseChunksStats.ChunkByteCount)), TempUploadStats.ChunkCount.load(), NiceBytes(TempUploadStats.ChunksBytes), NiceBytes(TempUploadStats.BlocksBytes + TempUploadStats.ChunksBytes), NiceNum(GetBytesPerSecond(TempUploadStats.ElapsedWallTimeUS, TempUploadStats.ChunksBytes * 8)), NiceTimeSpanMs(TempChunkUploadTimeUs / 1000)); } }); UploadPartBlobs(LocalContent, LocalLookup, RawHashes, NewBlockChunks, NewBlocks, LooseChunkIndexes, m_LargeAttachmentSize, TempUploadStats, TempLooseChunksStats, OutUnknownChunks); UploadStats += TempUploadStats; LooseChunksStats += TempLooseChunksStats; } }; m_LogOutput.SetLogOperationProgress(PartStepOffset + (uint32_t)PartTaskSteps::UploadAttachments, StepCount); std::vector UnknownChunks; if (m_Options.IgnoreExistingBlocks) { if (m_Options.IsVerbose) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "PutBuildPart uploading all attachments, needs are: {}", FormatArray(PutBuildPartResult.second, 
"\n "sv)); } std::vector ForceUploadChunkHashes; ForceUploadChunkHashes.reserve(LooseChunkIndexes.size()); for (uint32_t ChunkIndex : LooseChunkIndexes) { ForceUploadChunkHashes.push_back(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]); } for (size_t BlockIndex = 0; BlockIndex < NewBlocks.BlockHeaders.size(); BlockIndex++) { if (NewBlocks.BlockHeaders[BlockIndex]) { // Block was not uploaded during generation ForceUploadChunkHashes.push_back(NewBlocks.BlockDescriptions[BlockIndex].BlockHash); } } UploadAttachments(ForceUploadChunkHashes, UnknownChunks); } else if (!PutBuildPartResult.second.empty()) { if (m_Options.IsVerbose) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "PutBuildPart needs attachments: {}", FormatArray(PutBuildPartResult.second, "\n "sv)); } UploadAttachments(PutBuildPartResult.second, UnknownChunks); } auto BuildUnkownChunksResponse = [](const std::vector& UnknownChunks, bool WillRetry) { return fmt::format( "The following build blobs was reported as needed for upload but was reported as existing at the start of the " "operation.{}{}", WillRetry ? " Treating this as a transient inconsistency issue and will attempt to retry finalization."sv : ""sv, FormatArray(UnknownChunks, "\n "sv)); }; if (!UnknownChunks.empty()) { ZEN_OPERATION_LOG_WARN(m_LogOutput, "{}", BuildUnkownChunksResponse(UnknownChunks, /*WillRetry*/ true)); } uint32_t FinalizeBuildPartRetryCount = 5; while (!m_AbortFlag && (FinalizeBuildPartRetryCount--) > 0) { Stopwatch FinalizeBuildPartTimer; std::vector Needs = m_Storage.BuildStorage->FinalizeBuildPart(m_BuildId, Part.PartId, PartHash); if (!m_Options.IsQuiet) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "FinalizeBuildPart took {}. 
{} attachments are missing.", NiceTimeSpanMs(FinalizeBuildPartTimer.GetElapsedTimeMs()), Needs.size()); } if (Needs.empty()) { break; } if (m_Options.IsVerbose) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "FinalizeBuildPart needs attachments: {}", FormatArray(Needs, "\n "sv)); } std::vector RetryUnknownChunks; UploadAttachments(Needs, RetryUnknownChunks); if (RetryUnknownChunks == UnknownChunks) { if (FinalizeBuildPartRetryCount > 0) { // Back off a bit Sleep(1000); } } else { UnknownChunks = RetryUnknownChunks; ZEN_OPERATION_LOG_WARN(m_LogOutput, "{}", BuildUnkownChunksResponse(UnknownChunks, /*WillRetry*/ FinalizeBuildPartRetryCount != 0)); } } if (!UnknownChunks.empty()) { throw std::runtime_error(BuildUnkownChunksResponse(UnknownChunks, /*WillRetry*/ false)); } if (!NewBlocks.BlockDescriptions.empty() && !m_AbortFlag) { uint64_t UploadBlockMetadataCount = 0; Stopwatch UploadBlockMetadataTimer; uint32_t FailedMetadataUploadCount = 1; int32_t MetadataUploadRetryCount = 3; while ((MetadataUploadRetryCount-- > 0) && (FailedMetadataUploadCount > 0)) { FailedMetadataUploadCount = 0; for (size_t BlockIndex = 0; BlockIndex < NewBlocks.BlockDescriptions.size(); BlockIndex++) { if (m_AbortFlag) { break; } const IoHash& BlockHash = NewBlocks.BlockDescriptions[BlockIndex].BlockHash; if (!NewBlocks.MetaDataHasBeenUploaded[BlockIndex]) { const CbObject BlockMetaData = BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]); if (m_Storage.BuildCacheStorage && m_Options.PopulateCache) { m_Storage.BuildCacheStorage->PutBlobMetadatas(m_BuildId, std::vector({BlockHash}), std::vector({BlockMetaData})); } bool MetadataSucceeded = m_Storage.BuildStorage->PutBlockMetadata(m_BuildId, BlockHash, BlockMetaData); if (MetadataSucceeded) { UploadStats.BlocksBytes += BlockMetaData.GetSize(); NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; UploadBlockMetadataCount++; } else { FailedMetadataUploadCount++; } } } } if (UploadBlockMetadataCount > 
0) { uint64_t ElapsedUS = UploadBlockMetadataTimer.GetElapsedTimeUs(); UploadStats.ElapsedWallTimeUS += ElapsedUS; if (!m_Options.IsQuiet) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Uploaded metadata for {} blocks in {}", UploadBlockMetadataCount, NiceTimeSpanMs(ElapsedUS / 1000)); } } // The newly generated blocks are now known blocks so the next part upload can use those blocks as well m_KnownBlocks.insert(m_KnownBlocks.end(), NewBlocks.BlockDescriptions.begin(), NewBlocks.BlockDescriptions.end()); } m_LogOutput.SetLogOperationProgress(PartStepOffset + (uint32_t)PartTaskSteps::PutBuildPartStats, StepCount); m_Storage.BuildStorage->PutBuildPartStats( m_BuildId, Part.PartId, {{"totalSize", double(Part.LocalFolderScanStats.FoundFileByteCount.load())}, {"reusedRatio", AcceptedByteCountPercent / 100.0}, {"reusedBlockCount", double(FindBlocksStats.AcceptedBlockCount)}, {"reusedBlockByteCount", double(ReuseBlocksStats.AcceptedRawByteCount)}, {"newBlockCount", double(FindBlocksStats.NewBlocksCount)}, {"newBlockByteCount", double(FindBlocksStats.NewBlocksChunkByteCount)}, {"uploadedCount", double(UploadStats.BlockCount.load() + UploadStats.ChunkCount.load())}, {"uploadedByteCount", double(UploadStats.BlocksBytes.load() + UploadStats.ChunksBytes.load())}, {"uploadedBytesPerSec", double(GetBytesPerSecond(UploadStats.ElapsedWallTimeUS, UploadStats.ChunksBytes + UploadStats.BlocksBytes))}, {"elapsedTimeSec", double(UploadTimer.GetElapsedTimeMs() / 1000.0)}}); m_LocalFolderScanStats += Part.LocalFolderScanStats; m_ChunkingStats += ChunkingStats; m_FindBlocksStats += FindBlocksStats; m_ReuseBlocksStats += ReuseBlocksStats; m_UploadStats += UploadStats; m_GenerateBlocksStats += GenerateBlocksStats; m_LooseChunksStats += LooseChunksStats; } void BuildsOperationUploadFolder::UploadPartBlobs(const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, std::span RawHashes, const std::vector>& NewBlockChunks, GeneratedBlocks& NewBlocks, std::span LooseChunkIndexes, const 
std::uint64_t LargeAttachmentSize, UploadStatistics& TempUploadStats, LooseChunksStatistics& TempLooseChunksStats, std::vector& OutUnknownChunks) { ZEN_TRACE_CPU("UploadPartBlobs"); { std::unique_ptr ProgressBarPtr(m_LogOutput.CreateProgressBar("Upload Blobs")); OperationLogOutput::ProgressBar& Progress(*ProgressBarPtr); WorkerThreadPool& ReadChunkPool = m_IOWorkerPool; WorkerThreadPool& UploadChunkPool = m_NetworkPool; FilteredRate FilteredGenerateBlockBytesPerSecond; FilteredRate FilteredCompressedBytesPerSecond; FilteredRate FilteredUploadedBytesPerSecond; ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); std::atomic UploadedBlockSize = 0; std::atomic UploadedBlockCount = 0; std::atomic UploadedRawChunkSize = 0; std::atomic UploadedCompressedChunkSize = 0; std::atomic UploadedChunkCount = 0; tsl::robin_map ChunkIndexToLooseChunkOrderIndex; ChunkIndexToLooseChunkOrderIndex.reserve(LooseChunkIndexes.size()); for (uint32_t OrderIndex = 0; OrderIndex < LooseChunkIndexes.size(); OrderIndex++) { ChunkIndexToLooseChunkOrderIndex.insert_or_assign(LooseChunkIndexes[OrderIndex], OrderIndex); } std::vector BlockIndexes; std::vector LooseChunkOrderIndexes; uint64_t TotalLooseChunksSize = 0; uint64_t TotalBlocksSize = 0; for (const IoHash& RawHash : RawHashes) { if (auto It = NewBlocks.BlockHashToBlockIndex.find(RawHash); It != NewBlocks.BlockHashToBlockIndex.end()) { BlockIndexes.push_back(It->second); TotalBlocksSize += NewBlocks.BlockSizes[It->second]; } else if (auto ChunkIndexIt = Lookup.ChunkHashToChunkIndex.find(RawHash); ChunkIndexIt != Lookup.ChunkHashToChunkIndex.end()) { const uint32_t ChunkIndex = ChunkIndexIt->second; if (auto LooseOrderIndexIt = ChunkIndexToLooseChunkOrderIndex.find(ChunkIndex); LooseOrderIndexIt != ChunkIndexToLooseChunkOrderIndex.end()) { LooseChunkOrderIndexes.push_back(LooseOrderIndexIt->second); TotalLooseChunksSize += Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; } } else { 
OutUnknownChunks.push_back(RawHash); } } if (BlockIndexes.empty() && LooseChunkOrderIndexes.empty()) { return; } uint64_t TotalRawSize = TotalLooseChunksSize + TotalBlocksSize; const size_t UploadBlockCount = BlockIndexes.size(); const uint32_t UploadChunkCount = gsl::narrow(LooseChunkOrderIndexes.size()); auto AsyncUploadBlock = [this, &Work, &NewBlocks, UploadBlockCount, &UploadedBlockCount, UploadChunkCount, &UploadedChunkCount, &UploadedBlockSize, &TempUploadStats, &FilteredUploadedBytesPerSecond, &UploadChunkPool](const size_t BlockIndex, const IoHash BlockHash, CompositeBuffer&& Payload, std::atomic& QueuedPendingInMemoryBlocksForUpload) { bool IsInMemoryBlock = true; if (QueuedPendingInMemoryBlocksForUpload.load() > 16) { ZEN_TRACE_CPU("AsyncUploadBlock_WriteTempBlock"); std::filesystem::path TempFilePath = m_Options.TempDir / (BlockHash.ToHexString()); Payload = CompositeBuffer(WriteToTempFile(std::move(Payload), TempFilePath)); IsInMemoryBlock = false; } else { QueuedPendingInMemoryBlocksForUpload++; } Work.ScheduleWork( UploadChunkPool, [this, &QueuedPendingInMemoryBlocksForUpload, &NewBlocks, UploadBlockCount, &UploadedBlockCount, UploadChunkCount, &UploadedChunkCount, &UploadedBlockSize, &TempUploadStats, &FilteredUploadedBytesPerSecond, IsInMemoryBlock, BlockIndex, BlockHash, Payload = CompositeBuffer(std::move(Payload))](std::atomic&) mutable { auto _ = MakeGuard([IsInMemoryBlock, &QueuedPendingInMemoryBlocksForUpload] { if (IsInMemoryBlock) { QueuedPendingInMemoryBlocksForUpload--; } }); if (!m_AbortFlag) { ZEN_TRACE_CPU("AsyncUploadBlock"); const uint64_t PayloadSize = Payload.GetSize(); FilteredUploadedBytesPerSecond.Start(); const CbObject BlockMetaData = BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]); if (m_Storage.BuildCacheStorage && m_Options.PopulateCache) { m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload); } 
m_Storage.BuildStorage->PutBuildBlob(m_BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload); if (m_Options.IsVerbose) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Uploaded block {} ({}) containing {} chunks", BlockHash, NiceBytes(PayloadSize), NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); } UploadedBlockSize += PayloadSize; TempUploadStats.BlocksBytes += PayloadSize; if (m_Storage.BuildCacheStorage && m_Options.PopulateCache) { m_Storage.BuildCacheStorage->PutBlobMetadatas(m_BuildId, std::vector({BlockHash}), std::vector({BlockMetaData})); } bool MetadataSucceeded = m_Storage.BuildStorage->PutBlockMetadata(m_BuildId, BlockHash, BlockMetaData); if (MetadataSucceeded) { if (m_Options.IsVerbose) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Uploaded block {} metadata ({})", BlockHash, NiceBytes(BlockMetaData.GetSize())); } NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; TempUploadStats.BlocksBytes += BlockMetaData.GetSize(); } TempUploadStats.BlockCount++; UploadedBlockCount++; if (UploadedBlockCount == UploadBlockCount && UploadedChunkCount == UploadChunkCount) { FilteredUploadedBytesPerSecond.Stop(); } } }); }; auto AsyncUploadLooseChunk = [this, LargeAttachmentSize, &Work, &UploadChunkPool, &FilteredUploadedBytesPerSecond, &UploadedBlockCount, &UploadedChunkCount, UploadBlockCount, UploadChunkCount, &UploadedCompressedChunkSize, &UploadedRawChunkSize, &TempUploadStats](const IoHash& RawHash, uint64_t RawSize, CompositeBuffer&& Payload) { Work.ScheduleWork( UploadChunkPool, [this, &Work, LargeAttachmentSize, &FilteredUploadedBytesPerSecond, &UploadChunkPool, &UploadedBlockCount, &UploadedChunkCount, UploadBlockCount, UploadChunkCount, &UploadedCompressedChunkSize, &UploadedRawChunkSize, &TempUploadStats, RawHash, RawSize, Payload = CompositeBuffer(std::move(Payload))](std::atomic&) mutable { if (!m_AbortFlag) { ZEN_TRACE_CPU("AsyncUploadLooseChunk"); const uint64_t PayloadSize = Payload.GetSize(); if (m_Storage.BuildCacheStorage && 
m_Options.PopulateCache) { m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); } if (PayloadSize >= LargeAttachmentSize) { ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart"); TempUploadStats.MultipartAttachmentCount++; std::vector> MultipartWork = m_Storage.BuildStorage->PutLargeBuildBlob( m_BuildId, RawHash, ZenContentType::kCompressedBinary, PayloadSize, [Payload = std::move(Payload), &FilteredUploadedBytesPerSecond](uint64_t Offset, uint64_t Size) mutable -> IoBuffer { FilteredUploadedBytesPerSecond.Start(); IoBuffer PartPayload = Payload.Mid(Offset, Size).Flatten().AsIoBuffer(); PartPayload.SetContentType(ZenContentType::kBinary); return PartPayload; }, [RawSize, &TempUploadStats, &UploadedCompressedChunkSize, &UploadChunkPool, &UploadedBlockCount, UploadBlockCount, &UploadedChunkCount, UploadChunkCount, &FilteredUploadedBytesPerSecond, &UploadedRawChunkSize](uint64_t SentBytes, bool IsComplete) { TempUploadStats.ChunksBytes += SentBytes; UploadedCompressedChunkSize += SentBytes; if (IsComplete) { TempUploadStats.ChunkCount++; UploadedChunkCount++; if (UploadedBlockCount == UploadBlockCount && UploadedChunkCount == UploadChunkCount) { FilteredUploadedBytesPerSecond.Stop(); } UploadedRawChunkSize += RawSize; } }); for (auto& WorkPart : MultipartWork) { Work.ScheduleWork(UploadChunkPool, [Work = std::move(WorkPart)](std::atomic& AbortFlag) { ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart_Work"); if (!AbortFlag) { Work(); } }); } if (m_Options.IsVerbose) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Uploaded multipart chunk {} ({})", RawHash, NiceBytes(PayloadSize)); } } else { ZEN_TRACE_CPU("AsyncUploadLooseChunk_Singlepart"); m_Storage.BuildStorage->PutBuildBlob(m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); if (m_Options.IsVerbose) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Uploaded chunk {} ({})", RawHash, NiceBytes(PayloadSize)); } TempUploadStats.ChunksBytes += Payload.GetSize(); 
TempUploadStats.ChunkCount++; UploadedCompressedChunkSize += Payload.GetSize(); UploadedRawChunkSize += RawSize; UploadedChunkCount++; if (UploadedChunkCount == UploadChunkCount) { FilteredUploadedBytesPerSecond.Stop(); } } } }); }; std::vector GenerateBlockIndexes; std::atomic GeneratedBlockCount = 0; std::atomic GeneratedBlockByteCount = 0; std::atomic QueuedPendingInMemoryBlocksForUpload = 0; // Start generation of any non-prebuilt blocks and schedule upload for (const size_t BlockIndex : BlockIndexes) { const IoHash& BlockHash = NewBlocks.BlockDescriptions[BlockIndex].BlockHash; if (!m_AbortFlag) { Work.ScheduleWork( ReadChunkPool, [this, BlockHash = IoHash(BlockHash), BlockIndex, &FilteredGenerateBlockBytesPerSecond, &Content, &Lookup, &NewBlocks, &NewBlockChunks, &GenerateBlockIndexes, &GeneratedBlockCount, &GeneratedBlockByteCount, &AsyncUploadBlock, &QueuedPendingInMemoryBlocksForUpload](std::atomic&) { if (!m_AbortFlag) { ZEN_TRACE_CPU("UploadPartBlobs_GenerateBlock"); FilteredGenerateBlockBytesPerSecond.Start(); Stopwatch GenerateTimer; CompositeBuffer Payload; if (NewBlocks.BlockHeaders[BlockIndex]) { Payload = RebuildBlock(Content, Lookup, std::move(NewBlocks.BlockHeaders[BlockIndex]), NewBlockChunks[BlockIndex]) .GetCompressed(); } else { ChunkBlockDescription BlockDescription; CompressedBuffer CompressedBlock = GenerateBlock(Content, Lookup, NewBlockChunks[BlockIndex], BlockDescription); if (!CompressedBlock) { throw std::runtime_error(fmt::format("Failed generating block {}", BlockHash)); } ZEN_ASSERT(BlockDescription.BlockHash == BlockHash); Payload = std::move(CompressedBlock).GetCompressed(); } GeneratedBlockByteCount += NewBlocks.BlockSizes[BlockIndex]; GeneratedBlockCount++; if (GeneratedBlockCount == GenerateBlockIndexes.size()) { FilteredGenerateBlockBytesPerSecond.Stop(); } if (m_Options.IsVerbose) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "{} block {} ({}) containing {} chunks in {}", NewBlocks.BlockHeaders[BlockIndex] ? 
"Regenerated" : "Generated", NewBlocks.BlockDescriptions[BlockIndex].BlockHash, NiceBytes(NewBlocks.BlockSizes[BlockIndex]), NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size(), NiceTimeSpanMs(GenerateTimer.GetElapsedTimeMs())); } if (!m_AbortFlag) { AsyncUploadBlock(BlockIndex, BlockHash, std::move(Payload), QueuedPendingInMemoryBlocksForUpload); } } }); } } // Start compression of any non-precompressed loose chunks and schedule upload for (const uint32_t LooseChunkOrderIndex : LooseChunkOrderIndexes) { const uint32_t ChunkIndex = LooseChunkIndexes[LooseChunkOrderIndex]; Work.ScheduleWork( ReadChunkPool, [this, &Content, &Lookup, &TempLooseChunksStats, &LooseChunkOrderIndexes, &FilteredCompressedBytesPerSecond, &TempUploadStats, &AsyncUploadLooseChunk, ChunkIndex](std::atomic&) { if (!m_AbortFlag) { ZEN_TRACE_CPU("UploadPartBlobs_CompressChunk"); FilteredCompressedBytesPerSecond.Start(); Stopwatch CompressTimer; CompositeBuffer Payload = CompressChunk(Content, Lookup, ChunkIndex, TempLooseChunksStats); if (m_Options.IsVerbose) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Compressed chunk {} ({} -> {}) in {}", Content.ChunkedContent.ChunkHashes[ChunkIndex], NiceBytes(Content.ChunkedContent.ChunkRawSizes[ChunkIndex]), NiceBytes(Payload.GetSize()), NiceTimeSpanMs(CompressTimer.GetElapsedTimeMs())); } const uint64_t ChunkRawSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; TempUploadStats.ReadFromDiskBytes += ChunkRawSize; if (TempLooseChunksStats.CompressedChunkCount == LooseChunkOrderIndexes.size()) { FilteredCompressedBytesPerSecond.Stop(); } if (!m_AbortFlag) { AsyncUploadLooseChunk(Content.ChunkedContent.ChunkHashes[ChunkIndex], ChunkRawSize, std::move(Payload)); } } }); } Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { ZEN_UNUSED(PendingWork); FilteredCompressedBytesPerSecond.Update(TempLooseChunksStats.CompressedChunkRawBytes.load()); 
FilteredGenerateBlockBytesPerSecond.Update(GeneratedBlockByteCount.load()); FilteredUploadedBytesPerSecond.Update(UploadedCompressedChunkSize.load() + UploadedBlockSize.load()); uint64_t UploadedRawSize = UploadedRawChunkSize.load() + UploadedBlockSize.load(); uint64_t UploadedCompressedSize = UploadedCompressedChunkSize.load() + UploadedBlockSize.load(); std::string Details = fmt::format( "Compressed {}/{} ({}/{}{}) chunks. " "Uploaded {}/{} ({}/{}) blobs " "({}{})", TempLooseChunksStats.CompressedChunkCount.load(), LooseChunkOrderIndexes.size(), NiceBytes(TempLooseChunksStats.CompressedChunkRawBytes), NiceBytes(TotalLooseChunksSize), (TempLooseChunksStats.CompressedChunkCount == LooseChunkOrderIndexes.size()) ? "" : fmt::format(" {}B/s", NiceNum(FilteredCompressedBytesPerSecond.GetCurrent())), UploadedBlockCount.load() + UploadedChunkCount.load(), UploadBlockCount + UploadChunkCount, NiceBytes(UploadedRawSize), NiceBytes(TotalRawSize), NiceBytes(UploadedCompressedSize), (UploadedBlockCount == UploadBlockCount && UploadedChunkCount == UploadChunkCount) ? 
"" : fmt::format(" {}bits/s", NiceNum(FilteredUploadedBytesPerSecond.GetCurrent()))); Progress.UpdateState({.Task = "Uploading blobs ", .Details = Details, .TotalCount = gsl::narrow(TotalRawSize), .RemainingCount = gsl::narrow(TotalRawSize - UploadedRawSize), .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); ZEN_ASSERT(m_AbortFlag || QueuedPendingInMemoryBlocksForUpload.load() == 0); Progress.Finish(); TempUploadStats.ElapsedWallTimeUS += FilteredUploadedBytesPerSecond.GetElapsedTimeUS(); TempLooseChunksStats.CompressChunksElapsedWallTimeUS += FilteredCompressedBytesPerSecond.GetElapsedTimeUS(); } } CompositeBuffer BuildsOperationUploadFolder::CompressChunk(const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, uint32_t ChunkIndex, LooseChunksStatistics& TempLooseChunksStats) { ZEN_TRACE_CPU("CompressChunk"); ZEN_ASSERT(!m_Options.TempDir.empty()); const IoHash& ChunkHash = Content.ChunkedContent.ChunkHashes[ChunkIndex]; const uint64_t ChunkSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; const ChunkedContentLookup::ChunkSequenceLocation& Source = GetChunkSequenceLocations(Lookup, ChunkIndex)[0]; const std::uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[Source.SequenceIndex]; IoBuffer RawSource = IoBufferBuilder::MakeFromFile((m_Path / Content.Paths[PathIndex]).make_preferred(), Source.Offset, ChunkSize); if (!RawSource) { throw std::runtime_error(fmt::format("Failed fetching chunk {}", ChunkHash)); } if (RawSource.GetSize() != ChunkSize) { throw std::runtime_error(fmt::format("Fetched chunk {} has invalid size", ChunkHash)); } const bool ShouldCompressChunk = IsChunkCompressable(m_NonCompressableExtensionHashes, Content, Lookup, ChunkIndex); const OodleCompressionLevel CompressionLevel = ShouldCompressChunk ? 
OodleCompressionLevel::VeryFast : OodleCompressionLevel::None; if (ShouldCompressChunk) { std::filesystem::path TempFilePath = m_Options.TempDir / ChunkHash.ToHexString(); BasicFile CompressedFile; std::error_code Ec; CompressedFile.Open(TempFilePath, BasicFile::Mode::kTruncateDelete, Ec); if (Ec) { throw std::runtime_error(fmt::format("Failed creating temporary file for compressing blob {}, reason: ({}) {}", ChunkHash, Ec.value(), Ec.message())); } uint64_t StreamRawBytes = 0; uint64_t StreamCompressedBytes = 0; bool CouldCompress = CompressedBuffer::CompressToStream( CompositeBuffer(SharedBuffer(RawSource)), [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { ZEN_UNUSED(SourceOffset); TempLooseChunksStats.CompressedChunkRawBytes += SourceSize; CompressedFile.Write(RangeBuffer, Offset); TempLooseChunksStats.CompressedChunkBytes += RangeBuffer.GetSize(); StreamRawBytes += SourceSize; StreamCompressedBytes += RangeBuffer.GetSize(); }, OodleCompressor::Mermaid, CompressionLevel); if (CouldCompress) { uint64_t CompressedSize = CompressedFile.FileSize(); void* FileHandle = CompressedFile.Detach(); IoBuffer TempPayload = IoBuffer(IoBuffer::File, FileHandle, 0, CompressedSize, /*IsWholeFile*/ true); ZEN_ASSERT(TempPayload); TempPayload.SetDeleteOnClose(true); IoHash RawHash; uint64_t RawSize; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(TempPayload), RawHash, RawSize); ZEN_ASSERT(Compressed); ZEN_ASSERT(RawHash == ChunkHash); ZEN_ASSERT(RawSize == ChunkSize); TempLooseChunksStats.CompressedChunkCount++; return Compressed.GetCompressed(); } else { TempLooseChunksStats.CompressedChunkRawBytes -= StreamRawBytes; TempLooseChunksStats.CompressedChunkBytes -= StreamCompressedBytes; } CompressedFile.Close(); RemoveFile(TempFilePath, Ec); ZEN_UNUSED(Ec); } CompressedBuffer CompressedBlob = CompressedBuffer::Compress(SharedBuffer(std::move(RawSource)), OodleCompressor::Mermaid, CompressionLevel); if 
(!CompressedBlob) { throw std::runtime_error(fmt::format("Failed to compress large blob {}", ChunkHash)); } ZEN_ASSERT_SLOW(CompressedBlob.DecodeRawHash() == ChunkHash); ZEN_ASSERT_SLOW(CompressedBlob.DecodeRawSize() == ChunkSize); TempLooseChunksStats.CompressedChunkRawBytes += ChunkSize; TempLooseChunksStats.CompressedChunkBytes += CompressedBlob.GetCompressedSize(); // If we use none-compression, the compressed blob references the data and has 64 kb in memory so we don't need to write it to disk if (ShouldCompressChunk) { std::filesystem::path TempFilePath = m_Options.TempDir / (ChunkHash.ToHexString()); IoBuffer TempPayload = WriteToTempFile(std::move(CompressedBlob).GetCompressed(), TempFilePath); CompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(TempPayload)); } TempLooseChunksStats.CompressedChunkCount++; return std::move(CompressedBlob).GetCompressed(); } BuildsOperationValidateBuildPart::BuildsOperationValidateBuildPart(OperationLogOutput& OperationLogOutput, BuildStorageBase& Storage, std::atomic& AbortFlag, std::atomic& PauseFlag, WorkerThreadPool& IOWorkerPool, WorkerThreadPool& NetworkPool, const Oid& BuildId, const Oid& BuildPartId, const std::string_view BuildPartName, const Options& Options) : m_LogOutput(OperationLogOutput) , m_Storage(Storage) , m_AbortFlag(AbortFlag) , m_PauseFlag(PauseFlag) , m_IOWorkerPool(IOWorkerPool) , m_NetworkPool(NetworkPool) , m_BuildId(BuildId) , m_BuildPartId(BuildPartId) , m_BuildPartName(BuildPartName) , m_Options(Options) { } void BuildsOperationValidateBuildPart::Execute() { ZEN_TRACE_CPU("ValidateBuildPart"); try { enum class TaskSteps : uint32_t { FetchBuild, FetchBuildPart, ValidateBlobs, Cleanup, StepCount }; auto EndProgress = MakeGuard([&]() { m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::StepCount, (uint32_t)TaskSteps::StepCount); }); Stopwatch Timer; auto _ = MakeGuard([&]() { if (!m_Options.IsQuiet) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Validated build part {}/{} ('{}') in 
{}", m_BuildId, m_BuildPartId, m_BuildPartName, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } }); m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::FetchBuild, (uint32_t)TaskSteps::StepCount); CbObject Build = m_Storage.GetBuild(m_BuildId); if (!m_BuildPartName.empty()) { m_BuildPartId = Build["parts"sv].AsObjectView()[m_BuildPartName].AsObjectId(); if (m_BuildPartId == Oid::Zero) { throw std::runtime_error(fmt::format("Build {} does not have a part named '{}'", m_BuildId, m_BuildPartName)); } } m_ValidateStats.BuildBlobSize = Build.GetSize(); uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u; if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0) { PreferredMultipartChunkSize = ChunkSize; } m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::FetchBuildPart, (uint32_t)TaskSteps::StepCount); CbObject BuildPart = m_Storage.GetBuildPart(m_BuildId, m_BuildPartId); m_ValidateStats.BuildPartSize = BuildPart.GetSize(); if (!m_Options.IsQuiet) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Validating build part {}/{} ({})", m_BuildId, m_BuildPartId, NiceBytes(BuildPart.GetSize())); } std::vector ChunkAttachments; if (const CbObjectView ChunkAttachmentsView = BuildPart["chunkAttachments"sv].AsObjectView()) { for (CbFieldView LooseFileView : ChunkAttachmentsView["rawHashes"sv]) { ChunkAttachments.push_back(LooseFileView.AsBinaryAttachment()); } } m_ValidateStats.ChunkAttachmentCount = ChunkAttachments.size(); std::vector BlockAttachments; if (const CbObjectView BlockAttachmentsView = BuildPart["blockAttachments"sv].AsObjectView()) { { for (CbFieldView BlocksView : BlockAttachmentsView["rawHashes"sv]) { BlockAttachments.push_back(BlocksView.AsBinaryAttachment()); } } } m_ValidateStats.BlockAttachmentCount = BlockAttachments.size(); std::vector VerifyBlockDescriptions = ParseChunkBlockDescriptionList(m_Storage.GetBlockMetadatas(m_BuildId, BlockAttachments)); if (VerifyBlockDescriptions.size() != BlockAttachments.size()) { throw 
std::runtime_error(fmt::format("Uploaded blocks metadata could not all be found, {} blocks metadata is missing", BlockAttachments.size() - VerifyBlockDescriptions.size())); } ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); const std::filesystem::path TempFolder = ".zen-tmp"; CleanAndRemoveDirectory(m_IOWorkerPool, m_AbortFlag, m_PauseFlag, TempFolder); CreateDirectories(TempFolder); auto __ = MakeGuard([this, TempFolder]() { CleanAndRemoveDirectory(m_IOWorkerPool, m_AbortFlag, m_PauseFlag, TempFolder); }); m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::ValidateBlobs, (uint32_t)TaskSteps::StepCount); std::unique_ptr ProgressBarPtr(m_LogOutput.CreateProgressBar("Validate Blobs")); OperationLogOutput::ProgressBar& Progress(*ProgressBarPtr); uint64_t AttachmentsToVerifyCount = ChunkAttachments.size() + BlockAttachments.size(); FilteredRate FilteredDownloadedBytesPerSecond; FilteredRate FilteredVerifiedBytesPerSecond; std::atomic MultipartAttachmentCount = 0; for (const IoHash& ChunkAttachment : ChunkAttachments) { Work.ScheduleWork( m_NetworkPool, [this, &Work, AttachmentsToVerifyCount, &TempFolder, PreferredMultipartChunkSize, &FilteredDownloadedBytesPerSecond, &FilteredVerifiedBytesPerSecond, &ChunkAttachments, ChunkAttachment = IoHash(ChunkAttachment)](std::atomic&) { if (!m_AbortFlag) { ZEN_TRACE_CPU("ValidateBuildPart_GetChunk"); FilteredDownloadedBytesPerSecond.Start(); DownloadLargeBlob( m_Storage, TempFolder, m_BuildId, ChunkAttachment, PreferredMultipartChunkSize, Work, m_NetworkPool, m_DownloadStats.DownloadedChunkByteCount, m_DownloadStats.MultipartAttachmentCount, [this, &Work, AttachmentsToVerifyCount, &FilteredDownloadedBytesPerSecond, &FilteredVerifiedBytesPerSecond, ChunkHash = IoHash(ChunkAttachment)](IoBuffer&& Payload) { m_DownloadStats.DownloadedChunkCount++; Payload.SetContentType(ZenContentType::kCompressedBinary); if (!m_AbortFlag) { Work.ScheduleWork( m_IOWorkerPool, [this, AttachmentsToVerifyCount, 
&FilteredDownloadedBytesPerSecond, &FilteredVerifiedBytesPerSecond, Payload = IoBuffer(std::move(Payload)), ChunkHash](std::atomic&) mutable { if (!m_AbortFlag) { ZEN_TRACE_CPU("ValidateBuildPart_Validate"); if (m_DownloadStats.DownloadedChunkCount + m_DownloadStats.DownloadedBlockCount == AttachmentsToVerifyCount) { FilteredDownloadedBytesPerSecond.Stop(); } FilteredVerifiedBytesPerSecond.Start(); uint64_t CompressedSize; uint64_t DecompressedSize; ValidateBlob(m_AbortFlag, std::move(Payload), ChunkHash, CompressedSize, DecompressedSize); m_ValidateStats.VerifiedAttachmentCount++; m_ValidateStats.VerifiedByteCount += DecompressedSize; if (m_ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount) { FilteredVerifiedBytesPerSecond.Stop(); } } }); } }); } }); } for (const IoHash& BlockAttachment : BlockAttachments) { Work.ScheduleWork( m_NetworkPool, [this, &Work, AttachmentsToVerifyCount, &FilteredDownloadedBytesPerSecond, &FilteredVerifiedBytesPerSecond, BlockAttachment = IoHash(BlockAttachment)](std::atomic&) { if (!m_AbortFlag) { ZEN_TRACE_CPU("ValidateBuildPart_GetBlock"); FilteredDownloadedBytesPerSecond.Start(); IoBuffer Payload = m_Storage.GetBuildBlob(m_BuildId, BlockAttachment); m_DownloadStats.DownloadedBlockCount++; m_DownloadStats.DownloadedBlockByteCount += Payload.GetSize(); if (m_DownloadStats.DownloadedChunkCount + m_DownloadStats.DownloadedBlockCount == AttachmentsToVerifyCount) { FilteredDownloadedBytesPerSecond.Stop(); } if (!Payload) { throw std::runtime_error(fmt::format("Block attachment {} could not be found", BlockAttachment)); } if (!m_AbortFlag) { Work.ScheduleWork( m_IOWorkerPool, [this, &FilteredVerifiedBytesPerSecond, AttachmentsToVerifyCount, Payload = std::move(Payload), BlockAttachment](std::atomic&) mutable { if (!m_AbortFlag) { ZEN_TRACE_CPU("ValidateBuildPart_ValidateBlock"); FilteredVerifiedBytesPerSecond.Start(); uint64_t CompressedSize; uint64_t DecompressedSize; ValidateChunkBlock(std::move(Payload), 
BlockAttachment, CompressedSize, DecompressedSize); m_ValidateStats.VerifiedAttachmentCount++; m_ValidateStats.VerifiedByteCount += DecompressedSize; if (m_ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount) { FilteredVerifiedBytesPerSecond.Stop(); } } }); } } }); } Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { ZEN_UNUSED(PendingWork); const uint64_t DownloadedAttachmentCount = m_DownloadStats.DownloadedChunkCount + m_DownloadStats.DownloadedBlockCount; const uint64_t DownloadedByteCount = m_DownloadStats.DownloadedChunkByteCount + m_DownloadStats.DownloadedBlockByteCount; FilteredDownloadedBytesPerSecond.Update(DownloadedByteCount); FilteredVerifiedBytesPerSecond.Update(m_ValidateStats.VerifiedByteCount); std::string Details = fmt::format("Downloaded {}/{} ({}, {}bits/s). Verified {}/{} ({}, {}B/s)", DownloadedAttachmentCount, AttachmentsToVerifyCount, NiceBytes(DownloadedByteCount), NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8), m_ValidateStats.VerifiedAttachmentCount.load(), AttachmentsToVerifyCount, NiceBytes(m_ValidateStats.VerifiedByteCount.load()), NiceNum(FilteredVerifiedBytesPerSecond.GetCurrent())); Progress.UpdateState( {.Task = "Validating blobs ", .Details = Details, .TotalCount = gsl::narrow(AttachmentsToVerifyCount * 2), .RemainingCount = gsl::narrow(AttachmentsToVerifyCount * 2 - (DownloadedAttachmentCount + m_ValidateStats.VerifiedAttachmentCount.load())), .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); Progress.Finish(); m_ValidateStats.ElapsedWallTimeUS = Timer.GetElapsedTimeUs(); m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::Cleanup, (uint32_t)TaskSteps::StepCount); } catch (const std::exception&) { m_AbortFlag = true; throw; } } BuildsOperationPrimeCache::BuildsOperationPrimeCache(OperationLogOutput& OperationLogOutput, StorageInstance& Storage, std::atomic& AbortFlag, 
std::atomic& PauseFlag, WorkerThreadPool& NetworkPool, const Oid& BuildId, std::span BuildPartIds, const Options& Options, BuildStorageCache::Statistics& StorageCacheStats) : m_LogOutput(OperationLogOutput) , m_Storage(Storage) , m_AbortFlag(AbortFlag) , m_PauseFlag(PauseFlag) , m_NetworkPool(NetworkPool) , m_BuildId(BuildId) , m_BuildPartIds(BuildPartIds.begin(), BuildPartIds.end()) , m_Options(Options) , m_StorageCacheStats(StorageCacheStats) { m_TempPath = m_Options.ZenFolderPath / "tmp"; CreateDirectories(m_TempPath); } void BuildsOperationPrimeCache::Execute() { ZEN_TRACE_CPU("BuildsOperationPrimeCache::Execute"); Stopwatch PrimeTimer; tsl::robin_map LooseChunkRawSizes; tsl::robin_set BuildBlobs; for (const Oid& BuildPartId : m_BuildPartIds) { CbObject BuildPart = m_Storage.BuildStorage->GetBuildPart(m_BuildId, BuildPartId); CbObjectView BlockAttachmentsView = BuildPart["blockAttachments"sv].AsObjectView(); std::vector BlockAttachments = compactbinary_helpers::ReadBinaryAttachmentArray("rawHashes"sv, BlockAttachmentsView); CbObjectView ChunkAttachmentsView = BuildPart["chunkAttachments"sv].AsObjectView(); std::vector ChunkAttachments = compactbinary_helpers::ReadBinaryAttachmentArray("rawHashes"sv, ChunkAttachmentsView); std::vector ChunkRawSizes = compactbinary_helpers::ReadArray("chunkRawSizes"sv, ChunkAttachmentsView); if (ChunkAttachments.size() != ChunkRawSizes.size()) { throw std::runtime_error(fmt::format("Mismatch of loose chunk raw size array, expected {}, found {}", ChunkAttachments.size(), ChunkRawSizes.size())); } BuildBlobs.reserve(ChunkAttachments.size() + BlockAttachments.size()); BuildBlobs.insert(BlockAttachments.begin(), BlockAttachments.end()); BuildBlobs.insert(ChunkAttachments.begin(), ChunkAttachments.end()); for (size_t ChunkAttachmentIndex = 0; ChunkAttachmentIndex < ChunkAttachments.size(); ChunkAttachmentIndex++) { LooseChunkRawSizes.insert_or_assign(ChunkAttachments[ChunkAttachmentIndex], ChunkRawSizes[ChunkAttachmentIndex]); } } if 
(!m_Options.IsQuiet) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Found {} referenced blobs", BuildBlobs.size()); } if (BuildBlobs.empty()) { return; } std::vector BlobsToDownload; BlobsToDownload.reserve(BuildBlobs.size()); if (m_Storage.BuildCacheStorage && !BuildBlobs.empty() && !m_Options.ForceUpload) { ZEN_TRACE_CPU("BlobCacheExistCheck"); Stopwatch Timer; const std::vector BlobHashes(BuildBlobs.begin(), BuildBlobs.end()); const std::vector CacheExistsResult = m_Storage.BuildCacheStorage->BlobsExists(m_BuildId, BlobHashes); if (CacheExistsResult.size() == BlobHashes.size()) { for (size_t BlobIndex = 0; BlobIndex < BlobHashes.size(); BlobIndex++) { if (!CacheExistsResult[BlobIndex].HasBody) { BlobsToDownload.push_back(BlobHashes[BlobIndex]); } } size_t FoundCount = BuildBlobs.size() - BlobsToDownload.size(); if (FoundCount > 0 && !m_Options.IsQuiet) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Remote cache : Found {} out of {} needed blobs in {}", FoundCount, BuildBlobs.size(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } } } else { BlobsToDownload.insert(BlobsToDownload.end(), BuildBlobs.begin(), BuildBlobs.end()); } if (BlobsToDownload.empty()) { return; } std::atomic MultipartAttachmentCount; std::atomic CompletedDownloadCount; FilteredRate FilteredDownloadedBytesPerSecond; { std::unique_ptr ProgressBarPtr(m_LogOutput.CreateProgressBar("Downloading")); OperationLogOutput::ProgressBar& Progress(*ProgressBarPtr); ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); const size_t BlobCount = BlobsToDownload.size(); for (size_t BlobIndex = 0; BlobIndex < BlobCount; BlobIndex++) { Work.ScheduleWork( m_NetworkPool, [this, &Work, &BlobsToDownload, BlobCount, &LooseChunkRawSizes, &CompletedDownloadCount, &FilteredDownloadedBytesPerSecond, &MultipartAttachmentCount, BlobIndex](std::atomic&) { if (!m_AbortFlag) { const IoHash& BlobHash = BlobsToDownload[BlobIndex]; bool IsLargeBlob = false; if (auto It = LooseChunkRawSizes.find(BlobHash); It != 
LooseChunkRawSizes.end()) { IsLargeBlob = It->second >= m_Options.LargeAttachmentSize; } FilteredDownloadedBytesPerSecond.Start(); if (IsLargeBlob) { DownloadLargeBlob(*m_Storage.BuildStorage, m_TempPath, m_BuildId, BlobHash, m_Options.PreferredMultipartChunkSize, Work, m_NetworkPool, m_DownloadStats.DownloadedChunkByteCount, MultipartAttachmentCount, [this, BlobCount, BlobHash, &FilteredDownloadedBytesPerSecond, &CompletedDownloadCount]( IoBuffer&& Payload) { m_DownloadStats.DownloadedChunkCount++; m_DownloadStats.RequestsCompleteCount++; if (!m_AbortFlag) { if (Payload && m_Storage.BuildCacheStorage) { m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, BlobHash, ZenContentType::kCompressedBinary, CompositeBuffer(SharedBuffer(Payload))); } } CompletedDownloadCount++; if (CompletedDownloadCount == BlobCount) { FilteredDownloadedBytesPerSecond.Stop(); } }); } else { IoBuffer Payload = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlobHash); m_DownloadStats.DownloadedBlockCount++; m_DownloadStats.DownloadedBlockByteCount += Payload.GetSize(); m_DownloadStats.RequestsCompleteCount++; if (!m_AbortFlag) { if (Payload && m_Storage.BuildCacheStorage) { m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, BlobHash, ZenContentType::kCompressedBinary, CompositeBuffer(SharedBuffer(std::move(Payload)))); } } CompletedDownloadCount++; if (CompletedDownloadCount == BlobCount) { FilteredDownloadedBytesPerSecond.Stop(); } } } }); } Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { ZEN_UNUSED(PendingWork); uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() + m_DownloadStats.DownloadedBlockByteCount.load(); FilteredDownloadedBytesPerSecond.Update(DownloadedBytes); std::string DownloadRateString = (CompletedDownloadCount == BlobCount) ? "" : fmt::format(" {}bits/s", NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8)); std::string UploadDetails = m_Storage.BuildCacheStorage ? 
fmt::format(" {} ({}) uploaded.", m_StorageCacheStats.PutBlobCount.load(), NiceBytes(m_StorageCacheStats.PutBlobByteCount.load())) : ""; std::string Details = fmt::format("{}/{} ({}{}) downloaded.{}", CompletedDownloadCount.load(), BlobCount, NiceBytes(DownloadedBytes), DownloadRateString, UploadDetails); Progress.UpdateState({.Task = "Downloading", .Details = Details, .TotalCount = BlobCount, .RemainingCount = BlobCount - CompletedDownloadCount.load(), .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); FilteredDownloadedBytesPerSecond.Stop(); Progress.Finish(); } if (m_AbortFlag) { return; } if (m_Storage.BuildCacheStorage) { m_Storage.BuildCacheStorage->Flush(m_LogOutput.GetProgressUpdateDelayMS(), [this](intptr_t Remaining) -> bool { ZEN_UNUSED(Remaining); if (!m_Options.IsQuiet) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Waiting for {} blobs to finish upload to '{}'", Remaining, m_Storage.CacheName); } return !m_AbortFlag; }); } if (!m_Options.IsQuiet) { uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() + m_DownloadStats.DownloadedBlockByteCount.load(); ZEN_OPERATION_LOG_INFO(m_LogOutput, "Downloaded {} ({}bits/s) in {}. {} as multipart. 
Completed in {}", NiceBytes(DownloadedBytes), NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), DownloadedBytes * 8)), NiceTimeSpanMs(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS() / 1000), MultipartAttachmentCount.load(), NiceTimeSpanMs(PrimeTimer.GetElapsedTimeMs())); } } CompositeBuffer ValidateBlob(std::atomic& AbortFlag, BuildStorageBase& Storage, const Oid& BuildId, const IoHash& BlobHash, uint64_t& OutCompressedSize, uint64_t& OutDecompressedSize) { ZEN_TRACE_CPU("ValidateBlob"); IoBuffer Payload = Storage.GetBuildBlob(BuildId, BlobHash); if (!Payload) { throw std::runtime_error(fmt::format("Blob {} could not be found", BlobHash)); } return ValidateBlob(AbortFlag, std::move(Payload), BlobHash, OutCompressedSize, OutDecompressedSize); } ChunkBlockDescription BuildsOperationValidateBuildPart::ValidateChunkBlock(IoBuffer&& Payload, const IoHash& BlobHash, uint64_t& OutCompressedSize, uint64_t& OutDecompressedSize) { CompositeBuffer BlockBuffer = ValidateBlob(m_AbortFlag, std::move(Payload), BlobHash, OutCompressedSize, OutDecompressedSize); if (!BlockBuffer) { throw std::runtime_error(fmt::format("Chunk block blob {} is not compressed using 'None' compression level", BlobHash)); } return GetChunkBlockDescription(BlockBuffer.Flatten(), BlobHash); } std::vector> ResolveBuildPartNames(CbObjectView BuildObject, const Oid& BuildId, const std::vector& BuildPartIds, std::span BuildPartNames, std::uint64_t& OutPreferredMultipartChunkSize) { std::vector> Result; { CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView(); if (!PartsObject) { throw std::runtime_error("Build object does not have a 'parts' object"); } OutPreferredMultipartChunkSize = BuildObject["chunkSize"sv].AsUInt64(OutPreferredMultipartChunkSize); std::vector> AvailableParts; for (CbFieldView PartView : PartsObject) { const std::string BuildPartName = std::string(PartView.GetName()); const Oid BuildPartId = PartView.AsObjectId(); if (BuildPartId == Oid::Zero) { 
ExtendableStringBuilder<128> SB; for (CbFieldView ScanPartView : PartsObject) { SB.Append(fmt::format("\n {}: {}", ScanPartView.GetName(), ScanPartView.AsObjectId())); } throw std::runtime_error(fmt::format("Build object parts does not have a '{}' object id{}", BuildPartName, SB.ToView())); } AvailableParts.push_back({BuildPartId, BuildPartName}); } if (BuildPartIds.empty() && BuildPartNames.empty()) { Result = AvailableParts; } else { for (const std::string& BuildPartName : BuildPartNames) { if (auto It = std::find_if(AvailableParts.begin(), AvailableParts.end(), [&BuildPartName](const auto& Part) { return Part.second == BuildPartName; }); It != AvailableParts.end()) { Result.push_back(*It); } else { throw std::runtime_error(fmt::format("Build {} object does not have a part named '{}'", BuildId, BuildPartName)); } } for (const Oid& BuildPartId : BuildPartIds) { if (auto It = std::find_if(AvailableParts.begin(), AvailableParts.end(), [&BuildPartId](const auto& Part) { return Part.first == BuildPartId; }); It != AvailableParts.end()) { Result.push_back(*It); } else { throw std::runtime_error(fmt::format("Build {} object does not have a part with id '{}'", BuildId, BuildPartId)); } } } if (Result.empty()) { throw std::runtime_error(fmt::format("Build object does not have any parts", BuildId)); } } return Result; } ChunkedFolderContent GetRemoteContent(OperationLogOutput& Output, StorageInstance& Storage, const Oid& BuildId, const std::vector>& BuildParts, const BuildManifest& Manifest, std::span IncludeWildcards, std::span ExcludeWildcards, std::unique_ptr& OutChunkController, std::vector& OutPartContents, std::vector& OutBlockDescriptions, std::vector& OutLooseChunkHashes, bool IsQuiet, bool IsVerbose, bool DoExtraContentVerify) { ZEN_TRACE_CPU("GetRemoteContent"); Stopwatch GetBuildPartTimer; const Oid BuildPartId = BuildParts[0].first; const std::string_view BuildPartName = BuildParts[0].second; CbObject BuildPartManifest = 
Storage.BuildStorage->GetBuildPart(BuildId, BuildPartId); if (!IsQuiet) { ZEN_OPERATION_LOG_INFO(Output, "GetBuildPart {} ('{}') took {}. Payload size: {}", BuildPartId, BuildPartName, NiceTimeSpanMs(GetBuildPartTimer.GetElapsedTimeMs()), NiceBytes(BuildPartManifest.GetSize())); ZEN_OPERATION_LOG_INFO(Output, "{}", GetCbObjectAsNiceString(BuildPartManifest, " "sv, "\n"sv)); } { CbObjectView Chunker = BuildPartManifest["chunker"sv].AsObjectView(); std::string_view ChunkerName = Chunker["name"sv].AsString(); CbObjectView Parameters = Chunker["parameters"sv].AsObjectView(); OutChunkController = CreateChunkingController(ChunkerName, Parameters); } auto ParseBuildPartManifest = [&Output, IsQuiet, IsVerbose, DoExtraContentVerify]( StorageInstance& Storage, const Oid& BuildId, const Oid& BuildPartId, CbObject BuildPartManifest, std::span IncludeWildcards, std::span ExcludeWildcards, const BuildManifest::Part* OptionalManifest, ChunkedFolderContent& OutRemoteContent, std::vector& OutBlockDescriptions, std::vector& OutLooseChunkHashes) { std::vector AbsoluteChunkOrders; std::vector LooseChunkRawSizes; std::vector BlockRawHashes; ReadBuildContentFromCompactBinary(BuildPartManifest, OutRemoteContent.Platform, OutRemoteContent.Paths, OutRemoteContent.RawHashes, OutRemoteContent.RawSizes, OutRemoteContent.Attributes, OutRemoteContent.ChunkedContent.SequenceRawHashes, OutRemoteContent.ChunkedContent.ChunkCounts, AbsoluteChunkOrders, OutLooseChunkHashes, LooseChunkRawSizes, BlockRawHashes); // TODO: GetBlockDescriptions for all BlockRawHashes in one go - check for local block descriptions when we cache them { bool AttemptFallback = false; OutBlockDescriptions = GetBlockDescriptions(Output, *Storage.BuildStorage, Storage.BuildCacheStorage.get(), BuildId, BuildPartId, BlockRawHashes, AttemptFallback, IsQuiet, IsVerbose); } CalculateLocalChunkOrders(AbsoluteChunkOrders, OutLooseChunkHashes, LooseChunkRawSizes, OutBlockDescriptions, OutRemoteContent.ChunkedContent.ChunkHashes, 
OutRemoteContent.ChunkedContent.ChunkRawSizes, OutRemoteContent.ChunkedContent.ChunkOrders, DoExtraContentVerify); std::vector DeletedPaths; if (OptionalManifest) { tsl::robin_set PathsInManifest; PathsInManifest.reserve(OptionalManifest->Files.size()); for (const std::filesystem::path& ManifestPath : OptionalManifest->Files) { PathsInManifest.insert(ToLower(ManifestPath.generic_string())); } for (const std::filesystem::path& RemotePath : OutRemoteContent.Paths) { if (!PathsInManifest.contains(ToLower(RemotePath.generic_string()))) { DeletedPaths.push_back(RemotePath); } } } if (!IncludeWildcards.empty() || !ExcludeWildcards.empty()) { for (const std::filesystem::path& RemotePath : OutRemoteContent.Paths) { if (!IncludePath(IncludeWildcards, ExcludeWildcards, ToLower(RemotePath.generic_string()), /*CaseSensitive*/ true)) { DeletedPaths.push_back(RemotePath); } } } if (!DeletedPaths.empty()) { OutRemoteContent = DeletePathsFromChunkedContent(OutRemoteContent, DeletedPaths); InlineRemoveUnusedHashes(OutLooseChunkHashes, OutRemoteContent.ChunkedContent.ChunkHashes); } #if ZEN_BUILD_DEBUG ValidateChunkedFolderContent(OutRemoteContent, OutBlockDescriptions, OutLooseChunkHashes, IncludeWildcards, ExcludeWildcards); #endif // ZEN_BUILD_DEBUG }; auto FindManifest = [&Manifest](const Oid& BuildPartId, std::string_view BuildPartName) -> const BuildManifest::Part* { if (Manifest.Parts.empty()) { return nullptr; } if (Manifest.Parts.size() == 1) { if (Manifest.Parts[0].PartId == Oid::Zero && Manifest.Parts[0].PartName.empty()) { return &Manifest.Parts[0]; } } auto It = std::find_if(Manifest.Parts.begin(), Manifest.Parts.end(), [BuildPartId, BuildPartName](const BuildManifest::Part& Part) { if (Part.PartId != Oid::Zero) { return Part.PartId == BuildPartId; } if (!Part.PartName.empty()) { return Part.PartName == BuildPartName; } return false; }); if (It != Manifest.Parts.end()) { return &(*It); } return nullptr; }; OutPartContents.resize(1); ParseBuildPartManifest(Storage, 
BuildId, BuildPartId, BuildPartManifest, IncludeWildcards, ExcludeWildcards, FindManifest(BuildPartId, BuildPartName), OutPartContents[0], OutBlockDescriptions, OutLooseChunkHashes); ChunkedFolderContent RemoteContent; if (BuildParts.size() > 1) { std::vector OverlayBlockDescriptions; std::vector OverlayLooseChunkHashes; for (size_t PartIndex = 1; PartIndex < BuildParts.size(); PartIndex++) { const Oid& OverlayBuildPartId = BuildParts[PartIndex].first; const std::string& OverlayBuildPartName = BuildParts[PartIndex].second; Stopwatch GetOverlayBuildPartTimer; CbObject OverlayBuildPartManifest = Storage.BuildStorage->GetBuildPart(BuildId, OverlayBuildPartId); if (!IsQuiet) { ZEN_OPERATION_LOG_INFO(Output, "GetBuildPart {} ('{}') took {}. Payload size: {}", OverlayBuildPartId, OverlayBuildPartName, NiceTimeSpanMs(GetOverlayBuildPartTimer.GetElapsedTimeMs()), NiceBytes(OverlayBuildPartManifest.GetSize())); } ChunkedFolderContent OverlayPartContent; std::vector OverlayPartBlockDescriptions; std::vector OverlayPartLooseChunkHashes; ParseBuildPartManifest(Storage, BuildId, OverlayBuildPartId, OverlayBuildPartManifest, IncludeWildcards, ExcludeWildcards, FindManifest(OverlayBuildPartId, OverlayBuildPartName), OverlayPartContent, OverlayPartBlockDescriptions, OverlayPartLooseChunkHashes); OutPartContents.push_back(OverlayPartContent); OverlayBlockDescriptions.insert(OverlayBlockDescriptions.end(), OverlayPartBlockDescriptions.begin(), OverlayPartBlockDescriptions.end()); OverlayLooseChunkHashes.insert(OverlayLooseChunkHashes.end(), OverlayPartLooseChunkHashes.begin(), OverlayPartLooseChunkHashes.end()); } RemoteContent = MergeChunkedFolderContents(OutPartContents[0], std::span(OutPartContents).subspan(1)); { tsl::robin_set AllBlockHashes; for (const ChunkBlockDescription& Description : OutBlockDescriptions) { AllBlockHashes.insert(Description.BlockHash); } for (const ChunkBlockDescription& Description : OverlayBlockDescriptions) { if 
(!AllBlockHashes.contains(Description.BlockHash)) { AllBlockHashes.insert(Description.BlockHash); OutBlockDescriptions.push_back(Description); } } } { tsl::robin_set AllLooseChunkHashes(OutLooseChunkHashes.begin(), OutLooseChunkHashes.end()); for (const IoHash& OverlayLooseChunkHash : OverlayLooseChunkHashes) { if (!AllLooseChunkHashes.contains(OverlayLooseChunkHash)) { AllLooseChunkHashes.insert(OverlayLooseChunkHash); OutLooseChunkHashes.push_back(OverlayLooseChunkHash); } } } } else { RemoteContent = OutPartContents[0]; } return RemoteContent; } std::string GetCbObjectAsNiceString(CbObjectView Object, std::string_view Prefix, std::string_view Suffix) { ExtendableStringBuilder<512> SB; std::vector> NameStringValuePairs; for (CbFieldView Field : Object) { std::string_view Name = Field.GetName(); switch (CbValue Accessor = Field.GetValue(); Accessor.GetType()) { case CbFieldType::String: NameStringValuePairs.push_back({std::string(Name), std::string(Accessor.AsString())}); break; case CbFieldType::IntegerPositive: NameStringValuePairs.push_back({std::string(Name), fmt::format("{}", Accessor.AsIntegerPositive())}); break; case CbFieldType::IntegerNegative: NameStringValuePairs.push_back({std::string(Name), fmt::format("{}", Accessor.AsIntegerNegative())}); break; case CbFieldType::Float32: { const float Value = Accessor.AsFloat32(); if (std::isfinite(Value)) { NameStringValuePairs.push_back({std::string(Name), fmt::format("{:.9g}", Value)}); } else { NameStringValuePairs.push_back({std::string(Name), "null"}); } } break; case CbFieldType::Float64: { const double Value = Accessor.AsFloat64(); if (std::isfinite(Value)) { NameStringValuePairs.push_back({std::string(Name), fmt::format("{:.17g}", Value)}); } else { NameStringValuePairs.push_back({std::string(Name), "null"}); } } break; case CbFieldType::BoolFalse: NameStringValuePairs.push_back({std::string(Name), "false"}); break; case CbFieldType::BoolTrue: NameStringValuePairs.push_back({std::string(Name), "true"}); 
break; case CbFieldType::Hash: { NameStringValuePairs.push_back({std::string(Name), Accessor.AsHash().ToHexString()}); } break; case CbFieldType::Uuid: { StringBuilder Builder; Accessor.AsUuid().ToString(Builder); NameStringValuePairs.push_back({std::string(Name), Builder.ToString()}); } break; case CbFieldType::DateTime: { ExtendableStringBuilder<64> Builder; Builder << DateTime(Accessor.AsDateTimeTicks()).ToIso8601(); NameStringValuePairs.push_back({std::string(Name), Builder.ToString()}); } break; case CbFieldType::TimeSpan: { ExtendableStringBuilder<64> Builder; const TimeSpan Span(Accessor.AsTimeSpanTicks()); if (Span.GetDays() == 0) { Builder << Span.ToString("%h:%m:%s.%n"); } else { Builder << Span.ToString("%d.%h:%m:%s.%n"); } NameStringValuePairs.push_back({std::string(Name), Builder.ToString()}); break; } case CbFieldType::ObjectId: NameStringValuePairs.push_back({std::string(Name), Accessor.AsObjectId().ToString()}); break; } } std::string::size_type LongestKey = 0; for (const std::pair& KeyValue : NameStringValuePairs) { LongestKey = Max(KeyValue.first.length(), LongestKey); } for (const std::pair& KeyValue : NameStringValuePairs) { SB.Append(fmt::format("{}{:<{}}: {}{}", Prefix, KeyValue.first, LongestKey, KeyValue.second, Suffix)); } return SB.ToString(); } #if ZEN_WITH_TESTS namespace buildstorageoperations_testutils { struct TestState { TestState(const std::filesystem::path& InRootPath) : RootPath(InRootPath) , LogOutput(CreateStandardLogOutput(Log)) , ChunkController(CreateStandardChunkingController(StandardChunkingControllerSettings{})) , ChunkCache(CreateMemoryChunkingCache()) , WorkerPool(2) , NetworkPool(2) { } void Initialize() { StoragePath = RootPath / "storage"; TempPath = RootPath / "temp"; SystemRootDir = RootPath / "sysroot"; ZenFolderPath = RootPath / ".zen"; CreateDirectories(TempPath); CreateDirectories(StoragePath); Storage.BuildStorage = CreateFileBuildStorage(StoragePath, StorageStats, false); } void CreateSourceData(const 
std::filesystem::path& Source, std::span Paths, std::span Sizes) { const std::filesystem::path SourcePath = RootPath / Source; CreateDirectories(SourcePath); for (size_t FileIndex = 0; FileIndex < Paths.size(); FileIndex++) { const std::string& FilePath = Paths[FileIndex]; const uint64_t FileSize = Sizes[FileIndex]; IoBuffer FileData = FileSize > 0 ? CreateSemiRandomBlob(FileSize) : IoBuffer{}; WriteFile(SourcePath / FilePath, FileData); } } std::vector> Upload(const Oid& BuildId, const Oid& BuildPartId, const std::string_view BuildPartName, const std::filesystem::path& Source, const std::filesystem::path& ManifestPath) { const std::filesystem::path SourcePath = RootPath / Source; CbObject MetaData; BuildsOperationUploadFolder Upload(*LogOutput, Storage, AbortFlag, PauseFlag, WorkerPool, NetworkPool, BuildId, SourcePath, true, MetaData, BuildsOperationUploadFolder::Options{.TempDir = TempPath}); return Upload.Execute(BuildPartId, BuildPartName, ManifestPath, *ChunkController, *ChunkCache); } void ValidateUpload(const Oid& BuildId, const std::vector>& Parts) { for (auto Part : Parts) { BuildsOperationValidateBuildPart Validate(*LogOutput, *Storage.BuildStorage, AbortFlag, PauseFlag, WorkerPool, NetworkPool, BuildId, Part.first, Part.second, BuildsOperationValidateBuildPart::Options{}); Validate.Execute(); } } FolderContent Download(const Oid& BuildId, const Oid& BuildPartId, const std::string_view BuildPartName, const std::filesystem::path& Target, bool Append) { const std::filesystem::path TargetPath = RootPath / Target; CreateDirectories(TargetPath); uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u; CbObject BuildObject = Storage.BuildStorage->GetBuild(BuildId); std::vector PartIds; if (BuildPartId != Oid::Zero) { PartIds.push_back(BuildPartId); } std::vector PartNames; if (!BuildPartName.empty()) { PartNames.push_back(std::string(BuildPartName)); } std::vector> AllBuildParts = ResolveBuildPartNames(BuildObject, BuildId, PartIds, PartNames, 
PreferredMultipartChunkSize); std::vector PartContents; std::vector BlockDescriptions; std::vector LooseChunkHashes; ChunkedFolderContent RemoteContent = GetRemoteContent(*LogOutput, Storage, BuildId, AllBuildParts, {}, {}, {}, ChunkController, PartContents, BlockDescriptions, LooseChunkHashes, /*IsQuiet*/ false, /*IsVerbose*/ false, /*DoExtraContentVerify*/ true); GetFolderContentStatistics LocalFolderScanStats; struct ContentVisitor : public GetDirectoryContentVisitor { virtual void AsyncVisitDirectory(const std::filesystem::path& RelativeRoot, DirectoryContent&& Content) { RwLock::ExclusiveLockScope _(ExistingPathsLock); for (const std::filesystem::path& FileName : Content.FileNames) { if (RelativeRoot.empty()) { ExistingPaths.push_back(FileName); } else { ExistingPaths.push_back(RelativeRoot / FileName); } } } RwLock ExistingPathsLock; std::vector ExistingPaths; } Visitor; Latch PendingWorkCount(1); GetDirectoryContent(TargetPath, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive, Visitor, WorkerPool, PendingWorkCount); PendingWorkCount.CountDown(); PendingWorkCount.Wait(); FolderContent CurrentLocalFolderState = GetValidFolderContent( WorkerPool, LocalFolderScanStats, TargetPath, Visitor.ExistingPaths, [](uint64_t PathCount, uint64_t CompletedPathCount) { ZEN_UNUSED(PathCount, CompletedPathCount); }, 1000, AbortFlag, PauseFlag); ChunkingStatistics LocalChunkingStats; ChunkedFolderContent LocalContent = ChunkFolderContent( LocalChunkingStats, WorkerPool, TargetPath, CurrentLocalFolderState, *ChunkController, *ChunkCache, 1000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t) { ZEN_UNUSED(IsAborted, IsPaused); }, AbortFlag, PauseFlag); if (Append) { RemoteContent = ApplyChunkedContentOverlay(LocalContent, RemoteContent, {}, {}); } const ChunkedContentLookup LocalLookup = BuildChunkedContentLookup(LocalContent); const ChunkedContentLookup RemoteLookup = BuildChunkedContentLookup(RemoteContent); BuildsOperationUpdateFolder 
Download(*LogOutput, Storage, AbortFlag, PauseFlag, WorkerPool, NetworkPool, BuildId, TargetPath, LocalContent, LocalLookup, RemoteContent, RemoteLookup, BlockDescriptions, LooseChunkHashes, BuildsOperationUpdateFolder::Options{.SystemRootDir = SystemRootDir, .ZenFolderPath = ZenFolderPath, .ValidateCompletedSequences = true}); FolderContent ResultingState; Download.Execute(ResultingState); return ResultingState; } void ValidateDownload(std::span Paths, std::span Sizes, const std::filesystem::path& Source, const std::filesystem::path& Target, const FolderContent& DownloadContent) { const std::filesystem::path SourcePath = RootPath / Source; const std::filesystem::path TargetPath = RootPath / Target; CHECK_EQ(Paths.size(), DownloadContent.Paths.size()); tsl::robin_map ExpectedSizes; tsl::robin_map ExpectedHashes; for (size_t Index = 0; Index < Paths.size(); Index++) { const std::string LookupString = std::filesystem::path(Paths[Index]).generic_string(); ExpectedSizes.insert_or_assign(LookupString, Sizes[Index]); std::filesystem::path FilePath = SourcePath / Paths[Index]; const IoHash SourceHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(FilePath.make_preferred())); ExpectedHashes.insert_or_assign(LookupString, SourceHash); } for (size_t Index = 0; Index < DownloadContent.Paths.size(); Index++) { const std::string LookupString = std::filesystem::path(DownloadContent.Paths[Index]).generic_string(); auto SizeIt = ExpectedSizes.find(LookupString); CHECK_NE(SizeIt, ExpectedSizes.end()); CHECK_EQ(SizeIt->second, DownloadContent.RawSizes[Index]); std::filesystem::path FilePath = TargetPath / DownloadContent.Paths[Index]; const IoHash DownloadedHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(FilePath.make_preferred())); auto HashIt = ExpectedHashes.find(LookupString); CHECK_NE(HashIt, ExpectedHashes.end()); CHECK_EQ(HashIt->second, DownloadedHash); } } const std::filesystem::path RootPath; std::filesystem::path StoragePath; std::filesystem::path TempPath; 
std::filesystem::path SystemRootDir; std::filesystem::path ZenFolderPath; LoggerRef Log = ConsoleLog(); std::unique_ptr LogOutput; std::unique_ptr ChunkController; std::unique_ptr ChunkCache; StorageInstance Storage; BuildStorageBase::Statistics StorageStats; WorkerThreadPool WorkerPool; WorkerThreadPool NetworkPool; std::atomic AbortFlag; std::atomic PauseFlag; }; } // namespace buildstorageoperations_testutils TEST_CASE("buildstorageoperations.upload.folder") { using namespace buildstorageoperations_testutils; FastRandom BaseRandom; const size_t FileCount = 11; const std::string Paths[FileCount] = {{"file_1"}, {"file_2.exe"}, {"file_3.txt"}, {"dir_1/dir1_file_1.exe"}, {"dir_1/dir1_file_2.pdb"}, {"dir_1/dir1_file_3.txt"}, {"dir_2/dir2_dir1/dir2_dir1_file_1.exe"}, {"dir_2/dir2_dir1/dir2_dir1_file_2.pdb"}, {"dir_2/dir2_dir1/dir2_dir1_file_3.dll"}, {"dir_2/dir2_dir2/dir2_dir2_file_1.txt"}, {"dir_2/dir2_dir2/dir2_dir2_file_2.json"}}; const uint64_t Sizes[FileCount] = {6u * 1024u, 0, 798, 19u * 1024u, 7u * 1024u, 93, 31u * 1024u, 17u * 1024u, 13u * 1024u, 2u * 1024u, 3u * 1024u}; ScopedTemporaryDirectory SourceFolder; TestState State(SourceFolder.Path()); State.Initialize(); State.CreateSourceData("source", Paths, Sizes); const Oid BuildId = Oid::NewOid(); const Oid BuildPartId = Oid::NewOid(); const std::string BuildPartName = "default"; auto Result = State.Upload(BuildId, BuildPartId, BuildPartName, "source", {}); CHECK_EQ(Result.size(), 1u); CHECK_EQ(Result[0].first, BuildPartId); CHECK_EQ(Result[0].second, BuildPartName); State.ValidateUpload(BuildId, Result); FolderContent DownloadContent = State.Download(BuildId, Oid::Zero, {}, "download", /* Append */ false); CHECK_EQ(DownloadContent.Paths.size(), FileCount); State.ValidateDownload(Paths, Sizes, "source", "download", DownloadContent); } TEST_CASE("buildstorageoperations.upload.manifest") { using namespace buildstorageoperations_testutils; FastRandom BaseRandom; const size_t FileCount = 11; const std::string 
Paths[FileCount] = {{"file_1"}, {"file_2.exe"}, {"file_3.txt"}, {"dir_1/dir1_file_1.exe"}, {"dir_1/dir1_file_2.pdb"}, {"dir_1/dir1_file_3.txt"}, {"dir_2/dir2_dir1/dir2_dir1_file_1.exe"}, {"dir_2/dir2_dir1/dir2_dir1_file_2.pdb"}, {"dir_2/dir2_dir1/dir2_dir1_file_3.dll"}, {"dir_2/dir2_dir2/dir2_dir2_file_1.txt"}, {"dir_2/dir2_dir2/dir2_dir2_file_2.json"}}; const uint64_t Sizes[FileCount] = {6u * 1024u, 0, 798, 19u * 1024u, 7u * 1024u, 93, 31u * 1024u, 17u * 1024u, 13u * 1024u, 2u * 1024u, 3u * 1024u}; ScopedTemporaryDirectory SourceFolder; TestState State(SourceFolder.Path()); State.Initialize(); State.CreateSourceData("source", Paths, Sizes); std::span ManifestFiles(Paths); ManifestFiles = ManifestFiles.subspan(0, FileCount / 2); std::span ManifestSizes(Sizes); ManifestSizes = ManifestSizes.subspan(0, FileCount / 2); ExtendableStringBuilder<1024> Manifest; for (const std::string& FilePath : ManifestFiles) { Manifest << FilePath << "\n"; } WriteFile(State.RootPath / "manifest.txt", IoBuffer(IoBuffer::Wrap, Manifest.Data(), Manifest.Size())); const Oid BuildId = Oid::NewOid(); const Oid BuildPartId = Oid::NewOid(); const std::string BuildPartName = "default"; auto Result = State.Upload(BuildId, BuildPartId, BuildPartName, "source", State.RootPath / "manifest.txt"); CHECK_EQ(Result.size(), 1u); CHECK_EQ(Result[0].first, BuildPartId); CHECK_EQ(Result[0].second, BuildPartName); State.ValidateUpload(BuildId, Result); FolderContent DownloadContent = State.Download(BuildId, Oid::Zero, {}, "download", /* Append */ false); State.ValidateDownload(ManifestFiles, ManifestSizes, "source", "download", DownloadContent); } TEST_CASE("buildstorageoperations.memorychunkingcache") { using namespace buildstorageoperations_testutils; FastRandom BaseRandom; const size_t FileCount = 11; const std::string Paths[FileCount] = {{"file_1"}, {"file_2.exe"}, {"file_3.txt"}, {"dir_1/dir1_file_1.exe"}, {"dir_1/dir1_file_2.pdb"}, {"dir_1/dir1_file_3.txt"}, {"dir_2/dir2_dir1/dir2_dir1_file_1.exe"}, 
{"dir_2/dir2_dir1/dir2_dir1_file_2.pdb"}, {"dir_2/dir2_dir1/dir2_dir1_file_3.dll"}, {"dir_2/dir2_dir2/dir2_dir2_file_1.txt"}, {"dir_2/dir2_dir2/dir2_dir2_file_2.json"}}; const uint64_t Sizes[FileCount] = {6u * 1024u, 0, 798, 19u * 1024u, 7u * 1024u, 93, 31u * 1024u, 17u * 1024u, 13u * 1024u, 2u * 1024u, 3u * 1024u}; ScopedTemporaryDirectory SourceFolder; TestState State(SourceFolder.Path()); State.Initialize(); State.CreateSourceData("source", Paths, Sizes); const Oid BuildId = Oid::NewOid(); const Oid BuildPartId = Oid::NewOid(); const std::string BuildPartName = "default"; { const std::filesystem::path SourcePath = SourceFolder.Path() / "source"; CbObject MetaData; BuildsOperationUploadFolder Upload(*State.LogOutput, State.Storage, State.AbortFlag, State.PauseFlag, State.WorkerPool, State.NetworkPool, BuildId, SourcePath, true, MetaData, BuildsOperationUploadFolder::Options{.TempDir = State.TempPath}); auto Result = Upload.Execute(BuildPartId, BuildPartName, {}, *State.ChunkController, *State.ChunkCache); CHECK_EQ(Upload.m_ChunkingStats.FilesStoredInCache.load(), FileCount - 1); // Zero size files are not stored in cache CHECK_EQ(Upload.m_ChunkingStats.BytesStoredInCache.load(), std::accumulate(&Sizes[0], &Sizes[FileCount], uint64_t(0))); CHECK(Upload.m_ChunkingStats.ChunksStoredInCache.load() >= FileCount - 1); // Zero size files are not stored in cache CHECK_EQ(Result.size(), 1u); CHECK_EQ(Result[0].first, BuildPartId); CHECK_EQ(Result[0].second, BuildPartName); } auto Result = State.Upload(BuildId, BuildPartId, BuildPartName, "source", {}); const Oid BuildId2 = Oid::NewOid(); const Oid BuildPartId2 = Oid::NewOid(); { const std::filesystem::path SourcePath = SourceFolder.Path() / "source"; CbObject MetaData; BuildsOperationUploadFolder Upload(*State.LogOutput, State.Storage, State.AbortFlag, State.PauseFlag, State.WorkerPool, State.NetworkPool, BuildId2, SourcePath, true, MetaData, BuildsOperationUploadFolder::Options{.TempDir = State.TempPath}); 
Upload.Execute(BuildPartId2, BuildPartName, {}, *State.ChunkController, *State.ChunkCache); CHECK_EQ(Upload.m_ChunkingStats.FilesFoundInCache.load(), FileCount - 1); // Zero size files are not stored in cache CHECK_EQ(Upload.m_ChunkingStats.BytesFoundInCache.load(), std::accumulate(&Sizes[0], &Sizes[FileCount], uint64_t(0))); CHECK(Upload.m_ChunkingStats.ChunksFoundInCache.load() >= FileCount - 1); // Zero size files are not stored in cache } FolderContent DownloadContent = State.Download(BuildId2, BuildPartId2, {}, "download", /* Append */ false); State.ValidateDownload(Paths, Sizes, "source", "download", DownloadContent); } TEST_CASE("buildstorageoperations.upload.multipart") { // Disabled since it relies on authentication and specific block being present in cloud storage if (false) { using namespace buildstorageoperations_testutils; FastRandom BaseRandom; const size_t FileCount = 11; const std::string Paths[FileCount] = {{"file_1"}, {"file_2.exe"}, {"file_3.txt"}, {"dir_1/dir1_file_1.exe"}, {"dir_1/dir1_file_2.pdb"}, {"dir_1/dir1_file_3.txt"}, {"dir_2/dir2_dir1/dir2_dir1_file_1.exe"}, {"dir_2/dir2_dir1/dir2_dir1_file_2.pdb"}, {"dir_2/dir2_dir1/dir2_dir1_file_3.dll"}, {"dir_2/dir2_dir2/dir2_dir2_file_1.txt"}, {"dir_2/dir2_dir2/dir2_dir2_file_2.json"}}; const uint64_t Sizes[FileCount] = {6u * 1024u, 0, 798, 19u * 1024u, 7u * 1024u, 93, 31u * 1024u, 17u * 1024u, 13u * 1024u, 2u * 1024u, 3u * 1024u}; ScopedTemporaryDirectory SourceFolder; TestState State(SourceFolder.Path()); State.Initialize(); State.CreateSourceData("source", Paths, Sizes); std::span ManifestFiles1(Paths); ManifestFiles1 = ManifestFiles1.subspan(0, FileCount / 2); std::span ManifestSizes1(Sizes); ManifestSizes1 = ManifestSizes1.subspan(0, FileCount / 2); std::span ManifestFiles2(Paths); ManifestFiles2 = ManifestFiles2.subspan(FileCount / 2 - 1); std::span ManifestSizes2(Sizes); ManifestSizes2 = ManifestSizes2.subspan(FileCount / 2 - 1); const Oid BuildPart1Id = Oid::NewOid(); const std::string 
BuildPart1Name = "part1"; const Oid BuildPart2Id = Oid::NewOid(); const std::string BuildPart2Name = "part2"; { CbObjectWriter Writer; Writer.BeginObject("parts"sv); { Writer.BeginObject(BuildPart1Name); { Writer.AddObjectId("partId"sv, BuildPart1Id); Writer.BeginArray("files"sv); for (const std::string& ManifestFile : ManifestFiles1) { Writer.AddString(ManifestFile); } Writer.EndArray(); // files } Writer.EndObject(); // part1 Writer.BeginObject(BuildPart2Name); { Writer.AddObjectId("partId"sv, BuildPart2Id); Writer.BeginArray("files"sv); for (const std::string& ManifestFile : ManifestFiles2) { Writer.AddString(ManifestFile); } Writer.EndArray(); // files } Writer.EndObject(); // part2 } Writer.EndObject(); // parts ExtendableStringBuilder<1024> Manifest; CompactBinaryToJson(Writer.Save(), Manifest); WriteFile(State.RootPath / "manifest.json", IoBuffer(IoBuffer::Wrap, Manifest.Data(), Manifest.Size())); } const Oid BuildId = Oid::NewOid(); auto Result = State.Upload(BuildId, {}, {}, "source", State.RootPath / "manifest.json"); CHECK_EQ(Result.size(), 2u); CHECK_EQ(Result[0].first, BuildPart1Id); CHECK_EQ(Result[0].second, BuildPart1Name); CHECK_EQ(Result[1].first, BuildPart2Id); CHECK_EQ(Result[1].second, BuildPart2Name); State.ValidateUpload(BuildId, Result); FolderContent DownloadContent = State.Download(BuildId, Oid::Zero, {}, "download", /* Append */ false); State.ValidateDownload(Paths, Sizes, "source", "download", DownloadContent); FolderContent Part1DownloadContent = State.Download(BuildId, BuildPart1Id, {}, "download_part1", /* Append */ false); State.ValidateDownload(ManifestFiles1, ManifestSizes1, "source", "download_part1", Part1DownloadContent); FolderContent Part2DownloadContent = State.Download(BuildId, Oid::Zero, BuildPart2Name, "download_part2", /* Append */ false); State.ValidateDownload(ManifestFiles2, ManifestSizes2, "source", "download_part2", Part2DownloadContent); (void)State.Download(BuildId, BuildPart1Id, BuildPart1Name, "download_part1+2", 
/* Append */ false); FolderContent Part1And2DownloadContent = State.Download(BuildId, BuildPart2Id, {}, "download_part1+2", /* Append */ true); State.ValidateDownload(Paths, Sizes, "source", "download_part1+2", Part1And2DownloadContent); } } void buildstorageoperations_forcelink() { } #endif // ZEN_WITH_TESTS } // namespace zen