diff options
| author | Dan Engelbrecht <[email protected]> | 2025-10-13 13:32:36 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-13 13:32:36 +0200 |
| commit | a6925de9bca8579637fa8a4152ab2b77ef5ca90e (patch) | |
| tree | e2742dcc584e78de7f8921bd655c22929b37da80 /src/zen/cmds/builds_cmd.cpp | |
| parent | move service common code into base class (#567) (diff) | |
| download | archived-zen-a6925de9bca8579637fa8a4152ab2b77ef5ca90e.tar.xz archived-zen-a6925de9bca8579637fa8a4152ab2b77ef5ca90e.zip | |
refactor builds cmd (#566)
Move builds download code from builds_cmd.cpp to remotestorelib
Diffstat (limited to 'src/zen/cmds/builds_cmd.cpp')
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 4598 |
1 files changed, 152 insertions, 4446 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index e40d4366b..00bba6f4f 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -24,6 +24,7 @@ #include <zenhttp/httpclientauth.h> #include <zenhttp/httpcommon.h> #include <zenremotestore/builds/buildstoragecache.h> +#include <zenremotestore/builds/buildstorageoperations.h> #include <zenremotestore/builds/filebuildstorage.h> #include <zenremotestore/builds/jupiterbuildstorage.h> #include <zenremotestore/chunking/chunkblock.h> @@ -31,6 +32,7 @@ #include <zenremotestore/chunking/chunkedfile.h> #include <zenremotestore/chunking/chunkingcontroller.h> #include <zenremotestore/jupiter/jupiterhost.h> +#include <zenutil/bufferedopenfile.h> #include <zenutil/bufferedwritefilecache.h> #include <zenutil/wildcard.h> #include <zenutil/workerpools.h> @@ -334,7 +336,6 @@ namespace { std::filesystem::path ZenStateFilePath(const std::filesystem::path& ZenFolderPath) { return ZenFolderPath / "current_state.cbo"; } // std::filesystem::path ZenStateFileJsonPath(const std::filesystem::path& ZenFolderPath) { return ZenFolderPath / "current_state.json"; // } - std::filesystem::path ZenTempFolderPath(const std::filesystem::path& ZenFolderPath) { return ZenFolderPath / "tmp"; } std::filesystem::path ZenTempCacheFolderPath(const std::filesystem::path& ZenFolderPath) { @@ -356,11 +357,6 @@ namespace { return ZenTempFolderPath(ZenFolderPath) / "download"; // Temp storage for decompressed and validated chunks } - // std::filesystem::path ZenTempStorageFolderPath(const std::filesystem::path& ZenFolderPath) - // { - // return ZenTempFolderPath(ZenFolderPath) / "storage"; // Temp storage folder for BuildStorage implementations - // } - const std::string ZenExcludeManifestName = ".zen_exclude_manifest.txt"; const std::string UnsyncFolderName = ".unsync"; @@ -368,8 +364,8 @@ namespace { const std::string UGSFolderName = ".ugs"; const std::string LegacyZenTempFolderName = ".zen-tmp"; - const std::vector<std::string_view> DefaultExcludeFolders({UnsyncFolderName, ZenFolderName, UGSFolderName, LegacyZenTempFolderName}); - const std::vector<std::string_view> DefaultExcludeExtensions({}); + const std::vector<std::string> DefaultExcludeFolders({UnsyncFolderName, ZenFolderName, UGSFolderName, LegacyZenTempFolderName}); + const std::vector<std::string> DefaultExcludeExtensions({}); static bool IsVerbose = false; static bool IsQuiet = false; @@ -408,6 +404,62 @@ namespace { ); + class ConsoleOpLogProgressBar : public BuildOpLogOutput::ProgressBar + { + public: + ConsoleOpLogProgressBar(zen::ProgressBar::Mode InMode, std::string_view InSubTask) : m_Inner(InMode, InSubTask) {} + + virtual void UpdateState(const State& NewState, bool DoLinebreak) + { + zen::ProgressBar::State State = {.Task = NewState.Task, + .Details = NewState.Details, + .TotalCount = NewState.TotalCount, + .RemainingCount = NewState.RemainingCount, + .Status = ConvertStatus(NewState.Status)}; + m_Inner.UpdateState(State, DoLinebreak); + } + virtual void Finish() { m_Inner.Finish(); } + + private: + zen::ProgressBar::State::EStatus ConvertStatus(State::EStatus Status) + { + switch (Status) + { + case State::EStatus::Running: + return zen::ProgressBar::State::EStatus::Running; + case State::EStatus::Aborted: + return zen::ProgressBar::State::EStatus::Aborted; + case State::EStatus::Paused: + return zen::ProgressBar::State::EStatus::Paused; + default: + return (zen::ProgressBar::State::EStatus)Status; + } + } + zen::ProgressBar m_Inner; + }; + + class ConsoleOpLogOutput : public BuildOpLogOutput + { + public: + ConsoleOpLogOutput(zen::ProgressBar::Mode InMode) : m_Mode(InMode) {} + virtual void EmitLogMessage(int LogLevel, std::string_view Format, fmt::format_args Args) + { + logging::EmitConsoleLogMessage(LogLevel, Format, Args); + } + + virtual void SetLogOperationName(std::string_view Name) { zen::ProgressBar::SetLogOperationName(m_Mode, Name); } + virtual void SetLogOperationProgress(uint32_t StepIndex, uint32_t StepCount) + { + zen::ProgressBar::SetLogOperationProgress(m_Mode, StepIndex, StepCount); + } + virtual uint32_t GetProgressUpdateDelayMS() { return GetUpdateDelayMS(m_Mode); } + + virtual ProgressBar* CreateProgressBar(std::string_view InSubTask) { return new ConsoleOpLogProgressBar(m_Mode, InSubTask); } + + private: + zen::ProgressBar::Mode m_Mode; + }; + bool IncludePath(std::span<const std::string> IncludeWildcards, std::span<const std::string> ExcludeWildcards, const std::filesystem::path& Path) @@ -481,46 +533,6 @@ namespace { return Result; } - void RenameFileWithRetry(const std::filesystem::path& SourcePath, const std::filesystem::path& TargetPath) - { - std::error_code Ec; - RenameFile(SourcePath, TargetPath, Ec); - for (size_t Retries = 0; Ec && Retries < 10; Retries++) - { - ZEN_ASSERT_SLOW(IsFile(SourcePath)); - if (Retries > 5) - { - ZEN_CONSOLE_WARN("Unable to overwrite file {} ({}: {}), retrying...", TargetPath, Ec.value(), Ec.message()); - } - Sleep(50 + int(Retries * 150)); - Ec.clear(); - RenameFile(SourcePath, TargetPath, Ec); - } - if (Ec) - { - throw std::system_error(std::error_code(Ec.value(), std::system_category()), - fmt::format("Rename from '{}' to '{}' failed with: {}", SourcePath, TargetPath, Ec.message())); - } - } - - void TryRemoveFile(const std::filesystem::path& Path) - { - std::error_code Ec; - RemoveFile(Path, Ec); - if (Ec) - { - if (IsFile(Path, Ec)) - { - Ec.clear(); - RemoveFile(Path, Ec); - if (Ec) - { - ZEN_DEBUG("Failed removing file '{}', reason: {}", Path, Ec.message()); - } - } - } - } - void RemoveFileWithRetry(const std::filesystem::path& Path) { std::error_code Ec; @@ -563,89 +575,6 @@ namespace { } } - void CopyFile(const std::filesystem::path& SourceFilePath, - const std::filesystem::path& TargetFilePath, - uint64_t RawSize, - std::atomic<uint64_t>& WriteCount, - std::atomic<uint64_t>& WriteByteCount, - std::atomic<uint64_t>& CloneCount, - std::atomic<uint64_t>& CloneByteCount) - { - if (AllowFileClone && TryCloneFile(SourceFilePath, TargetFilePath)) - { - WriteCount += 1; - WriteByteCount += RawSize; - CloneCount += 1; - CloneByteCount += RawSize; - } - else - { - BasicFile TargetFile(TargetFilePath, BasicFile::Mode::kTruncate); - if (UseSparseFiles) - { - PrepareFileForScatteredWrite(TargetFile.Handle(), RawSize); - } - uint64_t Offset = 0; - if (!ScanFile(SourceFilePath, 512u * 1024u, [&](const void* Data, size_t Size) { - TargetFile.Write(Data, Size, Offset); - Offset += Size; - WriteCount++; - WriteByteCount += Size; - })) - { - throw std::runtime_error(fmt::format("Failed to copy scavenged file '{}' to '{}'", SourceFilePath, TargetFilePath)); - } - } - } - - uint32_t SetNativeFileAttributes(const std::filesystem::path FilePath, SourcePlatform SourcePlatform, uint32_t Attributes) - { -#if ZEN_PLATFORM_WINDOWS - if (SourcePlatform == SourcePlatform::Windows) - { - SetFileAttributes(FilePath, Attributes); - return Attributes; - } - else - { - uint32_t CurrentAttributes = GetFileAttributes(FilePath); - uint32_t NewAttributes = MakeFileAttributeReadOnly(CurrentAttributes, IsFileModeReadOnly(Attributes)); - if (CurrentAttributes != NewAttributes) - { - SetFileAttributes(FilePath, NewAttributes); - } - return NewAttributes; - } -#endif // ZEN_PLATFORM_WINDOWS -#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC - if (SourcePlatform != SourcePlatform::Windows) - { - SetFileMode(FilePath, Attributes); - return Attributes; - } - else - { - uint32_t CurrentMode = GetFileMode(FilePath); - uint32_t NewMode = MakeFileModeReadOnly(CurrentMode, IsFileAttributeReadOnly(Attributes)); - if (CurrentMode != NewMode) - { - SetFileMode(FilePath, NewMode); - } - return NewMode; - } -#endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC - }; - - uint32_t GetNativeFileAttributes(const std::filesystem::path FilePath) - { -#if ZEN_PLATFORM_WINDOWS - return GetFileAttributes(FilePath); -#endif // ZEN_PLATFORM_WINDOWS -#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC - return GetFileMode(FilePath); -#endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC - } - template<typename T> std::string FormatArray(std::span<const T> Items, std::string_view Prefix) { @@ -657,7 +586,7 @@ namespace { return SB.ToString(); } - bool CleanDirectory(const std::filesystem::path& Path, std::span<const std::string_view> ExcludeDirectories) + bool CleanDirectory(const std::filesystem::path& Path, std::span<const std::string> ExcludeDirectories) { ZEN_TRACE_CPU("CleanDirectory"); Stopwatch Timer; @@ -672,12 +601,12 @@ namespace { struct AsyncVisitor : public GetDirectoryContentVisitor { - AsyncVisitor(const std::filesystem::path& InPath, - std::atomic<bool>& InCleanWipe, - std::atomic<uint64_t>& InDiscoveredItemCount, - std::atomic<uint64_t>& InDeletedItemCount, - std::atomic<uint64_t>& InDeletedByteCount, - std::span<const std::string_view> InExcludeDirectories) + AsyncVisitor(const std::filesystem::path& InPath, + std::atomic<bool>& InCleanWipe, + std::atomic<uint64_t>& InDiscoveredItemCount, + std::atomic<uint64_t>& InDeletedItemCount, + std::atomic<uint64_t>& InDeletedByteCount, + std::span<const std::string> InExcludeDirectories) : Path(InPath) , CleanWipe(InCleanWipe) , DiscoveredItemCount(InDiscoveredItemCount) @@ -742,12 +671,12 @@ namespace { } } } - const std::filesystem::path& Path; - std::atomic<bool>& CleanWipe; - std::atomic<uint64_t>& DiscoveredItemCount; - std::atomic<uint64_t>& DeletedItemCount; - std::atomic<uint64_t>& DeletedByteCount; - std::span<const std::string_view> ExcludeDirectories; + const std::filesystem::path& Path; + std::atomic<bool>& CleanWipe; + std::atomic<uint64_t>& DiscoveredItemCount; + std::atomic<uint64_t>& DeletedItemCount; + std::atomic<uint64_t>& DeletedByteCount; + std::span<const std::string> ExcludeDirectories; } Visitor(Path, CleanWipe, DiscoveredItemCount, DeletedItemCount, DeletedByteCount, ExcludeDirectories); GetDirectoryContent( @@ -765,7 +694,7 @@ namespace { for (std::filesystem::path& LocalDirPath : LocalDirectoryContent.Directories) { bool Leave = false; - for (const std::string_view ExcludeDirectory : ExcludeDirectories) + for (const std::string& ExcludeDirectory : ExcludeDirectories) { if (LocalDirPath == (Path / ExcludeDirectory)) { @@ -957,29 +886,6 @@ namespace { return Count * 1000000 / ElapsedWallTimeUS; } - std::filesystem::path GetTempChunkedSequenceFileName(const std::filesystem::path& CacheFolderPath, const IoHash& RawHash) - { - return CacheFolderPath / (RawHash.ToHexString() + ".tmp"); - } - - std::filesystem::path GetFinalChunkedSequenceFileName(const std::filesystem::path& CacheFolderPath, const IoHash& RawHash) - { - return CacheFolderPath / RawHash.ToHexString(); - } - - struct DiskStatistics - { - std::atomic<uint64_t> OpenReadCount = 0; - std::atomic<uint64_t> OpenWriteCount = 0; - std::atomic<uint64_t> ReadCount = 0; - std::atomic<uint64_t> ReadByteCount = 0; - std::atomic<uint64_t> WriteCount = 0; - std::atomic<uint64_t> WriteByteCount = 0; - std::atomic<uint64_t> CloneCount = 0; - std::atomic<uint64_t> CloneByteCount = 0; - std::atomic<uint64_t> CurrentOpenFileCount = 0; - }; - struct FindBlocksStatistics { uint64_t FindBlockTimeMS = 0; @@ -1061,66 +967,6 @@ namespace { } }; - struct CacheMappingStatistics - { - uint64_t CacheChunkCount = 0; - uint64_t CacheChunkByteCount = 0; - - uint64_t CacheBlockCount = 0; - uint64_t CacheBlocksByteCount = 0; - - uint64_t CacheSequenceHashesCount = 0; - uint64_t CacheSequenceHashesByteCount = 0; - - uint64_t CacheScanElapsedWallTimeUs = 0; - - uint32_t LocalPathsMatchingSequencesCount = 0; - uint64_t LocalPathsMatchingSequencesByteCount = 0; - - uint64_t LocalChunkMatchingRemoteCount = 0; - uint64_t LocalChunkMatchingRemoteByteCount = 0; - - uint64_t LocalScanElapsedWallTimeUs = 0; - - uint32_t ScavengedPathsMatchingSequencesCount = 0; - uint64_t ScavengedPathsMatchingSequencesByteCount = 0; - - uint64_t ScavengedChunkMatchingRemoteCount = 0; - uint64_t ScavengedChunkMatchingRemoteByteCount = 0; - - uint64_t ScavengeElapsedWallTimeUs = 0; - }; - - struct DownloadStatistics - { - std::atomic<uint64_t> RequestsCompleteCount = 0; - - std::atomic<uint64_t> DownloadedChunkCount = 0; - std::atomic<uint64_t> DownloadedChunkByteCount = 0; - std::atomic<uint64_t> MultipartAttachmentCount = 0; - - std::atomic<uint64_t> DownloadedBlockCount = 0; - std::atomic<uint64_t> DownloadedBlockByteCount = 0; - - std::atomic<uint64_t> DownloadedPartialBlockCount = 0; - std::atomic<uint64_t> DownloadedPartialBlockByteCount = 0; - }; - - struct WriteChunkStatistics - { - uint64_t DownloadTimeUs = 0; - uint64_t WriteTimeUs = 0; - uint64_t WriteChunksElapsedWallTimeUs = 0; - }; - - struct RebuildFolderStateStatistics - { - uint64_t CleanFolderElapsedWallTimeUs = 0; - std::atomic<uint32_t> FinalizeTreeFilesMovedCount = 0; - std::atomic<uint32_t> FinalizeTreeFilesCopiedCount = 0; - uint64_t FinalizeTreeElapsedWallTimeUs = 0; - }; - struct VerifyFolderStatistics { std::atomic<uint64_t> FilesVerified = 0; @@ -1129,16 +975,6 @@ namespace { uint64_t VerifyElapsedWallTimeUs = 0; }; - struct StorageInstance - { - std::unique_ptr<HttpClient> BuildStorageHttp; - std::unique_ptr<BuildStorage> BuildStorage; - std::string StorageName; - std::unique_ptr<HttpClient> CacheHttp; - std::unique_ptr<BuildStorageCache> BuildCacheStorage; - std::string CacheName; - }; - std::vector<uint32_t> CalculateAbsoluteChunkOrders(const std::span<const IoHash> LocalChunkHashes, const std::span<const uint32_t> LocalChunkOrder, const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& ChunkHashToLocalChunkIndex, @@ -1632,405 +1468,6 @@ namespace { TemporaryFile::SafeWriteFile(WritePath, JsonPayload); } - struct ScavengeSource - { - std::filesystem::path StateFilePath; - std::filesystem::path Path; - }; - - std::vector<ScavengeSource> GetDownloadedStatePaths(const std::filesystem::path& SystemRootDir) - { - std::vector<ScavengeSource> Result; - DirectoryContent Content; - GetDirectoryContent(SystemRootDir / "builds" / "downloads", DirectoryContentFlags::IncludeFiles, Content); - for (const std::filesystem::path& EntryPath : Content.Files) - { - bool DeleteEntry = false; - IoHash EntryPathHash; - if (IoHash::TryParse(EntryPath.stem().string(), EntryPathHash)) - { - // Read state and verify that it is valid - IoBuffer MetaDataJson = ReadFile(EntryPath).Flatten(); - std::string_view Json(reinterpret_cast<const char*>(MetaDataJson.GetData()), MetaDataJson.GetSize()); - std::string JsonError; - CbObject DownloadInfo = LoadCompactBinaryFromJson(Json, JsonError).AsObject(); - if (JsonError.empty()) - { - std::filesystem::path StateFilePath = DownloadInfo["statePath"].AsU8String(); - if (IsFile(StateFilePath)) - { - std::filesystem::path Path = DownloadInfo["path"].AsU8String(); - if (IsDir(Path)) - { - Result.push_back({.StateFilePath = std::move(StateFilePath), .Path = std::move(Path)}); - } - else - { - DeleteEntry = true; - } - } - else - { - DeleteEntry = true; - } - } - else - { - ZEN_CONSOLE_WARN("Invalid download state file at {}. '{}'", EntryPath, JsonError); - DeleteEntry = true; - } - } - - if (DeleteEntry) - { - std::error_code DummyEc; - std::filesystem::remove(EntryPath, DummyEc); - } - } - return Result; - } - - struct BlockRangeDescriptor - { - uint32_t BlockIndex = (uint32_t)-1; - uint64_t RangeStart = 0; - uint64_t RangeLength = 0; - uint32_t ChunkBlockIndexStart = 0; - uint32_t ChunkBlockIndexCount = 0; - }; - - BlockRangeDescriptor MergeBlockRanges(std::span<const BlockRangeDescriptor> Ranges) - { - ZEN_ASSERT(Ranges.size() > 1); - const BlockRangeDescriptor& First = Ranges.front(); - const BlockRangeDescriptor& Last = Ranges.back(); - - return BlockRangeDescriptor{ - .BlockIndex = First.BlockIndex, - .RangeStart = First.RangeStart, - .RangeLength = Last.RangeStart + Last.RangeLength - First.RangeStart, - .ChunkBlockIndexStart = First.ChunkBlockIndexStart, - .ChunkBlockIndexCount = Last.ChunkBlockIndexStart + Last.ChunkBlockIndexCount - First.ChunkBlockIndexStart}; - } - - std::optional<std::vector<BlockRangeDescriptor>> MakeOptionalBlockRangeVector(uint64_t TotalBlockSize, - const BlockRangeDescriptor& Range) - { - if (Range.RangeLength == TotalBlockSize) - { - return {}; - } - else - { - return std::vector<BlockRangeDescriptor>{Range}; - } - }; - - struct BlockRangeLimit - { - uint16_t SizePercent; - uint16_t MaxRangeCount; - }; - - static const uint16_t FullBlockRangePercentLimit = 95; - - static const std::vector<BlockRangeLimit> ForceMergeLimits = {{.SizePercent = FullBlockRangePercentLimit, .MaxRangeCount = 1}, - {.SizePercent = 90, .MaxRangeCount = 2}, - {.SizePercent = 85, .MaxRangeCount = 8}, - {.SizePercent = 80, .MaxRangeCount = 16}, - {.SizePercent = 70, .MaxRangeCount = 32}, - {.SizePercent = 60, .MaxRangeCount = 48}, - {.SizePercent = 2, .MaxRangeCount = 56}, - {.SizePercent = 0, .MaxRangeCount = 64}}; - - const BlockRangeLimit* GetBlockRangeLimitForRange(std::span<const BlockRangeLimit> Limits, - uint64_t TotalBlockSize, - std::span<const BlockRangeDescriptor> Ranges) - { - if (Ranges.size() > 1) - { - const std::uint64_t WantedSize = - std::accumulate(Ranges.begin(), Ranges.end(), uint64_t(0), [](uint64_t Current, const BlockRangeDescriptor& Range) { - return Current + Range.RangeLength; - }); - - const double RangeRequestedPercent = (WantedSize * 100.0) / TotalBlockSize; - - for (const BlockRangeLimit& Limit : Limits) - { - if (RangeRequestedPercent >= Limit.SizePercent && Ranges.size() > Limit.MaxRangeCount) - { - return &Limit; - } - } - } - return nullptr; - }; - - std::vector<BlockRangeDescriptor> CollapseBlockRanges(const uint64_t AlwaysAcceptableGap, - std::span<const BlockRangeDescriptor> BlockRanges) - { - ZEN_ASSERT(BlockRanges.size() > 1); - std::vector<BlockRangeDescriptor> CollapsedBlockRanges; - - auto BlockRangesIt = BlockRanges.begin(); - CollapsedBlockRanges.push_back(*BlockRangesIt++); - for (; BlockRangesIt != BlockRanges.end(); BlockRangesIt++) - { - BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back(); - - const uint64_t BothRangeSize = BlockRangesIt->RangeLength + LastRange.RangeLength; - - const uint64_t Gap = BlockRangesIt->RangeStart - (LastRange.RangeStart + LastRange.RangeLength); - if (Gap <= Max(BothRangeSize / 16, AlwaysAcceptableGap)) - { - LastRange.ChunkBlockIndexCount = - (BlockRangesIt->ChunkBlockIndexStart + BlockRangesIt->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart; - LastRange.RangeLength = (BlockRangesIt->RangeStart + BlockRangesIt->RangeLength) - LastRange.RangeStart; - } - else - { - CollapsedBlockRanges.push_back(*BlockRangesIt); - } - } - - return CollapsedBlockRanges; - }; - - uint64_t CalculateNextGap(std::span<const BlockRangeDescriptor> BlockRanges) - { - ZEN_ASSERT(BlockRanges.size() > 1); - uint64_t AcceptableGap = (uint64_t)-1; - for (size_t RangeIndex = 0; RangeIndex < BlockRanges.size() - 1; RangeIndex++) - { - const BlockRangeDescriptor& Range = BlockRanges[RangeIndex]; - const BlockRangeDescriptor& NextRange = BlockRanges[RangeIndex + 1]; - - const uint64_t Gap = NextRange.RangeStart - (Range.RangeStart + Range.RangeLength); - AcceptableGap = Min(Gap, AcceptableGap); - } - AcceptableGap = RoundUp(AcceptableGap, 16u * 1024u); - return AcceptableGap; - }; - - std::optional<std::vector<BlockRangeDescriptor>> CalculateBlockRanges(uint32_t BlockIndex, - const ChunkBlockDescription& BlockDescription, - std::span<const uint32_t> BlockChunkIndexNeeded, - bool LimitToSingleRange, - const uint64_t ChunkStartOffsetInBlock, - const uint64_t TotalBlockSize, - uint64_t& OutTotalWantedChunksSize) - { - ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_PartialAnalysis"); - - std::vector<BlockRangeDescriptor> BlockRanges; - { - uint64_t CurrentOffset = ChunkStartOffsetInBlock; - uint32_t ChunkBlockIndex = 0; - uint32_t NeedBlockChunkIndexOffset = 0; - BlockRangeDescriptor NextRange{.BlockIndex = BlockIndex}; - while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() && ChunkBlockIndex < BlockDescription.ChunkRawHashes.size()) - { - const uint32_t ChunkCompressedLength = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex]; - if (ChunkBlockIndex < BlockChunkIndexNeeded[NeedBlockChunkIndexOffset]) - { - if (NextRange.RangeLength > 0) - { - BlockRanges.push_back(NextRange); - NextRange = {.BlockIndex = BlockIndex}; - } - ChunkBlockIndex++; - CurrentOffset += ChunkCompressedLength; - } - else if (ChunkBlockIndex == BlockChunkIndexNeeded[NeedBlockChunkIndexOffset]) - { - if (NextRange.RangeLength == 0) - { - NextRange.RangeStart = CurrentOffset; - NextRange.ChunkBlockIndexStart = ChunkBlockIndex; - } - NextRange.RangeLength += ChunkCompressedLength; - NextRange.ChunkBlockIndexCount++; - ChunkBlockIndex++; - CurrentOffset += ChunkCompressedLength; - NeedBlockChunkIndexOffset++; - } - else - { - ZEN_ASSERT(false); - } - } - if (NextRange.RangeLength > 0) - { - BlockRanges.push_back(NextRange); - } - } - ZEN_ASSERT(!BlockRanges.empty()); - - OutTotalWantedChunksSize = - std::accumulate(BlockRanges.begin(), BlockRanges.end(), uint64_t(0), [](uint64_t Current, const BlockRangeDescriptor& Range) { - return Current + Range.RangeLength; - }); - - double RangeWantedPercent = (OutTotalWantedChunksSize * 100.0) / TotalBlockSize; - - if (BlockRanges.size() == 1) - { - ZEN_CONSOLE_VERBOSE("Range request of {} ({:.2f}%) using single range from block {} ({}) as is", - NiceBytes(OutTotalWantedChunksSize), - RangeWantedPercent, - BlockDescription.BlockHash, - NiceBytes(TotalBlockSize)); - return BlockRanges; - } - - if (LimitToSingleRange) - { - const BlockRangeDescriptor MergedRange = MergeBlockRanges(BlockRanges); - const double RangeRequestedPercent = (MergedRange.RangeLength * 100.0) / TotalBlockSize; - const double WastedPercent = ((MergedRange.RangeLength - OutTotalWantedChunksSize) * 100.0) / MergedRange.RangeLength; - ZEN_CONSOLE_VERBOSE( - "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) limited to single block range {} ({:.2f}%) wasting " - "{:.2f}% ({})", - NiceBytes(OutTotalWantedChunksSize), - RangeWantedPercent, - BlockRanges.size(), - BlockDescription.BlockHash, - NiceBytes(TotalBlockSize), - NiceBytes(MergedRange.RangeLength), - RangeRequestedPercent, - WastedPercent, - NiceBytes(MergedRange.RangeLength - OutTotalWantedChunksSize)); - return MakeOptionalBlockRangeVector(TotalBlockSize, MergedRange); - } - - if (RangeWantedPercent > FullBlockRangePercentLimit) - { - const BlockRangeDescriptor MergedRange = MergeBlockRanges(BlockRanges); - const double RangeRequestedPercent = (MergedRange.RangeLength * 100.0) / TotalBlockSize; - const double WastedPercent = ((MergedRange.RangeLength - OutTotalWantedChunksSize) * 100.0) / MergedRange.RangeLength; - ZEN_CONSOLE_VERBOSE( - "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) exceeds {}%. Merged to single block range {} " - "({:.2f}%) wasting {:.2f}% ({})", - NiceBytes(OutTotalWantedChunksSize), - RangeWantedPercent, - BlockRanges.size(), - BlockDescription.BlockHash, - NiceBytes(TotalBlockSize), - FullBlockRangePercentLimit, - NiceBytes(MergedRange.RangeLength), - RangeRequestedPercent, - WastedPercent, - NiceBytes(MergedRange.RangeLength - OutTotalWantedChunksSize)); - return MakeOptionalBlockRangeVector(TotalBlockSize, MergedRange); - } - - std::vector<BlockRangeDescriptor> CollapsedBlockRanges = CollapseBlockRanges(16u * 1024u, BlockRanges); - while (GetBlockRangeLimitForRange(ForceMergeLimits, TotalBlockSize, CollapsedBlockRanges)) - { - CollapsedBlockRanges = CollapseBlockRanges(CalculateNextGap(CollapsedBlockRanges), CollapsedBlockRanges); - } - - const std::uint64_t WantedCollapsedSize = - std::accumulate(CollapsedBlockRanges.begin(), - CollapsedBlockRanges.end(), - uint64_t(0), - [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; }); - - const double CollapsedRangeRequestedPercent = (WantedCollapsedSize * 100.0) / TotalBlockSize; - - { - const double WastedPercent = ((WantedCollapsedSize - OutTotalWantedChunksSize) * 100.0) / WantedCollapsedSize; - - ZEN_CONSOLE_VERBOSE( - "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) collapsed to {} {:.2f}% using {} ranges wasting {:.2f}% " - "({})", - NiceBytes(OutTotalWantedChunksSize), - RangeWantedPercent, - BlockRanges.size(), - BlockDescription.BlockHash, - NiceBytes(TotalBlockSize), - NiceBytes(WantedCollapsedSize), - CollapsedRangeRequestedPercent, - CollapsedBlockRanges.size(), - WastedPercent, - NiceBytes(WantedCollapsedSize - OutTotalWantedChunksSize)); - return CollapsedBlockRanges; - } - }; - - class BufferedOpenFile - { - public: - BufferedOpenFile(const std::filesystem::path Path, DiskStatistics& DiskStats) - : m_Source(Path, BasicFile::Mode::kRead) - , m_SourceSize(m_Source.FileSize()) - , m_DiskStats(DiskStats) - { - m_DiskStats.OpenReadCount++; - m_DiskStats.CurrentOpenFileCount++; - } - ~BufferedOpenFile() { m_DiskStats.CurrentOpenFileCount--; } - BufferedOpenFile() = delete; - BufferedOpenFile(const BufferedOpenFile&) = delete; - BufferedOpenFile(BufferedOpenFile&&) = delete; - BufferedOpenFile& operator=(BufferedOpenFile&&) = delete; - BufferedOpenFile& operator=(const BufferedOpenFile&) = delete; - - const uint64_t BlockSize = 256u * 1024u; - CompositeBuffer GetRange(uint64_t Offset, uint64_t Size) - { - ZEN_TRACE_CPU("BufferedOpenFile::GetRange"); - - ZEN_ASSERT((m_CacheBlockIndex == (uint64_t)-1) || m_Cache); - auto _ = MakeGuard([&]() { ZEN_ASSERT((m_CacheBlockIndex == (uint64_t)-1) || m_Cache); }); - - ZEN_ASSERT((Offset + Size) <= m_SourceSize); - const uint64_t BlockIndexStart = Offset / BlockSize; - const uint64_t BlockIndexEnd = (Offset + Size - 1) / BlockSize; - - std::vector<SharedBuffer> BufferRanges; - BufferRanges.reserve(BlockIndexEnd - BlockIndexStart + 1); - - uint64_t ReadOffset = Offset; - for (uint64_t BlockIndex = BlockIndexStart; BlockIndex <= BlockIndexEnd; BlockIndex++) - { - const uint64_t BlockStartOffset = BlockIndex * BlockSize; - if (m_CacheBlockIndex != BlockIndex) - { - uint64_t CacheSize = Min(BlockSize, m_SourceSize - BlockStartOffset); - ZEN_ASSERT(CacheSize > 0); - m_Cache = IoBuffer(CacheSize); - m_Source.Read(m_Cache.GetMutableView().GetData(), CacheSize, BlockStartOffset); - m_DiskStats.ReadCount++; - m_DiskStats.ReadByteCount += CacheSize; - m_CacheBlockIndex = BlockIndex; - } - - const uint64_t BytesRead = ReadOffset - Offset; - ZEN_ASSERT(BlockStartOffset <= ReadOffset); - const uint64_t OffsetIntoBlock = ReadOffset - BlockStartOffset; - ZEN_ASSERT(OffsetIntoBlock < m_Cache.GetSize()); - const uint64_t BlockBytes = Min(m_Cache.GetSize() - OffsetIntoBlock, Size - BytesRead); - BufferRanges.emplace_back(SharedBuffer(IoBuffer(m_Cache, OffsetIntoBlock, BlockBytes))); - ReadOffset += BlockBytes; - } - CompositeBuffer Result(std::move(BufferRanges)); - ZEN_ASSERT(Result.GetSize() == Size); - return Result; - } - - public: - void* Handle() { return m_Source.Handle(); } - - private: - BasicFile m_Source; - const uint64_t m_SourceSize; - DiskStatistics& m_DiskStats; - uint64_t m_CacheBlockIndex = (uint64_t)-1; - IoBuffer m_Cache; - }; - class ReadFileCache { public: @@ -2079,7 +1516,12 @@ namespace { m_OpenFiles.pop_back(); } m_OpenFiles.insert(m_OpenFiles.begin(), - std::make_pair(SequenceIndex, std::make_unique<BufferedOpenFile>(LocalFilePath, m_DiskStats))); + std::make_pair(SequenceIndex, + std::make_unique<BufferedOpenFile>(LocalFilePath, + m_DiskStats.OpenReadCount, + m_DiskStats.CurrentOpenFileCount, + m_DiskStats.ReadCount, + m_DiskStats.ReadByteCount))); CompositeBuffer Result = m_OpenFiles.front().second->GetRange(Offset, Size); return Result; } @@ -2290,72 +1732,6 @@ namespace { return CompressedBuffer::FromCompressedNoValidate(CompositeBuffer(std::move(ResultBuffers))); }; - void DownloadLargeBlob(BuildStorage& Storage, - const std::filesystem::path& DownloadFolder, - const Oid& BuildId, - const IoHash& ChunkHash, - const std::uint64_t PreferredMultipartChunkSize, - ParallelWork& Work, - WorkerThreadPool& NetworkPool, - DownloadStatistics& DownloadStats, - std::function<void(IoBuffer&& Payload)>&& OnDownloadComplete) - { - ZEN_TRACE_CPU("DownloadLargeBlob"); - - struct WorkloadData - { - TemporaryFile TempFile; - }; - std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>()); - - std::error_code Ec; - Workload->TempFile.CreateTemporary(DownloadFolder, Ec); - if (Ec) - { - throw std::runtime_error( - fmt::format("Failed opening temporary file '{}': {} ({})", Workload->TempFile.GetPath(), Ec.message(), Ec.value())); - } - std::vector<std::function<void()>> WorkItems = Storage.GetLargeBuildBlob( - BuildId, - ChunkHash, - PreferredMultipartChunkSize, - [Workload, &DownloadStats](uint64_t Offset, const IoBuffer& Chunk) { - DownloadStats.DownloadedChunkByteCount += Chunk.GetSize(); - - if (!AbortFlag.load()) - { - ZEN_TRACE_CPU("DownloadLargeBlob_Save"); - Workload->TempFile.Write(Chunk.GetView(), Offset); - } - }, - [Workload, &DownloadStats, OnDownloadComplete = std::move(OnDownloadComplete)]() { - DownloadStats.DownloadedChunkCount++; - if (!AbortFlag.load()) - { - uint64_t PayloadSize = Workload->TempFile.FileSize(); - void* FileHandle = Workload->TempFile.Detach(); - ZEN_ASSERT(FileHandle != nullptr); - IoBuffer Payload(IoBuffer::File, FileHandle, 0, PayloadSize, true); - Payload.SetDeleteOnClose(true); - OnDownloadComplete(std::move(Payload)); - } - }); - if (!WorkItems.empty()) - { - DownloadStats.MultipartAttachmentCount++; - } - for (auto& WorkItem : WorkItems) - { - Work.ScheduleWork(NetworkPool, [WorkItem = std::move(WorkItem)](std::atomic<bool>&) { - ZEN_TRACE_CPU("DownloadLargeBlob_Work"); - if (!AbortFlag) - { - WorkItem(); - } - }); - } - } - struct ValidateStatistics { uint64_t BuildBlobSize = 0; @@ -3843,7 +3219,7 @@ namespace { ChunkingStatistics ChunkingStats; { auto IsAcceptedFolder = [ExcludeFolders = DefaultExcludeFolders](const std::string_view& RelativePath) -> bool { - for (const std::string_view& ExcludeFolder : ExcludeFolders) + for (const std::string& ExcludeFolder : ExcludeFolders) { if (RelativePath.starts_with(ExcludeFolder)) { @@ -3862,7 +3238,7 @@ namespace { auto IsAcceptedFile = [ExcludeExtensions = DefaultExcludeExtensions](const std::string_view& RelativePath, uint64_t, uint32_t) -> bool { - for (const std::string_view& ExcludeExtension : ExcludeExtensions) + for (const std::string& ExcludeExtension : ExcludeExtensions) { if (RelativePath.ends_with(ExcludeExtension)) { @@ -3961,7 +3337,7 @@ namespace { const std::filesystem::path AssetFilePath = (Path / AssetPath).make_preferred(); Content.RawSizes.push_back(FileSizeFromPath(AssetFilePath)); #if ZEN_PLATFORM_WINDOWS - Content.Attributes.push_back(GetFileAttributes(AssetFilePath)); + Content.Attributes.push_back(GetFileAttributesFromPath(AssetFilePath)); #endif // ZEN_PLATFORM_WINDOWS #if ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX Content.Attributes.push_back(GetFileMode(AssetFilePath)); @@ -3975,7 +3351,7 @@ namespace { const std::filesystem::path ManifestFilePath = (Path / ManifestPath).make_preferred(); Content.RawSizes.push_back(FileSizeFromPath(ManifestFilePath)); #if ZEN_PLATFORM_WINDOWS - Content.Attributes.push_back(GetFileAttributes(ManifestFilePath)); + Content.Attributes.push_back(GetFileAttributesFromPath(ManifestFilePath)); #endif // ZEN_PLATFORM_WINDOWS #if ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX Content.Attributes.push_back(GetFileMode(ManifestFilePath)); @@ -4853,7 +4229,7 @@ namespace { std::vector<std::string> Errors; auto IsAcceptedFolder = [ExcludeFolders = DefaultExcludeFolders](const std::string_view& RelativePath) -> bool { - for (const std::string_view& ExcludeFolder : ExcludeFolders) + for (const std::string& ExcludeFolder : ExcludeFolders) { if (RelativePath.starts_with(ExcludeFolder)) { @@ -5025,863 +4401,7 @@ namespace { } } - std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> GetRemainingChunkTargets( - std::span<const std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, - const ChunkedContentLookup& Lookup, - uint32_t ChunkIndex) - { - std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkSources = GetChunkSequenceLocations(Lookup, ChunkIndex); - std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs; - if (!ChunkSources.empty()) - { - ChunkTargetPtrs.reserve(ChunkSources.size()); - for (const ChunkedContentLookup::ChunkSequenceLocation& Source : ChunkSources) - { - if (SequenceIndexChunksLeftToWriteCounters[Source.SequenceIndex].load() > 0) - { - ChunkTargetPtrs.push_back(&Source); - } - } - } - return ChunkTargetPtrs; - }; - - uint64_t GetChunkWriteCount(std::span<const std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, - const ChunkedContentLookup& Lookup, - uint32_t ChunkIndex) - { - uint64_t WriteCount = 0; - std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkSources = GetChunkSequenceLocations(Lookup, ChunkIndex); - for (const ChunkedContentLookup::ChunkSequenceLocation& Source : ChunkSources) - { - if (SequenceIndexChunksLeftToWriteCounters[Source.SequenceIndex].load() > 0) - { - WriteCount++; - } - } - return WriteCount; - }; - - void FinalizeChunkSequence(const std::filesystem::path& TargetFolder, const IoHash& SequenceRawHash) - { - ZEN_TRACE_CPU("FinalizeChunkSequence"); - ZEN_ASSERT_SLOW(!IsFile(GetFinalChunkedSequenceFileName(TargetFolder, SequenceRawHash))); - std::error_code Ec; - RenameFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash), - GetFinalChunkedSequenceFileName(TargetFolder, SequenceRawHash), - Ec); - if (Ec) - { - throw std::system_error(Ec); - } - } - - void FinalizeChunkSequences(const std::filesystem::path& TargetFolder, - const ChunkedFolderContent& RemoteContent, - std::span<const uint32_t> RemoteSequenceIndexes) - { - ZEN_TRACE_CPU("FinalizeChunkSequences"); - for (uint32_t SequenceIndex : RemoteSequenceIndexes) - { - FinalizeChunkSequence(TargetFolder, RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); - } - } - - void VerifySequence(const std::filesystem::path& TargetFolder, - const ChunkedFolderContent& RemoteContent, - const ChunkedContentLookup& Lookup, - uint32_t RemoteSequenceIndex) - { - ZEN_TRACE_CPU("VerifySequence"); - const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; - { - ZEN_TRACE_CPU("HashSequence"); - const std::uint32_t RemotePathIndex = Lookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]; - const uint64_t ExpectedSize = RemoteContent.RawSizes[RemotePathIndex]; - IoBuffer VerifyBuffer = IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(TargetFolder, SequenceRawHash)); - const uint64_t VerifySize = VerifyBuffer.GetSize(); - if (VerifySize != ExpectedSize) - { - throw std::runtime_error(fmt::format("Written chunk sequence {} size {} does not match expected size {}", - SequenceRawHash, - VerifySize, - ExpectedSize)); - } - - const IoHash VerifyChunkHash = IoHash::HashBuffer(std::move(VerifyBuffer)); - if (VerifyChunkHash != SequenceRawHash) - { - throw std::runtime_error( - fmt::format("Written chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, SequenceRawHash)); - } - } - } - - void VerifyAndCompleteChunkSequencesAsync(const std::filesystem::path& TargetFolder, - const ChunkedFolderContent& RemoteContent, - const ChunkedContentLookup& Lookup, - std::span<const uint32_t> RemoteSequenceIndexes, - ParallelWork& Work, - WorkerThreadPool& VerifyPool) - { - if (RemoteSequenceIndexes.empty()) - { - return; - } - ZEN_TRACE_CPU("VerifyAndCompleteChunkSequence"); - for (uint32_t RemoteSequenceIndexOffset = 1; RemoteSequenceIndexOffset < RemoteSequenceIndexes.size(); RemoteSequenceIndexOffset++) - { - const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset]; - Work.ScheduleWork(VerifyPool, [&RemoteContent, &Lookup, TargetFolder, RemoteSequenceIndex](std::atomic<bool>&) { - if (!AbortFlag) - { - ZEN_TRACE_CPU("VerifyAndCompleteChunkSequenceAsync"); - VerifySequence(TargetFolder, RemoteContent, Lookup, RemoteSequenceIndex); - if (!AbortFlag) - { - const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; - FinalizeChunkSequence(TargetFolder, SequenceRawHash); - } - } - }); - } - const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[0]; - - VerifySequence(TargetFolder, RemoteContent, Lookup, RemoteSequenceIndex); - const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; - FinalizeChunkSequence(TargetFolder, SequenceRawHash); - } - - bool CompleteSequenceChunk(uint32_t RemoteSequenceIndex, std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters) - { - uint32_t PreviousValue = SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1); - ZEN_ASSERT(PreviousValue >= 1); - ZEN_ASSERT(PreviousValue != (uint32_t)-1); - return PreviousValue == 1; - } - - std::vector<uint32_t> CompleteChunkTargets(const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs, - std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters) - { - ZEN_TRACE_CPU("CompleteChunkTargets"); - - std::vector<uint32_t> CompletedSequenceIndexes; - for (const ChunkedContentLookup::ChunkSequenceLocation* Location : ChunkTargetPtrs) - { - const uint32_t RemoteSequenceIndex = Location->SequenceIndex; - if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters)) - { - CompletedSequenceIndexes.push_back(RemoteSequenceIndex); - } - } - return CompletedSequenceIndexes; - } - - void WriteSequenceChunk(const std::filesystem::path& TargetFolderPath, - const ChunkedFolderContent& RemoteContent, - BufferedWriteFileCache::Local& LocalWriter, - const CompositeBuffer& Chunk, - const uint32_t SequenceIndex, - const uint64_t FileOffset, - const uint32_t PathIndex, - DiskStatistics& DiskStats) - { - ZEN_TRACE_CPU("WriteSequenceChunk"); - - const uint64_t SequenceSize = RemoteContent.RawSizes[PathIndex]; - - auto OpenFile = [&](BasicFile& File) { - const std::filesystem::path FileName = - GetTempChunkedSequenceFileName(TargetFolderPath, RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); - File.Open(FileName, BasicFile::Mode::kWrite); - if (UseSparseFiles) - { - PrepareFileForScatteredWrite(File.Handle(), SequenceSize); - } - }; - - const uint64_t ChunkSize = Chunk.GetSize(); - ZEN_ASSERT(FileOffset + ChunkSize <= SequenceSize); - if (ChunkSize == SequenceSize) - { - BasicFile SingleChunkFile; - OpenFile(SingleChunkFile); - - DiskStats.CurrentOpenFileCount++; - auto _ = MakeGuard([&DiskStats]() { DiskStats.CurrentOpenFileCount--; }); - SingleChunkFile.Write(Chunk, FileOffset); - - DiskStats.WriteCount++; - DiskStats.WriteByteCount += ChunkSize; - } - else - { - const uint64_t MaxWriterBufferSize = 256u * 1025u; - - BufferedWriteFileCache::Local::Writer* Writer = LocalWriter.GetWriter(SequenceIndex); - if (Writer) - { - if ((!Writer->Writer) && (ChunkSize < MaxWriterBufferSize)) - { - Writer->Writer = std::make_unique<BasicFileWriter>(*Writer->File, Min(SequenceSize, MaxWriterBufferSize)); - } - Writer->Write(Chunk, FileOffset); - - DiskStats.WriteCount++; - DiskStats.WriteByteCount += ChunkSize; - } - else - { - Writer = LocalWriter.PutWriter(SequenceIndex, std::make_unique<BufferedWriteFileCache::Local::Writer>()); - - Writer->File = std::make_unique<BasicFile>(); - OpenFile(*Writer->File); - if (ChunkSize < MaxWriterBufferSize) - { - Writer->Writer = std::make_unique<BasicFileWriter>(*Writer->File, Min(SequenceSize, MaxWriterBufferSize)); - } - Writer->Write(Chunk, FileOffset); - - DiskStats.WriteCount++; - DiskStats.WriteByteCount += ChunkSize; - } - } - } - - struct BlockWriteOps - { - std::vector<CompositeBuffer> ChunkBuffers; - struct WriteOpData - { - const ChunkedContentLookup::ChunkSequenceLocation* Target = nullptr; - size_t ChunkBufferIndex = (size_t)-1; - }; - std::vector<WriteOpData> WriteOps; - }; - - void WriteBlockChunkOps(const std::filesystem::path& CacheFolderPath, - const ChunkedFolderContent& RemoteContent, - const ChunkedContentLookup& Lookup, - std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, - const BlockWriteOps& Ops, - BufferedWriteFileCache& WriteCache, - ParallelWork& Work, - WorkerThreadPool& VerifyPool, - DiskStatistics& DiskStats) - { - ZEN_TRACE_CPU("WriteBlockChunkOps"); - - { - BufferedWriteFileCache::Local LocalWriter(WriteCache); - for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps) - { - if (AbortFlag) - { - break; - } - const CompositeBuffer& Chunk = Ops.ChunkBuffers[WriteOp.ChunkBufferIndex]; - const uint32_t SequenceIndex = WriteOp.Target->SequenceIndex; - ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() <= - RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]); - ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() > 0); - const uint64_t FileOffset = WriteOp.Target->Offset; - const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex]; - - WriteSequenceChunk(CacheFolderPath, RemoteContent, LocalWriter, Chunk, SequenceIndex, FileOffset, PathIndex, DiskStats); - } - } - if (!AbortFlag) - { - // Write tracking, updating this must be done without any files open (BufferedWriteFileCache::Local) - std::vector<uint32_t> CompletedChunkSequences; - for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps) - { - const uint32_t RemoteSequenceIndex = WriteOp.Target->SequenceIndex; - if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters)) - { - CompletedChunkSequences.push_back(RemoteSequenceIndex); - } - } - WriteCache.Close(CompletedChunkSequences); - VerifyAndCompleteChunkSequencesAsync(CacheFolderPath, RemoteContent, Lookup, CompletedChunkSequences, Work, VerifyPool); - } - } - - IoBuffer MakeBufferMemoryBased(const CompositeBuffer& PartialBlockBuffer) - { - ZEN_TRACE_CPU("MakeBufferMemoryBased"); - IoBuffer BlockMemoryBuffer; - std::span<const SharedBuffer> Segments = PartialBlockBuffer.GetSegments(); - if (Segments.size() == 1) - { - IoBufferFileReference FileRef = {}; - if (PartialBlockBuffer.GetSegments().front().AsIoBuffer().GetFileReference(FileRef)) - { - BlockMemoryBuffer = UniqueBuffer::Alloc(FileRef.FileChunkSize).MoveToShared().AsIoBuffer(); - BasicFile Reader; - Reader.Attach(FileRef.FileHandle); - auto _ = MakeGuard([&Reader]() { Reader.Detach(); }); - MutableMemoryView ReadMem = BlockMemoryBuffer.GetMutableView(); - Reader.Read(ReadMem.GetData(), FileRef.FileChunkSize, FileRef.FileChunkOffset); - return BlockMemoryBuffer; - } - else - { - return PartialBlockBuffer.GetSegments().front().AsIoBuffer(); - } - } - else - { - // Not a homogenous memory buffer, read all to memory - - BlockMemoryBuffer = UniqueBuffer::Alloc(PartialBlockBuffer.GetSize()).MoveToShared().AsIoBuffer(); - MutableMemoryView ReadMem = BlockMemoryBuffer.GetMutableView(); - for (const SharedBuffer& Segment : Segments) - { - IoBufferFileReference FileRef = {}; - if (Segment.AsIoBuffer().GetFileReference(FileRef)) - { - BasicFile Reader; - Reader.Attach(FileRef.FileHandle); - Reader.Read(ReadMem.GetData(), FileRef.FileChunkSize, FileRef.FileChunkOffset); - Reader.Detach(); - ReadMem = ReadMem.Mid(FileRef.FileChunkSize); - } - else - { - ReadMem = ReadMem.CopyFrom(Segment.AsIoBuffer().GetView()); - } - } - return BlockMemoryBuffer; - } - } - - bool GetBlockWriteOps(const ChunkedFolderContent& RemoteContent, - const ChunkedContentLookup& Lookup, - std::span<const IoHash> ChunkRawHashes, - std::span<const uint32_t> ChunkCompressedLengths, - std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, - std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, - const MemoryView BlockView, - uint32_t FirstIncludedBlockChunkIndex, - uint32_t LastIncludedBlockChunkIndex, - BlockWriteOps& OutOps) - { - ZEN_TRACE_CPU("GetBlockWriteOps"); - uint32_t OffsetInBlock = 0; - for (uint32_t ChunkBlockIndex = FirstIncludedBlockChunkIndex; ChunkBlockIndex <= LastIncludedBlockChunkIndex; ChunkBlockIndex++) - { - const uint32_t ChunkCompressedSize = ChunkCompressedLengths[ChunkBlockIndex]; - const IoHash& ChunkHash = ChunkRawHashes[ChunkBlockIndex]; - if (auto It = Lookup.ChunkHashToChunkIndex.find(ChunkHash); It != Lookup.ChunkHashToChunkIndex.end()) - { - const uint32_t ChunkIndex = It->second; - std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = - GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, Lookup, ChunkIndex); - - if (!ChunkTargetPtrs.empty()) - { - bool NeedsWrite = true; - if (RemoteChunkIndexNeedsCopyFromSourceFlags[ChunkIndex].compare_exchange_strong(NeedsWrite, false)) - { - MemoryView ChunkMemoryView = BlockView.Mid(OffsetInBlock, ChunkCompressedSize); - IoHash VerifyChunkHash; - uint64_t VerifyChunkSize; - CompressedBuffer CompressedChunk = - CompressedBuffer::FromCompressed(SharedBuffer::MakeView(ChunkMemoryView), VerifyChunkHash, VerifyChunkSize); - ZEN_ASSERT(CompressedChunk); - ZEN_ASSERT(VerifyChunkHash == ChunkHash); - ZEN_ASSERT(VerifyChunkSize == RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]); - - OodleCompressor ChunkCompressor; - OodleCompressionLevel ChunkCompressionLevel; - uint64_t ChunkBlockSize; - - bool GetCompressParametersSuccess = - CompressedChunk.TryGetCompressParameters(ChunkCompressor, ChunkCompressionLevel, ChunkBlockSize); - ZEN_ASSERT(GetCompressParametersSuccess); - - IoBuffer Decompressed; - if (ChunkCompressionLevel == OodleCompressionLevel::None) - { - MemoryView ChunkDecompressedMemoryView = ChunkMemoryView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder()); - Decompressed = - IoBuffer(IoBuffer::Wrap, ChunkDecompressedMemoryView.GetData(), ChunkDecompressedMemoryView.GetSize()); - } - else - { - Decompressed = CompressedChunk.Decompress().AsIoBuffer(); - } - ZEN_ASSERT(Decompressed.GetSize() == RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]); - ZEN_ASSERT_SLOW(ChunkHash == IoHash::HashBuffer(Decompressed)); - for (const ChunkedContentLookup::ChunkSequenceLocation* Target : ChunkTargetPtrs) - { - OutOps.WriteOps.push_back( - BlockWriteOps::WriteOpData{.Target = Target, .ChunkBufferIndex = OutOps.ChunkBuffers.size()}); - } - OutOps.ChunkBuffers.emplace_back(std::move(Decompressed)); - } - } - } - - OffsetInBlock += ChunkCompressedSize; - } - { - ZEN_TRACE_CPU("GetBlockWriteOps_sort"); - std::sort(OutOps.WriteOps.begin(), - OutOps.WriteOps.end(), - [](const BlockWriteOps::WriteOpData& Lhs, const BlockWriteOps::WriteOpData& Rhs) { - if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex) - { - return true; - } - if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex) - { - return false; - } - return Lhs.Target->Offset < Rhs.Target->Offset; - }); - } - return true; - } - - bool WriteBlockToDisk(const std::filesystem::path& CacheFolderPath, - const ChunkedFolderContent& RemoteContent, - const ChunkBlockDescription& BlockDescription, - std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, - ParallelWork& Work, - WorkerThreadPool& VerifyPool, - CompositeBuffer&& BlockBuffer, - const ChunkedContentLookup& Lookup, - std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, - BufferedWriteFileCache& WriteCache, - DiskStatistics& DiskStats) - { - ZEN_TRACE_CPU("WriteBlockToDisk"); - - IoBuffer BlockMemoryBuffer = MakeBufferMemoryBased(BlockBuffer); - const MemoryView BlockView = BlockMemoryBuffer.GetView(); - - BlockWriteOps Ops; - if ((BlockDescription.HeaderSize == 0) || BlockDescription.ChunkCompressedLengths.empty()) - { - ZEN_TRACE_CPU("WriteBlockToDisk_Legacy"); - - uint64_t HeaderSize; - const std::vector<uint32_t> ChunkCompressedLengths = - ReadChunkBlockHeader(BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder()), HeaderSize); - - if (GetBlockWriteOps(RemoteContent, - Lookup, - BlockDescription.ChunkRawHashes, - ChunkCompressedLengths, - SequenceIndexChunksLeftToWriteCounters, - RemoteChunkIndexNeedsCopyFromSourceFlags, - BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder() + HeaderSize), - 0, - gsl::narrow<uint32_t>(BlockDescription.ChunkRawHashes.size() - 1), - Ops)) - { - WriteBlockChunkOps(CacheFolderPath, - RemoteContent, - Lookup, - SequenceIndexChunksLeftToWriteCounters, - Ops, - WriteCache, - Work, - VerifyPool, - DiskStats); - return true; - } - return false; - } - - if (GetBlockWriteOps(RemoteContent, - Lookup, - BlockDescription.ChunkRawHashes, - BlockDescription.ChunkCompressedLengths, - SequenceIndexChunksLeftToWriteCounters, - RemoteChunkIndexNeedsCopyFromSourceFlags, - BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize), - 0, - gsl::narrow<uint32_t>(BlockDescription.ChunkRawHashes.size() - 1), - Ops)) - { - WriteBlockChunkOps(CacheFolderPath, - RemoteContent, - Lookup, - SequenceIndexChunksLeftToWriteCounters, - Ops, - WriteCache, - Work, - VerifyPool, - DiskStats); - return true; - } - return false; - } - - bool WritePartialBlockToDisk(const std::filesystem::path& CacheFolderPath, - const ChunkedFolderContent& RemoteContent, - const ChunkBlockDescription& BlockDescription, - std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, - ParallelWork& Work, - WorkerThreadPool& VerifyPool, - CompositeBuffer&& PartialBlockBuffer, - uint32_t FirstIncludedBlockChunkIndex, - uint32_t LastIncludedBlockChunkIndex, - const ChunkedContentLookup& Lookup, - std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags, - BufferedWriteFileCache& WriteCache, - DiskStatistics& DiskStats) - { - ZEN_TRACE_CPU("WritePartialBlockToDisk"); - - IoBuffer BlockMemoryBuffer = MakeBufferMemoryBased(PartialBlockBuffer); - const MemoryView BlockView = BlockMemoryBuffer.GetView(); - - BlockWriteOps Ops; - if (GetBlockWriteOps(RemoteContent, - Lookup, - BlockDescription.ChunkRawHashes, - BlockDescription.ChunkCompressedLengths, - SequenceIndexChunksLeftToWriteCounters, - RemoteChunkIndexNeedsCopyFromSourceFlags, - BlockView, - FirstIncludedBlockChunkIndex, - LastIncludedBlockChunkIndex, - Ops)) - { - WriteBlockChunkOps(CacheFolderPath, - RemoteContent, - Lookup, - SequenceIndexChunksLeftToWriteCounters, - Ops, - WriteCache, - Work, - VerifyPool, - DiskStats); - return true; - } - else - { - return false; - } - } - - bool IsSingleFileChunk(const ChunkedFolderContent& RemoteContent, - const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> Locations) - { - if (Locations.size() == 1) - { - const uint32_t FirstSequenceIndex = Locations[0]->SequenceIndex; - if (RemoteContent.ChunkedContent.ChunkCounts[FirstSequenceIndex] == 1) - { - ZEN_ASSERT_SLOW(Locations[0]->Offset == 0); - return true; - } - } - return false; - } - - void StreamDecompress(const std::filesystem::path& CacheFolderPath, - const IoHash& SequenceRawHash, - CompositeBuffer&& CompressedPart, - DiskStatistics& DiskStats) - { - ZEN_TRACE_CPU("StreamDecompress"); - const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash); - TemporaryFile DecompressedTemp; - std::error_code Ec; - DecompressedTemp.CreateTemporary(TempChunkSequenceFileName.parent_path(), Ec); - if (Ec) - { - throw std::runtime_error( - fmt::format("Failed creating temporary file for decompressing large blob {}. Reason: {}", SequenceRawHash, Ec.message())); - } - IoHash RawHash; - uint64_t RawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompressedPart, RawHash, RawSize); - if (!Compressed) - { - throw std::runtime_error(fmt::format("Failed to parse header of compressed large blob {}", SequenceRawHash)); - } - if (RawHash != SequenceRawHash) - { - throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, SequenceRawHash)); - } - PrepareFileForScatteredWrite(DecompressedTemp.Handle(), RawSize); - - IoHashStream Hash; - bool CouldDecompress = Compressed.DecompressToStream( - 0, - (uint64_t)-1, - [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { - ZEN_UNUSED(SourceOffset); - ZEN_TRACE_CPU("StreamDecompress_Write"); - DiskStats.ReadByteCount += SourceSize; - if (!AbortFlag) - { - for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) - { - Hash.Append(Segment.GetView()); - DecompressedTemp.Write(Segment, Offset); - Offset += Segment.GetSize(); - DiskStats.WriteByteCount += Segment.GetSize(); - DiskStats.WriteCount++; - } - return true; - } - return false; - }); - - if (AbortFlag) - { - return; - } - - if (!CouldDecompress) - { - throw std::runtime_error(fmt::format("Failed to decompress large blob {}", SequenceRawHash)); - } - const IoHash VerifyHash = Hash.GetHash(); - if (VerifyHash != SequenceRawHash) - { - throw std::runtime_error( - fmt::format("Decompressed blob payload hash {} does not match expected hash {}", VerifyHash, SequenceRawHash)); - } - DecompressedTemp.MoveTemporaryIntoPlace(TempChunkSequenceFileName, Ec); - if (Ec) - { - throw std::runtime_error( - fmt::format("Failed moving temporary file for decompressing large blob {}. Reason: {}", SequenceRawHash, Ec.message())); - } - // WriteChunkStats.ChunkCountWritten++; - } - - bool WriteCompressedChunk(const std::filesystem::path& TargetFolder, - const ChunkedFolderContent& RemoteContent, - const ChunkedContentLookup& RemoteLookup, - const IoHash& ChunkHash, - const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs, - BufferedWriteFileCache& WriteCache, - IoBuffer&& CompressedPart, - DiskStatistics& DiskStats) - { - ZEN_TRACE_CPU("WriteCompressedChunk"); - auto ChunkHashToChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); - ZEN_ASSERT(ChunkHashToChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end()); - if (IsSingleFileChunk(RemoteContent, ChunkTargetPtrs)) - { - const std::uint32_t SequenceIndex = ChunkTargetPtrs.front()->SequenceIndex; - const IoHash& SequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]; - StreamDecompress(TargetFolder, SequenceRawHash, CompositeBuffer(std::move(CompressedPart)), DiskStats); - return false; - } - else - { - IoHash RawHash; - uint64_t RawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompositeBuffer(std::move(CompressedPart)), RawHash, RawSize); - if (!Compressed) - { - throw std::runtime_error(fmt::format("Failed to parse header of compressed large blob {}", ChunkHash)); - } - if (RawHash != ChunkHash) - { - throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, ChunkHash)); - } - - BufferedWriteFileCache::Local LocalWriter(WriteCache); - - IoHashStream Hash; - bool CouldDecompress = Compressed.DecompressToStream( - 0, - (uint64_t)-1, - [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { - ZEN_UNUSED(SourceOffset); - ZEN_TRACE_CPU("StreamDecompress_Write"); - DiskStats.ReadByteCount += SourceSize; - if (!AbortFlag) - { - for (const ChunkedContentLookup::ChunkSequenceLocation* TargetPtr : ChunkTargetPtrs) - { - const auto& Target = *TargetPtr; - const uint64_t FileOffset = Target.Offset + Offset; - const uint32_t SequenceIndex = Target.SequenceIndex; - const uint32_t PathIndex = RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; - - WriteSequenceChunk(TargetFolder, - RemoteContent, - LocalWriter, - RangeBuffer, - SequenceIndex, - FileOffset, - PathIndex, - DiskStats); - } - - return true; - } - return false; - }); - - if (AbortFlag) - { - return false; - } - - if (!CouldDecompress) - { - throw std::runtime_error(fmt::format("Failed to decompress large chunk {}", ChunkHash)); - } - - return true; - } - } - - void AsyncWriteDownloadedChunk(const std::filesystem::path& ZenFolderPath, - const ChunkedFolderContent& RemoteContent, - const ChunkedContentLookup& RemoteLookup, - uint32_t RemoteChunkIndex, - std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>&& ChunkTargetPtrs, - BufferedWriteFileCache& WriteCache, - ParallelWork& Work, - WorkerThreadPool& WritePool, - IoBuffer&& Payload, - std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters, - std::atomic<uint64_t>& WritePartsComplete, - const uint64_t TotalPartWriteCount, - FilteredRate& FilteredWrittenBytesPerSecond, - DiskStatistics& DiskStats) - { - ZEN_TRACE_CPU("AsyncWriteDownloadedChunk"); - - const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; - - const uint64_t Size = Payload.GetSize(); - - std::filesystem::path CompressedChunkPath; - - // Check if the dowloaded chunk is file based and we can move it directly without rewriting it - { - IoBufferFileReference FileRef; - if (Payload.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == Size)) - { - ZEN_TRACE_CPU("MoveTempChunk"); - std::error_code Ec; - std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); - if (!Ec) - { - Payload.SetDeleteOnClose(false); - Payload = {}; - CompressedChunkPath = ZenTempDownloadFolderPath(ZenFolderPath) / ChunkHash.ToHexString(); - RenameFile(TempBlobPath, CompressedChunkPath, Ec); - if (Ec) - { - CompressedChunkPath = std::filesystem::path{}; - - // Re-open the temp file again - BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); - Payload = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, Size, true); - Payload.SetDeleteOnClose(true); - } - } - } - } - - if (CompressedChunkPath.empty() && (Size > 512u * 1024u)) - { - ZEN_TRACE_CPU("WriteTempChunk"); - // Could not be moved and rather large, lets store it on disk - CompressedChunkPath = ZenTempDownloadFolderPath(ZenFolderPath) / ChunkHash.ToHexString(); - TemporaryFile::SafeWriteFile(CompressedChunkPath, Payload); - Payload = {}; - } - - Work.ScheduleWork( - WritePool, - [&ZenFolderPath, - &RemoteContent, - &RemoteLookup, - SequenceIndexChunksLeftToWriteCounters, - &Work, - &WritePool, - CompressedChunkPath, - RemoteChunkIndex, - TotalPartWriteCount, - &WriteCache, - &DiskStats, - &WritePartsComplete, - &FilteredWrittenBytesPerSecond, - ChunkTargetPtrs = std::move(ChunkTargetPtrs), - CompressedPart = std::move(Payload)](std::atomic<bool>&) mutable { - ZEN_TRACE_CPU("UpdateFolder_WriteChunk"); - - if (!AbortFlag) - { - FilteredWrittenBytesPerSecond.Start(); - - const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; - if (CompressedChunkPath.empty()) - { - ZEN_ASSERT(CompressedPart); - } - else - { - ZEN_ASSERT(!CompressedPart); - CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); - if (!CompressedPart) - { - throw std::runtime_error( - fmt::format("Could not open dowloaded compressed chunk {} from {}", ChunkHash, CompressedChunkPath)); - } - } - - std::filesystem::path TargetFolder = ZenTempCacheFolderPath(ZenFolderPath); - - bool NeedHashVerify = WriteCompressedChunk(TargetFolder, - RemoteContent, - RemoteLookup, - ChunkHash, - ChunkTargetPtrs, - WriteCache, - std::move(CompressedPart), - DiskStats); - if (!AbortFlag) - { - WritePartsComplete++; - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); - } - - if (!CompressedChunkPath.empty()) - { - TryRemoveFile(CompressedChunkPath); - } - - std::vector<uint32_t> CompletedSequences = - CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); - WriteCache.Close(CompletedSequences); - if (NeedHashVerify) - { - VerifyAndCompleteChunkSequencesAsync(TargetFolder, - RemoteContent, - RemoteLookup, - CompletedSequences, - Work, - WritePool); - } - else - { - FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences); - } - } - } - }); - }; - + // TODO: Move out of this file and make it one with implementation in buildstorageoperations.cpp bool ReadStateFile(const std::filesystem::path& StateFilePath, FolderContent& OutLocalFolderState, ChunkedFolderContent& OutLocalContent) @@ -6008,2859 +4528,6 @@ namespace { return Result; } - enum EPartialBlockRequestMode - { - Off, - ZenCacheOnly, - Mixed, - All, - Invalid - }; - static EPartialBlockRequestMode PartialBlockRequestModeFromString(const std::string_view ModeString) - { - switch (HashStringAsLowerDjb2(ModeString)) - { - case HashStringDjb2("false"): - return EPartialBlockRequestMode::Off; - case HashStringDjb2("zencacheonly"): - return EPartialBlockRequestMode::ZenCacheOnly; - case HashStringDjb2("mixed"): - return EPartialBlockRequestMode::Mixed; - case HashStringDjb2("true"): - return EPartialBlockRequestMode::All; - default: - return EPartialBlockRequestMode::Invalid; - } - } - - struct UpdateOptions - { - std::filesystem::path SystemRootDir; - std::filesystem::path ZenFolderPath; - std::uint64_t LargeAttachmentSize = DefaultPreferredMultipartChunkSize * 4u; - std::uint64_t PreferredMultipartChunkSize = DefaultPreferredMultipartChunkSize; - EPartialBlockRequestMode PartialBlockRequestMode = EPartialBlockRequestMode::Mixed; - bool WipeTargetFolder = false; - bool PrimeCacheOnly = false; - bool EnableOtherDownloadsScavenging = true; - bool EnableTargetFolderScavenging = true; - }; - - void UpdateFolder(StorageInstance& Storage, - const Oid& BuildId, - const std::filesystem::path& Path, - const ChunkedFolderContent& LocalContent, - const ChunkedFolderContent& RemoteContent, - const std::vector<ChunkBlockDescription>& BlockDescriptions, - const std::vector<IoHash>& LooseChunkHashes, - const UpdateOptions& Options, - FolderContent& OutLocalFolderState, - DiskStatistics& DiskStats, - CacheMappingStatistics& CacheMappingStats, - DownloadStatistics& DownloadStats, - WriteChunkStatistics& WriteChunkStats, - RebuildFolderStateStatistics& RebuildFolderStateStats) - { - ZEN_TRACE_CPU("UpdateFolder"); - - ZEN_ASSERT((!Options.PrimeCacheOnly) || - (Options.PrimeCacheOnly && (Options.PartialBlockRequestMode == EPartialBlockRequestMode::Off))); - - Stopwatch IndexTimer; - - const ChunkedContentLookup LocalLookup = BuildChunkedContentLookup(LocalContent); - - const ChunkedContentLookup RemoteLookup = BuildChunkedContentLookup(RemoteContent); - - if (!IsQuiet) - { - ZEN_CONSOLE("Indexed local and remote content in {}", NiceTimeSpanMs(IndexTimer.GetElapsedTimeMs())); - } - - const std::filesystem::path CacheFolderPath = ZenTempCacheFolderPath(Options.ZenFolderPath); - - Stopwatch CacheMappingTimer; - - std::vector<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters(RemoteContent.ChunkedContent.SequenceRawHashes.size()); - std::vector<bool> RemoteChunkIndexNeedsCopyFromLocalFileFlags(RemoteContent.ChunkedContent.ChunkHashes.size()); - std::vector<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags(RemoteContent.ChunkedContent.ChunkHashes.size()); - - tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedChunkHashesFound; - tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedSequenceHashesFound; - if (!Options.PrimeCacheOnly) - { - ZEN_TRACE_CPU("UpdateFolder_CheckChunkCache"); - - Stopwatch CacheTimer; - - DirectoryContent CacheDirContent; - GetDirectoryContent(CacheFolderPath, - DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeFileSizes, - CacheDirContent); - for (size_t Index = 0; Index < CacheDirContent.Files.size(); Index++) - { - if (Options.EnableTargetFolderScavenging) - { - IoHash FileHash; - if (IoHash::TryParse(CacheDirContent.Files[Index].filename().string(), FileHash)) - { - if (auto ChunkIt = RemoteLookup.ChunkHashToChunkIndex.find(FileHash); - ChunkIt != RemoteLookup.ChunkHashToChunkIndex.end()) - { - const uint32_t ChunkIndex = ChunkIt->second; - const uint64_t ChunkSize = RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]; - if (ChunkSize == CacheDirContent.FileSizes[Index]) - { - CachedChunkHashesFound.insert({FileHash, ChunkIndex}); - CacheMappingStats.CacheChunkCount++; - CacheMappingStats.CacheChunkByteCount += ChunkSize; - continue; - } - } - else if (auto SequenceIt = RemoteLookup.RawHashToSequenceIndex.find(FileHash); - SequenceIt != RemoteLookup.RawHashToSequenceIndex.end()) - { - const uint32_t SequenceIndex = SequenceIt->second; - const uint32_t PathIndex = RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; - const uint64_t SequenceSize = RemoteContent.RawSizes[PathIndex]; - if (SequenceSize == CacheDirContent.FileSizes[Index]) - { - CachedSequenceHashesFound.insert({FileHash, SequenceIndex}); - CacheMappingStats.CacheSequenceHashesCount++; - CacheMappingStats.CacheSequenceHashesByteCount += SequenceSize; - - const std::filesystem::path CacheFilePath = - GetFinalChunkedSequenceFileName(CacheFolderPath, - RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]); - ZEN_ASSERT_SLOW(IsFile(CacheFilePath)); - - continue; - } - } - } - } - TryRemoveFile(CacheDirContent.Files[Index]); - } - CacheMappingStats.CacheScanElapsedWallTimeUs += CacheTimer.GetElapsedTimeUs(); - } - - tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedBlocksFound; - if (!Options.PrimeCacheOnly) - { - ZEN_TRACE_CPU("UpdateFolder_CheckBlockCache"); - - Stopwatch CacheTimer; - - tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> AllBlockSizes; - AllBlockSizes.reserve(BlockDescriptions.size()); - for (uint32_t BlockIndex = 0; BlockIndex < BlockDescriptions.size(); BlockIndex++) - { - const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; - AllBlockSizes.insert({BlockDescription.BlockHash, BlockIndex}); - } - - DirectoryContent BlockDirContent; - GetDirectoryContent(ZenTempBlockFolderPath(Options.ZenFolderPath), - DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeFileSizes, - BlockDirContent); - CachedBlocksFound.reserve(BlockDirContent.Files.size()); - for (size_t Index = 0; Index < BlockDirContent.Files.size(); Index++) - { - if (Options.EnableTargetFolderScavenging) - { - IoHash FileHash; - if (IoHash::TryParse(BlockDirContent.Files[Index].filename().string(), FileHash)) - { - if (auto BlockIt = AllBlockSizes.find(FileHash); BlockIt != AllBlockSizes.end()) - { - const uint32_t BlockIndex = BlockIt->second; - const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; - uint64_t BlockSize = CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize; - for (uint64_t ChunkSize : BlockDescription.ChunkCompressedLengths) - { - BlockSize += ChunkSize; - } - - if (BlockSize == BlockDirContent.FileSizes[Index]) - { - CachedBlocksFound.insert({FileHash, BlockIndex}); - CacheMappingStats.CacheBlockCount++; - CacheMappingStats.CacheBlocksByteCount += BlockSize; - continue; - } - } - } - } - TryRemoveFile(BlockDirContent.Files[Index]); - } - - CacheMappingStats.CacheScanElapsedWallTimeUs += CacheTimer.GetElapsedTimeUs(); - } - - std::vector<uint32_t> LocalPathIndexesMatchingSequenceIndexes; - tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> SequenceIndexesLeftToFindToRemoteIndex; - - if (!Options.PrimeCacheOnly && Options.EnableTargetFolderScavenging) - { - // Pick up all whole files we can use from current local state - ZEN_TRACE_CPU("UpdateFolder_GetLocalSequences"); - - Stopwatch LocalTimer; - - for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < RemoteContent.ChunkedContent.SequenceRawHashes.size(); - RemoteSequenceIndex++) - { - const IoHash& RemoteSequenceRawHash = RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]; - const uint32_t RemotePathIndex = GetFirstPathIndexForSeqeuenceIndex(RemoteLookup, RemoteSequenceIndex); - const uint64_t RemoteRawSize = RemoteContent.RawSizes[RemotePathIndex]; - if (auto CacheSequenceIt = CachedSequenceHashesFound.find(RemoteSequenceRawHash); - CacheSequenceIt != CachedSequenceHashesFound.end()) - { - const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RemoteSequenceRawHash); - ZEN_ASSERT_SLOW(IsFile(CacheFilePath)); - ZEN_CONSOLE_VERBOSE("Found sequence {} at {} ({})", RemoteSequenceRawHash, CacheFilePath, NiceBytes(RemoteRawSize)); - } - else if (auto CacheChunkIt = CachedChunkHashesFound.find(RemoteSequenceRawHash); - CacheChunkIt != CachedChunkHashesFound.end()) - { - const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RemoteSequenceRawHash); - ZEN_ASSERT_SLOW(IsFile(CacheFilePath)); - ZEN_CONSOLE_VERBOSE("Found chunk {} at {} ({})", RemoteSequenceRawHash, CacheFilePath, NiceBytes(RemoteRawSize)); - } - else if (auto It = LocalLookup.RawHashToSequenceIndex.find(RemoteSequenceRawHash); - It != LocalLookup.RawHashToSequenceIndex.end()) - { - const uint32_t LocalSequenceIndex = It->second; - const uint32_t LocalPathIndex = GetFirstPathIndexForSeqeuenceIndex(LocalLookup, LocalSequenceIndex); - const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred(); - ZEN_ASSERT_SLOW(IsFile(LocalFilePath)); - LocalPathIndexesMatchingSequenceIndexes.push_back(LocalPathIndex); - CacheMappingStats.LocalPathsMatchingSequencesCount++; - CacheMappingStats.LocalPathsMatchingSequencesByteCount += RemoteRawSize; - ZEN_CONSOLE_VERBOSE("Found sequence {} at {} ({})", RemoteSequenceRawHash, LocalFilePath, NiceBytes(RemoteRawSize)); - } - else - { - // We must write the sequence - const uint32_t ChunkCount = RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]; - SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = ChunkCount; - SequenceIndexesLeftToFindToRemoteIndex.insert({RemoteSequenceRawHash, RemoteSequenceIndex}); - } - } - - CacheMappingStats.LocalScanElapsedWallTimeUs += LocalTimer.GetElapsedTimeUs(); - } - else - { - for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < RemoteContent.ChunkedContent.SequenceRawHashes.size(); - RemoteSequenceIndex++) - { - const uint32_t ChunkCount = RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]; - SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = ChunkCount; - } - } - - std::vector<ChunkedFolderContent> ScavengedContents; - std::vector<ChunkedContentLookup> ScavengedLookups; - std::vector<std::filesystem::path> ScavengedPaths; - - struct ScavengeCopyOperation - { - uint32_t ScavengedContentIndex = (uint32_t)-1; - uint32_t ScavengedPathIndex = (uint32_t)-1; - uint32_t RemoteSequenceIndex = (uint32_t)-1; - uint64_t RawSize = (uint32_t)-1; - }; - - std::vector<ScavengeCopyOperation> ScavengeCopyOperations; - uint64_t ScavengedPathsCount = 0; - - if (!Options.PrimeCacheOnly && Options.EnableOtherDownloadsScavenging) - { - ZEN_TRACE_CPU("UpdateFolder_GetScavengedSequences"); - - Stopwatch ScavengeTimer; - - if (!SequenceIndexesLeftToFindToRemoteIndex.empty()) - { - std::vector<ScavengeSource> ScavengeSources = GetDownloadedStatePaths(Options.SystemRootDir); - auto EraseIt = std::remove_if(ScavengeSources.begin(), ScavengeSources.end(), [&Path](const ScavengeSource& Source) { - return Source.Path == Path; - }); - ScavengeSources.erase(EraseIt, ScavengeSources.end()); - - const size_t ScavengePathCount = ScavengeSources.size(); - - ScavengedContents.resize(ScavengePathCount); - ScavengedLookups.resize(ScavengePathCount); - ScavengedPaths.resize(ScavengePathCount); - - ProgressBar ScavengeProgressBar(ProgressMode, "Scavenging"); - ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - - std::atomic<uint64_t> PathsFound(0); - std::atomic<uint64_t> ChunksFound(0); - std::atomic<uint64_t> PathsScavenged(0); - - for (size_t ScavengeIndex = 0; ScavengeIndex < ScavengePathCount; ScavengeIndex++) - { - Work.ScheduleWork( - GetIOWorkerPool(), - [&RemoteLookup, - &ScavengeSources, - &ScavengedContents, - &ScavengedPaths, - &ScavengedLookups, - &PathsFound, - &ChunksFound, - &PathsScavenged, - ScavengeIndex](std::atomic<bool>&) { - if (!AbortFlag) - { - const ScavengeSource& Source = ScavengeSources[ScavengeIndex]; - - ChunkedFolderContent& ScavengedLocalContent = ScavengedContents[ScavengeIndex]; - std::filesystem::path& ScavengePath = ScavengedPaths[ScavengeIndex]; - - FolderContent LocalFolderState; - if (ReadStateFile(Source.StateFilePath, LocalFolderState, ScavengedLocalContent)) - { - if (IsDir(Source.Path)) - { - ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengeIndex]; - ScavengedLookup = BuildChunkedContentLookup(ScavengedLocalContent); - - std::vector<uint32_t> PathIndexesToScavange; - uint32_t ScavengedStatePathCount = gsl::narrow<uint32_t>(ScavengedLocalContent.Paths.size()); - PathIndexesToScavange.reserve(ScavengedStatePathCount); - for (uint32_t ScavengedStatePathIndex = 0; ScavengedStatePathIndex < ScavengedStatePathCount; - ScavengedStatePathIndex++) - { - const IoHash& SequenceHash = ScavengedLocalContent.RawHashes[ScavengedStatePathIndex]; - if (auto ScavengeSequenceIt = ScavengedLookup.RawHashToSequenceIndex.find(SequenceHash); - ScavengeSequenceIt != ScavengedLookup.RawHashToSequenceIndex.end()) - { - const uint32_t ScavengeSequenceIndex = ScavengeSequenceIt->second; - if (RemoteLookup.RawHashToSequenceIndex.contains(SequenceHash)) - { - PathIndexesToScavange.push_back(ScavengedStatePathIndex); - } - else - { - const uint32_t ScavengeChunkCount = - ScavengedLocalContent.ChunkedContent.ChunkCounts[ScavengeSequenceIndex]; - for (uint32_t ScavengeChunkIndexOffset = 0; - ScavengeChunkIndexOffset < ScavengeChunkCount; - ScavengeChunkIndexOffset++) - { - const size_t ScavengeChunkOrderIndex = - ScavengedLookup.ChunkSequenceLocationOffset[ScavengeSequenceIndex] + - ScavengeChunkIndexOffset; - const uint32_t ScavengeChunkIndex = - ScavengedLocalContent.ChunkedContent.ChunkOrders[ScavengeChunkOrderIndex]; - const IoHash& ScavengeChunkHash = - ScavengedLocalContent.ChunkedContent.ChunkHashes[ScavengeChunkIndex]; - if (RemoteLookup.ChunkHashToChunkIndex.contains(ScavengeChunkHash)) - { - PathIndexesToScavange.push_back(ScavengedStatePathIndex); - break; - } - } - } - } - } - - if (!PathIndexesToScavange.empty()) - { - std::vector<std::filesystem::path> PathsToScavenge; - PathsToScavenge.reserve(PathIndexesToScavange.size()); - for (uint32_t ScavengedStatePathIndex : PathIndexesToScavange) - { - PathsToScavenge.push_back(ScavengedLocalContent.Paths[ScavengedStatePathIndex]); - } - - GetFolderContentStatistics ScavengedFolderScanStats; - - FolderContent ValidFolderContent = - GetValidFolderContent(ScavengedFolderScanStats, Source.Path, PathsToScavenge, {}); - - if (!LocalFolderState.AreKnownFilesEqual(ValidFolderContent)) - { - std::vector<std::filesystem::path> DeletedPaths; - FolderContent UpdatedContent = - GetUpdatedContent(LocalFolderState, ValidFolderContent, DeletedPaths); - - // If the files are modified since the state was saved we ignore the files since we don't - // want to incur the cost of scanning/hashing scavenged files - DeletedPaths.insert(DeletedPaths.end(), - UpdatedContent.Paths.begin(), - UpdatedContent.Paths.end()); - if (!DeletedPaths.empty()) - { - ScavengedLocalContent = - DeletePathsFromChunkedContent(ScavengedLocalContent, ScavengedLookup, DeletedPaths); - ScavengedLookup = BuildChunkedContentLookup(ScavengedLocalContent); - } - } - - if (!ScavengedLocalContent.Paths.empty()) - { - ScavengePath = Source.Path; - PathsFound += ScavengedLocalContent.Paths.size(); - ChunksFound += ScavengedLocalContent.ChunkedContent.ChunkHashes.size(); - } - } - - if (ScavengePath.empty()) - { - ScavengedLocalContent = {}; - ScavengedLookups[ScavengeIndex] = {}; - ScavengedPaths[ScavengeIndex].clear(); - } - } - } - PathsScavenged++; - } - }); - } - { - ZEN_TRACE_CPU("ScavengeScan_Wait"); - - Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(PendingWork); - std::string Details = fmt::format("{}/{} scanned. {} paths and {} chunks found for scavenging", - PathsScavenged.load(), - ScavengePathCount, - PathsFound.load(), - ChunksFound.load()); - ScavengeProgressBar.UpdateState({.Task = "Scavenging ", - .Details = Details, - .TotalCount = ScavengePathCount, - .RemainingCount = ScavengePathCount - PathsScavenged.load(), - .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, - false); - }); - } - - ScavengeProgressBar.Finish(); - if (AbortFlag) - { - return; - } - - for (uint32_t ScavengedContentIndex = 0; - ScavengedContentIndex < ScavengedContents.size() && (!SequenceIndexesLeftToFindToRemoteIndex.empty()); - ScavengedContentIndex++) - { - const std::filesystem::path& ScavengePath = ScavengedPaths[ScavengedContentIndex]; - if (!ScavengePath.empty()) - { - const ChunkedFolderContent& ScavengedLocalContent = ScavengedContents[ScavengedContentIndex]; - const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengedContentIndex]; - - for (uint32_t ScavengedSequenceIndex = 0; - ScavengedSequenceIndex < ScavengedLocalContent.ChunkedContent.SequenceRawHashes.size(); - ScavengedSequenceIndex++) - { - const IoHash& SequenceRawHash = ScavengedLocalContent.ChunkedContent.SequenceRawHashes[ScavengedSequenceIndex]; - if (auto It = SequenceIndexesLeftToFindToRemoteIndex.find(SequenceRawHash); - It != SequenceIndexesLeftToFindToRemoteIndex.end()) - { - const uint32_t RemoteSequenceIndex = It->second; - const uint64_t RawSize = - RemoteContent.RawSizes[RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]]; - ZEN_ASSERT(RawSize > 0); - - const uint32_t ScavengedPathIndex = ScavengedLookup.SequenceIndexFirstPathIndex[ScavengedSequenceIndex]; - ZEN_ASSERT_SLOW(IsFile((ScavengePath / ScavengedLocalContent.Paths[ScavengedPathIndex]).make_preferred())); - - ScavengeCopyOperations.push_back({.ScavengedContentIndex = ScavengedContentIndex, - .ScavengedPathIndex = ScavengedPathIndex, - .RemoteSequenceIndex = RemoteSequenceIndex, - .RawSize = RawSize}); - - SequenceIndexesLeftToFindToRemoteIndex.erase(SequenceRawHash); - SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = 0; - - CacheMappingStats.ScavengedPathsMatchingSequencesCount++; - CacheMappingStats.ScavengedPathsMatchingSequencesByteCount += RawSize; - } - } - ScavengedPathsCount++; - } - } - } - CacheMappingStats.ScavengeElapsedWallTimeUs += ScavengeTimer.GetElapsedTimeUs(); - } - - uint32_t RemainingChunkCount = 0; - for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++) - { - uint64_t ChunkWriteCount = GetChunkWriteCount(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex); - if (ChunkWriteCount > 0) - { - RemainingChunkCount++; - } - } - - // Pick up all chunks in current local state - // TODO: Rename to LocalStateCopyData - struct CacheCopyData - { - uint32_t ScavengeSourceIndex = (uint32_t)-1; - uint32_t SourceSequenceIndex = (uint32_t)-1; - std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> TargetChunkLocationPtrs; - struct ChunkTarget - { - uint32_t TargetChunkLocationCount = (uint32_t)-1; - uint32_t RemoteChunkIndex = (uint32_t)-1; - uint64_t CacheFileOffset = (uint64_t)-1; - }; - std::vector<ChunkTarget> ChunkTargets; - }; - - tsl::robin_map<IoHash, size_t, IoHash::Hasher> RawHashToCacheCopyDataIndex; - std::vector<CacheCopyData> CacheCopyDatas; - - if (!Options.PrimeCacheOnly && Options.EnableTargetFolderScavenging) - { - ZEN_TRACE_CPU("UpdateFolder_GetLocalChunks"); - - Stopwatch LocalTimer; - - for (uint32_t LocalSequenceIndex = 0; - LocalSequenceIndex < LocalContent.ChunkedContent.SequenceRawHashes.size() && (RemainingChunkCount > 0); - LocalSequenceIndex++) - { - const IoHash& LocalSequenceRawHash = LocalContent.ChunkedContent.SequenceRawHashes[LocalSequenceIndex]; - const uint32_t LocalOrderOffset = LocalLookup.SequenceIndexChunkOrderOffset[LocalSequenceIndex]; - - { - uint64_t SourceOffset = 0; - const uint32_t LocalChunkCount = LocalContent.ChunkedContent.ChunkCounts[LocalSequenceIndex]; - for (uint32_t LocalOrderIndex = 0; LocalOrderIndex < LocalChunkCount; LocalOrderIndex++) - { - const uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[LocalOrderOffset + LocalOrderIndex]; - const IoHash& LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex]; - const uint64_t LocalChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[LocalChunkIndex]; - - if (auto RemoteChunkIt = RemoteLookup.ChunkHashToChunkIndex.find(LocalChunkHash); - RemoteChunkIt != RemoteLookup.ChunkHashToChunkIndex.end()) - { - const uint32_t RemoteChunkIndex = RemoteChunkIt->second; - if (!RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) - { - std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = - GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex); - - if (!ChunkTargetPtrs.empty()) - { - CacheCopyData::ChunkTarget Target = { - .TargetChunkLocationCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()), - .RemoteChunkIndex = RemoteChunkIndex, - .CacheFileOffset = SourceOffset}; - if (auto CopySourceIt = RawHashToCacheCopyDataIndex.find(LocalSequenceRawHash); - CopySourceIt != RawHashToCacheCopyDataIndex.end()) - { - CacheCopyData& Data = CacheCopyDatas[CopySourceIt->second]; - if (Data.TargetChunkLocationPtrs.size() > 1024) - { - RawHashToCacheCopyDataIndex.insert_or_assign(LocalSequenceRawHash, CacheCopyDatas.size()); - CacheCopyDatas.push_back( - CacheCopyData{.ScavengeSourceIndex = (uint32_t)-1, - .SourceSequenceIndex = LocalSequenceIndex, - .TargetChunkLocationPtrs = ChunkTargetPtrs, - .ChunkTargets = std::vector<CacheCopyData::ChunkTarget>{Target}}); - } - else - { - Data.TargetChunkLocationPtrs.insert(Data.TargetChunkLocationPtrs.end(), - ChunkTargetPtrs.begin(), - ChunkTargetPtrs.end()); - Data.ChunkTargets.push_back(Target); - } - } - else - { - RawHashToCacheCopyDataIndex.insert_or_assign(LocalSequenceRawHash, CacheCopyDatas.size()); - CacheCopyDatas.push_back( - CacheCopyData{.ScavengeSourceIndex = (uint32_t)-1, - .SourceSequenceIndex = LocalSequenceIndex, - .TargetChunkLocationPtrs = ChunkTargetPtrs, - .ChunkTargets = std::vector<CacheCopyData::ChunkTarget>{Target}}); - } - CacheMappingStats.LocalChunkMatchingRemoteCount++; - CacheMappingStats.LocalChunkMatchingRemoteByteCount += LocalChunkRawSize; - RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex] = true; - RemainingChunkCount--; - } - } - } - SourceOffset += LocalChunkRawSize; - } - } - } - CacheMappingStats.LocalScanElapsedWallTimeUs += LocalTimer.GetElapsedTimeUs(); - } - - if (!Options.PrimeCacheOnly && Options.EnableOtherDownloadsScavenging) - { - ZEN_TRACE_CPU("UpdateFolder_GetScavengeChunks"); - - Stopwatch ScavengeTimer; - - for (uint32_t ScavengedContentIndex = 0; ScavengedContentIndex < ScavengedContents.size() && (RemainingChunkCount > 0); - ScavengedContentIndex++) - { - const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengedContentIndex]; - // const std::filesystem::path& ScavengedPath = ScavengedPaths[ScavengedContentIndex]; - const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengedContentIndex]; - - for (uint32_t ScavengedSequenceIndex = 0; - ScavengedSequenceIndex < ScavengedContent.ChunkedContent.SequenceRawHashes.size() && (RemainingChunkCount > 0); - ScavengedSequenceIndex++) - { - const IoHash& ScavengedSequenceRawHash = ScavengedContent.ChunkedContent.SequenceRawHashes[ScavengedSequenceIndex]; - const uint32_t ScavengedOrderOffset = ScavengedLookup.SequenceIndexChunkOrderOffset[ScavengedSequenceIndex]; - - { - uint64_t SourceOffset = 0; - const uint32_t ScavengedChunkCount = ScavengedContent.ChunkedContent.ChunkCounts[ScavengedSequenceIndex]; - for (uint32_t ScavengedOrderIndex = 0; ScavengedOrderIndex < ScavengedChunkCount; ScavengedOrderIndex++) - { - const uint32_t ScavengedChunkIndex = - ScavengedContent.ChunkedContent.ChunkOrders[ScavengedOrderOffset + ScavengedOrderIndex]; - const IoHash& ScavengedChunkHash = ScavengedContent.ChunkedContent.ChunkHashes[ScavengedChunkIndex]; - const uint64_t ScavengedChunkRawSize = ScavengedContent.ChunkedContent.ChunkRawSizes[ScavengedChunkIndex]; - - if (auto RemoteChunkIt = RemoteLookup.ChunkHashToChunkIndex.find(ScavengedChunkHash); - RemoteChunkIt != RemoteLookup.ChunkHashToChunkIndex.end()) - { - const uint32_t RemoteChunkIndex = RemoteChunkIt->second; - if (!RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) - { - std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = - GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex); - - if (!ChunkTargetPtrs.empty()) - { - CacheCopyData::ChunkTarget Target = { - .TargetChunkLocationCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()), - .RemoteChunkIndex = RemoteChunkIndex, - .CacheFileOffset = SourceOffset}; - if (auto CopySourceIt = RawHashToCacheCopyDataIndex.find(ScavengedSequenceRawHash); - CopySourceIt != RawHashToCacheCopyDataIndex.end()) - { - CacheCopyData& Data = CacheCopyDatas[CopySourceIt->second]; - if (Data.TargetChunkLocationPtrs.size() > 1024) - { - RawHashToCacheCopyDataIndex.insert_or_assign(ScavengedSequenceRawHash, - CacheCopyDatas.size()); - CacheCopyDatas.push_back( - CacheCopyData{.ScavengeSourceIndex = ScavengedContentIndex, - .SourceSequenceIndex = ScavengedSequenceIndex, - .TargetChunkLocationPtrs = ChunkTargetPtrs, - .ChunkTargets = std::vector<CacheCopyData::ChunkTarget>{Target}}); - } - else - { - Data.TargetChunkLocationPtrs.insert(Data.TargetChunkLocationPtrs.end(), - ChunkTargetPtrs.begin(), - ChunkTargetPtrs.end()); - Data.ChunkTargets.push_back(Target); - } - } - else - { - RawHashToCacheCopyDataIndex.insert_or_assign(ScavengedSequenceRawHash, CacheCopyDatas.size()); - CacheCopyDatas.push_back( - CacheCopyData{.ScavengeSourceIndex = ScavengedContentIndex, - .SourceSequenceIndex = ScavengedSequenceIndex, - .TargetChunkLocationPtrs = ChunkTargetPtrs, - .ChunkTargets = std::vector<CacheCopyData::ChunkTarget>{Target}}); - } - CacheMappingStats.ScavengedChunkMatchingRemoteCount++; - CacheMappingStats.ScavengedChunkMatchingRemoteByteCount += ScavengedChunkRawSize; - RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex] = true; - RemainingChunkCount--; - } - } - } - SourceOffset += ScavengedChunkRawSize; - } - } - } - } - CacheMappingStats.ScavengeElapsedWallTimeUs += ScavengeTimer.GetElapsedTimeUs(); - } - if (!IsQuiet) - { - if (!CachedSequenceHashesFound.empty() || !CachedChunkHashesFound.empty() || !CachedBlocksFound.empty()) - { - ZEN_CONSOLE("Download cache: Found {} ({}) chunk sequences, {} ({}) chunks, {} ({}) blocks in {}", - CachedSequenceHashesFound.size(), - NiceBytes(CacheMappingStats.CacheSequenceHashesByteCount), - CachedChunkHashesFound.size(), - NiceBytes(CacheMappingStats.CacheChunkByteCount), - CachedBlocksFound.size(), - NiceBytes(CacheMappingStats.CacheBlocksByteCount), - NiceTimeSpanMs(CacheMappingStats.CacheScanElapsedWallTimeUs / 1000)); - } - - if (!LocalPathIndexesMatchingSequenceIndexes.empty() || CacheMappingStats.LocalChunkMatchingRemoteCount > 0) - { - ZEN_CONSOLE("Local state : Found {} ({}) chunk sequences, {} ({}) chunks in {}", - LocalPathIndexesMatchingSequenceIndexes.size(), - NiceBytes(CacheMappingStats.LocalPathsMatchingSequencesByteCount), - CacheMappingStats.LocalChunkMatchingRemoteCount, - NiceBytes(CacheMappingStats.LocalChunkMatchingRemoteByteCount), - NiceTimeSpanMs(CacheMappingStats.LocalScanElapsedWallTimeUs / 1000)); - } - if (CacheMappingStats.ScavengedPathsMatchingSequencesCount > 0 || CacheMappingStats.ScavengedChunkMatchingRemoteCount > 0) - { - ZEN_CONSOLE("Scavenge of {} paths, found {} ({}) chunk sequences, {} ({}) chunks in {}", - ScavengedPathsCount, - CacheMappingStats.ScavengedPathsMatchingSequencesCount, - NiceBytes(CacheMappingStats.ScavengedPathsMatchingSequencesByteCount), - CacheMappingStats.ScavengedChunkMatchingRemoteCount, - NiceBytes(CacheMappingStats.ScavengedChunkMatchingRemoteByteCount), - NiceTimeSpanMs(CacheMappingStats.ScavengeElapsedWallTimeUs / 1000)); - } - } - - uint64_t BytesToWrite = 0; - - for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++) - { - uint64_t ChunkWriteCount = GetChunkWriteCount(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex); - if (ChunkWriteCount > 0) - { - BytesToWrite += RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] * ChunkWriteCount; - if (!RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) - { - RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex] = true; - } - } - } - - for (const ScavengeCopyOperation& ScavengeCopyOp : ScavengeCopyOperations) - { - BytesToWrite += ScavengeCopyOp.RawSize; - } - - uint64_t TotalRequestCount = 0; - uint64_t TotalPartWriteCount = 0; - std::atomic<uint64_t> WritePartsComplete = 0; - - { - ZEN_TRACE_CPU("WriteChunks"); - - Stopwatch WriteTimer; - - FilteredRate FilteredDownloadedBytesPerSecond; - FilteredRate FilteredWrittenBytesPerSecond; - - WorkerThreadPool& NetworkPool = GetNetworkPool(); - WorkerThreadPool& WritePool = GetIOWorkerPool(); - - ProgressBar WriteProgressBar(ProgressMode, Options.PrimeCacheOnly ? "Downloading" : "Writing"); - ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - - struct LooseChunkHashWorkData - { - std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs; - uint32_t RemoteChunkIndex = (uint32_t)-1; - }; - - std::vector<LooseChunkHashWorkData> LooseChunkHashWorks; - TotalPartWriteCount += CacheCopyDatas.size(); - TotalPartWriteCount += ScavengeCopyOperations.size(); - - for (const IoHash ChunkHash : LooseChunkHashes) - { - auto RemoteChunkIndexIt = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); - ZEN_ASSERT(RemoteChunkIndexIt != RemoteLookup.ChunkHashToChunkIndex.end()); - const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second; - if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex]) - { - ZEN_CONSOLE_VERBOSE("Skipping chunk {} due to cache reuse", ChunkHash); - continue; - } - bool NeedsCopy = true; - if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex].compare_exchange_strong(NeedsCopy, false)) - { - std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = - GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteLookup, RemoteChunkIndex); - - if (ChunkTargetPtrs.empty()) - { - ZEN_CONSOLE_VERBOSE("Skipping chunk {} due to cache reuse", ChunkHash); - } - else - { - TotalRequestCount++; - TotalPartWriteCount++; - LooseChunkHashWorks.push_back( - LooseChunkHashWorkData{.ChunkTargetPtrs = ChunkTargetPtrs, .RemoteChunkIndex = RemoteChunkIndex}); - } - } - } - - uint32_t BlockCount = gsl::narrow<uint32_t>(BlockDescriptions.size()); - - std::vector<bool> ChunkIsPickedUpByBlock(RemoteContent.ChunkedContent.ChunkHashes.size(), false); - auto GetNeededChunkBlockIndexes = [&RemoteContent, - &RemoteLookup, - &RemoteChunkIndexNeedsCopyFromSourceFlags, - &ChunkIsPickedUpByBlock](const ChunkBlockDescription& BlockDescription) { - ZEN_TRACE_CPU("UpdateFolder_GetNeededChunkBlockIndexes"); - std::vector<uint32_t> NeededBlockChunkIndexes; - for (uint32_t ChunkBlockIndex = 0; ChunkBlockIndex < BlockDescription.ChunkRawHashes.size(); ChunkBlockIndex++) - { - const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex]; - if (auto It = RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); It != RemoteLookup.ChunkHashToChunkIndex.end()) - { - const uint32_t RemoteChunkIndex = It->second; - if (!ChunkIsPickedUpByBlock[RemoteChunkIndex]) - { - if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex]) - { - ChunkIsPickedUpByBlock[RemoteChunkIndex] = true; - NeededBlockChunkIndexes.push_back(ChunkBlockIndex); - } - } - } - } - return NeededBlockChunkIndexes; - }; - - std::vector<uint32_t> CachedChunkBlockIndexes; - std::vector<uint32_t> FetchBlockIndexes; - std::vector<std::vector<uint32_t>> AllBlockChunkIndexNeeded; - - for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; BlockIndex++) - { - const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; - - std::vector<uint32_t> BlockChunkIndexNeeded = GetNeededChunkBlockIndexes(BlockDescription); - if (!BlockChunkIndexNeeded.empty()) - { - if (Options.PrimeCacheOnly) - { - FetchBlockIndexes.push_back(BlockIndex); - } - else - { - bool UsingCachedBlock = false; - if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end()) - { - ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_CacheGet"); - - TotalPartWriteCount++; - - std::filesystem::path BlockPath = - ZenTempBlockFolderPath(Options.ZenFolderPath) / BlockDescription.BlockHash.ToHexString(); - if (IsFile(BlockPath)) - { - CachedChunkBlockIndexes.push_back(BlockIndex); - UsingCachedBlock = true; - } - } - if (!UsingCachedBlock) - { - FetchBlockIndexes.push_back(BlockIndex); - } - } - } - AllBlockChunkIndexNeeded.emplace_back(std::move(BlockChunkIndexNeeded)); - } - - struct BlobsExistsResult - { - tsl::robin_set<IoHash> ExistingBlobs; - uint64_t ElapsedTimeMs = 0; - }; - - BlobsExistsResult ExistsResult; - - if (Storage.BuildCacheStorage) - { - ZEN_TRACE_CPU("BlobCacheExistCheck"); - Stopwatch Timer; - - tsl::robin_set<IoHash> BlobHashesSet; - - BlobHashesSet.reserve(LooseChunkHashWorks.size() + FetchBlockIndexes.size()); - for (LooseChunkHashWorkData& LooseChunkHashWork : LooseChunkHashWorks) - { - BlobHashesSet.insert(RemoteContent.ChunkedContent.ChunkHashes[LooseChunkHashWork.RemoteChunkIndex]); - } - for (uint32_t BlockIndex : FetchBlockIndexes) - { - const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; - BlobHashesSet.insert(BlockDescription.BlockHash); - } - - if (!BlobHashesSet.empty()) - { - const std::vector<IoHash> BlobHashes(BlobHashesSet.begin(), BlobHashesSet.end()); - const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult = - Storage.BuildCacheStorage->BlobsExists(BuildId, BlobHashes); - - if (CacheExistsResult.size() == BlobHashes.size()) - { - ExistsResult.ExistingBlobs.reserve(CacheExistsResult.size()); - for (size_t BlobIndex = 0; BlobIndex < BlobHashes.size(); BlobIndex++) - { - if (CacheExistsResult[BlobIndex].HasBody) - { - ExistsResult.ExistingBlobs.insert(BlobHashes[BlobIndex]); - } - } - } - ExistsResult.ElapsedTimeMs = Timer.GetElapsedTimeMs(); - if (!ExistsResult.ExistingBlobs.empty() && !IsQuiet) - { - ZEN_CONSOLE("Remote cache : Found {} out of {} needed blobs in {}", - ExistsResult.ExistingBlobs.size(), - BlobHashes.size(), - NiceTimeSpanMs(ExistsResult.ElapsedTimeMs)); - } - } - } - - std::vector<BlockRangeDescriptor> BlockRangeWorks; - std::vector<uint32_t> FullBlockWorks; - { - Stopwatch Timer; - - std::vector<uint32_t> PartialBlockIndexes; - - for (uint32_t BlockIndex : FetchBlockIndexes) - { - const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; - - const std::vector<uint32_t> BlockChunkIndexNeeded = std::move(AllBlockChunkIndexNeeded[BlockIndex]); - if (!BlockChunkIndexNeeded.empty()) - { - bool WantsToDoPartialBlockDownload = BlockChunkIndexNeeded.size() < BlockDescription.ChunkRawHashes.size(); - bool CanDoPartialBlockDownload = - (BlockDescription.HeaderSize > 0) && - (BlockDescription.ChunkCompressedLengths.size() == BlockDescription.ChunkRawHashes.size()); - - bool AllowedToDoPartialRequest = false; - bool BlockExistInCache = ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash); - switch (Options.PartialBlockRequestMode) - { - case EPartialBlockRequestMode::Off: - break; - case EPartialBlockRequestMode::ZenCacheOnly: - AllowedToDoPartialRequest = BlockExistInCache; - break; - case EPartialBlockRequestMode::Mixed: - case EPartialBlockRequestMode::All: - AllowedToDoPartialRequest = true; - break; - default: - ZEN_ASSERT(false); - break; - } - - const uint32_t ChunkStartOffsetInBlock = - gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); - - const uint64_t TotalBlockSize = std::accumulate(BlockDescription.ChunkCompressedLengths.begin(), - BlockDescription.ChunkCompressedLengths.end(), - std::uint64_t(ChunkStartOffsetInBlock)); - - if (AllowedToDoPartialRequest && WantsToDoPartialBlockDownload && CanDoPartialBlockDownload) - { - ZEN_TRACE_CPU("UpdateFolder_HandleBlocks_PartialAnalysis"); - - bool LimitToSingleRange = - BlockExistInCache ? false : Options.PartialBlockRequestMode == EPartialBlockRequestMode::Mixed; - uint64_t TotalWantedChunksSize = 0; - std::optional<std::vector<BlockRangeDescriptor>> MaybeBlockRanges = - CalculateBlockRanges(BlockIndex, - BlockDescription, - BlockChunkIndexNeeded, - LimitToSingleRange, - ChunkStartOffsetInBlock, - TotalBlockSize, - TotalWantedChunksSize); - ZEN_ASSERT(TotalWantedChunksSize <= TotalBlockSize); - - if (MaybeBlockRanges.has_value()) - { - const std::vector<BlockRangeDescriptor>& BlockRanges = MaybeBlockRanges.value(); - ZEN_ASSERT(!BlockRanges.empty()); - BlockRangeWorks.insert(BlockRangeWorks.end(), BlockRanges.begin(), BlockRanges.end()); - TotalRequestCount += BlockRanges.size(); - TotalPartWriteCount += BlockRanges.size(); - - uint64_t RequestedSize = std::accumulate( - BlockRanges.begin(), - BlockRanges.end(), - uint64_t(0), - [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; }); - PartialBlockIndexes.push_back(BlockIndex); - - if (RequestedSize > TotalWantedChunksSize) - { - ZEN_CONSOLE_VERBOSE("Requesting {} chunks ({}) from block {} ({}) using {} requests (extra bytes {})", - BlockChunkIndexNeeded.size(), - NiceBytes(RequestedSize), - BlockDescription.BlockHash, - NiceBytes(TotalBlockSize), - BlockRanges.size(), - NiceBytes(RequestedSize - TotalWantedChunksSize)); - } - } - else - { - FullBlockWorks.push_back(BlockIndex); - TotalRequestCount++; - TotalPartWriteCount++; - } - } - else - { - FullBlockWorks.push_back(BlockIndex); - TotalRequestCount++; - TotalPartWriteCount++; - } - } - } - - if (!PartialBlockIndexes.empty()) - { - uint64_t TotalFullBlockRequestBytes = 0; - for (uint32_t BlockIndex : FullBlockWorks) - { - const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; - uint32_t CurrentOffset = - gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); - - TotalFullBlockRequestBytes += std::accumulate(BlockDescription.ChunkCompressedLengths.begin(), - BlockDescription.ChunkCompressedLengths.end(), - std::uint64_t(CurrentOffset)); - } - - uint64_t TotalPartialBlockBytes = 0; - for (uint32_t BlockIndex : PartialBlockIndexes) - { - const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; - uint32_t CurrentOffset = - gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize); - - TotalPartialBlockBytes += std::accumulate(BlockDescription.ChunkCompressedLengths.begin(), - BlockDescription.ChunkCompressedLengths.end(), - std::uint64_t(CurrentOffset)); - } - - uint64_t NonPartialTotalBlockBytes = TotalFullBlockRequestBytes + TotalPartialBlockBytes; - - const uint64_t TotalPartialBlockRequestBytes = - std::accumulate(BlockRangeWorks.begin(), - BlockRangeWorks.end(), - uint64_t(0), - [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; }); - uint64_t TotalExtraPartialBlocksRequests = BlockRangeWorks.size() - PartialBlockIndexes.size(); - - uint64_t TotalSavedBlocksSize = TotalPartialBlockBytes - TotalPartialBlockRequestBytes; - double SavedSizePercent = (TotalSavedBlocksSize * 100.0) / NonPartialTotalBlockBytes; - - if (!IsQuiet) - { - ZEN_CONSOLE( - "Analisys of partial block requests saves download of {} out of {} ({:.1f}%) using {} extra " - "requests. Completed in {}", - NiceBytes(TotalSavedBlocksSize), - NiceBytes(NonPartialTotalBlockBytes), - SavedSizePercent, - TotalExtraPartialBlocksRequests, - NiceTimeSpanMs(ExistsResult.ElapsedTimeMs)); - } - // exit(0); - } - } - - BufferedWriteFileCache WriteCache; - - for (uint32_t ScavengeOpIndex = 0; ScavengeOpIndex < ScavengeCopyOperations.size(); ScavengeOpIndex++) - { - if (AbortFlag) - { - break; - } - if (!Options.PrimeCacheOnly) - { - Work.ScheduleWork( - WritePool, - [&RemoteContent, - &CacheFolderPath, - &ScavengedPaths, - &ScavengeCopyOperations, - &ScavengedContents, - &FilteredWrittenBytesPerSecond, - ScavengeOpIndex, - &WritePartsComplete, - TotalPartWriteCount, - &DiskStats](std::atomic<bool>&) mutable { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_WriteScavenged"); - - FilteredWrittenBytesPerSecond.Start(); - - const ScavengeCopyOperation& ScavengeOp = ScavengeCopyOperations[ScavengeOpIndex]; - const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengeOp.ScavengedContentIndex]; - const std::filesystem::path ScavengedPath = ScavengedContent.Paths[ScavengeOp.ScavengedPathIndex]; - - const std::filesystem::path ScavengedFilePath = - (ScavengedPaths[ScavengeOp.ScavengedContentIndex] / ScavengedPath).make_preferred(); - ZEN_ASSERT_SLOW(FileSizeFromPath(ScavengedFilePath) == ScavengeOp.RawSize); - - const IoHash& RemoteSequenceRawHash = - RemoteContent.ChunkedContent.SequenceRawHashes[ScavengeOp.RemoteSequenceIndex]; - const std::filesystem::path TempFilePath = - GetTempChunkedSequenceFileName(CacheFolderPath, RemoteSequenceRawHash); - - const uint64_t RawSize = ScavengedContent.RawSizes[ScavengeOp.ScavengedPathIndex]; - CopyFile(ScavengedFilePath, - TempFilePath, - RawSize, - DiskStats.WriteCount, - DiskStats.WriteByteCount, - DiskStats.CloneCount, - DiskStats.CloneByteCount); - - const std::filesystem::path CacheFilePath = - GetFinalChunkedSequenceFileName(CacheFolderPath, RemoteSequenceRawHash); - RenameFile(TempFilePath, CacheFilePath); - - WritePartsComplete++; - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); - } - } - }); - } - } - - for (uint32_t LooseChunkHashWorkIndex = 0; LooseChunkHashWorkIndex < LooseChunkHashWorks.size(); LooseChunkHashWorkIndex++) - { - if (AbortFlag) - { - break; - } - - LooseChunkHashWorkData& LooseChunkHashWork = LooseChunkHashWorks[LooseChunkHashWorkIndex]; - - std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = - std::move(LooseChunkHashWork.ChunkTargetPtrs); - const uint32_t RemoteChunkIndex = LooseChunkHashWork.RemoteChunkIndex; - - if (Options.PrimeCacheOnly && - ExistsResult.ExistingBlobs.contains(RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex])) - { - DownloadStats.RequestsCompleteCount++; - continue; - } - - Work.ScheduleWork( - WritePool, - [&Storage, - &Path, - &Options, - &RemoteContent, - &RemoteLookup, - &CacheFolderPath, - &SequenceIndexChunksLeftToWriteCounters, - &Work, - &WritePool, - &NetworkPool, - &ExistsResult, - &DiskStats, - &DownloadStats, - &WriteChunkStats, - &WritePartsComplete, - RemoteChunkIndex, - ChunkTargetPtrs, - BuildId = Oid(BuildId), - TotalRequestCount, - TotalPartWriteCount, - &WriteCache, - &FilteredDownloadedBytesPerSecond, - &FilteredWrittenBytesPerSecond](std::atomic<bool>&) mutable { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_ReadPreDownloaded"); - std::filesystem::path ExistingCompressedChunkPath; - if (!Options.PrimeCacheOnly) - { - const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; - std::filesystem::path CompressedChunkPath = - ZenTempDownloadFolderPath(Options.ZenFolderPath) / ChunkHash.ToHexString(); - if (IsFile(CompressedChunkPath)) - { - IoBuffer ExistingCompressedPart = IoBufferBuilder::MakeFromFile(ExistingCompressedChunkPath); - if (ExistingCompressedPart) - { - IoHash RawHash; - uint64_t RawSize; - if (CompressedBuffer::ValidateCompressedHeader(ExistingCompressedPart, RawHash, RawSize)) - { - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - ExistingCompressedChunkPath = std::move(CompressedChunkPath); - } - else - { - std::error_code DummyEc; - RemoveFile(CompressedChunkPath, DummyEc); - } - } - } - } - if (!AbortFlag) - - { - if (!ExistingCompressedChunkPath.empty()) - { - Work.ScheduleWork( - WritePool, - [&Path, - &Options, - &RemoteContent, - &RemoteLookup, - &CacheFolderPath, - &SequenceIndexChunksLeftToWriteCounters, - &WriteCache, - &Work, - &WritePool, - &DiskStats, - &WriteChunkStats, - &WritePartsComplete, - TotalPartWriteCount, - &FilteredWrittenBytesPerSecond, - RemoteChunkIndex, - ChunkTargetPtrs, - CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>&) mutable { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_WritePreDownloaded"); - - FilteredWrittenBytesPerSecond.Start(); - - const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; - - IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath); - if (!CompressedPart) - { - throw std::runtime_error( - fmt::format("Could not open dowloaded compressed chunk {} from {}", - ChunkHash, - CompressedChunkPath)); - } - - std::filesystem::path TargetFolder = ZenTempCacheFolderPath(Options.ZenFolderPath); - bool NeedHashVerify = WriteCompressedChunk(TargetFolder, - RemoteContent, - RemoteLookup, - ChunkHash, - ChunkTargetPtrs, - WriteCache, - std::move(CompressedPart), - DiskStats); - WritePartsComplete++; - - if (!AbortFlag) - { - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); - } - - TryRemoveFile(CompressedChunkPath); - - std::vector<uint32_t> CompletedSequences = - CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters); - WriteCache.Close(CompletedSequences); - if (NeedHashVerify) - { - VerifyAndCompleteChunkSequencesAsync(TargetFolder, - RemoteContent, - RemoteLookup, - CompletedSequences, - Work, - WritePool); - } - else - { - FinalizeChunkSequences(TargetFolder, RemoteContent, CompletedSequences); - } - } - } - }); - } - else - { - Work.ScheduleWork( - NetworkPool, - [&Path, - &Options, - &Storage, - BuildId = Oid(BuildId), - &RemoteContent, - &RemoteLookup, - &ExistsResult, - &SequenceIndexChunksLeftToWriteCounters, - &WriteCache, - &Work, - &WritePool, - &NetworkPool, - &DiskStats, - &WriteChunkStats, - &WritePartsComplete, - TotalPartWriteCount, - TotalRequestCount, - &FilteredDownloadedBytesPerSecond, - &FilteredWrittenBytesPerSecond, - RemoteChunkIndex, - ChunkTargetPtrs, - &DownloadStats](std::atomic<bool>&) mutable { - if (!AbortFlag) - { - const IoHash& ChunkHash = RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; - FilteredDownloadedBytesPerSecond.Start(); - IoBuffer BuildBlob; - const bool ExistsInCache = - Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash); - if (ExistsInCache) - { - BuildBlob = Storage.BuildCacheStorage->GetBuildBlob(BuildId, ChunkHash); - } - if (BuildBlob) - { - uint64_t BlobSize = BuildBlob.GetSize(); - DownloadStats.DownloadedChunkCount++; - DownloadStats.DownloadedChunkByteCount += BlobSize; - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - AsyncWriteDownloadedChunk(Options.ZenFolderPath, - RemoteContent, - RemoteLookup, - RemoteChunkIndex, - std::move(ChunkTargetPtrs), - WriteCache, - Work, - WritePool, - std::move(BuildBlob), - SequenceIndexChunksLeftToWriteCounters, - WritePartsComplete, - TotalPartWriteCount, - FilteredWrittenBytesPerSecond, - DiskStats); - } - else - { - if (RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= - Options.LargeAttachmentSize) - { - ZEN_TRACE_CPU("UpdateFolder_GetLargeChunk"); - DownloadLargeBlob(*Storage.BuildStorage, - ZenTempDownloadFolderPath(Options.ZenFolderPath), - BuildId, - ChunkHash, - Options.PreferredMultipartChunkSize, - Work, - NetworkPool, - DownloadStats, - [&Storage, - &Options, - &RemoteContent, - &RemoteLookup, - BuildId, - &SequenceIndexChunksLeftToWriteCounters, - &WriteCache, - &Work, - &WritePool, - ChunkHash, - TotalPartWriteCount, - TotalRequestCount, - &WritePartsComplete, - &FilteredWrittenBytesPerSecond, - &FilteredDownloadedBytesPerSecond, - &DownloadStats, - &DiskStats, - RemoteChunkIndex, - ChunkTargetPtrs](IoBuffer&& Payload) mutable { - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - if (Payload && Storage.BuildCacheStorage) - { - Storage.BuildCacheStorage->PutBuildBlob( - BuildId, - ChunkHash, - ZenContentType::kCompressedBinary, - CompositeBuffer(SharedBuffer(Payload))); - } - if (!Options.PrimeCacheOnly) - { - if (!AbortFlag) - { - AsyncWriteDownloadedChunk( - Options.ZenFolderPath, - RemoteContent, - RemoteLookup, - RemoteChunkIndex, - std::move(ChunkTargetPtrs), - WriteCache, - Work, - WritePool, - std::move(Payload), - SequenceIndexChunksLeftToWriteCounters, - WritePartsComplete, - TotalPartWriteCount, - FilteredWrittenBytesPerSecond, - DiskStats); - } - } - }); - } - else - { - ZEN_TRACE_CPU("UpdateFolder_GetChunk"); - BuildBlob = Storage.BuildStorage->GetBuildBlob(BuildId, ChunkHash); - if (BuildBlob && Storage.BuildCacheStorage) - { - Storage.BuildCacheStorage->PutBuildBlob( - BuildId, - ChunkHash, - ZenContentType::kCompressedBinary, - CompositeBuffer(SharedBuffer(BuildBlob))); - } - if (!BuildBlob) - { - throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash)); - } - if (!Options.PrimeCacheOnly) - { - if (!AbortFlag) - { - uint64_t BlobSize = BuildBlob.GetSize(); - DownloadStats.DownloadedChunkCount++; - DownloadStats.DownloadedChunkByteCount += BlobSize; - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - AsyncWriteDownloadedChunk(Options.ZenFolderPath, - RemoteContent, - RemoteLookup, - RemoteChunkIndex, - std::move(ChunkTargetPtrs), - WriteCache, - Work, - WritePool, - std::move(BuildBlob), - SequenceIndexChunksLeftToWriteCounters, - WritePartsComplete, - TotalPartWriteCount, - FilteredWrittenBytesPerSecond, - DiskStats); - } - } - } - } - } - }); - } - } - } - }); - } - - std::unique_ptr<CloneQueryInterface> CloneQuery; - if (AllowFileClone) - { - CloneQuery = GetCloneQueryInterface(CacheFolderPath); - } - - for (size_t CopyDataIndex = 0; CopyDataIndex < CacheCopyDatas.size(); CopyDataIndex++) - { - ZEN_ASSERT(!Options.PrimeCacheOnly); - if (AbortFlag) - { - break; - } - - Work.ScheduleWork( - WritePool, - [&Path, - &LocalContent, - &RemoteContent, - &RemoteLookup, - &CacheFolderPath, - &CloneQuery, - &LocalLookup, - &SequenceIndexChunksLeftToWriteCounters, - &WriteCache, - &Work, - &WritePool, - &FilteredWrittenBytesPerSecond, - &CacheCopyDatas, - &ScavengedContents, - &ScavengedLookups, - &ScavengedPaths, - &WritePartsComplete, - TotalPartWriteCount, - &DiskStats, - CopyDataIndex](std::atomic<bool>&) { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_CopyLocal"); - - FilteredWrittenBytesPerSecond.Start(); - const CacheCopyData& CopyData = CacheCopyDatas[CopyDataIndex]; - - std::filesystem::path SourceFilePath; - - if (CopyData.ScavengeSourceIndex == (uint32_t)-1) - { - const uint32_t LocalPathIndex = LocalLookup.SequenceIndexFirstPathIndex[CopyData.SourceSequenceIndex]; - SourceFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred(); - } - else - { - const ChunkedFolderContent& ScavengedContent = ScavengedContents[CopyData.ScavengeSourceIndex]; - const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[CopyData.ScavengeSourceIndex]; - const std::filesystem::path ScavengedPath = ScavengedPaths[CopyData.ScavengeSourceIndex]; - const uint32_t ScavengedPathIndex = - ScavengedLookup.SequenceIndexFirstPathIndex[CopyData.SourceSequenceIndex]; - SourceFilePath = (ScavengedPath / ScavengedContent.Paths[ScavengedPathIndex]).make_preferred(); - } - ZEN_ASSERT_SLOW(IsFile(SourceFilePath)); - ZEN_ASSERT(!CopyData.TargetChunkLocationPtrs.empty()); - - uint64_t CacheLocalFileBytesRead = 0; - - size_t TargetStart = 0; - const std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> AllTargets( - CopyData.TargetChunkLocationPtrs); - - struct WriteOp - { - const ChunkedContentLookup::ChunkSequenceLocation* Target = nullptr; - uint64_t CacheFileOffset = (uint64_t)-1; - uint32_t ChunkIndex = (uint32_t)-1; - }; - - std::vector<WriteOp> WriteOps; - - if (!AbortFlag) - { - ZEN_TRACE_CPU("Sort"); - WriteOps.reserve(AllTargets.size()); - for (const CacheCopyData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets) - { - std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> TargetRange = - AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount); - for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange) - { - WriteOps.push_back(WriteOp{.Target = Target, - .CacheFileOffset = ChunkTarget.CacheFileOffset, - .ChunkIndex = ChunkTarget.RemoteChunkIndex}); - } - TargetStart += ChunkTarget.TargetChunkLocationCount; - } - - std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) { - if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex) - { - return true; - } - else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex) - { - return false; - } - if (Lhs.Target->Offset < Rhs.Target->Offset) - { - return true; - } - return false; - }); - } - - if (!AbortFlag) - { - ZEN_TRACE_CPU("Write"); - - tsl::robin_set<uint32_t> ChunkIndexesWritten; - - BufferedOpenFile SourceFile(SourceFilePath, DiskStats); - - bool CanCloneSource = CloneQuery && CloneQuery->CanClone(SourceFile.Handle()); - - BufferedWriteFileCache::Local LocalWriter(WriteCache); - - for (size_t WriteOpIndex = 0; WriteOpIndex < WriteOps.size();) - { - if (AbortFlag) - { - break; - } - const WriteOp& Op = WriteOps[WriteOpIndex]; - - const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex; - ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() <= - RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex]); - ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].load() > 0); - const uint32_t RemotePathIndex = RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]; - const uint64_t TargetSize = RemoteContent.RawSizes[RemotePathIndex]; - const uint64_t ChunkSize = RemoteContent.ChunkedContent.ChunkRawSizes[Op.ChunkIndex]; - - uint64_t ReadLength = ChunkSize; - size_t WriteCount = 1; - uint64_t OpSourceEnd = Op.CacheFileOffset + ChunkSize; - uint64_t OpTargetEnd = Op.Target->Offset + ChunkSize; - while ((WriteOpIndex + WriteCount) < WriteOps.size()) - { - const WriteOp& NextOp = WriteOps[WriteOpIndex + WriteCount]; - if (NextOp.Target->SequenceIndex != Op.Target->SequenceIndex) - { - break; - } - if (NextOp.Target->Offset != OpTargetEnd) - { - break; - } - if (NextOp.CacheFileOffset != OpSourceEnd) - { - break; - } - const uint64_t NextChunkLength = RemoteContent.ChunkedContent.ChunkRawSizes[NextOp.ChunkIndex]; - if (ReadLength + NextChunkLength > 512u * 1024u) - { - break; - } - ReadLength += NextChunkLength; - OpSourceEnd += NextChunkLength; - OpTargetEnd += NextChunkLength; - WriteCount++; - } - - { - bool DidClone = false; - - if (CanCloneSource) - { - uint64_t PreBytes = 0; - uint64_t PostBytes = 0; - uint64_t ClonableBytes = CloneQuery->GetClonableRange(Op.CacheFileOffset, - Op.Target->Offset, - ReadLength, - PreBytes, - PostBytes); - if (ClonableBytes > 0) - { - // We need to open the file... - BufferedWriteFileCache::Local::Writer* Writer = LocalWriter.GetWriter(RemoteSequenceIndex); - if (!Writer) - { - Writer = - LocalWriter.PutWriter(RemoteSequenceIndex, - std::make_unique<BufferedWriteFileCache::Local::Writer>()); - - Writer->File = std::make_unique<BasicFile>(); - - const std::filesystem::path FileName = GetTempChunkedSequenceFileName( - CacheFolderPath, - RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]); - Writer->File->Open(FileName, BasicFile::Mode::kWrite); - if (UseSparseFiles) - { - PrepareFileForScatteredWrite(Writer->File->Handle(), TargetSize); - } - } - DidClone = CloneQuery->TryClone(SourceFile.Handle(), - Writer->File->Handle(), - Op.CacheFileOffset + PreBytes, - Op.Target->Offset + PreBytes, - ClonableBytes, - TargetSize); - if (DidClone) - { - DiskStats.WriteCount++; - DiskStats.WriteByteCount += ClonableBytes; - - DiskStats.CloneCount++; - DiskStats.CloneByteCount += ClonableBytes; - - if (PreBytes > 0) - { - CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, PreBytes); - const uint64_t FileOffset = Op.Target->Offset; - - WriteSequenceChunk(CacheFolderPath, - RemoteContent, - LocalWriter, - ChunkSource, - RemoteSequenceIndex, - FileOffset, - RemotePathIndex, - DiskStats); - } - if (PostBytes > 0) - { - CompositeBuffer ChunkSource = - SourceFile.GetRange(Op.CacheFileOffset + ReadLength - PostBytes, PostBytes); - const uint64_t FileOffset = Op.Target->Offset + ReadLength - PostBytes; - - WriteSequenceChunk(CacheFolderPath, - RemoteContent, - LocalWriter, - ChunkSource, - RemoteSequenceIndex, - FileOffset, - RemotePathIndex, - DiskStats); - } - } - } - } - - if (!DidClone) - { - CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ReadLength); - - const uint64_t FileOffset = Op.Target->Offset; - - WriteSequenceChunk(CacheFolderPath, - RemoteContent, - LocalWriter, - ChunkSource, - RemoteSequenceIndex, - FileOffset, - RemotePathIndex, - DiskStats); - } - } - - CacheLocalFileBytesRead += ReadLength; // TODO: This should be the sum of unique chunk sizes? - - WriteOpIndex += WriteCount; - } - } - if (!AbortFlag) - { - // Write tracking, updating this must be done without any files open (BufferedWriteFileCache::Local) - std::vector<uint32_t> CompletedChunkSequences; - for (const WriteOp& Op : WriteOps) - { - const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex; - if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters)) - { - CompletedChunkSequences.push_back(RemoteSequenceIndex); - } - } - WriteCache.Close(CompletedChunkSequences); - VerifyAndCompleteChunkSequencesAsync(CacheFolderPath, - RemoteContent, - RemoteLookup, - CompletedChunkSequences, - Work, - WritePool); - ZEN_CONSOLE_VERBOSE("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), SourceFilePath); - } - WritePartsComplete++; - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); - } - } - }); - } - - for (uint32_t BlockIndex : CachedChunkBlockIndexes) - { - ZEN_ASSERT(!Options.PrimeCacheOnly); - if (AbortFlag) - { - break; - } - - Work.ScheduleWork( - WritePool, - [&Options, - &CacheFolderPath, - &RemoteContent, - &RemoteLookup, - &RemoteChunkIndexNeedsCopyFromSourceFlags, - &SequenceIndexChunksLeftToWriteCounters, - &WriteCache, - &Work, - &WritePool, - &BlockDescriptions, - &FilteredWrittenBytesPerSecond, - &DiskStats, - &WritePartsComplete, - TotalPartWriteCount, - BlockIndex](std::atomic<bool>&) mutable { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_WriteCachedBlock"); - - const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; - FilteredWrittenBytesPerSecond.Start(); - - std::filesystem::path BlockChunkPath = - ZenTempBlockFolderPath(Options.ZenFolderPath) / BlockDescription.BlockHash.ToHexString(); - IoBuffer BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); - if (!BlockBuffer) - { - throw std::runtime_error( - fmt::format("Can not read block {} at {}", BlockDescription.BlockHash, BlockChunkPath)); - } - - if (!AbortFlag) - { - if (!WriteBlockToDisk(CacheFolderPath, - RemoteContent, - BlockDescription, - SequenceIndexChunksLeftToWriteCounters, - Work, - WritePool, - CompositeBuffer(std::move(BlockBuffer)), - RemoteLookup, - RemoteChunkIndexNeedsCopyFromSourceFlags, - WriteCache, - DiskStats)) - { - std::error_code DummyEc; - RemoveFile(BlockChunkPath, DummyEc); - throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash)); - } - - TryRemoveFile(BlockChunkPath); - - WritePartsComplete++; - - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); - } - } - } - }); - } - - for (size_t BlockRangeIndex = 0; BlockRangeIndex < BlockRangeWorks.size(); BlockRangeIndex++) - { - ZEN_ASSERT(!Options.PrimeCacheOnly); - if (AbortFlag) - { - break; - } - const BlockRangeDescriptor BlockRange = BlockRangeWorks[BlockRangeIndex]; - ZEN_ASSERT(BlockRange.BlockIndex != (uint32_t)-1); - const uint32_t BlockIndex = BlockRange.BlockIndex; - Work.ScheduleWork( - NetworkPool, - [&Storage, - &Options, - BuildId, - &RemoteLookup, - &BlockDescriptions, - &RemoteChunkIndexNeedsCopyFromSourceFlags, - &CacheFolderPath, - &RemoteContent, - &SequenceIndexChunksLeftToWriteCounters, - &ExistsResult, - &WriteCache, - &FilteredDownloadedBytesPerSecond, - TotalRequestCount, - &WritePartsComplete, - TotalPartWriteCount, - &FilteredWrittenBytesPerSecond, - &DiskStats, - &DownloadStats, - &Work, - &WritePool, - BlockIndex, - BlockRange](std::atomic<bool>&) { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_GetPartialBlock"); - - const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; - - FilteredDownloadedBytesPerSecond.Start(); - IoBuffer BlockBuffer; - if (Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash)) - { - BlockBuffer = Storage.BuildCacheStorage->GetBuildBlob(BuildId, - BlockDescription.BlockHash, - BlockRange.RangeStart, - BlockRange.RangeLength); - } - if (!BlockBuffer) - { - BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId, - BlockDescription.BlockHash, - BlockRange.RangeStart, - BlockRange.RangeLength); - } - if (!BlockBuffer) - { - throw std::runtime_error(fmt::format("Block {} is missing when fetching range {} -> {}", - BlockDescription.BlockHash, - BlockRange.RangeStart, - BlockRange.RangeStart + BlockRange.RangeLength)); - } - if (!AbortFlag) - { - uint64_t BlockSize = BlockBuffer.GetSize(); - DownloadStats.DownloadedBlockCount++; - DownloadStats.DownloadedBlockByteCount += BlockSize; - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - - std::filesystem::path BlockChunkPath; - - // Check if the dowloaded block is file based and we can move it directly without rewriting it - { - IoBufferFileReference FileRef; - if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && - (FileRef.FileChunkSize == BlockSize)) - { - ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock"); - - std::error_code Ec; - std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); - if (!Ec) - { - BlockBuffer.SetDeleteOnClose(false); - BlockBuffer = {}; - BlockChunkPath = - ZenTempBlockFolderPath(Options.ZenFolderPath) / fmt::format("{}_{:x}_{:x}", - BlockDescription.BlockHash, - BlockRange.RangeStart, - BlockRange.RangeLength); - RenameFile(TempBlobPath, BlockChunkPath, Ec); - if (Ec) - { - BlockChunkPath = std::filesystem::path{}; - - // Re-open the temp file again - BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); - BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); - BlockBuffer.SetDeleteOnClose(true); - } - } - } - } - - if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u)) - { - ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock"); - // Could not be moved and rather large, lets store it on disk - BlockChunkPath = ZenTempBlockFolderPath(Options.ZenFolderPath) / fmt::format("{}_{:x}_{:x}", - BlockDescription.BlockHash, - BlockRange.RangeStart, - BlockRange.RangeLength); - TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); - BlockBuffer = {}; - } - - if (!AbortFlag) - { - Work.ScheduleWork( - WritePool, - [&CacheFolderPath, - &RemoteContent, - &RemoteLookup, - &BlockDescriptions, - &RemoteChunkIndexNeedsCopyFromSourceFlags, - &SequenceIndexChunksLeftToWriteCounters, - &WritePartsComplete, - &WriteCache, - &Work, - TotalPartWriteCount, - &WritePool, - &DiskStats, - &FilteredWrittenBytesPerSecond, - BlockIndex, - BlockRange, - BlockChunkPath, - BlockPartialBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_WritePartialBlock"); - - const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; - - if (BlockChunkPath.empty()) - { - ZEN_ASSERT(BlockPartialBuffer); - } - else - { - ZEN_ASSERT(!BlockPartialBuffer); - BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); - if (!BlockPartialBuffer) - { - throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}", - BlockDescription.BlockHash, - BlockChunkPath)); - } - } - - FilteredWrittenBytesPerSecond.Start(); - - if (!WritePartialBlockToDisk( - CacheFolderPath, - RemoteContent, - BlockDescription, - SequenceIndexChunksLeftToWriteCounters, - Work, - WritePool, - CompositeBuffer(std::move(BlockPartialBuffer)), - BlockRange.ChunkBlockIndexStart, - BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1, - RemoteLookup, - RemoteChunkIndexNeedsCopyFromSourceFlags, - WriteCache, - DiskStats)) - { - std::error_code DummyEc; - RemoveFile(BlockChunkPath, DummyEc); - throw std::runtime_error( - fmt::format("Partial block {} is malformed", BlockDescription.BlockHash)); - } - - if (!BlockChunkPath.empty()) - { - TryRemoveFile(BlockChunkPath); - } - - WritePartsComplete++; - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); - } - } - }); - } - } - } - }); - } - - for (uint32_t BlockIndex : FullBlockWorks) - { - if (AbortFlag) - { - break; - } - - if (Options.PrimeCacheOnly && ExistsResult.ExistingBlobs.contains(BlockDescriptions[BlockIndex].BlockHash)) - { - DownloadStats.RequestsCompleteCount++; - continue; - } - - Work.ScheduleWork( - NetworkPool, - [&Storage, - &Options, - BuildId, - &BlockDescriptions, - &WritePartsComplete, - TotalPartWriteCount, - &FilteredWrittenBytesPerSecond, - &ExistsResult, - &Work, - &WritePool, - &RemoteContent, - &RemoteLookup, - &WriteCache, - &CacheFolderPath, - &RemoteChunkIndexNeedsCopyFromSourceFlags, - &SequenceIndexChunksLeftToWriteCounters, - &FilteredDownloadedBytesPerSecond, - &WriteChunkStats, - &DiskStats, - &DownloadStats, - TotalRequestCount, - BlockIndex](std::atomic<bool>&) { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_GetFullBlock"); - - const ChunkBlockDescription& BlockDescription = BlockDescriptions[BlockIndex]; - - FilteredDownloadedBytesPerSecond.Start(); - - IoBuffer BlockBuffer; - const bool ExistsInCache = - Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash); - if (ExistsInCache) - { - BlockBuffer = Storage.BuildCacheStorage->GetBuildBlob(BuildId, BlockDescription.BlockHash); - } - if (!BlockBuffer) - { - BlockBuffer = Storage.BuildStorage->GetBuildBlob(BuildId, BlockDescription.BlockHash); - if (BlockBuffer && Storage.BuildCacheStorage) - { - Storage.BuildCacheStorage->PutBuildBlob(BuildId, - BlockDescription.BlockHash, - ZenContentType::kCompressedBinary, - CompositeBuffer(SharedBuffer(BlockBuffer))); - } - } - if (!BlockBuffer) - { - throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash)); - } - if (!AbortFlag) - { - uint64_t BlockSize = BlockBuffer.GetSize(); - DownloadStats.DownloadedBlockCount++; - DownloadStats.DownloadedBlockByteCount += BlockSize; - DownloadStats.RequestsCompleteCount++; - if (DownloadStats.RequestsCompleteCount == TotalRequestCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - - if (!Options.PrimeCacheOnly) - { - std::filesystem::path BlockChunkPath; - - // Check if the dowloaded block is file based and we can move it directly without rewriting it - { - IoBufferFileReference FileRef; - if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && - (FileRef.FileChunkSize == BlockSize)) - { - ZEN_TRACE_CPU("UpdateFolder_MoveTempBlock"); - std::error_code Ec; - std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec); - if (!Ec) - { - BlockBuffer.SetDeleteOnClose(false); - BlockBuffer = {}; - BlockChunkPath = ZenTempBlockFolderPath(Options.ZenFolderPath) / - BlockDescription.BlockHash.ToHexString(); - RenameFile(TempBlobPath, BlockChunkPath, Ec); - if (Ec) - { - BlockChunkPath = std::filesystem::path{}; - - // Re-open the temp file again - BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete); - BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true); - BlockBuffer.SetDeleteOnClose(true); - } - } - } - } - - if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u)) - { - ZEN_TRACE_CPU("UpdateFolder_WriteTempBlock"); - // Could not be moved and rather large, lets store it on disk - BlockChunkPath = - ZenTempBlockFolderPath(Options.ZenFolderPath) / BlockDescription.BlockHash.ToHexString(); - TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer); - BlockBuffer = {}; - } - - if (!AbortFlag) - { - Work.ScheduleWork(WritePool, - [&Work, - &WritePool, - &RemoteContent, - &RemoteLookup, - CacheFolderPath, - &RemoteChunkIndexNeedsCopyFromSourceFlags, - &SequenceIndexChunksLeftToWriteCounters, - BlockIndex, - &BlockDescriptions, - &WriteCache, - &WriteChunkStats, - &DiskStats, - &WritePartsComplete, - TotalPartWriteCount, - &FilteredWrittenBytesPerSecond, - BlockChunkPath, - BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable { - if (!AbortFlag) - { - ZEN_TRACE_CPU("UpdateFolder_WriteFullBlock"); - - const ChunkBlockDescription& BlockDescription = - BlockDescriptions[BlockIndex]; - - if (BlockChunkPath.empty()) - { - ZEN_ASSERT(BlockBuffer); - } - else - { - ZEN_ASSERT(!BlockBuffer); - BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath); - if (!BlockBuffer) - { - throw std::runtime_error( - fmt::format("Could not open dowloaded block {} from {}", - BlockDescription.BlockHash, - BlockChunkPath)); - } - } - - FilteredWrittenBytesPerSecond.Start(); - if (!WriteBlockToDisk(CacheFolderPath, - RemoteContent, - BlockDescription, - SequenceIndexChunksLeftToWriteCounters, - Work, - WritePool, - CompositeBuffer(std::move(BlockBuffer)), - RemoteLookup, - RemoteChunkIndexNeedsCopyFromSourceFlags, - WriteCache, - DiskStats)) - { - std::error_code DummyEc; - RemoveFile(BlockChunkPath, DummyEc); - throw std::runtime_error( - fmt::format("Block {} is malformed", BlockDescription.BlockHash)); - } - - if (!BlockChunkPath.empty()) - { - TryRemoveFile(BlockChunkPath); - } - - WritePartsComplete++; - - if (WritePartsComplete == TotalPartWriteCount) - { - FilteredWrittenBytesPerSecond.Stop(); - } - } - }); - } - } - } - } - }); - } - - { - ZEN_TRACE_CPU("WriteChunks_Wait"); - - Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(PendingWork); - uint64_t DownloadedBytes = DownloadStats.DownloadedChunkByteCount.load() + - DownloadStats.DownloadedBlockByteCount.load() + - +DownloadStats.DownloadedPartialBlockByteCount.load(); - FilteredWrittenBytesPerSecond.Update(DiskStats.WriteByteCount.load()); - FilteredDownloadedBytesPerSecond.Update(DownloadedBytes); - std::string DownloadRateString = - (DownloadStats.RequestsCompleteCount == TotalRequestCount) - ? "" - : fmt::format(" {}bits/s", NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8)); - std::string CloneDetails; - if (DiskStats.CloneCount.load() > 0) - { - CloneDetails = fmt::format(" ({} cloned)", NiceBytes(DiskStats.CloneByteCount.load())); - } - std::string WriteDetails = Options.PrimeCacheOnly ? "" - : fmt::format(" {}/{} ({}B/s) written{}.", - NiceBytes(DiskStats.WriteByteCount.load()), - NiceBytes(BytesToWrite), - NiceNum(FilteredWrittenBytesPerSecond.GetCurrent()), - CloneDetails); - std::string Details = fmt::format("{}/{} ({}{}) downloaded.{}", - DownloadStats.RequestsCompleteCount.load(), - TotalRequestCount, - NiceBytes(DownloadedBytes), - DownloadRateString, - WriteDetails); - WriteProgressBar.UpdateState( - {.Task = Options.PrimeCacheOnly ? "Downloading " : "Writing chunks ", - .Details = Details, - .TotalCount = Options.PrimeCacheOnly ? TotalRequestCount : BytesToWrite, - .RemainingCount = Options.PrimeCacheOnly ? (TotalRequestCount - DownloadStats.RequestsCompleteCount.load()) - : (BytesToWrite - DiskStats.WriteByteCount.load()), - .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, - false); - }); - } - - CloneQuery.reset(); - - FilteredWrittenBytesPerSecond.Stop(); - FilteredDownloadedBytesPerSecond.Stop(); - - WriteProgressBar.Finish(); - if (AbortFlag) - { - return; - } - - if (!Options.PrimeCacheOnly) - { - uint32_t RawSequencesMissingWriteCount = 0; - for (uint32_t SequenceIndex = 0; SequenceIndex < SequenceIndexChunksLeftToWriteCounters.size(); SequenceIndex++) - { - const auto& SequenceIndexChunksLeftToWriteCounter = SequenceIndexChunksLeftToWriteCounters[SequenceIndex]; - if (SequenceIndexChunksLeftToWriteCounter.load() != 0) - { - RawSequencesMissingWriteCount++; - const uint32_t PathIndex = RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex]; - const std::filesystem::path& IncompletePath = RemoteContent.Paths[PathIndex]; - ZEN_ASSERT(!IncompletePath.empty()); - const uint32_t ExpectedSequenceCount = RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]; - if (!IsQuiet) - { - ZEN_CONSOLE("{}: Max count {}, Current count {}", - IncompletePath, - ExpectedSequenceCount, - SequenceIndexChunksLeftToWriteCounter.load()); - } - ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounter.load() <= ExpectedSequenceCount); - } - } - ZEN_ASSERT(RawSequencesMissingWriteCount == 0); - } - - const uint64_t DownloadedBytes = DownloadStats.DownloadedChunkByteCount.load() + DownloadStats.DownloadedBlockByteCount.load() + - +DownloadStats.DownloadedPartialBlockByteCount.load(); - if (!IsQuiet) - { - std::string CloneDetails; - if (DiskStats.CloneCount.load() > 0) - { - CloneDetails = fmt::format(" ({} cloned)", NiceBytes(DiskStats.CloneByteCount.load())); - } - ZEN_CONSOLE("Downloaded {} ({}bits/s) in {}. Wrote {} ({}B/s){} in {}. Completed in {}", - NiceBytes(DownloadedBytes), - NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), DownloadedBytes * 8)), - NiceTimeSpanMs(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS() / 1000), - NiceBytes(DiskStats.WriteByteCount.load()), - NiceNum(GetBytesPerSecond(FilteredWrittenBytesPerSecond.GetElapsedTimeUS(), DiskStats.WriteByteCount.load())), - CloneDetails, - NiceTimeSpanMs(FilteredWrittenBytesPerSecond.GetElapsedTimeUS() / 1000), - NiceTimeSpanMs(WriteTimer.GetElapsedTimeMs())); - } - - WriteChunkStats.WriteChunksElapsedWallTimeUs = WriteTimer.GetElapsedTimeUs(); - WriteChunkStats.DownloadTimeUs = FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(); - WriteChunkStats.WriteTimeUs = FilteredWrittenBytesPerSecond.GetElapsedTimeUS(); - } - - if (Options.PrimeCacheOnly) - { - return; - } - - tsl::robin_map<uint32_t, uint32_t> RemotePathIndexToLocalPathIndex; - RemotePathIndexToLocalPathIndex.reserve(RemoteContent.Paths.size()); - - tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> SequenceHashToLocalPathIndex; - std::vector<uint32_t> RemoveLocalPathIndexes; - - if (AbortFlag) - { - return; - } - - { - ZEN_TRACE_CPU("UpdateFolder_PrepareTarget"); - - tsl::robin_set<IoHash, IoHash::Hasher> CachedRemoteSequences; - tsl::robin_map<std::string, uint32_t> RemotePathToRemoteIndex; - RemotePathToRemoteIndex.reserve(RemoteContent.Paths.size()); - for (uint32_t RemotePathIndex = 0; RemotePathIndex < RemoteContent.Paths.size(); RemotePathIndex++) - { - RemotePathToRemoteIndex.insert({RemoteContent.Paths[RemotePathIndex].generic_string(), RemotePathIndex}); - } - - std::vector<uint32_t> FilesToCache; - - uint64_t MatchCount = 0; - uint64_t PathMismatchCount = 0; - uint64_t HashMismatchCount = 0; - std::atomic<uint64_t> CachedCount = 0; - std::atomic<uint64_t> CachedByteCount = 0; - uint64_t SkippedCount = 0; - uint64_t DeleteCount = 0; - for (uint32_t LocalPathIndex = 0; LocalPathIndex < LocalContent.Paths.size(); LocalPathIndex++) - { - if (AbortFlag) - { - break; - } - const IoHash& RawHash = LocalContent.RawHashes[LocalPathIndex]; - const std::filesystem::path& LocalPath = LocalContent.Paths[LocalPathIndex]; - - ZEN_ASSERT_SLOW(IsFile((Path / LocalContent.Paths[LocalPathIndex]).make_preferred())); - - if (!Options.WipeTargetFolder) - { - if (auto RemotePathIt = RemotePathToRemoteIndex.find(LocalPath.generic_string()); - RemotePathIt != RemotePathToRemoteIndex.end()) - { - const uint32_t RemotePathIndex = RemotePathIt->second; - if (RemoteContent.RawHashes[RemotePathIndex] == RawHash) - { - // It is already in it's desired place - RemotePathIndexToLocalPathIndex[RemotePathIndex] = LocalPathIndex; - SequenceHashToLocalPathIndex.insert({RawHash, LocalPathIndex}); - MatchCount++; - continue; - } - else - { - HashMismatchCount++; - } - } - else - { - PathMismatchCount++; - } - } - if (RemoteLookup.RawHashToSequenceIndex.contains(RawHash)) - { - if (!CachedRemoteSequences.contains(RawHash)) - { - ZEN_TRACE_CPU("MoveToCache"); - // We need it - FilesToCache.push_back(LocalPathIndex); - CachedRemoteSequences.insert(RawHash); - } - else - { - // We already have it - SkippedCount++; - } - } - else if (!Options.WipeTargetFolder) - { - // We don't need it - RemoveLocalPathIndexes.push_back(LocalPathIndex); - DeleteCount++; - } - } - - if (AbortFlag) - { - return; - } - - { - ZEN_TRACE_CPU("UpdateFolder_CopyToCache"); - - Stopwatch Timer; - - WorkerThreadPool& WritePool = GetIOWorkerPool(); - - ProgressBar CacheLocalProgressBar(ProgressMode, "Cache Local Data"); - ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - - for (uint32_t LocalPathIndex : FilesToCache) - { - if (AbortFlag) - { - break; - } - Work.ScheduleWork( - WritePool, - [&Path, &LocalContent, &CacheFolderPath, &CachedCount, &CachedByteCount, LocalPathIndex](std::atomic<bool>&) { - ZEN_TRACE_CPU("UpdateFolder_AsyncCopyToCache"); - if (!AbortFlag) - { - const IoHash& RawHash = LocalContent.RawHashes[LocalPathIndex]; - const std::filesystem::path& LocalPath = LocalContent.Paths[LocalPathIndex]; - const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash); - ZEN_ASSERT_SLOW(!IsFileWithRetry(CacheFilePath)); - const std::filesystem::path LocalFilePath = (Path / LocalPath).make_preferred(); - RenameFileWithRetry(LocalFilePath, CacheFilePath); - CachedCount++; - CachedByteCount += LocalContent.RawSizes[LocalPathIndex]; - } - }); - } - - { - ZEN_TRACE_CPU("CacheLocal_Wait"); - - Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(PendingWork); - const uint64_t WorkTotal = FilesToCache.size(); - const uint64_t WorkComplete = CachedCount.load(); - std::string Details = fmt::format("{}/{} ({}) files", WorkComplete, WorkTotal, NiceBytes(CachedByteCount)); - CacheLocalProgressBar.UpdateState({.Task = "Caching local ", - .Details = Details, - .TotalCount = gsl::narrow<uint64_t>(WorkTotal), - .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete), - .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, - false); - }); - } - - CacheLocalProgressBar.Finish(); - if (AbortFlag) - { - return; - } - - ZEN_DEBUG( - "Local state prep: Match: {}, PathMismatch: {}, HashMismatch: {}, Cached: {} ({}), Skipped: {}, " - "Delete: {}", - MatchCount, - PathMismatchCount, - HashMismatchCount, - CachedCount.load(), - NiceBytes(CachedByteCount.load()), - SkippedCount, - DeleteCount); - } - } - - if (Options.WipeTargetFolder) - { - ZEN_TRACE_CPU("UpdateFolder_WipeTarget"); - Stopwatch Timer; - - // Clean target folder - if (!CleanDirectory(Path, DefaultExcludeFolders)) - { - ZEN_CONSOLE_WARN("Some files in {} could not be removed", Path); - } - RebuildFolderStateStats.CleanFolderElapsedWallTimeUs = Timer.GetElapsedTimeUs(); - } - - if (AbortFlag) - { - return; - } - - { - ZEN_TRACE_CPU("UpdateFolder_FinalizeTree"); - Stopwatch Timer; - - WorkerThreadPool& WritePool = GetIOWorkerPool(); - - ProgressBar RebuildProgressBar(ProgressMode, "Rebuild State"); - ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - - OutLocalFolderState.Paths.resize(RemoteContent.Paths.size()); - OutLocalFolderState.RawSizes.resize(RemoteContent.Paths.size()); - OutLocalFolderState.Attributes.resize(RemoteContent.Paths.size()); - OutLocalFolderState.ModificationTicks.resize(RemoteContent.Paths.size()); - - std::atomic<uint64_t> DeletedCount = 0; - - for (uint32_t LocalPathIndex : RemoveLocalPathIndexes) - { - if (AbortFlag) - { - break; - } - Work.ScheduleWork(WritePool, [&Path, &LocalContent, &DeletedCount, LocalPathIndex](std::atomic<bool>&) { - if (!AbortFlag) - { - ZEN_TRACE_CPU("FinalizeTree_Remove"); - const std::filesystem::path LocalFilePath = (Path / LocalContent.Paths[LocalPathIndex]).make_preferred(); - SetFileReadOnlyWithRetry(LocalFilePath, false); - RemoveFileWithRetry(LocalFilePath); - DeletedCount++; - } - }); - } - - std::atomic<uint64_t> TargetsComplete = 0; - - struct FinalizeTarget - { - IoHash RawHash; - uint32_t RemotePathIndex; - }; - - std::vector<FinalizeTarget> Targets; - Targets.reserve(RemoteContent.Paths.size()); - for (uint32_t RemotePathIndex = 0; RemotePathIndex < RemoteContent.Paths.size(); RemotePathIndex++) - { - Targets.push_back(FinalizeTarget{.RawHash = RemoteContent.RawHashes[RemotePathIndex], .RemotePathIndex = RemotePathIndex}); - } - std::sort(Targets.begin(), Targets.end(), [](const FinalizeTarget& Lhs, const FinalizeTarget& Rhs) { - if (Lhs.RawHash < Rhs.RawHash) - { - return true; - } - else if (Lhs.RawHash > Rhs.RawHash) - { - return false; - } - return Lhs.RemotePathIndex < Rhs.RemotePathIndex; - }); - - size_t TargetOffset = 0; - while (TargetOffset < Targets.size()) - { - if (AbortFlag) - { - break; - } - - size_t TargetCount = 1; - while ((TargetOffset + TargetCount) < Targets.size() && - (Targets[TargetOffset + TargetCount].RawHash == Targets[TargetOffset].RawHash)) - { - TargetCount++; - } - - Work.ScheduleWork( - WritePool, - [&Path, - &LocalContent, - &SequenceHashToLocalPathIndex, - &RemoteContent, - &RemoteLookup, - &CacheFolderPath, - &Targets, - &RemotePathIndexToLocalPathIndex, - &RebuildFolderStateStats, - &OutLocalFolderState, - BaseTargetOffset = TargetOffset, - TargetCount, - &TargetsComplete](std::atomic<bool>&) { - if (!AbortFlag) - { - ZEN_TRACE_CPU("FinalizeTree_Work"); - - size_t TargetOffset = BaseTargetOffset; - const IoHash& RawHash = Targets[TargetOffset].RawHash; - - if (RawHash == IoHash::Zero) - { - ZEN_TRACE_CPU("ZeroSize"); - while (TargetOffset < (BaseTargetOffset + TargetCount)) - { - const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex; - ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash); - const std::filesystem::path& TargetPath = RemoteContent.Paths[RemotePathIndex]; - std::filesystem::path TargetFilePath = (Path / TargetPath).make_preferred(); - if (!RemotePathIndexToLocalPathIndex[RemotePathIndex]) - { - if (IsFileWithRetry(TargetFilePath)) - { - SetFileReadOnlyWithRetry(TargetFilePath, false); - } - else - { - CreateDirectories(TargetFilePath.parent_path()); - } - BasicFile OutputFile; - OutputFile.Open(TargetFilePath, BasicFile::Mode::kTruncate); - } - OutLocalFolderState.Paths[RemotePathIndex] = TargetPath; - OutLocalFolderState.RawSizes[RemotePathIndex] = RemoteContent.RawSizes[RemotePathIndex]; - - OutLocalFolderState.Attributes[RemotePathIndex] = - RemoteContent.Attributes.empty() - ? GetNativeFileAttributes(TargetFilePath) - : SetNativeFileAttributes(TargetFilePath, - RemoteContent.Platform, - RemoteContent.Attributes[RemotePathIndex]); - OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath); - - TargetOffset++; - TargetsComplete++; - } - } - else - { - ZEN_TRACE_CPU("Files"); - ZEN_ASSERT(RemoteLookup.RawHashToSequenceIndex.contains(RawHash)); - const uint32_t FirstRemotePathIndex = Targets[TargetOffset].RemotePathIndex; - const std::filesystem::path& FirstTargetPath = RemoteContent.Paths[FirstRemotePathIndex]; - std::filesystem::path FirstTargetFilePath = (Path / FirstTargetPath).make_preferred(); - - if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(FirstRemotePathIndex); - InPlaceIt != RemotePathIndexToLocalPathIndex.end()) - { - ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath)); - } - else - { - if (IsFileWithRetry(FirstTargetFilePath)) - { - SetFileReadOnlyWithRetry(FirstTargetFilePath, false); - } - else - { - CreateDirectories(FirstTargetFilePath.parent_path()); - } - - if (auto InplaceIt = SequenceHashToLocalPathIndex.find(RawHash); - InplaceIt != SequenceHashToLocalPathIndex.end()) - { - ZEN_TRACE_CPU("Copy"); - const uint32_t LocalPathIndex = InplaceIt->second; - const std::filesystem::path& SourcePath = LocalContent.Paths[LocalPathIndex]; - std::filesystem::path SourceFilePath = (Path / SourcePath).make_preferred(); - ZEN_ASSERT_SLOW(IsFileWithRetry(SourceFilePath)); - - ZEN_DEBUG("Copying from '{}' -> '{}'", SourceFilePath, FirstTargetFilePath); - const uint64_t RawSize = LocalContent.RawSizes[LocalPathIndex]; - std::atomic<uint64_t> WriteCount; - std::atomic<uint64_t> WriteByteCount; - std::atomic<uint64_t> CloneCount; - std::atomic<uint64_t> CloneByteCount; - CopyFile(SourceFilePath, - FirstTargetFilePath, - RawSize, - WriteCount, - WriteByteCount, - CloneCount, - CloneByteCount); - RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++; - } - else - { - ZEN_TRACE_CPU("Rename"); - const std::filesystem::path CacheFilePath = - GetFinalChunkedSequenceFileName(CacheFolderPath, RawHash); - ZEN_ASSERT_SLOW(IsFileWithRetry(CacheFilePath)); - - RenameFileWithRetry(CacheFilePath, FirstTargetFilePath); - - RebuildFolderStateStats.FinalizeTreeFilesMovedCount++; - } - } - - OutLocalFolderState.Paths[FirstRemotePathIndex] = FirstTargetPath; - OutLocalFolderState.RawSizes[FirstRemotePathIndex] = RemoteContent.RawSizes[FirstRemotePathIndex]; - - OutLocalFolderState.Attributes[FirstRemotePathIndex] = - RemoteContent.Attributes.empty() - ? GetNativeFileAttributes(FirstTargetFilePath) - : SetNativeFileAttributes(FirstTargetFilePath, - RemoteContent.Platform, - RemoteContent.Attributes[FirstRemotePathIndex]); - OutLocalFolderState.ModificationTicks[FirstRemotePathIndex] = - GetModificationTickFromPath(FirstTargetFilePath); - - TargetOffset++; - TargetsComplete++; - - while (TargetOffset < (BaseTargetOffset + TargetCount)) - { - const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex; - ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash); - const std::filesystem::path& TargetPath = RemoteContent.Paths[RemotePathIndex]; - std::filesystem::path TargetFilePath = (Path / TargetPath).make_preferred(); - - if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(RemotePathIndex); - InPlaceIt != RemotePathIndexToLocalPathIndex.end()) - { - ZEN_ASSERT_SLOW(IsFileWithRetry(TargetFilePath)); - } - else - { - ZEN_TRACE_CPU("Copy"); - if (IsFileWithRetry(TargetFilePath)) - { - SetFileReadOnlyWithRetry(TargetFilePath, false); - } - else - { - CreateDirectories(TargetFilePath.parent_path()); - } - - ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath)); - ZEN_DEBUG("Copying from '{}' -> '{}'", FirstTargetFilePath, TargetFilePath); - const uint64_t RawSize = RemoteContent.RawSizes[RemotePathIndex]; - std::atomic<uint64_t> WriteCount; - std::atomic<uint64_t> WriteByteCount; - std::atomic<uint64_t> CloneCount; - std::atomic<uint64_t> CloneByteCount; - CopyFile(FirstTargetFilePath, - TargetFilePath, - RawSize, - WriteCount, - WriteByteCount, - CloneCount, - CloneByteCount); - RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++; - } - - OutLocalFolderState.Paths[RemotePathIndex] = TargetPath; - OutLocalFolderState.RawSizes[RemotePathIndex] = RemoteContent.RawSizes[RemotePathIndex]; - - OutLocalFolderState.Attributes[RemotePathIndex] = - RemoteContent.Attributes.empty() - ? GetNativeFileAttributes(TargetFilePath) - : SetNativeFileAttributes(TargetFilePath, - RemoteContent.Platform, - RemoteContent.Attributes[RemotePathIndex]); - OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath); - - TargetOffset++; - TargetsComplete++; - } - } - } - }); - - TargetOffset += TargetCount; - } - - { - ZEN_TRACE_CPU("FinalizeTree_Wait"); - - Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(PendingWork); - const uint64_t WorkTotal = Targets.size() + RemoveLocalPathIndexes.size(); - const uint64_t WorkComplete = TargetsComplete.load() + DeletedCount.load(); - std::string Details = fmt::format("{}/{} files", WorkComplete, WorkTotal); - RebuildProgressBar.UpdateState({.Task = "Rebuilding state ", - .Details = Details, - .TotalCount = gsl::narrow<uint64_t>(WorkTotal), - .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete), - .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, - false); - }); - } - - RebuildFolderStateStats.FinalizeTreeElapsedWallTimeUs = Timer.GetElapsedTimeUs(); - RebuildProgressBar.Finish(); - } - } - std::string GetCbObjectAsNiceString(CbObjectView Object, std::string_view Prefix, std::string_view Suffix) { ExtendableStringBuilder<512> SB; @@ -9974,30 +5641,46 @@ namespace { RebuildFolderStateStatistics RebuildFolderStateStats; VerifyFolderStatistics VerifyFolderStats; + const ChunkedContentLookup LocalLookup = BuildChunkedContentLookup(LocalContent); + const ChunkedContentLookup RemoteLookup = BuildChunkedContentLookup(RemoteContent); + ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::Download, TaskSteps::StepCount); - UpdateFolder(Storage, - BuildId, - Path, - LocalContent, - RemoteContent, - BlockDescriptions, - LooseChunkHashes, - UpdateOptions{.SystemRootDir = Options.SystemRootDir, - .ZenFolderPath = Options.ZenFolderPath, - .LargeAttachmentSize = LargeAttachmentSize, - .PreferredMultipartChunkSize = PreferredMultipartChunkSize, - .PartialBlockRequestMode = Options.PartialBlockRequestMode, - .WipeTargetFolder = Options.CleanTargetFolder, - .PrimeCacheOnly = Options.PrimeCacheOnly, - .EnableOtherDownloadsScavenging = Options.EnableOtherDownloadsScavenging, - .EnableTargetFolderScavenging = Options.EnableTargetFolderScavenging}, - LocalFolderState, - DiskStats, - CacheMappingStats, - DownloadStats, - WriteChunkStats, - RebuildFolderStateStats); + { + ConsoleOpLogOutput Output(ProgressMode); + BuildsOperationUpdateFolder Updater( + Output, + Storage, + AbortFlag, + PauseFlag, + GetIOWorkerPool(), + GetNetworkPool(), + BuildId, + Path, + LocalContent, + LocalLookup, + RemoteContent, + RemoteLookup, + BlockDescriptions, + LooseChunkHashes, + BuildsOperationUpdateFolder::Options{.IsQuiet = IsQuiet, + .IsVerbose = IsVerbose, + .AllowFileClone = AllowFileClone, + .UseSparseFiles = UseSparseFiles, + .SystemRootDir = Options.SystemRootDir, + .ZenFolderPath = Options.ZenFolderPath, + .LargeAttachmentSize = LargeAttachmentSize, + .PreferredMultipartChunkSize = PreferredMultipartChunkSize, + .PartialBlockRequestMode = Options.PartialBlockRequestMode, + .WipeTargetFolder = Options.CleanTargetFolder, + .PrimeCacheOnly = Options.PrimeCacheOnly, + .EnableOtherDownloadsScavenging = Options.EnableOtherDownloadsScavenging, + .EnableTargetFolderScavenging = Options.EnableTargetFolderScavenging, + .DefaultExcludeFolders = DefaultExcludeFolders, + .DefaultExcludeExtensions = DefaultExcludeExtensions}, + DiskStats); + Updater.Execute(LocalFolderState); + } if (!AbortFlag) { @@ -10195,7 +5878,7 @@ namespace { { std::unique_ptr<ChunkingController> ChunkController = CreateChunkingControllerWithFixedChunking(ChunkingControllerWithFixedChunkingSettings{}); - std::vector<std::string_view> ExcludeExtensions = DefaultExcludeExtensions; + std::vector<std::string> ExcludeExtensions = DefaultExcludeExtensions; if (OnlyChunked) { ExcludeExtensions.insert(ExcludeExtensions.end(), @@ -10204,7 +5887,7 @@ namespace { } auto IsAcceptedFolder = [ExcludeFolders = DefaultExcludeFolders](const std::string_view& RelativePath) -> bool { - for (const std::string_view& ExcludeFolder : ExcludeFolders) + for (const std::string& ExcludeFolder : ExcludeFolders) { if (RelativePath.starts_with(ExcludeFolder)) { @@ -10222,7 +5905,7 @@ namespace { }; auto IsAcceptedFile = [ExcludeExtensions](const std::string_view& RelativePath, uint64_t, uint32_t) -> bool { - for (const std::string_view& ExcludeExtension : ExcludeExtensions) + for (const std::string& ExcludeExtension : ExcludeExtensions) { if (RelativePath.ends_with(ExcludeExtension)) { @@ -12114,12 +7797,15 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) const std::filesystem::path DownloadPath = m_Path.parent_path() / (m_BuildPartName + "_test"); const std::filesystem::path DownloadPath2 = m_Path.parent_path() / (m_BuildPartName + "_test2"); + const std::filesystem::path DownloadPath3 = m_Path.parent_path() / (m_BuildPartName + "_test3"); - auto ___ = MakeGuard([DownloadPath, DownloadPath2]() { + auto ___ = MakeGuard([DownloadPath, DownloadPath2, DownloadPath3]() { CleanDirectory(DownloadPath, true); DeleteDirectories(DownloadPath); CleanDirectory(DownloadPath2, true); DeleteDirectories(DownloadPath2); + CleanDirectory(DownloadPath3, true); + DeleteDirectories(DownloadPath3); }); if (m_ZenFolderPath.empty()) @@ -12237,7 +7923,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) DownloadContent); auto IsAcceptedFolder = [ExcludeFolders = DefaultExcludeFolders, Path](const std::filesystem::path& AbsolutePath) -> bool { std::string RelativePath = std::filesystem::relative(AbsolutePath, Path).generic_string(); - for (const std::string_view& ExcludeFolder : ExcludeFolders) + for (const std::string& ExcludeFolder : ExcludeFolders) { if (RelativePath.starts_with(ExcludeFolder)) { @@ -12451,6 +8137,26 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { throw std::runtime_error("Test aborted. (Download original)"); } + + ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (original)", BuildId, BuildPartId, m_BuildPartName, DownloadPath3); + DownloadFolder(Storage, + BuildId, + {BuildPartId}, + {}, + DownloadPath3, + DownloadOptions{.SystemRootDir = m_SystemRootDir, + .ZenFolderPath = DownloadPath3 / ZenFolderName, + .AllowMultiparts = m_AllowMultiparts, + .PartialBlockRequestMode = PartialBlockRequestMode, + .CleanTargetFolder = false, + .PostDownloadVerify = true, + .PrimeCacheOnly = false, + .EnableOtherDownloadsScavenging = m_EnableScavenging, + .EnableTargetFolderScavenging = true}); + if (AbortFlag) + { + throw std::runtime_error("Test aborted. (Download original)"); + } } } |