diff options
Diffstat (limited to 'src/zenremotestore/builds/buildstorageoperations.cpp')
| -rw-r--r-- | src/zenremotestore/builds/buildstorageoperations.cpp | 8319 |
1 files changed, 0 insertions, 8319 deletions
diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp deleted file mode 100644 index a04063c4c..000000000 --- a/src/zenremotestore/builds/buildstorageoperations.cpp +++ /dev/null @@ -1,8319 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include <zenremotestore/builds/buildstorageoperations.h> - -#include <zenremotestore/builds/buildcontent.h> -#include <zenremotestore/builds/buildmanifest.h> -#include <zenremotestore/builds/buildsavedstate.h> -#include <zenremotestore/builds/buildstorage.h> -#include <zenremotestore/builds/buildstoragecache.h> -#include <zenremotestore/builds/buildstorageutil.h> -#include <zenremotestore/chunking/chunkblock.h> -#include <zenremotestore/chunking/chunkingcache.h> -#include <zenremotestore/chunking/chunkingcontroller.h> -#include <zenremotestore/filesystemutils.h> -#include <zenremotestore/operationlogoutput.h> - -#include <zencore/basicfile.h> -#include <zencore/compactbinary.h> -#include <zencore/compactbinaryfile.h> -#include <zencore/compactbinaryutil.h> -#include <zencore/compactbinaryvalue.h> -#include <zencore/filesystem.h> -#include <zencore/fmtutils.h> -#include <zencore/parallelwork.h> -#include <zencore/scopeguard.h> -#include <zencore/string.h> -#include <zencore/timer.h> -#include <zencore/trace.h> -#include <zenutil/wildcard.h> - -#include <numeric> - -ZEN_THIRD_PARTY_INCLUDES_START -#include <tsl/robin_map.h> -#include <tsl/robin_set.h> -ZEN_THIRD_PARTY_INCLUDES_END - -#if ZEN_WITH_TESTS -# include <zencore/testing.h> -# include <zencore/testutils.h> -# include <zenhttp/httpclientauth.h> -# include <zenremotestore/builds/filebuildstorage.h> -#endif // ZEN_WITH_TESTS - -namespace zen { - -using namespace std::literals; - -namespace { - std::filesystem::path ZenTempCacheFolderPath(const std::filesystem::path& ZenFolderPath) - { - return ZenTempFolderPath(ZenFolderPath) / "cache"; // Decompressed and verified data - chunks & sequences - } - std::filesystem::path ZenTempBlockFolderPath(const std::filesystem::path& ZenFolderPath) - { - return ZenTempFolderPath(ZenFolderPath) / "blocks"; // Temp storage for whole and partial blocks - } - std::filesystem::path ZenTempDownloadFolderPath(const std::filesystem::path& ZenFolderPath) - { - return ZenTempFolderPath(ZenFolderPath) / "download"; // Temp storage for decompressed and validated chunks - } - - uint64_t GetBytesPerSecond(uint64_t ElapsedWallTimeUS, uint64_t Count) - { - if (ElapsedWallTimeUS == 0) - { - return 0; - } - return Count * 1000000 / ElapsedWallTimeUS; - } - - std::filesystem::path GetTempChunkedSequenceFileName(const std::filesystem::path& CacheFolderPath, const IoHash& RawHash) - { - return CacheFolderPath / (RawHash.ToHexString() + ".tmp"); - } - - std::filesystem::path GetFinalChunkedSequenceFileName(const std::filesystem::path& CacheFolderPath, const IoHash& RawHash) - { - return CacheFolderPath / RawHash.ToHexString(); - } - - bool CleanDirectory(OperationLogOutput& OperationLogOutput, - WorkerThreadPool& IOWorkerPool, - std::atomic<bool>& AbortFlag, - std::atomic<bool>& PauseFlag, - bool IsQuiet, - const std::filesystem::path& Path, - std::span<const std::string> ExcludeDirectories) - { - ZEN_TRACE_CPU("CleanDirectory"); - Stopwatch Timer; - - std::unique_ptr<OperationLogOutput::ProgressBar> ProgressBarPtr(OperationLogOutput.CreateProgressBar("Clean Folder")); - OperationLogOutput::ProgressBar& Progress(*ProgressBarPtr); - - CleanDirectoryResult Result = CleanDirectory( - IOWorkerPool, - AbortFlag, - PauseFlag, - Path, - ExcludeDirectories, - [&](const std::string_view Details, uint64_t TotalCount, uint64_t RemainingCount, bool IsPaused, bool IsAborted) { - Progress.UpdateState({.Task = "Cleaning folder ", - .Details = std::string(Details), - .TotalCount = TotalCount, - .RemainingCount = RemainingCount, - .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, - false); - }, - OperationLogOutput.GetProgressUpdateDelayMS()); - - Progress.Finish(); - - if (AbortFlag) - { - return false; - } - - uint64_t ElapsedTimeMs = Timer.GetElapsedTimeMs(); - - if (!Result.FailedRemovePaths.empty()) - { - ExtendableStringBuilder<512> SB; - for (size_t FailedPathIndex = 0; FailedPathIndex < Result.FailedRemovePaths.size(); FailedPathIndex++) - { - SB << fmt::format("\n '{}': ({}) {}", - Result.FailedRemovePaths[FailedPathIndex].first, - Result.FailedRemovePaths[FailedPathIndex].second.value(), - Result.FailedRemovePaths[FailedPathIndex].second.message()); - } - ZEN_OPERATION_LOG_WARN(OperationLogOutput, "Clean failed to remove files from '{}': {}", Path, SB.ToView()); - } - - if (ElapsedTimeMs >= 200 && !IsQuiet) - { - ZEN_OPERATION_LOG_INFO(OperationLogOutput, - "Wiped folder '{}' {} ({}) in {}", - Path, - Result.FoundCount, - NiceBytes(Result.DeletedByteCount), - NiceTimeSpanMs(ElapsedTimeMs)); - } - - return Result.FailedRemovePaths.empty(); - } - - bool IsExtensionHashCompressable(const tsl::robin_set<uint32_t>& NonCompressableExtensionHashes, const uint32_t PathHash) - { - return !NonCompressableExtensionHashes.contains(PathHash); - } - - bool IsChunkCompressable(const tsl::robin_set<uint32_t>& NonCompressableExtensionHashes, - const ChunkedFolderContent& Content, - const ChunkedContentLookup& Lookup, - uint32_t ChunkIndex) - { - ZEN_UNUSED(Content); - const uint32_t ChunkLocationCount = Lookup.ChunkSequenceLocationCounts[ChunkIndex]; - if (ChunkLocationCount == 0) - { - return false; - } - const size_t ChunkLocationOffset = Lookup.ChunkSequenceLocationOffset[ChunkIndex]; - const uint32_t SequenceIndex = Lookup.ChunkSequenceLocations[ChunkLocationOffset].SequenceIndex; - const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex]; - const uint32_t ExtensionHash = Lookup.PathExtensionHash[PathIndex]; - - const bool IsCompressable = IsExtensionHashCompressable(NonCompressableExtensionHashes, ExtensionHash); - return IsCompressable; - } - - template<typename T> - std::string FormatArray(std::span<const T> Items, std::string_view Prefix) - { - ExtendableStringBuilder<512> SB; - for (const T& Item : Items) - { - SB.Append(fmt::format("{}{}", Prefix, Item)); - } - return SB.ToString(); - } - - void DownloadLargeBlob(BuildStorageBase& Storage, - const std::filesystem::path& DownloadFolder, - const Oid& BuildId, - const IoHash& ChunkHash, - const std::uint64_t PreferredMultipartChunkSize, - ParallelWork& Work, - WorkerThreadPool& NetworkPool, - std::atomic<uint64_t>& DownloadedChunkByteCount, - std::atomic<uint64_t>& MultipartAttachmentCount, - std::function<void(IoBuffer&& Payload)>&& OnDownloadComplete) - { - ZEN_TRACE_CPU("DownloadLargeBlob"); - - struct WorkloadData - { - TemporaryFile TempFile; - }; - std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>()); - - std::error_code Ec; - Workload->TempFile.CreateTemporary(DownloadFolder, Ec); - if (Ec) - { - throw std::runtime_error( - fmt::format("Failed opening temporary file '{}', reason: ({}) {}", Workload->TempFile.GetPath(), Ec.message(), Ec.value())); - } - std::vector<std::function<void()>> WorkItems = Storage.GetLargeBuildBlob( - BuildId, - ChunkHash, - PreferredMultipartChunkSize, - [&Work, Workload, &DownloadedChunkByteCount](uint64_t Offset, const IoBuffer& Chunk) { - DownloadedChunkByteCount += Chunk.GetSize(); - - if (!Work.IsAborted()) - { - ZEN_TRACE_CPU("Async_DownloadLargeBlob_OnReceive"); - Workload->TempFile.Write(Chunk.GetView(), Offset); - } - }, - [&Work, Workload, &DownloadedChunkByteCount, OnDownloadComplete = std::move(OnDownloadComplete)]() { - if (!Work.IsAborted()) - { - ZEN_TRACE_CPU("Async_DownloadLargeBlob_OnComplete"); - - uint64_t PayloadSize = Workload->TempFile.FileSize(); - void* FileHandle = Workload->TempFile.Detach(); - ZEN_ASSERT(FileHandle != nullptr); - IoBuffer Payload(IoBuffer::File, FileHandle, 0, PayloadSize, true); - Payload.SetDeleteOnClose(true); - OnDownloadComplete(std::move(Payload)); - } - }); - if (!WorkItems.empty()) - { - MultipartAttachmentCount++; - } - for (auto& WorkItem : WorkItems) - { - Work.ScheduleWork(NetworkPool, [WorkItem = std::move(WorkItem)](std::atomic<bool>& AbortFlag) { - if (!AbortFlag) - { - ZEN_TRACE_CPU("Async_DownloadLargeBlob_Work"); - - WorkItem(); - } - }); - } - } - - CompositeBuffer ValidateBlob(std::atomic<bool>& AbortFlag, - IoBuffer&& Payload, - const IoHash& BlobHash, - uint64_t& OutCompressedSize, - uint64_t& OutDecompressedSize) - { - ZEN_TRACE_CPU("ValidateBlob"); - - if (Payload.GetContentType() != ZenContentType::kCompressedBinary) - { - throw std::runtime_error(fmt::format("Blob {} ({} bytes) has unexpected content type '{}'", - BlobHash, - Payload.GetSize(), - ToString(Payload.GetContentType()))); - } - IoHash RawHash; - uint64_t RawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, RawSize); - if (!Compressed) - { - throw std::runtime_error(fmt::format("Blob {} ({} bytes) compressed header is invalid", BlobHash, Payload.GetSize())); - } - if (RawHash != BlobHash) - { - throw std::runtime_error( - fmt::format("Blob {} ({} bytes) compressed header has a mismatching raw hash {}", BlobHash, Payload.GetSize(), RawHash)); - } - - IoHashStream Hash; - bool CouldDecompress = Compressed.DecompressToStream( - 0, - RawSize, - [&AbortFlag, &Hash](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { - ZEN_UNUSED(SourceOffset, SourceSize, Offset); - if (!AbortFlag) - { - for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) - { - Hash.Append(Segment.GetView()); - } - return true; - } - return false; - }); - - if (AbortFlag) - { - return CompositeBuffer{}; - } - - if (!CouldDecompress) - { - throw std::runtime_error( - fmt::format("Blob {} ({} bytes) failed to decompress - header information mismatch", BlobHash, Payload.GetSize())); - } - IoHash ValidateRawHash = Hash.GetHash(); - if (ValidateRawHash != BlobHash) - { - throw std::runtime_error(fmt::format("Blob {} ({} bytes) decompressed hash {} does not match header information", - BlobHash, - Payload.GetSize(), - ValidateRawHash)); - } - OodleCompressor Compressor; - OodleCompressionLevel CompressionLevel; - uint64_t BlockSize; - if (!Compressed.TryGetCompressParameters(Compressor, CompressionLevel, BlockSize)) - { - throw std::runtime_error(fmt::format("Blob {} ({} bytes) failed to get compression details", BlobHash, Payload.GetSize())); - } - OutCompressedSize = Payload.GetSize(); - OutDecompressedSize = RawSize; - if (CompressionLevel == OodleCompressionLevel::None) - { - // Only decompress to composite if we need it for block verification - CompositeBuffer DecompressedComposite = Compressed.DecompressToComposite(); - if (!DecompressedComposite) - { - throw std::runtime_error(fmt::format("Blob {} ({} bytes) failed to decompress to composite", BlobHash, Payload.GetSize())); - } - return DecompressedComposite; - } - return CompositeBuffer{}; - } - -} // namespace - -bool -IsSingleFileChunk(const ChunkedFolderContent& RemoteContent, - const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> Locations) -{ - if (Locations.size() == 1) - { - const uint32_t FirstSequenceIndex = Locations[0]->SequenceIndex; - if (RemoteContent.ChunkedContent.ChunkCounts[FirstSequenceIndex] == 1) - { - ZEN_ASSERT_SLOW(Locations[0]->Offset == 0); - return true; - } - } - return false; -} - -IoBuffer -MakeBufferMemoryBased(const CompositeBuffer& PartialBlockBuffer) -{ - ZEN_TRACE_CPU("MakeBufferMemoryBased"); - IoBuffer BlockMemoryBuffer; - std::span<const SharedBuffer> Segments = PartialBlockBuffer.GetSegments(); - if (Segments.size() == 1) - { - IoBufferFileReference FileRef = {}; - if (PartialBlockBuffer.GetSegments().front().AsIoBuffer().GetFileReference(FileRef)) - { - BlockMemoryBuffer = UniqueBuffer::Alloc(FileRef.FileChunkSize).MoveToShared().AsIoBuffer(); - BasicFile Reader; - Reader.Attach(FileRef.FileHandle); - auto _ = MakeGuard([&Reader]() { Reader.Detach(); }); - MutableMemoryView ReadMem = BlockMemoryBuffer.GetMutableView(); - Reader.Read(ReadMem.GetData(), FileRef.FileChunkSize, FileRef.FileChunkOffset); - return BlockMemoryBuffer; - } - else - { - return PartialBlockBuffer.GetSegments().front().AsIoBuffer(); - } - } - else - { - // Not a homogenous memory buffer, read all to memory - - BlockMemoryBuffer = UniqueBuffer::Alloc(PartialBlockBuffer.GetSize()).MoveToShared().AsIoBuffer(); - MutableMemoryView ReadMem = BlockMemoryBuffer.GetMutableView(); - for (const SharedBuffer& Segment : Segments) - { - IoBufferFileReference FileRef = {}; - if (Segment.AsIoBuffer().GetFileReference(FileRef)) - { - BasicFile Reader; - Reader.Attach(FileRef.FileHandle); - auto _ = MakeGuard([&Reader]() { Reader.Detach(); }); - Reader.Read(ReadMem.GetData(), FileRef.FileChunkSize, FileRef.FileChunkOffset); - ReadMem = ReadMem.Mid(FileRef.FileChunkSize); - } - else - { - ReadMem = ReadMem.CopyFrom(Segment.AsIoBuffer().GetView()); - } - } - return BlockMemoryBuffer; - } -} - -class FilteredRate -{ -public: - FilteredRate() {} - - void Start() - { - if (StartTimeUS == (uint64_t)-1) - { - uint64_t Expected = (uint64_t)-1; - if (StartTimeUS.compare_exchange_weak(Expected, Timer.GetElapsedTimeUs())) - { - LastTimeUS = StartTimeUS.load(); - } - } - } - void Stop() - { - if (EndTimeUS == (uint64_t)-1) - { - uint64_t Expected = (uint64_t)-1; - EndTimeUS.compare_exchange_weak(Expected, Timer.GetElapsedTimeUs()); - } - } - - void Update(uint64_t Count) - { - if (LastTimeUS == (uint64_t)-1) - { - return; - } - uint64_t TimeUS = Timer.GetElapsedTimeUs(); - uint64_t TimeDeltaUS = TimeUS - LastTimeUS; - if (TimeDeltaUS >= 2000000) - { - uint64_t Delta = Count - LastCount; - uint64_t PerSecond = (Delta * 1000000) / TimeDeltaUS; - - LastPerSecond = PerSecond; - - LastCount = Count; - - FilteredPerSecond = (PerSecond + (LastPerSecond * 7)) / 8; - - LastTimeUS = TimeUS; - } - } - - uint64_t GetCurrent() const // If Stopped - return total count / total time - { - if (LastTimeUS == (uint64_t)-1) - { - return 0; - } - return FilteredPerSecond; - } - - uint64_t GetElapsedTimeUS() const - { - if (StartTimeUS == (uint64_t)-1) - { - return 0; - } - if (EndTimeUS == (uint64_t)-1) - { - return 0; - } - uint64_t TimeDeltaUS = EndTimeUS - StartTimeUS; - return TimeDeltaUS; - } - - bool IsActive() const { return (StartTimeUS != (uint64_t)-1) && (EndTimeUS == (uint64_t)-1); } - -private: - Stopwatch Timer; - std::atomic<uint64_t> StartTimeUS = (uint64_t)-1; - std::atomic<uint64_t> EndTimeUS = (uint64_t)-1; - std::atomic<uint64_t> LastTimeUS = (uint64_t)-1; - uint64_t LastCount = 0; - uint64_t LastPerSecond = 0; - uint64_t FilteredPerSecond = 0; -}; - -std::filesystem::path -ZenStateFilePath(const std::filesystem::path& ZenFolderPath) -{ - return ZenFolderPath / "current_state.cbo"; -} -std::filesystem::path -ZenTempFolderPath(const std::filesystem::path& ZenFolderPath) -{ - return ZenFolderPath / "tmp"; -} - -////////////////////// BuildsOperationUpdateFolder - -BuildsOperationUpdateFolder::BuildsOperationUpdateFolder(OperationLogOutput& OperationLogOutput, - StorageInstance& Storage, - std::atomic<bool>& AbortFlag, - std::atomic<bool>& PauseFlag, - WorkerThreadPool& IOWorkerPool, - WorkerThreadPool& NetworkPool, - const Oid& BuildId, - const std::filesystem::path& Path, - const ChunkedFolderContent& LocalContent, - const ChunkedContentLookup& LocalLookup, - const ChunkedFolderContent& RemoteContent, - const ChunkedContentLookup& RemoteLookup, - const std::vector<ChunkBlockDescription>& BlockDescriptions, - const std::vector<IoHash>& LooseChunkHashes, - const Options& Options) -: m_LogOutput(OperationLogOutput) -, m_Storage(Storage) -, m_AbortFlag(AbortFlag) -, m_PauseFlag(PauseFlag) -, m_IOWorkerPool(IOWorkerPool) -, m_NetworkPool(NetworkPool) -, m_BuildId(BuildId) -, m_Path(Path) -, m_LocalContent(LocalContent) -, m_LocalLookup(LocalLookup) -, m_RemoteContent(RemoteContent) -, m_RemoteLookup(RemoteLookup) -, m_BlockDescriptions(BlockDescriptions) -, m_LooseChunkHashes(LooseChunkHashes) -, m_Options(Options) -, m_CacheFolderPath(ZenTempCacheFolderPath(m_Options.ZenFolderPath)) -, m_TempDownloadFolderPath(ZenTempDownloadFolderPath(m_Options.ZenFolderPath)) -, m_TempBlockFolderPath(ZenTempBlockFolderPath(m_Options.ZenFolderPath)) -{ -} - -void -BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) -{ - ZEN_TRACE_CPU("BuildsOperationUpdateFolder::Execute"); - try - { - enum class TaskSteps : uint32_t - { - ScanExistingData, - WriteChunks, - PrepareTarget, - FinalizeTarget, - Cleanup, - StepCount - }; - - auto EndProgress = - MakeGuard([&]() { m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::StepCount, (uint32_t)TaskSteps::StepCount); }); - - ZEN_ASSERT((!m_Options.PrimeCacheOnly) || - (m_Options.PrimeCacheOnly && (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Off))); - - m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::ScanExistingData, (uint32_t)TaskSteps::StepCount); - - CreateDirectories(m_CacheFolderPath); - CreateDirectories(m_TempDownloadFolderPath); - CreateDirectories(m_TempBlockFolderPath); - - Stopwatch CacheMappingTimer; - - std::vector<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters(m_RemoteContent.ChunkedContent.SequenceRawHashes.size()); - std::vector<bool> RemoteChunkIndexNeedsCopyFromLocalFileFlags(m_RemoteContent.ChunkedContent.ChunkHashes.size()); - std::vector<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags(m_RemoteContent.ChunkedContent.ChunkHashes.size()); - - tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedChunkHashesFound; - tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedSequenceHashesFound; - if (!m_Options.PrimeCacheOnly) - { - ScanCacheFolder(CachedChunkHashesFound, CachedSequenceHashesFound); - } - - tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedBlocksFound; - if (!m_Options.PrimeCacheOnly) - { - ScanTempBlocksFolder(CachedBlocksFound); - } - - tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> SequenceIndexesLeftToFindToRemoteIndex; - - if (!m_Options.PrimeCacheOnly && m_Options.EnableTargetFolderScavenging) - { - // Pick up all whole files we can use from current local state - ZEN_TRACE_CPU("GetLocalSequences"); - - Stopwatch LocalTimer; - - std::vector<uint32_t> MissingSequenceIndexes = ScanTargetFolder(CachedChunkHashesFound, CachedSequenceHashesFound); - - for (uint32_t RemoteSequenceIndex : MissingSequenceIndexes) - { - // We must write the sequence - const uint32_t ChunkCount = m_RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]; - const IoHash& RemoteSequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; - SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = ChunkCount; - SequenceIndexesLeftToFindToRemoteIndex.insert({RemoteSequenceRawHash, RemoteSequenceIndex}); - } - } - else - { - for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < m_RemoteContent.ChunkedContent.SequenceRawHashes.size(); - RemoteSequenceIndex++) - { - const uint32_t ChunkCount = m_RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]; - SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = ChunkCount; - } - } - - std::vector<ChunkedFolderContent> ScavengedContents; - std::vector<ChunkedContentLookup> ScavengedLookups; - std::vector<std::filesystem::path> ScavengedPaths; - - std::vector<ScavengedSequenceCopyOperation> ScavengedSequenceCopyOperations; - uint64_t ScavengedPathsCount = 0; - - if (!m_Options.PrimeCacheOnly && m_Options.EnableOtherDownloadsScavenging) - { - ZEN_TRACE_CPU("GetScavengedSequences"); - - Stopwatch ScavengeTimer; - - if (!SequenceIndexesLeftToFindToRemoteIndex.empty()) - { - std::vector<ScavengeSource> ScavengeSources = FindScavengeSources(); - - const size_t ScavengePathCount = ScavengeSources.size(); - - ScavengedContents.resize(ScavengePathCount); - ScavengedLookups.resize(ScavengePathCount); - ScavengedPaths.resize(ScavengePathCount); - - std::unique_ptr<OperationLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Scavenging")); - OperationLogOutput::ProgressBar& ScavengeProgressBar(*ProgressBarPtr); - - ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - - std::atomic<uint64_t> PathsFound(0); - std::atomic<uint64_t> ChunksFound(0); - std::atomic<uint64_t> PathsScavenged(0); - - for (size_t ScavengeIndex = 0; ScavengeIndex < ScavengePathCount; ScavengeIndex++) - { - Work.ScheduleWork(m_IOWorkerPool, - [this, - &ScavengeSources, - &ScavengedContents, - &ScavengedPaths, - &ScavengedLookups, - &PathsFound, - &ChunksFound, - &PathsScavenged, - ScavengeIndex](std::atomic<bool>&) { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Async_FindScavengeContent"); - - const ScavengeSource& Source = ScavengeSources[ScavengeIndex]; - ChunkedFolderContent& ScavengedLocalContent = ScavengedContents[ScavengeIndex]; - ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengeIndex]; - - if (FindScavengeContent(Source, ScavengedLocalContent, ScavengedLookup)) - { - ScavengedPaths[ScavengeIndex] = Source.Path; - PathsFound += ScavengedLocalContent.Paths.size(); - ChunksFound += ScavengedLocalContent.ChunkedContent.ChunkHashes.size(); - } - else - { - ScavengedPaths[ScavengeIndex].clear(); - } - PathsScavenged++; - } - }); - } - { - ZEN_TRACE_CPU("ScavengeScan_Wait"); - - Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(PendingWork); - std::string Details = fmt::format("{}/{} scanned. {} paths and {} chunks found for scavenging", - PathsScavenged.load(), - ScavengePathCount, - PathsFound.load(), - ChunksFound.load()); - ScavengeProgressBar.UpdateState( - {.Task = "Scavenging ", - .Details = Details, - .TotalCount = ScavengePathCount, - .RemainingCount = ScavengePathCount - PathsScavenged.load(), - .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, - false); - }); - } - - ScavengeProgressBar.Finish(); - if (m_AbortFlag) - { - return; - } - - for (uint32_t ScavengedContentIndex = 0; - ScavengedContentIndex < ScavengedContents.size() && (!SequenceIndexesLeftToFindToRemoteIndex.empty()); - ScavengedContentIndex++) - { - const std::filesystem::path& ScavengePath = ScavengedPaths[ScavengedContentIndex]; - if (!ScavengePath.empty()) - { - const ChunkedFolderContent& ScavengedLocalContent = ScavengedContents[ScavengedContentIndex]; - const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengedContentIndex]; - - for (uint32_t ScavengedSequenceIndex = 0; - ScavengedSequenceIndex < ScavengedLocalContent.ChunkedContent.SequenceRawHashes.size(); - ScavengedSequenceIndex++) - { - const IoHash& SequenceRawHash = ScavengedLocalContent.ChunkedContent.SequenceRawHashes[ScavengedSequenceIndex]; - if (auto It = SequenceIndexesLeftToFindToRemoteIndex.find(SequenceRawHash); - It != SequenceIndexesLeftToFindToRemoteIndex.end()) - { - const uint32_t RemoteSequenceIndex = It->second; - const uint64_t RawSize = - m_RemoteContent.RawSizes[m_RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]]; - ZEN_ASSERT(RawSize > 0); - - const uint32_t ScavengedPathIndex = ScavengedLookup.SequenceIndexFirstPathIndex[ScavengedSequenceIndex]; - ZEN_ASSERT_SLOW(IsFile((ScavengePath / ScavengedLocalContent.Paths[ScavengedPathIndex]).make_preferred())); - - ScavengedSequenceCopyOperations.push_back({.ScavengedContentIndex = ScavengedContentIndex, - .ScavengedPathIndex = ScavengedPathIndex, - .RemoteSequenceIndex = RemoteSequenceIndex, - .RawSize = RawSize}); - - SequenceIndexesLeftToFindToRemoteIndex.erase(SequenceRawHash); - SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = 0; - - m_CacheMappingStats.ScavengedPathsMatchingSequencesCount++; - m_CacheMappingStats.ScavengedPathsMatchingSequencesByteCount += RawSize; - } - } - ScavengedPathsCount++; - } - } - } - m_CacheMappingStats.ScavengeElapsedWallTimeUs += ScavengeTimer.GetElapsedTimeUs(); - } - - uint32_t RemainingChunkCount = 0; - for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < m_RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++) - { - uint64_t ChunkWriteCount = GetChunkWriteCount(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); - if (ChunkWriteCount > 0) - { - RemainingChunkCount++; - } - } - - // Pick up all chunks in current local state - tsl::robin_map<IoHash, size_t, IoHash::Hasher> RawHashToCopyChunkDataIndex; - std::vector<CopyChunkData> CopyChunkDatas; - - if (!m_Options.PrimeCacheOnly && m_Options.EnableTargetFolderScavenging) - { - ZEN_TRACE_CPU("GetLocalChunks"); - - Stopwatch LocalTimer; - - ScavengeSourceForChunks(RemainingChunkCount, - RemoteChunkIndexNeedsCopyFromLocalFileFlags, - RawHashToCopyChunkDataIndex, - SequenceIndexChunksLeftToWriteCounters, - m_LocalContent, - m_LocalLookup, - CopyChunkDatas, - uint32_t(-1), - m_CacheMappingStats.LocalChunkMatchingRemoteCount, - m_CacheMappingStats.LocalChunkMatchingRemoteByteCount); - - m_CacheMappingStats.LocalScanElapsedWallTimeUs += LocalTimer.GetElapsedTimeUs(); - } - - if (!m_Options.PrimeCacheOnly && m_Options.EnableOtherDownloadsScavenging) - { - ZEN_TRACE_CPU("GetScavengeChunks"); - - Stopwatch ScavengeTimer; - - for (uint32_t ScavengedContentIndex = 0; ScavengedContentIndex < ScavengedContents.size() && (RemainingChunkCount > 0); - ScavengedContentIndex++) - { - const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengedContentIndex]; - const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengedContentIndex]; - - ScavengeSourceForChunks(RemainingChunkCount, - RemoteChunkIndexNeedsCopyFromLocalFileFlags, - RawHashToCopyChunkDataIndex, - SequenceIndexChunksLeftToWriteCounters, - ScavengedContent, - ScavengedLookup, - CopyChunkDatas, - ScavengedContentIndex, - m_CacheMappingStats.ScavengedChunkMatchingRemoteCount, - m_CacheMappingStats.ScavengedChunkMatchingRemoteByteCount); - } - m_CacheMappingStats.ScavengeElapsedWallTimeUs += ScavengeTimer.GetElapsedTimeUs(); - } - - if (!m_Options.IsQuiet) - { - if (m_CacheMappingStats.CacheSequenceHashesCount > 0 || m_CacheMappingStats.CacheChunkCount > 0 || - m_CacheMappingStats.CacheBlockCount > 0) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Download cache: Found {} ({}) chunk sequences, {} ({}) chunks, {} ({}) blocks in {}", - m_CacheMappingStats.CacheSequenceHashesCount, - NiceBytes(m_CacheMappingStats.CacheSequenceHashesByteCount), - m_CacheMappingStats.CacheChunkCount, - NiceBytes(m_CacheMappingStats.CacheChunkByteCount), - m_CacheMappingStats.CacheBlockCount, - NiceBytes(m_CacheMappingStats.CacheBlocksByteCount), - NiceTimeSpanMs(m_CacheMappingStats.CacheScanElapsedWallTimeUs / 1000)); - } - - if (m_CacheMappingStats.LocalPathsMatchingSequencesCount > 0 || m_CacheMappingStats.LocalChunkMatchingRemoteCount > 0) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Local state : Found {} ({}) chunk sequences, {} ({}) chunks in {}", - m_CacheMappingStats.LocalPathsMatchingSequencesCount, - NiceBytes(m_CacheMappingStats.LocalPathsMatchingSequencesByteCount), - m_CacheMappingStats.LocalChunkMatchingRemoteCount, - NiceBytes(m_CacheMappingStats.LocalChunkMatchingRemoteByteCount), - NiceTimeSpanMs(m_CacheMappingStats.LocalScanElapsedWallTimeUs / 1000)); - } - if (m_CacheMappingStats.ScavengedPathsMatchingSequencesCount > 0 || m_CacheMappingStats.ScavengedChunkMatchingRemoteCount > 0) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Scavenge of {} paths, found {} ({}) chunk sequences, {} ({}) chunks in {}", - ScavengedPathsCount, - m_CacheMappingStats.ScavengedPathsMatchingSequencesCount, - NiceBytes(m_CacheMappingStats.ScavengedPathsMatchingSequencesByteCount), - m_CacheMappingStats.ScavengedChunkMatchingRemoteCount, - NiceBytes(m_CacheMappingStats.ScavengedChunkMatchingRemoteByteCount), - NiceTimeSpanMs(m_CacheMappingStats.ScavengeElapsedWallTimeUs / 1000)); - } - } - - uint64_t BytesToWrite = 0; - - for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < m_RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++) - { - uint64_t ChunkWriteCount = GetChunkWriteCount(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); - if (ChunkWriteCount > 0) - { - BytesToWrite += m_RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] * ChunkWriteCount; - if (!RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) - { - RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex] = true; - } - } - } - - for (const ScavengedSequenceCopyOperation& ScavengeCopyOp : ScavengedSequenceCopyOperations) - { - BytesToWrite += ScavengeCopyOp.RawSize; - } - - uint64_t BytesToValidate = m_Options.ValidateCompletedSequences ? BytesToWrite : 0; - - uint64_t TotalRequestCount = 0; - uint64_t TotalPartWriteCount = 0; - std::atomic<uint64_t> WritePartsComplete = 0; - - tsl::robin_map<std::string, uint32_t> RemotePathToRemoteIndex; - RemotePathToRemoteIndex.reserve(m_RemoteContent.Paths.size()); - for (uint32_t RemotePathIndex = 0; RemotePathIndex < m_RemoteContent.Paths.size(); RemotePathIndex++) - { - RemotePathToRemoteIndex.insert({m_RemoteContent.Paths[RemotePathIndex].generic_string(), RemotePathIndex}); - } - - CheckRequiredDiskSpace(RemotePathToRemoteIndex); - - BlobsExistsResult ExistsResult; - { - ChunkBlockAnalyser BlockAnalyser( - m_LogOutput, - m_BlockDescriptions, - ChunkBlockAnalyser::Options{.IsQuiet = m_Options.IsQuiet, - .IsVerbose = m_Options.IsVerbose, - .HostLatencySec = m_Storage.BuildStorageHost.LatencySec, - .HostHighSpeedLatencySec = m_Storage.CacheHost.LatencySec, - .HostMaxRangeCountPerRequest = m_Storage.BuildStorageHost.Caps.MaxRangeCountPerRequest, - .HostHighSpeedMaxRangeCountPerRequest = m_Storage.CacheHost.Caps.MaxRangeCountPerRequest}); - - std::vector<ChunkBlockAnalyser::NeededBlock> NeededBlocks = BlockAnalyser.GetNeeded( - m_RemoteLookup.ChunkHashToChunkIndex, - [&](uint32_t RemoteChunkIndex) -> bool { return RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex]; }); - - std::vector<uint32_t> FetchBlockIndexes; - std::vector<uint32_t> CachedChunkBlockIndexes; - - { - ZEN_TRACE_CPU("BlockCacheFileExists"); - for (const ChunkBlockAnalyser::NeededBlock& NeededBlock : NeededBlocks) - { - if (m_Options.PrimeCacheOnly) - { - FetchBlockIndexes.push_back(NeededBlock.BlockIndex); - } - else - { - const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[NeededBlock.BlockIndex]; - bool UsingCachedBlock = false; - if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end()) - { - TotalPartWriteCount++; - - std::filesystem::path BlockPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); - if (IsFile(BlockPath)) - { - CachedChunkBlockIndexes.push_back(NeededBlock.BlockIndex); - UsingCachedBlock = true; - } - } - if (!UsingCachedBlock) - { - FetchBlockIndexes.push_back(NeededBlock.BlockIndex); - } - } - } - } - - std::vector<uint32_t> NeededLooseChunkIndexes; - - { - NeededLooseChunkIndexes.reserve(m_LooseChunkHashes.size()); - for (uint32_t LooseChunkIndex = 0; LooseChunkIndex < m_LooseChunkHashes.size(); LooseChunkIndex++) - { - const IoHash& ChunkHash = m_LooseChunkHashes[LooseChunkIndex]; - auto RemoteChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); - ZEN_ASSERT(RemoteChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end()); - const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second; - - if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) - { - if (m_Options.IsVerbose) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Skipping chunk {} due to cache reuse", - m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]); - } - continue; - } - - bool NeedsCopy = true; - if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex].compare_exchange_strong(NeedsCopy, false)) - { - uint64_t WriteCount = GetChunkWriteCount(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); - if (WriteCount == 0) - { - if (m_Options.IsVerbose) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Skipping chunk {} due to cache reuse", - m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]); - } - } - else - { - NeededLooseChunkIndexes.push_back(LooseChunkIndex); - } - } - } - } - - if (m_Storage.CacheStorage) - { - ZEN_TRACE_CPU("BlobCacheExistCheck"); - Stopwatch Timer; - - std::vector<IoHash> BlobHashes; - BlobHashes.reserve(NeededLooseChunkIndexes.size() + FetchBlockIndexes.size()); - - for (const uint32_t LooseChunkIndex : NeededLooseChunkIndexes) - { - BlobHashes.push_back(m_LooseChunkHashes[LooseChunkIndex]); - } - - for (uint32_t BlockIndex : FetchBlockIndexes) - { - BlobHashes.push_back(m_BlockDescriptions[BlockIndex].BlockHash); - } - - const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult = - m_Storage.CacheStorage->BlobsExists(m_BuildId, BlobHashes); - - if (CacheExistsResult.size() == BlobHashes.size()) - { - ExistsResult.ExistingBlobs.reserve(CacheExistsResult.size()); - for (size_t BlobIndex = 0; BlobIndex < BlobHashes.size(); BlobIndex++) - { - if (CacheExistsResult[BlobIndex].HasBody) - { - ExistsResult.ExistingBlobs.insert(BlobHashes[BlobIndex]); - } - } - } - ExistsResult.ElapsedTimeMs = Timer.GetElapsedTimeMs(); - if (!ExistsResult.ExistingBlobs.empty() && !m_Options.IsQuiet) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Remote cache : Found {} out of {} needed blobs in {}", - ExistsResult.ExistingBlobs.size(), - BlobHashes.size(), - NiceTimeSpanMs(ExistsResult.ElapsedTimeMs)); - } - } - - std::vector<ChunkBlockAnalyser::EPartialBlockDownloadMode> BlockPartialDownloadModes; - - if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Off) - { - BlockPartialDownloadModes.resize(m_BlockDescriptions.size(), ChunkBlockAnalyser::EPartialBlockDownloadMode::Off); - } - else - { - ChunkBlockAnalyser::EPartialBlockDownloadMode CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::Off; - ChunkBlockAnalyser::EPartialBlockDownloadMode CachePartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::Off; - - switch (m_Options.PartialBlockRequestMode) - { - case EPartialBlockRequestMode::Off: - break; - case EPartialBlockRequestMode::ZenCacheOnly: - CachePartialDownloadMode = m_Storage.CacheHost.Caps.MaxRangeCountPerRequest > 1 - ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed - : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange; - CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::Off; - break; - case EPartialBlockRequestMode::Mixed: - CachePartialDownloadMode = m_Storage.CacheHost.Caps.MaxRangeCountPerRequest > 1 - ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed - : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange; - CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange; - break; - case EPartialBlockRequestMode::All: - CachePartialDownloadMode = m_Storage.CacheHost.Caps.MaxRangeCountPerRequest > 1 - ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed - : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange; - CloudPartialDownloadMode = m_Storage.BuildStorageHost.Caps.MaxRangeCountPerRequest > 1 - ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange - : ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange; - break; - default: - ZEN_ASSERT(false); - break; - } - - BlockPartialDownloadModes.reserve(m_BlockDescriptions.size()); - for (uint32_t BlockIndex = 0; BlockIndex < m_BlockDescriptions.size(); BlockIndex++) - { - const bool BlockExistInCache = ExistsResult.ExistingBlobs.contains(m_BlockDescriptions[BlockIndex].BlockHash); - BlockPartialDownloadModes.push_back(BlockExistInCache ? CachePartialDownloadMode : CloudPartialDownloadMode); - } - } - - ZEN_ASSERT(BlockPartialDownloadModes.size() == m_BlockDescriptions.size()); - - ChunkBlockAnalyser::BlockResult PartialBlocks = - BlockAnalyser.CalculatePartialBlockDownloads(NeededBlocks, BlockPartialDownloadModes); - - struct LooseChunkHashWorkData - { - std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs; - uint32_t RemoteChunkIndex = (uint32_t)-1; - }; - - TotalRequestCount += NeededLooseChunkIndexes.size(); - TotalPartWriteCount += NeededLooseChunkIndexes.size(); - TotalRequestCount += PartialBlocks.BlockRanges.size(); - TotalPartWriteCount += PartialBlocks.BlockRanges.size(); - TotalRequestCount += PartialBlocks.FullBlockIndexes.size(); - TotalPartWriteCount += PartialBlocks.FullBlockIndexes.size(); - - std::vector<LooseChunkHashWorkData> LooseChunkHashWorks; - for (uint32_t LooseChunkIndex : NeededLooseChunkIndexes) - { - const IoHash& ChunkHash = m_LooseChunkHashes[LooseChunkIndex]; - auto RemoteChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); - ZEN_ASSERT(RemoteChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end()); - const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second; - - std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = - GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); - - ZEN_ASSERT(!ChunkTargetPtrs.empty()); - LooseChunkHashWorks.push_back( - LooseChunkHashWorkData{.ChunkTargetPtrs = ChunkTargetPtrs, .RemoteChunkIndex = RemoteChunkIndex}); - } - - ZEN_TRACE_CPU("WriteChunks"); - - m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::WriteChunks, (uint32_t)TaskSteps::StepCount); - - Stopwatch WriteTimer; - - FilteredRate FilteredDownloadedBytesPerSecond; - FilteredRate FilteredWrittenBytesPerSecond; - - std::unique_ptr<OperationLogOutput::ProgressBar> WriteProgressBarPtr( - m_LogOutput.CreateProgressBar(m_Options.PrimeCacheOnly ? "Downloading" : "Writing")); - OperationLogOutput::ProgressBar& WriteProgressBar(*WriteProgressBarPtr); - ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - - TotalPartWriteCount += CopyChunkDatas.size(); - TotalPartWriteCount += ScavengedSequenceCopyOperations.size(); - - BufferedWriteFileCache WriteCache; - - for (uint32_t ScavengeOpIndex = 0; ScavengeOpIndex < ScavengedSequenceCopyOperations.size(); ScavengeOpIndex++) - { - if (m_AbortFlag) - { - break; - } - if (!m_Options.PrimeCacheOnly) - { - Work.ScheduleWork( - m_IOWorkerPool, - [this, - &ScavengedPaths, - &ScavengedSequenceCopyOperations, - &ScavengedContents, - &FilteredWrittenBytesPerSecond, - ScavengeOpIndex, - &WritePartsComplete, - TotalPartWriteCount](std::atomic<bool>&) mutable { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Async_WriteScavenged"); - - FilteredWrittenBytesPerSecond.Start(); - - const ScavengedSequenceCopyOperation& ScavengeOp = ScavengedSequenceCopyOperations[ScavengeOpIndex]; - const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengeOp.ScavengedContentIndex]; - const std::filesystem::path& ScavengeRootPath = ScavengedPaths[ScavengeOp.ScavengedContentIndex]; - - WriteScavengedSequenceToCache(ScavengeRootPath, ScavengedContent, ScavengeOp); - - WritePartsComplete++; - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); - } - } - }); - } - } - - for (uint32_t LooseChunkHashWorkIndex = 0; LooseChunkHashWorkIndex < LooseChunkHashWorks.size(); LooseChunkHashWorkIndex++) - { - if (m_AbortFlag) - { - break; - } - - if (m_Options.PrimeCacheOnly) - { - const uint32_t RemoteChunkIndex = LooseChunkHashWorks[LooseChunkHashWorkIndex].RemoteChunkIndex; - if (ExistsResult.ExistingBlobs.contains(m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex])) - { - m_DownloadStats.RequestsCompleteCount++; - continue; - } - } - - Work.ScheduleWork( - m_IOWorkerPool, - [this, - &SequenceIndexChunksLeftToWriteCounters, - &Work, - &ExistsResult, - &WritePartsComplete, - &LooseChunkHashWorks, - LooseChunkHashWorkIndex, - TotalRequestCount, - TotalPartWriteCount, - &WriteCache, - &FilteredDownloadedBytesPerSecond, - &FilteredWrittenBytesPerSecond](std::atomic<bool>&) mutable { - ZEN_TRACE_CPU("Async_ReadPreDownloadedChunk"); - if (!m_AbortFlag) - { - LooseChunkHashWorkData& LooseChunkHashWork = LooseChunkHashWorks[LooseChunkHashWorkIndex]; - const uint32_t RemoteChunkIndex = LooseChunkHashWorks[LooseChunkHashWorkIndex].RemoteChunkIndex; - WriteLooseChunk(RemoteChunkIndex, - ExistsResult, - SequenceIndexChunksLeftToWriteCounters, - WritePartsComplete, - std::move(LooseChunkHashWork.ChunkTargetPtrs), - WriteCache, - Work, - TotalRequestCount, - TotalPartWriteCount, - FilteredDownloadedBytesPerSecond, - FilteredWrittenBytesPerSecond); - } - }, - WorkerThreadPool::EMode::EnableBacklog); - } - - std::unique_ptr<CloneQueryInterface> CloneQuery; - if (m_Options.AllowFileClone) - { - CloneQuery = GetCloneQueryInterface(m_CacheFolderPath); - } - - for (size_t CopyDataIndex = 0; CopyDataIndex < CopyChunkDatas.size(); CopyDataIndex++) - { - ZEN_ASSERT(!m_Options.PrimeCacheOnly); - if (m_AbortFlag) - { - break; - } - - Work.ScheduleWork(m_IOWorkerPool, - [this, - &CloneQuery, - &SequenceIndexChunksLeftToWriteCounters, - &WriteCache, - &Work, - &FilteredWrittenBytesPerSecond, - &CopyChunkDatas, - &ScavengedContents, - &ScavengedLookups, - &ScavengedPaths, - &WritePartsComplete, - TotalPartWriteCount, - CopyDataIndex](std::atomic<bool>&) { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Async_CopyLocal"); - - FilteredWrittenBytesPerSecond.Start(); - const CopyChunkData& CopyData = CopyChunkDatas[CopyDataIndex]; - - std::vector<uint32_t> WrittenSequenceIndexes = WriteLocalChunkToCache(CloneQuery.get(), - CopyData, - ScavengedContents, - ScavengedLookups, - ScavengedPaths, - WriteCache); - WritePartsComplete++; - if (!m_AbortFlag) - { - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); - } - - // Write tracking, updating this must be done without any files open - std::vector<uint32_t> CompletedChunkSequences; - for (uint32_t RemoteSequenceIndex : WrittenSequenceIndexes) - { - if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters)) - { - CompletedChunkSequences.push_back(RemoteSequenceIndex); - } - } - WriteCache.Close(CompletedChunkSequences); - VerifyAndCompleteChunkSequencesAsync(CompletedChunkSequences, Work); - } - } - }); - } - - for (uint32_t BlockIndex : CachedChunkBlockIndexes) - { - ZEN_ASSERT(!m_Options.PrimeCacheOnly); - if (m_AbortFlag) - { - break; - } - - Work.ScheduleWork( - m_IOWorkerPool, - [this, - &RemoteChunkIndexNeedsCopyFromSourceFlags, - &SequenceIndexChunksLeftToWriteCounters, - &WriteCache, - &Work, - &FilteredWrittenBytesPerSecond, - &WritePartsComplete, - TotalPartWriteCount, - BlockIndex](std::atomic<bool>&) mutable { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Async_WriteCachedBlock"); - - const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; - FilteredWrittenBytesPerSecond.Start(); - - std::filesystem::path BlockChunkPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); - IoBuffer BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); - if (!BlockBuffer) - { - throw std::runtime_error( - fmt::format("Can not read block {} at {}", BlockDescription.BlockHash, BlockChunkPath)); - } - - if (!m_AbortFlag) - { - if (!WriteChunksBlockToCache(BlockDescription, - SequenceIndexChunksLeftToWriteCounters, - Work, - CompositeBuffer(std::move(BlockBuffer)), - RemoteChunkIndexNeedsCopyFromSourceFlags, - WriteCache)) - { - std::error_code DummyEc; - RemoveFile(BlockChunkPath, DummyEc); - throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash)); - } - - std::error_code Ec = TryRemoveFile(BlockChunkPath); - if (Ec) - { - ZEN_OPERATION_LOG_DEBUG(m_LogOutput, - "Failed removing file '{}', reason: ({}) {}", - BlockChunkPath, - Ec.value(), - Ec.message()); - } - - WritePartsComplete++; - - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); - } - } - } - }); - } - - for (size_t BlockRangeIndex = 0; BlockRangeIndex < PartialBlocks.BlockRanges.size();) - { - ZEN_ASSERT(!m_Options.PrimeCacheOnly); - if (m_AbortFlag) - { - break; - } - - size_t RangeCount = 1; - size_t RangesLeft = PartialBlocks.BlockRanges.size() - BlockRangeIndex; - const ChunkBlockAnalyser::BlockRangeDescriptor& CurrentBlockRange = PartialBlocks.BlockRanges[BlockRangeIndex]; - while (RangeCount < RangesLeft && - CurrentBlockRange.BlockIndex == PartialBlocks.BlockRanges[BlockRangeIndex + RangeCount].BlockIndex) - { - RangeCount++; - } - - Work.ScheduleWork( - m_NetworkPool, - [this, - &RemoteChunkIndexNeedsCopyFromSourceFlags, - &SequenceIndexChunksLeftToWriteCounters, - &ExistsResult, - &WriteCache, - &FilteredDownloadedBytesPerSecond, - TotalRequestCount, - &WritePartsComplete, - TotalPartWriteCount, - &FilteredWrittenBytesPerSecond, - &Work, - &PartialBlocks, - BlockRangeStartIndex = BlockRangeIndex, - RangeCount = RangeCount](std::atomic<bool>&) { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Async_GetPartialBlockRanges"); - - FilteredDownloadedBytesPerSecond.Start(); - - DownloadPartialBlock( - PartialBlocks.BlockRanges, - BlockRangeStartIndex, - RangeCount, - ExistsResult, - [this, - &RemoteChunkIndexNeedsCopyFromSourceFlags, - &SequenceIndexChunksLeftToWriteCounters, - &WritePartsComplete, - &WriteCache, - &Work, - TotalRequestCount, - TotalPartWriteCount, - &FilteredDownloadedBytesPerSecond, - &FilteredWrittenBytesPerSecond, - &PartialBlocks](IoBuffer&& InMemoryBuffer, - const std::filesystem::path& OnDiskPath, - size_t BlockRangeStartIndex, - std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengths) { - if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - - if (!m_AbortFlag) - { - Work.ScheduleWork( - m_IOWorkerPool, - [this, - &RemoteChunkIndexNeedsCopyFromSourceFlags, - &SequenceIndexChunksLeftToWriteCounters, - &WritePartsComplete, - &WriteCache, - &Work, - TotalPartWriteCount, - &FilteredWrittenBytesPerSecond, - &PartialBlocks, - BlockRangeStartIndex, - BlockChunkPath = std::filesystem::path(OnDiskPath), - BlockPartialBuffer = std::move(InMemoryBuffer), - OffsetAndLengths = std::vector<std::pair<uint64_t, uint64_t>>(OffsetAndLengths.begin(), - OffsetAndLengths.end())]( - std::atomic<bool>&) mutable { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Async_WritePartialBlock"); - - const uint32_t BlockIndex = PartialBlocks.BlockRanges[BlockRangeStartIndex].BlockIndex; - - const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; - - if (BlockChunkPath.empty()) - { - ZEN_ASSERT(BlockPartialBuffer); - } - else - { - ZEN_ASSERT(!BlockPartialBuffer); - BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); - if (!BlockPartialBuffer) - { - throw std::runtime_error( - fmt::format("Could not open downloaded block {} from {}", - BlockDescription.BlockHash, - BlockChunkPath)); - } - } - - FilteredWrittenBytesPerSecond.Start(); - - size_t RangeCount = OffsetAndLengths.size(); - - for (size_t PartialRangeIndex = 0; PartialRangeIndex < RangeCount; PartialRangeIndex++) - { - const std::pair<uint64_t, uint64_t>& OffsetAndLength = - OffsetAndLengths[PartialRangeIndex]; - IoBuffer BlockRangeBuffer(BlockPartialBuffer, - OffsetAndLength.first, - OffsetAndLength.second); - - const ChunkBlockAnalyser::BlockRangeDescriptor& RangeDescriptor = - PartialBlocks.BlockRanges[BlockRangeStartIndex + PartialRangeIndex]; - - if (!WritePartialBlockChunksToCache(BlockDescription, - SequenceIndexChunksLeftToWriteCounters, - Work, - CompositeBuffer(std::move(BlockRangeBuffer)), - RangeDescriptor.ChunkBlockIndexStart, - RangeDescriptor.ChunkBlockIndexStart + - RangeDescriptor.ChunkBlockIndexCount - 1, - RemoteChunkIndexNeedsCopyFromSourceFlags, - WriteCache)) - { - std::error_code DummyEc; - RemoveFile(BlockChunkPath, DummyEc); - throw std::runtime_error( - fmt::format("Partial block {} is malformed", BlockDescription.BlockHash)); - } - - WritePartsComplete++; - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); - } - } - std::error_code Ec = TryRemoveFile(BlockChunkPath); - if (Ec) - { - ZEN_OPERATION_LOG_DEBUG(m_LogOutput, - "Failed removing file '{}', reason: ({}) {}", - BlockChunkPath, - Ec.value(), - Ec.message()); - } - } - }, - OnDiskPath.empty() ? WorkerThreadPool::EMode::DisableBacklog - : WorkerThreadPool::EMode::EnableBacklog); - } - }); - } - }); - BlockRangeIndex += RangeCount; - } - - for (uint32_t BlockIndex : PartialBlocks.FullBlockIndexes) - { - if (m_AbortFlag) - { - break; - } - - if (m_Options.PrimeCacheOnly && ExistsResult.ExistingBlobs.contains(m_BlockDescriptions[BlockIndex].BlockHash)) - { - m_DownloadStats.RequestsCompleteCount++; - continue; - } - - Work.ScheduleWork( - m_NetworkPool, - [this, - &WritePartsComplete, - TotalPartWriteCount, - &FilteredWrittenBytesPerSecond, - &ExistsResult, - &Work, - &WriteCache, - &RemoteChunkIndexNeedsCopyFromSourceFlags, - &SequenceIndexChunksLeftToWriteCounters, - &FilteredDownloadedBytesPerSecond, - TotalRequestCount, - BlockIndex](std::atomic<bool>&) { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Async_GetFullBlock"); - - const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; - - FilteredDownloadedBytesPerSecond.Start(); - - IoBuffer BlockBuffer; - const bool ExistsInCache = - m_Storage.CacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash); - if (ExistsInCache) - { - BlockBuffer = m_Storage.CacheStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash); - } - if (!BlockBuffer) - { - BlockBuffer = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash); - if (BlockBuffer && m_Storage.CacheStorage && m_Options.PopulateCache) - { - m_Storage.CacheStorage->PutBuildBlob(m_BuildId, - BlockDescription.BlockHash, - ZenContentType::kCompressedBinary, - CompositeBuffer(SharedBuffer(BlockBuffer))); - } - } - if (!BlockBuffer) - { - throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); - } - if (!m_AbortFlag) - { - uint64_t BlockSize = BlockBuffer.GetSize(); - m_DownloadStats.DownloadedBlockCount++; - m_DownloadStats.DownloadedBlockByteCount += BlockSize; - m_DownloadStats.RequestsCompleteCount++; - if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - - if (!m_Options.PrimeCacheOnly) - { - std::filesystem::path BlockChunkPath; - - // Check if the dowloaded block is file based and we can move it directly without rewriting it - { - IoBufferFileReference FileRef; - if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && - (FileRef.FileChunkSize == BlockSize)) - { - ZEN_TRACE_CPU("MoveTempFullBlock"); - std::error_code Ec; - std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); - if (!Ec) - { - BlockBuffer.SetDeleteOnClose(false); - BlockBuffer = {}; - BlockChunkPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); - RenameFile(TempBlobPath, BlockChunkPath, Ec); - if (Ec) - { - BlockChunkPath = std::filesystem::path{}; - - // Re-open the temp file again - BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); - BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); - BlockBuffer.SetDeleteOnClose(true); - } - } - } - } - - if (BlockChunkPath.empty() && (BlockSize > m_Options.MaximumInMemoryPayloadSize)) - { - ZEN_TRACE_CPU("WriteTempFullBlock"); - // Could not be moved and rather large, lets store it on disk - BlockChunkPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(); - TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); - BlockBuffer = {}; - } - - if (!m_AbortFlag) - { - Work.ScheduleWork( - m_IOWorkerPool, - [this, - &Work, - &RemoteChunkIndexNeedsCopyFromSourceFlags, - &SequenceIndexChunksLeftToWriteCounters, - BlockIndex, - &WriteCache, - &WritePartsComplete, - TotalPartWriteCount, - &FilteredWrittenBytesPerSecond, - BlockChunkPath, - BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Async_WriteFullBlock"); - - const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; - - if (BlockChunkPath.empty()) - { - ZEN_ASSERT(BlockBuffer); - } - else - { - ZEN_ASSERT(!BlockBuffer); - BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); - if (!BlockBuffer) - { - throw std::runtime_error( - fmt::format("Could not open dowloaded block {} from {}", - BlockDescription.BlockHash, - BlockChunkPath)); - } - } - - FilteredWrittenBytesPerSecond.Start(); - if (!WriteChunksBlockToCache(BlockDescription, - SequenceIndexChunksLeftToWriteCounters, - Work, - CompositeBuffer(std::move(BlockBuffer)), - RemoteChunkIndexNeedsCopyFromSourceFlags, - WriteCache)) - { - std::error_code DummyEc; - RemoveFile(BlockChunkPath, DummyEc); - throw std::runtime_error( - fmt::format("Block {} is malformed", BlockDescription.BlockHash)); - } - - if (!BlockChunkPath.empty()) - { - std::error_code Ec = TryRemoveFile(BlockChunkPath); - if (Ec) - { - ZEN_OPERATION_LOG_DEBUG(m_LogOutput, - "Failed removing file '{}', reason: ({}) {}", - BlockChunkPath, - Ec.value(), - Ec.message()); - } - } - - WritePartsComplete++; - - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); - } - } - }, - BlockChunkPath.empty() ? WorkerThreadPool::EMode::DisableBacklog - : WorkerThreadPool::EMode::EnableBacklog); - } - } - } - } - }); - } - - { - ZEN_TRACE_CPU("WriteChunks_Wait"); - - Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(PendingWork); - uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() + - m_DownloadStats.DownloadedBlockByteCount.load() + - +m_DownloadStats.DownloadedPartialBlockByteCount.load(); - FilteredWrittenBytesPerSecond.Update(m_DiskStats.WriteByteCount.load()); - FilteredDownloadedBytesPerSecond.Update(DownloadedBytes); - std::string DownloadRateString = - (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) - ? "" - : fmt::format(" {}bits/s", NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8)); - std::string CloneDetails; - if (m_DiskStats.CloneCount.load() > 0) - { - CloneDetails = fmt::format(" ({} cloned)", NiceBytes(m_DiskStats.CloneByteCount.load())); - } - std::string WriteDetails = m_Options.PrimeCacheOnly ? "" - : fmt::format(" {}/{} ({}B/s) written{}", - NiceBytes(m_WrittenChunkByteCount.load()), - NiceBytes(BytesToWrite), - NiceNum(FilteredWrittenBytesPerSecond.GetCurrent()), - CloneDetails); - - std::string Details = fmt::format("{}/{} ({}{}) downloaded.{}", - m_DownloadStats.RequestsCompleteCount.load(), - TotalRequestCount, - NiceBytes(DownloadedBytes), - DownloadRateString, - WriteDetails); - - std::string Task; - if (m_Options.PrimeCacheOnly) - { - Task = "Downloading "; - } - else if ((m_WrittenChunkByteCount < BytesToWrite) || (BytesToValidate == 0)) - { - Task = "Writing chunks "; - } - else - { - Task = "Verifying chunks "; - } - - WriteProgressBar.UpdateState( - {.Task = Task, - .Details = Details, - .TotalCount = m_Options.PrimeCacheOnly ? TotalRequestCount : (BytesToWrite + BytesToValidate), - .RemainingCount = m_Options.PrimeCacheOnly ? (TotalRequestCount - m_DownloadStats.RequestsCompleteCount.load()) - : ((BytesToWrite + BytesToValidate) - - (m_WrittenChunkByteCount.load() + m_ValidatedChunkByteCount.load())), - .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, - false); - }); - } - - CloneQuery.reset(); - - FilteredWrittenBytesPerSecond.Stop(); - FilteredDownloadedBytesPerSecond.Stop(); - - WriteProgressBar.Finish(); - if (m_AbortFlag) - { - return; - } - - if (!m_Options.PrimeCacheOnly) - { - uint32_t RawSequencesMissingWriteCount = 0; - for (uint32_t SequenceIndex = 0; SequenceIndex < SequenceIndexChunksLeftToWriteCounters.size(); SequenceIndex++) - { - const auto& SequenceIndexChunksLeftToWriteCounter = SequenceIndexChunksLeftToWriteCounters[SequenceIndex]; - if (SequenceIndexChunksLeftToWriteCounter.load() != 0) - { - RawSequencesMissingWriteCount++; - const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; - const std::filesystem::path& IncompletePath = m_RemoteContent.Paths[PathIndex]; - ZEN_ASSERT(!IncompletePath.empty()); - const uint32_t ExpectedSequenceCount = m_RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]; - if (!m_Options.IsQuiet) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "{}: Max count {}, Current count {}", - IncompletePath, - ExpectedSequenceCount, - SequenceIndexChunksLeftToWriteCounter.load()); - } - ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounter.load() <= ExpectedSequenceCount); - } - } - ZEN_ASSERT(RawSequencesMissingWriteCount == 0); - ZEN_ASSERT(m_WrittenChunkByteCount == BytesToWrite); - ZEN_ASSERT(m_ValidatedChunkByteCount == BytesToValidate); - } - - const uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() + - m_DownloadStats.DownloadedBlockByteCount.load() + - m_DownloadStats.DownloadedPartialBlockByteCount.load(); - if (!m_Options.IsQuiet) - { - std::string CloneDetails; - if (m_DiskStats.CloneCount.load() > 0) - { - CloneDetails = fmt::format(" ({} cloned)", NiceBytes(m_DiskStats.CloneByteCount.load())); - } - ZEN_OPERATION_LOG_INFO( - m_LogOutput, - "Downloaded {} ({}bits/s) in {}. Wrote {} ({}B/s){} in {}. Completed in {}", - NiceBytes(DownloadedBytes), - NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), DownloadedBytes * 8)), - NiceTimeSpanMs(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS() / 1000), - NiceBytes(m_WrittenChunkByteCount.load()), - NiceNum(GetBytesPerSecond(FilteredWrittenBytesPerSecond.GetElapsedTimeUS(), m_DiskStats.WriteByteCount.load())), - CloneDetails, - NiceTimeSpanMs(FilteredWrittenBytesPerSecond.GetElapsedTimeUS() / 1000), - NiceTimeSpanMs(WriteTimer.GetElapsedTimeMs())); - } - - m_WriteChunkStats.WriteChunksElapsedWallTimeUs = WriteTimer.GetElapsedTimeUs(); - m_WriteChunkStats.DownloadTimeUs = FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(); - m_WriteChunkStats.WriteTimeUs = FilteredWrittenBytesPerSecond.GetElapsedTimeUS(); - } - - if (m_Options.PrimeCacheOnly) - { - return; - } - - m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::PrepareTarget, (uint32_t)TaskSteps::StepCount); - - tsl::robin_map<uint32_t, uint32_t> RemotePathIndexToLocalPathIndex; - RemotePathIndexToLocalPathIndex.reserve(m_RemoteContent.Paths.size()); - - tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> SequenceHashToLocalPathIndex; - std::vector<uint32_t> RemoveLocalPathIndexes; - - if (m_AbortFlag) - { - return; - } - - { - ZEN_TRACE_CPU("PrepareTarget"); - - tsl::robin_set<IoHash, IoHash::Hasher> CachedRemoteSequences; - - std::vector<uint32_t> FilesToCache; - - uint64_t MatchCount = 0; - uint64_t PathMismatchCount = 0; - uint64_t HashMismatchCount = 0; - std::atomic<uint64_t> CachedCount = 0; - std::atomic<uint64_t> CachedByteCount = 0; - uint64_t SkippedCount = 0; - uint64_t DeleteCount = 0; - for (uint32_t LocalPathIndex = 0; LocalPathIndex < m_LocalContent.Paths.size(); LocalPathIndex++) - { - if (m_AbortFlag) - { - break; - } - const IoHash& RawHash = m_LocalContent.RawHashes[LocalPathIndex]; - const std::filesystem::path& LocalPath = m_LocalContent.Paths[LocalPathIndex]; - - ZEN_ASSERT_SLOW(IsFile((m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred())); - - if (m_Options.EnableTargetFolderScavenging) - { - if (!m_Options.WipeTargetFolder) - { - // Check if it is already in the correct place - if (auto RemotePathIt = RemotePathToRemoteIndex.find(LocalPath.generic_string()); - RemotePathIt != RemotePathToRemoteIndex.end()) - { - const uint32_t RemotePathIndex = RemotePathIt->second; - if (m_RemoteContent.RawHashes[RemotePathIndex] == RawHash) - { - // It is already in it's correct place - RemotePathIndexToLocalPathIndex[RemotePathIndex] = LocalPathIndex; - SequenceHashToLocalPathIndex.insert({RawHash, LocalPathIndex}); - MatchCount++; - continue; - } - else - { - HashMismatchCount++; - } - } - else - { - PathMismatchCount++; - } - } - - // Do we need it? - if (m_RemoteLookup.RawHashToSequenceIndex.contains(RawHash)) - { - if (!CachedRemoteSequences.contains(RawHash)) - { - // We need it, make sure we move it to the cache - FilesToCache.push_back(LocalPathIndex); - CachedRemoteSequences.insert(RawHash); - continue; - } - else - { - SkippedCount++; - } - } - } - - if (!m_Options.WipeTargetFolder) - { - // Explicitly delete the unneeded local file - RemoveLocalPathIndexes.push_back(LocalPathIndex); - DeleteCount++; - } - } - - if (m_AbortFlag) - { - return; - } - - { - ZEN_TRACE_CPU("CopyToCache"); - - Stopwatch Timer; - - std::unique_ptr<OperationLogOutput::ProgressBar> CacheLocalProgressBarPtr( - m_LogOutput.CreateProgressBar("Cache Local Data")); - OperationLogOutput::ProgressBar& CacheLocalProgressBar(*CacheLocalProgressBarPtr); - ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - - for (uint32_t LocalPathIndex : FilesToCache) - { - if (m_AbortFlag) - { - break; - } - Work.ScheduleWork(m_IOWorkerPool, [this, &CachedCount, &CachedByteCount, LocalPathIndex](std::atomic<bool>&) { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Async_CopyToCache"); - - const IoHash& RawHash = m_LocalContent.RawHashes[LocalPathIndex]; - const std::filesystem::path& LocalPath = m_LocalContent.Paths[LocalPathIndex]; - const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RawHash); - ZEN_ASSERT_SLOW(!IsFileWithRetry(CacheFilePath)); - const std::filesystem::path LocalFilePath = (m_Path / LocalPath).make_preferred(); - - std::error_code Ec = RenameFileWithRetry(LocalFilePath, CacheFilePath); - if (Ec) - { - ZEN_OPERATION_LOG_WARN(m_LogOutput, - "Failed to move file from '{}' to '{}', reason: ({}) {}, retrying...", - LocalFilePath, - CacheFilePath, - Ec.value(), - Ec.message()); - Ec = RenameFileWithRetry(LocalFilePath, CacheFilePath); - if (Ec) - { - throw std::system_error(std::error_code(Ec.value(), std::system_category()), - fmt::format("Failed to file from '{}' to '{}', reason: ({}) {}", - LocalFilePath, - CacheFilePath, - Ec.value(), - Ec.message())); - } - } - - CachedCount++; - CachedByteCount += m_LocalContent.RawSizes[LocalPathIndex]; - } - }); - } - - { - ZEN_TRACE_CPU("CopyToCache_Wait"); - - Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(PendingWork); - const uint64_t WorkTotal = FilesToCache.size(); - const uint64_t WorkComplete = CachedCount.load(); - std::string Details = fmt::format("{}/{} ({}) files", WorkComplete, WorkTotal, NiceBytes(CachedByteCount)); - CacheLocalProgressBar.UpdateState( - {.Task = "Caching local ", - .Details = Details, - .TotalCount = gsl::narrow<uint64_t>(WorkTotal), - .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete), - .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, - false); - }); - } - - CacheLocalProgressBar.Finish(); - if (m_AbortFlag) - { - return; - } - - ZEN_OPERATION_LOG_DEBUG(m_LogOutput, - "Local state prep: Match: {}, PathMismatch: {}, HashMismatch: {}, Cached: {} ({}), Skipped: {}, " - "Delete: {}", - MatchCount, - PathMismatchCount, - HashMismatchCount, - CachedCount.load(), - NiceBytes(CachedByteCount.load()), - SkippedCount, - DeleteCount); - } - } - - m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::FinalizeTarget, (uint32_t)TaskSteps::StepCount); - - if (m_Options.WipeTargetFolder) - { - ZEN_TRACE_CPU("WipeTarget"); - Stopwatch Timer; - - // Clean target folder - if (!CleanDirectory(m_LogOutput, m_IOWorkerPool, m_AbortFlag, m_PauseFlag, m_Options.IsQuiet, m_Path, m_Options.ExcludeFolders)) - { - ZEN_OPERATION_LOG_WARN(m_LogOutput, "Some files in {} could not be removed", m_Path); - } - m_RebuildFolderStateStats.CleanFolderElapsedWallTimeUs = Timer.GetElapsedTimeUs(); - } - - if (m_AbortFlag) - { - return; - } - - { - ZEN_TRACE_CPU("FinalizeTree"); - - Stopwatch Timer; - - std::unique_ptr<OperationLogOutput::ProgressBar> RebuildProgressBarPtr(m_LogOutput.CreateProgressBar("Rebuild State")); - OperationLogOutput::ProgressBar& RebuildProgressBar(*RebuildProgressBarPtr); - ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - - OutLocalFolderState.Paths.resize(m_RemoteContent.Paths.size()); - OutLocalFolderState.RawSizes.resize(m_RemoteContent.Paths.size()); - OutLocalFolderState.Attributes.resize(m_RemoteContent.Paths.size()); - OutLocalFolderState.ModificationTicks.resize(m_RemoteContent.Paths.size()); - - std::atomic<uint64_t> DeletedCount = 0; - - for (uint32_t LocalPathIndex : RemoveLocalPathIndexes) - { - if (m_AbortFlag) - { - break; - } - Work.ScheduleWork(m_IOWorkerPool, [this, &DeletedCount, LocalPathIndex](std::atomic<bool>&) { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Async_RemoveFile"); - - const std::filesystem::path LocalFilePath = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred(); - SetFileReadOnlyWithRetry(LocalFilePath, false); - RemoveFileWithRetry(LocalFilePath); - DeletedCount++; - } - }); - } - - std::atomic<uint64_t> TargetsComplete = 0; - - struct FinalizeTarget - { - IoHash RawHash; - uint32_t RemotePathIndex; - }; - - std::vector<FinalizeTarget> Targets; - Targets.reserve(m_RemoteContent.Paths.size()); - for (uint32_t RemotePathIndex = 0; RemotePathIndex < m_RemoteContent.Paths.size(); RemotePathIndex++) - { - Targets.push_back( - FinalizeTarget{.RawHash = m_RemoteContent.RawHashes[RemotePathIndex], .RemotePathIndex = RemotePathIndex}); - } - std::sort(Targets.begin(), Targets.end(), [](const FinalizeTarget& Lhs, const FinalizeTarget& Rhs) { - if (Lhs.RawHash < Rhs.RawHash) - { - return true; - } - else if (Lhs.RawHash > Rhs.RawHash) - { - return false; - } - return Lhs.RemotePathIndex < Rhs.RemotePathIndex; - }); - - size_t TargetOffset = 0; - while (TargetOffset < Targets.size()) - { - if (m_AbortFlag) - { - break; - } - - size_t TargetCount = 1; - while ((TargetOffset + TargetCount) < Targets.size() && - (Targets[TargetOffset + TargetCount].RawHash == Targets[TargetOffset].RawHash)) - { - TargetCount++; - } - - Work.ScheduleWork( - m_IOWorkerPool, - [this, - &SequenceHashToLocalPathIndex, - &Targets, - &RemotePathIndexToLocalPathIndex, - &OutLocalFolderState, - BaseTargetOffset = TargetOffset, - TargetCount, - &TargetsComplete](std::atomic<bool>&) { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Async_FinalizeChunkSequence"); - - size_t TargetOffset = BaseTargetOffset; - const IoHash& RawHash = Targets[TargetOffset].RawHash; - - if (RawHash == IoHash::Zero) - { - ZEN_TRACE_CPU("CreateEmptyFiles"); - while (TargetOffset < (BaseTargetOffset + TargetCount)) - { - const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex; - ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash); - const std::filesystem::path& TargetPath = m_RemoteContent.Paths[RemotePathIndex]; - std::filesystem::path TargetFilePath = (m_Path / TargetPath).make_preferred(); - if (!RemotePathIndexToLocalPathIndex[RemotePathIndex]) - { - if (IsFileWithRetry(TargetFilePath)) - { - SetFileReadOnlyWithRetry(TargetFilePath, false); - } - else - { - CreateDirectories(TargetFilePath.parent_path()); - } - BasicFile OutputFile; - OutputFile.Open(TargetFilePath, BasicFile::Mode::kTruncate); - } - OutLocalFolderState.Paths[RemotePathIndex] = TargetPath; - OutLocalFolderState.RawSizes[RemotePathIndex] = m_RemoteContent.RawSizes[RemotePathIndex]; - - OutLocalFolderState.Attributes[RemotePathIndex] = - m_RemoteContent.Attributes.empty() - ? GetNativeFileAttributes(TargetFilePath) - : SetNativeFileAttributes(TargetFilePath, - m_RemoteContent.Platform, - m_RemoteContent.Attributes[RemotePathIndex]); - OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath); - - TargetOffset++; - TargetsComplete++; - } - } - else - { - ZEN_TRACE_CPU("FinalizeFile"); - ZEN_ASSERT(m_RemoteLookup.RawHashToSequenceIndex.contains(RawHash)); - const uint32_t FirstRemotePathIndex = Targets[TargetOffset].RemotePathIndex; - const std::filesystem::path& FirstTargetPath = m_RemoteContent.Paths[FirstRemotePathIndex]; - std::filesystem::path FirstTargetFilePath = (m_Path / FirstTargetPath).make_preferred(); - - if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(FirstRemotePathIndex); - InPlaceIt != RemotePathIndexToLocalPathIndex.end()) - { - ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath)); - } - else - { - if (IsFileWithRetry(FirstTargetFilePath)) - { - SetFileReadOnlyWithRetry(FirstTargetFilePath, false); - } - else - { - CreateDirectories(FirstTargetFilePath.parent_path()); - } - - if (auto InplaceIt = SequenceHashToLocalPathIndex.find(RawHash); - InplaceIt != SequenceHashToLocalPathIndex.end()) - { - ZEN_TRACE_CPU("Copy"); - const uint32_t LocalPathIndex = InplaceIt->second; - const std::filesystem::path& SourcePath = m_LocalContent.Paths[LocalPathIndex]; - std::filesystem::path SourceFilePath = (m_Path / SourcePath).make_preferred(); - ZEN_ASSERT_SLOW(IsFileWithRetry(SourceFilePath)); - - ZEN_OPERATION_LOG_DEBUG(m_LogOutput, - "Copying from '{}' -> '{}'", - SourceFilePath, - FirstTargetFilePath); - const uint64_t RawSize = m_LocalContent.RawSizes[LocalPathIndex]; - FastCopyFile(m_Options.AllowFileClone, - m_Options.UseSparseFiles, - SourceFilePath, - FirstTargetFilePath, - RawSize, - m_DiskStats.WriteCount, - m_DiskStats.WriteByteCount, - m_DiskStats.CloneCount, - m_DiskStats.CloneByteCount); - - m_RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++; - } - else - { - ZEN_TRACE_CPU("Rename"); - const std::filesystem::path CacheFilePath = - GetFinalChunkedSequenceFileName(m_CacheFolderPath, RawHash); - ZEN_ASSERT_SLOW(IsFileWithRetry(CacheFilePath)); - - std::error_code Ec = RenameFileWithRetry(CacheFilePath, FirstTargetFilePath); - if (Ec) - { - ZEN_OPERATION_LOG_WARN(m_LogOutput, - "Failed to move file from '{}' to '{}', reason: ({}) {}, retrying...", - CacheFilePath, - FirstTargetFilePath, - Ec.value(), - Ec.message()); - Ec = RenameFileWithRetry(CacheFilePath, FirstTargetFilePath); - if (Ec) - { - throw std::system_error( - std::error_code(Ec.value(), std::system_category()), - fmt::format("Failed to move file from '{}' to '{}', reason: ({}) {}", - CacheFilePath, - FirstTargetFilePath, - Ec.value(), - Ec.message())); - } - } - - m_RebuildFolderStateStats.FinalizeTreeFilesMovedCount++; - } - } - - OutLocalFolderState.Paths[FirstRemotePathIndex] = FirstTargetPath; - OutLocalFolderState.RawSizes[FirstRemotePathIndex] = m_RemoteContent.RawSizes[FirstRemotePathIndex]; - - OutLocalFolderState.Attributes[FirstRemotePathIndex] = - m_RemoteContent.Attributes.empty() - ? GetNativeFileAttributes(FirstTargetFilePath) - : SetNativeFileAttributes(FirstTargetFilePath, - m_RemoteContent.Platform, - m_RemoteContent.Attributes[FirstRemotePathIndex]); - OutLocalFolderState.ModificationTicks[FirstRemotePathIndex] = - GetModificationTickFromPath(FirstTargetFilePath); - - TargetOffset++; - TargetsComplete++; - - while (TargetOffset < (BaseTargetOffset + TargetCount)) - { - const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex; - ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash); - const std::filesystem::path& TargetPath = m_RemoteContent.Paths[RemotePathIndex]; - std::filesystem::path TargetFilePath = (m_Path / TargetPath).make_preferred(); - - if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(RemotePathIndex); - InPlaceIt != RemotePathIndexToLocalPathIndex.end()) - { - ZEN_ASSERT_SLOW(IsFileWithRetry(TargetFilePath)); - } - else - { - ZEN_TRACE_CPU("Copy"); - if (IsFileWithRetry(TargetFilePath)) - { - SetFileReadOnlyWithRetry(TargetFilePath, false); - } - else - { - CreateDirectories(TargetFilePath.parent_path()); - } - - ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath)); - ZEN_OPERATION_LOG_DEBUG(m_LogOutput, - "Copying from '{}' -> '{}'", - FirstTargetFilePath, - TargetFilePath); - const uint64_t RawSize = m_RemoteContent.RawSizes[RemotePathIndex]; - FastCopyFile(m_Options.AllowFileClone, - m_Options.UseSparseFiles, - FirstTargetFilePath, - TargetFilePath, - RawSize, - m_DiskStats.WriteCount, - m_DiskStats.WriteByteCount, - m_DiskStats.CloneCount, - m_DiskStats.CloneByteCount); - - m_RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++; - } - - OutLocalFolderState.Paths[RemotePathIndex] = TargetPath; - OutLocalFolderState.RawSizes[RemotePathIndex] = m_RemoteContent.RawSizes[RemotePathIndex]; - - OutLocalFolderState.Attributes[RemotePathIndex] = - m_RemoteContent.Attributes.empty() - ? GetNativeFileAttributes(TargetFilePath) - : SetNativeFileAttributes(TargetFilePath, - m_RemoteContent.Platform, - m_RemoteContent.Attributes[RemotePathIndex]); - OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath); - - TargetOffset++; - TargetsComplete++; - } - } - } - }); - - TargetOffset += TargetCount; - } - - { - ZEN_TRACE_CPU("FinalizeTree_Wait"); - - Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(PendingWork); - const uint64_t WorkTotal = Targets.size() + RemoveLocalPathIndexes.size(); - const uint64_t WorkComplete = TargetsComplete.load() + DeletedCount.load(); - std::string Details = fmt::format("{}/{} files", WorkComplete, WorkTotal); - RebuildProgressBar.UpdateState({.Task = "Rebuilding state ", - .Details = Details, - .TotalCount = gsl::narrow<uint64_t>(WorkTotal), - .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete), - .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, - false); - }); - } - - m_RebuildFolderStateStats.FinalizeTreeElapsedWallTimeUs = Timer.GetElapsedTimeUs(); - RebuildProgressBar.Finish(); - } - m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::Cleanup, (uint32_t)TaskSteps::StepCount); - } - catch (const std::exception&) - { - m_AbortFlag = true; - throw; - } -} - -void -BuildsOperationUpdateFolder::ScanCacheFolder(tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& OutCachedChunkHashesFound, - tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& OutCachedSequenceHashesFound) -{ - ZEN_TRACE_CPU("ScanCacheFolder"); - - Stopwatch CacheTimer; - - DirectoryContent CacheDirContent; - GetDirectoryContent(m_CacheFolderPath, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeFileSizes, CacheDirContent); - for (size_t Index = 0; Index < CacheDirContent.Files.size(); Index++) - { - if (m_Options.EnableTargetFolderScavenging) - { - IoHash FileHash; - if (IoHash::TryParse(CacheDirContent.Files[Index].filename().string(), FileHash)) - { - if (auto ChunkIt = m_RemoteLookup.ChunkHashToChunkIndex.find(FileHash); - ChunkIt != m_RemoteLookup.ChunkHashToChunkIndex.end()) - { - const uint32_t ChunkIndex = ChunkIt->second; - const uint64_t ChunkSize = m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; - if (ChunkSize == CacheDirContent.FileSizes[Index]) - { - OutCachedChunkHashesFound.insert({FileHash, ChunkIndex}); - m_CacheMappingStats.CacheChunkCount++; - m_CacheMappingStats.CacheChunkByteCount += ChunkSize; - continue; - } - } - else if (auto SequenceIt = m_RemoteLookup.RawHashToSequenceIndex.find(FileHash); - SequenceIt != m_RemoteLookup.RawHashToSequenceIndex.end()) - { - const uint32_t SequenceIndex = SequenceIt->second; - const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; - const uint64_t SequenceSize = m_RemoteContent.RawSizes[PathIndex]; - if (SequenceSize == CacheDirContent.FileSizes[Index]) - { - OutCachedSequenceHashesFound.insert({FileHash, SequenceIndex}); - m_CacheMappingStats.CacheSequenceHashesCount++; - m_CacheMappingStats.CacheSequenceHashesByteCount += SequenceSize; - - const std::filesystem::path CacheFilePath = - GetFinalChunkedSequenceFileName(m_CacheFolderPath, - m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); - ZEN_ASSERT_SLOW(IsFile(CacheFilePath)); - - continue; - } - } - } - } - std::error_code Ec = TryRemoveFile(CacheDirContent.Files[Index]); - if (Ec) - { - ZEN_OPERATION_LOG_DEBUG(m_LogOutput, - "Failed removing file '{}', reason: ({}) {}", - CacheDirContent.Files[Index], - Ec.value(), - Ec.message()); - } - } - m_CacheMappingStats.CacheScanElapsedWallTimeUs += CacheTimer.GetElapsedTimeUs(); -} - -void -BuildsOperationUpdateFolder::ScanTempBlocksFolder(tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& OutCachedBlocksFound) -{ - ZEN_TRACE_CPU("ScanTempBlocksFolder"); - - Stopwatch CacheTimer; - - tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> AllBlockSizes; - AllBlockSizes.reserve(m_BlockDescriptions.size()); - for (uint32_t BlockIndex = 0; BlockIndex < m_BlockDescriptions.size(); BlockIndex++) - { - const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; - AllBlockSizes.insert({BlockDescription.BlockHash, BlockIndex}); - } - - DirectoryContent BlockDirContent; - GetDirectoryContent(m_TempBlockFolderPath, - DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeFileSizes, - BlockDirContent); - OutCachedBlocksFound.reserve(BlockDirContent.Files.size()); - for (size_t Index = 0; Index < BlockDirContent.Files.size(); Index++) - { - if (m_Options.EnableTargetFolderScavenging) - { - IoHash FileHash; - if (IoHash::TryParse(BlockDirContent.Files[Index].filename().string(), FileHash)) - { - if (auto BlockIt = AllBlockSizes.find(FileHash); BlockIt != AllBlockSizes.end()) - { - const uint32_t BlockIndex = BlockIt->second; - const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; - uint64_t BlockSize = CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize; - for (uint64_t ChunkSize : BlockDescription.ChunkCompressedLengths) - { - BlockSize += ChunkSize; - } - - if (BlockSize == BlockDirContent.FileSizes[Index]) - { - OutCachedBlocksFound.insert({FileHash, BlockIndex}); - m_CacheMappingStats.CacheBlockCount++; - m_CacheMappingStats.CacheBlocksByteCount += BlockSize; - continue; - } - } - } - } - std::error_code Ec = TryRemoveFile(BlockDirContent.Files[Index]); - if (Ec) - { - ZEN_OPERATION_LOG_DEBUG(m_LogOutput, - "Failed removing file '{}', reason: ({}) {}", - BlockDirContent.Files[Index], - Ec.value(), - Ec.message()); - } - } - - m_CacheMappingStats.CacheScanElapsedWallTimeUs += CacheTimer.GetElapsedTimeUs(); -} - -std::vector<BuildsOperationUpdateFolder::ScavengeSource> -BuildsOperationUpdateFolder::FindScavengeSources() -{ - ZEN_TRACE_CPU("FindScavengeSources"); - - const bool TargetPathExists = IsDir(m_Path); - - std::vector<std::filesystem::path> StatePaths = GetDownloadedStatePaths(m_Options.SystemRootDir); - - std::vector<ScavengeSource> Result; - for (const std::filesystem::path& EntryPath : StatePaths) - { - if (IsFile(EntryPath)) - { - bool DeleteEntry = false; - - try - { - BuildsDownloadInfo Info = ReadDownloadedInfoFile(EntryPath); - const bool LocalPathExists = !Info.LocalPath.empty() && IsDir(Info.LocalPath); - const bool LocalStateFileExists = IsFile(Info.StateFilePath); - if (LocalPathExists && LocalStateFileExists) - { - if (TargetPathExists && std::filesystem::equivalent(Info.LocalPath, m_Path)) - { - DeleteEntry = true; - } - else - { - Result.push_back({.StateFilePath = std::move(Info.StateFilePath), .Path = std::move(Info.LocalPath)}); - } - } - else - { - DeleteEntry = true; - } - } - catch (const std::exception& Ex) - { - ZEN_OPERATION_LOG_WARN(m_LogOutput, "{}", Ex.what()); - DeleteEntry = true; - } - - if (DeleteEntry) - { - std::error_code DummyEc; - std::filesystem::remove(EntryPath, DummyEc); - } - } - } - return Result; -} - -std::vector<uint32_t> -BuildsOperationUpdateFolder::ScanTargetFolder(const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& CachedChunkHashesFound, - const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& CachedSequenceHashesFound) -{ - ZEN_TRACE_CPU("ScanTargetFolder"); - - Stopwatch LocalTimer; - - std::vector<uint32_t> MissingSequenceIndexes; - - for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < m_RemoteContent.ChunkedContent.SequenceRawHashes.size(); - RemoteSequenceIndex++) - { - const IoHash& RemoteSequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; - const uint32_t RemotePathIndex = GetFirstPathIndexForSeqeuenceIndex(m_RemoteLookup, RemoteSequenceIndex); - const uint64_t RemoteRawSize = m_RemoteContent.RawSizes[RemotePathIndex]; - if (auto CacheSequenceIt = CachedSequenceHashesFound.find(RemoteSequenceRawHash); - CacheSequenceIt != CachedSequenceHashesFound.end()) - { - const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RemoteSequenceRawHash); - ZEN_ASSERT_SLOW(IsFile(CacheFilePath)); - if (m_Options.IsVerbose) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Found sequence {} at {} ({})", - RemoteSequenceRawHash, - CacheFilePath, - NiceBytes(RemoteRawSize)); - } - } - else if (auto CacheChunkIt = CachedChunkHashesFound.find(RemoteSequenceRawHash); CacheChunkIt != CachedChunkHashesFound.end()) - { - const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RemoteSequenceRawHash); - ZEN_ASSERT_SLOW(IsFile(CacheFilePath)); - if (m_Options.IsVerbose) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Found chunk {} at {} ({})", - RemoteSequenceRawHash, - CacheFilePath, - NiceBytes(RemoteRawSize)); - } - } - else if (auto It = m_LocalLookup.RawHashToSequenceIndex.find(RemoteSequenceRawHash); - It != m_LocalLookup.RawHashToSequenceIndex.end()) - { - const uint32_t LocalSequenceIndex = It->second; - const uint32_t LocalPathIndex = GetFirstPathIndexForSeqeuenceIndex(m_LocalLookup, LocalSequenceIndex); - const std::filesystem::path LocalFilePath = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred(); - ZEN_ASSERT_SLOW(IsFile(LocalFilePath)); - m_CacheMappingStats.LocalPathsMatchingSequencesCount++; - m_CacheMappingStats.LocalPathsMatchingSequencesByteCount += RemoteRawSize; - if (m_Options.IsVerbose) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Found sequence {} at {} ({})", - RemoteSequenceRawHash, - LocalFilePath, - NiceBytes(RemoteRawSize)); - } - } - else - { - MissingSequenceIndexes.push_back(RemoteSequenceIndex); - } - } - - m_CacheMappingStats.LocalScanElapsedWallTimeUs += LocalTimer.GetElapsedTimeUs(); - return MissingSequenceIndexes; -} - -bool -BuildsOperationUpdateFolder::FindScavengeContent(const ScavengeSource& Source, - ChunkedFolderContent& OutScavengedLocalContent, - ChunkedContentLookup& OutScavengedLookup) -{ - ZEN_TRACE_CPU("FindScavengeContent"); - - FolderContent LocalFolderState; - try - { - BuildSaveState SavedState = ReadBuildSaveStateFile(Source.StateFilePath); - if (SavedState.Version == BuildSaveState::NoVersion) - { - ZEN_OPERATION_LOG_DEBUG(m_LogOutput, - "Skipping old build state at '{}', state files before version {} can not be trusted during scavenge", - Source.StateFilePath, - BuildSaveState::kVersion1); - return false; - } - OutScavengedLocalContent = std::move(SavedState.State.ChunkedContent); - LocalFolderState = std::move(SavedState.FolderState); - } - catch (const std::exception& Ex) - { - ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Skipping invalid build state at '{}', reason: {}", Source.StateFilePath, Ex.what()); - return false; - } - - tsl::robin_set<uint32_t> PathIndexesToScavenge; - PathIndexesToScavenge.reserve(OutScavengedLocalContent.Paths.size()); - std::vector<uint32_t> ChunkOrderOffsets = BuildChunkOrderOffset(OutScavengedLocalContent.ChunkedContent.ChunkCounts); - - { - tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> RawHashToPathIndex; - - RawHashToPathIndex.reserve(OutScavengedLocalContent.Paths.size()); - for (uint32_t ScavengedPathIndex = 0; ScavengedPathIndex < OutScavengedLocalContent.RawHashes.size(); ScavengedPathIndex++) - { - if (!RawHashToPathIndex.contains(OutScavengedLocalContent.RawHashes[ScavengedPathIndex])) - { - RawHashToPathIndex.insert_or_assign(OutScavengedLocalContent.RawHashes[ScavengedPathIndex], ScavengedPathIndex); - } - } - - for (uint32_t ScavengeSequenceIndex = 0; ScavengeSequenceIndex < OutScavengedLocalContent.ChunkedContent.SequenceRawHashes.size(); - ScavengeSequenceIndex++) - { - const IoHash& SequenceHash = OutScavengedLocalContent.ChunkedContent.SequenceRawHashes[ScavengeSequenceIndex]; - if (auto It = RawHashToPathIndex.find(SequenceHash); It != RawHashToPathIndex.end()) - { - uint32_t PathIndex = It->second; - if (!PathIndexesToScavenge.contains(PathIndex)) - { - if (m_RemoteLookup.RawHashToSequenceIndex.contains(SequenceHash)) - { - PathIndexesToScavenge.insert(PathIndex); - } - else - { - uint32_t ChunkOrderIndexStart = ChunkOrderOffsets[ScavengeSequenceIndex]; - const uint32_t ChunkCount = OutScavengedLocalContent.ChunkedContent.ChunkCounts[ScavengeSequenceIndex]; - for (uint32_t ChunkOrderIndex = 0; ChunkOrderIndex < ChunkCount; ChunkOrderIndex++) - { - const uint32_t ChunkIndex = - OutScavengedLocalContent.ChunkedContent.ChunkOrders[ChunkOrderIndexStart + ChunkOrderIndex]; - const IoHash& ChunkHash = OutScavengedLocalContent.ChunkedContent.ChunkHashes[ChunkIndex]; - if (m_RemoteLookup.ChunkHashToChunkIndex.contains(ChunkHash)) - { - PathIndexesToScavenge.insert(PathIndex); - break; - } - } - } - } - } - else - { - ZEN_OPERATION_LOG_WARN(m_LogOutput, - "Scavenged state file at '{}' for '{}' is invalid, skipping scavenging for sequence {}", - Source.StateFilePath, - Source.Path, - SequenceHash); - } - } - } - - if (PathIndexesToScavenge.empty()) - { - OutScavengedLocalContent = {}; - return false; - } - - std::vector<std::filesystem::path> PathsToScavenge; - PathsToScavenge.reserve(PathIndexesToScavenge.size()); - for (uint32_t ScavengedStatePathIndex : PathIndexesToScavenge) - { - PathsToScavenge.push_back(OutScavengedLocalContent.Paths[ScavengedStatePathIndex]); - } - - FolderContent ValidFolderContent = - GetValidFolderContent(m_IOWorkerPool, m_ScavengedFolderScanStats, Source.Path, PathsToScavenge, {}, 0, m_AbortFlag, m_PauseFlag); - - if (!LocalFolderState.AreKnownFilesEqual(ValidFolderContent)) - { - std::vector<std::filesystem::path> DeletedPaths; - FolderContent UpdatedContent = GetUpdatedContent(LocalFolderState, ValidFolderContent, DeletedPaths); - - // If the files are modified since the state was saved we ignore the files since we don't - // want to incur the cost of scanning/hashing scavenged files - DeletedPaths.insert(DeletedPaths.end(), UpdatedContent.Paths.begin(), UpdatedContent.Paths.end()); - if (!DeletedPaths.empty()) - { - OutScavengedLocalContent = - DeletePathsFromChunkedContent(OutScavengedLocalContent, - BuildHashLookup(OutScavengedLocalContent.ChunkedContent.SequenceRawHashes), - ChunkOrderOffsets, - DeletedPaths); - } - } - - if (OutScavengedLocalContent.Paths.empty()) - { - OutScavengedLocalContent = {}; - return false; - } - - OutScavengedLookup = BuildChunkedContentLookup(OutScavengedLocalContent); - - return true; -} - -void -BuildsOperationUpdateFolder::ScavengeSourceForChunks(uint32_t& InOutRemainingChunkCount, - std::vector<bool>& InOutRemoteChunkIndexNeedsCopyFromLocalFileFlags, - tsl::robin_map<IoHash, size_t, IoHash::Hasher>& InOutRawHashToCopyChunkDataIndex, - const std::vector<std::atomic<uint32_t>>& SequenceIndexChunksLeftToWriteCounters, - const ChunkedFolderContent& ScavengedContent, - const ChunkedContentLookup& ScavengedLookup, - std::vector<CopyChunkData>& InOutCopyChunkDatas, - uint32_t ScavengedContentIndex, - uint64_t& InOutChunkMatchingRemoteCount, - uint64_t& InOutChunkMatchingRemoteByteCount) -{ - for (uint32_t RemoteChunkIndex = 0; - RemoteChunkIndex < m_RemoteContent.ChunkedContent.ChunkHashes.size() && (InOutRemainingChunkCount > 0); - RemoteChunkIndex++) - { - if (!InOutRemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) - { - const IoHash& RemoteChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; - if (auto It = ScavengedLookup.ChunkHashToChunkIndex.find(RemoteChunkHash); It != ScavengedLookup.ChunkHashToChunkIndex.end()) - { - std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = - GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex); - - if (!ChunkTargetPtrs.empty()) - { - const uint32_t ScavengedChunkIndex = It->second; - const uint64_t ScavengedChunkRawSize = ScavengedContent.ChunkedContent.ChunkRawSizes[ScavengedChunkIndex]; - const size_t ChunkSequenceLocationOffset = ScavengedLookup.ChunkSequenceLocationOffset[ScavengedChunkIndex]; - const ChunkedContentLookup::ChunkSequenceLocation& ScavengeLocation = - ScavengedLookup.ChunkSequenceLocations[ChunkSequenceLocationOffset]; - const IoHash& ScavengedSequenceRawHash = - ScavengedContent.ChunkedContent.SequenceRawHashes[ScavengeLocation.SequenceIndex]; - - CopyChunkData::ChunkTarget Target = {.TargetChunkLocationCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()), - .RemoteChunkIndex = RemoteChunkIndex, - .CacheFileOffset = ScavengeLocation.Offset}; - if (auto CopySourceIt = InOutRawHashToCopyChunkDataIndex.find(ScavengedSequenceRawHash); - CopySourceIt != InOutRawHashToCopyChunkDataIndex.end()) - { - CopyChunkData& Data = InOutCopyChunkDatas[CopySourceIt->second]; - if (Data.TargetChunkLocationPtrs.size() > 1024) - { - InOutRawHashToCopyChunkDataIndex.insert_or_assign(ScavengedSequenceRawHash, InOutCopyChunkDatas.size()); - InOutCopyChunkDatas.push_back(CopyChunkData{.ScavengeSourceIndex = ScavengedContentIndex, - .SourceSequenceIndex = ScavengeLocation.SequenceIndex, - .TargetChunkLocationPtrs = ChunkTargetPtrs, - .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}}); - } - else - { - Data.TargetChunkLocationPtrs.insert(Data.TargetChunkLocationPtrs.end(), - ChunkTargetPtrs.begin(), - ChunkTargetPtrs.end()); - Data.ChunkTargets.push_back(Target); - } - } - else - { - InOutRawHashToCopyChunkDataIndex.insert_or_assign(ScavengedSequenceRawHash, InOutCopyChunkDatas.size()); - InOutCopyChunkDatas.push_back(CopyChunkData{.ScavengeSourceIndex = ScavengedContentIndex, - .SourceSequenceIndex = ScavengeLocation.SequenceIndex, - .TargetChunkLocationPtrs = ChunkTargetPtrs, - .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}}); - } - InOutChunkMatchingRemoteCount++; - InOutChunkMatchingRemoteByteCount += ScavengedChunkRawSize; - InOutRemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex] = true; - InOutRemainingChunkCount--; - } - } - } - } -} - -std::filesystem::path -BuildsOperationUpdateFolder::FindDownloadedChunk(const IoHash& ChunkHash) -{ - ZEN_TRACE_CPU("FindDownloadedChunk"); - - std::filesystem::path CompressedChunkPath = m_TempDownloadFolderPath / ChunkHash.ToHexString(); - if (IsFile(CompressedChunkPath)) - { - IoBuffer ExistingCompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); - if (ExistingCompressedPart) - { - IoHash RawHash; - uint64_t RawSize; - if (CompressedBuffer::ValidateCompressedHeader(ExistingCompressedPart, - RawHash, - RawSize, - /*OutOptionalTotalCompressedSize*/ nullptr)) - { - return CompressedChunkPath; - } - else - { - std::error_code DummyEc; - RemoveFile(CompressedChunkPath, DummyEc); - } - } - } - return {}; -} - -std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> -BuildsOperationUpdateFolder::GetRemainingChunkTargets(std::span<const std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, - uint32_t ChunkIndex) -{ - ZEN_TRACE_CPU("GetRemainingChunkTargets"); - - std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkSources = GetChunkSequenceLocations(m_RemoteLookup, ChunkIndex); - std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs; - if (!ChunkSources.empty()) - { - ChunkTargetPtrs.reserve(ChunkSources.size()); - for (const ChunkedContentLookup::ChunkSequenceLocation& Source : ChunkSources) - { - if (SequenceIndexChunksLeftToWriteCounters[Source.SequenceIndex].load() > 0) - { - ChunkTargetPtrs.push_back(&Source); - } - } - } - return ChunkTargetPtrs; -}; - -uint64_t -BuildsOperationUpdateFolder::GetChunkWriteCount(std::span<const std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, - uint32_t ChunkIndex) -{ - ZEN_TRACE_CPU("GetChunkWriteCount"); - - uint64_t WriteCount = 0; - std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkSources = GetChunkSequenceLocations(m_RemoteLookup, ChunkIndex); - for (const ChunkedContentLookup::ChunkSequenceLocation& Source : ChunkSources) - { - if (SequenceIndexChunksLeftToWriteCounters[Source.SequenceIndex].load() > 0) - { - WriteCount++; - } - } - return WriteCount; -}; - -void -BuildsOperationUpdateFolder::CheckRequiredDiskSpace(const tsl::robin_map<std::string, uint32_t>& RemotePathToRemoteIndex) -{ - tsl::robin_set<uint32_t> ExistingRemotePaths; - - if (m_Options.EnableTargetFolderScavenging) - { - for (uint32_t LocalPathIndex = 0; LocalPathIndex < m_LocalContent.Paths.size(); LocalPathIndex++) - { - const IoHash& RawHash = m_LocalContent.RawHashes[LocalPathIndex]; - const std::filesystem::path& LocalPath = m_LocalContent.Paths[LocalPathIndex]; - - if (auto RemotePathIt = RemotePathToRemoteIndex.find(LocalPath.generic_string()); RemotePathIt != RemotePathToRemoteIndex.end()) - { - const uint32_t RemotePathIndex = RemotePathIt->second; - if (m_RemoteContent.RawHashes[RemotePathIndex] == RawHash) - { - ExistingRemotePaths.insert(RemotePathIndex); - } - } - } - } - - uint64_t RequiredSpace = 0; - for (uint32_t RemotePathIndex = 0; RemotePathIndex < m_RemoteContent.Paths.size(); RemotePathIndex++) - { - if (!ExistingRemotePaths.contains(RemotePathIndex)) - { - RequiredSpace += m_RemoteContent.RawSizes[RemotePathIndex]; - } - } - - std::error_code Ec; - DiskSpace Space = DiskSpaceInfo(m_Path, Ec); - if (Ec) - { - throw std::runtime_error(fmt::format("Get free disk space for target path '{}' FAILED, reason: {}", m_Path, Ec.message())); - } - if (Space.Free < (RequiredSpace + 16u * 1024u * 1024u)) - { - throw std::runtime_error( - fmt::format("Not enough free space for target path '{}', {} of free space is needed", m_Path, RequiredSpace)); - } -} - -void -BuildsOperationUpdateFolder::WriteScavengedSequenceToCache(const std::filesystem::path& ScavengeRootPath, - const ChunkedFolderContent& ScavengedContent, - const ScavengedSequenceCopyOperation& ScavengeOp) -{ - ZEN_TRACE_CPU("WriteScavengedSequenceToCache"); - - const std::filesystem::path ScavengedPath = ScavengedContent.Paths[ScavengeOp.ScavengedPathIndex]; - const std::filesystem::path ScavengedFilePath = (ScavengeRootPath / ScavengedPath).make_preferred(); - ZEN_ASSERT_SLOW(FileSizeFromPath(ScavengedFilePath) == ScavengeOp.RawSize); - - const IoHash& RemoteSequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[ScavengeOp.RemoteSequenceIndex]; - const std::filesystem::path TempFilePath = GetTempChunkedSequenceFileName(m_CacheFolderPath, RemoteSequenceRawHash); - - const uint64_t RawSize = ScavengedContent.RawSizes[ScavengeOp.ScavengedPathIndex]; - FastCopyFile(m_Options.AllowFileClone, - m_Options.UseSparseFiles, - ScavengedFilePath, - TempFilePath, - RawSize, - m_DiskStats.WriteCount, - m_DiskStats.WriteByteCount, - m_DiskStats.CloneCount, - m_DiskStats.CloneByteCount); - - const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RemoteSequenceRawHash); - RenameFile(TempFilePath, CacheFilePath); - - m_WrittenChunkByteCount += RawSize; - if (m_Options.ValidateCompletedSequences) - { - m_ValidatedChunkByteCount += RawSize; - } -} - -void -BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkIndex, - const BlobsExistsResult& ExistsResult, - std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, - std::atomic<uint64_t>& WritePartsComplete, - std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>&& ChunkTargetPtrs, - BufferedWriteFileCache& WriteCache, - ParallelWork& Work, - uint64_t TotalRequestCount, - uint64_t TotalPartWriteCount, - FilteredRate& FilteredDownloadedBytesPerSecond, - FilteredRate& FilteredWrittenBytesPerSecond) -{ - std::filesystem::path ExistingCompressedChunkPath; - if (!m_Options.PrimeCacheOnly) - { - const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; - ExistingCompressedChunkPath = FindDownloadedChunk(ChunkHash); - if (!ExistingCompressedChunkPath.empty()) - { - m_DownloadStats.RequestsCompleteCount++; - if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - } - } - if (!m_AbortFlag) - { - if (!ExistingCompressedChunkPath.empty()) - { - Work.ScheduleWork( - m_IOWorkerPool, - [this, - SequenceIndexChunksLeftToWriteCounters, - &WriteCache, - &Work, - &WritePartsComplete, - TotalPartWriteCount, - &FilteredWrittenBytesPerSecond, - RemoteChunkIndex, - ChunkTargetPtrs = std::move(ChunkTargetPtrs), - CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>& AbortFlag) mutable { - if (!AbortFlag) - { - ZEN_TRACE_CPU("Async_WritePreDownloadedChunk"); - - FilteredWrittenBytesPerSecond.Start(); - - const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; - - IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); - if (!CompressedPart) - { - throw std::runtime_error( - fmt::format("Could not open dowloaded compressed chunk {} from {}", ChunkHash, CompressedChunkPath)); - } - - bool NeedHashVerify = - WriteCompressedChunkToCache(ChunkHash, ChunkTargetPtrs, WriteCache, std::move(CompressedPart)); - WritePartsComplete++; - - if (!AbortFlag) - { - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); - } - - std::error_code Ec = TryRemoveFile(CompressedChunkPath); - if (Ec) - { - ZEN_OPERATION_LOG_DEBUG(m_LogOutput, - "Failed removing file '{}', reason: ({}) {}", - CompressedChunkPath, - Ec.value(), - Ec.message()); - } - - std::vector<uint32_t> CompletedSequences = - CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); - WriteCache.Close(CompletedSequences); - if (NeedHashVerify) - { - VerifyAndCompleteChunkSequencesAsync(CompletedSequences, Work); - } - else - { - FinalizeChunkSequences(CompletedSequences); - } - } - } - }); - } - else - { - Work.ScheduleWork(m_NetworkPool, - [this, - &ExistsResult, - SequenceIndexChunksLeftToWriteCounters, - &WriteCache, - &Work, - &WritePartsComplete, - TotalPartWriteCount, - TotalRequestCount, - &FilteredDownloadedBytesPerSecond, - &FilteredWrittenBytesPerSecond, - RemoteChunkIndex, - ChunkTargetPtrs = std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>( - std::move(ChunkTargetPtrs))](std::atomic<bool>&) mutable { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Async_DownloadChunk"); - - FilteredDownloadedBytesPerSecond.Start(); - DownloadBuildBlob(RemoteChunkIndex, - ExistsResult, - Work, - [this, - &ExistsResult, - SequenceIndexChunksLeftToWriteCounters, - &WriteCache, - &Work, - &WritePartsComplete, - TotalPartWriteCount, - TotalRequestCount, - RemoteChunkIndex, - &FilteredDownloadedBytesPerSecond, - &FilteredWrittenBytesPerSecond, - ChunkTargetPtrs = std::move(ChunkTargetPtrs)](IoBuffer&& Payload) mutable { - if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - IoBufferFileReference FileRef; - bool EnableBacklog = Payload.GetFileReference(FileRef); - AsyncWriteDownloadedChunk(m_Options.ZenFolderPath, - RemoteChunkIndex, - std::move(ChunkTargetPtrs), - WriteCache, - Work, - std::move(Payload), - SequenceIndexChunksLeftToWriteCounters, - WritePartsComplete, - TotalPartWriteCount, - FilteredWrittenBytesPerSecond, - EnableBacklog); - }); - } - }); - } - } -} - -void -BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkIndex, - const BlobsExistsResult& ExistsResult, - ParallelWork& Work, - std::function<void(IoBuffer&& Payload)>&& OnDownloaded) -{ - const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; - // FilteredDownloadedBytesPerSecond.Start(); - IoBuffer BuildBlob; - const bool ExistsInCache = m_Storage.CacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash); - if (ExistsInCache) - { - BuildBlob = m_Storage.CacheStorage->GetBuildBlob(m_BuildId, ChunkHash); - } - if (BuildBlob) - { - uint64_t BlobSize = BuildBlob.GetSize(); - m_DownloadStats.DownloadedChunkCount++; - m_DownloadStats.DownloadedChunkByteCount += BlobSize; - m_DownloadStats.RequestsCompleteCount++; - OnDownloaded(std::move(BuildBlob)); - } - else - { - if (m_RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= m_Options.LargeAttachmentSize) - { - DownloadLargeBlob( - *m_Storage.BuildStorage, - m_TempDownloadFolderPath, - m_BuildId, - ChunkHash, - m_Options.PreferredMultipartChunkSize, - Work, - m_NetworkPool, - m_DownloadStats.DownloadedChunkByteCount, - m_DownloadStats.MultipartAttachmentCount, - [this, &Work, ChunkHash, RemoteChunkIndex, OnDownloaded = std::move(OnDownloaded)](IoBuffer&& Payload) mutable { - m_DownloadStats.DownloadedChunkCount++; - m_DownloadStats.RequestsCompleteCount++; - - if (Payload && m_Storage.CacheStorage && m_Options.PopulateCache) - { - m_Storage.CacheStorage->PutBuildBlob(m_BuildId, - ChunkHash, - ZenContentType::kCompressedBinary, - CompositeBuffer(SharedBuffer(Payload))); - } - - OnDownloaded(std::move(Payload)); - }); - } - else - { - BuildBlob = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, ChunkHash); - if (BuildBlob && m_Storage.CacheStorage && m_Options.PopulateCache) - { - m_Storage.CacheStorage->PutBuildBlob(m_BuildId, - ChunkHash, - ZenContentType::kCompressedBinary, - CompositeBuffer(SharedBuffer(BuildBlob))); - } - if (!BuildBlob) - { - throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash)); - } - if (!m_Options.PrimeCacheOnly) - { - if (!m_AbortFlag) - { - uint64_t BlobSize = BuildBlob.GetSize(); - m_DownloadStats.DownloadedChunkCount++; - m_DownloadStats.DownloadedChunkByteCount += BlobSize; - m_DownloadStats.RequestsCompleteCount++; - - OnDownloaded(std::move(BuildBlob)); - } - } - } - } -} - -void -BuildsOperationUpdateFolder::DownloadPartialBlock( - std::span<const ChunkBlockAnalyser::BlockRangeDescriptor> BlockRanges, - size_t BlockRangeStartIndex, - size_t BlockRangeCount, - const BlobsExistsResult& ExistsResult, - std::function<void(IoBuffer&& InMemoryBuffer, - const std::filesystem::path& OnDiskPath, - size_t BlockRangeStartIndex, - std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengths)>&& OnDownloaded) -{ - const uint32_t BlockIndex = BlockRanges[BlockRangeStartIndex].BlockIndex; - - const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex]; - - auto ProcessDownload = [this]( - const ChunkBlockDescription& BlockDescription, - IoBuffer&& BlockRangeBuffer, - size_t BlockRangeStartIndex, - std::span<const std::pair<uint64_t, uint64_t>> BlockOffsetAndLengths, - const std::function<void(IoBuffer && InMemoryBuffer, - const std::filesystem::path& OnDiskPath, - size_t BlockRangeStartIndex, - std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengths)>& OnDownloaded) { - uint64_t BlockRangeBufferSize = BlockRangeBuffer.GetSize(); - m_DownloadStats.DownloadedBlockCount++; - m_DownloadStats.DownloadedBlockByteCount += BlockRangeBufferSize; - m_DownloadStats.RequestsCompleteCount += BlockOffsetAndLengths.size(); - - std::filesystem::path BlockChunkPath; - - // Check if the dowloaded block is file based and we can move it directly without rewriting it - { - IoBufferFileReference FileRef; - if (BlockRangeBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && - (FileRef.FileChunkSize == BlockRangeBufferSize)) - { - ZEN_TRACE_CPU("MoveTempPartialBlock"); - - std::error_code Ec; - std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); - if (!Ec) - { - BlockRangeBuffer.SetDeleteOnClose(false); - BlockRangeBuffer = {}; - - IoHashStream RangeId; - for (const std::pair<uint64_t, uint64_t>& Range : BlockOffsetAndLengths) - { - RangeId.Append(&Range.first, sizeof(uint64_t)); - RangeId.Append(&Range.second, sizeof(uint64_t)); - } - - BlockChunkPath = m_TempBlockFolderPath / fmt::format("{}_{}", BlockDescription.BlockHash, RangeId.GetHash()); - RenameFile(TempBlobPath, BlockChunkPath, Ec); - if (Ec) - { - BlockChunkPath = std::filesystem::path{}; - - // Re-open the temp file again - BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); - BlockRangeBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockRangeBufferSize, true); - BlockRangeBuffer.SetDeleteOnClose(true); - } - } - } - } - - if (BlockChunkPath.empty() && (BlockRangeBufferSize > m_Options.MaximumInMemoryPayloadSize)) - { - ZEN_TRACE_CPU("WriteTempPartialBlock"); - - IoHashStream RangeId; - for (const std::pair<uint64_t, uint64_t>& Range : BlockOffsetAndLengths) - { - RangeId.Append(&Range.first, sizeof(uint64_t)); - RangeId.Append(&Range.second, sizeof(uint64_t)); - } - - // Could not be moved and rather large, lets store it on disk - BlockChunkPath = m_TempBlockFolderPath / fmt::format("{}_{}", BlockDescription.BlockHash, RangeId.GetHash()); - TemporaryFile::SafeWriteFile(BlockChunkPath, BlockRangeBuffer); - BlockRangeBuffer = {}; - } - if (!m_AbortFlag) - { - OnDownloaded(std::move(BlockRangeBuffer), std::move(BlockChunkPath), BlockRangeStartIndex, BlockOffsetAndLengths); - } - }; - - std::vector<std::pair<uint64_t, uint64_t>> Ranges; - Ranges.reserve(BlockRangeCount); - for (size_t BlockRangeIndex = BlockRangeStartIndex; BlockRangeIndex < BlockRangeStartIndex + BlockRangeCount; BlockRangeIndex++) - { - const ChunkBlockAnalyser::BlockRangeDescriptor& BlockRange = BlockRanges[BlockRangeIndex]; - Ranges.push_back(std::make_pair(BlockRange.RangeStart, BlockRange.RangeLength)); - } - - const bool ExistsInCache = m_Storage.CacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash); - - size_t SubBlockRangeCount = BlockRangeCount; - size_t SubRangeCountComplete = 0; - std::span<const std::pair<uint64_t, uint64_t>> RangesSpan(Ranges); - while (SubRangeCountComplete < SubBlockRangeCount) - { - if (m_AbortFlag) - { - break; - } - - // First try to get subrange from cache. - // If not successful, try to get the ranges from the build store and adapt SubRangeCount... - - size_t SubRangeStartIndex = BlockRangeStartIndex + SubRangeCountComplete; - if (ExistsInCache) - { - size_t SubRangeCount = Min(BlockRangeCount - SubRangeCountComplete, m_Storage.CacheHost.Caps.MaxRangeCountPerRequest); - - if (SubRangeCount == 1) - { - // Legacy single-range path, prefer that for max compatibility - - const std::pair<uint64_t, uint64_t> SubRange = RangesSpan[SubRangeCountComplete]; - IoBuffer PayloadBuffer = - m_Storage.CacheStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash, SubRange.first, SubRange.second); - if (m_AbortFlag) - { - break; - } - if (PayloadBuffer) - { - ProcessDownload(BlockDescription, - std::move(PayloadBuffer), - SubRangeStartIndex, - std::vector<std::pair<uint64_t, uint64_t>>{std::make_pair(0u, SubRange.second)}, - OnDownloaded); - SubRangeCountComplete += SubRangeCount; - continue; - } - } - else - { - auto SubRanges = RangesSpan.subspan(SubRangeCountComplete, SubRangeCount); - - BuildStorageCache::BuildBlobRanges RangeBuffers = - m_Storage.CacheStorage->GetBuildBlobRanges(m_BuildId, BlockDescription.BlockHash, SubRanges); - if (m_AbortFlag) - { - break; - } - if (RangeBuffers.PayloadBuffer) - { - if (RangeBuffers.Ranges.empty()) - { - SubRangeCount = Ranges.size() - SubRangeCountComplete; - ProcessDownload(BlockDescription, - std::move(RangeBuffers.PayloadBuffer), - SubRangeStartIndex, - RangesSpan.subspan(SubRangeCountComplete, SubRangeCount), - OnDownloaded); - SubRangeCountComplete += SubRangeCount; - continue; - } - else if (RangeBuffers.Ranges.size() == SubRangeCount) - { - ProcessDownload(BlockDescription, - std::move(RangeBuffers.PayloadBuffer), - SubRangeStartIndex, - RangeBuffers.Ranges, - OnDownloaded); - SubRangeCountComplete += SubRangeCount; - continue; - } - } - } - } - - size_t SubRangeCount = Min(BlockRangeCount - SubRangeCountComplete, m_Storage.BuildStorageHost.Caps.MaxRangeCountPerRequest); - - auto SubRanges = RangesSpan.subspan(SubRangeCountComplete, SubRangeCount); - - BuildStorageBase::BuildBlobRanges RangeBuffers = - m_Storage.BuildStorage->GetBuildBlobRanges(m_BuildId, BlockDescription.BlockHash, SubRanges); - if (m_AbortFlag) - { - break; - } - if (RangeBuffers.PayloadBuffer) - { - if (RangeBuffers.Ranges.empty()) - { - // Jupiter will ignore the ranges and send the whole payload if it fetches the payload from S3 - // Upload to cache (if enabled) and use the whole payload for the remaining ranges - - if (m_Storage.CacheStorage && m_Options.PopulateCache) - { - m_Storage.CacheStorage->PutBuildBlob(m_BuildId, - BlockDescription.BlockHash, - ZenContentType::kCompressedBinary, - CompositeBuffer(std::vector<IoBuffer>{RangeBuffers.PayloadBuffer})); - if (m_AbortFlag) - { - break; - } - } - - SubRangeCount = Ranges.size() - SubRangeCountComplete; - ProcessDownload(BlockDescription, - std::move(RangeBuffers.PayloadBuffer), - SubRangeStartIndex, - RangesSpan.subspan(SubRangeCountComplete, SubRangeCount), - OnDownloaded); - } - else - { - if (RangeBuffers.Ranges.size() != SubRanges.size()) - { - throw std::runtime_error(fmt::format("Fetching {} ranges from {} resulted in {} ranges", - SubRanges.size(), - BlockDescription.BlockHash, - RangeBuffers.Ranges.size())); - } - ProcessDownload(BlockDescription, - std::move(RangeBuffers.PayloadBuffer), - SubRangeStartIndex, - RangeBuffers.Ranges, - OnDownloaded); - } - } - else - { - throw std::runtime_error(fmt::format("Block {} is missing when fetching {} ranges", BlockDescription.BlockHash, SubRangeCount)); - } - - SubRangeCountComplete += SubRangeCount; - } -} - -std::vector<uint32_t> -BuildsOperationUpdateFolder::WriteLocalChunkToCache(CloneQueryInterface* CloneQuery, - const CopyChunkData& CopyData, - const std::vector<ChunkedFolderContent>& ScavengedContents, - const std::vector<ChunkedContentLookup>& ScavengedLookups, - const std::vector<std::filesystem::path>& ScavengedPaths, - BufferedWriteFileCache& WriteCache) -{ - ZEN_TRACE_CPU("WriteLocalChunkToCache"); - - std::filesystem::path SourceFilePath; - - if (CopyData.ScavengeSourceIndex == (uint32_t)-1) - { - const uint32_t LocalPathIndex = m_LocalLookup.SequenceIndexFirstPathIndex[CopyData.SourceSequenceIndex]; - SourceFilePath = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred(); - } - else - { - const ChunkedFolderContent& ScavengedContent = ScavengedContents[CopyData.ScavengeSourceIndex]; - const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[CopyData.ScavengeSourceIndex]; - const std::filesystem::path ScavengedPath = ScavengedPaths[CopyData.ScavengeSourceIndex]; - const uint32_t ScavengedPathIndex = ScavengedLookup.SequenceIndexFirstPathIndex[CopyData.SourceSequenceIndex]; - SourceFilePath = (ScavengedPath / ScavengedContent.Paths[ScavengedPathIndex]).make_preferred(); - } - ZEN_ASSERT_SLOW(IsFile(SourceFilePath)); - ZEN_ASSERT(!CopyData.TargetChunkLocationPtrs.empty()); - - uint64_t CacheLocalFileBytesRead = 0; - - size_t TargetStart = 0; - const std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> AllTargets(CopyData.TargetChunkLocationPtrs); - - struct WriteOp - { - const ChunkedContentLookup::ChunkSequenceLocation* Target = nullptr; - uint64_t CacheFileOffset = (uint64_t)-1; - uint32_t ChunkIndex = (uint32_t)-1; - }; - - std::vector<WriteOp> WriteOps; - - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Sort"); - WriteOps.reserve(AllTargets.size()); - for (const CopyChunkData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets) - { - std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> TargetRange = - AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount); - for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange) - { - WriteOps.push_back( - WriteOp{.Target = Target, .CacheFileOffset = ChunkTarget.CacheFileOffset, .ChunkIndex = ChunkTarget.RemoteChunkIndex}); - } - TargetStart += ChunkTarget.TargetChunkLocationCount; - } - - std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) { - if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex) - { - return true; - } - else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex) - { - return false; - } - if (Lhs.Target->Offset < Rhs.Target->Offset) - { - return true; - } - return false; - }); - } - - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Write"); - - tsl::robin_set<uint32_t> ChunkIndexesWritten; - - BufferedOpenFile SourceFile(SourceFilePath, - m_DiskStats.OpenReadCount, - m_DiskStats.CurrentOpenFileCount, - m_DiskStats.ReadCount, - m_DiskStats.ReadByteCount); - - bool CanCloneSource = CloneQuery && CloneQuery->CanClone(SourceFile.Handle()); - - BufferedWriteFileCache::Local LocalWriter(WriteCache); - - for (size_t WriteOpIndex = 0; WriteOpIndex < WriteOps.size();) - { - if (m_AbortFlag) - { - break; - } - const WriteOp& Op = WriteOps[WriteOpIndex]; - - const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex; - const uint32_t RemotePathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]; - const uint64_t TargetSize = m_RemoteContent.RawSizes[RemotePathIndex]; - const uint64_t ChunkSize = m_RemoteContent.ChunkedContent.ChunkRawSizes[Op.ChunkIndex]; - - uint64_t ReadLength = ChunkSize; - size_t WriteCount = 1; - uint64_t OpSourceEnd = Op.CacheFileOffset + ChunkSize; - uint64_t OpTargetEnd = Op.Target->Offset + ChunkSize; - while ((WriteOpIndex + WriteCount) < WriteOps.size()) - { - const WriteOp& NextOp = WriteOps[WriteOpIndex + WriteCount]; - if (NextOp.Target->SequenceIndex != Op.Target->SequenceIndex) - { - break; - } - if (NextOp.Target->Offset != OpTargetEnd) - { - break; - } - if (NextOp.CacheFileOffset != OpSourceEnd) - { - break; - } - const uint64_t NextChunkLength = m_RemoteContent.ChunkedContent.ChunkRawSizes[NextOp.ChunkIndex]; - if (ReadLength + NextChunkLength > BufferedOpenFile::BlockSize) - { - break; - } - ReadLength += NextChunkLength; - OpSourceEnd += NextChunkLength; - OpTargetEnd += NextChunkLength; - WriteCount++; - } - - { - bool DidClone = false; - - if (CanCloneSource) - { - uint64_t PreBytes = 0; - uint64_t PostBytes = 0; - uint64_t ClonableBytes = - CloneQuery->GetClonableRange(Op.CacheFileOffset, Op.Target->Offset, ReadLength, PreBytes, PostBytes); - if (ClonableBytes > 0) - { - // We need to open the file... - BufferedWriteFileCache::Local::Writer* Writer = LocalWriter.GetWriter(RemoteSequenceIndex); - if (!Writer) - { - Writer = LocalWriter.PutWriter(RemoteSequenceIndex, std::make_unique<BufferedWriteFileCache::Local::Writer>()); - - Writer->File = std::make_unique<BasicFile>(); - - const std::filesystem::path FileName = - GetTempChunkedSequenceFileName(m_CacheFolderPath, - m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]); - Writer->File->Open(FileName, BasicFile::Mode::kWrite); - if (m_Options.UseSparseFiles) - { - PrepareFileForScatteredWrite(Writer->File->Handle(), TargetSize); - } - } - DidClone = CloneQuery->TryClone(SourceFile.Handle(), - Writer->File->Handle(), - Op.CacheFileOffset + PreBytes, - Op.Target->Offset + PreBytes, - ClonableBytes, - TargetSize); - if (DidClone) - { - m_DiskStats.WriteCount++; - m_DiskStats.WriteByteCount += ClonableBytes; - - m_DiskStats.CloneCount++; - m_DiskStats.CloneByteCount += ClonableBytes; - - m_WrittenChunkByteCount += ClonableBytes; - - if (PreBytes > 0) - { - CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, PreBytes); - const uint64_t FileOffset = Op.Target->Offset; - - WriteSequenceChunkToCache(LocalWriter, ChunkSource, RemoteSequenceIndex, FileOffset, RemotePathIndex); - } - if (PostBytes > 0) - { - CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset + ReadLength - PostBytes, PostBytes); - const uint64_t FileOffset = Op.Target->Offset + ReadLength - PostBytes; - - WriteSequenceChunkToCache(LocalWriter, ChunkSource, RemoteSequenceIndex, FileOffset, RemotePathIndex); - } - } - } - } - - if (!DidClone) - { - CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ReadLength); - - const uint64_t FileOffset = Op.Target->Offset; - - WriteSequenceChunkToCache(LocalWriter, ChunkSource, RemoteSequenceIndex, FileOffset, RemotePathIndex); - } - } - - CacheLocalFileBytesRead += ReadLength; // TODO: This should be the sum of unique chunk sizes? - - WriteOpIndex += WriteCount; - } - } - - if (m_Options.IsVerbose) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, "Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), SourceFilePath); - } - - std::vector<uint32_t> Result; - Result.reserve(WriteOps.size()); - - for (const WriteOp& Op : WriteOps) - { - Result.push_back(Op.Target->SequenceIndex); - } - return Result; -} - -bool -BuildsOperationUpdateFolder::WriteCompressedChunkToCache( - const IoHash& ChunkHash, - const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs, - BufferedWriteFileCache& WriteCache, - IoBuffer&& CompressedPart) -{ - ZEN_TRACE_CPU("WriteCompressedChunkToCache"); - - auto ChunkHashToChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); - ZEN_ASSERT(ChunkHashToChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end()); - if (IsSingleFileChunk(m_RemoteContent, ChunkTargetPtrs)) - { - const std::uint32_t SequenceIndex = ChunkTargetPtrs.front()->SequenceIndex; - const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]; - StreamDecompress(SequenceRawHash, CompositeBuffer(std::move(CompressedPart))); - return false; - } - else - { - IoHash RawHash; - uint64_t RawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompositeBuffer(std::move(CompressedPart)), RawHash, RawSize); - if (!Compressed) - { - throw std::runtime_error(fmt::format("Failed to parse header of compressed large blob {}", ChunkHash)); - } - if (RawHash != ChunkHash) - { - throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, ChunkHash)); - } - - BufferedWriteFileCache::Local LocalWriter(WriteCache); - - IoHashStream Hash; - bool CouldDecompress = Compressed.DecompressToStream( - 0, - (uint64_t)-1, - [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { - ZEN_UNUSED(SourceOffset); - ZEN_TRACE_CPU("Async_StreamDecompress_Write"); - m_DiskStats.ReadByteCount += SourceSize; - if (!m_AbortFlag) - { - for (const ChunkedContentLookup::ChunkSequenceLocation* TargetPtr : ChunkTargetPtrs) - { - const auto& Target = *TargetPtr; - const uint64_t FileOffset = Target.Offset + Offset; - const uint32_t SequenceIndex = Target.SequenceIndex; - const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; - - WriteSequenceChunkToCache(LocalWriter, RangeBuffer, SequenceIndex, FileOffset, PathIndex); - } - - return true; - } - return false; - }); - - if (m_AbortFlag) - { - return false; - } - - if (!CouldDecompress) - { - throw std::runtime_error(fmt::format("Failed to decompress large chunk {}", ChunkHash)); - } - - return true; - } -} - -void -BuildsOperationUpdateFolder::StreamDecompress(const IoHash& SequenceRawHash, CompositeBuffer&& CompressedPart) -{ - ZEN_TRACE_CPU("StreamDecompress"); - const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash); - TemporaryFile DecompressedTemp; - std::error_code Ec; - DecompressedTemp.CreateTemporary(TempChunkSequenceFileName.parent_path(), Ec); - if (Ec) - { - throw std::runtime_error(fmt::format("Failed creating temporary file for decompressing large blob {}, reason: ({}) {}", - SequenceRawHash, - Ec.value(), - Ec.message())); - } - IoHash RawHash; - uint64_t RawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompressedPart, RawHash, RawSize); - if (!Compressed) - { - throw std::runtime_error(fmt::format("Failed to parse header of compressed large blob {}", SequenceRawHash)); - } - if (RawHash != SequenceRawHash) - { - throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, SequenceRawHash)); - } - PrepareFileForScatteredWrite(DecompressedTemp.Handle(), RawSize); - - IoHashStream Hash; - bool CouldDecompress = - Compressed.DecompressToStream(0, - (uint64_t)-1, - [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { - ZEN_UNUSED(SourceOffset); - ZEN_TRACE_CPU("StreamDecompress_Write"); - m_DiskStats.ReadCount++; - m_DiskStats.ReadByteCount += SourceSize; - if (!m_AbortFlag) - { - for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) - { - if (m_Options.ValidateCompletedSequences) - { - Hash.Append(Segment.GetView()); - m_ValidatedChunkByteCount += Segment.GetSize(); - } - DecompressedTemp.Write(Segment, Offset); - Offset += Segment.GetSize(); - m_DiskStats.WriteByteCount += Segment.GetSize(); - m_DiskStats.WriteCount++; - m_WrittenChunkByteCount += Segment.GetSize(); - } - return true; - } - return false; - }); - - if (m_AbortFlag) - { - return; - } - - if (!CouldDecompress) - { - throw std::runtime_error(fmt::format("Failed to decompress large blob {}", SequenceRawHash)); - } - if (m_Options.ValidateCompletedSequences) - { - const IoHash VerifyHash = Hash.GetHash(); - if (VerifyHash != SequenceRawHash) - { - throw std::runtime_error( - fmt::format("Decompressed blob payload hash {} does not match expected hash {}", VerifyHash, SequenceRawHash)); - } - } - DecompressedTemp.MoveTemporaryIntoPlace(TempChunkSequenceFileName, Ec); - if (Ec) - { - throw std::runtime_error(fmt::format("Failed moving temporary file for decompressing large blob {}, reason: ({}) {}", - SequenceRawHash, - Ec.value(), - Ec.message())); - } - // WriteChunkStats.ChunkCountWritten++; -} - -void -BuildsOperationUpdateFolder::WriteSequenceChunkToCache(BufferedWriteFileCache::Local& LocalWriter, - const CompositeBuffer& Chunk, - const uint32_t SequenceIndex, - const uint64_t FileOffset, - const uint32_t PathIndex) -{ - ZEN_TRACE_CPU("WriteSequenceChunkToCache"); - - const uint64_t SequenceSize = m_RemoteContent.RawSizes[PathIndex]; - - auto OpenFile = [&](BasicFile& File) { - const std::filesystem::path FileName = - GetTempChunkedSequenceFileName(m_CacheFolderPath, m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); - File.Open(FileName, BasicFile::Mode::kWrite); - if (m_Options.UseSparseFiles) - { - PrepareFileForScatteredWrite(File.Handle(), SequenceSize); - } - }; - - const uint64_t ChunkSize = Chunk.GetSize(); - ZEN_ASSERT(FileOffset + ChunkSize <= SequenceSize); - if (ChunkSize == SequenceSize) - { - BasicFile SingleChunkFile; - OpenFile(SingleChunkFile); - - m_DiskStats.CurrentOpenFileCount++; - auto _ = MakeGuard([this]() { m_DiskStats.CurrentOpenFileCount--; }); - SingleChunkFile.Write(Chunk, FileOffset); - } - else - { - const uint64_t MaxWriterBufferSize = 256u * 1025u; - - BufferedWriteFileCache::Local::Writer* Writer = LocalWriter.GetWriter(SequenceIndex); - if (Writer) - { - if ((!Writer->Writer) && (ChunkSize < MaxWriterBufferSize)) - { - Writer->Writer = std::make_unique<BasicFileWriter>(*Writer->File, Min(SequenceSize, MaxWriterBufferSize)); - } - Writer->Write(Chunk, FileOffset); - } - else - { - Writer = LocalWriter.PutWriter(SequenceIndex, std::make_unique<BufferedWriteFileCache::Local::Writer>()); - - Writer->File = std::make_unique<BasicFile>(); - OpenFile(*Writer->File); - if (ChunkSize < MaxWriterBufferSize) - { - Writer->Writer = std::make_unique<BasicFileWriter>(*Writer->File, Min(SequenceSize, MaxWriterBufferSize)); - } - Writer->Write(Chunk, FileOffset); - } - } - m_DiskStats.WriteCount++; - m_DiskStats.WriteByteCount += ChunkSize; - m_WrittenChunkByteCount += ChunkSize; -} - -bool -BuildsOperationUpdateFolder::GetBlockWriteOps(const IoHash& BlockRawHash, - std::span<const IoHash> ChunkRawHashes, - std::span<const uint32_t> ChunkCompressedLengths, - std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, - std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, - const MemoryView BlockView, - uint32_t FirstIncludedBlockChunkIndex, - uint32_t LastIncludedBlockChunkIndex, - BlockWriteOps& OutOps) -{ - ZEN_TRACE_CPU("GetBlockWriteOps"); - - uint32_t OffsetInBlock = 0; - for (uint32_t ChunkBlockIndex = FirstIncludedBlockChunkIndex; ChunkBlockIndex <= LastIncludedBlockChunkIndex; ChunkBlockIndex++) - { - const uint32_t ChunkCompressedSize = ChunkCompressedLengths[ChunkBlockIndex]; - const IoHash& ChunkHash = ChunkRawHashes[ChunkBlockIndex]; - if (auto It = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); It != m_RemoteLookup.ChunkHashToChunkIndex.end()) - { - const uint32_t ChunkIndex = It->second; - std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = - GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, ChunkIndex); - - if (!ChunkTargetPtrs.empty()) - { - bool NeedsWrite = true; - if (RemoteChunkIndexNeedsCopyFromSourceFlags[ChunkIndex].compare_exchange_strong(NeedsWrite, false)) - { - MemoryView ChunkMemoryView = BlockView.Mid(OffsetInBlock, ChunkCompressedSize); - IoHash VerifyChunkHash; - uint64_t VerifyChunkSize; - CompressedBuffer CompressedChunk = - CompressedBuffer::FromCompressed(SharedBuffer::MakeView(ChunkMemoryView), VerifyChunkHash, VerifyChunkSize); - if (!CompressedChunk) - { - throw std::runtime_error(fmt::format("Chunk {} at {}, size {} in block {} is not a valid compressed buffer", - ChunkHash, - OffsetInBlock, - ChunkCompressedSize, - BlockRawHash)); - } - if (VerifyChunkHash != ChunkHash) - { - throw std::runtime_error(fmt::format("Chunk {} at {}, size {} in block {} has a mismatching content hash {}", - ChunkHash, - OffsetInBlock, - ChunkCompressedSize, - BlockRawHash, - VerifyChunkHash)); - } - if (VerifyChunkSize != m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]) - { - throw std::runtime_error( - fmt::format("Chunk {} at {}, size {} in block {} has a mismatching raw size {}, expected {}", - ChunkHash, - OffsetInBlock, - ChunkCompressedSize, - BlockRawHash, - VerifyChunkSize, - m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex])); - } - - OodleCompressor ChunkCompressor; - OodleCompressionLevel ChunkCompressionLevel; - uint64_t ChunkBlockSize; - - bool GetCompressParametersSuccess = - CompressedChunk.TryGetCompressParameters(ChunkCompressor, ChunkCompressionLevel, ChunkBlockSize); - ZEN_ASSERT(GetCompressParametersSuccess); - - IoBuffer Decompressed; - if (ChunkCompressionLevel == OodleCompressionLevel::None) - { - MemoryView ChunkDecompressedMemoryView = ChunkMemoryView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder()); - Decompressed = - IoBuffer(IoBuffer::Wrap, ChunkDecompressedMemoryView.GetData(), ChunkDecompressedMemoryView.GetSize()); - } - else - { - Decompressed = CompressedChunk.Decompress().AsIoBuffer(); - } - - if (Decompressed.GetSize() != m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]) - { - throw std::runtime_error(fmt::format("Chunk {} at {}, size {} in block {} decompressed to size {}, expected {}", - ChunkHash, - OffsetInBlock, - ChunkCompressedSize, - BlockRawHash, - Decompressed.GetSize(), - m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex])); - } - - ZEN_ASSERT_SLOW(ChunkHash == IoHash::HashBuffer(Decompressed)); - for (const ChunkedContentLookup::ChunkSequenceLocation* Target : ChunkTargetPtrs) - { - OutOps.WriteOps.push_back( - BlockWriteOps::WriteOpData{.Target = Target, .ChunkBufferIndex = OutOps.ChunkBuffers.size()}); - } - OutOps.ChunkBuffers.emplace_back(std::move(Decompressed)); - } - } - } - - OffsetInBlock += ChunkCompressedSize; - } - { - ZEN_TRACE_CPU("Sort"); - std::sort(OutOps.WriteOps.begin(), - OutOps.WriteOps.end(), - [](const BlockWriteOps::WriteOpData& Lhs, const BlockWriteOps::WriteOpData& Rhs) { - if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex) - { - return true; - } - if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex) - { - return false; - } - return Lhs.Target->Offset < Rhs.Target->Offset; - }); - } - return true; -} - -void -BuildsOperationUpdateFolder::WriteBlockChunkOpsToCache(std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, - const BlockWriteOps& Ops, - BufferedWriteFileCache& WriteCache, - ParallelWork& Work) -{ - ZEN_TRACE_CPU("WriteBlockChunkOpsToCache"); - - { - BufferedWriteFileCache::Local LocalWriter(WriteCache); - for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps) - { - if (Work.IsAborted()) - { - break; - } - const CompositeBuffer& Chunk = Ops.ChunkBuffers[WriteOp.ChunkBufferIndex]; - const uint32_t SequenceIndex = WriteOp.Target->SequenceIndex; - ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() <= - m_RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]); - ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() > 0); - const uint64_t FileOffset = WriteOp.Target->Offset; - const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; - - WriteSequenceChunkToCache(LocalWriter, Chunk, SequenceIndex, FileOffset, PathIndex); - } - } - if (!Work.IsAborted()) - { - // Write tracking, updating this must be done without any files open (BufferedWriteFileCache::Local) - std::vector<uint32_t> CompletedChunkSequences; - for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps) - { - const uint32_t RemoteSequenceIndex = WriteOp.Target->SequenceIndex; - if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters)) - { - CompletedChunkSequences.push_back(RemoteSequenceIndex); - } - } - WriteCache.Close(CompletedChunkSequences); - VerifyAndCompleteChunkSequencesAsync(CompletedChunkSequences, Work); - } -} - -bool -BuildsOperationUpdateFolder::WriteChunksBlockToCache(const ChunkBlockDescription& BlockDescription, - std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, - ParallelWork& Work, - CompositeBuffer&& BlockBuffer, - std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, - BufferedWriteFileCache& WriteCache) -{ - ZEN_TRACE_CPU("WriteChunksBlockToCache"); - - IoBuffer BlockMemoryBuffer = MakeBufferMemoryBased(BlockBuffer); - const MemoryView BlockView = BlockMemoryBuffer.GetView(); - - BlockWriteOps Ops; - if ((BlockDescription.HeaderSize == 0) || BlockDescription.ChunkCompressedLengths.empty()) - { - ZEN_TRACE_CPU("WriteChunksBlockToCache_Legacy"); - - uint64_t HeaderSize; - const std::vector<uint32_t> ChunkCompressedLengths = - ReadChunkBlockHeader(BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder()), HeaderSize); - - if (GetBlockWriteOps(BlockDescription.BlockHash, - BlockDescription.ChunkRawHashes, - ChunkCompressedLengths, - SequenceIndexChunksLeftToWriteCounters, - RemoteChunkIndexNeedsCopyFromSourceFlags, - BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder() + HeaderSize), - 0, - gsl::narrow<uint32_t>(BlockDescription.ChunkRawHashes.size() - 1), - Ops)) - { - WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work); - return true; - } - return false; - } - - if (GetBlockWriteOps(BlockDescription.BlockHash, - BlockDescription.ChunkRawHashes, - BlockDescription.ChunkCompressedLengths, - SequenceIndexChunksLeftToWriteCounters, - RemoteChunkIndexNeedsCopyFromSourceFlags, - BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize), - 0, - gsl::narrow<uint32_t>(BlockDescription.ChunkRawHashes.size() - 1), - Ops)) - { - WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work); - return true; - } - return false; -} - -bool -BuildsOperationUpdateFolder::WritePartialBlockChunksToCache(const ChunkBlockDescription& BlockDescription, - std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, - ParallelWork& Work, - CompositeBuffer&& PartialBlockBuffer, - uint32_t FirstIncludedBlockChunkIndex, - uint32_t LastIncludedBlockChunkIndex, - std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, - BufferedWriteFileCache& WriteCache) -{ - ZEN_TRACE_CPU("WritePartialBlockChunksToCache"); - - IoBuffer BlockMemoryBuffer = MakeBufferMemoryBased(PartialBlockBuffer); - const MemoryView BlockView = BlockMemoryBuffer.GetView(); - - BlockWriteOps Ops; - if (GetBlockWriteOps(BlockDescription.BlockHash, - BlockDescription.ChunkRawHashes, - BlockDescription.ChunkCompressedLengths, - SequenceIndexChunksLeftToWriteCounters, - RemoteChunkIndexNeedsCopyFromSourceFlags, - BlockView, - FirstIncludedBlockChunkIndex, - LastIncludedBlockChunkIndex, - Ops)) - { - WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work); - return true; - } - else - { - return false; - } -} - -void -BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::path& ZenFolderPath, - uint32_t RemoteChunkIndex, - std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>&& ChunkTargetPtrs, - BufferedWriteFileCache& WriteCache, - ParallelWork& Work, - IoBuffer&& Payload, - std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, - std::atomic<uint64_t>& WritePartsComplete, - const uint64_t TotalPartWriteCount, - FilteredRate& FilteredWrittenBytesPerSecond, - bool EnableBacklog) -{ - ZEN_TRACE_CPU("AsyncWriteDownloadedChunk"); - - const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; - - const uint64_t Size = Payload.GetSize(); - - std::filesystem::path CompressedChunkPath; - - // Check if the dowloaded chunk is file based and we can move it directly without rewriting it - { - IoBufferFileReference FileRef; - if (Payload.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == Size)) - { - ZEN_TRACE_CPU("MoveTempChunk"); - std::error_code Ec; - std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); - if (!Ec) - { - Payload.SetDeleteOnClose(false); - Payload = {}; - CompressedChunkPath = m_TempDownloadFolderPath / ChunkHash.ToHexString(); - RenameFile(TempBlobPath, CompressedChunkPath, Ec); - if (Ec) - { - CompressedChunkPath = std::filesystem::path{}; - - // Re-open the temp file again - BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); - Payload = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, Size, true); - Payload.SetDeleteOnClose(true); - } - } - } - } - - if (CompressedChunkPath.empty() && (Size > m_Options.MaximumInMemoryPayloadSize)) - { - ZEN_TRACE_CPU("WriteTempChunk"); - // Could not be moved and rather large, lets store it on disk - CompressedChunkPath = m_TempDownloadFolderPath / ChunkHash.ToHexString(); - TemporaryFile::SafeWriteFile(CompressedChunkPath, Payload); - Payload = {}; - } - - Work.ScheduleWork( - m_IOWorkerPool, - [&ZenFolderPath, - this, - SequenceIndexChunksLeftToWriteCounters, - &Work, - CompressedChunkPath, - RemoteChunkIndex, - TotalPartWriteCount, - &WriteCache, - &WritePartsComplete, - &FilteredWrittenBytesPerSecond, - ChunkTargetPtrs = std::move(ChunkTargetPtrs), - CompressedPart = IoBuffer(std::move(Payload))](std::atomic<bool>&) mutable { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Async_WriteChunk"); - - FilteredWrittenBytesPerSecond.Start(); - - const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; - if (CompressedChunkPath.empty()) - { - ZEN_ASSERT(CompressedPart); - } - else - { - ZEN_ASSERT(!CompressedPart); - CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); - if (!CompressedPart) - { - throw std::runtime_error( - fmt::format("Could not open dowloaded compressed chunk {} from {}", ChunkHash, CompressedChunkPath)); - } - } - - bool NeedHashVerify = WriteCompressedChunkToCache(ChunkHash, ChunkTargetPtrs, WriteCache, std::move(CompressedPart)); - if (!m_AbortFlag) - { - WritePartsComplete++; - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); - } - - if (!CompressedChunkPath.empty()) - { - std::error_code Ec = TryRemoveFile(CompressedChunkPath); - if (Ec) - { - ZEN_OPERATION_LOG_DEBUG(m_LogOutput, - "Failed removing file '{}', reason: ({}) {}", - CompressedChunkPath, - Ec.value(), - Ec.message()); - } - } - - std::vector<uint32_t> CompletedSequences = - CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); - WriteCache.Close(CompletedSequences); - if (NeedHashVerify) - { - VerifyAndCompleteChunkSequencesAsync(CompletedSequences, Work); - } - else - { - FinalizeChunkSequences(CompletedSequences); - } - } - } - }, - EnableBacklog ? WorkerThreadPool::EMode::EnableBacklog : WorkerThreadPool::EMode::DisableBacklog); -} - -void -BuildsOperationUpdateFolder::VerifyAndCompleteChunkSequencesAsync(std::span<const uint32_t> RemoteSequenceIndexes, ParallelWork& Work) -{ - if (RemoteSequenceIndexes.empty()) - { - return; - } - ZEN_TRACE_CPU("VerifyAndCompleteChunkSequence"); - if (m_Options.ValidateCompletedSequences) - { - for (uint32_t RemoteSequenceIndexOffset = 1; RemoteSequenceIndexOffset < RemoteSequenceIndexes.size(); RemoteSequenceIndexOffset++) - { - const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset]; - Work.ScheduleWork(m_IOWorkerPool, [this, RemoteSequenceIndex](std::atomic<bool>&) { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("Async_VerifyAndFinalizeSequence"); - - VerifySequence(RemoteSequenceIndex); - if (!m_AbortFlag) - { - const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; - FinalizeChunkSequence(SequenceRawHash); - } - } - }); - } - const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[0]; - - VerifySequence(RemoteSequenceIndex); - const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; - FinalizeChunkSequence(SequenceRawHash); - } - else - { - for (uint32_t RemoteSequenceIndexOffset = 0; RemoteSequenceIndexOffset < RemoteSequenceIndexes.size(); RemoteSequenceIndexOffset++) - { - const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset]; - const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; - FinalizeChunkSequence(SequenceRawHash); - } - } -} - -bool -BuildsOperationUpdateFolder::CompleteSequenceChunk(uint32_t RemoteSequenceIndex, - std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters) -{ - uint32_t PreviousValue = SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1); - ZEN_ASSERT(PreviousValue >= 1); - ZEN_ASSERT(PreviousValue != (uint32_t)-1); - return PreviousValue == 1; -} - -std::vector<uint32_t> -BuildsOperationUpdateFolder::CompleteChunkTargets(const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs, - std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters) -{ - ZEN_TRACE_CPU("CompleteChunkTargets"); - - std::vector<uint32_t> CompletedSequenceIndexes; - for (const ChunkedContentLookup::ChunkSequenceLocation* Location : ChunkTargetPtrs) - { - const uint32_t RemoteSequenceIndex = Location->SequenceIndex; - if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters)) - { - CompletedSequenceIndexes.push_back(RemoteSequenceIndex); - } - } - return CompletedSequenceIndexes; -} - -void -BuildsOperationUpdateFolder::FinalizeChunkSequence(const IoHash& SequenceRawHash) -{ - ZEN_TRACE_CPU("FinalizeChunkSequence"); - - ZEN_ASSERT_SLOW(!IsFile(GetFinalChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash))); - std::error_code Ec; - RenameFile(GetTempChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash), - GetFinalChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash), - Ec); - if (Ec) - { - throw std::system_error(Ec); - } -} - -void -BuildsOperationUpdateFolder::FinalizeChunkSequences(std::span<const uint32_t> RemoteSequenceIndexes) -{ - ZEN_TRACE_CPU("FinalizeChunkSequences"); - - for (uint32_t SequenceIndex : RemoteSequenceIndexes) - { - FinalizeChunkSequence(m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); - } -} - -void -BuildsOperationUpdateFolder::VerifySequence(uint32_t RemoteSequenceIndex) -{ - ZEN_TRACE_CPU("VerifySequence"); - - ZEN_ASSERT(m_Options.ValidateCompletedSequences); - - const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; - { - ZEN_TRACE_CPU("HashSequence"); - const std::uint32_t RemotePathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]; - const uint64_t ExpectedSize = m_RemoteContent.RawSizes[RemotePathIndex]; - IoBuffer VerifyBuffer = IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash)); - const uint64_t VerifySize = VerifyBuffer.GetSize(); - if (VerifySize != ExpectedSize) - { - throw std::runtime_error(fmt::format("Written chunk sequence {} size {} does not match expected size {}", - SequenceRawHash, - VerifySize, - ExpectedSize)); - } - - const IoHash VerifyChunkHash = IoHash::HashBuffer(std::move(VerifyBuffer), &m_ValidatedChunkByteCount); - if (VerifyChunkHash != SequenceRawHash) - { - throw std::runtime_error( - fmt::format("Written chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, SequenceRawHash)); - } - } -} - -////////////////////// BuildsOperationUploadFolder - -BuildsOperationUploadFolder::BuildsOperationUploadFolder(OperationLogOutput& OperationLogOutput, - StorageInstance& Storage, - std::atomic<bool>& AbortFlag, - std::atomic<bool>& PauseFlag, - WorkerThreadPool& IOWorkerPool, - WorkerThreadPool& NetworkPool, - const Oid& BuildId, - const std::filesystem::path& Path, - bool CreateBuild, - const CbObject& MetaData, - const Options& Options) -: m_LogOutput(OperationLogOutput) -, m_Storage(Storage) -, m_AbortFlag(AbortFlag) -, m_PauseFlag(PauseFlag) -, m_IOWorkerPool(IOWorkerPool) -, m_NetworkPool(NetworkPool) -, m_BuildId(BuildId) -, m_Path(Path) -, m_CreateBuild(CreateBuild) -, m_MetaData(MetaData) -, m_Options(Options) -{ - m_NonCompressableExtensionHashes.reserve(Options.NonCompressableExtensions.size()); - for (const std::string& Extension : Options.NonCompressableExtensions) - { - m_NonCompressableExtensionHashes.insert(HashStringAsLowerDjb2(Extension)); - } -} - -BuildsOperationUploadFolder::PrepareBuildResult -BuildsOperationUploadFolder::PrepareBuild() -{ - ZEN_TRACE_CPU("PrepareBuild"); - - PrepareBuildResult Result; - Result.PreferredMultipartChunkSize = m_Options.PreferredMultipartChunkSize; - Stopwatch Timer; - if (m_CreateBuild) - { - ZEN_TRACE_CPU("CreateBuild"); - - Stopwatch PutBuildTimer; - CbObject PutBuildResult = m_Storage.BuildStorage->PutBuild(m_BuildId, m_MetaData); - Result.PrepareBuildTimeMs = PutBuildTimer.GetElapsedTimeMs(); - if (auto ChunkSize = PutBuildResult["chunkSize"sv].AsUInt64(); ChunkSize != 0) - { - Result.PreferredMultipartChunkSize = ChunkSize; - } - Result.PayloadSize = m_MetaData.GetSize(); - } - else - { - ZEN_TRACE_CPU("PutBuild"); - Stopwatch GetBuildTimer; - CbObject Build = m_Storage.BuildStorage->GetBuild(m_BuildId); - Result.PrepareBuildTimeMs = GetBuildTimer.GetElapsedTimeMs(); - Result.PayloadSize = Build.GetSize(); - if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0) - { - Result.PreferredMultipartChunkSize = ChunkSize; - } - else if (m_Options.AllowMultiparts) - { - ZEN_OPERATION_LOG_WARN(m_LogOutput, - "PreferredMultipartChunkSize is unknown. Defaulting to '{}'", - NiceBytes(Result.PreferredMultipartChunkSize)); - } - } - - if (!m_Options.IgnoreExistingBlocks) - { - ZEN_TRACE_CPU("FindBlocks"); - Stopwatch KnownBlocksTimer; - CbObject BlockDescriptionList = m_Storage.BuildStorage->FindBlocks(m_BuildId, m_Options.FindBlockMaxCount); - if (BlockDescriptionList) - { - Result.KnownBlocks = ParseChunkBlockDescriptionList(BlockDescriptionList); - } - Result.FindBlocksTimeMs = KnownBlocksTimer.GetElapsedTimeMs(); - } - Result.ElapsedTimeMs = Timer.GetElapsedTimeMs(); - return Result; -} - -std::vector<BuildsOperationUploadFolder::UploadPart> -BuildsOperationUploadFolder::ReadFolder() -{ - std::vector<UploadPart> UploadParts; - std::filesystem::path ExcludeManifestPath = m_Path / m_Options.ZenExcludeManifestName; - tsl::robin_set<std::string> ExcludeAssetPaths; - if (IsFile(ExcludeManifestPath)) - { - std::filesystem::path AbsoluteExcludeManifestPath = - MakeSafeAbsolutePath(ExcludeManifestPath.is_absolute() ? ExcludeManifestPath : m_Path / ExcludeManifestPath); - BuildManifest Manifest = ParseBuildManifest(AbsoluteExcludeManifestPath); - const std::vector<std::filesystem::path>& AssetPaths = Manifest.Parts.front().Files; - ExcludeAssetPaths.reserve(AssetPaths.size()); - for (const std::filesystem::path& AssetPath : AssetPaths) - { - ExcludeAssetPaths.insert(AssetPath.generic_string()); - } - } - - UploadParts.resize(1); - - UploadPart& Part = UploadParts.front(); - GetFolderContentStatistics& LocalFolderScanStats = Part.LocalFolderScanStats; - - Part.Content = GetFolderContent( - Part.LocalFolderScanStats, - m_Path, - [this](const std::string_view& RelativePath) { return IsAcceptedFolder(RelativePath); }, - [this, &ExcludeAssetPaths](const std::string_view& RelativePath, uint64_t Size, uint32_t Attributes) -> bool { - ZEN_UNUSED(Size, Attributes); - if (!IsAcceptedFile(RelativePath)) - { - return false; - } - if (ExcludeAssetPaths.contains(std::filesystem::path(RelativePath).generic_string())) - { - return false; - } - return true; - }, - m_IOWorkerPool, - m_LogOutput.GetProgressUpdateDelayMS(), - [&](bool, std::ptrdiff_t) { - ZEN_OPERATION_LOG_INFO(m_LogOutput, "Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), m_Path); - }, - m_AbortFlag); - Part.TotalRawSize = std::accumulate(Part.Content.RawSizes.begin(), Part.Content.RawSizes.end(), std::uint64_t(0)); - - return UploadParts; -} - -std::vector<BuildsOperationUploadFolder::UploadPart> -BuildsOperationUploadFolder::ReadManifestParts(const std::filesystem::path& ManifestPath) -{ - std::vector<UploadPart> UploadParts; - Stopwatch ManifestParseTimer; - std::filesystem::path AbsoluteManifestPath = MakeSafeAbsolutePath(ManifestPath.is_absolute() ? ManifestPath : m_Path / ManifestPath); - BuildManifest Manifest = ParseBuildManifest(AbsoluteManifestPath); - if (Manifest.Parts.empty()) - { - throw std::runtime_error(fmt::format("Manifest file at '{}' is invalid", ManifestPath)); - } - - UploadParts.resize(Manifest.Parts.size()); - for (size_t PartIndex = 0; PartIndex < Manifest.Parts.size(); PartIndex++) - { - BuildManifest::Part& PartManifest = Manifest.Parts[PartIndex]; - if (ManifestPath.is_relative()) - { - PartManifest.Files.push_back(ManifestPath); - } - - UploadPart& Part = UploadParts[PartIndex]; - FolderContent& Content = Part.Content; - - GetFolderContentStatistics& LocalFolderScanStats = Part.LocalFolderScanStats; - - const std::vector<std::filesystem::path>& AssetPaths = PartManifest.Files; - Content = GetValidFolderContent( - m_IOWorkerPool, - LocalFolderScanStats, - m_Path, - AssetPaths, - [](uint64_t PathCount, uint64_t CompletedPathCount) { ZEN_UNUSED(PathCount, CompletedPathCount); }, - 1000, - m_AbortFlag, - m_PauseFlag); - - if (Content.Paths.size() != AssetPaths.size()) - { - const tsl::robin_set<std::filesystem::path> FoundPaths(Content.Paths.begin(), Content.Paths.end()); - ExtendableStringBuilder<1024> SB; - for (const std::filesystem::path& AssetPath : AssetPaths) - { - if (!FoundPaths.contains(AssetPath)) - { - SB << "\n " << AssetPath.generic_string(); - } - } - throw std::runtime_error( - fmt::format("Manifest file at '{}' references files that does not exist{}", ManifestPath, SB.ToView())); - } - - Part.PartId = PartManifest.PartId; - Part.PartName = PartManifest.PartName; - Part.TotalRawSize = std::accumulate(Part.Content.RawSizes.begin(), Part.Content.RawSizes.end(), std::uint64_t(0)); - } - - return UploadParts; -} - -std::vector<std::pair<Oid, std::string>> -BuildsOperationUploadFolder::Execute(const Oid& BuildPartId, - const std::string_view BuildPartName, - const std::filesystem::path& ManifestPath, - ChunkingController& ChunkController, - ChunkingCache& ChunkCache) -{ - ZEN_TRACE_CPU("BuildsOperationUploadFolder::Execute"); - try - { - Stopwatch ReadPartsTimer; - std::vector<UploadPart> UploadParts = ManifestPath.empty() ? ReadFolder() : ReadManifestParts(ManifestPath); - - for (UploadPart& Part : UploadParts) - { - if (Part.PartId == Oid::Zero) - { - if (UploadParts.size() != 1) - { - throw std::runtime_error(fmt::format("Multi part upload manifest '{}' must contains build part id", ManifestPath)); - } - - if (BuildPartId == Oid::Zero) - { - Part.PartId = Oid::NewOid(); - } - else - { - Part.PartId = BuildPartId; - } - } - if (Part.PartName.empty()) - { - if (UploadParts.size() != 1) - { - throw std::runtime_error(fmt::format("Multi part upload manifest '{}' must contains build part name", ManifestPath)); - } - if (BuildPartName.empty()) - { - throw std::runtime_error("Build part name must be set"); - } - Part.PartName = std::string(BuildPartName); - } - } - - if (!m_Options.IsQuiet) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Reading {} parts took {}", - UploadParts.size(), - NiceTimeSpanMs(ReadPartsTimer.GetElapsedTimeMs())); - } - - const uint32_t PartsUploadStepCount = gsl::narrow<uint32_t>(uint32_t(PartTaskSteps::StepCount) * UploadParts.size()); - - const uint32_t PrepareBuildStep = 0; - const uint32_t UploadPartsStep = 1; - const uint32_t FinalizeBuildStep = UploadPartsStep + PartsUploadStepCount; - const uint32_t CleanupStep = FinalizeBuildStep + 1; - const uint32_t StepCount = CleanupStep + 1; - - auto EndProgress = MakeGuard([&]() { m_LogOutput.SetLogOperationProgress(StepCount, StepCount); }); - - Stopwatch ProcessTimer; - - CleanAndRemoveDirectory(m_IOWorkerPool, m_AbortFlag, m_PauseFlag, m_Options.TempDir); - CreateDirectories(m_Options.TempDir); - auto _ = MakeGuard([&]() { CleanAndRemoveDirectory(m_IOWorkerPool, m_AbortFlag, m_PauseFlag, m_Options.TempDir); }); - - m_LogOutput.SetLogOperationProgress(PrepareBuildStep, StepCount); - - m_PrepBuildResultFuture = m_NetworkPool.EnqueueTask(std::packaged_task<PrepareBuildResult()>{[this] { return PrepareBuild(); }}, - WorkerThreadPool::EMode::EnableBacklog); - - for (uint32_t PartIndex = 0; PartIndex < UploadParts.size(); PartIndex++) - { - const uint32_t PartStepOffset = UploadPartsStep + (PartIndex * uint32_t(PartTaskSteps::StepCount)); - - const UploadPart& Part = UploadParts[PartIndex]; - UploadBuildPart(ChunkController, ChunkCache, PartIndex, Part, PartStepOffset, StepCount); - if (m_AbortFlag) - { - return {}; - } - } - - m_LogOutput.SetLogOperationProgress(FinalizeBuildStep, StepCount); - - if (m_CreateBuild && !m_AbortFlag) - { - Stopwatch FinalizeBuildTimer; - m_Storage.BuildStorage->FinalizeBuild(m_BuildId); - if (!m_Options.IsQuiet) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, "FinalizeBuild took {}", NiceTimeSpanMs(FinalizeBuildTimer.GetElapsedTimeMs())); - } - } - - m_LogOutput.SetLogOperationProgress(CleanupStep, StepCount); - - std::vector<std::pair<Oid, std::string>> Result; - Result.reserve(UploadParts.size()); - for (UploadPart& Part : UploadParts) - { - Result.push_back(std::make_pair(Part.PartId, Part.PartName)); - } - return Result; - } - catch (const std::exception&) - { - m_AbortFlag = true; - throw; - } -} - -bool -BuildsOperationUploadFolder::IsAcceptedFolder(const std::string_view& RelativePath) const -{ - for (const std::string& ExcludeFolder : m_Options.ExcludeFolders) - { - if (RelativePath.starts_with(ExcludeFolder)) - { - if (RelativePath.length() == ExcludeFolder.length()) - { - return false; - } - else if (RelativePath[ExcludeFolder.length()] == '/') - { - return false; - } - } - } - return true; -} - -bool -BuildsOperationUploadFolder::IsAcceptedFile(const std::string_view& RelativePath) const -{ - if (RelativePath == m_Options.ZenExcludeManifestName) - { - return false; - } - for (const std::string& ExcludeExtension : m_Options.ExcludeExtensions) - { - if (RelativePath.ends_with(ExcludeExtension)) - { - return false; - } - } - return true; -} - -void -BuildsOperationUploadFolder::ArrangeChunksIntoBlocks(const ChunkedFolderContent& Content, - const ChunkedContentLookup& Lookup, - std::vector<uint32_t>& ChunkIndexes, - std::vector<std::vector<uint32_t>>& OutBlocks) -{ - ZEN_TRACE_CPU("ArrangeChunksIntoBlocks"); - std::sort(ChunkIndexes.begin(), ChunkIndexes.end(), [&Content, &Lookup](uint32_t Lhs, uint32_t Rhs) { - const ChunkedContentLookup::ChunkSequenceLocation& LhsLocation = GetChunkSequenceLocations(Lookup, Lhs)[0]; - const ChunkedContentLookup::ChunkSequenceLocation& RhsLocation = GetChunkSequenceLocations(Lookup, Rhs)[0]; - if (LhsLocation.SequenceIndex < RhsLocation.SequenceIndex) - { - return true; - } - else if (LhsLocation.SequenceIndex > RhsLocation.SequenceIndex) - { - return false; - } - return LhsLocation.Offset < RhsLocation.Offset; - }); - - uint64_t MaxBlockSizeLowThreshold = m_Options.BlockParameters.MaxBlockSize - (m_Options.BlockParameters.MaxBlockSize / 16); - - uint64_t BlockSize = 0; - - uint32_t ChunkIndexStart = 0; - for (uint32_t ChunkIndexOffset = 0; ChunkIndexOffset < ChunkIndexes.size();) - { - const uint32_t ChunkIndex = ChunkIndexes[ChunkIndexOffset]; - const uint64_t ChunkSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; - - if (((BlockSize + ChunkSize) > m_Options.BlockParameters.MaxBlockSize) || - (ChunkIndexOffset - ChunkIndexStart) > m_Options.BlockParameters.MaxChunksPerBlock) - { - // Within the span of MaxBlockSizeLowThreshold and MaxBlockSize, see if there is a break - // between source paths for chunks. Break the block at the last such break if any. - ZEN_ASSERT(ChunkIndexOffset > ChunkIndexStart); - - const uint32_t ChunkSequenceIndex = Lookup.ChunkSequenceLocations[Lookup.ChunkSequenceLocationOffset[ChunkIndex]].SequenceIndex; - - uint64_t ScanBlockSize = BlockSize; - - uint32_t ScanChunkIndexOffset = ChunkIndexOffset - 1; - while (ScanChunkIndexOffset > (ChunkIndexStart + 2)) - { - const uint32_t TestChunkIndex = ChunkIndexes[ScanChunkIndexOffset]; - const uint64_t TestChunkSize = Content.ChunkedContent.ChunkRawSizes[TestChunkIndex]; - if ((ScanBlockSize - TestChunkSize) < MaxBlockSizeLowThreshold) - { - break; - } - - const uint32_t TestSequenceIndex = - Lookup.ChunkSequenceLocations[Lookup.ChunkSequenceLocationOffset[TestChunkIndex]].SequenceIndex; - if (ChunkSequenceIndex != TestSequenceIndex) - { - ChunkIndexOffset = ScanChunkIndexOffset + 1; - break; - } - - ScanBlockSize -= TestChunkSize; - ScanChunkIndexOffset--; - } - - std::vector<uint32_t> ChunksInBlock; - ChunksInBlock.reserve(ChunkIndexOffset - ChunkIndexStart); - for (uint32_t AddIndexOffset = ChunkIndexStart; AddIndexOffset < ChunkIndexOffset; AddIndexOffset++) - { - const uint32_t AddChunkIndex = ChunkIndexes[AddIndexOffset]; - ChunksInBlock.push_back(AddChunkIndex); - } - OutBlocks.emplace_back(std::move(ChunksInBlock)); - BlockSize = 0; - ChunkIndexStart = ChunkIndexOffset; - } - else - { - ChunkIndexOffset++; - BlockSize += ChunkSize; - } - } - if (ChunkIndexStart < ChunkIndexes.size()) - { - std::vector<uint32_t> ChunksInBlock; - ChunksInBlock.reserve(ChunkIndexes.size() - ChunkIndexStart); - for (uint32_t AddIndexOffset = ChunkIndexStart; AddIndexOffset < ChunkIndexes.size(); AddIndexOffset++) - { - const uint32_t AddChunkIndex = ChunkIndexes[AddIndexOffset]; - ChunksInBlock.push_back(AddChunkIndex); - } - OutBlocks.emplace_back(std::move(ChunksInBlock)); - } -} - -void -BuildsOperationUploadFolder::GenerateBuildBlocks(const ChunkedFolderContent& Content, - const ChunkedContentLookup& Lookup, - const std::vector<std::vector<uint32_t>>& NewBlockChunks, - GeneratedBlocks& OutBlocks, - GenerateBlocksStatistics& GenerateBlocksStats, - UploadStatistics& UploadStats) -{ - ZEN_TRACE_CPU("GenerateBuildBlocks"); - const std::size_t NewBlockCount = NewBlockChunks.size(); - if (NewBlockCount > 0) - { - std::unique_ptr<OperationLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Generate Blocks")); - OperationLogOutput::ProgressBar& Progress(*ProgressBarPtr); - - OutBlocks.BlockDescriptions.resize(NewBlockCount); - OutBlocks.BlockSizes.resize(NewBlockCount); - OutBlocks.BlockMetaDatas.resize(NewBlockCount); - OutBlocks.BlockHeaders.resize(NewBlockCount); - OutBlocks.MetaDataHasBeenUploaded.resize(NewBlockCount, 0); - OutBlocks.BlockHashToBlockIndex.reserve(NewBlockCount); - - RwLock Lock; - - WorkerThreadPool& GenerateBlobsPool = m_IOWorkerPool; - WorkerThreadPool& UploadBlocksPool = m_NetworkPool; - - FilteredRate FilteredGeneratedBytesPerSecond; - FilteredRate FilteredUploadedBytesPerSecond; - - ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - - std::atomic<uint64_t> QueuedPendingBlocksForUpload = 0; - - for (size_t BlockIndex = 0; BlockIndex < NewBlockCount; BlockIndex++) - { - if (Work.IsAborted()) - { - break; - } - const std::vector<uint32_t>& ChunksInBlock = NewBlockChunks[BlockIndex]; - Work.ScheduleWork( - GenerateBlobsPool, - [this, - &Content, - &Lookup, - &Work, - &UploadBlocksPool, - NewBlockCount, - ChunksInBlock, - &Lock, - &OutBlocks, - &GenerateBlocksStats, - &UploadStats, - &FilteredGeneratedBytesPerSecond, - &QueuedPendingBlocksForUpload, - &FilteredUploadedBytesPerSecond, - BlockIndex](std::atomic<bool>&) { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("GenerateBuildBlocks_Generate"); - - FilteredGeneratedBytesPerSecond.Start(); - - Stopwatch GenerateTimer; - CompressedBuffer CompressedBlock = - GenerateBlock(Content, Lookup, ChunksInBlock, OutBlocks.BlockDescriptions[BlockIndex]); - if (m_Options.IsVerbose) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Generated block {} ({}) containing {} chunks in {}", - OutBlocks.BlockDescriptions[BlockIndex].BlockHash, - NiceBytes(CompressedBlock.GetCompressedSize()), - OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size(), - NiceTimeSpanMs(GenerateTimer.GetElapsedTimeMs())); - } - - OutBlocks.BlockSizes[BlockIndex] = CompressedBlock.GetCompressedSize(); - { - CbObjectWriter Writer; - Writer.AddString("createdBy", "zen"); - OutBlocks.BlockMetaDatas[BlockIndex] = Writer.Save(); - } - GenerateBlocksStats.GeneratedBlockByteCount += OutBlocks.BlockSizes[BlockIndex]; - GenerateBlocksStats.GeneratedBlockCount++; - - Lock.WithExclusiveLock([&]() { - OutBlocks.BlockHashToBlockIndex.insert_or_assign(OutBlocks.BlockDescriptions[BlockIndex].BlockHash, BlockIndex); - }); - - { - std::span<const SharedBuffer> Segments = CompressedBlock.GetCompressed().GetSegments(); - ZEN_ASSERT(Segments.size() >= 2); - OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]); - } - - if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount) - { - FilteredGeneratedBytesPerSecond.Stop(); - } - - if (QueuedPendingBlocksForUpload.load() > 16) - { - std::span<const SharedBuffer> Segments = CompressedBlock.GetCompressed().GetSegments(); - ZEN_ASSERT(Segments.size() >= 2); - OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]); - } - else - { - if (!m_AbortFlag) - { - QueuedPendingBlocksForUpload++; - - Work.ScheduleWork( - UploadBlocksPool, - [this, - NewBlockCount, - &GenerateBlocksStats, - &UploadStats, - &FilteredUploadedBytesPerSecond, - &QueuedPendingBlocksForUpload, - &OutBlocks, - BlockIndex, - Payload = std::move(CompressedBlock)](std::atomic<bool>&) mutable { - auto _ = MakeGuard([&QueuedPendingBlocksForUpload] { QueuedPendingBlocksForUpload--; }); - if (!m_AbortFlag) - { - if (GenerateBlocksStats.GeneratedBlockCount == NewBlockCount) - { - ZEN_TRACE_CPU("GenerateBuildBlocks_Save"); - - FilteredUploadedBytesPerSecond.Stop(); - std::span<const SharedBuffer> Segments = Payload.GetCompressed().GetSegments(); - ZEN_ASSERT(Segments.size() >= 2); - OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]); - } - else - { - ZEN_TRACE_CPU("GenerateBuildBlocks_Upload"); - - FilteredUploadedBytesPerSecond.Start(); - - const CbObject BlockMetaData = - BuildChunkBlockDescription(OutBlocks.BlockDescriptions[BlockIndex], - OutBlocks.BlockMetaDatas[BlockIndex]); - - const IoHash& BlockHash = OutBlocks.BlockDescriptions[BlockIndex].BlockHash; - const uint64_t CompressedBlockSize = Payload.GetCompressedSize(); - - if (m_Storage.CacheStorage && m_Options.PopulateCache) - { - m_Storage.CacheStorage->PutBuildBlob(m_BuildId, - BlockHash, - ZenContentType::kCompressedBinary, - Payload.GetCompressed()); - } - - m_Storage.BuildStorage->PutBuildBlob(m_BuildId, - BlockHash, - ZenContentType::kCompressedBinary, - std::move(Payload).GetCompressed()); - UploadStats.BlocksBytes += CompressedBlockSize; - - if (m_Options.IsVerbose) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Uploaded block {} ({}) containing {} chunks", - BlockHash, - NiceBytes(CompressedBlockSize), - OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); - } - - if (m_Storage.CacheStorage && m_Options.PopulateCache) - { - m_Storage.CacheStorage->PutBlobMetadatas(m_BuildId, - std::vector<IoHash>({BlockHash}), - std::vector<CbObject>({BlockMetaData})); - } - - bool MetadataSucceeded = - m_Storage.BuildStorage->PutBlockMetadata(m_BuildId, BlockHash, BlockMetaData); - if (MetadataSucceeded) - { - if (m_Options.IsVerbose) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Uploaded block {} metadata ({})", - BlockHash, - NiceBytes(BlockMetaData.GetSize())); - } - - OutBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; - UploadStats.BlocksBytes += BlockMetaData.GetSize(); - } - - UploadStats.BlockCount++; - if (UploadStats.BlockCount == NewBlockCount) - { - FilteredUploadedBytesPerSecond.Stop(); - } - } - } - }); - } - } - } - }); - } - - Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(PendingWork); - - FilteredGeneratedBytesPerSecond.Update(GenerateBlocksStats.GeneratedBlockByteCount.load()); - FilteredUploadedBytesPerSecond.Update(UploadStats.BlocksBytes.load()); - - std::string Details = fmt::format("Generated {}/{} ({}, {}B/s). Uploaded {}/{} ({}, {}bits/s)", - GenerateBlocksStats.GeneratedBlockCount.load(), - NewBlockCount, - NiceBytes(GenerateBlocksStats.GeneratedBlockByteCount.load()), - NiceNum(FilteredGeneratedBytesPerSecond.GetCurrent()), - UploadStats.BlockCount.load(), - NewBlockCount, - NiceBytes(UploadStats.BlocksBytes.load()), - NiceNum(FilteredUploadedBytesPerSecond.GetCurrent() * 8)); - - Progress.UpdateState({.Task = "Generating blocks", - .Details = Details, - .TotalCount = gsl::narrow<uint64_t>(NewBlockCount), - .RemainingCount = gsl::narrow<uint64_t>(NewBlockCount - GenerateBlocksStats.GeneratedBlockCount.load()), - .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, - false); - }); - - ZEN_ASSERT(m_AbortFlag || QueuedPendingBlocksForUpload.load() == 0); - - Progress.Finish(); - - GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGeneratedBytesPerSecond.GetElapsedTimeUS(); - UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS(); - } -} - -std::vector<uint32_t> -BuildsOperationUploadFolder::CalculateAbsoluteChunkOrders( - const std::span<const IoHash> LocalChunkHashes, - const std::span<const uint32_t> LocalChunkOrder, - const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& ChunkHashToLocalChunkIndex, - const std::span<const uint32_t>& LooseChunkIndexes, - const std::span<const ChunkBlockDescription>& BlockDescriptions) -{ - ZEN_TRACE_CPU("CalculateAbsoluteChunkOrders"); - - std::vector<IoHash> TmpAbsoluteChunkHashes; - if (m_Options.DoExtraContentValidation) - { - TmpAbsoluteChunkHashes.reserve(LocalChunkHashes.size()); - } - std::vector<uint32_t> LocalChunkIndexToAbsoluteChunkIndex; - LocalChunkIndexToAbsoluteChunkIndex.resize(LocalChunkHashes.size(), (uint32_t)-1); - std::uint32_t AbsoluteChunkCount = 0; - for (uint32_t ChunkIndex : LooseChunkIndexes) - { - LocalChunkIndexToAbsoluteChunkIndex[ChunkIndex] = AbsoluteChunkCount; - if (m_Options.DoExtraContentValidation) - { - TmpAbsoluteChunkHashes.push_back(LocalChunkHashes[ChunkIndex]); - } - AbsoluteChunkCount++; - } - for (const ChunkBlockDescription& Block : BlockDescriptions) - { - for (const IoHash& ChunkHash : Block.ChunkRawHashes) - { - if (auto It = ChunkHashToLocalChunkIndex.find(ChunkHash); It != ChunkHashToLocalChunkIndex.end()) - { - const uint32_t LocalChunkIndex = It->second; - ZEN_ASSERT_SLOW(LocalChunkHashes[LocalChunkIndex] == ChunkHash); - LocalChunkIndexToAbsoluteChunkIndex[LocalChunkIndex] = AbsoluteChunkCount; - } - if (m_Options.DoExtraContentValidation) - { - TmpAbsoluteChunkHashes.push_back(ChunkHash); - } - AbsoluteChunkCount++; - } - } - std::vector<uint32_t> AbsoluteChunkOrder; - AbsoluteChunkOrder.reserve(LocalChunkHashes.size()); - for (const uint32_t LocalChunkIndex : LocalChunkOrder) - { - const uint32_t AbsoluteChunkIndex = LocalChunkIndexToAbsoluteChunkIndex[LocalChunkIndex]; - if (m_Options.DoExtraContentValidation) - { - ZEN_ASSERT(LocalChunkHashes[LocalChunkIndex] == TmpAbsoluteChunkHashes[AbsoluteChunkIndex]); - } - AbsoluteChunkOrder.push_back(AbsoluteChunkIndex); - } - if (m_Options.DoExtraContentValidation) - { - uint32_t OrderIndex = 0; - while (OrderIndex < LocalChunkOrder.size()) - { - const uint32_t LocalChunkIndex = LocalChunkOrder[OrderIndex]; - const IoHash& LocalChunkHash = LocalChunkHashes[LocalChunkIndex]; - const uint32_t AbsoluteChunkIndex = AbsoluteChunkOrder[OrderIndex]; - const IoHash& AbsoluteChunkHash = TmpAbsoluteChunkHashes[AbsoluteChunkIndex]; - ZEN_ASSERT(LocalChunkHash == AbsoluteChunkHash); - OrderIndex++; - } - } - return AbsoluteChunkOrder; -} - -CompositeBuffer -BuildsOperationUploadFolder::FetchChunk(const ChunkedFolderContent& Content, - const ChunkedContentLookup& Lookup, - const IoHash& ChunkHash, - ReadFileCache& OpenFileCache) -{ - ZEN_TRACE_CPU("FetchChunk"); - auto It = Lookup.ChunkHashToChunkIndex.find(ChunkHash); - ZEN_ASSERT(It != Lookup.ChunkHashToChunkIndex.end()); - uint32_t ChunkIndex = It->second; - std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkLocations = GetChunkSequenceLocations(Lookup, ChunkIndex); - ZEN_ASSERT(!ChunkLocations.empty()); - CompositeBuffer Chunk = - OpenFileCache.GetRange(ChunkLocations[0].SequenceIndex, ChunkLocations[0].Offset, Content.ChunkedContent.ChunkRawSizes[ChunkIndex]); - if (!Chunk) - { - throw std::runtime_error(fmt::format("Unable to read chunk at {}, size {} from '{}'", - ChunkLocations[0].Offset, - Content.ChunkedContent.ChunkRawSizes[ChunkIndex], - Content.Paths[Lookup.SequenceIndexFirstPathIndex[ChunkLocations[0].SequenceIndex]])); - } - ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == ChunkHash); - return Chunk; -}; - -CompressedBuffer -BuildsOperationUploadFolder::GenerateBlock(const ChunkedFolderContent& Content, - const ChunkedContentLookup& Lookup, - const std::vector<uint32_t>& ChunksInBlock, - ChunkBlockDescription& OutBlockDescription) -{ - ZEN_TRACE_CPU("GenerateBlock"); - ReadFileCache OpenFileCache(m_DiskStats.OpenReadCount, - m_DiskStats.CurrentOpenFileCount, - m_DiskStats.ReadCount, - m_DiskStats.ReadByteCount, - m_Path, - Content, - Lookup, - 4); - - std::vector<std::pair<IoHash, FetchChunkFunc>> BlockContent; - BlockContent.reserve(ChunksInBlock.size()); - for (uint32_t ChunkIndex : ChunksInBlock) - { - BlockContent.emplace_back(std::make_pair( - Content.ChunkedContent.ChunkHashes[ChunkIndex], - [this, &Content, &Lookup, &OpenFileCache, ChunkIndex](const IoHash& ChunkHash) -> std::pair<uint64_t, CompositeBuffer> { - CompositeBuffer Chunk = FetchChunk(Content, Lookup, ChunkHash, OpenFileCache); - ZEN_ASSERT(Chunk); - uint64_t RawSize = Chunk.GetSize(); - - const bool ShouldCompressChunk = RawSize >= m_Options.MinimumSizeForCompressInBlock && - IsChunkCompressable(m_NonCompressableExtensionHashes, Content, Lookup, ChunkIndex); - - const OodleCompressionLevel CompressionLevel = - ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None; - return {RawSize, CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid, CompressionLevel).GetCompressed()}; - })); - } - - return GenerateChunkBlock(std::move(BlockContent), OutBlockDescription); -}; - -CompressedBuffer -BuildsOperationUploadFolder::RebuildBlock(const ChunkedFolderContent& Content, - const ChunkedContentLookup& Lookup, - CompositeBuffer&& HeaderBuffer, - const std::vector<uint32_t>& ChunksInBlock) -{ - ZEN_TRACE_CPU("RebuildBlock"); - ReadFileCache OpenFileCache(m_DiskStats.OpenReadCount, - m_DiskStats.CurrentOpenFileCount, - m_DiskStats.ReadCount, - m_DiskStats.ReadByteCount, - m_Path, - Content, - Lookup, - 4); - - std::vector<SharedBuffer> ResultBuffers; - ResultBuffers.reserve(HeaderBuffer.GetSegments().size() + ChunksInBlock.size()); - ResultBuffers.insert(ResultBuffers.end(), HeaderBuffer.GetSegments().begin(), HeaderBuffer.GetSegments().end()); - for (uint32_t ChunkIndex : ChunksInBlock) - { - std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkLocations = GetChunkSequenceLocations(Lookup, ChunkIndex); - ZEN_ASSERT(!ChunkLocations.empty()); - CompositeBuffer Chunk = OpenFileCache.GetRange(ChunkLocations[0].SequenceIndex, - ChunkLocations[0].Offset, - Content.ChunkedContent.ChunkRawSizes[ChunkIndex]); - ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == Content.ChunkedContent.ChunkHashes[ChunkIndex]); - - const uint64_t RawSize = Chunk.GetSize(); - const bool ShouldCompressChunk = RawSize >= m_Options.MinimumSizeForCompressInBlock && - IsChunkCompressable(m_NonCompressableExtensionHashes, Content, Lookup, ChunkIndex); - - const OodleCompressionLevel CompressionLevel = ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None; - - CompositeBuffer CompressedChunk = - CompressedBuffer::Compress(std::move(Chunk), OodleCompressor::Mermaid, CompressionLevel).GetCompressed(); - ResultBuffers.insert(ResultBuffers.end(), CompressedChunk.GetSegments().begin(), CompressedChunk.GetSegments().end()); - } - return CompressedBuffer::FromCompressedNoValidate(CompositeBuffer(std::move(ResultBuffers))); -}; - -void -BuildsOperationUploadFolder::UploadBuildPart(ChunkingController& ChunkController, - ChunkingCache& ChunkCache, - uint32_t PartIndex, - const UploadPart& Part, - uint32_t PartStepOffset, - uint32_t StepCount) -{ - Stopwatch UploadTimer; - - ChunkingStatistics ChunkingStats; - FindBlocksStatistics FindBlocksStats; - ReuseBlocksStatistics ReuseBlocksStats; - UploadStatistics UploadStats; - GenerateBlocksStatistics GenerateBlocksStats; - - LooseChunksStatistics LooseChunksStats; - ChunkedFolderContent LocalContent; - - m_LogOutput.SetLogOperationProgress(PartStepOffset + (uint32_t)PartTaskSteps::ChunkPartContent, StepCount); - - Stopwatch ScanTimer; - { - std::unique_ptr<OperationLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Scan Folder")); - OperationLogOutput::ProgressBar& Progress(*ProgressBarPtr); - - FilteredRate FilteredBytesHashed; - FilteredBytesHashed.Start(); - LocalContent = ChunkFolderContent( - ChunkingStats, - m_IOWorkerPool, - m_Path, - Part.Content, - ChunkController, - ChunkCache, - m_LogOutput.GetProgressUpdateDelayMS(), - [&](bool IsAborted, bool IsPaused, std::ptrdiff_t) { - FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load()); - std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found", - ChunkingStats.FilesProcessed.load(), - Part.Content.Paths.size(), - NiceBytes(ChunkingStats.BytesHashed.load()), - NiceBytes(Part.TotalRawSize), - NiceNum(FilteredBytesHashed.GetCurrent()), - ChunkingStats.UniqueChunksFound.load(), - NiceBytes(ChunkingStats.UniqueBytesFound.load())); - Progress.UpdateState({.Task = "Scanning files ", - .Details = Details, - .TotalCount = Part.TotalRawSize, - .RemainingCount = Part.TotalRawSize - ChunkingStats.BytesHashed.load(), - .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, - false); - }, - m_AbortFlag, - m_PauseFlag); - FilteredBytesHashed.Stop(); - Progress.Finish(); - if (m_AbortFlag) - { - return; - } - } - - if (!m_Options.IsQuiet) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Found {} ({}) files divided into {} ({}) unique chunks in '{}' in {}. Average hash rate {}B/sec", - Part.Content.Paths.size(), - NiceBytes(Part.TotalRawSize), - ChunkingStats.UniqueChunksFound.load(), - NiceBytes(ChunkingStats.UniqueBytesFound.load()), - m_Path, - NiceTimeSpanMs(ScanTimer.GetElapsedTimeMs()), - NiceNum(GetBytesPerSecond(ChunkingStats.ElapsedWallTimeUS, ChunkingStats.BytesHashed))); - } - - const ChunkedContentLookup LocalLookup = BuildChunkedContentLookup(LocalContent); - - std::vector<size_t> ReuseBlockIndexes; - std::vector<uint32_t> NewBlockChunkIndexes; - - if (PartIndex == 0) - { - const PrepareBuildResult PrepBuildResult = m_PrepBuildResultFuture.get(); - - m_FindBlocksStats.FindBlockTimeMS = PrepBuildResult.ElapsedTimeMs; - m_FindBlocksStats.FoundBlockCount = PrepBuildResult.KnownBlocks.size(); - - if (!m_Options.IsQuiet) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Build prepare took {}. {} took {}, payload size {}{}", - NiceTimeSpanMs(PrepBuildResult.ElapsedTimeMs), - m_CreateBuild ? "PutBuild" : "GetBuild", - NiceTimeSpanMs(PrepBuildResult.PrepareBuildTimeMs), - NiceBytes(PrepBuildResult.PayloadSize), - m_Options.IgnoreExistingBlocks ? "" - : fmt::format(". Found {} blocks in {}", - PrepBuildResult.KnownBlocks.size(), - NiceTimeSpanMs(PrepBuildResult.FindBlocksTimeMs))); - } - - m_PreferredMultipartChunkSize = PrepBuildResult.PreferredMultipartChunkSize; - - m_LargeAttachmentSize = m_Options.AllowMultiparts ? m_PreferredMultipartChunkSize * 4u : (std::uint64_t)-1; - - m_KnownBlocks = std::move(PrepBuildResult.KnownBlocks); - } - - ZEN_ASSERT(m_PreferredMultipartChunkSize != 0); - ZEN_ASSERT(m_LargeAttachmentSize != 0); - - m_LogOutput.SetLogOperationProgress(PartStepOffset + (uint32_t)PartTaskSteps::CalculateDelta, StepCount); - - Stopwatch BlockArrangeTimer; - - std::vector<std::uint32_t> LooseChunkIndexes; - { - bool EnableBlocks = true; - std::vector<std::uint32_t> BlockChunkIndexes; - for (uint32_t ChunkIndex = 0; ChunkIndex < LocalContent.ChunkedContent.ChunkHashes.size(); ChunkIndex++) - { - const uint64_t ChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; - if (!EnableBlocks || ChunkRawSize == 0 || ChunkRawSize > m_Options.BlockParameters.MaxChunkEmbedSize) - { - LooseChunkIndexes.push_back(ChunkIndex); - LooseChunksStats.ChunkByteCount += ChunkRawSize; - } - else - { - BlockChunkIndexes.push_back(ChunkIndex); - FindBlocksStats.PotentialChunkByteCount += ChunkRawSize; - } - } - FindBlocksStats.PotentialChunkCount += BlockChunkIndexes.size(); - LooseChunksStats.ChunkCount = LooseChunkIndexes.size(); - - if (m_Options.IgnoreExistingBlocks) - { - if (!m_Options.IsQuiet) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, "Ignoring any existing blocks in store"); - } - NewBlockChunkIndexes = std::move(BlockChunkIndexes); - } - else - { - ReuseBlockIndexes = FindReuseBlocks(m_LogOutput, - m_Options.BlockReuseMinPercentLimit, - m_Options.IsVerbose, - ReuseBlocksStats, - m_KnownBlocks, - LocalContent.ChunkedContent.ChunkHashes, - BlockChunkIndexes, - NewBlockChunkIndexes); - FindBlocksStats.AcceptedBlockCount += ReuseBlockIndexes.size(); - - for (const ChunkBlockDescription& Description : m_KnownBlocks) - { - for (uint32_t ChunkRawLength : Description.ChunkRawLengths) - { - FindBlocksStats.FoundBlockByteCount += ChunkRawLength; - } - FindBlocksStats.FoundBlockChunkCount += Description.ChunkRawHashes.size(); - } - } - } - - std::vector<std::vector<uint32_t>> NewBlockChunks; - ArrangeChunksIntoBlocks(LocalContent, LocalLookup, NewBlockChunkIndexes, NewBlockChunks); - - FindBlocksStats.NewBlocksCount += NewBlockChunks.size(); - for (uint32_t ChunkIndex : NewBlockChunkIndexes) - { - FindBlocksStats.NewBlocksChunkByteCount += LocalContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; - } - FindBlocksStats.NewBlocksChunkCount += NewBlockChunkIndexes.size(); - - const double AcceptedByteCountPercent = FindBlocksStats.PotentialChunkByteCount > 0 - ? (100.0 * ReuseBlocksStats.AcceptedRawByteCount / FindBlocksStats.PotentialChunkByteCount) - : 0.0; - - const double AcceptedReduntantByteCountPercent = - ReuseBlocksStats.AcceptedByteCount > 0 ? (100.0 * ReuseBlocksStats.AcceptedReduntantByteCount) / - (ReuseBlocksStats.AcceptedByteCount + ReuseBlocksStats.AcceptedReduntantByteCount) - : 0.0; - if (!m_Options.IsQuiet) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Found {} chunks in {} ({}) blocks eligible for reuse in {}\n" - " Reusing {} ({}) matching chunks in {} blocks ({:.1f}%)\n" - " Accepting {} ({}) redundant chunks ({:.1f}%)\n" - " Rejected {} ({}) chunks in {} blocks\n" - " Arranged {} ({}) chunks in {} new blocks\n" - " Keeping {} ({}) chunks as loose chunks\n" - " Discovery completed in {}", - FindBlocksStats.FoundBlockChunkCount, - FindBlocksStats.FoundBlockCount, - NiceBytes(FindBlocksStats.FoundBlockByteCount), - NiceTimeSpanMs(FindBlocksStats.FindBlockTimeMS), - - ReuseBlocksStats.AcceptedChunkCount, - NiceBytes(ReuseBlocksStats.AcceptedRawByteCount), - FindBlocksStats.AcceptedBlockCount, - AcceptedByteCountPercent, - - ReuseBlocksStats.AcceptedReduntantChunkCount, - NiceBytes(ReuseBlocksStats.AcceptedReduntantByteCount), - AcceptedReduntantByteCountPercent, - - ReuseBlocksStats.RejectedChunkCount, - NiceBytes(ReuseBlocksStats.RejectedByteCount), - ReuseBlocksStats.RejectedBlockCount, - - FindBlocksStats.NewBlocksChunkCount, - NiceBytes(FindBlocksStats.NewBlocksChunkByteCount), - FindBlocksStats.NewBlocksCount, - - LooseChunksStats.ChunkCount, - NiceBytes(LooseChunksStats.ChunkByteCount), - - NiceTimeSpanMs(BlockArrangeTimer.GetElapsedTimeMs())); - } - - m_LogOutput.SetLogOperationProgress(PartStepOffset + (uint32_t)PartTaskSteps::GenerateBlocks, StepCount); - GeneratedBlocks NewBlocks; - - if (!NewBlockChunks.empty()) - { - Stopwatch GenerateBuildBlocksTimer; - auto __ = MakeGuard([&]() { - uint64_t BlockGenerateTimeUs = GenerateBuildBlocksTimer.GetElapsedTimeUs(); - if (!m_Options.IsQuiet) - { - ZEN_OPERATION_LOG_INFO( - m_LogOutput, - "Generated {} ({}) and uploaded {} ({}) blocks in {}. Generate speed: {}B/sec. Transfer speed {}bits/sec.", - GenerateBlocksStats.GeneratedBlockCount.load(), - NiceBytes(GenerateBlocksStats.GeneratedBlockByteCount), - UploadStats.BlockCount.load(), - NiceBytes(UploadStats.BlocksBytes.load()), - NiceTimeSpanMs(BlockGenerateTimeUs / 1000), - NiceNum(GetBytesPerSecond(GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS, - GenerateBlocksStats.GeneratedBlockByteCount)), - NiceNum(GetBytesPerSecond(UploadStats.ElapsedWallTimeUS, UploadStats.BlocksBytes * 8))); - } - }); - GenerateBuildBlocks(LocalContent, LocalLookup, NewBlockChunks, NewBlocks, GenerateBlocksStats, UploadStats); - } - - m_LogOutput.SetLogOperationProgress(PartStepOffset + (uint32_t)PartTaskSteps::BuildPartManifest, StepCount); - - CbObject PartManifest; - { - CbObjectWriter PartManifestWriter; - Stopwatch ManifestGenerationTimer; - auto __ = MakeGuard([&]() { - if (!m_Options.IsQuiet) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Generated build part manifest in {} ({})", - NiceTimeSpanMs(ManifestGenerationTimer.GetElapsedTimeMs()), - NiceBytes(PartManifestWriter.GetSaveSize())); - } - }); - - PartManifestWriter.BeginObject("chunker"sv); - { - PartManifestWriter.AddString("name"sv, ChunkController.GetName()); - PartManifestWriter.AddObject("parameters"sv, ChunkController.GetParameters()); - } - PartManifestWriter.EndObject(); // chunker - - std::vector<IoHash> AllChunkBlockHashes; - std::vector<ChunkBlockDescription> AllChunkBlockDescriptions; - AllChunkBlockHashes.reserve(ReuseBlockIndexes.size() + NewBlocks.BlockDescriptions.size()); - AllChunkBlockDescriptions.reserve(ReuseBlockIndexes.size() + NewBlocks.BlockDescriptions.size()); - for (size_t ReuseBlockIndex : ReuseBlockIndexes) - { - AllChunkBlockDescriptions.push_back(m_KnownBlocks[ReuseBlockIndex]); - AllChunkBlockHashes.push_back(m_KnownBlocks[ReuseBlockIndex].BlockHash); - } - AllChunkBlockDescriptions.insert(AllChunkBlockDescriptions.end(), - NewBlocks.BlockDescriptions.begin(), - NewBlocks.BlockDescriptions.end()); - for (const ChunkBlockDescription& BlockDescription : NewBlocks.BlockDescriptions) - { - AllChunkBlockHashes.push_back(BlockDescription.BlockHash); - } - - std::vector<IoHash> AbsoluteChunkHashes; - if (m_Options.DoExtraContentValidation) - { - tsl::robin_map<IoHash, size_t, IoHash::Hasher> ChunkHashToAbsoluteChunkIndex; - AbsoluteChunkHashes.reserve(LocalContent.ChunkedContent.ChunkHashes.size()); - for (uint32_t ChunkIndex : LooseChunkIndexes) - { - ChunkHashToAbsoluteChunkIndex.insert({LocalContent.ChunkedContent.ChunkHashes[ChunkIndex], AbsoluteChunkHashes.size()}); - AbsoluteChunkHashes.push_back(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]); - } - for (const ChunkBlockDescription& Block : AllChunkBlockDescriptions) - { - for (const IoHash& ChunkHash : Block.ChunkRawHashes) - { - ChunkHashToAbsoluteChunkIndex.insert({ChunkHash, AbsoluteChunkHashes.size()}); - AbsoluteChunkHashes.push_back(ChunkHash); - } - } - for (const IoHash& ChunkHash : LocalContent.ChunkedContent.ChunkHashes) - { - ZEN_ASSERT(AbsoluteChunkHashes[ChunkHashToAbsoluteChunkIndex.at(ChunkHash)] == ChunkHash); - ZEN_ASSERT(LocalContent.ChunkedContent.ChunkHashes[LocalLookup.ChunkHashToChunkIndex.at(ChunkHash)] == ChunkHash); - } - for (const uint32_t ChunkIndex : LocalContent.ChunkedContent.ChunkOrders) - { - ZEN_ASSERT(AbsoluteChunkHashes[ChunkHashToAbsoluteChunkIndex.at(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex])] == - LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]); - ZEN_ASSERT(LocalLookup.ChunkHashToChunkIndex.at(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]) == ChunkIndex); - } - } - std::vector<uint32_t> AbsoluteChunkOrders = CalculateAbsoluteChunkOrders(LocalContent.ChunkedContent.ChunkHashes, - LocalContent.ChunkedContent.ChunkOrders, - LocalLookup.ChunkHashToChunkIndex, - LooseChunkIndexes, - AllChunkBlockDescriptions); - - if (m_Options.DoExtraContentValidation) - { - for (uint32_t ChunkOrderIndex = 0; ChunkOrderIndex < LocalContent.ChunkedContent.ChunkOrders.size(); ChunkOrderIndex++) - { - uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[ChunkOrderIndex]; - uint32_t AbsoluteChunkIndex = AbsoluteChunkOrders[ChunkOrderIndex]; - const IoHash& LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; - const IoHash& AbsoluteChunkHash = AbsoluteChunkHashes[AbsoluteChunkIndex]; - ZEN_ASSERT(LocalChunkHash == AbsoluteChunkHash); - } - } - - WriteBuildContentToCompactBinary(PartManifestWriter, - LocalContent.Platform, - LocalContent.Paths, - LocalContent.RawHashes, - LocalContent.RawSizes, - LocalContent.Attributes, - LocalContent.ChunkedContent.SequenceRawHashes, - LocalContent.ChunkedContent.ChunkCounts, - LocalContent.ChunkedContent.ChunkHashes, - LocalContent.ChunkedContent.ChunkRawSizes, - AbsoluteChunkOrders, - LooseChunkIndexes, - AllChunkBlockHashes); - - if (m_Options.DoExtraContentValidation) - { - ChunkedFolderContent VerifyFolderContent; - - std::vector<uint32_t> OutAbsoluteChunkOrders; - std::vector<IoHash> OutLooseChunkHashes; - std::vector<uint64_t> OutLooseChunkRawSizes; - std::vector<IoHash> OutBlockRawHashes; - ReadBuildContentFromCompactBinary(PartManifestWriter.Save(), - VerifyFolderContent.Platform, - VerifyFolderContent.Paths, - VerifyFolderContent.RawHashes, - VerifyFolderContent.RawSizes, - VerifyFolderContent.Attributes, - VerifyFolderContent.ChunkedContent.SequenceRawHashes, - VerifyFolderContent.ChunkedContent.ChunkCounts, - OutAbsoluteChunkOrders, - OutLooseChunkHashes, - OutLooseChunkRawSizes, - OutBlockRawHashes); - ZEN_ASSERT(OutBlockRawHashes == AllChunkBlockHashes); - - for (uint32_t OrderIndex = 0; OrderIndex < OutAbsoluteChunkOrders.size(); OrderIndex++) - { - uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[OrderIndex]; - const IoHash LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; - - uint32_t VerifyChunkIndex = OutAbsoluteChunkOrders[OrderIndex]; - const IoHash VerifyChunkHash = AbsoluteChunkHashes[VerifyChunkIndex]; - - ZEN_ASSERT(LocalChunkHash == VerifyChunkHash); - } - - CalculateLocalChunkOrders(OutAbsoluteChunkOrders, - OutLooseChunkHashes, - OutLooseChunkRawSizes, - AllChunkBlockDescriptions, - VerifyFolderContent.ChunkedContent.ChunkHashes, - VerifyFolderContent.ChunkedContent.ChunkRawSizes, - VerifyFolderContent.ChunkedContent.ChunkOrders, - m_Options.DoExtraContentValidation); - - ZEN_ASSERT(LocalContent.Paths == VerifyFolderContent.Paths); - ZEN_ASSERT(LocalContent.RawHashes == VerifyFolderContent.RawHashes); - ZEN_ASSERT(LocalContent.RawSizes == VerifyFolderContent.RawSizes); - ZEN_ASSERT(LocalContent.Attributes == VerifyFolderContent.Attributes); - ZEN_ASSERT(LocalContent.ChunkedContent.SequenceRawHashes == VerifyFolderContent.ChunkedContent.SequenceRawHashes); - ZEN_ASSERT(LocalContent.ChunkedContent.ChunkCounts == VerifyFolderContent.ChunkedContent.ChunkCounts); - - for (uint32_t OrderIndex = 0; OrderIndex < LocalContent.ChunkedContent.ChunkOrders.size(); OrderIndex++) - { - uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[OrderIndex]; - const IoHash LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; - uint64_t LocalChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[LocalChunkIndex]; - - uint32_t VerifyChunkIndex = VerifyFolderContent.ChunkedContent.ChunkOrders[OrderIndex]; - const IoHash VerifyChunkHash = VerifyFolderContent.ChunkedContent.ChunkHashes[VerifyChunkIndex]; - uint64_t VerifyChunkRawSize = VerifyFolderContent.ChunkedContent.ChunkRawSizes[VerifyChunkIndex]; - - ZEN_ASSERT(LocalChunkHash == VerifyChunkHash); - ZEN_ASSERT(LocalChunkRawSize == VerifyChunkRawSize); - } - } - PartManifest = PartManifestWriter.Save(); - } - - m_LogOutput.SetLogOperationProgress(PartStepOffset + (uint32_t)PartTaskSteps::UploadBuildPart, StepCount); - - Stopwatch PutBuildPartResultTimer; - std::pair<IoHash, std::vector<IoHash>> PutBuildPartResult = - m_Storage.BuildStorage->PutBuildPart(m_BuildId, Part.PartId, Part.PartName, PartManifest); - if (!m_Options.IsQuiet) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "PutBuildPart took {}, payload size {}. {} attachments are needed.", - NiceTimeSpanMs(PutBuildPartResultTimer.GetElapsedTimeMs()), - NiceBytes(PartManifest.GetSize()), - PutBuildPartResult.second.size()); - } - IoHash PartHash = PutBuildPartResult.first; - - auto UploadAttachments = - [this, &LooseChunksStats, &UploadStats, &LocalContent, &LocalLookup, &NewBlockChunks, &NewBlocks, &LooseChunkIndexes]( - std::span<IoHash> RawHashes, - std::vector<IoHash>& OutUnknownChunks) { - if (!m_AbortFlag) - { - UploadStatistics TempUploadStats; - LooseChunksStatistics TempLooseChunksStats; - - Stopwatch TempUploadTimer; - auto __ = MakeGuard([&]() { - if (!m_Options.IsQuiet) - { - uint64_t TempChunkUploadTimeUs = TempUploadTimer.GetElapsedTimeUs(); - ZEN_OPERATION_LOG_INFO( - m_LogOutput, - "Uploaded {} ({}) blocks. " - "Compressed {} ({} {}B/s) and uploaded {} ({}) chunks. " - "Transferred {} ({}bits/s) in {}", - TempUploadStats.BlockCount.load(), - NiceBytes(TempUploadStats.BlocksBytes), - - TempLooseChunksStats.CompressedChunkCount.load(), - NiceBytes(TempLooseChunksStats.CompressedChunkBytes.load()), - NiceNum(GetBytesPerSecond(TempLooseChunksStats.CompressChunksElapsedWallTimeUS, - TempLooseChunksStats.ChunkByteCount)), - TempUploadStats.ChunkCount.load(), - NiceBytes(TempUploadStats.ChunksBytes), - - NiceBytes(TempUploadStats.BlocksBytes + TempUploadStats.ChunksBytes), - NiceNum(GetBytesPerSecond(TempUploadStats.ElapsedWallTimeUS, TempUploadStats.ChunksBytes * 8)), - NiceTimeSpanMs(TempChunkUploadTimeUs / 1000)); - } - }); - UploadPartBlobs(LocalContent, - LocalLookup, - RawHashes, - NewBlockChunks, - NewBlocks, - LooseChunkIndexes, - m_LargeAttachmentSize, - TempUploadStats, - TempLooseChunksStats, - OutUnknownChunks); - UploadStats += TempUploadStats; - LooseChunksStats += TempLooseChunksStats; - } - }; - - m_LogOutput.SetLogOperationProgress(PartStepOffset + (uint32_t)PartTaskSteps::UploadAttachments, StepCount); - - std::vector<IoHash> UnknownChunks; - if (m_Options.IgnoreExistingBlocks) - { - if (m_Options.IsVerbose) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "PutBuildPart uploading all attachments, needs are: {}", - FormatArray<IoHash>(PutBuildPartResult.second, "\n "sv)); - } - - std::vector<IoHash> ForceUploadChunkHashes; - ForceUploadChunkHashes.reserve(LooseChunkIndexes.size()); - - for (uint32_t ChunkIndex : LooseChunkIndexes) - { - ForceUploadChunkHashes.push_back(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]); - } - - for (size_t BlockIndex = 0; BlockIndex < NewBlocks.BlockHeaders.size(); BlockIndex++) - { - if (NewBlocks.BlockHeaders[BlockIndex]) - { - // Block was not uploaded during generation - ForceUploadChunkHashes.push_back(NewBlocks.BlockDescriptions[BlockIndex].BlockHash); - } - } - UploadAttachments(ForceUploadChunkHashes, UnknownChunks); - } - else if (!PutBuildPartResult.second.empty()) - { - if (m_Options.IsVerbose) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "PutBuildPart needs attachments: {}", - FormatArray<IoHash>(PutBuildPartResult.second, "\n "sv)); - } - UploadAttachments(PutBuildPartResult.second, UnknownChunks); - } - - auto BuildUnkownChunksResponse = [](const std::vector<IoHash>& UnknownChunks, bool WillRetry) { - return fmt::format( - "The following build blobs was reported as needed for upload but was reported as existing at the start of the " - "operation.{}{}", - WillRetry ? " Treating this as a transient inconsistency issue and will attempt to retry finalization."sv : ""sv, - FormatArray<IoHash>(UnknownChunks, "\n "sv)); - }; - - if (!UnknownChunks.empty()) - { - ZEN_OPERATION_LOG_WARN(m_LogOutput, "{}", BuildUnkownChunksResponse(UnknownChunks, /*WillRetry*/ true)); - } - - uint32_t FinalizeBuildPartRetryCount = 5; - while (!m_AbortFlag && (FinalizeBuildPartRetryCount--) > 0) - { - Stopwatch FinalizeBuildPartTimer; - std::vector<IoHash> Needs = m_Storage.BuildStorage->FinalizeBuildPart(m_BuildId, Part.PartId, PartHash); - if (!m_Options.IsQuiet) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "FinalizeBuildPart took {}. {} attachments are missing.", - NiceTimeSpanMs(FinalizeBuildPartTimer.GetElapsedTimeMs()), - Needs.size()); - } - if (Needs.empty()) - { - break; - } - if (m_Options.IsVerbose) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, "FinalizeBuildPart needs attachments: {}", FormatArray<IoHash>(Needs, "\n "sv)); - } - - std::vector<IoHash> RetryUnknownChunks; - UploadAttachments(Needs, RetryUnknownChunks); - if (RetryUnknownChunks == UnknownChunks) - { - if (FinalizeBuildPartRetryCount > 0) - { - // Back off a bit - Sleep(1000); - } - } - else - { - UnknownChunks = RetryUnknownChunks; - ZEN_OPERATION_LOG_WARN(m_LogOutput, - "{}", - BuildUnkownChunksResponse(UnknownChunks, /*WillRetry*/ FinalizeBuildPartRetryCount != 0)); - } - } - - if (!UnknownChunks.empty()) - { - throw std::runtime_error(BuildUnkownChunksResponse(UnknownChunks, /*WillRetry*/ false)); - } - - if (!NewBlocks.BlockDescriptions.empty() && !m_AbortFlag) - { - uint64_t UploadBlockMetadataCount = 0; - Stopwatch UploadBlockMetadataTimer; - - uint32_t FailedMetadataUploadCount = 1; - int32_t MetadataUploadRetryCount = 3; - while ((MetadataUploadRetryCount-- > 0) && (FailedMetadataUploadCount > 0)) - { - FailedMetadataUploadCount = 0; - for (size_t BlockIndex = 0; BlockIndex < NewBlocks.BlockDescriptions.size(); BlockIndex++) - { - if (m_AbortFlag) - { - break; - } - const IoHash& BlockHash = NewBlocks.BlockDescriptions[BlockIndex].BlockHash; - if (!NewBlocks.MetaDataHasBeenUploaded[BlockIndex]) - { - const CbObject BlockMetaData = - BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]); - if (m_Storage.CacheStorage && m_Options.PopulateCache) - { - m_Storage.CacheStorage->PutBlobMetadatas(m_BuildId, - std::vector<IoHash>({BlockHash}), - std::vector<CbObject>({BlockMetaData})); - } - bool MetadataSucceeded = m_Storage.BuildStorage->PutBlockMetadata(m_BuildId, BlockHash, BlockMetaData); - if (MetadataSucceeded) - { - UploadStats.BlocksBytes += BlockMetaData.GetSize(); - NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; - UploadBlockMetadataCount++; - } - else - { - FailedMetadataUploadCount++; - } - } - } - } - if (UploadBlockMetadataCount > 0) - { - uint64_t ElapsedUS = UploadBlockMetadataTimer.GetElapsedTimeUs(); - UploadStats.ElapsedWallTimeUS += ElapsedUS; - if (!m_Options.IsQuiet) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Uploaded metadata for {} blocks in {}", - UploadBlockMetadataCount, - NiceTimeSpanMs(ElapsedUS / 1000)); - } - } - - // The newly generated blocks are now known blocks so the next part upload can use those blocks as well - m_KnownBlocks.insert(m_KnownBlocks.end(), NewBlocks.BlockDescriptions.begin(), NewBlocks.BlockDescriptions.end()); - } - - m_LogOutput.SetLogOperationProgress(PartStepOffset + (uint32_t)PartTaskSteps::PutBuildPartStats, StepCount); - - m_Storage.BuildStorage->PutBuildPartStats( - m_BuildId, - Part.PartId, - {{"totalSize", double(Part.LocalFolderScanStats.FoundFileByteCount.load())}, - {"reusedRatio", AcceptedByteCountPercent / 100.0}, - {"reusedBlockCount", double(FindBlocksStats.AcceptedBlockCount)}, - {"reusedBlockByteCount", double(ReuseBlocksStats.AcceptedRawByteCount)}, - {"newBlockCount", double(FindBlocksStats.NewBlocksCount)}, - {"newBlockByteCount", double(FindBlocksStats.NewBlocksChunkByteCount)}, - {"uploadedCount", double(UploadStats.BlockCount.load() + UploadStats.ChunkCount.load())}, - {"uploadedByteCount", double(UploadStats.BlocksBytes.load() + UploadStats.ChunksBytes.load())}, - {"uploadedBytesPerSec", - double(GetBytesPerSecond(UploadStats.ElapsedWallTimeUS, UploadStats.ChunksBytes + UploadStats.BlocksBytes))}, - {"elapsedTimeSec", double(UploadTimer.GetElapsedTimeMs() / 1000.0)}}); - - m_LocalFolderScanStats += Part.LocalFolderScanStats; - m_ChunkingStats += ChunkingStats; - m_FindBlocksStats += FindBlocksStats; - m_ReuseBlocksStats += ReuseBlocksStats; - m_UploadStats += UploadStats; - m_GenerateBlocksStats += GenerateBlocksStats; - m_LooseChunksStats += LooseChunksStats; -} - -void -BuildsOperationUploadFolder::UploadPartBlobs(const ChunkedFolderContent& Content, - const ChunkedContentLookup& Lookup, - std::span<IoHash> RawHashes, - const std::vector<std::vector<uint32_t>>& NewBlockChunks, - GeneratedBlocks& NewBlocks, - std::span<const uint32_t> LooseChunkIndexes, - const std::uint64_t LargeAttachmentSize, - UploadStatistics& TempUploadStats, - LooseChunksStatistics& TempLooseChunksStats, - std::vector<IoHash>& OutUnknownChunks) -{ - ZEN_TRACE_CPU("UploadPartBlobs"); - { - std::unique_ptr<OperationLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Upload Blobs")); - OperationLogOutput::ProgressBar& Progress(*ProgressBarPtr); - - WorkerThreadPool& ReadChunkPool = m_IOWorkerPool; - WorkerThreadPool& UploadChunkPool = m_NetworkPool; - - FilteredRate FilteredGenerateBlockBytesPerSecond; - FilteredRate FilteredCompressedBytesPerSecond; - FilteredRate FilteredUploadedBytesPerSecond; - - ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - - std::atomic<size_t> UploadedBlockSize = 0; - std::atomic<size_t> UploadedBlockCount = 0; - std::atomic<size_t> UploadedRawChunkSize = 0; - std::atomic<size_t> UploadedCompressedChunkSize = 0; - std::atomic<uint32_t> UploadedChunkCount = 0; - - tsl::robin_map<uint32_t, uint32_t> ChunkIndexToLooseChunkOrderIndex; - ChunkIndexToLooseChunkOrderIndex.reserve(LooseChunkIndexes.size()); - for (uint32_t OrderIndex = 0; OrderIndex < LooseChunkIndexes.size(); OrderIndex++) - { - ChunkIndexToLooseChunkOrderIndex.insert_or_assign(LooseChunkIndexes[OrderIndex], OrderIndex); - } - - std::vector<size_t> BlockIndexes; - std::vector<uint32_t> LooseChunkOrderIndexes; - - uint64_t TotalLooseChunksSize = 0; - uint64_t TotalBlocksSize = 0; - for (const IoHash& RawHash : RawHashes) - { - if (auto It = NewBlocks.BlockHashToBlockIndex.find(RawHash); It != NewBlocks.BlockHashToBlockIndex.end()) - { - BlockIndexes.push_back(It->second); - TotalBlocksSize += NewBlocks.BlockSizes[It->second]; - } - else if (auto ChunkIndexIt = Lookup.ChunkHashToChunkIndex.find(RawHash); ChunkIndexIt != Lookup.ChunkHashToChunkIndex.end()) - { - const uint32_t ChunkIndex = ChunkIndexIt->second; - if (auto LooseOrderIndexIt = ChunkIndexToLooseChunkOrderIndex.find(ChunkIndex); - LooseOrderIndexIt != ChunkIndexToLooseChunkOrderIndex.end()) - { - LooseChunkOrderIndexes.push_back(LooseOrderIndexIt->second); - TotalLooseChunksSize += Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; - } - } - else - { - OutUnknownChunks.push_back(RawHash); - } - } - if (BlockIndexes.empty() && LooseChunkOrderIndexes.empty()) - { - return; - } - - uint64_t TotalRawSize = TotalLooseChunksSize + TotalBlocksSize; - - const size_t UploadBlockCount = BlockIndexes.size(); - const uint32_t UploadChunkCount = gsl::narrow<uint32_t>(LooseChunkOrderIndexes.size()); - - auto AsyncUploadBlock = [this, - &Work, - &NewBlocks, - UploadBlockCount, - &UploadedBlockCount, - UploadChunkCount, - &UploadedChunkCount, - &UploadedBlockSize, - &TempUploadStats, - &FilteredUploadedBytesPerSecond, - &UploadChunkPool](const size_t BlockIndex, - const IoHash BlockHash, - CompositeBuffer&& Payload, - std::atomic<uint64_t>& QueuedPendingInMemoryBlocksForUpload) { - bool IsInMemoryBlock = true; - if (QueuedPendingInMemoryBlocksForUpload.load() > 16) - { - ZEN_TRACE_CPU("AsyncUploadBlock_WriteTempBlock"); - std::filesystem::path TempFilePath = m_Options.TempDir / (BlockHash.ToHexString()); - Payload = CompositeBuffer(WriteToTempFile(std::move(Payload), TempFilePath)); - IsInMemoryBlock = false; - } - else - { - QueuedPendingInMemoryBlocksForUpload++; - } - - Work.ScheduleWork( - UploadChunkPool, - [this, - &QueuedPendingInMemoryBlocksForUpload, - &NewBlocks, - UploadBlockCount, - &UploadedBlockCount, - UploadChunkCount, - &UploadedChunkCount, - &UploadedBlockSize, - &TempUploadStats, - &FilteredUploadedBytesPerSecond, - IsInMemoryBlock, - BlockIndex, - BlockHash, - Payload = CompositeBuffer(std::move(Payload))](std::atomic<bool>&) mutable { - auto _ = MakeGuard([IsInMemoryBlock, &QueuedPendingInMemoryBlocksForUpload] { - if (IsInMemoryBlock) - { - QueuedPendingInMemoryBlocksForUpload--; - } - }); - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("AsyncUploadBlock"); - - const uint64_t PayloadSize = Payload.GetSize(); - - FilteredUploadedBytesPerSecond.Start(); - const CbObject BlockMetaData = - BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]); - - if (m_Storage.CacheStorage && m_Options.PopulateCache) - { - m_Storage.CacheStorage->PutBuildBlob(m_BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload); - } - m_Storage.BuildStorage->PutBuildBlob(m_BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload); - if (m_Options.IsVerbose) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Uploaded block {} ({}) containing {} chunks", - BlockHash, - NiceBytes(PayloadSize), - NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); - } - UploadedBlockSize += PayloadSize; - TempUploadStats.BlocksBytes += PayloadSize; - - if (m_Storage.CacheStorage && m_Options.PopulateCache) - { - m_Storage.CacheStorage->PutBlobMetadatas(m_BuildId, - std::vector<IoHash>({BlockHash}), - std::vector<CbObject>({BlockMetaData})); - } - bool MetadataSucceeded = m_Storage.BuildStorage->PutBlockMetadata(m_BuildId, BlockHash, BlockMetaData); - if (MetadataSucceeded) - { - if (m_Options.IsVerbose) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Uploaded block {} metadata ({})", - BlockHash, - NiceBytes(BlockMetaData.GetSize())); - } - - NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true; - TempUploadStats.BlocksBytes += BlockMetaData.GetSize(); - } - - TempUploadStats.BlockCount++; - - UploadedBlockCount++; - if (UploadedBlockCount == UploadBlockCount && UploadedChunkCount == UploadChunkCount) - { - FilteredUploadedBytesPerSecond.Stop(); - } - } - }); - }; - - auto AsyncUploadLooseChunk = [this, - LargeAttachmentSize, - &Work, - &UploadChunkPool, - &FilteredUploadedBytesPerSecond, - &UploadedBlockCount, - &UploadedChunkCount, - UploadBlockCount, - UploadChunkCount, - &UploadedCompressedChunkSize, - &UploadedRawChunkSize, - &TempUploadStats](const IoHash& RawHash, uint64_t RawSize, CompositeBuffer&& Payload) { - Work.ScheduleWork( - UploadChunkPool, - [this, - &Work, - LargeAttachmentSize, - &FilteredUploadedBytesPerSecond, - &UploadChunkPool, - &UploadedBlockCount, - &UploadedChunkCount, - UploadBlockCount, - UploadChunkCount, - &UploadedCompressedChunkSize, - &UploadedRawChunkSize, - &TempUploadStats, - RawHash, - RawSize, - Payload = CompositeBuffer(std::move(Payload))](std::atomic<bool>&) mutable { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("AsyncUploadLooseChunk"); - - const uint64_t PayloadSize = Payload.GetSize(); - - if (m_Storage.CacheStorage && m_Options.PopulateCache) - { - m_Storage.CacheStorage->PutBuildBlob(m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); - } - - if (PayloadSize >= LargeAttachmentSize) - { - ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart"); - TempUploadStats.MultipartAttachmentCount++; - std::vector<std::function<void()>> MultipartWork = m_Storage.BuildStorage->PutLargeBuildBlob( - m_BuildId, - RawHash, - ZenContentType::kCompressedBinary, - PayloadSize, - [Payload = std::move(Payload), &FilteredUploadedBytesPerSecond](uint64_t Offset, - uint64_t Size) mutable -> IoBuffer { - FilteredUploadedBytesPerSecond.Start(); - - IoBuffer PartPayload = Payload.Mid(Offset, Size).Flatten().AsIoBuffer(); - PartPayload.SetContentType(ZenContentType::kBinary); - return PartPayload; - }, - [RawSize, - &TempUploadStats, - &UploadedCompressedChunkSize, - &UploadChunkPool, - &UploadedBlockCount, - UploadBlockCount, - &UploadedChunkCount, - UploadChunkCount, - &FilteredUploadedBytesPerSecond, - &UploadedRawChunkSize](uint64_t SentBytes, bool IsComplete) { - TempUploadStats.ChunksBytes += SentBytes; - UploadedCompressedChunkSize += SentBytes; - if (IsComplete) - { - TempUploadStats.ChunkCount++; - UploadedChunkCount++; - if (UploadedBlockCount == UploadBlockCount && UploadedChunkCount == UploadChunkCount) - { - FilteredUploadedBytesPerSecond.Stop(); - } - UploadedRawChunkSize += RawSize; - } - }); - for (auto& WorkPart : MultipartWork) - { - Work.ScheduleWork(UploadChunkPool, [Work = std::move(WorkPart)](std::atomic<bool>& AbortFlag) { - ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart_Work"); - if (!AbortFlag) - { - Work(); - } - }); - } - if (m_Options.IsVerbose) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, "Uploaded multipart chunk {} ({})", RawHash, NiceBytes(PayloadSize)); - } - } - else - { - ZEN_TRACE_CPU("AsyncUploadLooseChunk_Singlepart"); - m_Storage.BuildStorage->PutBuildBlob(m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); - if (m_Options.IsVerbose) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, "Uploaded chunk {} ({})", RawHash, NiceBytes(PayloadSize)); - } - TempUploadStats.ChunksBytes += Payload.GetSize(); - TempUploadStats.ChunkCount++; - UploadedCompressedChunkSize += Payload.GetSize(); - UploadedRawChunkSize += RawSize; - UploadedChunkCount++; - if (UploadedChunkCount == UploadChunkCount) - { - FilteredUploadedBytesPerSecond.Stop(); - } - } - } - }); - }; - - std::vector<size_t> GenerateBlockIndexes; - - std::atomic<uint64_t> GeneratedBlockCount = 0; - std::atomic<uint64_t> GeneratedBlockByteCount = 0; - - std::atomic<uint64_t> QueuedPendingInMemoryBlocksForUpload = 0; - - // Start generation of any non-prebuilt blocks and schedule upload - for (const size_t BlockIndex : BlockIndexes) - { - const IoHash& BlockHash = NewBlocks.BlockDescriptions[BlockIndex].BlockHash; - if (!m_AbortFlag) - { - Work.ScheduleWork( - ReadChunkPool, - [this, - BlockHash = IoHash(BlockHash), - BlockIndex, - &FilteredGenerateBlockBytesPerSecond, - &Content, - &Lookup, - &NewBlocks, - &NewBlockChunks, - &GenerateBlockIndexes, - &GeneratedBlockCount, - &GeneratedBlockByteCount, - &AsyncUploadBlock, - &QueuedPendingInMemoryBlocksForUpload](std::atomic<bool>&) { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("UploadPartBlobs_GenerateBlock"); - - FilteredGenerateBlockBytesPerSecond.Start(); - - Stopwatch GenerateTimer; - CompositeBuffer Payload; - if (NewBlocks.BlockHeaders[BlockIndex]) - { - Payload = - RebuildBlock(Content, Lookup, std::move(NewBlocks.BlockHeaders[BlockIndex]), NewBlockChunks[BlockIndex]) - .GetCompressed(); - } - else - { - ChunkBlockDescription BlockDescription; - CompressedBuffer CompressedBlock = - GenerateBlock(Content, Lookup, NewBlockChunks[BlockIndex], BlockDescription); - if (!CompressedBlock) - { - throw std::runtime_error(fmt::format("Failed generating block {}", BlockHash)); - } - ZEN_ASSERT(BlockDescription.BlockHash == BlockHash); - Payload = std::move(CompressedBlock).GetCompressed(); - } - - GeneratedBlockByteCount += NewBlocks.BlockSizes[BlockIndex]; - GeneratedBlockCount++; - if (GeneratedBlockCount == GenerateBlockIndexes.size()) - { - FilteredGenerateBlockBytesPerSecond.Stop(); - } - if (m_Options.IsVerbose) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "{} block {} ({}) containing {} chunks in {}", - NewBlocks.BlockHeaders[BlockIndex] ? "Regenerated" : "Generated", - NewBlocks.BlockDescriptions[BlockIndex].BlockHash, - NiceBytes(NewBlocks.BlockSizes[BlockIndex]), - NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size(), - NiceTimeSpanMs(GenerateTimer.GetElapsedTimeMs())); - } - if (!m_AbortFlag) - { - AsyncUploadBlock(BlockIndex, BlockHash, std::move(Payload), QueuedPendingInMemoryBlocksForUpload); - } - } - }); - } - } - - // Start compression of any non-precompressed loose chunks and schedule upload - for (const uint32_t LooseChunkOrderIndex : LooseChunkOrderIndexes) - { - const uint32_t ChunkIndex = LooseChunkIndexes[LooseChunkOrderIndex]; - Work.ScheduleWork( - ReadChunkPool, - [this, - &Content, - &Lookup, - &TempLooseChunksStats, - &LooseChunkOrderIndexes, - &FilteredCompressedBytesPerSecond, - &TempUploadStats, - &AsyncUploadLooseChunk, - ChunkIndex](std::atomic<bool>&) { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("UploadPartBlobs_CompressChunk"); - - FilteredCompressedBytesPerSecond.Start(); - Stopwatch CompressTimer; - CompositeBuffer Payload = CompressChunk(Content, Lookup, ChunkIndex, TempLooseChunksStats); - if (m_Options.IsVerbose) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Compressed chunk {} ({} -> {}) in {}", - Content.ChunkedContent.ChunkHashes[ChunkIndex], - NiceBytes(Content.ChunkedContent.ChunkRawSizes[ChunkIndex]), - NiceBytes(Payload.GetSize()), - NiceTimeSpanMs(CompressTimer.GetElapsedTimeMs())); - } - const uint64_t ChunkRawSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; - TempUploadStats.ReadFromDiskBytes += ChunkRawSize; - if (TempLooseChunksStats.CompressedChunkCount == LooseChunkOrderIndexes.size()) - { - FilteredCompressedBytesPerSecond.Stop(); - } - if (!m_AbortFlag) - { - AsyncUploadLooseChunk(Content.ChunkedContent.ChunkHashes[ChunkIndex], ChunkRawSize, std::move(Payload)); - } - } - }); - } - - Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(PendingWork); - FilteredCompressedBytesPerSecond.Update(TempLooseChunksStats.CompressedChunkRawBytes.load()); - FilteredGenerateBlockBytesPerSecond.Update(GeneratedBlockByteCount.load()); - FilteredUploadedBytesPerSecond.Update(UploadedCompressedChunkSize.load() + UploadedBlockSize.load()); - uint64_t UploadedRawSize = UploadedRawChunkSize.load() + UploadedBlockSize.load(); - uint64_t UploadedCompressedSize = UploadedCompressedChunkSize.load() + UploadedBlockSize.load(); - - std::string Details = fmt::format( - "Compressed {}/{} ({}/{}{}) chunks. " - "Uploaded {}/{} ({}/{}) blobs " - "({}{})", - TempLooseChunksStats.CompressedChunkCount.load(), - LooseChunkOrderIndexes.size(), - NiceBytes(TempLooseChunksStats.CompressedChunkRawBytes), - NiceBytes(TotalLooseChunksSize), - (TempLooseChunksStats.CompressedChunkCount == LooseChunkOrderIndexes.size()) - ? "" - : fmt::format(" {}B/s", NiceNum(FilteredCompressedBytesPerSecond.GetCurrent())), - - UploadedBlockCount.load() + UploadedChunkCount.load(), - UploadBlockCount + UploadChunkCount, - NiceBytes(UploadedRawSize), - NiceBytes(TotalRawSize), - - NiceBytes(UploadedCompressedSize), - (UploadedBlockCount == UploadBlockCount && UploadedChunkCount == UploadChunkCount) - ? "" - : fmt::format(" {}bits/s", NiceNum(FilteredUploadedBytesPerSecond.GetCurrent()))); - - Progress.UpdateState({.Task = "Uploading blobs ", - .Details = Details, - .TotalCount = gsl::narrow<uint64_t>(TotalRawSize), - .RemainingCount = gsl::narrow<uint64_t>(TotalRawSize - UploadedRawSize), - .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, - false); - }); - - ZEN_ASSERT(m_AbortFlag || QueuedPendingInMemoryBlocksForUpload.load() == 0); - - Progress.Finish(); - - TempUploadStats.ElapsedWallTimeUS += FilteredUploadedBytesPerSecond.GetElapsedTimeUS(); - TempLooseChunksStats.CompressChunksElapsedWallTimeUS += FilteredCompressedBytesPerSecond.GetElapsedTimeUS(); - } -} - -CompositeBuffer -BuildsOperationUploadFolder::CompressChunk(const ChunkedFolderContent& Content, - const ChunkedContentLookup& Lookup, - uint32_t ChunkIndex, - LooseChunksStatistics& TempLooseChunksStats) -{ - ZEN_TRACE_CPU("CompressChunk"); - ZEN_ASSERT(!m_Options.TempDir.empty()); - const IoHash& ChunkHash = Content.ChunkedContent.ChunkHashes[ChunkIndex]; - const uint64_t ChunkSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex]; - - const ChunkedContentLookup::ChunkSequenceLocation& Source = GetChunkSequenceLocations(Lookup, ChunkIndex)[0]; - const std::uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[Source.SequenceIndex]; - IoBuffer RawSource = IoBufferBuilder::MakeFromFile((m_Path / Content.Paths[PathIndex]).make_preferred(), Source.Offset, ChunkSize); - if (!RawSource) - { - throw std::runtime_error(fmt::format("Failed fetching chunk {}", ChunkHash)); - } - if (RawSource.GetSize() != ChunkSize) - { - throw std::runtime_error(fmt::format("Fetched chunk {} has invalid size", ChunkHash)); - } - - const bool ShouldCompressChunk = IsChunkCompressable(m_NonCompressableExtensionHashes, Content, Lookup, ChunkIndex); - const OodleCompressionLevel CompressionLevel = ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None; - - if (ShouldCompressChunk) - { - std::filesystem::path TempFilePath = m_Options.TempDir / ChunkHash.ToHexString(); - - BasicFile CompressedFile; - std::error_code Ec; - CompressedFile.Open(TempFilePath, BasicFile::Mode::kTruncateDelete, Ec); - if (Ec) - { - throw std::runtime_error(fmt::format("Failed creating temporary file for compressing blob {}, reason: ({}) {}", - ChunkHash, - Ec.value(), - Ec.message())); - } - - uint64_t StreamRawBytes = 0; - uint64_t StreamCompressedBytes = 0; - - bool CouldCompress = CompressedBuffer::CompressToStream( - CompositeBuffer(SharedBuffer(RawSource)), - [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { - ZEN_UNUSED(SourceOffset); - TempLooseChunksStats.CompressedChunkRawBytes += SourceSize; - CompressedFile.Write(RangeBuffer, Offset); - TempLooseChunksStats.CompressedChunkBytes += RangeBuffer.GetSize(); - StreamRawBytes += SourceSize; - StreamCompressedBytes += RangeBuffer.GetSize(); - }, - OodleCompressor::Mermaid, - CompressionLevel); - if (CouldCompress) - { - uint64_t CompressedSize = CompressedFile.FileSize(); - void* FileHandle = CompressedFile.Detach(); - IoBuffer TempPayload = IoBuffer(IoBuffer::File, - FileHandle, - 0, - CompressedSize, - /*IsWholeFile*/ true); - ZEN_ASSERT(TempPayload); - TempPayload.SetDeleteOnClose(true); - IoHash RawHash; - uint64_t RawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(TempPayload), RawHash, RawSize); - ZEN_ASSERT(Compressed); - ZEN_ASSERT(RawHash == ChunkHash); - ZEN_ASSERT(RawSize == ChunkSize); - - TempLooseChunksStats.CompressedChunkCount++; - - return Compressed.GetCompressed(); - } - else - { - TempLooseChunksStats.CompressedChunkRawBytes -= StreamRawBytes; - TempLooseChunksStats.CompressedChunkBytes -= StreamCompressedBytes; - } - CompressedFile.Close(); - RemoveFile(TempFilePath, Ec); - ZEN_UNUSED(Ec); - } - - CompressedBuffer CompressedBlob = - CompressedBuffer::Compress(SharedBuffer(std::move(RawSource)), OodleCompressor::Mermaid, CompressionLevel); - if (!CompressedBlob) - { - throw std::runtime_error(fmt::format("Failed to compress large blob {}", ChunkHash)); - } - ZEN_ASSERT_SLOW(CompressedBlob.DecodeRawHash() == ChunkHash); - ZEN_ASSERT_SLOW(CompressedBlob.DecodeRawSize() == ChunkSize); - - TempLooseChunksStats.CompressedChunkRawBytes += ChunkSize; - TempLooseChunksStats.CompressedChunkBytes += CompressedBlob.GetCompressedSize(); - - // If we use none-compression, the compressed blob references the data and has 64 kb in memory so we don't need to write it to disk - if (ShouldCompressChunk) - { - std::filesystem::path TempFilePath = m_Options.TempDir / (ChunkHash.ToHexString()); - IoBuffer TempPayload = WriteToTempFile(std::move(CompressedBlob).GetCompressed(), TempFilePath); - CompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(TempPayload)); - } - - TempLooseChunksStats.CompressedChunkCount++; - return std::move(CompressedBlob).GetCompressed(); -} - -BuildsOperationValidateBuildPart::BuildsOperationValidateBuildPart(OperationLogOutput& OperationLogOutput, - BuildStorageBase& Storage, - std::atomic<bool>& AbortFlag, - std::atomic<bool>& PauseFlag, - WorkerThreadPool& IOWorkerPool, - WorkerThreadPool& NetworkPool, - const Oid& BuildId, - const Oid& BuildPartId, - const std::string_view BuildPartName, - const Options& Options) - -: m_LogOutput(OperationLogOutput) -, m_Storage(Storage) -, m_AbortFlag(AbortFlag) -, m_PauseFlag(PauseFlag) -, m_IOWorkerPool(IOWorkerPool) -, m_NetworkPool(NetworkPool) -, m_BuildId(BuildId) -, m_BuildPartId(BuildPartId) -, m_BuildPartName(BuildPartName) -, m_Options(Options) -{ -} - -void -BuildsOperationValidateBuildPart::Execute() -{ - ZEN_TRACE_CPU("ValidateBuildPart"); - try - { - enum class TaskSteps : uint32_t - { - FetchBuild, - FetchBuildPart, - ValidateBlobs, - Cleanup, - StepCount - }; - - auto EndProgress = - MakeGuard([&]() { m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::StepCount, (uint32_t)TaskSteps::StepCount); }); - - Stopwatch Timer; - auto _ = MakeGuard([&]() { - if (!m_Options.IsQuiet) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Validated build part {}/{} ('{}') in {}", - m_BuildId, - m_BuildPartId, - m_BuildPartName, - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - } - }); - - m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::FetchBuild, (uint32_t)TaskSteps::StepCount); - - CbObject Build = m_Storage.GetBuild(m_BuildId); - if (!m_BuildPartName.empty()) - { - m_BuildPartId = Build["parts"sv].AsObjectView()[m_BuildPartName].AsObjectId(); - if (m_BuildPartId == Oid::Zero) - { - throw std::runtime_error(fmt::format("Build {} does not have a part named '{}'", m_BuildId, m_BuildPartName)); - } - } - m_ValidateStats.BuildBlobSize = Build.GetSize(); - uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u; - if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0) - { - PreferredMultipartChunkSize = ChunkSize; - } - - m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::FetchBuildPart, (uint32_t)TaskSteps::StepCount); - - CbObject BuildPart = m_Storage.GetBuildPart(m_BuildId, m_BuildPartId); - m_ValidateStats.BuildPartSize = BuildPart.GetSize(); - if (!m_Options.IsQuiet) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Validating build part {}/{} ({})", - m_BuildId, - m_BuildPartId, - NiceBytes(BuildPart.GetSize())); - } - std::vector<IoHash> ChunkAttachments; - if (const CbObjectView ChunkAttachmentsView = BuildPart["chunkAttachments"sv].AsObjectView()) - { - for (CbFieldView LooseFileView : ChunkAttachmentsView["rawHashes"sv]) - { - ChunkAttachments.push_back(LooseFileView.AsBinaryAttachment()); - } - } - m_ValidateStats.ChunkAttachmentCount = ChunkAttachments.size(); - std::vector<IoHash> BlockAttachments; - if (const CbObjectView BlockAttachmentsView = BuildPart["blockAttachments"sv].AsObjectView()) - { - { - for (CbFieldView BlocksView : BlockAttachmentsView["rawHashes"sv]) - { - BlockAttachments.push_back(BlocksView.AsBinaryAttachment()); - } - } - } - m_ValidateStats.BlockAttachmentCount = BlockAttachments.size(); - - std::vector<ChunkBlockDescription> VerifyBlockDescriptions = - ParseChunkBlockDescriptionList(m_Storage.GetBlockMetadatas(m_BuildId, BlockAttachments)); - if (VerifyBlockDescriptions.size() != BlockAttachments.size()) - { - throw std::runtime_error(fmt::format("Uploaded blocks metadata could not all be found, {} blocks metadata is missing", - BlockAttachments.size() - VerifyBlockDescriptions.size())); - } - - ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - - const std::filesystem::path TempFolder = ".zen-tmp"; - - CleanAndRemoveDirectory(m_IOWorkerPool, m_AbortFlag, m_PauseFlag, TempFolder); - CreateDirectories(TempFolder); - auto __ = MakeGuard([this, TempFolder]() { CleanAndRemoveDirectory(m_IOWorkerPool, m_AbortFlag, m_PauseFlag, TempFolder); }); - - m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::ValidateBlobs, (uint32_t)TaskSteps::StepCount); - - std::unique_ptr<OperationLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Validate Blobs")); - OperationLogOutput::ProgressBar& Progress(*ProgressBarPtr); - - uint64_t AttachmentsToVerifyCount = ChunkAttachments.size() + BlockAttachments.size(); - FilteredRate FilteredDownloadedBytesPerSecond; - FilteredRate FilteredVerifiedBytesPerSecond; - - std::atomic<uint64_t> MultipartAttachmentCount = 0; - - for (const IoHash& ChunkAttachment : ChunkAttachments) - { - Work.ScheduleWork( - m_NetworkPool, - [this, - &Work, - AttachmentsToVerifyCount, - &TempFolder, - PreferredMultipartChunkSize, - &FilteredDownloadedBytesPerSecond, - &FilteredVerifiedBytesPerSecond, - &ChunkAttachments, - ChunkAttachment = IoHash(ChunkAttachment)](std::atomic<bool>&) { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("ValidateBuildPart_GetChunk"); - - FilteredDownloadedBytesPerSecond.Start(); - DownloadLargeBlob( - m_Storage, - TempFolder, - m_BuildId, - ChunkAttachment, - PreferredMultipartChunkSize, - Work, - m_NetworkPool, - m_DownloadStats.DownloadedChunkByteCount, - m_DownloadStats.MultipartAttachmentCount, - [this, - &Work, - AttachmentsToVerifyCount, - &FilteredDownloadedBytesPerSecond, - &FilteredVerifiedBytesPerSecond, - ChunkHash = IoHash(ChunkAttachment)](IoBuffer&& Payload) { - m_DownloadStats.DownloadedChunkCount++; - Payload.SetContentType(ZenContentType::kCompressedBinary); - if (!m_AbortFlag) - { - Work.ScheduleWork( - m_IOWorkerPool, - [this, - AttachmentsToVerifyCount, - &FilteredDownloadedBytesPerSecond, - &FilteredVerifiedBytesPerSecond, - Payload = IoBuffer(std::move(Payload)), - ChunkHash](std::atomic<bool>&) mutable { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("ValidateBuildPart_Validate"); - - if (m_DownloadStats.DownloadedChunkCount + m_DownloadStats.DownloadedBlockCount == - AttachmentsToVerifyCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - - FilteredVerifiedBytesPerSecond.Start(); - - uint64_t CompressedSize; - uint64_t DecompressedSize; - ValidateBlob(m_AbortFlag, std::move(Payload), ChunkHash, CompressedSize, DecompressedSize); - m_ValidateStats.VerifiedAttachmentCount++; - m_ValidateStats.VerifiedByteCount += DecompressedSize; - if (m_ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount) - { - FilteredVerifiedBytesPerSecond.Stop(); - } - } - }); - } - }); - } - }); - } - - for (const IoHash& BlockAttachment : BlockAttachments) - { - Work.ScheduleWork( - m_NetworkPool, - [this, - &Work, - AttachmentsToVerifyCount, - &FilteredDownloadedBytesPerSecond, - &FilteredVerifiedBytesPerSecond, - BlockAttachment = IoHash(BlockAttachment)](std::atomic<bool>&) { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("ValidateBuildPart_GetBlock"); - - FilteredDownloadedBytesPerSecond.Start(); - IoBuffer Payload = m_Storage.GetBuildBlob(m_BuildId, BlockAttachment); - m_DownloadStats.DownloadedBlockCount++; - m_DownloadStats.DownloadedBlockByteCount += Payload.GetSize(); - if (m_DownloadStats.DownloadedChunkCount + m_DownloadStats.DownloadedBlockCount == AttachmentsToVerifyCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - if (!Payload) - { - throw std::runtime_error(fmt::format("Block attachment {} could not be found", BlockAttachment)); - } - if (!m_AbortFlag) - { - Work.ScheduleWork( - m_IOWorkerPool, - [this, - &FilteredVerifiedBytesPerSecond, - AttachmentsToVerifyCount, - Payload = std::move(Payload), - BlockAttachment](std::atomic<bool>&) mutable { - if (!m_AbortFlag) - { - ZEN_TRACE_CPU("ValidateBuildPart_ValidateBlock"); - - FilteredVerifiedBytesPerSecond.Start(); - - uint64_t CompressedSize; - uint64_t DecompressedSize; - ValidateChunkBlock(std::move(Payload), BlockAttachment, CompressedSize, DecompressedSize); - m_ValidateStats.VerifiedAttachmentCount++; - m_ValidateStats.VerifiedByteCount += DecompressedSize; - if (m_ValidateStats.VerifiedAttachmentCount.load() == AttachmentsToVerifyCount) - { - FilteredVerifiedBytesPerSecond.Stop(); - } - } - }); - } - } - }); - } - - Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(PendingWork); - - const uint64_t DownloadedAttachmentCount = m_DownloadStats.DownloadedChunkCount + m_DownloadStats.DownloadedBlockCount; - const uint64_t DownloadedByteCount = m_DownloadStats.DownloadedChunkByteCount + m_DownloadStats.DownloadedBlockByteCount; - - FilteredDownloadedBytesPerSecond.Update(DownloadedByteCount); - FilteredVerifiedBytesPerSecond.Update(m_ValidateStats.VerifiedByteCount); - - std::string Details = fmt::format("Downloaded {}/{} ({}, {}bits/s). Verified {}/{} ({}, {}B/s)", - DownloadedAttachmentCount, - AttachmentsToVerifyCount, - NiceBytes(DownloadedByteCount), - NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8), - m_ValidateStats.VerifiedAttachmentCount.load(), - AttachmentsToVerifyCount, - NiceBytes(m_ValidateStats.VerifiedByteCount.load()), - NiceNum(FilteredVerifiedBytesPerSecond.GetCurrent())); - - Progress.UpdateState( - {.Task = "Validating blobs ", - .Details = Details, - .TotalCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2), - .RemainingCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2 - - (DownloadedAttachmentCount + m_ValidateStats.VerifiedAttachmentCount.load())), - .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, - false); - }); - - Progress.Finish(); - m_ValidateStats.ElapsedWallTimeUS = Timer.GetElapsedTimeUs(); - - m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::Cleanup, (uint32_t)TaskSteps::StepCount); - } - catch (const std::exception&) - { - m_AbortFlag = true; - throw; - } -} - -BuildsOperationPrimeCache::BuildsOperationPrimeCache(OperationLogOutput& OperationLogOutput, - StorageInstance& Storage, - std::atomic<bool>& AbortFlag, - std::atomic<bool>& PauseFlag, - WorkerThreadPool& NetworkPool, - const Oid& BuildId, - std::span<const Oid> BuildPartIds, - const Options& Options, - BuildStorageCache::Statistics& StorageCacheStats) -: m_LogOutput(OperationLogOutput) -, m_Storage(Storage) -, m_AbortFlag(AbortFlag) -, m_PauseFlag(PauseFlag) -, m_NetworkPool(NetworkPool) -, m_BuildId(BuildId) -, m_BuildPartIds(BuildPartIds.begin(), BuildPartIds.end()) -, m_Options(Options) -, m_StorageCacheStats(StorageCacheStats) -{ - m_TempPath = m_Options.ZenFolderPath / "tmp"; - CreateDirectories(m_TempPath); -} - -void -BuildsOperationPrimeCache::Execute() -{ - ZEN_TRACE_CPU("BuildsOperationPrimeCache::Execute"); - - Stopwatch PrimeTimer; - - tsl::robin_map<IoHash, uint64_t, IoHash::Hasher> LooseChunkRawSizes; - - tsl::robin_set<IoHash, IoHash::Hasher> BuildBlobs; - - for (const Oid& BuildPartId : m_BuildPartIds) - { - CbObject BuildPart = m_Storage.BuildStorage->GetBuildPart(m_BuildId, BuildPartId); - - CbObjectView BlockAttachmentsView = BuildPart["blockAttachments"sv].AsObjectView(); - std::vector<IoHash> BlockAttachments = compactbinary_helpers::ReadBinaryAttachmentArray("rawHashes"sv, BlockAttachmentsView); - - CbObjectView ChunkAttachmentsView = BuildPart["chunkAttachments"sv].AsObjectView(); - std::vector<IoHash> ChunkAttachments = compactbinary_helpers::ReadBinaryAttachmentArray("rawHashes"sv, ChunkAttachmentsView); - std::vector<uint64_t> ChunkRawSizes = compactbinary_helpers::ReadArray<uint64_t>("chunkRawSizes"sv, ChunkAttachmentsView); - if (ChunkAttachments.size() != ChunkRawSizes.size()) - { - throw std::runtime_error(fmt::format("Mismatch of loose chunk raw size array, expected {}, found {}", - ChunkAttachments.size(), - ChunkRawSizes.size())); - } - - BuildBlobs.reserve(ChunkAttachments.size() + BlockAttachments.size()); - BuildBlobs.insert(BlockAttachments.begin(), BlockAttachments.end()); - BuildBlobs.insert(ChunkAttachments.begin(), ChunkAttachments.end()); - - for (size_t ChunkAttachmentIndex = 0; ChunkAttachmentIndex < ChunkAttachments.size(); ChunkAttachmentIndex++) - { - LooseChunkRawSizes.insert_or_assign(ChunkAttachments[ChunkAttachmentIndex], ChunkRawSizes[ChunkAttachmentIndex]); - } - } - - if (!m_Options.IsQuiet) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, "Found {} referenced blobs", BuildBlobs.size()); - } - - if (BuildBlobs.empty()) - { - return; - } - - std::vector<IoHash> BlobsToDownload; - BlobsToDownload.reserve(BuildBlobs.size()); - - if (m_Storage.CacheStorage && !BuildBlobs.empty() && !m_Options.ForceUpload) - { - ZEN_TRACE_CPU("BlobCacheExistCheck"); - Stopwatch Timer; - - const std::vector<IoHash> BlobHashes(BuildBlobs.begin(), BuildBlobs.end()); - const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult = - m_Storage.CacheStorage->BlobsExists(m_BuildId, BlobHashes); - - if (CacheExistsResult.size() == BlobHashes.size()) - { - for (size_t BlobIndex = 0; BlobIndex < BlobHashes.size(); BlobIndex++) - { - if (!CacheExistsResult[BlobIndex].HasBody) - { - BlobsToDownload.push_back(BlobHashes[BlobIndex]); - } - } - size_t FoundCount = BuildBlobs.size() - BlobsToDownload.size(); - - if (FoundCount > 0 && !m_Options.IsQuiet) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Remote cache : Found {} out of {} needed blobs in {}", - FoundCount, - BuildBlobs.size(), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - } - } - } - else - { - BlobsToDownload.insert(BlobsToDownload.end(), BuildBlobs.begin(), BuildBlobs.end()); - } - - if (BlobsToDownload.empty()) - { - return; - } - - std::atomic<uint64_t> MultipartAttachmentCount; - std::atomic<size_t> CompletedDownloadCount; - FilteredRate FilteredDownloadedBytesPerSecond; - - { - std::unique_ptr<OperationLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Downloading")); - OperationLogOutput::ProgressBar& Progress(*ProgressBarPtr); - - ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - - const size_t BlobCount = BlobsToDownload.size(); - - for (size_t BlobIndex = 0; BlobIndex < BlobCount; BlobIndex++) - { - Work.ScheduleWork(m_NetworkPool, - [this, - &Work, - &BlobsToDownload, - BlobCount, - &LooseChunkRawSizes, - &CompletedDownloadCount, - &FilteredDownloadedBytesPerSecond, - &MultipartAttachmentCount, - BlobIndex](std::atomic<bool>&) { - if (!m_AbortFlag) - { - const IoHash& BlobHash = BlobsToDownload[BlobIndex]; - - bool IsLargeBlob = false; - - if (auto It = LooseChunkRawSizes.find(BlobHash); It != LooseChunkRawSizes.end()) - { - IsLargeBlob = It->second >= m_Options.LargeAttachmentSize; - } - - FilteredDownloadedBytesPerSecond.Start(); - - if (IsLargeBlob) - { - DownloadLargeBlob( - *m_Storage.BuildStorage, - m_TempPath, - m_BuildId, - BlobHash, - m_Options.PreferredMultipartChunkSize, - Work, - m_NetworkPool, - m_DownloadStats.DownloadedChunkByteCount, - MultipartAttachmentCount, - [this, BlobCount, BlobHash, &FilteredDownloadedBytesPerSecond, &CompletedDownloadCount]( - IoBuffer&& Payload) { - m_DownloadStats.DownloadedChunkCount++; - m_DownloadStats.RequestsCompleteCount++; - - if (!m_AbortFlag) - { - if (Payload && m_Storage.CacheStorage) - { - m_Storage.CacheStorage->PutBuildBlob(m_BuildId, - BlobHash, - ZenContentType::kCompressedBinary, - CompositeBuffer(SharedBuffer(Payload))); - } - } - CompletedDownloadCount++; - if (CompletedDownloadCount == BlobCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - }); - } - else - { - IoBuffer Payload = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlobHash); - m_DownloadStats.DownloadedBlockCount++; - m_DownloadStats.DownloadedBlockByteCount += Payload.GetSize(); - m_DownloadStats.RequestsCompleteCount++; - - if (!m_AbortFlag) - { - if (Payload && m_Storage.CacheStorage) - { - m_Storage.CacheStorage->PutBuildBlob(m_BuildId, - BlobHash, - ZenContentType::kCompressedBinary, - CompositeBuffer(SharedBuffer(std::move(Payload)))); - } - } - CompletedDownloadCount++; - if (CompletedDownloadCount == BlobCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - } - } - }); - } - - Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(PendingWork); - - uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() + m_DownloadStats.DownloadedBlockByteCount.load(); - FilteredDownloadedBytesPerSecond.Update(DownloadedBytes); - - std::string DownloadRateString = (CompletedDownloadCount == BlobCount) - ? "" - : fmt::format(" {}bits/s", NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8)); - std::string UploadDetails = m_Storage.CacheStorage ? fmt::format(" {} ({}) uploaded.", - m_StorageCacheStats.PutBlobCount.load(), - NiceBytes(m_StorageCacheStats.PutBlobByteCount.load())) - : ""; - - std::string Details = fmt::format("{}/{} ({}{}) downloaded.{}", - CompletedDownloadCount.load(), - BlobCount, - NiceBytes(DownloadedBytes), - DownloadRateString, - UploadDetails); - Progress.UpdateState({.Task = "Downloading", - .Details = Details, - .TotalCount = BlobCount, - .RemainingCount = BlobCount - CompletedDownloadCount.load(), - .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, - false); - }); - - FilteredDownloadedBytesPerSecond.Stop(); - - Progress.Finish(); - } - if (m_AbortFlag) - { - return; - } - - if (m_Storage.CacheStorage) - { - m_Storage.CacheStorage->Flush(m_LogOutput.GetProgressUpdateDelayMS(), [this](intptr_t Remaining) -> bool { - ZEN_UNUSED(Remaining); - if (!m_Options.IsQuiet) - { - ZEN_OPERATION_LOG_INFO(m_LogOutput, "Waiting for {} blobs to finish upload to '{}'", Remaining, m_Storage.CacheHost.Name); - } - return !m_AbortFlag; - }); - } - - if (!m_Options.IsQuiet) - { - uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() + m_DownloadStats.DownloadedBlockByteCount.load(); - ZEN_OPERATION_LOG_INFO(m_LogOutput, - "Downloaded {} ({}bits/s) in {}. {} as multipart. Completed in {}", - NiceBytes(DownloadedBytes), - NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), DownloadedBytes * 8)), - NiceTimeSpanMs(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS() / 1000), - MultipartAttachmentCount.load(), - NiceTimeSpanMs(PrimeTimer.GetElapsedTimeMs())); - } -} - -CompositeBuffer -ValidateBlob(std::atomic<bool>& AbortFlag, - BuildStorageBase& Storage, - const Oid& BuildId, - const IoHash& BlobHash, - uint64_t& OutCompressedSize, - uint64_t& OutDecompressedSize) -{ - ZEN_TRACE_CPU("ValidateBlob"); - IoBuffer Payload = Storage.GetBuildBlob(BuildId, BlobHash); - if (!Payload) - { - throw std::runtime_error(fmt::format("Blob {} could not be found", BlobHash)); - } - return ValidateBlob(AbortFlag, std::move(Payload), BlobHash, OutCompressedSize, OutDecompressedSize); -} - -ChunkBlockDescription -BuildsOperationValidateBuildPart::ValidateChunkBlock(IoBuffer&& Payload, - const IoHash& BlobHash, - uint64_t& OutCompressedSize, - uint64_t& OutDecompressedSize) -{ - CompositeBuffer BlockBuffer = ValidateBlob(m_AbortFlag, std::move(Payload), BlobHash, OutCompressedSize, OutDecompressedSize); - if (!BlockBuffer) - { - throw std::runtime_error(fmt::format("Chunk block blob {} is not compressed using 'None' compression level", BlobHash)); - } - return GetChunkBlockDescription(BlockBuffer.Flatten(), BlobHash); -} - -std::vector<std::pair<Oid, std::string>> -ResolveBuildPartNames(CbObjectView BuildObject, - const Oid& BuildId, - const std::vector<Oid>& BuildPartIds, - std::span<const std::string> BuildPartNames, - std::uint64_t& OutPreferredMultipartChunkSize) -{ - std::vector<std::pair<Oid, std::string>> Result; - { - CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView(); - if (!PartsObject) - { - throw std::runtime_error("Build object does not have a 'parts' object"); - } - - OutPreferredMultipartChunkSize = BuildObject["chunkSize"sv].AsUInt64(OutPreferredMultipartChunkSize); - - std::vector<std::pair<Oid, std::string>> AvailableParts; - - for (CbFieldView PartView : PartsObject) - { - const std::string BuildPartName = std::string(PartView.GetName()); - const Oid BuildPartId = PartView.AsObjectId(); - if (BuildPartId == Oid::Zero) - { - ExtendableStringBuilder<128> SB; - for (CbFieldView ScanPartView : PartsObject) - { - SB.Append(fmt::format("\n {}: {}", ScanPartView.GetName(), ScanPartView.AsObjectId())); - } - throw std::runtime_error(fmt::format("Build object parts does not have a '{}' object id{}", BuildPartName, SB.ToView())); - } - AvailableParts.push_back({BuildPartId, BuildPartName}); - } - - if (BuildPartIds.empty() && BuildPartNames.empty()) - { - Result = AvailableParts; - } - else - { - for (const std::string& BuildPartName : BuildPartNames) - { - if (auto It = std::find_if(AvailableParts.begin(), - AvailableParts.end(), - [&BuildPartName](const auto& Part) { return Part.second == BuildPartName; }); - It != AvailableParts.end()) - { - Result.push_back(*It); - } - else - { - throw std::runtime_error(fmt::format("Build {} object does not have a part named '{}'", BuildId, BuildPartName)); - } - } - for (const Oid& BuildPartId : BuildPartIds) - { - if (auto It = std::find_if(AvailableParts.begin(), - AvailableParts.end(), - [&BuildPartId](const auto& Part) { return Part.first == BuildPartId; }); - It != AvailableParts.end()) - { - Result.push_back(*It); - } - else - { - throw std::runtime_error(fmt::format("Build {} object does not have a part with id '{}'", BuildId, BuildPartId)); - } - } - } - - if (Result.empty()) - { - throw std::runtime_error(fmt::format("Build object does not have any parts", BuildId)); - } - } - return Result; -} - -ChunkedFolderContent -GetRemoteContent(OperationLogOutput& Output, - StorageInstance& Storage, - const Oid& BuildId, - const std::vector<std::pair<Oid, std::string>>& BuildParts, - const BuildManifest& Manifest, - std::span<const std::string> IncludeWildcards, - std::span<const std::string> ExcludeWildcards, - std::unique_ptr<ChunkingController>& OutChunkController, - std::vector<ChunkedFolderContent>& OutPartContents, - std::vector<ChunkBlockDescription>& OutBlockDescriptions, - std::vector<IoHash>& OutLooseChunkHashes, - bool IsQuiet, - bool IsVerbose, - bool DoExtraContentVerify) -{ - ZEN_TRACE_CPU("GetRemoteContent"); - - Stopwatch GetBuildPartTimer; - const Oid BuildPartId = BuildParts[0].first; - const std::string_view BuildPartName = BuildParts[0].second; - CbObject BuildPartManifest = Storage.BuildStorage->GetBuildPart(BuildId, BuildPartId); - if (!IsQuiet) - { - ZEN_OPERATION_LOG_INFO(Output, - "GetBuildPart {} ('{}') took {}. Payload size: {}", - BuildPartId, - BuildPartName, - NiceTimeSpanMs(GetBuildPartTimer.GetElapsedTimeMs()), - NiceBytes(BuildPartManifest.GetSize())); - ZEN_OPERATION_LOG_INFO(Output, "{}", GetCbObjectAsNiceString(BuildPartManifest, " "sv, "\n"sv)); - } - - { - CbObjectView Chunker = BuildPartManifest["chunker"sv].AsObjectView(); - std::string_view ChunkerName = Chunker["name"sv].AsString(); - CbObjectView Parameters = Chunker["parameters"sv].AsObjectView(); - OutChunkController = CreateChunkingController(ChunkerName, Parameters); - } - - auto ParseBuildPartManifest = [&Output, IsQuiet, IsVerbose, DoExtraContentVerify]( - StorageInstance& Storage, - const Oid& BuildId, - const Oid& BuildPartId, - CbObject BuildPartManifest, - std::span<const std::string> IncludeWildcards, - std::span<const std::string> ExcludeWildcards, - const BuildManifest::Part* OptionalManifest, - ChunkedFolderContent& OutRemoteContent, - std::vector<ChunkBlockDescription>& OutBlockDescriptions, - std::vector<IoHash>& OutLooseChunkHashes) { - std::vector<uint32_t> AbsoluteChunkOrders; - std::vector<uint64_t> LooseChunkRawSizes; - std::vector<IoHash> BlockRawHashes; - - ReadBuildContentFromCompactBinary(BuildPartManifest, - OutRemoteContent.Platform, - OutRemoteContent.Paths, - OutRemoteContent.RawHashes, - OutRemoteContent.RawSizes, - OutRemoteContent.Attributes, - OutRemoteContent.ChunkedContent.SequenceRawHashes, - OutRemoteContent.ChunkedContent.ChunkCounts, - AbsoluteChunkOrders, - OutLooseChunkHashes, - LooseChunkRawSizes, - BlockRawHashes); - - // TODO: GetBlockDescriptions for all BlockRawHashes in one go - check for local block descriptions when we cache them - - { - if (!IsQuiet) - { - ZEN_OPERATION_LOG_INFO(Output, "Fetching metadata for {} blocks", BlockRawHashes.size()); - } - - Stopwatch GetBlockMetadataTimer; - - bool AttemptFallback = false; - OutBlockDescriptions = GetBlockDescriptions(Output, - *Storage.BuildStorage, - Storage.CacheStorage.get(), - BuildId, - BlockRawHashes, - AttemptFallback, - IsQuiet, - IsVerbose); - - if (!IsQuiet) - { - ZEN_OPERATION_LOG_INFO(Output, - "GetBlockMetadata for {} took {}. Found {} blocks", - BuildPartId, - NiceTimeSpanMs(GetBlockMetadataTimer.GetElapsedTimeMs()), - OutBlockDescriptions.size()); - } - } - - CalculateLocalChunkOrders(AbsoluteChunkOrders, - OutLooseChunkHashes, - LooseChunkRawSizes, - OutBlockDescriptions, - OutRemoteContent.ChunkedContent.ChunkHashes, - OutRemoteContent.ChunkedContent.ChunkRawSizes, - OutRemoteContent.ChunkedContent.ChunkOrders, - DoExtraContentVerify); - - std::vector<std::filesystem::path> DeletedPaths; - - if (OptionalManifest) - { - tsl::robin_set<std::string> PathsInManifest; - PathsInManifest.reserve(OptionalManifest->Files.size()); - for (const std::filesystem::path& ManifestPath : OptionalManifest->Files) - { - PathsInManifest.insert(ToLower(ManifestPath.generic_string())); - } - for (const std::filesystem::path& RemotePath : OutRemoteContent.Paths) - { - if (!PathsInManifest.contains(ToLower(RemotePath.generic_string()))) - { - DeletedPaths.push_back(RemotePath); - } - } - } - - if (!IncludeWildcards.empty() || !ExcludeWildcards.empty()) - { - for (const std::filesystem::path& RemotePath : OutRemoteContent.Paths) - { - if (!IncludePath(IncludeWildcards, ExcludeWildcards, ToLower(RemotePath.generic_string()), /*CaseSensitive*/ true)) - { - DeletedPaths.push_back(RemotePath); - } - } - } - - if (!DeletedPaths.empty()) - { - OutRemoteContent = DeletePathsFromChunkedContent(OutRemoteContent, DeletedPaths); - InlineRemoveUnusedHashes(OutLooseChunkHashes, OutRemoteContent.ChunkedContent.ChunkHashes); - } - -#if ZEN_BUILD_DEBUG - ValidateChunkedFolderContent(OutRemoteContent, OutBlockDescriptions, OutLooseChunkHashes, IncludeWildcards, ExcludeWildcards); -#endif // ZEN_BUILD_DEBUG - }; - - auto FindManifest = [&Manifest](const Oid& BuildPartId, std::string_view BuildPartName) -> const BuildManifest::Part* { - if (Manifest.Parts.empty()) - { - return nullptr; - } - if (Manifest.Parts.size() == 1) - { - if (Manifest.Parts[0].PartId == Oid::Zero && Manifest.Parts[0].PartName.empty()) - { - return &Manifest.Parts[0]; - } - } - - auto It = std::find_if(Manifest.Parts.begin(), Manifest.Parts.end(), [BuildPartId, BuildPartName](const BuildManifest::Part& Part) { - if (Part.PartId != Oid::Zero) - { - return Part.PartId == BuildPartId; - } - if (!Part.PartName.empty()) - { - return Part.PartName == BuildPartName; - } - return false; - }); - if (It != Manifest.Parts.end()) - { - return &(*It); - } - return nullptr; - }; - - OutPartContents.resize(1); - ParseBuildPartManifest(Storage, - BuildId, - BuildPartId, - BuildPartManifest, - IncludeWildcards, - ExcludeWildcards, - FindManifest(BuildPartId, BuildPartName), - OutPartContents[0], - OutBlockDescriptions, - OutLooseChunkHashes); - ChunkedFolderContent RemoteContent; - if (BuildParts.size() > 1) - { - std::vector<ChunkBlockDescription> OverlayBlockDescriptions; - std::vector<IoHash> OverlayLooseChunkHashes; - for (size_t PartIndex = 1; PartIndex < BuildParts.size(); PartIndex++) - { - const Oid& OverlayBuildPartId = BuildParts[PartIndex].first; - const std::string& OverlayBuildPartName = BuildParts[PartIndex].second; - Stopwatch GetOverlayBuildPartTimer; - CbObject OverlayBuildPartManifest = Storage.BuildStorage->GetBuildPart(BuildId, OverlayBuildPartId); - if (!IsQuiet) - { - ZEN_OPERATION_LOG_INFO(Output, - "GetBuildPart {} ('{}') took {}. Payload size: {}", - OverlayBuildPartId, - OverlayBuildPartName, - NiceTimeSpanMs(GetOverlayBuildPartTimer.GetElapsedTimeMs()), - NiceBytes(OverlayBuildPartManifest.GetSize())); - } - - ChunkedFolderContent OverlayPartContent; - std::vector<ChunkBlockDescription> OverlayPartBlockDescriptions; - std::vector<IoHash> OverlayPartLooseChunkHashes; - - ParseBuildPartManifest(Storage, - BuildId, - OverlayBuildPartId, - OverlayBuildPartManifest, - IncludeWildcards, - ExcludeWildcards, - FindManifest(OverlayBuildPartId, OverlayBuildPartName), - OverlayPartContent, - OverlayPartBlockDescriptions, - OverlayPartLooseChunkHashes); - OutPartContents.push_back(OverlayPartContent); - OverlayBlockDescriptions.insert(OverlayBlockDescriptions.end(), - OverlayPartBlockDescriptions.begin(), - OverlayPartBlockDescriptions.end()); - OverlayLooseChunkHashes.insert(OverlayLooseChunkHashes.end(), - OverlayPartLooseChunkHashes.begin(), - OverlayPartLooseChunkHashes.end()); - } - - RemoteContent = MergeChunkedFolderContents(OutPartContents[0], std::span<const ChunkedFolderContent>(OutPartContents).subspan(1)); - { - tsl::robin_set<IoHash> AllBlockHashes; - for (const ChunkBlockDescription& Description : OutBlockDescriptions) - { - AllBlockHashes.insert(Description.BlockHash); - } - for (const ChunkBlockDescription& Description : OverlayBlockDescriptions) - { - if (!AllBlockHashes.contains(Description.BlockHash)) - { - AllBlockHashes.insert(Description.BlockHash); - OutBlockDescriptions.push_back(Description); - } - } - } - { - tsl::robin_set<IoHash> AllLooseChunkHashes(OutLooseChunkHashes.begin(), OutLooseChunkHashes.end()); - for (const IoHash& OverlayLooseChunkHash : OverlayLooseChunkHashes) - { - if (!AllLooseChunkHashes.contains(OverlayLooseChunkHash)) - { - AllLooseChunkHashes.insert(OverlayLooseChunkHash); - OutLooseChunkHashes.push_back(OverlayLooseChunkHash); - } - } - } - } - else - { - RemoteContent = OutPartContents[0]; - } - return RemoteContent; -} - -std::string -GetCbObjectAsNiceString(CbObjectView Object, std::string_view Prefix, std::string_view Suffix) -{ - ExtendableStringBuilder<512> SB; - std::vector<std::pair<std::string, std::string>> NameStringValuePairs; - for (CbFieldView Field : Object) - { - std::string_view Name = Field.GetName(); - switch (CbValue Accessor = Field.GetValue(); Accessor.GetType()) - { - case CbFieldType::String: - NameStringValuePairs.push_back({std::string(Name), std::string(Accessor.AsString())}); - break; - case CbFieldType::IntegerPositive: - NameStringValuePairs.push_back({std::string(Name), fmt::format("{}", Accessor.AsIntegerPositive())}); - break; - case CbFieldType::IntegerNegative: - NameStringValuePairs.push_back({std::string(Name), fmt::format("{}", Accessor.AsIntegerNegative())}); - break; - case CbFieldType::Float32: - { - const float Value = Accessor.AsFloat32(); - if (std::isfinite(Value)) - { - NameStringValuePairs.push_back({std::string(Name), fmt::format("{:.9g}", Value)}); - } - else - { - NameStringValuePairs.push_back({std::string(Name), "null"}); - } - } - break; - case CbFieldType::Float64: - { - const double Value = Accessor.AsFloat64(); - if (std::isfinite(Value)) - { - NameStringValuePairs.push_back({std::string(Name), fmt::format("{:.17g}", Value)}); - } - else - { - NameStringValuePairs.push_back({std::string(Name), "null"}); - } - } - break; - case CbFieldType::BoolFalse: - NameStringValuePairs.push_back({std::string(Name), "false"}); - break; - case CbFieldType::BoolTrue: - NameStringValuePairs.push_back({std::string(Name), "true"}); - break; - case CbFieldType::Hash: - { - NameStringValuePairs.push_back({std::string(Name), Accessor.AsHash().ToHexString()}); - } - break; - case CbFieldType::Uuid: - { - StringBuilder<Oid::StringLength + 1> Builder; - Accessor.AsUuid().ToString(Builder); - NameStringValuePairs.push_back({std::string(Name), Builder.ToString()}); - } - break; - case CbFieldType::DateTime: - { - ExtendableStringBuilder<64> Builder; - Builder << DateTime(Accessor.AsDateTimeTicks()).ToIso8601(); - NameStringValuePairs.push_back({std::string(Name), Builder.ToString()}); - } - break; - case CbFieldType::TimeSpan: - { - ExtendableStringBuilder<64> Builder; - const TimeSpan Span(Accessor.AsTimeSpanTicks()); - if (Span.GetDays() == 0) - { - Builder << Span.ToString("%h:%m:%s.%n"); - } - else - { - Builder << Span.ToString("%d.%h:%m:%s.%n"); - } - NameStringValuePairs.push_back({std::string(Name), Builder.ToString()}); - break; - } - case CbFieldType::ObjectId: - NameStringValuePairs.push_back({std::string(Name), Accessor.AsObjectId().ToString()}); - break; - } - } - std::string::size_type LongestKey = 0; - for (const std::pair<std::string, std::string>& KeyValue : NameStringValuePairs) - { - LongestKey = Max(KeyValue.first.length(), LongestKey); - } - for (const std::pair<std::string, std::string>& KeyValue : NameStringValuePairs) - { - SB.Append(fmt::format("{}{:<{}}: {}{}", Prefix, KeyValue.first, LongestKey, KeyValue.second, Suffix)); - } - return SB.ToString(); -} - -#if ZEN_WITH_TESTS - -namespace buildstorageoperations_testutils { - struct TestState - { - TestState(const std::filesystem::path& InRootPath) - : RootPath(InRootPath) - , LogOutput(CreateStandardLogOutput(Log)) - , ChunkController(CreateStandardChunkingController(StandardChunkingControllerSettings{})) - , ChunkCache(CreateMemoryChunkingCache()) - , WorkerPool(2) - , NetworkPool(2) - { - } - - void Initialize() - { - StoragePath = RootPath / "storage"; - TempPath = RootPath / "temp"; - SystemRootDir = RootPath / "sysroot"; - ZenFolderPath = RootPath / ".zen"; - - CreateDirectories(TempPath); - CreateDirectories(StoragePath); - - Storage.BuildStorage = CreateFileBuildStorage(StoragePath, StorageStats, false); - } - - void CreateSourceData(const std::filesystem::path& Source, std::span<const std::string> Paths, std::span<const uint64_t> Sizes) - { - const std::filesystem::path SourcePath = RootPath / Source; - CreateDirectories(SourcePath); - for (size_t FileIndex = 0; FileIndex < Paths.size(); FileIndex++) - { - const std::string& FilePath = Paths[FileIndex]; - const uint64_t FileSize = Sizes[FileIndex]; - IoBuffer FileData = FileSize > 0 ? CreateSemiRandomBlob(FileSize) : IoBuffer{}; - WriteFile(SourcePath / FilePath, FileData); - } - } - - std::vector<std::pair<Oid, std::string>> Upload(const Oid& BuildId, - const Oid& BuildPartId, - const std::string_view BuildPartName, - const std::filesystem::path& Source, - const std::filesystem::path& ManifestPath) - { - const std::filesystem::path SourcePath = RootPath / Source; - CbObject MetaData; - BuildsOperationUploadFolder Upload(*LogOutput, - Storage, - AbortFlag, - PauseFlag, - WorkerPool, - NetworkPool, - BuildId, - SourcePath, - true, - MetaData, - BuildsOperationUploadFolder::Options{.TempDir = TempPath}); - return Upload.Execute(BuildPartId, BuildPartName, ManifestPath, *ChunkController, *ChunkCache); - } - - void ValidateUpload(const Oid& BuildId, const std::vector<std::pair<Oid, std::string>>& Parts) - { - for (auto Part : Parts) - { - BuildsOperationValidateBuildPart Validate(*LogOutput, - *Storage.BuildStorage, - AbortFlag, - PauseFlag, - WorkerPool, - NetworkPool, - BuildId, - Part.first, - Part.second, - BuildsOperationValidateBuildPart::Options{}); - Validate.Execute(); - } - } - - FolderContent Download(const Oid& BuildId, - const Oid& BuildPartId, - const std::string_view BuildPartName, - const std::filesystem::path& Target, - bool Append) - { - const std::filesystem::path TargetPath = RootPath / Target; - - CreateDirectories(TargetPath); - - uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u; - CbObject BuildObject = Storage.BuildStorage->GetBuild(BuildId); - std::vector<Oid> PartIds; - if (BuildPartId != Oid::Zero) - { - PartIds.push_back(BuildPartId); - } - std::vector<std::string> PartNames; - if (!BuildPartName.empty()) - { - PartNames.push_back(std::string(BuildPartName)); - } - std::vector<std::pair<Oid, std::string>> AllBuildParts = - ResolveBuildPartNames(BuildObject, BuildId, PartIds, PartNames, PreferredMultipartChunkSize); - - std::vector<ChunkedFolderContent> PartContents; - - std::vector<ChunkBlockDescription> BlockDescriptions; - std::vector<IoHash> LooseChunkHashes; - - ChunkedFolderContent RemoteContent = GetRemoteContent(*LogOutput, - Storage, - BuildId, - AllBuildParts, - {}, - {}, - {}, - ChunkController, - PartContents, - BlockDescriptions, - LooseChunkHashes, - /*IsQuiet*/ false, - /*IsVerbose*/ false, - /*DoExtraContentVerify*/ true); - - GetFolderContentStatistics LocalFolderScanStats; - - struct ContentVisitor : public GetDirectoryContentVisitor - { - virtual void AsyncVisitDirectory(const std::filesystem::path& RelativeRoot, DirectoryContent&& Content) - { - RwLock::ExclusiveLockScope _(ExistingPathsLock); - for (const std::filesystem::path& FileName : Content.FileNames) - { - if (RelativeRoot.empty()) - { - ExistingPaths.push_back(FileName); - } - else - { - ExistingPaths.push_back(RelativeRoot / FileName); - } - } - } - - RwLock ExistingPathsLock; - std::vector<std::filesystem::path> ExistingPaths; - } Visitor; - - Latch PendingWorkCount(1); - - GetDirectoryContent(TargetPath, - DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive, - Visitor, - WorkerPool, - PendingWorkCount); - - PendingWorkCount.CountDown(); - PendingWorkCount.Wait(); - - FolderContent CurrentLocalFolderState = GetValidFolderContent( - WorkerPool, - LocalFolderScanStats, - TargetPath, - Visitor.ExistingPaths, - [](uint64_t PathCount, uint64_t CompletedPathCount) { ZEN_UNUSED(PathCount, CompletedPathCount); }, - 1000, - AbortFlag, - PauseFlag); - - ChunkingStatistics LocalChunkingStats; - ChunkedFolderContent LocalContent = ChunkFolderContent( - LocalChunkingStats, - WorkerPool, - TargetPath, - CurrentLocalFolderState, - *ChunkController, - *ChunkCache, - 1000, - [&](bool IsAborted, bool IsPaused, std::ptrdiff_t) { ZEN_UNUSED(IsAborted, IsPaused); }, - AbortFlag, - PauseFlag); - - if (Append) - { - RemoteContent = ApplyChunkedContentOverlay(LocalContent, RemoteContent, {}, {}); - } - - const ChunkedContentLookup LocalLookup = BuildChunkedContentLookup(LocalContent); - const ChunkedContentLookup RemoteLookup = BuildChunkedContentLookup(RemoteContent); - - BuildsOperationUpdateFolder Download(*LogOutput, - Storage, - AbortFlag, - PauseFlag, - WorkerPool, - NetworkPool, - BuildId, - TargetPath, - LocalContent, - LocalLookup, - RemoteContent, - RemoteLookup, - BlockDescriptions, - LooseChunkHashes, - BuildsOperationUpdateFolder::Options{.SystemRootDir = SystemRootDir, - .ZenFolderPath = ZenFolderPath, - .ValidateCompletedSequences = true}); - FolderContent ResultingState; - Download.Execute(ResultingState); - - return ResultingState; - } - - void ValidateDownload(std::span<const std::string> Paths, - std::span<const uint64_t> Sizes, - const std::filesystem::path& Source, - const std::filesystem::path& Target, - const FolderContent& DownloadContent) - { - const std::filesystem::path SourcePath = RootPath / Source; - const std::filesystem::path TargetPath = RootPath / Target; - - CHECK_EQ(Paths.size(), DownloadContent.Paths.size()); - tsl::robin_map<std::string, uint64_t> ExpectedSizes; - tsl::robin_map<std::string, IoHash> ExpectedHashes; - for (size_t Index = 0; Index < Paths.size(); Index++) - { - const std::string LookupString = std::filesystem::path(Paths[Index]).generic_string(); - ExpectedSizes.insert_or_assign(LookupString, Sizes[Index]); - std::filesystem::path FilePath = SourcePath / Paths[Index]; - const IoHash SourceHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(FilePath.make_preferred())); - ExpectedHashes.insert_or_assign(LookupString, SourceHash); - } - for (size_t Index = 0; Index < DownloadContent.Paths.size(); Index++) - { - const std::string LookupString = std::filesystem::path(DownloadContent.Paths[Index]).generic_string(); - auto SizeIt = ExpectedSizes.find(LookupString); - CHECK_NE(SizeIt, ExpectedSizes.end()); - CHECK_EQ(SizeIt->second, DownloadContent.RawSizes[Index]); - std::filesystem::path FilePath = TargetPath / DownloadContent.Paths[Index]; - const IoHash DownloadedHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(FilePath.make_preferred())); - auto HashIt = ExpectedHashes.find(LookupString); - CHECK_NE(HashIt, ExpectedHashes.end()); - CHECK_EQ(HashIt->second, DownloadedHash); - } - } - - const std::filesystem::path RootPath; - std::filesystem::path StoragePath; - std::filesystem::path TempPath; - std::filesystem::path SystemRootDir; - std::filesystem::path ZenFolderPath; - - LoggerRef Log = ConsoleLog(); - std::unique_ptr<OperationLogOutput> LogOutput; - - std::unique_ptr<ChunkingController> ChunkController; - std::unique_ptr<ChunkingCache> ChunkCache; - - StorageInstance Storage; - BuildStorageBase::Statistics StorageStats; - - WorkerThreadPool WorkerPool; - WorkerThreadPool NetworkPool; - - std::atomic<bool> AbortFlag; - std::atomic<bool> PauseFlag; - }; - -} // namespace buildstorageoperations_testutils - -TEST_SUITE_BEGIN("remotestore.buildstorageoperations"); - -TEST_CASE("buildstorageoperations.upload.folder") -{ - using namespace buildstorageoperations_testutils; - - FastRandom BaseRandom; - - const size_t FileCount = 11; - - const std::string Paths[FileCount] = {{"file_1"}, - {"file_2.exe"}, - {"file_3.txt"}, - {"dir_1/dir1_file_1.exe"}, - {"dir_1/dir1_file_2.pdb"}, - {"dir_1/dir1_file_3.txt"}, - {"dir_2/dir2_dir1/dir2_dir1_file_1.exe"}, - {"dir_2/dir2_dir1/dir2_dir1_file_2.pdb"}, - {"dir_2/dir2_dir1/dir2_dir1_file_3.dll"}, - {"dir_2/dir2_dir2/dir2_dir2_file_1.txt"}, - {"dir_2/dir2_dir2/dir2_dir2_file_2.json"}}; - const uint64_t Sizes[FileCount] = - {6u * 1024u, 0, 798, 19u * 1024u, 7u * 1024u, 93, 31u * 1024u, 17u * 1024u, 13u * 1024u, 2u * 1024u, 3u * 1024u}; - - ScopedTemporaryDirectory SourceFolder; - TestState State(SourceFolder.Path()); - State.Initialize(); - State.CreateSourceData("source", Paths, Sizes); - - const Oid BuildId = Oid::NewOid(); - const Oid BuildPartId = Oid::NewOid(); - const std::string BuildPartName = "default"; - - auto Result = State.Upload(BuildId, BuildPartId, BuildPartName, "source", {}); - - CHECK_EQ(Result.size(), 1u); - CHECK_EQ(Result[0].first, BuildPartId); - CHECK_EQ(Result[0].second, BuildPartName); - State.ValidateUpload(BuildId, Result); - - FolderContent DownloadContent = State.Download(BuildId, Oid::Zero, {}, "download", /* Append */ false); - CHECK_EQ(DownloadContent.Paths.size(), FileCount); - State.ValidateDownload(Paths, Sizes, "source", "download", DownloadContent); -} - -TEST_CASE("buildstorageoperations.upload.manifest") -{ - using namespace buildstorageoperations_testutils; - - FastRandom BaseRandom; - - const size_t FileCount = 11; - - const std::string Paths[FileCount] = {{"file_1"}, - {"file_2.exe"}, - {"file_3.txt"}, - {"dir_1/dir1_file_1.exe"}, - {"dir_1/dir1_file_2.pdb"}, - {"dir_1/dir1_file_3.txt"}, - {"dir_2/dir2_dir1/dir2_dir1_file_1.exe"}, - {"dir_2/dir2_dir1/dir2_dir1_file_2.pdb"}, - {"dir_2/dir2_dir1/dir2_dir1_file_3.dll"}, - {"dir_2/dir2_dir2/dir2_dir2_file_1.txt"}, - {"dir_2/dir2_dir2/dir2_dir2_file_2.json"}}; - const uint64_t Sizes[FileCount] = - {6u * 1024u, 0, 798, 19u * 1024u, 7u * 1024u, 93, 31u * 1024u, 17u * 1024u, 13u * 1024u, 2u * 1024u, 3u * 1024u}; - - ScopedTemporaryDirectory SourceFolder; - TestState State(SourceFolder.Path()); - State.Initialize(); - State.CreateSourceData("source", Paths, Sizes); - - std::span<const std::string> ManifestFiles(Paths); - ManifestFiles = ManifestFiles.subspan(0, FileCount / 2); - - std::span<const uint64_t> ManifestSizes(Sizes); - ManifestSizes = ManifestSizes.subspan(0, FileCount / 2); - - ExtendableStringBuilder<1024> Manifest; - for (const std::string& FilePath : ManifestFiles) - { - Manifest << FilePath << "\n"; - } - - WriteFile(State.RootPath / "manifest.txt", IoBuffer(IoBuffer::Wrap, Manifest.Data(), Manifest.Size())); - - const Oid BuildId = Oid::NewOid(); - const Oid BuildPartId = Oid::NewOid(); - const std::string BuildPartName = "default"; - - auto Result = State.Upload(BuildId, BuildPartId, BuildPartName, "source", State.RootPath / "manifest.txt"); - - CHECK_EQ(Result.size(), 1u); - CHECK_EQ(Result[0].first, BuildPartId); - CHECK_EQ(Result[0].second, BuildPartName); - State.ValidateUpload(BuildId, Result); - - FolderContent DownloadContent = State.Download(BuildId, Oid::Zero, {}, "download", /* Append */ false); - State.ValidateDownload(ManifestFiles, ManifestSizes, "source", "download", DownloadContent); -} - -TEST_CASE("buildstorageoperations.memorychunkingcache") -{ - using namespace buildstorageoperations_testutils; - - FastRandom BaseRandom; - - const size_t FileCount = 11; - - const std::string Paths[FileCount] = {{"file_1"}, - {"file_2.exe"}, - {"file_3.txt"}, - {"dir_1/dir1_file_1.exe"}, - {"dir_1/dir1_file_2.pdb"}, - {"dir_1/dir1_file_3.txt"}, - {"dir_2/dir2_dir1/dir2_dir1_file_1.exe"}, - {"dir_2/dir2_dir1/dir2_dir1_file_2.pdb"}, - {"dir_2/dir2_dir1/dir2_dir1_file_3.dll"}, - {"dir_2/dir2_dir2/dir2_dir2_file_1.txt"}, - {"dir_2/dir2_dir2/dir2_dir2_file_2.json"}}; - const uint64_t Sizes[FileCount] = - {6u * 1024u, 0, 798, 19u * 1024u, 7u * 1024u, 93, 31u * 1024u, 17u * 1024u, 13u * 1024u, 2u * 1024u, 3u * 1024u}; - - ScopedTemporaryDirectory SourceFolder; - TestState State(SourceFolder.Path()); - State.Initialize(); - State.CreateSourceData("source", Paths, Sizes); - - const Oid BuildId = Oid::NewOid(); - const Oid BuildPartId = Oid::NewOid(); - const std::string BuildPartName = "default"; - - { - const std::filesystem::path SourcePath = SourceFolder.Path() / "source"; - CbObject MetaData; - BuildsOperationUploadFolder Upload(*State.LogOutput, - State.Storage, - State.AbortFlag, - State.PauseFlag, - State.WorkerPool, - State.NetworkPool, - BuildId, - SourcePath, - true, - MetaData, - BuildsOperationUploadFolder::Options{.TempDir = State.TempPath}); - auto Result = Upload.Execute(BuildPartId, BuildPartName, {}, *State.ChunkController, *State.ChunkCache); - - CHECK_EQ(Upload.m_ChunkingStats.FilesStoredInCache.load(), FileCount - 1); // Zero size files are not stored in cache - CHECK_EQ(Upload.m_ChunkingStats.BytesStoredInCache.load(), std::accumulate(&Sizes[0], &Sizes[FileCount], uint64_t(0))); - CHECK(Upload.m_ChunkingStats.ChunksStoredInCache.load() >= FileCount - 1); // Zero size files are not stored in cache - - CHECK_EQ(Result.size(), 1u); - CHECK_EQ(Result[0].first, BuildPartId); - CHECK_EQ(Result[0].second, BuildPartName); - } - - auto Result = State.Upload(BuildId, BuildPartId, BuildPartName, "source", {}); - - const Oid BuildId2 = Oid::NewOid(); - const Oid BuildPartId2 = Oid::NewOid(); - - { - const std::filesystem::path SourcePath = SourceFolder.Path() / "source"; - CbObject MetaData; - BuildsOperationUploadFolder Upload(*State.LogOutput, - State.Storage, - State.AbortFlag, - State.PauseFlag, - State.WorkerPool, - State.NetworkPool, - BuildId2, - SourcePath, - true, - MetaData, - BuildsOperationUploadFolder::Options{.TempDir = State.TempPath}); - Upload.Execute(BuildPartId2, BuildPartName, {}, *State.ChunkController, *State.ChunkCache); - - CHECK_EQ(Upload.m_ChunkingStats.FilesFoundInCache.load(), FileCount - 1); // Zero size files are not stored in cache - CHECK_EQ(Upload.m_ChunkingStats.BytesFoundInCache.load(), std::accumulate(&Sizes[0], &Sizes[FileCount], uint64_t(0))); - CHECK(Upload.m_ChunkingStats.ChunksFoundInCache.load() >= FileCount - 1); // Zero size files are not stored in cache - } - - FolderContent DownloadContent = State.Download(BuildId2, BuildPartId2, {}, "download", /* Append */ false); - State.ValidateDownload(Paths, Sizes, "source", "download", DownloadContent); -} - -TEST_CASE("buildstorageoperations.upload.multipart") -{ - // Disabled since it relies on authentication and specific block being present in cloud storage - if (false) - { - using namespace buildstorageoperations_testutils; - - FastRandom BaseRandom; - - const size_t FileCount = 11; - - const std::string Paths[FileCount] = {{"file_1"}, - {"file_2.exe"}, - {"file_3.txt"}, - {"dir_1/dir1_file_1.exe"}, - {"dir_1/dir1_file_2.pdb"}, - {"dir_1/dir1_file_3.txt"}, - {"dir_2/dir2_dir1/dir2_dir1_file_1.exe"}, - {"dir_2/dir2_dir1/dir2_dir1_file_2.pdb"}, - {"dir_2/dir2_dir1/dir2_dir1_file_3.dll"}, - {"dir_2/dir2_dir2/dir2_dir2_file_1.txt"}, - {"dir_2/dir2_dir2/dir2_dir2_file_2.json"}}; - const uint64_t Sizes[FileCount] = - {6u * 1024u, 0, 798, 19u * 1024u, 7u * 1024u, 93, 31u * 1024u, 17u * 1024u, 13u * 1024u, 2u * 1024u, 3u * 1024u}; - - ScopedTemporaryDirectory SourceFolder; - TestState State(SourceFolder.Path()); - State.Initialize(); - State.CreateSourceData("source", Paths, Sizes); - - std::span<const std::string> ManifestFiles1(Paths); - ManifestFiles1 = ManifestFiles1.subspan(0, FileCount / 2); - - std::span<const uint64_t> ManifestSizes1(Sizes); - ManifestSizes1 = ManifestSizes1.subspan(0, FileCount / 2); - - std::span<const std::string> ManifestFiles2(Paths); - ManifestFiles2 = ManifestFiles2.subspan(FileCount / 2 - 1); - - std::span<const uint64_t> ManifestSizes2(Sizes); - ManifestSizes2 = ManifestSizes2.subspan(FileCount / 2 - 1); - - const Oid BuildPart1Id = Oid::NewOid(); - const std::string BuildPart1Name = "part1"; - const Oid BuildPart2Id = Oid::NewOid(); - const std::string BuildPart2Name = "part2"; - { - CbObjectWriter Writer; - Writer.BeginObject("parts"sv); - { - Writer.BeginObject(BuildPart1Name); - { - Writer.AddObjectId("partId"sv, BuildPart1Id); - Writer.BeginArray("files"sv); - for (const std::string& ManifestFile : ManifestFiles1) - { - Writer.AddString(ManifestFile); - } - Writer.EndArray(); // files - } - Writer.EndObject(); // part1 - - Writer.BeginObject(BuildPart2Name); - { - Writer.AddObjectId("partId"sv, BuildPart2Id); - Writer.BeginArray("files"sv); - for (const std::string& ManifestFile : ManifestFiles2) - { - Writer.AddString(ManifestFile); - } - Writer.EndArray(); // files - } - Writer.EndObject(); // part2 - } - Writer.EndObject(); // parts - - ExtendableStringBuilder<1024> Manifest; - CompactBinaryToJson(Writer.Save(), Manifest); - WriteFile(State.RootPath / "manifest.json", IoBuffer(IoBuffer::Wrap, Manifest.Data(), Manifest.Size())); - } - - const Oid BuildId = Oid::NewOid(); - - auto Result = State.Upload(BuildId, {}, {}, "source", State.RootPath / "manifest.json"); - - CHECK_EQ(Result.size(), 2u); - CHECK_EQ(Result[0].first, BuildPart1Id); - CHECK_EQ(Result[0].second, BuildPart1Name); - CHECK_EQ(Result[1].first, BuildPart2Id); - CHECK_EQ(Result[1].second, BuildPart2Name); - State.ValidateUpload(BuildId, Result); - - FolderContent DownloadContent = State.Download(BuildId, Oid::Zero, {}, "download", /* Append */ false); - State.ValidateDownload(Paths, Sizes, "source", "download", DownloadContent); - - FolderContent Part1DownloadContent = State.Download(BuildId, BuildPart1Id, {}, "download_part1", /* Append */ false); - State.ValidateDownload(ManifestFiles1, ManifestSizes1, "source", "download_part1", Part1DownloadContent); - - FolderContent Part2DownloadContent = State.Download(BuildId, Oid::Zero, BuildPart2Name, "download_part2", /* Append */ false); - State.ValidateDownload(ManifestFiles2, ManifestSizes2, "source", "download_part2", Part2DownloadContent); - - (void)State.Download(BuildId, BuildPart1Id, BuildPart1Name, "download_part1+2", /* Append */ false); - FolderContent Part1And2DownloadContent = State.Download(BuildId, BuildPart2Id, {}, "download_part1+2", /* Append */ true); - State.ValidateDownload(Paths, Sizes, "source", "download_part1+2", Part1And2DownloadContent); - } -} - -TEST_CASE("buildstorageoperations.partial.block.download" * doctest::skip(true)) -{ - const std::string OidcExecutableName = "OidcToken" ZEN_EXE_SUFFIX_LITERAL; - std::filesystem::path OidcTokenExePath = (GetRunningExecutablePath().parent_path() / OidcExecutableName).make_preferred(); - - HttpClientSettings ClientSettings{ - .LogCategory = "httpbuildsclient", - .AccessTokenProvider = - httpclientauth::CreateFromOidcTokenExecutable(OidcTokenExePath, "https://jupiter.devtools.epicgames.com", true, false, false), - .AssumeHttp2 = false, - .AllowResume = true, - .RetryCount = 0, - .Verbose = false}; - - HttpClient HttpClient("https://euc.jupiter.devtools.epicgames.com", ClientSettings); - - const std::string_view Namespace = "fortnite.oplog"; - const std::string_view Bucket = "fortnitegame.staged-build.fortnite-main.ps4-client"; - const Oid BuildId = Oid::FromHexString("09a76ea92ad301d4724fafad"); - - { - HttpClient::Response Response = HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, Bucket, BuildId), - HttpClient::Accept(ZenContentType::kCbObject)); - CbValidateError ValidateResult = CbValidateError::None; - CbObject Object = ValidateAndReadCompactBinaryObject(IoBuffer(Response.ResponsePayload), ValidateResult); - REQUIRE(ValidateResult == CbValidateError::None); - } - - std::vector<ChunkBlockDescription> BlockDescriptions; - { - CbObjectWriter Request; - - Request.BeginArray("blocks"sv); - { - Request.AddHash(IoHash::FromHexString("7c353ed782675a5e8f968e61e51fc797ecdc2882")); - } - Request.EndArray(); - - IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer(); - Payload.SetContentType(ZenContentType::kCbObject); - - HttpClient::Response BlockDescriptionsResponse = - HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/blocks/getBlockMetadata", Namespace, Bucket, BuildId), - Payload, - HttpClient::Accept(ZenContentType::kCbObject)); - REQUIRE(BlockDescriptionsResponse.IsSuccess()); - - CbValidateError ValidateResult = CbValidateError::None; - CbObject Object = ValidateAndReadCompactBinaryObject(IoBuffer(BlockDescriptionsResponse.ResponsePayload), ValidateResult); - REQUIRE(ValidateResult == CbValidateError::None); - - { - CbArrayView BlocksArray = Object["blocks"sv].AsArrayView(); - for (CbFieldView Block : BlocksArray) - { - ChunkBlockDescription Description = ParseChunkBlockDescription(Block.AsObjectView()); - BlockDescriptions.emplace_back(std::move(Description)); - } - } - } - - REQUIRE(!BlockDescriptions.empty()); - - const IoHash BlockHash = BlockDescriptions.back().BlockHash; - - const ChunkBlockDescription& BlockDescription = BlockDescriptions.front(); - REQUIRE(!BlockDescription.ChunkRawHashes.empty()); - REQUIRE(!BlockDescription.ChunkCompressedLengths.empty()); - - std::vector<std::pair<uint64_t, uint64_t>> ChunkOffsetAndSizes; - uint64_t Offset = gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); - - for (uint32_t ChunkCompressedSize : BlockDescription.ChunkCompressedLengths) - { - ChunkOffsetAndSizes.push_back(std::make_pair(Offset, ChunkCompressedSize)); - Offset += ChunkCompressedSize; - } - - ScopedTemporaryDirectory SourceFolder; - - auto Validate = [&](std::span<const uint32_t> ChunkIndexesToFetch) { - std::vector<std::pair<uint64_t, uint64_t>> Ranges; - for (uint32_t ChunkIndex : ChunkIndexesToFetch) - { - Ranges.push_back(ChunkOffsetAndSizes[ChunkIndex]); - } - - HttpClient::KeyValueMap Headers; - if (!Ranges.empty()) - { - ExtendableStringBuilder<512> SB; - for (const std::pair<uint64_t, uint64_t>& R : Ranges) - { - if (SB.Size() > 0) - { - SB << ", "; - } - SB << R.first << "-" << R.first + R.second - 1; - } - Headers.Entries.insert({"Range", fmt::format("bytes={}", SB.ToView())}); - } - - HttpClient::Response GetBlobRangesResponse = HttpClient.Download( - fmt::format("/api/v2/builds/{}/{}/{}/blobs/{}?supportsRedirect=false", Namespace, Bucket, BuildId, BlockHash), - SourceFolder.Path(), - Headers); - - REQUIRE(GetBlobRangesResponse.IsSuccess()); - [[maybe_unused]] MemoryView RangesMemoryView = GetBlobRangesResponse.ResponsePayload.GetView(); - - std::vector<std::pair<uint64_t, uint64_t>> PayloadRanges = GetBlobRangesResponse.GetRanges(Ranges); - if (PayloadRanges.empty()) - { - // We got the whole blob, use the ranges as is - PayloadRanges = Ranges; - } - - REQUIRE(PayloadRanges.size() == Ranges.size()); - - for (uint32_t RangeIndex = 0; RangeIndex < PayloadRanges.size(); RangeIndex++) - { - const std::pair<uint64_t, uint64_t>& PayloadRange = PayloadRanges[RangeIndex]; - - CHECK_EQ(PayloadRange.second, Ranges[RangeIndex].second); - - IoBuffer ChunkPayload(GetBlobRangesResponse.ResponsePayload, PayloadRange.first, PayloadRange.second); - IoHash RawHash; - uint64_t RawSize; - CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(SharedBuffer(ChunkPayload), RawHash, RawSize); - CHECK(CompressedChunk); - CHECK_EQ(RawHash, BlockDescription.ChunkRawHashes[ChunkIndexesToFetch[RangeIndex]]); - CHECK_EQ(RawSize, BlockDescription.ChunkRawLengths[ChunkIndexesToFetch[RangeIndex]]); - } - }; - - { - // Single - std::vector<uint32_t> ChunkIndexesToFetch{uint32_t(BlockDescription.ChunkCompressedLengths.size() / 2)}; - Validate(ChunkIndexesToFetch); - } - { - // Many - std::vector<uint32_t> ChunkIndexesToFetch; - for (uint32_t Index = 0; Index < BlockDescription.ChunkCompressedLengths.size() / 16; Index++) - { - ChunkIndexesToFetch.push_back(uint32_t(BlockDescription.ChunkCompressedLengths.size() / 6 + Index * 7)); - ChunkIndexesToFetch.push_back(uint32_t(BlockDescription.ChunkCompressedLengths.size() / 6 + Index * 7 + 1)); - ChunkIndexesToFetch.push_back(uint32_t(BlockDescription.ChunkCompressedLengths.size() / 6 + Index * 7 + 3)); - } - Validate(ChunkIndexesToFetch); - } - - { - // First and last - std::vector<uint32_t> ChunkIndexesToFetch{0, uint32_t(BlockDescription.ChunkCompressedLengths.size() - 1)}; - Validate(ChunkIndexesToFetch); - } -} -TEST_SUITE_END(); - -void -buildstorageoperations_forcelink() -{ -} - -#endif // ZEN_WITH_TESTS - -} // namespace zen |