aboutsummaryrefslogtreecommitdiff
path: root/src/zenremotestore/builds/buildupdatefolder.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenremotestore/builds/buildupdatefolder.cpp')
-rw-r--r--src/zenremotestore/builds/buildupdatefolder.cpp4947
1 files changed, 4947 insertions, 0 deletions
diff --git a/src/zenremotestore/builds/buildupdatefolder.cpp b/src/zenremotestore/builds/buildupdatefolder.cpp
new file mode 100644
index 000000000..98972740a
--- /dev/null
+++ b/src/zenremotestore/builds/buildupdatefolder.cpp
@@ -0,0 +1,4947 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenremotestore/builds/buildupdatefolder.h>
+
+#include <zencore/basicfile.h>
+#include <zencore/fmtutils.h>
+#include <zencore/parallelwork.h>
+#include <zencore/scopeguard.h>
+#include <zencore/trace.h>
+#include <zenremotestore/builds/buildcontent.h>
+#include <zenremotestore/builds/buildmanifest.h>
+#include <zenremotestore/chunking/chunkingcache.h>
+#include <zenremotestore/chunking/chunkingcontroller.h>
+#include <zenremotestore/transferthreadworkers.h>
+#include <zenutil/filesystemutils.h>
+#include <zenutil/filteredrate.h>
+#include <zenutil/progress.h>
+
+#include <numeric>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <tsl/robin_set.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+using namespace std::literals;
+
+namespace {
+ std::filesystem::path ZenTempCacheFolderPath(const std::filesystem::path& ZenFolderPath)
+ {
+ return ZenTempFolderPath(ZenFolderPath) / "cache"; // Decompressed and verified data - chunks & sequences
+ }
+ std::filesystem::path ZenTempBlockFolderPath(const std::filesystem::path& ZenFolderPath)
+ {
+ return ZenTempFolderPath(ZenFolderPath) / "blocks"; // Temp storage for whole and partial blocks
+ }
+ std::filesystem::path ZenTempDownloadFolderPath(const std::filesystem::path& ZenFolderPath)
+ {
+ return ZenTempFolderPath(ZenFolderPath) / "download"; // Temp storage for decompressed and validated chunks
+ }
+ std::filesystem::path GetTempChunkedSequenceFileName(const std::filesystem::path& CacheFolderPath, const IoHash& RawHash)
+ {
+ return CacheFolderPath / (RawHash.ToHexString() + ".tmp");
+ }
+
+ std::filesystem::path GetFinalChunkedSequenceFileName(const std::filesystem::path& CacheFolderPath, const IoHash& RawHash)
+ {
+ return CacheFolderPath / RawHash.ToHexString();
+ }
+ bool CleanDirectory(LoggerRef InLog,
+ ProgressBase& Progress,
+ WorkerThreadPool& IOWorkerPool,
+ std::atomic<bool>& AbortFlag,
+ std::atomic<bool>& PauseFlag,
+ bool IsQuiet,
+ const std::filesystem::path& Path,
+ std::span<const std::string> ExcludeDirectories)
+ {
+ ZEN_TRACE_CPU("CleanDirectory");
+ ZEN_SCOPED_LOG(InLog);
+ Stopwatch Timer;
+
+ std::unique_ptr<ProgressBase::ProgressBar> ProgressBar = Progress.CreateProgressBar("Clean Folder");
+
+ CleanDirectoryResult Result = CleanDirectory(
+ IOWorkerPool,
+ AbortFlag,
+ PauseFlag,
+ Path,
+ ExcludeDirectories,
+ [&](const std::string_view Details, uint64_t TotalCount, uint64_t RemainingCount, bool IsPaused, bool IsAborted) {
+ ProgressBar->UpdateState({.Task = "Cleaning folder ",
+ .Details = std::string(Details),
+ .TotalCount = TotalCount,
+ .RemainingCount = RemainingCount,
+ .Status = ProgressBase::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+ false);
+ },
+ Progress.GetProgressUpdateDelayMS());
+
+ ProgressBar->Finish();
+
+ if (AbortFlag)
+ {
+ return false;
+ }
+
+ uint64_t ElapsedTimeMs = Timer.GetElapsedTimeMs();
+
+ if (!Result.FailedRemovePaths.empty())
+ {
+ ExtendableStringBuilder<512> SB;
+ for (size_t FailedPathIndex = 0; FailedPathIndex < Result.FailedRemovePaths.size(); FailedPathIndex++)
+ {
+ SB << fmt::format("\n '{}': ({}) {}",
+ Result.FailedRemovePaths[FailedPathIndex].first,
+ Result.FailedRemovePaths[FailedPathIndex].second.value(),
+ Result.FailedRemovePaths[FailedPathIndex].second.message());
+ }
+ ZEN_WARN("Clean failed to remove files from '{}': {}", Path, SB.ToView());
+ }
+
+ if (ElapsedTimeMs >= 200 && !IsQuiet)
+ {
+ ZEN_INFO("Wiped folder '{}' {} ({}) in {}",
+ Path,
+ Result.FoundCount,
+ NiceBytes(Result.DeletedByteCount),
+ NiceTimeSpanMs(ElapsedTimeMs));
+ }
+
+ return Result.FailedRemovePaths.empty();
+ }
+ uint32_t SetNativeFileAttributes(const std::filesystem::path FilePath, SourcePlatform SourcePlatform, uint32_t Attributes)
+ {
+#if ZEN_PLATFORM_WINDOWS
+ if (SourcePlatform == SourcePlatform::Windows)
+ {
+ SetFileAttributesToPath(FilePath, Attributes);
+ return Attributes;
+ }
+ else
+ {
+ uint32_t CurrentAttributes = GetFileAttributesFromPath(FilePath);
+ uint32_t NewAttributes = zen::MakeFileAttributeReadOnly(CurrentAttributes, zen::IsFileModeReadOnly(Attributes));
+ if (CurrentAttributes != NewAttributes)
+ {
+ SetFileAttributesToPath(FilePath, NewAttributes);
+ }
+ return NewAttributes;
+ }
+#endif // ZEN_PLATFORM_WINDOWS
+#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+ if (SourcePlatform != SourcePlatform::Windows)
+ {
+ zen::SetFileMode(FilePath, Attributes);
+ return Attributes;
+ }
+ else
+ {
+ uint32_t CurrentMode = zen::GetFileMode(FilePath);
+ uint32_t NewMode = zen::MakeFileModeReadOnly(CurrentMode, zen::IsFileAttributeReadOnly(Attributes));
+ if (CurrentMode != NewMode)
+ {
+ zen::SetFileMode(FilePath, NewMode);
+ }
+ return NewMode;
+ }
+#endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+ };
+
+ uint32_t GetNativeFileAttributes(const std::filesystem::path FilePath)
+ {
+#if ZEN_PLATFORM_WINDOWS
+ return GetFileAttributesFromPath(FilePath);
+#endif // ZEN_PLATFORM_WINDOWS
+#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+ return GetFileMode(FilePath);
+#endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+ }
+ std::filesystem::path TryMoveDownloadedChunk(IoBuffer& BlockBuffer, const std::filesystem::path& Path, bool ForceDiskBased)
+ {
+ uint64_t BlockSize = BlockBuffer.GetSize();
+ IoBufferFileReference FileRef;
+ if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == BlockSize))
+ {
+ ZEN_TRACE_CPU("MoveTempFullBlock");
+ std::error_code Ec;
+ std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
+ if (!Ec)
+ {
+ BlockBuffer.SetDeleteOnClose(false);
+ BlockBuffer = {};
+ RenameFile(TempBlobPath, Path, Ec);
+ if (Ec)
+ {
+ // Re-open the temp file again
+ BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
+ BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
+ BlockBuffer.SetDeleteOnClose(true);
+ }
+ else
+ {
+ return Path;
+ }
+ }
+ }
+
+ if (ForceDiskBased)
+ {
+ // Could not be moved and rather large, lets store it on disk
+ ZEN_TRACE_CPU("WriteTempFullBlock");
+ TemporaryFile::SafeWriteFile(Path, BlockBuffer);
+ BlockBuffer = {};
+ return Path;
+ }
+
+ return {};
+ }
+ bool IsSingleFileChunk(const ChunkedFolderContent& RemoteContent,
+ const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> Locations)
+ {
+ if (Locations.size() == 1)
+ {
+ const uint32_t FirstSequenceIndex = Locations[0]->SequenceIndex;
+ if (RemoteContent.ChunkedContent.ChunkCounts[FirstSequenceIndex] == 1)
+ {
+ ZEN_ASSERT_SLOW(Locations[0]->Offset == 0);
+ return true;
+ }
+ }
+ return false;
+ }
+ IoBuffer MakeBufferMemoryBased(const CompositeBuffer& PartialBlockBuffer)
+ {
+ ZEN_TRACE_CPU("MakeBufferMemoryBased");
+ IoBuffer BlockMemoryBuffer;
+ std::span<const SharedBuffer> Segments = PartialBlockBuffer.GetSegments();
+ if (Segments.size() == 1)
+ {
+ IoBufferFileReference FileRef = {};
+ if (PartialBlockBuffer.GetSegments().front().AsIoBuffer().GetFileReference(FileRef))
+ {
+ BlockMemoryBuffer = UniqueBuffer::Alloc(FileRef.FileChunkSize).MoveToShared().AsIoBuffer();
+ BasicFile Reader;
+ Reader.Attach(FileRef.FileHandle);
+ auto _ = MakeGuard([&Reader]() { Reader.Detach(); });
+ MutableMemoryView ReadMem = BlockMemoryBuffer.GetMutableView();
+ Reader.Read(ReadMem.GetData(), FileRef.FileChunkSize, FileRef.FileChunkOffset);
+ return BlockMemoryBuffer;
+ }
+ else
+ {
+ return PartialBlockBuffer.GetSegments().front().AsIoBuffer();
+ }
+ }
+ else
+ {
+ // Not a homogenous memory buffer, read all to memory
+
+ BlockMemoryBuffer = UniqueBuffer::Alloc(PartialBlockBuffer.GetSize()).MoveToShared().AsIoBuffer();
+ MutableMemoryView ReadMem = BlockMemoryBuffer.GetMutableView();
+ for (const SharedBuffer& Segment : Segments)
+ {
+ IoBufferFileReference FileRef = {};
+ if (Segment.AsIoBuffer().GetFileReference(FileRef))
+ {
+ BasicFile Reader;
+ Reader.Attach(FileRef.FileHandle);
+ auto _ = MakeGuard([&Reader]() { Reader.Detach(); });
+ Reader.Read(ReadMem.GetData(), FileRef.FileChunkSize, FileRef.FileChunkOffset);
+ ReadMem = ReadMem.Mid(FileRef.FileChunkSize);
+ }
+ else
+ {
+ ReadMem = ReadMem.CopyFrom(Segment.AsIoBuffer().GetView());
+ }
+ }
+ return BlockMemoryBuffer;
+ }
+ }
+
+ FolderContent CheckFolderFiles(ProgressBase& Progress,
+ std::atomic<bool>& AbortFlag,
+ std::atomic<bool>& PauseFlag,
+ std::string_view ProgressLabel,
+ TransferThreadWorkers& Workers,
+ GetFolderContentStatistics& LocalFolderScanStats,
+ const std::filesystem::path& Path,
+ std::span<const std::filesystem::path> PathsToCheck)
+ {
+ std::unique_ptr<ProgressBase::ProgressBar> ProgressBar = Progress.CreateProgressBar(ProgressLabel);
+ FolderContent Result = GetValidFolderContent(
+ Workers.GetIOWorkerPool(),
+ LocalFolderScanStats,
+ Path,
+ PathsToCheck,
+ [&ProgressBar, &LocalFolderScanStats, &AbortFlag, &PauseFlag](uint64_t PathCount, uint64_t CompletedPathCount) {
+ std::string Details =
+ fmt::format("{}/{} checked, {} found", CompletedPathCount, PathCount, LocalFolderScanStats.FoundFileCount.load());
+ ProgressBar->UpdateState({.Task = "Checking files ",
+ .Details = Details,
+ .TotalCount = PathCount,
+ .RemainingCount = PathCount - CompletedPathCount,
+ .Status = ProgressBase::ProgressBar::State::CalculateStatus(AbortFlag, PauseFlag)},
+ false);
+ },
+ Progress.GetProgressUpdateDelayMS(),
+ AbortFlag,
+ PauseFlag);
+ ProgressBar->Finish();
+ return Result;
+ }
+
+ ChunkedFolderContent ScanFolderFiles(ProgressBase& Progress,
+ std::atomic<bool>& AbortFlag,
+ std::atomic<bool>& PauseFlag,
+ std::string_view ProgressLabel,
+ TransferThreadWorkers& Workers,
+ const std::filesystem::path& Path,
+ const FolderContent& FolderSource,
+ ChunkingController& ChunkController,
+ ChunkingCache& ChunkCache,
+ ChunkingStatistics& OutChunkingStats)
+ {
+ uint64_t ByteCountToScan = 0;
+ for (const uint64_t RawSize : FolderSource.RawSizes)
+ {
+ ByteCountToScan += RawSize;
+ }
+ std::unique_ptr<ProgressBase::ProgressBar> ProgressBar = Progress.CreateProgressBar(ProgressLabel);
+ FilteredRate FilteredBytesHashed;
+ FilteredBytesHashed.Start();
+ ChunkingStatistics LocalChunkingStats;
+ ChunkedFolderContent Result = ChunkFolderContent(
+ LocalChunkingStats,
+ Workers.GetIOWorkerPool(),
+ Path,
+ FolderSource,
+ ChunkController,
+ ChunkCache,
+ Progress.GetProgressUpdateDelayMS(),
+ [&](bool IsAborted, bool IsPaused, std::ptrdiff_t) {
+ FilteredBytesHashed.Update(LocalChunkingStats.BytesHashed.load());
+ std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found",
+ LocalChunkingStats.FilesProcessed.load(),
+ FolderSource.Paths.size(),
+ NiceBytes(LocalChunkingStats.BytesHashed.load()),
+ NiceBytes(ByteCountToScan),
+ NiceNum(FilteredBytesHashed.GetCurrent()),
+ LocalChunkingStats.UniqueChunksFound.load(),
+ NiceBytes(LocalChunkingStats.UniqueBytesFound.load()));
+ ProgressBar->UpdateState({.Task = "Scanning files ",
+ .Details = Details,
+ .TotalCount = ByteCountToScan,
+ .RemainingCount = ByteCountToScan - LocalChunkingStats.BytesHashed.load(),
+ .Status = ProgressBase::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+ false);
+ },
+ AbortFlag,
+ PauseFlag);
+ OutChunkingStats += LocalChunkingStats;
+ FilteredBytesHashed.Stop();
+ ProgressBar->Finish();
+ return Result;
+ }
+} // namespace
+
+BuildsOperationUpdateFolder::BuildsOperationUpdateFolder(LoggerRef Log,
+ ProgressBase& Progress,
+ StorageInstance& Storage,
+ std::atomic<bool>& AbortFlag,
+ std::atomic<bool>& PauseFlag,
+ WorkerThreadPool& IOWorkerPool,
+ WorkerThreadPool& NetworkPool,
+ const Oid& BuildId,
+ const std::filesystem::path& Path,
+ const ChunkedFolderContent& LocalContent,
+ const ChunkedContentLookup& LocalLookup,
+ const ChunkedFolderContent& RemoteContent,
+ const ChunkedContentLookup& RemoteLookup,
+ const std::vector<ChunkBlockDescription>& BlockDescriptions,
+ const std::vector<IoHash>& LooseChunkHashes,
+ const Options& Options)
+: m_Log(Log)
+, m_Progress(Progress)
+, m_Storage(Storage)
+, m_AbortFlag(AbortFlag)
+, m_PauseFlag(PauseFlag)
+, m_IOWorkerPool(IOWorkerPool)
+, m_NetworkPool(NetworkPool)
+, m_BuildId(BuildId)
+, m_Path(Path)
+, m_LocalContent(LocalContent)
+, m_LocalLookup(LocalLookup)
+, m_RemoteContent(RemoteContent)
+, m_RemoteLookup(RemoteLookup)
+, m_BlockDescriptions(BlockDescriptions)
+, m_LooseChunkHashes(LooseChunkHashes)
+, m_Options(Options)
+, m_CacheFolderPath(ZenTempCacheFolderPath(m_Options.ZenFolderPath))
+, m_TempDownloadFolderPath(ZenTempDownloadFolderPath(m_Options.ZenFolderPath))
+, m_TempBlockFolderPath(ZenTempBlockFolderPath(m_Options.ZenFolderPath))
+{
+}
+
+void
+BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
+{
+ ZEN_TRACE_CPU("BuildsOperationUpdateFolder::Execute");
+ try
+ {
+ enum class TaskSteps : uint32_t
+ {
+ ScanExistingData,
+ WriteChunks,
+ PrepareTarget,
+ FinalizeTarget,
+ Cleanup,
+ StepCount
+ };
+
+ auto EndProgress =
+ MakeGuard([&]() { m_Progress.SetLogOperationProgress((uint32_t)TaskSteps::StepCount, (uint32_t)TaskSteps::StepCount); });
+
+ m_Progress.SetLogOperationProgress((uint32_t)TaskSteps::ScanExistingData, (uint32_t)TaskSteps::StepCount);
+
+ CreateDirectories(m_CacheFolderPath);
+ CreateDirectories(m_TempDownloadFolderPath);
+ CreateDirectories(m_TempBlockFolderPath);
+
+ std::vector<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters(m_RemoteContent.ChunkedContent.SequenceRawHashes.size());
+ std::vector<bool> RemoteChunkIndexNeedsCopyFromLocalFileFlags(m_RemoteContent.ChunkedContent.ChunkHashes.size());
+ std::vector<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags(m_RemoteContent.ChunkedContent.ChunkHashes.size());
+
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedChunkHashesFound;
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedSequenceHashesFound;
+ ScanCacheFolder(CachedChunkHashesFound, CachedSequenceHashesFound);
+
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedBlocksFound;
+ ScanTempBlocksFolder(CachedBlocksFound);
+
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> SequenceIndexesLeftToFindToRemoteIndex;
+ InitializeSequenceCounters(SequenceIndexChunksLeftToWriteCounters,
+ SequenceIndexesLeftToFindToRemoteIndex,
+ CachedChunkHashesFound,
+ CachedSequenceHashesFound);
+
+ std::vector<ChunkedFolderContent> ScavengedContents;
+ std::vector<ChunkedContentLookup> ScavengedLookups;
+ std::vector<std::filesystem::path> ScavengedPaths;
+
+ std::vector<ScavengedSequenceCopyOperation> ScavengedSequenceCopyOperations;
+ uint64_t ScavengedPathsCount = 0;
+
+ if (m_Options.EnableOtherDownloadsScavenging)
+ {
+ ZEN_TRACE_CPU("GetScavengedSequences");
+
+ Stopwatch ScavengeTimer;
+
+ if (!SequenceIndexesLeftToFindToRemoteIndex.empty())
+ {
+ std::vector<ScavengeSource> ScavengeSources = FindScavengeSources();
+ ScanScavengeSources(ScavengeSources, ScavengedContents, ScavengedLookups, ScavengedPaths);
+ if (m_AbortFlag)
+ {
+ return;
+ }
+
+ MatchScavengedSequencesToRemote(ScavengedContents,
+ ScavengedLookups,
+ ScavengedPaths,
+ SequenceIndexesLeftToFindToRemoteIndex,
+ SequenceIndexChunksLeftToWriteCounters,
+ ScavengedSequenceCopyOperations,
+ ScavengedPathsCount);
+ }
+ m_CacheMappingStats.ScavengeElapsedWallTimeUs += ScavengeTimer.GetElapsedTimeUs();
+ }
+
+ uint32_t RemainingChunkCount = 0;
+ for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < m_RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++)
+ {
+ uint64_t ChunkWriteCount = GetChunkWriteCount(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex);
+ if (ChunkWriteCount > 0)
+ {
+ RemainingChunkCount++;
+ }
+ }
+
+ // Pick up all chunks in current local state
+ tsl::robin_map<IoHash, size_t, IoHash::Hasher> RawHashToCopyChunkDataIndex;
+ std::vector<CopyChunkData> CopyChunkDatas;
+
+ if (m_Options.EnableTargetFolderScavenging)
+ {
+ ZEN_TRACE_CPU("GetLocalChunks");
+
+ Stopwatch LocalTimer;
+
+ ScavengeSourceForChunks(RemainingChunkCount,
+ RemoteChunkIndexNeedsCopyFromLocalFileFlags,
+ RawHashToCopyChunkDataIndex,
+ SequenceIndexChunksLeftToWriteCounters,
+ m_LocalContent,
+ m_LocalLookup,
+ CopyChunkDatas,
+ uint32_t(-1),
+ m_CacheMappingStats.LocalChunkMatchingRemoteCount,
+ m_CacheMappingStats.LocalChunkMatchingRemoteByteCount);
+
+ m_CacheMappingStats.LocalScanElapsedWallTimeUs += LocalTimer.GetElapsedTimeUs();
+ }
+
+ if (m_Options.EnableOtherDownloadsScavenging)
+ {
+ ZEN_TRACE_CPU("GetScavengeChunks");
+
+ Stopwatch ScavengeTimer;
+
+ for (uint32_t ScavengedContentIndex = 0; ScavengedContentIndex < ScavengedContents.size() && (RemainingChunkCount > 0);
+ ScavengedContentIndex++)
+ {
+ const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengedContentIndex];
+ const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengedContentIndex];
+
+ ScavengeSourceForChunks(RemainingChunkCount,
+ RemoteChunkIndexNeedsCopyFromLocalFileFlags,
+ RawHashToCopyChunkDataIndex,
+ SequenceIndexChunksLeftToWriteCounters,
+ ScavengedContent,
+ ScavengedLookup,
+ CopyChunkDatas,
+ ScavengedContentIndex,
+ m_CacheMappingStats.ScavengedChunkMatchingRemoteCount,
+ m_CacheMappingStats.ScavengedChunkMatchingRemoteByteCount);
+ }
+ m_CacheMappingStats.ScavengeElapsedWallTimeUs += ScavengeTimer.GetElapsedTimeUs();
+ }
+
+ if (!m_Options.IsQuiet)
+ {
+ if (m_CacheMappingStats.CacheSequenceHashesCount > 0 || m_CacheMappingStats.CacheChunkCount > 0 ||
+ m_CacheMappingStats.CacheBlockCount > 0)
+ {
+ ZEN_INFO("Download cache: Found {} ({}) chunk sequences, {} ({}) chunks, {} ({}) blocks in {}",
+ m_CacheMappingStats.CacheSequenceHashesCount,
+ NiceBytes(m_CacheMappingStats.CacheSequenceHashesByteCount),
+ m_CacheMappingStats.CacheChunkCount,
+ NiceBytes(m_CacheMappingStats.CacheChunkByteCount),
+ m_CacheMappingStats.CacheBlockCount,
+ NiceBytes(m_CacheMappingStats.CacheBlocksByteCount),
+ NiceTimeSpanMs(m_CacheMappingStats.CacheScanElapsedWallTimeUs / 1000));
+ }
+
+ if (m_CacheMappingStats.LocalPathsMatchingSequencesCount > 0 || m_CacheMappingStats.LocalChunkMatchingRemoteCount > 0)
+ {
+ ZEN_INFO("Local state : Found {} ({}) chunk sequences, {} ({}) chunks in {}",
+ m_CacheMappingStats.LocalPathsMatchingSequencesCount,
+ NiceBytes(m_CacheMappingStats.LocalPathsMatchingSequencesByteCount),
+ m_CacheMappingStats.LocalChunkMatchingRemoteCount,
+ NiceBytes(m_CacheMappingStats.LocalChunkMatchingRemoteByteCount),
+ NiceTimeSpanMs(m_CacheMappingStats.LocalScanElapsedWallTimeUs / 1000));
+ }
+ if (m_CacheMappingStats.ScavengedPathsMatchingSequencesCount > 0 || m_CacheMappingStats.ScavengedChunkMatchingRemoteCount > 0)
+ {
+ ZEN_INFO("Scavenge of {} paths, found {} ({}) chunk sequences, {} ({}) chunks in {}",
+ ScavengedPathsCount,
+ m_CacheMappingStats.ScavengedPathsMatchingSequencesCount,
+ NiceBytes(m_CacheMappingStats.ScavengedPathsMatchingSequencesByteCount),
+ m_CacheMappingStats.ScavengedChunkMatchingRemoteCount,
+ NiceBytes(m_CacheMappingStats.ScavengedChunkMatchingRemoteByteCount),
+ NiceTimeSpanMs(m_CacheMappingStats.ScavengeElapsedWallTimeUs / 1000));
+ }
+ }
+
+ uint64_t BytesToWrite = CalculateBytesToWriteAndFlagNeededChunks(SequenceIndexChunksLeftToWriteCounters,
+ RemoteChunkIndexNeedsCopyFromLocalFileFlags,
+ RemoteChunkIndexNeedsCopyFromSourceFlags);
+
+ for (const ScavengedSequenceCopyOperation& ScavengeCopyOp : ScavengedSequenceCopyOperations)
+ {
+ BytesToWrite += ScavengeCopyOp.RawSize;
+ }
+
+ uint64_t BytesToValidate = m_Options.ValidateCompletedSequences ? BytesToWrite : 0;
+
+ uint64_t TotalRequestCount = 0;
+ uint64_t TotalPartWriteCount = 0;
+ std::atomic<uint64_t> WritePartsComplete = 0;
+
+ tsl::robin_map<std::string, uint32_t> RemotePathToRemoteIndex;
+ RemotePathToRemoteIndex.reserve(m_RemoteContent.Paths.size());
+ for (uint32_t RemotePathIndex = 0; RemotePathIndex < m_RemoteContent.Paths.size(); RemotePathIndex++)
+ {
+ RemotePathToRemoteIndex.insert({m_RemoteContent.Paths[RemotePathIndex].generic_string(), RemotePathIndex});
+ }
+
+ CheckRequiredDiskSpace(RemotePathToRemoteIndex);
+
+ BlobsExistsResult ExistsResult;
+ {
+ ChunkBlockAnalyser BlockAnalyser(
+ Log(),
+ m_BlockDescriptions,
+ ChunkBlockAnalyser::Options{.IsQuiet = m_Options.IsQuiet,
+ .IsVerbose = m_Options.IsVerbose,
+ .HostLatencySec = m_Storage.BuildStorageHost.LatencySec,
+ .HostHighSpeedLatencySec = m_Storage.CacheHost.LatencySec,
+ .HostMaxRangeCountPerRequest = m_Storage.BuildStorageHost.Caps.MaxRangeCountPerRequest,
+ .HostHighSpeedMaxRangeCountPerRequest = m_Storage.CacheHost.Caps.MaxRangeCountPerRequest});
+
+ std::vector<ChunkBlockAnalyser::NeededBlock> NeededBlocks = BlockAnalyser.GetNeeded(
+ m_RemoteLookup.ChunkHashToChunkIndex,
+ [&](uint32_t RemoteChunkIndex) -> bool { return RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex]; });
+
+ std::vector<uint32_t> FetchBlockIndexes;
+ std::vector<uint32_t> CachedChunkBlockIndexes;
+ ClassifyCachedAndFetchBlocks(NeededBlocks, CachedBlocksFound, TotalPartWriteCount, CachedChunkBlockIndexes, FetchBlockIndexes);
+
+ std::vector<uint32_t> NeededLooseChunkIndexes = DetermineNeededLooseChunkIndexes(SequenceIndexChunksLeftToWriteCounters,
+ RemoteChunkIndexNeedsCopyFromLocalFileFlags,
+ RemoteChunkIndexNeedsCopyFromSourceFlags);
+
+ ExistsResult = QueryBlobCacheExists(NeededLooseChunkIndexes, FetchBlockIndexes);
+
+ std::vector<ChunkBlockAnalyser::EPartialBlockDownloadMode> BlockPartialDownloadModes =
+ DeterminePartialDownloadModes(ExistsResult);
+ ZEN_ASSERT(BlockPartialDownloadModes.size() == m_BlockDescriptions.size());
+
+ ChunkBlockAnalyser::BlockResult PartialBlocks =
+ BlockAnalyser.CalculatePartialBlockDownloads(NeededBlocks, BlockPartialDownloadModes);
+
+ TotalRequestCount += NeededLooseChunkIndexes.size();
+ TotalPartWriteCount += NeededLooseChunkIndexes.size();
+ TotalRequestCount += PartialBlocks.BlockRanges.size();
+ TotalPartWriteCount += PartialBlocks.BlockRanges.size();
+ TotalRequestCount += PartialBlocks.FullBlockIndexes.size();
+ TotalPartWriteCount += PartialBlocks.FullBlockIndexes.size();
+
+ std::vector<LooseChunkHashWorkData> LooseChunkHashWorks =
+ BuildLooseChunkHashWorks(NeededLooseChunkIndexes, SequenceIndexChunksLeftToWriteCounters);
+
+ ZEN_TRACE_CPU("WriteChunks");
+
+ m_Progress.SetLogOperationProgress((uint32_t)TaskSteps::WriteChunks, (uint32_t)TaskSteps::StepCount);
+
+ Stopwatch WriteTimer;
+
+ FilteredRate FilteredDownloadedBytesPerSecond;
+ FilteredRate FilteredWrittenBytesPerSecond;
+
+ std::unique_ptr<ProgressBase::ProgressBar> ProgressBar = m_Progress.CreateProgressBar("Writing");
+ ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ TotalPartWriteCount += CopyChunkDatas.size();
+ TotalPartWriteCount += ScavengedSequenceCopyOperations.size();
+
+ BufferedWriteFileCache WriteCache;
+
+ WriteChunksContext Context{.Work = Work,
+ .WriteCache = WriteCache,
+ .SequenceIndexChunksLeftToWriteCounters = SequenceIndexChunksLeftToWriteCounters,
+ .RemoteChunkIndexNeedsCopyFromSourceFlags = RemoteChunkIndexNeedsCopyFromSourceFlags,
+ .WritePartsComplete = WritePartsComplete,
+ .TotalPartWriteCount = TotalPartWriteCount,
+ .TotalRequestCount = TotalRequestCount,
+ .ExistsResult = ExistsResult,
+ .FilteredDownloadedBytesPerSecond = FilteredDownloadedBytesPerSecond,
+ .FilteredWrittenBytesPerSecond = FilteredWrittenBytesPerSecond};
+
+ ScheduleScavengedSequenceWrites(Context, ScavengedSequenceCopyOperations, ScavengedContents, ScavengedPaths);
+ ScheduleLooseChunkWrites(Context, LooseChunkHashWorks);
+
+ std::unique_ptr<CloneQueryInterface> CloneQuery =
+ m_Options.AllowFileClone ? GetCloneQueryInterface(m_CacheFolderPath) : nullptr;
+
+ ScheduleLocalChunkCopies(Context, CopyChunkDatas, CloneQuery.get(), ScavengedContents, ScavengedLookups, ScavengedPaths);
+ ScheduleCachedBlockWrites(Context, CachedChunkBlockIndexes);
+ SchedulePartialBlockDownloads(Context, PartialBlocks);
+ ScheduleFullBlockDownloads(Context, PartialBlocks.FullBlockIndexes);
+
+ {
+ ZEN_TRACE_CPU("WriteChunks_Wait");
+
+ Work.Wait(m_Progress.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
+ uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() +
+ m_DownloadStats.DownloadedBlockByteCount.load() +
+ +m_DownloadStats.DownloadedPartialBlockByteCount.load();
+ FilteredWrittenBytesPerSecond.Update(m_DiskStats.WriteByteCount.load());
+ FilteredDownloadedBytesPerSecond.Update(DownloadedBytes);
+ std::string DownloadRateString =
+ (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ ? ""
+ : fmt::format(" {}bits/s", NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8));
+ std::string CloneDetails;
+ if (m_DiskStats.CloneCount.load() > 0)
+ {
+ CloneDetails = fmt::format(" ({} cloned)", NiceBytes(m_DiskStats.CloneByteCount.load()));
+ }
+ std::string WriteDetails = fmt::format(" {}/{} ({}B/s) written{}",
+ NiceBytes(m_WrittenChunkByteCount.load()),
+ NiceBytes(BytesToWrite),
+ NiceNum(FilteredWrittenBytesPerSecond.GetCurrent()),
+ CloneDetails);
+
+ std::string Details = fmt::format("{}/{} ({}{}) downloaded.{}",
+ m_DownloadStats.RequestsCompleteCount.load(),
+ TotalRequestCount,
+ NiceBytes(DownloadedBytes),
+ DownloadRateString,
+ WriteDetails);
+
+ std::string Task;
+ if ((m_WrittenChunkByteCount < BytesToWrite) || (BytesToValidate == 0))
+ {
+ Task = "Writing chunks ";
+ }
+ else
+ {
+ Task = "Verifying chunks ";
+ }
+
+ ProgressBar->UpdateState({.Task = Task,
+ .Details = Details,
+ .TotalCount = (BytesToWrite + BytesToValidate),
+ .RemainingCount = ((BytesToWrite + BytesToValidate) -
+ (m_WrittenChunkByteCount.load() + m_ValidatedChunkByteCount.load())),
+ .Status = ProgressBase::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+ false);
+ });
+ }
+
+ CloneQuery.reset();
+
+ FilteredWrittenBytesPerSecond.Stop();
+ FilteredDownloadedBytesPerSecond.Stop();
+
+ ProgressBar->Finish();
+ if (m_AbortFlag)
+ {
+ return;
+ }
+
+ VerifyWriteChunksComplete(SequenceIndexChunksLeftToWriteCounters, BytesToWrite, BytesToValidate);
+
+ const uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() +
+ m_DownloadStats.DownloadedBlockByteCount.load() +
+ m_DownloadStats.DownloadedPartialBlockByteCount.load();
+ if (!m_Options.IsQuiet)
+ {
+ std::string CloneDetails;
+ if (m_DiskStats.CloneCount.load() > 0)
+ {
+ CloneDetails = fmt::format(" ({} cloned)", NiceBytes(m_DiskStats.CloneByteCount.load()));
+ }
+ ZEN_INFO("Downloaded {} ({}bits/s) in {}. Wrote {} ({}B/s){} in {}. Completed in {}",
+ NiceBytes(DownloadedBytes),
+ NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), DownloadedBytes * 8)),
+ NiceTimeSpanMs(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS() / 1000),
+ NiceBytes(m_WrittenChunkByteCount.load()),
+ NiceNum(GetBytesPerSecond(FilteredWrittenBytesPerSecond.GetElapsedTimeUS(), m_DiskStats.WriteByteCount.load())),
+ CloneDetails,
+ NiceTimeSpanMs(FilteredWrittenBytesPerSecond.GetElapsedTimeUS() / 1000),
+ NiceTimeSpanMs(WriteTimer.GetElapsedTimeMs()));
+ }
+
+ m_WriteChunkStats.WriteChunksElapsedWallTimeUs = WriteTimer.GetElapsedTimeUs();
+ m_WriteChunkStats.DownloadTimeUs = FilteredDownloadedBytesPerSecond.GetElapsedTimeUS();
+ m_WriteChunkStats.WriteTimeUs = FilteredWrittenBytesPerSecond.GetElapsedTimeUS();
+ }
+
+ m_Progress.SetLogOperationProgress((uint32_t)TaskSteps::PrepareTarget, (uint32_t)TaskSteps::StepCount);
+
+ if (m_AbortFlag)
+ {
+ return;
+ }
+
+ LocalPathCategorization Categorization = CategorizeLocalPaths(RemotePathToRemoteIndex);
+
+ if (m_AbortFlag)
+ {
+ return;
+ }
+
+ std::atomic<uint64_t> CachedCount = 0;
+ std::atomic<uint64_t> CachedByteCount = 0;
+ ScheduleLocalFileCaching(Categorization.FilesToCache, CachedCount, CachedByteCount);
+ if (m_AbortFlag)
+ {
+ return;
+ }
+
+ ZEN_DEBUG(
+ "Local state prep: Match: {}, PathMismatch: {}, HashMismatch: {}, Cached: {} ({}), Skipped: {}, "
+ "Delete: {}",
+ Categorization.MatchCount,
+ Categorization.PathMismatchCount,
+ Categorization.HashMismatchCount,
+ CachedCount.load(),
+ NiceBytes(CachedByteCount.load()),
+ Categorization.SkippedCount,
+ Categorization.DeleteCount);
+
+ m_Progress.SetLogOperationProgress((uint32_t)TaskSteps::FinalizeTarget, (uint32_t)TaskSteps::StepCount);
+
+ if (m_Options.WipeTargetFolder)
+ {
+ ZEN_TRACE_CPU("WipeTarget");
+ Stopwatch Timer;
+
+ // Clean target folder
+ if (!CleanDirectory(Log(),
+ m_Progress,
+ m_IOWorkerPool,
+ m_AbortFlag,
+ m_PauseFlag,
+ m_Options.IsQuiet,
+ m_Path,
+ m_Options.ExcludeFolders))
+ {
+ ZEN_WARN("Some files in {} could not be removed", m_Path);
+ }
+ m_RebuildFolderStateStats.CleanFolderElapsedWallTimeUs = Timer.GetElapsedTimeUs();
+ }
+
+ if (m_AbortFlag)
+ {
+ return;
+ }
+
+ {
+ ZEN_TRACE_CPU("FinalizeTree");
+
+ Stopwatch Timer;
+
+ std::unique_ptr<ProgressBase::ProgressBar> ProgressBar = m_Progress.CreateProgressBar("Rebuild State");
+ ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ OutLocalFolderState.Paths.resize(m_RemoteContent.Paths.size());
+ OutLocalFolderState.RawSizes.resize(m_RemoteContent.Paths.size());
+ OutLocalFolderState.Attributes.resize(m_RemoteContent.Paths.size());
+ OutLocalFolderState.ModificationTicks.resize(m_RemoteContent.Paths.size());
+
+ std::atomic<uint64_t> DeletedCount = 0;
+ std::atomic<uint64_t> TargetsComplete = 0;
+
+ ScheduleLocalFileRemovals(Work, Categorization.RemoveLocalPathIndexes, DeletedCount);
+
+ std::vector<FinalizeTarget> Targets = BuildSortedFinalizeTargets();
+
+ ScheduleTargetFinalization(Work,
+ Targets,
+ Categorization.SequenceHashToLocalPathIndex,
+ Categorization.RemotePathIndexToLocalPathIndex,
+ OutLocalFolderState,
+ TargetsComplete);
+
+ {
+ ZEN_TRACE_CPU("FinalizeTree_Wait");
+
+ Work.Wait(m_Progress.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
+ const uint64_t WorkTotal = Targets.size() + Categorization.RemoveLocalPathIndexes.size();
+ const uint64_t WorkComplete = TargetsComplete.load() + DeletedCount.load();
+ std::string Details = fmt::format("{}/{} files", WorkComplete, WorkTotal);
+ ProgressBar->UpdateState({.Task = "Rebuilding state ",
+ .Details = Details,
+ .TotalCount = gsl::narrow<uint64_t>(WorkTotal),
+ .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete),
+ .Status = ProgressBase::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+ false);
+ });
+ }
+
+ m_RebuildFolderStateStats.FinalizeTreeElapsedWallTimeUs = Timer.GetElapsedTimeUs();
+ ProgressBar->Finish();
+ }
+ m_Progress.SetLogOperationProgress((uint32_t)TaskSteps::Cleanup, (uint32_t)TaskSteps::StepCount);
+ }
+ catch (const std::exception&)
+ {
+ m_AbortFlag = true;
+ throw;
+ }
+}
+
+void
+BuildsOperationUpdateFolder::ScanCacheFolder(tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& OutCachedChunkHashesFound,
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& OutCachedSequenceHashesFound)
+{
+ ZEN_TRACE_CPU("ScanCacheFolder");
+
+ Stopwatch CacheTimer;
+
+ DirectoryContent CacheDirContent;
+ GetDirectoryContent(m_CacheFolderPath, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeFileSizes, CacheDirContent);
+ for (size_t Index = 0; Index < CacheDirContent.Files.size(); Index++)
+ {
+ if (m_Options.EnableTargetFolderScavenging)
+ {
+ IoHash FileHash;
+ if (IoHash::TryParse(CacheDirContent.Files[Index].filename().string(), FileHash))
+ {
+ if (auto ChunkIt = m_RemoteLookup.ChunkHashToChunkIndex.find(FileHash);
+ ChunkIt != m_RemoteLookup.ChunkHashToChunkIndex.end())
+ {
+ const uint32_t ChunkIndex = ChunkIt->second;
+ const uint64_t ChunkSize = m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex];
+ if (ChunkSize == CacheDirContent.FileSizes[Index])
+ {
+ OutCachedChunkHashesFound.insert({FileHash, ChunkIndex});
+ m_CacheMappingStats.CacheChunkCount++;
+ m_CacheMappingStats.CacheChunkByteCount += ChunkSize;
+ continue;
+ }
+ }
+ else if (auto SequenceIt = m_RemoteLookup.RawHashToSequenceIndex.find(FileHash);
+ SequenceIt != m_RemoteLookup.RawHashToSequenceIndex.end())
+ {
+ const uint32_t SequenceIndex = SequenceIt->second;
+ const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex];
+ const uint64_t SequenceSize = m_RemoteContent.RawSizes[PathIndex];
+ if (SequenceSize == CacheDirContent.FileSizes[Index])
+ {
+ OutCachedSequenceHashesFound.insert({FileHash, SequenceIndex});
+ m_CacheMappingStats.CacheSequenceHashesCount++;
+ m_CacheMappingStats.CacheSequenceHashesByteCount += SequenceSize;
+
+ const std::filesystem::path CacheFilePath =
+ GetFinalChunkedSequenceFileName(m_CacheFolderPath,
+ m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
+ ZEN_ASSERT_SLOW(IsFile(CacheFilePath));
+
+ continue;
+ }
+ }
+ }
+ }
+ std::error_code Ec = TryRemoveFile(CacheDirContent.Files[Index]);
+ if (Ec)
+ {
+ ZEN_DEBUG("Failed removing file '{}', reason: ({}) {}", CacheDirContent.Files[Index], Ec.value(), Ec.message());
+ }
+ }
+ m_CacheMappingStats.CacheScanElapsedWallTimeUs += CacheTimer.GetElapsedTimeUs();
+}
+
+void
+BuildsOperationUpdateFolder::ScanTempBlocksFolder(tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& OutCachedBlocksFound)
+{
+ ZEN_TRACE_CPU("ScanTempBlocksFolder");
+
+ Stopwatch CacheTimer;
+
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> AllBlockSizes;
+ AllBlockSizes.reserve(m_BlockDescriptions.size());
+ for (uint32_t BlockIndex = 0; BlockIndex < m_BlockDescriptions.size(); BlockIndex++)
+ {
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
+ AllBlockSizes.insert({BlockDescription.BlockHash, BlockIndex});
+ }
+
+ DirectoryContent BlockDirContent;
+ GetDirectoryContent(m_TempBlockFolderPath,
+ DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeFileSizes,
+ BlockDirContent);
+ OutCachedBlocksFound.reserve(BlockDirContent.Files.size());
+ for (size_t Index = 0; Index < BlockDirContent.Files.size(); Index++)
+ {
+ if (m_Options.EnableTargetFolderScavenging)
+ {
+ IoHash FileHash;
+ if (IoHash::TryParse(BlockDirContent.Files[Index].filename().string(), FileHash))
+ {
+ if (auto BlockIt = AllBlockSizes.find(FileHash); BlockIt != AllBlockSizes.end())
+ {
+ const uint32_t BlockIndex = BlockIt->second;
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
+ uint64_t BlockSize = CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize;
+ for (uint64_t ChunkSize : BlockDescription.ChunkCompressedLengths)
+ {
+ BlockSize += ChunkSize;
+ }
+
+ if (BlockSize == BlockDirContent.FileSizes[Index])
+ {
+ OutCachedBlocksFound.insert({FileHash, BlockIndex});
+ m_CacheMappingStats.CacheBlockCount++;
+ m_CacheMappingStats.CacheBlocksByteCount += BlockSize;
+ continue;
+ }
+ }
+ }
+ }
+ std::error_code Ec = TryRemoveFile(BlockDirContent.Files[Index]);
+ if (Ec)
+ {
+ ZEN_DEBUG("Failed removing file '{}', reason: ({}) {}", BlockDirContent.Files[Index], Ec.value(), Ec.message());
+ }
+ }
+
+ m_CacheMappingStats.CacheScanElapsedWallTimeUs += CacheTimer.GetElapsedTimeUs();
+}
+
+void
+BuildsOperationUpdateFolder::InitializeSequenceCounters(std::vector<std::atomic<uint32_t>>& OutSequenceCounters,
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& OutSequencesLeftToFind,
+ const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& CachedChunkHashesFound,
+ const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& CachedSequenceHashesFound)
+{
+ if (m_Options.EnableTargetFolderScavenging)
+ {
+ // Pick up all whole files we can use from current local state
+ ZEN_TRACE_CPU("GetLocalSequences");
+
+ std::vector<uint32_t> MissingSequenceIndexes = ScanTargetFolder(CachedChunkHashesFound, CachedSequenceHashesFound);
+
+ for (uint32_t RemoteSequenceIndex : MissingSequenceIndexes)
+ {
+ // We must write the sequence
+ const uint32_t ChunkCount = m_RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex];
+ const IoHash& RemoteSequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ OutSequenceCounters[RemoteSequenceIndex] = ChunkCount;
+ OutSequencesLeftToFind.insert({RemoteSequenceRawHash, RemoteSequenceIndex});
+ }
+ }
+ else
+ {
+ for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < m_RemoteContent.ChunkedContent.SequenceRawHashes.size();
+ RemoteSequenceIndex++)
+ {
+ OutSequenceCounters[RemoteSequenceIndex] = m_RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex];
+ }
+ }
+}
+
+void
+BuildsOperationUpdateFolder::MatchScavengedSequencesToRemote(std::span<const ChunkedFolderContent> Contents,
+ std::span<const ChunkedContentLookup> Lookups,
+ std::span<const std::filesystem::path> Paths,
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& InOutSequencesLeftToFind,
+ std::vector<std::atomic<uint32_t>>& InOutSequenceCounters,
+ std::vector<ScavengedSequenceCopyOperation>& OutCopyOperations,
+ uint64_t& OutScavengedPathsCount)
+{
+ for (uint32_t ScavengedContentIndex = 0; ScavengedContentIndex < Contents.size() && !InOutSequencesLeftToFind.empty();
+ ScavengedContentIndex++)
+ {
+ const std::filesystem::path& ScavengePath = Paths[ScavengedContentIndex];
+ if (ScavengePath.empty())
+ {
+ continue;
+ }
+ const ChunkedFolderContent& ScavengedLocalContent = Contents[ScavengedContentIndex];
+ const ChunkedContentLookup& ScavengedLookup = Lookups[ScavengedContentIndex];
+
+ for (uint32_t ScavengedSequenceIndex = 0; ScavengedSequenceIndex < ScavengedLocalContent.ChunkedContent.SequenceRawHashes.size();
+ ScavengedSequenceIndex++)
+ {
+ const IoHash& SequenceRawHash = ScavengedLocalContent.ChunkedContent.SequenceRawHashes[ScavengedSequenceIndex];
+ auto It = InOutSequencesLeftToFind.find(SequenceRawHash);
+ if (It == InOutSequencesLeftToFind.end())
+ {
+ continue;
+ }
+ const uint32_t RemoteSequenceIndex = It->second;
+ const uint64_t RawSize = m_RemoteContent.RawSizes[m_RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]];
+ ZEN_ASSERT(RawSize > 0);
+
+ const uint32_t ScavengedPathIndex = ScavengedLookup.SequenceIndexFirstPathIndex[ScavengedSequenceIndex];
+ ZEN_ASSERT_SLOW(IsFile((ScavengePath / ScavengedLocalContent.Paths[ScavengedPathIndex]).make_preferred()));
+
+ OutCopyOperations.push_back({.ScavengedContentIndex = ScavengedContentIndex,
+ .ScavengedPathIndex = ScavengedPathIndex,
+ .RemoteSequenceIndex = RemoteSequenceIndex,
+ .RawSize = RawSize});
+
+ InOutSequencesLeftToFind.erase(SequenceRawHash);
+ InOutSequenceCounters[RemoteSequenceIndex] = 0;
+
+ m_CacheMappingStats.ScavengedPathsMatchingSequencesCount++;
+ m_CacheMappingStats.ScavengedPathsMatchingSequencesByteCount += RawSize;
+ }
+ OutScavengedPathsCount++;
+ }
+}
+
+uint64_t
+BuildsOperationUpdateFolder::CalculateBytesToWriteAndFlagNeededChunks(std::span<const std::atomic<uint32_t>> SequenceCounters,
+ const std::vector<bool>& NeedsCopyFromLocalFileFlags,
+ std::span<std::atomic<bool>> OutNeedsCopyFromSourceFlags)
+{
+ uint64_t BytesToWrite = 0;
+ for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < m_RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++)
+ {
+ const uint64_t ChunkWriteCount = GetChunkWriteCount(SequenceCounters, RemoteChunkIndex);
+ if (ChunkWriteCount > 0)
+ {
+ BytesToWrite += m_RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] * ChunkWriteCount;
+ if (!NeedsCopyFromLocalFileFlags[RemoteChunkIndex])
+ {
+ OutNeedsCopyFromSourceFlags[RemoteChunkIndex] = true;
+ }
+ }
+ }
+ return BytesToWrite;
+}
+
+void
+BuildsOperationUpdateFolder::ClassifyCachedAndFetchBlocks(std::span<const ChunkBlockAnalyser::NeededBlock> NeededBlocks,
+ const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& CachedBlocksFound,
+ uint64_t& TotalPartWriteCount,
+ std::vector<uint32_t>& OutCachedChunkBlockIndexes,
+ std::vector<uint32_t>& OutFetchBlockIndexes)
+{
+ ZEN_TRACE_CPU("BlockCacheFileExists");
+ for (const ChunkBlockAnalyser::NeededBlock& NeededBlock : NeededBlocks)
+ {
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[NeededBlock.BlockIndex];
+ bool UsingCachedBlock = false;
+ if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end())
+ {
+ TotalPartWriteCount++;
+
+ std::filesystem::path BlockPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString();
+ if (IsFile(BlockPath))
+ {
+ OutCachedChunkBlockIndexes.push_back(NeededBlock.BlockIndex);
+ UsingCachedBlock = true;
+ }
+ }
+ if (!UsingCachedBlock)
+ {
+ OutFetchBlockIndexes.push_back(NeededBlock.BlockIndex);
+ }
+ }
+}
+
+std::vector<uint32_t>
+BuildsOperationUpdateFolder::DetermineNeededLooseChunkIndexes(std::span<const std::atomic<uint32_t>> SequenceCounters,
+ const std::vector<bool>& NeedsCopyFromLocalFileFlags,
+ std::span<std::atomic<bool>> NeedsCopyFromSourceFlags)
+{
+ std::vector<uint32_t> NeededLooseChunkIndexes;
+ NeededLooseChunkIndexes.reserve(m_LooseChunkHashes.size());
+ for (uint32_t LooseChunkIndex = 0; LooseChunkIndex < m_LooseChunkHashes.size(); LooseChunkIndex++)
+ {
+ const IoHash& ChunkHash = m_LooseChunkHashes[LooseChunkIndex];
+ auto RemoteChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash);
+ ZEN_ASSERT(RemoteChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end());
+ const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second;
+
+ if (NeedsCopyFromLocalFileFlags[RemoteChunkIndex])
+ {
+ if (m_Options.IsVerbose)
+ {
+ ZEN_INFO("Skipping chunk {} due to cache reuse", m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]);
+ }
+ continue;
+ }
+
+ bool NeedsCopy = true;
+ if (NeedsCopyFromSourceFlags[RemoteChunkIndex].compare_exchange_strong(NeedsCopy, false))
+ {
+ const uint64_t WriteCount = GetChunkWriteCount(SequenceCounters, RemoteChunkIndex);
+ if (WriteCount == 0)
+ {
+ if (m_Options.IsVerbose)
+ {
+ ZEN_INFO("Skipping chunk {} due to cache reuse", m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]);
+ }
+ }
+ else
+ {
+ NeededLooseChunkIndexes.push_back(LooseChunkIndex);
+ }
+ }
+ }
+ return NeededLooseChunkIndexes;
+}
+
+BuildsOperationUpdateFolder::BlobsExistsResult
+BuildsOperationUpdateFolder::QueryBlobCacheExists(std::span<const uint32_t> NeededLooseChunkIndexes,
+ std::span<const uint32_t> FetchBlockIndexes)
+{
+ BlobsExistsResult Result;
+ if (!m_Storage.CacheStorage)
+ {
+ return Result;
+ }
+
+ ZEN_TRACE_CPU("BlobCacheExistCheck");
+ Stopwatch Timer;
+
+ std::vector<IoHash> BlobHashes;
+ BlobHashes.reserve(NeededLooseChunkIndexes.size() + FetchBlockIndexes.size());
+
+ for (const uint32_t LooseChunkIndex : NeededLooseChunkIndexes)
+ {
+ BlobHashes.push_back(m_LooseChunkHashes[LooseChunkIndex]);
+ }
+
+ for (uint32_t BlockIndex : FetchBlockIndexes)
+ {
+ BlobHashes.push_back(m_BlockDescriptions[BlockIndex].BlockHash);
+ }
+
+ const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult = m_Storage.CacheStorage->BlobsExists(m_BuildId, BlobHashes);
+
+ if (CacheExistsResult.size() == BlobHashes.size())
+ {
+ Result.ExistingBlobs.reserve(CacheExistsResult.size());
+ for (size_t BlobIndex = 0; BlobIndex < BlobHashes.size(); BlobIndex++)
+ {
+ if (CacheExistsResult[BlobIndex].HasBody)
+ {
+ Result.ExistingBlobs.insert(BlobHashes[BlobIndex]);
+ }
+ }
+ }
+ Result.ElapsedTimeMs = Timer.GetElapsedTimeMs();
+ if (!Result.ExistingBlobs.empty() && !m_Options.IsQuiet)
+ {
+ ZEN_INFO("Remote cache : Found {} out of {} needed blobs in {}",
+ Result.ExistingBlobs.size(),
+ BlobHashes.size(),
+ NiceTimeSpanMs(Result.ElapsedTimeMs));
+ }
+ return Result;
+}
+
+std::vector<ChunkBlockAnalyser::EPartialBlockDownloadMode>
+BuildsOperationUpdateFolder::DeterminePartialDownloadModes(const BlobsExistsResult& ExistsResult)
+{
+ std::vector<ChunkBlockAnalyser::EPartialBlockDownloadMode> Modes;
+
+ if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Off)
+ {
+ Modes.resize(m_BlockDescriptions.size(), ChunkBlockAnalyser::EPartialBlockDownloadMode::Off);
+ return Modes;
+ }
+
+ const bool MultiRangeCache = m_Storage.CacheHost.Caps.MaxRangeCountPerRequest > 1;
+ const bool MultiRangeBuild = m_Storage.BuildStorageHost.Caps.MaxRangeCountPerRequest > 1;
+ ChunkBlockAnalyser::EPartialBlockDownloadMode CachePartialDownloadMode =
+ MultiRangeCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed
+ : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange;
+ ChunkBlockAnalyser::EPartialBlockDownloadMode CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::Off;
+
+ switch (m_Options.PartialBlockRequestMode)
+ {
+ case EPartialBlockRequestMode::Off:
+ break;
+ case EPartialBlockRequestMode::ZenCacheOnly:
+ CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::Off;
+ break;
+ case EPartialBlockRequestMode::Mixed:
+ CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange;
+ break;
+ case EPartialBlockRequestMode::All:
+ CloudPartialDownloadMode = MultiRangeBuild ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange
+ : ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange;
+ break;
+ default:
+ ZEN_ASSERT(false);
+ break;
+ }
+
+ Modes.reserve(m_BlockDescriptions.size());
+ for (uint32_t BlockIndex = 0; BlockIndex < m_BlockDescriptions.size(); BlockIndex++)
+ {
+ const bool BlockExistInCache = ExistsResult.ExistingBlobs.contains(m_BlockDescriptions[BlockIndex].BlockHash);
+ Modes.push_back(BlockExistInCache ? CachePartialDownloadMode : CloudPartialDownloadMode);
+ }
+ return Modes;
+}
+
+std::vector<BuildsOperationUpdateFolder::LooseChunkHashWorkData>
+BuildsOperationUpdateFolder::BuildLooseChunkHashWorks(std::span<const uint32_t> NeededLooseChunkIndexes,
+ std::span<const std::atomic<uint32_t>> SequenceCounters)
+{
+ std::vector<LooseChunkHashWorkData> LooseChunkHashWorks;
+ LooseChunkHashWorks.reserve(NeededLooseChunkIndexes.size());
+ for (uint32_t LooseChunkIndex : NeededLooseChunkIndexes)
+ {
+ const IoHash& ChunkHash = m_LooseChunkHashes[LooseChunkIndex];
+ auto RemoteChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash);
+ ZEN_ASSERT(RemoteChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end());
+ const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second;
+
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
+ GetRemainingChunkTargets(SequenceCounters, RemoteChunkIndex);
+
+ ZEN_ASSERT(!ChunkTargetPtrs.empty());
+ LooseChunkHashWorks.push_back(LooseChunkHashWorkData{.ChunkTargetPtrs = ChunkTargetPtrs, .RemoteChunkIndex = RemoteChunkIndex});
+ }
+ return LooseChunkHashWorks;
+}
+
+void
+BuildsOperationUpdateFolder::VerifyWriteChunksComplete(std::span<const std::atomic<uint32_t>> SequenceCounters,
+ uint64_t BytesToWrite,
+ uint64_t BytesToValidate)
+{
+ uint32_t RawSequencesMissingWriteCount = 0;
+ for (uint32_t SequenceIndex = 0; SequenceIndex < SequenceCounters.size(); SequenceIndex++)
+ {
+ const auto& Counter = SequenceCounters[SequenceIndex];
+ if (Counter.load() != 0)
+ {
+ RawSequencesMissingWriteCount++;
+ const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex];
+ const std::filesystem::path& IncompletePath = m_RemoteContent.Paths[PathIndex];
+ ZEN_ASSERT(!IncompletePath.empty());
+ const uint32_t ExpectedSequenceCount = m_RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex];
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_INFO("{}: Max count {}, Current count {}", IncompletePath, ExpectedSequenceCount, Counter.load());
+ }
+ ZEN_ASSERT(Counter.load() <= ExpectedSequenceCount);
+ }
+ }
+ ZEN_ASSERT(RawSequencesMissingWriteCount == 0);
+ ZEN_ASSERT(m_WrittenChunkByteCount == BytesToWrite);
+ ZEN_ASSERT(m_ValidatedChunkByteCount == BytesToValidate);
+}
+
+std::vector<BuildsOperationUpdateFolder::FinalizeTarget>
+BuildsOperationUpdateFolder::BuildSortedFinalizeTargets()
+{
+ std::vector<FinalizeTarget> Targets;
+ Targets.reserve(m_RemoteContent.Paths.size());
+ for (uint32_t RemotePathIndex = 0; RemotePathIndex < m_RemoteContent.Paths.size(); RemotePathIndex++)
+ {
+ Targets.push_back(FinalizeTarget{.RawHash = m_RemoteContent.RawHashes[RemotePathIndex], .RemotePathIndex = RemotePathIndex});
+ }
+ std::sort(Targets.begin(), Targets.end(), [](const FinalizeTarget& Lhs, const FinalizeTarget& Rhs) {
+ return std::tie(Lhs.RawHash, Lhs.RemotePathIndex) < std::tie(Rhs.RawHash, Rhs.RemotePathIndex);
+ });
+ return Targets;
+}
+
+void
+BuildsOperationUpdateFolder::ScanScavengeSources(std::span<const ScavengeSource> Sources,
+ std::vector<ChunkedFolderContent>& OutContents,
+ std::vector<ChunkedContentLookup>& OutLookups,
+ std::vector<std::filesystem::path>& OutPaths)
+{
+ ZEN_TRACE_CPU("ScanScavengeSources");
+
+ const size_t ScavengePathCount = Sources.size();
+ OutContents.resize(ScavengePathCount);
+ OutLookups.resize(ScavengePathCount);
+ OutPaths.resize(ScavengePathCount);
+
+ std::unique_ptr<ProgressBase::ProgressBar> ProgressBar = m_Progress.CreateProgressBar("Scavenging");
+
+ ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ std::atomic<uint64_t> PathsFound(0);
+ std::atomic<uint64_t> ChunksFound(0);
+ std::atomic<uint64_t> PathsScavenged(0);
+
+ for (size_t ScavengeIndex = 0; ScavengeIndex < ScavengePathCount; ScavengeIndex++)
+ {
+ Work.ScheduleWork(m_IOWorkerPool,
+ [this, &Sources, &OutContents, &OutPaths, &OutLookups, &PathsFound, &ChunksFound, &PathsScavenged, ScavengeIndex](
+ std::atomic<bool>&) {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_FindScavengeContent");
+
+ const ScavengeSource& Source = Sources[ScavengeIndex];
+ ChunkedFolderContent& ScavengedLocalContent = OutContents[ScavengeIndex];
+ ChunkedContentLookup& ScavengedLookup = OutLookups[ScavengeIndex];
+
+ if (FindScavengeContent(Source, ScavengedLocalContent, ScavengedLookup))
+ {
+ OutPaths[ScavengeIndex] = Source.Path;
+ PathsFound += ScavengedLocalContent.Paths.size();
+ ChunksFound += ScavengedLocalContent.ChunkedContent.ChunkHashes.size();
+ }
+ else
+ {
+ OutPaths[ScavengeIndex].clear();
+ }
+ PathsScavenged++;
+ }
+ });
+ }
+ {
+ ZEN_TRACE_CPU("ScavengeScan_Wait");
+
+ Work.Wait(m_Progress.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
+ std::string Details = fmt::format("{}/{} scanned. {} paths and {} chunks found for scavenging",
+ PathsScavenged.load(),
+ ScavengePathCount,
+ PathsFound.load(),
+ ChunksFound.load());
+ ProgressBar->UpdateState({.Task = "Scavenging ",
+ .Details = Details,
+ .TotalCount = ScavengePathCount,
+ .RemainingCount = ScavengePathCount - PathsScavenged.load(),
+ .Status = ProgressBase::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+ false);
+ });
+ }
+
+ ProgressBar->Finish();
+}
+
+BuildsOperationUpdateFolder::LocalPathCategorization
+BuildsOperationUpdateFolder::CategorizeLocalPaths(const tsl::robin_map<std::string, uint32_t>& RemotePathToRemoteIndex)
+{
+ ZEN_TRACE_CPU("PrepareTarget");
+
+ LocalPathCategorization Result;
+ tsl::robin_set<IoHash, IoHash::Hasher> CachedRemoteSequences;
+
+ Result.RemotePathIndexToLocalPathIndex.reserve(m_RemoteContent.Paths.size());
+
+ for (uint32_t LocalPathIndex = 0; LocalPathIndex < m_LocalContent.Paths.size(); LocalPathIndex++)
+ {
+ if (m_AbortFlag)
+ {
+ break;
+ }
+ const IoHash& RawHash = m_LocalContent.RawHashes[LocalPathIndex];
+ const std::filesystem::path& LocalPath = m_LocalContent.Paths[LocalPathIndex];
+
+ ZEN_ASSERT_SLOW(IsFile((m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred()));
+
+ if (m_Options.EnableTargetFolderScavenging)
+ {
+ if (!m_Options.WipeTargetFolder)
+ {
+ // Check if it is already in the correct place
+ if (auto RemotePathIt = RemotePathToRemoteIndex.find(LocalPath.generic_string());
+ RemotePathIt != RemotePathToRemoteIndex.end())
+ {
+ const uint32_t RemotePathIndex = RemotePathIt->second;
+ if (m_RemoteContent.RawHashes[RemotePathIndex] == RawHash)
+ {
+ // It is already in it's correct place
+ Result.RemotePathIndexToLocalPathIndex[RemotePathIndex] = LocalPathIndex;
+ Result.SequenceHashToLocalPathIndex.insert({RawHash, LocalPathIndex});
+ Result.MatchCount++;
+ continue;
+ }
+ else
+ {
+ Result.HashMismatchCount++;
+ }
+ }
+ else
+ {
+ Result.PathMismatchCount++;
+ }
+ }
+
+ // Do we need it?
+ if (m_RemoteLookup.RawHashToSequenceIndex.contains(RawHash))
+ {
+ if (!CachedRemoteSequences.contains(RawHash))
+ {
+ // We need it, make sure we move it to the cache
+ Result.FilesToCache.push_back(LocalPathIndex);
+ CachedRemoteSequences.insert(RawHash);
+ continue;
+ }
+ else
+ {
+ Result.SkippedCount++;
+ }
+ }
+ }
+
+ if (!m_Options.WipeTargetFolder)
+ {
+ // Explicitly delete the unneeded local file
+ Result.RemoveLocalPathIndexes.push_back(LocalPathIndex);
+ Result.DeleteCount++;
+ }
+ }
+
+ return Result;
+}
+
+void
+BuildsOperationUpdateFolder::ScheduleLocalFileCaching(std::span<const uint32_t> FilesToCache,
+ std::atomic<uint64_t>& OutCachedCount,
+ std::atomic<uint64_t>& OutCachedByteCount)
+{
+ ZEN_TRACE_CPU("CopyToCache");
+
+ std::unique_ptr<ProgressBase::ProgressBar> ProgressBar = m_Progress.CreateProgressBar("Cache Local Data");
+ ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ for (uint32_t LocalPathIndex : FilesToCache)
+ {
+ if (m_AbortFlag)
+ {
+ break;
+ }
+ Work.ScheduleWork(m_IOWorkerPool, [this, &OutCachedCount, &OutCachedByteCount, LocalPathIndex](std::atomic<bool>&) {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_CopyToCache");
+
+ const IoHash& RawHash = m_LocalContent.RawHashes[LocalPathIndex];
+ const std::filesystem::path& LocalPath = m_LocalContent.Paths[LocalPathIndex];
+ const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RawHash);
+ ZEN_ASSERT_SLOW(!IsFileWithRetry(CacheFilePath));
+ const std::filesystem::path LocalFilePath = (m_Path / LocalPath).make_preferred();
+
+ std::error_code Ec = RenameFileWithRetry(LocalFilePath, CacheFilePath);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to move file from '{}' to '{}', reason: ({}) {}, retrying...",
+ LocalFilePath,
+ CacheFilePath,
+ Ec.value(),
+ Ec.message());
+ Ec = RenameFileWithRetry(LocalFilePath, CacheFilePath);
+ if (Ec)
+ {
+ throw std::system_error(std::error_code(Ec.value(), std::system_category()),
+ fmt::format("Failed to file from '{}' to '{}', reason: ({}) {}",
+ LocalFilePath,
+ CacheFilePath,
+ Ec.value(),
+ Ec.message()));
+ }
+ }
+
+ OutCachedCount++;
+ OutCachedByteCount += m_LocalContent.RawSizes[LocalPathIndex];
+ }
+ });
+ }
+
+ {
+ ZEN_TRACE_CPU("CopyToCache_Wait");
+
+ Work.Wait(m_Progress.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
+ const uint64_t WorkTotal = FilesToCache.size();
+ const uint64_t WorkComplete = OutCachedCount.load();
+ std::string Details = fmt::format("{}/{} ({}) files", WorkComplete, WorkTotal, NiceBytes(OutCachedByteCount));
+ ProgressBar->UpdateState({.Task = "Caching local ",
+ .Details = Details,
+ .TotalCount = gsl::narrow<uint64_t>(WorkTotal),
+ .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete),
+ .Status = ProgressBase::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+ false);
+ });
+ }
+
+ ProgressBar->Finish();
+}
+
+void
+BuildsOperationUpdateFolder::ScheduleScavengedSequenceWrites(WriteChunksContext& Context,
+ std::span<const ScavengedSequenceCopyOperation> CopyOperations,
+ const std::vector<ChunkedFolderContent>& ScavengedContents,
+ const std::vector<std::filesystem::path>& ScavengedPaths)
+{
+ for (uint32_t ScavengeOpIndex = 0; ScavengeOpIndex < CopyOperations.size(); ScavengeOpIndex++)
+ {
+ if (m_AbortFlag)
+ {
+ break;
+ }
+ Context.Work.ScheduleWork(
+ m_IOWorkerPool,
+ [this, &Context, &CopyOperations, &ScavengedContents, &ScavengedPaths, ScavengeOpIndex](std::atomic<bool>&) {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_WriteScavenged");
+
+ Context.FilteredWrittenBytesPerSecond.Start();
+
+ const ScavengedSequenceCopyOperation& ScavengeOp = CopyOperations[ScavengeOpIndex];
+ const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengeOp.ScavengedContentIndex];
+ const std::filesystem::path& ScavengeRootPath = ScavengedPaths[ScavengeOp.ScavengedContentIndex];
+
+ WriteScavengedSequenceToCache(ScavengeRootPath, ScavengedContent, ScavengeOp);
+
+ if (Context.WritePartsComplete.fetch_add(1) + 1 == Context.TotalPartWriteCount)
+ {
+ Context.FilteredWrittenBytesPerSecond.Stop();
+ }
+ }
+ });
+ }
+}
+
+void
+BuildsOperationUpdateFolder::ScheduleLooseChunkWrites(WriteChunksContext& Context, std::vector<LooseChunkHashWorkData>& LooseChunkHashWorks)
+{
+ for (uint32_t LooseChunkHashWorkIndex = 0; LooseChunkHashWorkIndex < LooseChunkHashWorks.size(); LooseChunkHashWorkIndex++)
+ {
+ if (m_AbortFlag)
+ {
+ break;
+ }
+
+ Context.Work.ScheduleWork(
+ m_IOWorkerPool,
+ [this, &Context, &LooseChunkHashWorks, LooseChunkHashWorkIndex](std::atomic<bool>&) {
+ ZEN_TRACE_CPU("Async_ReadPreDownloadedChunk");
+ if (!m_AbortFlag)
+ {
+ LooseChunkHashWorkData& LooseChunkHashWork = LooseChunkHashWorks[LooseChunkHashWorkIndex];
+ const uint32_t RemoteChunkIndex = LooseChunkHashWork.RemoteChunkIndex;
+ WriteLooseChunk(RemoteChunkIndex,
+ Context.ExistsResult,
+ Context.SequenceIndexChunksLeftToWriteCounters,
+ Context.WritePartsComplete,
+ std::move(LooseChunkHashWork.ChunkTargetPtrs),
+ Context.WriteCache,
+ Context.Work,
+ Context.TotalRequestCount,
+ Context.TotalPartWriteCount,
+ Context.FilteredDownloadedBytesPerSecond,
+ Context.FilteredWrittenBytesPerSecond);
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+}
+
+void
+BuildsOperationUpdateFolder::ScheduleLocalChunkCopies(WriteChunksContext& Context,
+ std::span<const CopyChunkData> CopyChunkDatas,
+ CloneQueryInterface* CloneQuery,
+ const std::vector<ChunkedFolderContent>& ScavengedContents,
+ const std::vector<ChunkedContentLookup>& ScavengedLookups,
+ const std::vector<std::filesystem::path>& ScavengedPaths)
+{
+ for (size_t CopyDataIndex = 0; CopyDataIndex < CopyChunkDatas.size(); CopyDataIndex++)
+ {
+ if (m_AbortFlag)
+ {
+ break;
+ }
+
+ Context.Work.ScheduleWork(
+ m_IOWorkerPool,
+ [this, &Context, CloneQuery, &CopyChunkDatas, &ScavengedContents, &ScavengedLookups, &ScavengedPaths, CopyDataIndex](
+ std::atomic<bool>&) {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_CopyLocal");
+
+ Context.FilteredWrittenBytesPerSecond.Start();
+ const CopyChunkData& CopyData = CopyChunkDatas[CopyDataIndex];
+
+ std::vector<uint32_t> WrittenSequenceIndexes = WriteLocalChunkToCache(CloneQuery,
+ CopyData,
+ ScavengedContents,
+ ScavengedLookups,
+ ScavengedPaths,
+ Context.WriteCache);
+ bool WritePartsDone = Context.WritePartsComplete.fetch_add(1) + 1 == Context.TotalPartWriteCount;
+ if (!m_AbortFlag)
+ {
+ if (WritePartsDone)
+ {
+ Context.FilteredWrittenBytesPerSecond.Stop();
+ }
+
+ // Write tracking, updating this must be done without any files open
+ std::vector<uint32_t> CompletedChunkSequences;
+ for (uint32_t RemoteSequenceIndex : WrittenSequenceIndexes)
+ {
+ if (CompleteSequenceChunk(RemoteSequenceIndex, Context.SequenceIndexChunksLeftToWriteCounters))
+ {
+ CompletedChunkSequences.push_back(RemoteSequenceIndex);
+ }
+ }
+ Context.WriteCache.Close(CompletedChunkSequences);
+ VerifyAndCompleteChunkSequencesAsync(CompletedChunkSequences, Context.Work);
+ }
+ }
+ });
+ }
+}
+
+void
+BuildsOperationUpdateFolder::ScheduleCachedBlockWrites(WriteChunksContext& Context, std::span<const uint32_t> CachedBlockIndexes)
+{
+ for (uint32_t BlockIndex : CachedBlockIndexes)
+ {
+ if (m_AbortFlag)
+ {
+ break;
+ }
+
+ Context.Work.ScheduleWork(m_IOWorkerPool, [this, &Context, BlockIndex](std::atomic<bool>&) {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_WriteCachedBlock");
+
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
+ Context.FilteredWrittenBytesPerSecond.Start();
+
+ std::filesystem::path BlockChunkPath = m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString();
+ IoBuffer BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
+ if (!BlockBuffer)
+ {
+ throw std::runtime_error(fmt::format("Can not read block {} at {}", BlockDescription.BlockHash, BlockChunkPath));
+ }
+
+ if (!m_AbortFlag)
+ {
+ if (!WriteChunksBlockToCache(BlockDescription,
+ Context.SequenceIndexChunksLeftToWriteCounters,
+ Context.Work,
+ CompositeBuffer(std::move(BlockBuffer)),
+ Context.RemoteChunkIndexNeedsCopyFromSourceFlags,
+ Context.WriteCache))
+ {
+ std::error_code DummyEc;
+ RemoveFile(BlockChunkPath, DummyEc);
+ throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash));
+ }
+
+ std::error_code Ec = TryRemoveFile(BlockChunkPath);
+ if (Ec)
+ {
+ ZEN_DEBUG("Failed removing file '{}', reason: ({}) {}", BlockChunkPath, Ec.value(), Ec.message());
+ }
+
+ if (Context.WritePartsComplete.fetch_add(1) + 1 == Context.TotalPartWriteCount)
+ {
+ Context.FilteredWrittenBytesPerSecond.Stop();
+ }
+ }
+ }
+ });
+ }
+}
+
+void
+BuildsOperationUpdateFolder::SchedulePartialBlockDownloads(WriteChunksContext& Context,
+ const ChunkBlockAnalyser::BlockResult& PartialBlocks)
+{
+ for (size_t BlockRangeIndex = 0; BlockRangeIndex < PartialBlocks.BlockRanges.size();)
+ {
+ if (m_AbortFlag)
+ {
+ break;
+ }
+
+ size_t RangeCount = 1;
+ size_t RangesLeft = PartialBlocks.BlockRanges.size() - BlockRangeIndex;
+ const ChunkBlockAnalyser::BlockRangeDescriptor& CurrentBlockRange = PartialBlocks.BlockRanges[BlockRangeIndex];
+ while (RangeCount < RangesLeft &&
+ CurrentBlockRange.BlockIndex == PartialBlocks.BlockRanges[BlockRangeIndex + RangeCount].BlockIndex)
+ {
+ RangeCount++;
+ }
+
+ Context.Work.ScheduleWork(
+ m_NetworkPool,
+ [this, &Context, &PartialBlocks, BlockRangeStartIndex = BlockRangeIndex, RangeCount = RangeCount](std::atomic<bool>&) {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_GetPartialBlockRanges");
+
+ Context.FilteredDownloadedBytesPerSecond.Start();
+
+ DownloadPartialBlock(
+ PartialBlocks.BlockRanges,
+ BlockRangeStartIndex,
+ RangeCount,
+ Context.ExistsResult,
+ Context.TotalRequestCount,
+ Context.FilteredDownloadedBytesPerSecond,
+ [this, &Context, &PartialBlocks](IoBuffer&& InMemoryBuffer,
+ const std::filesystem::path& OnDiskPath,
+ size_t BlockRangeStartIndex,
+ std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengths) {
+ if (!m_AbortFlag)
+ {
+ Context.Work.ScheduleWork(
+ m_IOWorkerPool,
+ [this,
+ &Context,
+ &PartialBlocks,
+ BlockRangeStartIndex,
+ BlockChunkPath = std::filesystem::path(OnDiskPath),
+ BlockPartialBuffer = std::move(InMemoryBuffer),
+ OffsetAndLengths =
+ std::vector<std::pair<uint64_t, uint64_t>>(OffsetAndLengths.begin(), OffsetAndLengths.end())](
+ std::atomic<bool>&) mutable {
+ if (!m_AbortFlag)
+ {
+ WritePartialBlockToCache(Context,
+ BlockRangeStartIndex,
+ std::move(BlockPartialBuffer),
+ BlockChunkPath,
+ OffsetAndLengths,
+ PartialBlocks);
+ }
+ },
+ OnDiskPath.empty() ? WorkerThreadPool::EMode::DisableBacklog : WorkerThreadPool::EMode::EnableBacklog);
+ }
+ });
+ }
+ });
+ BlockRangeIndex += RangeCount;
+ }
+}
+
+void
+BuildsOperationUpdateFolder::WritePartialBlockToCache(WriteChunksContext& Context,
+ size_t BlockRangeStartIndex,
+ IoBuffer BlockPartialBuffer,
+ const std::filesystem::path& BlockChunkPath,
+ std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengths,
+ const ChunkBlockAnalyser::BlockResult& PartialBlocks)
+{
+ ZEN_TRACE_CPU("Async_WritePartialBlock");
+
+ const uint32_t BlockIndex = PartialBlocks.BlockRanges[BlockRangeStartIndex].BlockIndex;
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
+
+ if (BlockChunkPath.empty())
+ {
+ ZEN_ASSERT(BlockPartialBuffer);
+ }
+ else
+ {
+ ZEN_ASSERT(!BlockPartialBuffer);
+ BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
+ if (!BlockPartialBuffer)
+ {
+ throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}", BlockDescription.BlockHash, BlockChunkPath));
+ }
+ }
+
+ Context.FilteredWrittenBytesPerSecond.Start();
+
+ const size_t RangeCount = OffsetAndLengths.size();
+
+ for (size_t PartialRangeIndex = 0; PartialRangeIndex < RangeCount; PartialRangeIndex++)
+ {
+ const std::pair<uint64_t, uint64_t>& OffsetAndLength = OffsetAndLengths[PartialRangeIndex];
+ IoBuffer BlockRangeBuffer(BlockPartialBuffer, OffsetAndLength.first, OffsetAndLength.second);
+
+ const ChunkBlockAnalyser::BlockRangeDescriptor& RangeDescriptor =
+ PartialBlocks.BlockRanges[BlockRangeStartIndex + PartialRangeIndex];
+
+ if (!WritePartialBlockChunksToCache(BlockDescription,
+ Context.SequenceIndexChunksLeftToWriteCounters,
+ Context.Work,
+ CompositeBuffer(std::move(BlockRangeBuffer)),
+ RangeDescriptor.ChunkBlockIndexStart,
+ RangeDescriptor.ChunkBlockIndexStart + RangeDescriptor.ChunkBlockIndexCount - 1,
+ Context.RemoteChunkIndexNeedsCopyFromSourceFlags,
+ Context.WriteCache))
+ {
+ std::error_code DummyEc;
+ RemoveFile(BlockChunkPath, DummyEc);
+ throw std::runtime_error(fmt::format("Partial block {} is malformed", BlockDescription.BlockHash));
+ }
+
+ if (Context.WritePartsComplete.fetch_add(1) + 1 == Context.TotalPartWriteCount)
+ {
+ Context.FilteredWrittenBytesPerSecond.Stop();
+ }
+ }
+ std::error_code Ec = TryRemoveFile(BlockChunkPath);
+ if (Ec)
+ {
+ ZEN_DEBUG("Failed removing file '{}', reason: ({}) {}", BlockChunkPath, Ec.value(), Ec.message());
+ }
+}
+
+void
+BuildsOperationUpdateFolder::ScheduleFullBlockDownloads(WriteChunksContext& Context, std::span<const uint32_t> FullBlockIndexes)
+{
+ for (uint32_t BlockIndex : FullBlockIndexes)
+ {
+ if (m_AbortFlag)
+ {
+ break;
+ }
+
+ Context.Work.ScheduleWork(m_NetworkPool, [this, &Context, BlockIndex](std::atomic<bool>&) {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_GetFullBlock");
+
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
+
+ Context.FilteredDownloadedBytesPerSecond.Start();
+
+ IoBuffer BlockBuffer;
+ const bool ExistsInCache =
+ m_Storage.CacheStorage && Context.ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash);
+ if (ExistsInCache)
+ {
+ BlockBuffer = m_Storage.CacheStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash);
+ }
+ if (!BlockBuffer)
+ {
+ try
+ {
+ BlockBuffer = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash);
+ }
+ catch (const std::exception&)
+ {
+ // Silence http errors due to abort
+ if (!m_AbortFlag)
+ {
+ throw;
+ }
+ }
+ }
+ if (!m_AbortFlag)
+ {
+ if (!BlockBuffer)
+ {
+ throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
+ }
+
+ uint64_t BlockSize = BlockBuffer.GetSize();
+ m_DownloadStats.DownloadedBlockCount++;
+ m_DownloadStats.DownloadedBlockByteCount += BlockSize;
+ if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == Context.TotalRequestCount)
+ {
+ Context.FilteredDownloadedBytesPerSecond.Stop();
+ }
+
+ const bool PutInCache = !ExistsInCache && m_Storage.CacheStorage && m_Options.PopulateCache;
+
+ std::filesystem::path BlockChunkPath =
+ TryMoveDownloadedChunk(BlockBuffer,
+ m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(),
+ /* ForceDiskBased */ PutInCache || (BlockSize > m_Options.MaximumInMemoryPayloadSize));
+
+ if (PutInCache)
+ {
+ ZEN_ASSERT(!BlockChunkPath.empty());
+ IoBuffer CacheBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
+ if (CacheBuffer)
+ {
+ m_Storage.CacheStorage->PutBuildBlob(m_BuildId,
+ BlockDescription.BlockHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(SharedBuffer(CacheBuffer)));
+ }
+ }
+
+ if (!m_AbortFlag)
+ {
+ Context.Work.ScheduleWork(
+ m_IOWorkerPool,
+ [this, &Context, BlockIndex, BlockChunkPath, BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable {
+ if (!m_AbortFlag)
+ {
+ WriteFullBlockToCache(Context, BlockIndex, std::move(BlockBuffer), BlockChunkPath);
+ }
+ },
+ BlockChunkPath.empty() ? WorkerThreadPool::EMode::DisableBacklog : WorkerThreadPool::EMode::EnableBacklog);
+ }
+ }
+ }
+ });
+ }
+}
+
+void
+BuildsOperationUpdateFolder::WriteFullBlockToCache(WriteChunksContext& Context,
+ uint32_t BlockIndex,
+ IoBuffer BlockBuffer,
+ const std::filesystem::path& BlockChunkPath)
+{
+ ZEN_TRACE_CPU("Async_WriteFullBlock");
+
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
+
+ if (BlockChunkPath.empty())
+ {
+ ZEN_ASSERT(BlockBuffer);
+ }
+ else
+ {
+ ZEN_ASSERT(!BlockBuffer);
+ BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
+ if (!BlockBuffer)
+ {
+ throw std::runtime_error(fmt::format("Could not open dowloaded block {} from {}", BlockDescription.BlockHash, BlockChunkPath));
+ }
+ }
+
+ Context.FilteredWrittenBytesPerSecond.Start();
+ if (!WriteChunksBlockToCache(BlockDescription,
+ Context.SequenceIndexChunksLeftToWriteCounters,
+ Context.Work,
+ CompositeBuffer(std::move(BlockBuffer)),
+ Context.RemoteChunkIndexNeedsCopyFromSourceFlags,
+ Context.WriteCache))
+ {
+ std::error_code DummyEc;
+ RemoveFile(BlockChunkPath, DummyEc);
+ throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash));
+ }
+
+ if (!BlockChunkPath.empty())
+ {
+ std::error_code Ec = TryRemoveFile(BlockChunkPath);
+ if (Ec)
+ {
+ ZEN_DEBUG("Failed removing file '{}', reason: ({}) {}", BlockChunkPath, Ec.value(), Ec.message());
+ }
+ }
+
+ if (Context.WritePartsComplete.fetch_add(1) + 1 == Context.TotalPartWriteCount)
+ {
+ Context.FilteredWrittenBytesPerSecond.Stop();
+ }
+}
+
+void
+BuildsOperationUpdateFolder::ScheduleLocalFileRemovals(ParallelWork& Work,
+ std::span<const uint32_t> RemoveLocalPathIndexes,
+ std::atomic<uint64_t>& DeletedCount)
+{
+ for (uint32_t LocalPathIndex : RemoveLocalPathIndexes)
+ {
+ if (m_AbortFlag)
+ {
+ break;
+ }
+ Work.ScheduleWork(m_IOWorkerPool, [this, &DeletedCount, LocalPathIndex](std::atomic<bool>&) {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_RemoveFile");
+
+ const std::filesystem::path LocalFilePath = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred();
+ SetFileReadOnlyWithRetry(LocalFilePath, false);
+ RemoveFileWithRetry(LocalFilePath);
+ DeletedCount++;
+ }
+ });
+ }
+}
+
+void
+BuildsOperationUpdateFolder::ScheduleTargetFinalization(
+ ParallelWork& Work,
+ std::span<const FinalizeTarget> Targets,
+ const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& SequenceHashToLocalPathIndex,
+ const tsl::robin_map<uint32_t, uint32_t>& RemotePathIndexToLocalPathIndex,
+ FolderContent& OutLocalFolderState,
+ std::atomic<uint64_t>& TargetsComplete)
+{
+ size_t TargetOffset = 0;
+ while (TargetOffset < Targets.size())
+ {
+ if (m_AbortFlag)
+ {
+ break;
+ }
+
+ size_t TargetCount = 1;
+ while ((TargetOffset + TargetCount) < Targets.size() &&
+ (Targets[TargetOffset + TargetCount].RawHash == Targets[TargetOffset].RawHash))
+ {
+ TargetCount++;
+ }
+
+ Work.ScheduleWork(m_IOWorkerPool,
+ [this,
+ &SequenceHashToLocalPathIndex,
+ Targets,
+ &RemotePathIndexToLocalPathIndex,
+ &OutLocalFolderState,
+ BaseTargetOffset = TargetOffset,
+ TargetCount,
+ &TargetsComplete](std::atomic<bool>&) {
+ if (!m_AbortFlag)
+ {
+ FinalizeTargetGroup(BaseTargetOffset,
+ TargetCount,
+ Targets,
+ SequenceHashToLocalPathIndex,
+ RemotePathIndexToLocalPathIndex,
+ OutLocalFolderState,
+ TargetsComplete);
+ }
+ });
+
+ TargetOffset += TargetCount;
+ }
+}
+
+void
+BuildsOperationUpdateFolder::FinalizeTargetGroup(size_t BaseOffset,
+ size_t Count,
+ std::span<const FinalizeTarget> Targets,
+ const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& SequenceHashToLocalPathIndex,
+ const tsl::robin_map<uint32_t, uint32_t>& RemotePathIndexToLocalPathIndex,
+ FolderContent& OutLocalFolderState,
+ std::atomic<uint64_t>& TargetsComplete)
+{
+ ZEN_TRACE_CPU("Async_FinalizeChunkSequence");
+
+ size_t TargetOffset = BaseOffset;
+ const IoHash& RawHash = Targets[TargetOffset].RawHash;
+
+ if (RawHash == IoHash::Zero)
+ {
+ ZEN_TRACE_CPU("CreateEmptyFiles");
+ while (TargetOffset < (BaseOffset + Count))
+ {
+ const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex;
+ ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash);
+ const std::filesystem::path& TargetPath = m_RemoteContent.Paths[RemotePathIndex];
+ std::filesystem::path TargetFilePath = (m_Path / TargetPath).make_preferred();
+ auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(RemotePathIndex);
+ if (InPlaceIt == RemotePathIndexToLocalPathIndex.end() || InPlaceIt->second == 0)
+ {
+ if (IsFileWithRetry(TargetFilePath))
+ {
+ SetFileReadOnlyWithRetry(TargetFilePath, false);
+ }
+ else
+ {
+ CreateDirectories(TargetFilePath.parent_path());
+ }
+ BasicFile OutputFile;
+ OutputFile.Open(TargetFilePath, BasicFile::Mode::kTruncate);
+ }
+ OutLocalFolderState.Paths[RemotePathIndex] = TargetPath;
+ OutLocalFolderState.RawSizes[RemotePathIndex] = m_RemoteContent.RawSizes[RemotePathIndex];
+
+ OutLocalFolderState.Attributes[RemotePathIndex] =
+ m_RemoteContent.Attributes.empty()
+ ? GetNativeFileAttributes(TargetFilePath)
+ : SetNativeFileAttributes(TargetFilePath, m_RemoteContent.Platform, m_RemoteContent.Attributes[RemotePathIndex]);
+ OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath);
+
+ TargetOffset++;
+ TargetsComplete++;
+ }
+ }
+ else
+ {
+ ZEN_TRACE_CPU("FinalizeFile");
+ ZEN_ASSERT(m_RemoteLookup.RawHashToSequenceIndex.contains(RawHash));
+ const uint32_t FirstRemotePathIndex = Targets[TargetOffset].RemotePathIndex;
+ const std::filesystem::path& FirstTargetPath = m_RemoteContent.Paths[FirstRemotePathIndex];
+ std::filesystem::path FirstTargetFilePath = (m_Path / FirstTargetPath).make_preferred();
+
+ if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(FirstRemotePathIndex); InPlaceIt != RemotePathIndexToLocalPathIndex.end())
+ {
+ ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath));
+ }
+ else
+ {
+ if (IsFileWithRetry(FirstTargetFilePath))
+ {
+ SetFileReadOnlyWithRetry(FirstTargetFilePath, false);
+ }
+ else
+ {
+ CreateDirectories(FirstTargetFilePath.parent_path());
+ }
+
+ if (auto InplaceIt = SequenceHashToLocalPathIndex.find(RawHash); InplaceIt != SequenceHashToLocalPathIndex.end())
+ {
+ ZEN_TRACE_CPU("Copy");
+ const uint32_t LocalPathIndex = InplaceIt->second;
+ const std::filesystem::path& SourcePath = m_LocalContent.Paths[LocalPathIndex];
+ std::filesystem::path SourceFilePath = (m_Path / SourcePath).make_preferred();
+ ZEN_ASSERT_SLOW(IsFileWithRetry(SourceFilePath));
+
+ ZEN_DEBUG("Copying from '{}' -> '{}'", SourceFilePath, FirstTargetFilePath);
+ const uint64_t RawSize = m_LocalContent.RawSizes[LocalPathIndex];
+ FastCopyFile(m_Options.AllowFileClone,
+ m_Options.UseSparseFiles,
+ SourceFilePath,
+ FirstTargetFilePath,
+ RawSize,
+ m_DiskStats.WriteCount,
+ m_DiskStats.WriteByteCount,
+ m_DiskStats.CloneCount,
+ m_DiskStats.CloneByteCount);
+
+ m_RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++;
+ }
+ else
+ {
+ ZEN_TRACE_CPU("Rename");
+ const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RawHash);
+ ZEN_ASSERT_SLOW(IsFileWithRetry(CacheFilePath));
+
+ std::error_code Ec = RenameFileWithRetry(CacheFilePath, FirstTargetFilePath);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to move file from '{}' to '{}', reason: ({}) {}, retrying...",
+ CacheFilePath,
+ FirstTargetFilePath,
+ Ec.value(),
+ Ec.message());
+ Ec = RenameFileWithRetry(CacheFilePath, FirstTargetFilePath);
+ if (Ec)
+ {
+ throw std::system_error(std::error_code(Ec.value(), std::system_category()),
+ fmt::format("Failed to move file from '{}' to '{}', reason: ({}) {}",
+ CacheFilePath,
+ FirstTargetFilePath,
+ Ec.value(),
+ Ec.message()));
+ }
+ }
+
+ m_RebuildFolderStateStats.FinalizeTreeFilesMovedCount++;
+ }
+ }
+
+ OutLocalFolderState.Paths[FirstRemotePathIndex] = FirstTargetPath;
+ OutLocalFolderState.RawSizes[FirstRemotePathIndex] = m_RemoteContent.RawSizes[FirstRemotePathIndex];
+
+ OutLocalFolderState.Attributes[FirstRemotePathIndex] =
+ m_RemoteContent.Attributes.empty()
+ ? GetNativeFileAttributes(FirstTargetFilePath)
+ : SetNativeFileAttributes(FirstTargetFilePath, m_RemoteContent.Platform, m_RemoteContent.Attributes[FirstRemotePathIndex]);
+ OutLocalFolderState.ModificationTicks[FirstRemotePathIndex] = GetModificationTickFromPath(FirstTargetFilePath);
+
+ TargetOffset++;
+ TargetsComplete++;
+
+ while (TargetOffset < (BaseOffset + Count))
+ {
+ const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex;
+ ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash);
+ const std::filesystem::path& TargetPath = m_RemoteContent.Paths[RemotePathIndex];
+ std::filesystem::path TargetFilePath = (m_Path / TargetPath).make_preferred();
+
+ if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(RemotePathIndex); InPlaceIt != RemotePathIndexToLocalPathIndex.end())
+ {
+ ZEN_ASSERT_SLOW(IsFileWithRetry(TargetFilePath));
+ }
+ else
+ {
+ ZEN_TRACE_CPU("Copy");
+ if (IsFileWithRetry(TargetFilePath))
+ {
+ SetFileReadOnlyWithRetry(TargetFilePath, false);
+ }
+ else
+ {
+ CreateDirectories(TargetFilePath.parent_path());
+ }
+
+ ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath));
+ ZEN_DEBUG("Copying from '{}' -> '{}'", FirstTargetFilePath, TargetFilePath);
+ const uint64_t RawSize = m_RemoteContent.RawSizes[RemotePathIndex];
+ FastCopyFile(m_Options.AllowFileClone,
+ m_Options.UseSparseFiles,
+ FirstTargetFilePath,
+ TargetFilePath,
+ RawSize,
+ m_DiskStats.WriteCount,
+ m_DiskStats.WriteByteCount,
+ m_DiskStats.CloneCount,
+ m_DiskStats.CloneByteCount);
+
+ m_RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++;
+ }
+
+ OutLocalFolderState.Paths[RemotePathIndex] = TargetPath;
+ OutLocalFolderState.RawSizes[RemotePathIndex] = m_RemoteContent.RawSizes[RemotePathIndex];
+
+ OutLocalFolderState.Attributes[RemotePathIndex] =
+ m_RemoteContent.Attributes.empty()
+ ? GetNativeFileAttributes(TargetFilePath)
+ : SetNativeFileAttributes(TargetFilePath, m_RemoteContent.Platform, m_RemoteContent.Attributes[RemotePathIndex]);
+ OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath);
+
+ TargetOffset++;
+ TargetsComplete++;
+ }
+ }
+}
+
+std::vector<BuildsOperationUpdateFolder::ScavengeSource>
+BuildsOperationUpdateFolder::FindScavengeSources()
+{
+ ZEN_TRACE_CPU("FindScavengeSources");
+
+ const bool TargetPathExists = IsDir(m_Path);
+
+ std::vector<std::filesystem::path> StatePaths = GetDownloadedStatePaths(m_Options.SystemRootDir);
+
+ std::vector<ScavengeSource> Result;
+ for (const std::filesystem::path& EntryPath : StatePaths)
+ {
+ if (IsFile(EntryPath))
+ {
+ bool DeleteEntry = false;
+
+ try
+ {
+ BuildsDownloadInfo Info = ReadDownloadedInfoFile(EntryPath);
+ const bool LocalPathExists = !Info.LocalPath.empty() && IsDir(Info.LocalPath);
+ const bool LocalStateFileExists = IsFile(Info.StateFilePath);
+ if (LocalPathExists && LocalStateFileExists)
+ {
+ if (TargetPathExists && std::filesystem::equivalent(Info.LocalPath, m_Path))
+ {
+ DeleteEntry = true;
+ }
+ else
+ {
+ Result.push_back({.StateFilePath = std::move(Info.StateFilePath), .Path = std::move(Info.LocalPath)});
+ }
+ }
+ else
+ {
+ DeleteEntry = true;
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("{}", Ex.what());
+ DeleteEntry = true;
+ }
+
+ if (DeleteEntry)
+ {
+ std::error_code DummyEc;
+ std::filesystem::remove(EntryPath, DummyEc);
+ }
+ }
+ }
+ return Result;
+}
+
+std::vector<uint32_t>
+BuildsOperationUpdateFolder::ScanTargetFolder(const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& CachedChunkHashesFound,
+ const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& CachedSequenceHashesFound)
+{
+ ZEN_TRACE_CPU("ScanTargetFolder");
+
+ Stopwatch LocalTimer;
+
+ std::vector<uint32_t> MissingSequenceIndexes;
+
+ for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < m_RemoteContent.ChunkedContent.SequenceRawHashes.size();
+ RemoteSequenceIndex++)
+ {
+ const IoHash& RemoteSequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ const uint32_t RemotePathIndex = GetFirstPathIndexForSeqeuenceIndex(m_RemoteLookup, RemoteSequenceIndex);
+ const uint64_t RemoteRawSize = m_RemoteContent.RawSizes[RemotePathIndex];
+ if (auto CacheSequenceIt = CachedSequenceHashesFound.find(RemoteSequenceRawHash);
+ CacheSequenceIt != CachedSequenceHashesFound.end())
+ {
+ const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RemoteSequenceRawHash);
+ ZEN_ASSERT_SLOW(IsFile(CacheFilePath));
+ if (m_Options.IsVerbose)
+ {
+ ZEN_INFO("Found sequence {} at {} ({})", RemoteSequenceRawHash, CacheFilePath, NiceBytes(RemoteRawSize));
+ }
+ }
+ else if (auto CacheChunkIt = CachedChunkHashesFound.find(RemoteSequenceRawHash); CacheChunkIt != CachedChunkHashesFound.end())
+ {
+ const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RemoteSequenceRawHash);
+ ZEN_ASSERT_SLOW(IsFile(CacheFilePath));
+ if (m_Options.IsVerbose)
+ {
+ ZEN_INFO("Found chunk {} at {} ({})", RemoteSequenceRawHash, CacheFilePath, NiceBytes(RemoteRawSize));
+ }
+ }
+ else if (auto It = m_LocalLookup.RawHashToSequenceIndex.find(RemoteSequenceRawHash);
+ It != m_LocalLookup.RawHashToSequenceIndex.end())
+ {
+ const uint32_t LocalSequenceIndex = It->second;
+ const uint32_t LocalPathIndex = GetFirstPathIndexForSeqeuenceIndex(m_LocalLookup, LocalSequenceIndex);
+ const std::filesystem::path LocalFilePath = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred();
+ ZEN_ASSERT_SLOW(IsFile(LocalFilePath));
+ m_CacheMappingStats.LocalPathsMatchingSequencesCount++;
+ m_CacheMappingStats.LocalPathsMatchingSequencesByteCount += RemoteRawSize;
+ if (m_Options.IsVerbose)
+ {
+ ZEN_INFO("Found sequence {} at {} ({})", RemoteSequenceRawHash, LocalFilePath, NiceBytes(RemoteRawSize));
+ }
+ }
+ else
+ {
+ MissingSequenceIndexes.push_back(RemoteSequenceIndex);
+ }
+ }
+
+ m_CacheMappingStats.LocalScanElapsedWallTimeUs += LocalTimer.GetElapsedTimeUs();
+ return MissingSequenceIndexes;
+}
+
+bool
+BuildsOperationUpdateFolder::FindScavengeContent(const ScavengeSource& Source,
+ ChunkedFolderContent& OutScavengedLocalContent,
+ ChunkedContentLookup& OutScavengedLookup)
+{
+ ZEN_TRACE_CPU("FindScavengeContent");
+
+ FolderContent LocalFolderState;
+ try
+ {
+ BuildSaveState SavedState = ReadBuildSaveStateFile(Source.StateFilePath);
+ if (SavedState.Version == BuildSaveState::NoVersion)
+ {
+ ZEN_DEBUG("Skipping old build state at '{}', state files before version {} can not be trusted during scavenge",
+ Source.StateFilePath,
+ BuildSaveState::kVersion1);
+ return false;
+ }
+ OutScavengedLocalContent = std::move(SavedState.State.ChunkedContent);
+ LocalFolderState = std::move(SavedState.FolderState);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_DEBUG("Skipping invalid build state at '{}', reason: {}", Source.StateFilePath, Ex.what());
+ return false;
+ }
+
+ tsl::robin_set<uint32_t> PathIndexesToScavenge;
+ PathIndexesToScavenge.reserve(OutScavengedLocalContent.Paths.size());
+ std::vector<uint32_t> ChunkOrderOffsets = BuildChunkOrderOffset(OutScavengedLocalContent.ChunkedContent.ChunkCounts);
+
+ {
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> RawHashToPathIndex;
+
+ RawHashToPathIndex.reserve(OutScavengedLocalContent.Paths.size());
+ for (uint32_t ScavengedPathIndex = 0; ScavengedPathIndex < OutScavengedLocalContent.RawHashes.size(); ScavengedPathIndex++)
+ {
+ if (!RawHashToPathIndex.contains(OutScavengedLocalContent.RawHashes[ScavengedPathIndex]))
+ {
+ RawHashToPathIndex.insert_or_assign(OutScavengedLocalContent.RawHashes[ScavengedPathIndex], ScavengedPathIndex);
+ }
+ }
+
+ for (uint32_t ScavengeSequenceIndex = 0; ScavengeSequenceIndex < OutScavengedLocalContent.ChunkedContent.SequenceRawHashes.size();
+ ScavengeSequenceIndex++)
+ {
+ const IoHash& SequenceHash = OutScavengedLocalContent.ChunkedContent.SequenceRawHashes[ScavengeSequenceIndex];
+ if (auto It = RawHashToPathIndex.find(SequenceHash); It != RawHashToPathIndex.end())
+ {
+ uint32_t PathIndex = It->second;
+ if (!PathIndexesToScavenge.contains(PathIndex))
+ {
+ if (m_RemoteLookup.RawHashToSequenceIndex.contains(SequenceHash))
+ {
+ PathIndexesToScavenge.insert(PathIndex);
+ }
+ else
+ {
+ uint32_t ChunkOrderIndexStart = ChunkOrderOffsets[ScavengeSequenceIndex];
+ const uint32_t ChunkCount = OutScavengedLocalContent.ChunkedContent.ChunkCounts[ScavengeSequenceIndex];
+ for (uint32_t ChunkOrderIndex = 0; ChunkOrderIndex < ChunkCount; ChunkOrderIndex++)
+ {
+ const uint32_t ChunkIndex =
+ OutScavengedLocalContent.ChunkedContent.ChunkOrders[ChunkOrderIndexStart + ChunkOrderIndex];
+ const IoHash& ChunkHash = OutScavengedLocalContent.ChunkedContent.ChunkHashes[ChunkIndex];
+ if (m_RemoteLookup.ChunkHashToChunkIndex.contains(ChunkHash))
+ {
+ PathIndexesToScavenge.insert(PathIndex);
+ break;
+ }
+ }
+ }
+ }
+ }
+ else
+ {
+ ZEN_WARN("Scavenged state file at '{}' for '{}' is invalid, skipping scavenging for sequence {}",
+ Source.StateFilePath,
+ Source.Path,
+ SequenceHash);
+ }
+ }
+ }
+
+ if (PathIndexesToScavenge.empty())
+ {
+ OutScavengedLocalContent = {};
+ return false;
+ }
+
+ std::vector<std::filesystem::path> PathsToScavenge;
+ PathsToScavenge.reserve(PathIndexesToScavenge.size());
+ for (uint32_t ScavengedStatePathIndex : PathIndexesToScavenge)
+ {
+ PathsToScavenge.push_back(OutScavengedLocalContent.Paths[ScavengedStatePathIndex]);
+ }
+
+ FolderContent ValidFolderContent =
+ GetValidFolderContent(m_IOWorkerPool, m_ScavengedFolderScanStats, Source.Path, PathsToScavenge, {}, 0, m_AbortFlag, m_PauseFlag);
+
+ if (!LocalFolderState.AreKnownFilesEqual(ValidFolderContent))
+ {
+ std::vector<std::filesystem::path> DeletedPaths;
+ FolderContent UpdatedContent = GetUpdatedContent(LocalFolderState, ValidFolderContent, DeletedPaths);
+
+ // If the files are modified since the state was saved we ignore the files since we don't
+ // want to incur the cost of scanning/hashing scavenged files
+ DeletedPaths.insert(DeletedPaths.end(), UpdatedContent.Paths.begin(), UpdatedContent.Paths.end());
+ if (!DeletedPaths.empty())
+ {
+ OutScavengedLocalContent =
+ DeletePathsFromChunkedContent(OutScavengedLocalContent,
+ BuildHashLookup(OutScavengedLocalContent.ChunkedContent.SequenceRawHashes),
+ ChunkOrderOffsets,
+ DeletedPaths);
+ }
+ }
+
+ if (OutScavengedLocalContent.Paths.empty())
+ {
+ OutScavengedLocalContent = {};
+ return false;
+ }
+
+ OutScavengedLookup = BuildChunkedContentLookup(OutScavengedLocalContent);
+
+ return true;
+}
+
+void
+BuildsOperationUpdateFolder::ScavengeSourceForChunks(uint32_t& InOutRemainingChunkCount,
+ std::vector<bool>& InOutRemoteChunkIndexNeedsCopyFromLocalFileFlags,
+ tsl::robin_map<IoHash, size_t, IoHash::Hasher>& InOutRawHashToCopyChunkDataIndex,
+ const std::vector<std::atomic<uint32_t>>& SequenceIndexChunksLeftToWriteCounters,
+ const ChunkedFolderContent& ScavengedContent,
+ const ChunkedContentLookup& ScavengedLookup,
+ std::vector<CopyChunkData>& InOutCopyChunkDatas,
+ uint32_t ScavengedContentIndex,
+ uint64_t& InOutChunkMatchingRemoteCount,
+ uint64_t& InOutChunkMatchingRemoteByteCount)
+{
+ for (uint32_t RemoteChunkIndex = 0;
+ RemoteChunkIndex < m_RemoteContent.ChunkedContent.ChunkHashes.size() && (InOutRemainingChunkCount > 0);
+ RemoteChunkIndex++)
+ {
+ if (!InOutRemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex])
+ {
+ const IoHash& RemoteChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+ if (auto It = ScavengedLookup.ChunkHashToChunkIndex.find(RemoteChunkHash); It != ScavengedLookup.ChunkHashToChunkIndex.end())
+ {
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
+ GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex);
+
+ if (!ChunkTargetPtrs.empty())
+ {
+ const uint32_t ScavengedChunkIndex = It->second;
+ const uint64_t ScavengedChunkRawSize = ScavengedContent.ChunkedContent.ChunkRawSizes[ScavengedChunkIndex];
+ const size_t ChunkSequenceLocationOffset = ScavengedLookup.ChunkSequenceLocationOffset[ScavengedChunkIndex];
+ const ChunkedContentLookup::ChunkSequenceLocation& ScavengeLocation =
+ ScavengedLookup.ChunkSequenceLocations[ChunkSequenceLocationOffset];
+ const IoHash& ScavengedSequenceRawHash =
+ ScavengedContent.ChunkedContent.SequenceRawHashes[ScavengeLocation.SequenceIndex];
+
+ CopyChunkData::ChunkTarget Target = {.TargetChunkLocationCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()),
+ .RemoteChunkIndex = RemoteChunkIndex,
+ .CacheFileOffset = ScavengeLocation.Offset};
+ if (auto CopySourceIt = InOutRawHashToCopyChunkDataIndex.find(ScavengedSequenceRawHash);
+ CopySourceIt != InOutRawHashToCopyChunkDataIndex.end())
+ {
+ CopyChunkData& Data = InOutCopyChunkDatas[CopySourceIt->second];
+ if (Data.TargetChunkLocationPtrs.size() > 1024)
+ {
+ InOutRawHashToCopyChunkDataIndex.insert_or_assign(ScavengedSequenceRawHash, InOutCopyChunkDatas.size());
+ InOutCopyChunkDatas.push_back(CopyChunkData{.ScavengeSourceIndex = ScavengedContentIndex,
+ .SourceSequenceIndex = ScavengeLocation.SequenceIndex,
+ .TargetChunkLocationPtrs = ChunkTargetPtrs,
+ .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}});
+ }
+ else
+ {
+ Data.TargetChunkLocationPtrs.insert(Data.TargetChunkLocationPtrs.end(),
+ ChunkTargetPtrs.begin(),
+ ChunkTargetPtrs.end());
+ Data.ChunkTargets.push_back(Target);
+ }
+ }
+ else
+ {
+ InOutRawHashToCopyChunkDataIndex.insert_or_assign(ScavengedSequenceRawHash, InOutCopyChunkDatas.size());
+ InOutCopyChunkDatas.push_back(CopyChunkData{.ScavengeSourceIndex = ScavengedContentIndex,
+ .SourceSequenceIndex = ScavengeLocation.SequenceIndex,
+ .TargetChunkLocationPtrs = ChunkTargetPtrs,
+ .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}});
+ }
+ InOutChunkMatchingRemoteCount++;
+ InOutChunkMatchingRemoteByteCount += ScavengedChunkRawSize;
+ InOutRemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex] = true;
+ InOutRemainingChunkCount--;
+ }
+ }
+ }
+ }
+}
+
+std::filesystem::path
+BuildsOperationUpdateFolder::FindDownloadedChunk(const IoHash& ChunkHash)
+{
+ ZEN_TRACE_CPU("FindDownloadedChunk");
+
+ std::filesystem::path CompressedChunkPath = m_TempDownloadFolderPath / ChunkHash.ToHexString();
+ if (IsFile(CompressedChunkPath))
+ {
+ IoBuffer ExistingCompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath);
+ if (ExistingCompressedPart)
+ {
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (CompressedBuffer::ValidateCompressedHeader(ExistingCompressedPart,
+ RawHash,
+ RawSize,
+ /*OutOptionalTotalCompressedSize*/ nullptr))
+ {
+ return CompressedChunkPath;
+ }
+ else
+ {
+ std::error_code DummyEc;
+ RemoveFile(CompressedChunkPath, DummyEc);
+ }
+ }
+ }
+ return {};
+}
+
+std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>
+BuildsOperationUpdateFolder::GetRemainingChunkTargets(std::span<const std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ uint32_t ChunkIndex)
+{
+ ZEN_TRACE_CPU("GetRemainingChunkTargets");
+
+ std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkSources = GetChunkSequenceLocations(m_RemoteLookup, ChunkIndex);
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs;
+ if (!ChunkSources.empty())
+ {
+ ChunkTargetPtrs.reserve(ChunkSources.size());
+ for (const ChunkedContentLookup::ChunkSequenceLocation& Source : ChunkSources)
+ {
+ if (SequenceIndexChunksLeftToWriteCounters[Source.SequenceIndex].load() > 0)
+ {
+ ChunkTargetPtrs.push_back(&Source);
+ }
+ }
+ }
+ return ChunkTargetPtrs;
+};
+
+uint64_t
+BuildsOperationUpdateFolder::GetChunkWriteCount(std::span<const std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ uint32_t ChunkIndex)
+{
+ ZEN_TRACE_CPU("GetChunkWriteCount");
+
+ uint64_t WriteCount = 0;
+ std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkSources = GetChunkSequenceLocations(m_RemoteLookup, ChunkIndex);
+ for (const ChunkedContentLookup::ChunkSequenceLocation& Source : ChunkSources)
+ {
+ if (SequenceIndexChunksLeftToWriteCounters[Source.SequenceIndex].load() > 0)
+ {
+ WriteCount++;
+ }
+ }
+ return WriteCount;
+};
+
+void
+BuildsOperationUpdateFolder::CheckRequiredDiskSpace(const tsl::robin_map<std::string, uint32_t>& RemotePathToRemoteIndex)
+{
+ tsl::robin_set<uint32_t> ExistingRemotePaths;
+
+ if (m_Options.EnableTargetFolderScavenging)
+ {
+ for (uint32_t LocalPathIndex = 0; LocalPathIndex < m_LocalContent.Paths.size(); LocalPathIndex++)
+ {
+ const IoHash& RawHash = m_LocalContent.RawHashes[LocalPathIndex];
+ const std::filesystem::path& LocalPath = m_LocalContent.Paths[LocalPathIndex];
+
+ if (auto RemotePathIt = RemotePathToRemoteIndex.find(LocalPath.generic_string()); RemotePathIt != RemotePathToRemoteIndex.end())
+ {
+ const uint32_t RemotePathIndex = RemotePathIt->second;
+ if (m_RemoteContent.RawHashes[RemotePathIndex] == RawHash)
+ {
+ ExistingRemotePaths.insert(RemotePathIndex);
+ }
+ }
+ }
+ }
+
+ uint64_t RequiredSpace = 0;
+ for (uint32_t RemotePathIndex = 0; RemotePathIndex < m_RemoteContent.Paths.size(); RemotePathIndex++)
+ {
+ if (!ExistingRemotePaths.contains(RemotePathIndex))
+ {
+ RequiredSpace += m_RemoteContent.RawSizes[RemotePathIndex];
+ }
+ }
+
+ std::error_code Ec;
+ DiskSpace Space = DiskSpaceInfo(m_Path, Ec);
+ if (Ec)
+ {
+ throw std::runtime_error(fmt::format("Get free disk space for target path '{}' FAILED, reason: {}", m_Path, Ec.message()));
+ }
+ if (Space.Free < (RequiredSpace + 16u * 1024u * 1024u))
+ {
+ throw std::runtime_error(
+ fmt::format("Not enough free space for target path '{}', {} of free space is needed but only {} is available",
+ m_Path,
+ NiceBytes(RequiredSpace),
+ NiceBytes(Space.Free)));
+ }
+}
+
+void
+BuildsOperationUpdateFolder::WriteScavengedSequenceToCache(const std::filesystem::path& ScavengeRootPath,
+ const ChunkedFolderContent& ScavengedContent,
+ const ScavengedSequenceCopyOperation& ScavengeOp)
+{
+ ZEN_TRACE_CPU("WriteScavengedSequenceToCache");
+
+ const std::filesystem::path ScavengedPath = ScavengedContent.Paths[ScavengeOp.ScavengedPathIndex];
+ const std::filesystem::path ScavengedFilePath = (ScavengeRootPath / ScavengedPath).make_preferred();
+ ZEN_ASSERT_SLOW(FileSizeFromPath(ScavengedFilePath) == ScavengeOp.RawSize);
+
+ const IoHash& RemoteSequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[ScavengeOp.RemoteSequenceIndex];
+ const std::filesystem::path TempFilePath = GetTempChunkedSequenceFileName(m_CacheFolderPath, RemoteSequenceRawHash);
+
+ const uint64_t RawSize = ScavengedContent.RawSizes[ScavengeOp.ScavengedPathIndex];
+ FastCopyFile(m_Options.AllowFileClone,
+ m_Options.UseSparseFiles,
+ ScavengedFilePath,
+ TempFilePath,
+ RawSize,
+ m_DiskStats.WriteCount,
+ m_DiskStats.WriteByteCount,
+ m_DiskStats.CloneCount,
+ m_DiskStats.CloneByteCount);
+
+ const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RemoteSequenceRawHash);
+ RenameFile(TempFilePath, CacheFilePath);
+
+ m_WrittenChunkByteCount += RawSize;
+ if (m_Options.ValidateCompletedSequences)
+ {
+ m_ValidatedChunkByteCount += RawSize;
+ }
+}
+
+void
+BuildsOperationUpdateFolder::WriteLooseChunk(const uint32_t RemoteChunkIndex,
+ const BlobsExistsResult& ExistsResult,
+ std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ std::atomic<uint64_t>& WritePartsComplete,
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>&& ChunkTargetPtrs,
+ BufferedWriteFileCache& WriteCache,
+ ParallelWork& Work,
+ uint64_t TotalRequestCount,
+ uint64_t TotalPartWriteCount,
+ FilteredRate& FilteredDownloadedBytesPerSecond,
+ FilteredRate& FilteredWrittenBytesPerSecond)
+{
+ const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+ std::filesystem::path ExistingCompressedChunkPath = FindDownloadedChunk(ChunkHash);
+ if (!ExistingCompressedChunkPath.empty())
+ {
+ if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ }
+ if (!m_AbortFlag)
+ {
+ if (!ExistingCompressedChunkPath.empty())
+ {
+ Work.ScheduleWork(
+ m_IOWorkerPool,
+ [this,
+ SequenceIndexChunksLeftToWriteCounters,
+ &WriteCache,
+ &Work,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ &FilteredWrittenBytesPerSecond,
+ RemoteChunkIndex,
+ ChunkTargetPtrs = std::move(ChunkTargetPtrs),
+ CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>& AbortFlag) {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_WritePreDownloadedChunk");
+
+ FilteredWrittenBytesPerSecond.Start();
+
+ const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+
+ IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath);
+ if (!CompressedPart)
+ {
+ throw std::runtime_error(
+ fmt::format("Could not open dowloaded compressed chunk {} from {}", ChunkHash, CompressedChunkPath));
+ }
+
+ bool NeedHashVerify =
+ WriteCompressedChunkToCache(ChunkHash, ChunkTargetPtrs, WriteCache, std::move(CompressedPart));
+ bool WritePartsDone = WritePartsComplete.fetch_add(1) + 1 == TotalPartWriteCount;
+
+ if (!AbortFlag)
+ {
+ if (WritePartsDone)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
+
+ std::error_code Ec = TryRemoveFile(CompressedChunkPath);
+ if (Ec)
+ {
+ ZEN_DEBUG("Failed removing file '{}', reason: ({}) {}", CompressedChunkPath, Ec.value(), Ec.message());
+ }
+
+ std::vector<uint32_t> CompletedSequences =
+ CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
+ WriteCache.Close(CompletedSequences);
+ if (NeedHashVerify)
+ {
+ VerifyAndCompleteChunkSequencesAsync(CompletedSequences, Work);
+ }
+ else
+ {
+ FinalizeChunkSequences(CompletedSequences);
+ }
+ }
+ }
+ });
+ }
+ else
+ {
+ Work.ScheduleWork(m_NetworkPool,
+ [this,
+ &ExistsResult,
+ SequenceIndexChunksLeftToWriteCounters,
+ &WriteCache,
+ &Work,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ TotalRequestCount,
+ &FilteredDownloadedBytesPerSecond,
+ &FilteredWrittenBytesPerSecond,
+ RemoteChunkIndex,
+ ChunkTargetPtrs = std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>(
+ std::move(ChunkTargetPtrs))](std::atomic<bool>&) mutable {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_DownloadChunk");
+
+ FilteredDownloadedBytesPerSecond.Start();
+ DownloadBuildBlob(RemoteChunkIndex,
+ ExistsResult,
+ Work,
+ TotalRequestCount,
+ FilteredDownloadedBytesPerSecond,
+ [this,
+ &ExistsResult,
+ SequenceIndexChunksLeftToWriteCounters,
+ &WriteCache,
+ &Work,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ RemoteChunkIndex,
+ &FilteredWrittenBytesPerSecond,
+ ChunkTargetPtrs = std::move(ChunkTargetPtrs)](IoBuffer&& Payload) mutable {
+ AsyncWriteDownloadedChunk(RemoteChunkIndex,
+ ExistsResult,
+ std::move(ChunkTargetPtrs),
+ WriteCache,
+ Work,
+ std::move(Payload),
+ SequenceIndexChunksLeftToWriteCounters,
+ WritePartsComplete,
+ TotalPartWriteCount,
+ FilteredWrittenBytesPerSecond);
+ });
+ }
+ });
+ }
+ }
+}
+
+void
+BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkIndex,
+ const BlobsExistsResult& ExistsResult,
+ ParallelWork& Work,
+ uint64_t TotalRequestCount,
+ FilteredRate& FilteredDownloadedBytesPerSecond,
+ std::function<void(IoBuffer&& Payload)>&& OnDownloaded)
+{
+ const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+ // FilteredDownloadedBytesPerSecond.Start();
+ IoBuffer BuildBlob;
+ const bool ExistsInCache = m_Storage.CacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash);
+ if (ExistsInCache)
+ {
+ BuildBlob = m_Storage.CacheStorage->GetBuildBlob(m_BuildId, ChunkHash);
+ }
+ if (BuildBlob)
+ {
+ uint64_t BlobSize = BuildBlob.GetSize();
+ m_DownloadStats.DownloadedChunkCount++;
+ m_DownloadStats.DownloadedChunkByteCount += BlobSize;
+ if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ OnDownloaded(std::move(BuildBlob));
+ }
+ else
+ {
+ if (m_RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >= m_Options.LargeAttachmentSize)
+ {
+ DownloadLargeBlob(
+ *m_Storage.BuildStorage,
+ m_TempDownloadFolderPath,
+ m_BuildId,
+ ChunkHash,
+ m_Options.PreferredMultipartChunkSize,
+ Work,
+ m_NetworkPool,
+ m_DownloadStats.DownloadedChunkByteCount,
+ m_DownloadStats.MultipartAttachmentCount,
+ [this, &FilteredDownloadedBytesPerSecond, TotalRequestCount, OnDownloaded = std::move(OnDownloaded)](IoBuffer&& Payload) {
+ m_DownloadStats.DownloadedChunkCount++;
+ if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+
+ OnDownloaded(std::move(Payload));
+ });
+ }
+ else
+ {
+ try
+ {
+ BuildBlob = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, ChunkHash);
+ }
+ catch (const std::exception&)
+ {
+ // Silence http errors due to abort
+ if (!m_AbortFlag)
+ {
+ throw;
+ }
+ }
+ if (!m_AbortFlag)
+ {
+ if (!BuildBlob)
+ {
+ throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash));
+ }
+
+ if (!m_AbortFlag)
+ {
+ uint64_t BlobSize = BuildBlob.GetSize();
+ m_DownloadStats.DownloadedChunkCount++;
+ m_DownloadStats.DownloadedChunkByteCount += BlobSize;
+ if (m_DownloadStats.RequestsCompleteCount.fetch_add(1) + 1 == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+
+ OnDownloaded(std::move(BuildBlob));
+ }
+ }
+ }
+ }
+}
+
+void
+BuildsOperationUpdateFolder::DownloadPartialBlock(
+ std::span<const ChunkBlockAnalyser::BlockRangeDescriptor> BlockRanges,
+ size_t BlockRangeStartIndex,
+ size_t BlockRangeCount,
+ const BlobsExistsResult& ExistsResult,
+ uint64_t TotalRequestCount,
+ FilteredRate& FilteredDownloadedBytesPerSecond,
+ std::function<void(IoBuffer&& InMemoryBuffer,
+ const std::filesystem::path& OnDiskPath,
+ size_t BlockRangeStartIndex,
+ std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengths)>&& OnDownloaded)
+{
+ const uint32_t BlockIndex = BlockRanges[BlockRangeStartIndex].BlockIndex;
+
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
+
+ auto ProcessDownload = [this](
+ const ChunkBlockDescription& BlockDescription,
+ IoBuffer&& BlockRangeBuffer,
+ size_t BlockRangeStartIndex,
+ std::span<const std::pair<uint64_t, uint64_t>> BlockOffsetAndLengths,
+ uint64_t TotalRequestCount,
+ FilteredRate& FilteredDownloadedBytesPerSecond,
+ const std::function<void(IoBuffer && InMemoryBuffer,
+ const std::filesystem::path& OnDiskPath,
+ size_t BlockRangeStartIndex,
+ std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengths)>& OnDownloaded) {
+ uint64_t BlockRangeBufferSize = BlockRangeBuffer.GetSize();
+ m_DownloadStats.DownloadedBlockCount++;
+ m_DownloadStats.DownloadedBlockByteCount += BlockRangeBufferSize;
+ if (m_DownloadStats.RequestsCompleteCount.fetch_add(BlockOffsetAndLengths.size()) + BlockOffsetAndLengths.size() ==
+ TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+
+ IoHashStream RangeId;
+ for (const std::pair<uint64_t, uint64_t>& Range : BlockOffsetAndLengths)
+ {
+ RangeId.Append(&Range.first, sizeof(uint64_t));
+ RangeId.Append(&Range.second, sizeof(uint64_t));
+ }
+ std::filesystem::path BlockChunkPath =
+ TryMoveDownloadedChunk(BlockRangeBuffer,
+ m_TempBlockFolderPath / fmt::format("{}_{}", BlockDescription.BlockHash, RangeId.GetHash()),
+ /* ForceDiskBased */ BlockRangeBufferSize > m_Options.MaximumInMemoryPayloadSize);
+
+ if (!m_AbortFlag)
+ {
+ OnDownloaded(std::move(BlockRangeBuffer), std::move(BlockChunkPath), BlockRangeStartIndex, BlockOffsetAndLengths);
+ }
+ };
+
+ std::vector<std::pair<uint64_t, uint64_t>> Ranges;
+ Ranges.reserve(BlockRangeCount);
+ for (size_t BlockRangeIndex = BlockRangeStartIndex; BlockRangeIndex < BlockRangeStartIndex + BlockRangeCount; BlockRangeIndex++)
+ {
+ const ChunkBlockAnalyser::BlockRangeDescriptor& BlockRange = BlockRanges[BlockRangeIndex];
+ Ranges.push_back(std::make_pair(BlockRange.RangeStart, BlockRange.RangeLength));
+ }
+
+ const bool ExistsInCache = m_Storage.CacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash);
+
+ size_t SubBlockRangeCount = BlockRangeCount;
+ size_t SubRangeCountComplete = 0;
+ std::span<const std::pair<uint64_t, uint64_t>> RangesSpan(Ranges);
+ while (SubRangeCountComplete < SubBlockRangeCount)
+ {
+ if (m_AbortFlag)
+ {
+ break;
+ }
+
+ // First try to get subrange from cache.
+ // If not successful, try to get the ranges from the build store and adapt SubRangeCount...
+
+ size_t SubRangeStartIndex = BlockRangeStartIndex + SubRangeCountComplete;
+ if (ExistsInCache)
+ {
+ size_t SubRangeCount = Min(BlockRangeCount - SubRangeCountComplete, m_Storage.CacheHost.Caps.MaxRangeCountPerRequest);
+
+ if (SubRangeCount == 1)
+ {
+ // Legacy single-range path, prefer that for max compatibility
+
+ const std::pair<uint64_t, uint64_t> SubRange = RangesSpan[SubRangeCountComplete];
+ IoBuffer PayloadBuffer =
+ m_Storage.CacheStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash, SubRange.first, SubRange.second);
+ if (m_AbortFlag)
+ {
+ break;
+ }
+ if (PayloadBuffer)
+ {
+ ProcessDownload(BlockDescription,
+ std::move(PayloadBuffer),
+ SubRangeStartIndex,
+ std::vector<std::pair<uint64_t, uint64_t>>{std::make_pair(0u, SubRange.second)},
+ TotalRequestCount,
+ FilteredDownloadedBytesPerSecond,
+ OnDownloaded);
+ SubRangeCountComplete += SubRangeCount;
+ continue;
+ }
+ }
+ else
+ {
+ auto SubRanges = RangesSpan.subspan(SubRangeCountComplete, SubRangeCount);
+
+ BuildStorageCache::BuildBlobRanges RangeBuffers =
+ m_Storage.CacheStorage->GetBuildBlobRanges(m_BuildId, BlockDescription.BlockHash, SubRanges);
+ if (m_AbortFlag)
+ {
+ break;
+ }
+ if (RangeBuffers.PayloadBuffer)
+ {
+ if (RangeBuffers.Ranges.empty())
+ {
+ SubRangeCount = Ranges.size() - SubRangeCountComplete;
+ ProcessDownload(BlockDescription,
+ std::move(RangeBuffers.PayloadBuffer),
+ SubRangeStartIndex,
+ RangesSpan.subspan(SubRangeCountComplete, SubRangeCount),
+ TotalRequestCount,
+ FilteredDownloadedBytesPerSecond,
+ OnDownloaded);
+ SubRangeCountComplete += SubRangeCount;
+ continue;
+ }
+ else if (RangeBuffers.Ranges.size() == SubRangeCount)
+ {
+ ProcessDownload(BlockDescription,
+ std::move(RangeBuffers.PayloadBuffer),
+ SubRangeStartIndex,
+ RangeBuffers.Ranges,
+ TotalRequestCount,
+ FilteredDownloadedBytesPerSecond,
+ OnDownloaded);
+ SubRangeCountComplete += SubRangeCount;
+ continue;
+ }
+ }
+ }
+ }
+
+ size_t SubRangeCount = Min(BlockRangeCount - SubRangeCountComplete, m_Storage.BuildStorageHost.Caps.MaxRangeCountPerRequest);
+
+ auto SubRanges = RangesSpan.subspan(SubRangeCountComplete, SubRangeCount);
+
+ BuildStorageBase::BuildBlobRanges RangeBuffers;
+
+ try
+ {
+ RangeBuffers = m_Storage.BuildStorage->GetBuildBlobRanges(m_BuildId, BlockDescription.BlockHash, SubRanges);
+ }
+ catch (const std::exception&)
+ {
+ // Silence http errors due to abort
+ if (!m_AbortFlag)
+ {
+ throw;
+ }
+ }
+
+ if (!m_AbortFlag)
+ {
+ if (RangeBuffers.PayloadBuffer)
+ {
+ if (RangeBuffers.Ranges.empty())
+ {
+ // Jupiter will ignore the ranges and send the whole payload if it fetches the payload from S3
+ // Upload to cache (if enabled) and use the whole payload for the remaining ranges
+
+ const uint64_t Size = RangeBuffers.PayloadBuffer.GetSize();
+
+ const bool PopulateCache = !ExistsInCache && m_Storage.CacheStorage && m_Options.PopulateCache;
+
+ std::filesystem::path BlockPath =
+ TryMoveDownloadedChunk(RangeBuffers.PayloadBuffer,
+ m_TempBlockFolderPath / BlockDescription.BlockHash.ToHexString(),
+ /* ForceDiskBased */ PopulateCache || Size > m_Options.MaximumInMemoryPayloadSize);
+ if (!BlockPath.empty())
+ {
+ RangeBuffers.PayloadBuffer = IoBufferBuilder::MakeFromFile(BlockPath);
+ if (!RangeBuffers.PayloadBuffer)
+ {
+ throw std::runtime_error(
+ fmt::format("Failed to read block {} from temporary path '{}'", BlockDescription.BlockHash, BlockPath));
+ }
+ RangeBuffers.PayloadBuffer.SetDeleteOnClose(true);
+ }
+
+ if (PopulateCache)
+ {
+ m_Storage.CacheStorage->PutBuildBlob(m_BuildId,
+ BlockDescription.BlockHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(SharedBuffer(RangeBuffers.PayloadBuffer)));
+ }
+
+ if (m_AbortFlag)
+ {
+ break;
+ }
+
+ SubRangeCount = Ranges.size() - SubRangeCountComplete;
+ ProcessDownload(BlockDescription,
+ std::move(RangeBuffers.PayloadBuffer),
+ SubRangeStartIndex,
+ RangesSpan.subspan(SubRangeCountComplete, SubRangeCount),
+ TotalRequestCount,
+ FilteredDownloadedBytesPerSecond,
+ OnDownloaded);
+ }
+ else
+ {
+ if (RangeBuffers.Ranges.size() != SubRanges.size())
+ {
+ throw std::runtime_error(fmt::format("Fetching {} ranges from {} resulted in {} ranges",
+ SubRanges.size(),
+ BlockDescription.BlockHash,
+ RangeBuffers.Ranges.size()));
+ }
+ ProcessDownload(BlockDescription,
+ std::move(RangeBuffers.PayloadBuffer),
+ SubRangeStartIndex,
+ RangeBuffers.Ranges,
+ TotalRequestCount,
+ FilteredDownloadedBytesPerSecond,
+ OnDownloaded);
+ }
+ }
+ else
+ {
+ throw std::runtime_error(
+ fmt::format("Block {} is missing when fetching {} ranges", BlockDescription.BlockHash, SubRangeCount));
+ }
+
+ SubRangeCountComplete += SubRangeCount;
+ }
+ }
+}
+
+std::vector<uint32_t>
+BuildsOperationUpdateFolder::WriteLocalChunkToCache(CloneQueryInterface* CloneQuery,
+ const CopyChunkData& CopyData,
+ const std::vector<ChunkedFolderContent>& ScavengedContents,
+ const std::vector<ChunkedContentLookup>& ScavengedLookups,
+ const std::vector<std::filesystem::path>& ScavengedPaths,
+ BufferedWriteFileCache& WriteCache)
+{
+ ZEN_TRACE_CPU("WriteLocalChunkToCache");
+
+ std::filesystem::path SourceFilePath;
+
+ if (CopyData.ScavengeSourceIndex == (uint32_t)-1)
+ {
+ const uint32_t LocalPathIndex = m_LocalLookup.SequenceIndexFirstPathIndex[CopyData.SourceSequenceIndex];
+ SourceFilePath = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred();
+ }
+ else
+ {
+ const ChunkedFolderContent& ScavengedContent = ScavengedContents[CopyData.ScavengeSourceIndex];
+ const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[CopyData.ScavengeSourceIndex];
+ const std::filesystem::path ScavengedPath = ScavengedPaths[CopyData.ScavengeSourceIndex];
+ const uint32_t ScavengedPathIndex = ScavengedLookup.SequenceIndexFirstPathIndex[CopyData.SourceSequenceIndex];
+ SourceFilePath = (ScavengedPath / ScavengedContent.Paths[ScavengedPathIndex]).make_preferred();
+ }
+ ZEN_ASSERT_SLOW(IsFile(SourceFilePath));
+ ZEN_ASSERT(!CopyData.TargetChunkLocationPtrs.empty());
+
+ uint64_t CacheLocalFileBytesRead = 0;
+
+ size_t TargetStart = 0;
+ const std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> AllTargets(CopyData.TargetChunkLocationPtrs);
+
+ struct WriteOp
+ {
+ const ChunkedContentLookup::ChunkSequenceLocation* Target = nullptr;
+ uint64_t CacheFileOffset = (uint64_t)-1;
+ uint32_t ChunkIndex = (uint32_t)-1;
+ };
+
+ std::vector<WriteOp> WriteOps;
+
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Sort");
+ WriteOps.reserve(AllTargets.size());
+ for (const CopyChunkData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets)
+ {
+ std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> TargetRange =
+ AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount);
+ for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange)
+ {
+ WriteOps.push_back(
+ WriteOp{.Target = Target, .CacheFileOffset = ChunkTarget.CacheFileOffset, .ChunkIndex = ChunkTarget.RemoteChunkIndex});
+ }
+ TargetStart += ChunkTarget.TargetChunkLocationCount;
+ }
+
+ std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) {
+ if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
+ {
+ return true;
+ }
+ else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
+ {
+ return false;
+ }
+ if (Lhs.Target->Offset < Rhs.Target->Offset)
+ {
+ return true;
+ }
+ return false;
+ });
+ }
+
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Write");
+
+ tsl::robin_set<uint32_t> ChunkIndexesWritten;
+
+ BufferedOpenFile SourceFile(SourceFilePath,
+ m_DiskStats.OpenReadCount,
+ m_DiskStats.CurrentOpenFileCount,
+ m_DiskStats.ReadCount,
+ m_DiskStats.ReadByteCount);
+
+ bool CanCloneSource = CloneQuery && CloneQuery->CanClone(SourceFile.Handle());
+
+ BufferedWriteFileCache::Local LocalWriter(WriteCache);
+
+ for (size_t WriteOpIndex = 0; WriteOpIndex < WriteOps.size();)
+ {
+ if (m_AbortFlag)
+ {
+ break;
+ }
+ const WriteOp& Op = WriteOps[WriteOpIndex];
+
+ const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
+ const uint32_t RemotePathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex];
+ const uint64_t TargetSize = m_RemoteContent.RawSizes[RemotePathIndex];
+ const uint64_t ChunkSize = m_RemoteContent.ChunkedContent.ChunkRawSizes[Op.ChunkIndex];
+
+ uint64_t ReadLength = ChunkSize;
+ size_t WriteCount = 1;
+ uint64_t OpSourceEnd = Op.CacheFileOffset + ChunkSize;
+ uint64_t OpTargetEnd = Op.Target->Offset + ChunkSize;
+ while ((WriteOpIndex + WriteCount) < WriteOps.size())
+ {
+ const WriteOp& NextOp = WriteOps[WriteOpIndex + WriteCount];
+ if (NextOp.Target->SequenceIndex != Op.Target->SequenceIndex)
+ {
+ break;
+ }
+ if (NextOp.Target->Offset != OpTargetEnd)
+ {
+ break;
+ }
+ if (NextOp.CacheFileOffset != OpSourceEnd)
+ {
+ break;
+ }
+ const uint64_t NextChunkLength = m_RemoteContent.ChunkedContent.ChunkRawSizes[NextOp.ChunkIndex];
+ if (ReadLength + NextChunkLength > BufferedOpenFile::BlockSize)
+ {
+ break;
+ }
+ ReadLength += NextChunkLength;
+ OpSourceEnd += NextChunkLength;
+ OpTargetEnd += NextChunkLength;
+ WriteCount++;
+ }
+
+ {
+ bool DidClone = false;
+
+ if (CanCloneSource)
+ {
+ uint64_t PreBytes = 0;
+ uint64_t PostBytes = 0;
+ uint64_t ClonableBytes =
+ CloneQuery->GetClonableRange(Op.CacheFileOffset, Op.Target->Offset, ReadLength, PreBytes, PostBytes);
+ if (ClonableBytes > 0)
+ {
+ // We need to open the file...
+ BufferedWriteFileCache::Local::Writer* Writer = LocalWriter.GetWriter(RemoteSequenceIndex);
+ if (!Writer)
+ {
+ Writer = LocalWriter.PutWriter(RemoteSequenceIndex, std::make_unique<BufferedWriteFileCache::Local::Writer>());
+
+ Writer->File = std::make_unique<BasicFile>();
+
+ const std::filesystem::path FileName =
+ GetTempChunkedSequenceFileName(m_CacheFolderPath,
+ m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]);
+ Writer->File->Open(FileName, BasicFile::Mode::kWrite);
+ if (m_Options.UseSparseFiles)
+ {
+ PrepareFileForScatteredWrite(Writer->File->Handle(), TargetSize);
+ }
+ }
+ DidClone = CloneQuery->TryClone(SourceFile.Handle(),
+ Writer->File->Handle(),
+ Op.CacheFileOffset + PreBytes,
+ Op.Target->Offset + PreBytes,
+ ClonableBytes,
+ TargetSize);
+ if (DidClone)
+ {
+ m_DiskStats.WriteCount++;
+ m_DiskStats.WriteByteCount += ClonableBytes;
+
+ m_DiskStats.CloneCount++;
+ m_DiskStats.CloneByteCount += ClonableBytes;
+
+ m_WrittenChunkByteCount += ClonableBytes;
+
+ if (PreBytes > 0)
+ {
+ CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, PreBytes);
+ const uint64_t FileOffset = Op.Target->Offset;
+
+ WriteSequenceChunkToCache(LocalWriter, ChunkSource, RemoteSequenceIndex, FileOffset, RemotePathIndex);
+ }
+ if (PostBytes > 0)
+ {
+ CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset + ReadLength - PostBytes, PostBytes);
+ const uint64_t FileOffset = Op.Target->Offset + ReadLength - PostBytes;
+
+ WriteSequenceChunkToCache(LocalWriter, ChunkSource, RemoteSequenceIndex, FileOffset, RemotePathIndex);
+ }
+ }
+ }
+ }
+
+ if (!DidClone)
+ {
+ CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ReadLength);
+
+ const uint64_t FileOffset = Op.Target->Offset;
+
+ WriteSequenceChunkToCache(LocalWriter, ChunkSource, RemoteSequenceIndex, FileOffset, RemotePathIndex);
+ }
+ }
+
+ CacheLocalFileBytesRead += ReadLength; // TODO: This should be the sum of unique chunk sizes?
+
+ WriteOpIndex += WriteCount;
+ }
+ }
+
+ if (m_Options.IsVerbose)
+ {
+ ZEN_INFO("Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), SourceFilePath);
+ }
+
+ std::vector<uint32_t> Result;
+ Result.reserve(WriteOps.size());
+
+ for (const WriteOp& Op : WriteOps)
+ {
+ Result.push_back(Op.Target->SequenceIndex);
+ }
+ return Result;
+}
+
+bool
+BuildsOperationUpdateFolder::WriteCompressedChunkToCache(
+ const IoHash& ChunkHash,
+ const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs,
+ BufferedWriteFileCache& WriteCache,
+ IoBuffer&& CompressedPart)
+{
+ ZEN_TRACE_CPU("WriteCompressedChunkToCache");
+
+ auto ChunkHashToChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash);
+ ZEN_ASSERT(ChunkHashToChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end());
+ if (IsSingleFileChunk(m_RemoteContent, ChunkTargetPtrs))
+ {
+ const std::uint32_t SequenceIndex = ChunkTargetPtrs.front()->SequenceIndex;
+ const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex];
+ StreamDecompress(SequenceRawHash, CompositeBuffer(std::move(CompressedPart)));
+ return false;
+ }
+ else
+ {
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompositeBuffer(std::move(CompressedPart)), RawHash, RawSize);
+ if (!Compressed)
+ {
+ throw std::runtime_error(fmt::format("Failed to parse header of compressed large blob {}", ChunkHash));
+ }
+ if (RawHash != ChunkHash)
+ {
+ throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, ChunkHash));
+ }
+
+ BufferedWriteFileCache::Local LocalWriter(WriteCache);
+
+ IoHashStream Hash;
+ bool CouldDecompress = Compressed.DecompressToStream(
+ 0,
+ (uint64_t)-1,
+ [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
+ ZEN_UNUSED(SourceOffset);
+ ZEN_TRACE_CPU("Async_StreamDecompress_Write");
+ m_DiskStats.ReadByteCount += SourceSize;
+ if (!m_AbortFlag)
+ {
+ for (const ChunkedContentLookup::ChunkSequenceLocation* TargetPtr : ChunkTargetPtrs)
+ {
+ const auto& Target = *TargetPtr;
+ const uint64_t FileOffset = Target.Offset + Offset;
+ const uint32_t SequenceIndex = Target.SequenceIndex;
+ const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex];
+
+ WriteSequenceChunkToCache(LocalWriter, RangeBuffer, SequenceIndex, FileOffset, PathIndex);
+ }
+
+ return true;
+ }
+ return false;
+ });
+
+ if (m_AbortFlag)
+ {
+ return false;
+ }
+
+ if (!CouldDecompress)
+ {
+ throw std::runtime_error(fmt::format("Failed to decompress large chunk {}", ChunkHash));
+ }
+
+ return true;
+ }
+}
+
+void
+BuildsOperationUpdateFolder::StreamDecompress(const IoHash& SequenceRawHash, CompositeBuffer&& CompressedPart)
+{
+ ZEN_TRACE_CPU("StreamDecompress");
+ const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash);
+ TemporaryFile DecompressedTemp;
+ std::error_code Ec;
+ DecompressedTemp.CreateTemporary(TempChunkSequenceFileName.parent_path(), Ec);
+ if (Ec)
+ {
+ throw std::runtime_error(fmt::format("Failed creating temporary file for decompressing large blob {}, reason: ({}) {}",
+ SequenceRawHash,
+ Ec.value(),
+ Ec.message()));
+ }
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompressedPart, RawHash, RawSize);
+ if (!Compressed)
+ {
+ throw std::runtime_error(fmt::format("Failed to parse header of compressed large blob {}", SequenceRawHash));
+ }
+ if (RawHash != SequenceRawHash)
+ {
+ throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, SequenceRawHash));
+ }
+ PrepareFileForScatteredWrite(DecompressedTemp.Handle(), RawSize);
+
+ IoHashStream Hash;
+ bool CouldDecompress =
+ Compressed.DecompressToStream(0,
+ (uint64_t)-1,
+ [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
+ ZEN_UNUSED(SourceOffset);
+ ZEN_TRACE_CPU("StreamDecompress_Write");
+ m_DiskStats.ReadCount++;
+ m_DiskStats.ReadByteCount += SourceSize;
+ if (!m_AbortFlag)
+ {
+ for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
+ {
+ if (m_Options.ValidateCompletedSequences)
+ {
+ Hash.Append(Segment.GetView());
+ m_ValidatedChunkByteCount += Segment.GetSize();
+ }
+ DecompressedTemp.Write(Segment, Offset);
+ Offset += Segment.GetSize();
+ m_DiskStats.WriteByteCount += Segment.GetSize();
+ m_DiskStats.WriteCount++;
+ m_WrittenChunkByteCount += Segment.GetSize();
+ }
+ return true;
+ }
+ return false;
+ });
+
+ if (m_AbortFlag)
+ {
+ return;
+ }
+
+ if (!CouldDecompress)
+ {
+ throw std::runtime_error(fmt::format("Failed to decompress large blob {}", SequenceRawHash));
+ }
+ if (m_Options.ValidateCompletedSequences)
+ {
+ const IoHash VerifyHash = Hash.GetHash();
+ if (VerifyHash != SequenceRawHash)
+ {
+ throw std::runtime_error(
+ fmt::format("Decompressed blob payload hash {} does not match expected hash {}", VerifyHash, SequenceRawHash));
+ }
+ }
+ DecompressedTemp.MoveTemporaryIntoPlace(TempChunkSequenceFileName, Ec);
+ if (Ec)
+ {
+ throw std::runtime_error(fmt::format("Failed moving temporary file for decompressing large blob {}, reason: ({}) {}",
+ SequenceRawHash,
+ Ec.value(),
+ Ec.message()));
+ }
+ // WriteChunkStats.ChunkCountWritten++;
+}
+
+void
+BuildsOperationUpdateFolder::WriteSequenceChunkToCache(BufferedWriteFileCache::Local& LocalWriter,
+ const CompositeBuffer& Chunk,
+ const uint32_t SequenceIndex,
+ const uint64_t FileOffset,
+ const uint32_t PathIndex)
+{
+ ZEN_TRACE_CPU("WriteSequenceChunkToCache");
+
+ const uint64_t SequenceSize = m_RemoteContent.RawSizes[PathIndex];
+
+ auto OpenFile = [&](BasicFile& File) {
+ const std::filesystem::path FileName =
+ GetTempChunkedSequenceFileName(m_CacheFolderPath, m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
+ File.Open(FileName, BasicFile::Mode::kWrite);
+ if (m_Options.UseSparseFiles)
+ {
+ PrepareFileForScatteredWrite(File.Handle(), SequenceSize);
+ }
+ };
+
+ const uint64_t ChunkSize = Chunk.GetSize();
+ ZEN_ASSERT(FileOffset + ChunkSize <= SequenceSize);
+ if (ChunkSize == SequenceSize)
+ {
+ BasicFile SingleChunkFile;
+ OpenFile(SingleChunkFile);
+
+ m_DiskStats.CurrentOpenFileCount++;
+ auto _ = MakeGuard([this]() { m_DiskStats.CurrentOpenFileCount--; });
+ SingleChunkFile.Write(Chunk, FileOffset);
+ }
+ else
+ {
+ const uint64_t MaxWriterBufferSize = 256u * 1025u;
+
+ BufferedWriteFileCache::Local::Writer* Writer = LocalWriter.GetWriter(SequenceIndex);
+ if (Writer)
+ {
+ if ((!Writer->Writer) && (ChunkSize < MaxWriterBufferSize))
+ {
+ Writer->Writer = std::make_unique<BasicFileWriter>(*Writer->File, Min(SequenceSize, MaxWriterBufferSize));
+ }
+ Writer->Write(Chunk, FileOffset);
+ }
+ else
+ {
+ Writer = LocalWriter.PutWriter(SequenceIndex, std::make_unique<BufferedWriteFileCache::Local::Writer>());
+
+ Writer->File = std::make_unique<BasicFile>();
+ OpenFile(*Writer->File);
+ if (ChunkSize < MaxWriterBufferSize)
+ {
+ Writer->Writer = std::make_unique<BasicFileWriter>(*Writer->File, Min(SequenceSize, MaxWriterBufferSize));
+ }
+ Writer->Write(Chunk, FileOffset);
+ }
+ }
+ m_DiskStats.WriteCount++;
+ m_DiskStats.WriteByteCount += ChunkSize;
+ m_WrittenChunkByteCount += ChunkSize;
+}
+
+bool
+BuildsOperationUpdateFolder::GetBlockWriteOps(const IoHash& BlockRawHash,
+ std::span<const IoHash> ChunkRawHashes,
+ std::span<const uint32_t> ChunkCompressedLengths,
+ std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
+ const MemoryView BlockView,
+ uint32_t FirstIncludedBlockChunkIndex,
+ uint32_t LastIncludedBlockChunkIndex,
+ BlockWriteOps& OutOps)
+{
+ ZEN_TRACE_CPU("GetBlockWriteOps");
+
+ uint32_t OffsetInBlock = 0;
+ for (uint32_t ChunkBlockIndex = FirstIncludedBlockChunkIndex; ChunkBlockIndex <= LastIncludedBlockChunkIndex; ChunkBlockIndex++)
+ {
+ const uint32_t ChunkCompressedSize = ChunkCompressedLengths[ChunkBlockIndex];
+ const IoHash& ChunkHash = ChunkRawHashes[ChunkBlockIndex];
+ if (auto It = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); It != m_RemoteLookup.ChunkHashToChunkIndex.end())
+ {
+ const uint32_t ChunkIndex = It->second;
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
+ GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, ChunkIndex);
+
+ if (!ChunkTargetPtrs.empty())
+ {
+ bool NeedsWrite = true;
+ if (RemoteChunkIndexNeedsCopyFromSourceFlags[ChunkIndex].compare_exchange_strong(NeedsWrite, false))
+ {
+ MemoryView ChunkMemoryView = BlockView.Mid(OffsetInBlock, ChunkCompressedSize);
+ IoHash VerifyChunkHash;
+ uint64_t VerifyChunkSize;
+ CompressedBuffer CompressedChunk =
+ CompressedBuffer::FromCompressed(SharedBuffer::MakeView(ChunkMemoryView), VerifyChunkHash, VerifyChunkSize);
+ if (!CompressedChunk)
+ {
+ throw std::runtime_error(fmt::format("Chunk {} at {}, size {} in block {} is not a valid compressed buffer",
+ ChunkHash,
+ OffsetInBlock,
+ ChunkCompressedSize,
+ BlockRawHash));
+ }
+ if (VerifyChunkHash != ChunkHash)
+ {
+ throw std::runtime_error(fmt::format("Chunk {} at {}, size {} in block {} has a mismatching content hash {}",
+ ChunkHash,
+ OffsetInBlock,
+ ChunkCompressedSize,
+ BlockRawHash,
+ VerifyChunkHash));
+ }
+ if (VerifyChunkSize != m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex])
+ {
+ throw std::runtime_error(
+ fmt::format("Chunk {} at {}, size {} in block {} has a mismatching raw size {}, expected {}",
+ ChunkHash,
+ OffsetInBlock,
+ ChunkCompressedSize,
+ BlockRawHash,
+ VerifyChunkSize,
+ m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]));
+ }
+
+ OodleCompressor ChunkCompressor;
+ OodleCompressionLevel ChunkCompressionLevel;
+ uint64_t ChunkBlockSize;
+
+ bool GetCompressParametersSuccess =
+ CompressedChunk.TryGetCompressParameters(ChunkCompressor, ChunkCompressionLevel, ChunkBlockSize);
+ ZEN_ASSERT(GetCompressParametersSuccess);
+
+ IoBuffer Decompressed;
+ if (ChunkCompressionLevel == OodleCompressionLevel::None)
+ {
+ MemoryView ChunkDecompressedMemoryView = ChunkMemoryView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder());
+ Decompressed =
+ IoBuffer(IoBuffer::Wrap, ChunkDecompressedMemoryView.GetData(), ChunkDecompressedMemoryView.GetSize());
+ }
+ else
+ {
+ Decompressed = CompressedChunk.Decompress().AsIoBuffer();
+ }
+
+ if (Decompressed.GetSize() != m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex])
+ {
+ throw std::runtime_error(fmt::format("Chunk {} at {}, size {} in block {} decompressed to size {}, expected {}",
+ ChunkHash,
+ OffsetInBlock,
+ ChunkCompressedSize,
+ BlockRawHash,
+ Decompressed.GetSize(),
+ m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]));
+ }
+
+ ZEN_ASSERT_SLOW(ChunkHash == IoHash::HashBuffer(Decompressed));
+ for (const ChunkedContentLookup::ChunkSequenceLocation* Target : ChunkTargetPtrs)
+ {
+ OutOps.WriteOps.push_back(
+ BlockWriteOps::WriteOpData{.Target = Target, .ChunkBufferIndex = OutOps.ChunkBuffers.size()});
+ }
+ OutOps.ChunkBuffers.emplace_back(std::move(Decompressed));
+ }
+ }
+ }
+
+ OffsetInBlock += ChunkCompressedSize;
+ }
+ {
+ ZEN_TRACE_CPU("Sort");
+ std::sort(OutOps.WriteOps.begin(),
+ OutOps.WriteOps.end(),
+ [](const BlockWriteOps::WriteOpData& Lhs, const BlockWriteOps::WriteOpData& Rhs) {
+ if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
+ {
+ return true;
+ }
+ if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
+ {
+ return false;
+ }
+ return Lhs.Target->Offset < Rhs.Target->Offset;
+ });
+ }
+ return true;
+}
+
+void
+BuildsOperationUpdateFolder::WriteBlockChunkOpsToCache(std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ const BlockWriteOps& Ops,
+ BufferedWriteFileCache& WriteCache,
+ ParallelWork& Work)
+{
+ ZEN_TRACE_CPU("WriteBlockChunkOpsToCache");
+
+ {
+ BufferedWriteFileCache::Local LocalWriter(WriteCache);
+ for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps)
+ {
+ if (Work.IsAborted())
+ {
+ break;
+ }
+ const CompositeBuffer& Chunk = Ops.ChunkBuffers[WriteOp.ChunkBufferIndex];
+ const uint32_t SequenceIndex = WriteOp.Target->SequenceIndex;
+ ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() <=
+ m_RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]);
+ ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() > 0);
+ const uint64_t FileOffset = WriteOp.Target->Offset;
+ const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex];
+
+ WriteSequenceChunkToCache(LocalWriter, Chunk, SequenceIndex, FileOffset, PathIndex);
+ }
+ }
+ if (!Work.IsAborted())
+ {
+ // Write tracking, updating this must be done without any files open (BufferedWriteFileCache::Local)
+ std::vector<uint32_t> CompletedChunkSequences;
+ for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps)
+ {
+ const uint32_t RemoteSequenceIndex = WriteOp.Target->SequenceIndex;
+ if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters))
+ {
+ CompletedChunkSequences.push_back(RemoteSequenceIndex);
+ }
+ }
+ WriteCache.Close(CompletedChunkSequences);
+ VerifyAndCompleteChunkSequencesAsync(CompletedChunkSequences, Work);
+ }
+}
+
+bool
+BuildsOperationUpdateFolder::WriteChunksBlockToCache(const ChunkBlockDescription& BlockDescription,
+ std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ ParallelWork& Work,
+ CompositeBuffer&& BlockBuffer,
+ std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
+ BufferedWriteFileCache& WriteCache)
+{
+ ZEN_TRACE_CPU("WriteChunksBlockToCache");
+
+ IoBuffer BlockMemoryBuffer = MakeBufferMemoryBased(BlockBuffer);
+ const MemoryView BlockView = BlockMemoryBuffer.GetView();
+
+ BlockWriteOps Ops;
+ if ((BlockDescription.HeaderSize == 0) || BlockDescription.ChunkCompressedLengths.empty())
+ {
+ ZEN_TRACE_CPU("WriteChunksBlockToCache_Legacy");
+
+ uint64_t HeaderSize;
+ const std::vector<uint32_t> ChunkCompressedLengths =
+ ReadChunkBlockHeader(BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder()), HeaderSize);
+
+ if (GetBlockWriteOps(BlockDescription.BlockHash,
+ BlockDescription.ChunkRawHashes,
+ ChunkCompressedLengths,
+ SequenceIndexChunksLeftToWriteCounters,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder() + HeaderSize),
+ 0,
+ gsl::narrow<uint32_t>(BlockDescription.ChunkRawHashes.size() - 1),
+ Ops))
+ {
+ WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work);
+ return true;
+ }
+ return false;
+ }
+
+ if (GetBlockWriteOps(BlockDescription.BlockHash,
+ BlockDescription.ChunkRawHashes,
+ BlockDescription.ChunkCompressedLengths,
+ SequenceIndexChunksLeftToWriteCounters,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize),
+ 0,
+ gsl::narrow<uint32_t>(BlockDescription.ChunkRawHashes.size() - 1),
+ Ops))
+ {
+ WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work);
+ return true;
+ }
+ return false;
+}
+
+bool
+BuildsOperationUpdateFolder::WritePartialBlockChunksToCache(const ChunkBlockDescription& BlockDescription,
+ std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ ParallelWork& Work,
+ CompositeBuffer&& PartialBlockBuffer,
+ uint32_t FirstIncludedBlockChunkIndex,
+ uint32_t LastIncludedBlockChunkIndex,
+ std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
+ BufferedWriteFileCache& WriteCache)
+{
+ ZEN_TRACE_CPU("WritePartialBlockChunksToCache");
+
+ IoBuffer BlockMemoryBuffer = MakeBufferMemoryBased(PartialBlockBuffer);
+ const MemoryView BlockView = BlockMemoryBuffer.GetView();
+
+ BlockWriteOps Ops;
+ if (GetBlockWriteOps(BlockDescription.BlockHash,
+ BlockDescription.ChunkRawHashes,
+ BlockDescription.ChunkCompressedLengths,
+ SequenceIndexChunksLeftToWriteCounters,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ BlockView,
+ FirstIncludedBlockChunkIndex,
+ LastIncludedBlockChunkIndex,
+ Ops))
+ {
+ WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work);
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+}
+
+void
+BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(uint32_t RemoteChunkIndex,
+ const BlobsExistsResult& ExistsResult,
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>&& ChunkTargetPtrs,
+ BufferedWriteFileCache& WriteCache,
+ ParallelWork& Work,
+ IoBuffer&& Payload,
+ std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ std::atomic<uint64_t>& WritePartsComplete,
+ const uint64_t TotalPartWriteCount,
+ FilteredRate& FilteredWrittenBytesPerSecond)
+{
+ ZEN_TRACE_CPU("AsyncWriteDownloadedChunk");
+
+ const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+
+ const uint64_t Size = Payload.GetSize();
+
+ const bool ExistsInCache = m_Storage.CacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash);
+
+ const bool PopulateCache = !ExistsInCache && m_Storage.CacheStorage && m_Options.PopulateCache;
+
+ std::filesystem::path CompressedChunkPath =
+ TryMoveDownloadedChunk(Payload,
+ m_TempDownloadFolderPath / ChunkHash.ToHexString(),
+ /* ForceDiskBased */ PopulateCache || Size > m_Options.MaximumInMemoryPayloadSize);
+ if (PopulateCache)
+ {
+ IoBuffer CacheBlob = IoBufferBuilder::MakeFromFile(CompressedChunkPath);
+ if (CacheBlob)
+ {
+ m_Storage.CacheStorage->PutBuildBlob(m_BuildId,
+ ChunkHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(SharedBuffer(CacheBlob)));
+ }
+ }
+
+ IoBufferFileReference FileRef;
+ bool EnableBacklog = !CompressedChunkPath.empty() || Payload.GetFileReference(FileRef);
+
+ Work.ScheduleWork(
+ m_IOWorkerPool,
+ [this,
+ SequenceIndexChunksLeftToWriteCounters,
+ &Work,
+ CompressedChunkPath,
+ RemoteChunkIndex,
+ TotalPartWriteCount,
+ &WriteCache,
+ &WritePartsComplete,
+ &FilteredWrittenBytesPerSecond,
+ ChunkTargetPtrs = std::move(ChunkTargetPtrs),
+ CompressedPart = IoBuffer(std::move(Payload))](std::atomic<bool>&) mutable {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_WriteChunk");
+
+ FilteredWrittenBytesPerSecond.Start();
+
+ const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+ if (CompressedChunkPath.empty())
+ {
+ ZEN_ASSERT(CompressedPart);
+ }
+ else
+ {
+ ZEN_ASSERT(!CompressedPart);
+ CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath);
+ if (!CompressedPart)
+ {
+ throw std::runtime_error(
+ fmt::format("Could not open dowloaded compressed chunk {} from {}", ChunkHash, CompressedChunkPath));
+ }
+ }
+
+ bool NeedHashVerify = WriteCompressedChunkToCache(ChunkHash, ChunkTargetPtrs, WriteCache, std::move(CompressedPart));
+ if (!m_AbortFlag)
+ {
+ if (WritePartsComplete.fetch_add(1) + 1 == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
+
+ if (!CompressedChunkPath.empty())
+ {
+ std::error_code Ec = TryRemoveFile(CompressedChunkPath);
+ if (Ec)
+ {
+ ZEN_DEBUG("Failed removing file '{}', reason: ({}) {}", CompressedChunkPath, Ec.value(), Ec.message());
+ }
+ }
+
+ std::vector<uint32_t> CompletedSequences =
+ CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
+ WriteCache.Close(CompletedSequences);
+ if (NeedHashVerify)
+ {
+ VerifyAndCompleteChunkSequencesAsync(CompletedSequences, Work);
+ }
+ else
+ {
+ FinalizeChunkSequences(CompletedSequences);
+ }
+ }
+ }
+ },
+ EnableBacklog ? WorkerThreadPool::EMode::EnableBacklog : WorkerThreadPool::EMode::DisableBacklog);
+}
+
+void
+BuildsOperationUpdateFolder::VerifyAndCompleteChunkSequencesAsync(std::span<const uint32_t> RemoteSequenceIndexes, ParallelWork& Work)
+{
+ if (RemoteSequenceIndexes.empty())
+ {
+ return;
+ }
+ ZEN_TRACE_CPU("VerifyAndCompleteChunkSequence");
+ if (m_Options.ValidateCompletedSequences)
+ {
+ for (uint32_t RemoteSequenceIndexOffset = 1; RemoteSequenceIndexOffset < RemoteSequenceIndexes.size(); RemoteSequenceIndexOffset++)
+ {
+ const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset];
+ Work.ScheduleWork(m_IOWorkerPool, [this, RemoteSequenceIndex](std::atomic<bool>&) {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_VerifyAndFinalizeSequence");
+
+ VerifySequence(RemoteSequenceIndex);
+ if (!m_AbortFlag)
+ {
+ const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ FinalizeChunkSequence(SequenceRawHash);
+ }
+ }
+ });
+ }
+ const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[0];
+
+ VerifySequence(RemoteSequenceIndex);
+ const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ FinalizeChunkSequence(SequenceRawHash);
+ }
+ else
+ {
+ for (uint32_t RemoteSequenceIndexOffset = 0; RemoteSequenceIndexOffset < RemoteSequenceIndexes.size(); RemoteSequenceIndexOffset++)
+ {
+ const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset];
+ const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ FinalizeChunkSequence(SequenceRawHash);
+ }
+ }
+}
+
+bool
+BuildsOperationUpdateFolder::CompleteSequenceChunk(uint32_t RemoteSequenceIndex,
+ std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters)
+{
+ uint32_t PreviousValue = SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1);
+ ZEN_ASSERT(PreviousValue >= 1);
+ ZEN_ASSERT(PreviousValue != (uint32_t)-1);
+ return PreviousValue == 1;
+}
+
+std::vector<uint32_t>
+BuildsOperationUpdateFolder::CompleteChunkTargets(const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs,
+ std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters)
+{
+ ZEN_TRACE_CPU("CompleteChunkTargets");
+
+ std::vector<uint32_t> CompletedSequenceIndexes;
+ for (const ChunkedContentLookup::ChunkSequenceLocation* Location : ChunkTargetPtrs)
+ {
+ const uint32_t RemoteSequenceIndex = Location->SequenceIndex;
+ if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters))
+ {
+ CompletedSequenceIndexes.push_back(RemoteSequenceIndex);
+ }
+ }
+ return CompletedSequenceIndexes;
+}
+
+void
+BuildsOperationUpdateFolder::FinalizeChunkSequence(const IoHash& SequenceRawHash)
+{
+ ZEN_TRACE_CPU("FinalizeChunkSequence");
+
+ ZEN_ASSERT_SLOW(!IsFile(GetFinalChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash)));
+ std::error_code Ec;
+ RenameFile(GetTempChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash),
+ GetFinalChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash),
+ Ec);
+ if (Ec)
+ {
+ throw std::system_error(Ec);
+ }
+}
+
+void
+BuildsOperationUpdateFolder::FinalizeChunkSequences(std::span<const uint32_t> RemoteSequenceIndexes)
+{
+ ZEN_TRACE_CPU("FinalizeChunkSequences");
+
+ for (uint32_t SequenceIndex : RemoteSequenceIndexes)
+ {
+ FinalizeChunkSequence(m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
+ }
+}
+
+void
+BuildsOperationUpdateFolder::VerifySequence(uint32_t RemoteSequenceIndex)
+{
+ ZEN_TRACE_CPU("VerifySequence");
+
+ ZEN_ASSERT(m_Options.ValidateCompletedSequences);
+
+ const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ {
+ ZEN_TRACE_CPU("HashSequence");
+ const std::uint32_t RemotePathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex];
+ const uint64_t ExpectedSize = m_RemoteContent.RawSizes[RemotePathIndex];
+ IoBuffer VerifyBuffer = IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash));
+ const uint64_t VerifySize = VerifyBuffer.GetSize();
+ if (VerifySize != ExpectedSize)
+ {
+ throw std::runtime_error(fmt::format("Written chunk sequence {} size {} does not match expected size {}",
+ SequenceRawHash,
+ VerifySize,
+ ExpectedSize));
+ }
+
+ const IoHash VerifyChunkHash = IoHash::HashBuffer(std::move(VerifyBuffer), &m_ValidatedChunkByteCount);
+ if (VerifyChunkHash != SequenceRawHash)
+ {
+ throw std::runtime_error(
+ fmt::format("Written chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, SequenceRawHash));
+ }
+ }
+}
+
+void
+VerifyFolder(ProgressBase& Progress,
+ std::atomic<bool>& AbortFlag,
+ std::atomic<bool>& PauseFlag,
+ TransferThreadWorkers& Workers,
+ const ChunkedFolderContent& Content,
+ const ChunkedContentLookup& Lookup,
+ const std::filesystem::path& Path,
+ const std::vector<std::string>& ExcludeFolders,
+ bool VerifyFileHash,
+ VerifyFolderStatistics& VerifyFolderStats)
+{
+ ZEN_TRACE_CPU("VerifyFolder");
+
+ Stopwatch Timer;
+
+ std::unique_ptr<ProgressBase::ProgressBar> ProgressBar = Progress.CreateProgressBar("Verify Files");
+
+ WorkerThreadPool& VerifyPool = Workers.GetIOWorkerPool();
+
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ const uint32_t PathCount = gsl::narrow<uint32_t>(Content.Paths.size());
+
+ RwLock ErrorLock;
+ std::vector<std::string> Errors;
+
+ auto IsAcceptedFolder = [ExcludeFolders = ExcludeFolders](const std::string_view& RelativePath) -> bool {
+ for (const std::string& ExcludeFolder : ExcludeFolders)
+ {
+ if (RelativePath.starts_with(ExcludeFolder))
+ {
+ if (RelativePath.length() == ExcludeFolder.length())
+ {
+ return false;
+ }
+ else if (RelativePath[ExcludeFolder.length()] == '/')
+ {
+ return false;
+ }
+ }
+ }
+ return true;
+ };
+
+ for (uint32_t PathIndex = 0; PathIndex < PathCount; PathIndex++)
+ {
+ if (Work.IsAborted())
+ {
+ break;
+ }
+
+ Work.ScheduleWork(
+ VerifyPool,
+ [&Path, &Content, &Lookup, &ErrorLock, &Errors, &VerifyFolderStats, VerifyFileHash, &IsAcceptedFolder, PathIndex, &AbortFlag](
+ std::atomic<bool>&) {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("VerifyFile_work");
+
+ // TODO: Convert ScheduleWork body to function
+
+ const std::filesystem::path TargetPath = (Path / Content.Paths[PathIndex]).make_preferred();
+ if (IsAcceptedFolder(TargetPath.parent_path().generic_string()))
+ {
+ const uint64_t ExpectedSize = Content.RawSizes[PathIndex];
+ if (!IsFile(TargetPath))
+ {
+ ErrorLock.WithExclusiveLock([&]() {
+ Errors.push_back(fmt::format("File {} with expected size {} does not exist", TargetPath, ExpectedSize));
+ });
+ VerifyFolderStats.FilesFailed++;
+ }
+ else
+ {
+ std::error_code Ec;
+ uint64_t SizeOnDisk = gsl::narrow<uint64_t>(FileSizeFromPath(TargetPath, Ec));
+ if (Ec)
+ {
+ ErrorLock.WithExclusiveLock([&]() {
+ Errors.push_back(
+ fmt::format("Failed to get size of file {}: {} ({})", TargetPath, Ec.message(), Ec.value()));
+ });
+ VerifyFolderStats.FilesFailed++;
+ }
+ else if (SizeOnDisk < ExpectedSize)
+ {
+ ErrorLock.WithExclusiveLock([&]() {
+ Errors.push_back(fmt::format("Size of file {} is smaller than expected. Expected: {}, Found: {}",
+ TargetPath,
+ ExpectedSize,
+ SizeOnDisk));
+ });
+ VerifyFolderStats.FilesFailed++;
+ }
+ else if (SizeOnDisk > ExpectedSize)
+ {
+ ErrorLock.WithExclusiveLock([&]() {
+ Errors.push_back(fmt::format("Size of file {} is bigger than expected. Expected: {}, Found: {}",
+ TargetPath,
+ ExpectedSize,
+ SizeOnDisk));
+ });
+ VerifyFolderStats.FilesFailed++;
+ }
+ else if (SizeOnDisk > 0 && VerifyFileHash)
+ {
+ const IoHash& ExpectedRawHash = Content.RawHashes[PathIndex];
+ IoBuffer Buffer = IoBufferBuilder::MakeFromFile(TargetPath);
+ IoHash RawHash = IoHash::HashBuffer(Buffer);
+ if (RawHash != ExpectedRawHash)
+ {
+ uint64_t FileOffset = 0;
+ const uint32_t SequenceIndex = Lookup.RawHashToSequenceIndex.at(ExpectedRawHash);
+ const uint32_t OrderOffset = Lookup.SequenceIndexChunkOrderOffset[SequenceIndex];
+ for (uint32_t OrderIndex = OrderOffset;
+ OrderIndex < OrderOffset + Content.ChunkedContent.ChunkCounts[SequenceIndex];
+ OrderIndex++)
+ {
+ uint32_t ChunkIndex = Content.ChunkedContent.ChunkOrders[OrderIndex];
+ uint64_t ChunkSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
+ IoHash ChunkHash = Content.ChunkedContent.ChunkHashes[ChunkIndex];
+ IoBuffer FileChunk = IoBuffer(Buffer, FileOffset, ChunkSize);
+ if (IoHash::HashBuffer(FileChunk) != ChunkHash)
+ {
+ ErrorLock.WithExclusiveLock([&]() {
+ Errors.push_back(fmt::format(
+ "WARNING: Hash of file {} does not match expected hash. Expected: {}, Found: {}. "
+ "Mismatch at chunk {}",
+ TargetPath,
+ ExpectedRawHash,
+ RawHash,
+ OrderIndex - OrderOffset));
+ });
+ break;
+ }
+ FileOffset += ChunkSize;
+ }
+ VerifyFolderStats.FilesFailed++;
+ }
+ VerifyFolderStats.ReadBytes += SizeOnDisk;
+ }
+ }
+ }
+ VerifyFolderStats.FilesVerified++;
+ }
+ },
+ [&, PathIndex](std::exception_ptr Ex, std::atomic<bool>&) {
+ std::string Description;
+ try
+ {
+ std::rethrow_exception(Ex);
+ }
+ catch (const std::exception& Ex)
+ {
+ Description = Ex.what();
+ }
+ ErrorLock.WithExclusiveLock([&]() {
+ Errors.push_back(fmt::format("Failed verifying file '{}'. Reason: {}",
+ (Path / Content.Paths[PathIndex]).make_preferred(),
+ Description));
+ });
+ VerifyFolderStats.FilesFailed++;
+ });
+ }
+
+ Work.Wait(Progress.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
+ std::string Details = fmt::format("Verified {}/{} ({}). Failed files: {}",
+ VerifyFolderStats.FilesVerified.load(),
+ PathCount,
+ NiceBytes(VerifyFolderStats.ReadBytes.load()),
+ VerifyFolderStats.FilesFailed.load());
+ ProgressBar->UpdateState({.Task = "Verifying files ",
+ .Details = Details,
+ .TotalCount = gsl::narrow<uint64_t>(PathCount),
+ .RemainingCount = gsl::narrow<uint64_t>(PathCount - VerifyFolderStats.FilesVerified.load()),
+ .Status = ProgressBase::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+ false);
+ });
+ VerifyFolderStats.VerifyElapsedWallTimeUs = Timer.GetElapsedTimeUs();
+
+ ProgressBar->Finish();
+ if (AbortFlag)
+ {
+ return;
+ }
+
+ for (const std::string& Error : Errors)
+ {
+ ZEN_CONSOLE_ERROR("{}", Error);
+ }
+ if (!Errors.empty())
+ {
+ throw std::runtime_error(fmt::format("Verify failed with {} errors", Errors.size()));
+ }
+}
+
+std::vector<std::filesystem::path>
+GetNewPaths(const std::span<const std::filesystem::path> KnownPaths, const std::span<const std::filesystem::path> Paths)
+{
+ tsl::robin_set<std::string> KnownPathsSet;
+ KnownPathsSet.reserve(KnownPaths.size());
+ for (const std::filesystem::path& LocalPath : KnownPaths)
+ {
+ KnownPathsSet.insert(LocalPath.generic_string());
+ }
+
+ std::vector<std::filesystem::path> NewPaths;
+ for (const std::filesystem::path& UntrackedPath : Paths)
+ {
+ if (!KnownPathsSet.contains(UntrackedPath.generic_string()))
+ {
+ NewPaths.push_back(UntrackedPath);
+ }
+ }
+ return NewPaths;
+}
+
+BuildSaveState
+GetLocalStateFromPaths(ProgressBase& Progress,
+ std::atomic<bool>& AbortFlag,
+ std::atomic<bool>& PauseFlag,
+ TransferThreadWorkers& Workers,
+ GetFolderContentStatistics& LocalFolderScanStats,
+ ChunkingStatistics& ChunkingStats,
+ const std::filesystem::path& Path,
+ ChunkingController& ChunkController,
+ ChunkingCache& ChunkCache,
+ std::span<const std::filesystem::path> PathsToCheck)
+{
+ FolderContent FolderState =
+ CheckFolderFiles(Progress, AbortFlag, PauseFlag, "Check Files", Workers, LocalFolderScanStats, Path, PathsToCheck);
+
+ ChunkedFolderContent ChunkedContent;
+ if (FolderState.Paths.size() > 0)
+ {
+ ChunkedContent = ScanFolderFiles(Progress,
+ AbortFlag,
+ PauseFlag,
+ "Scan Files",
+ Workers,
+ Path,
+ FolderState,
+ ChunkController,
+ ChunkCache,
+ ChunkingStats);
+ }
+
+ return BuildSaveState{.State = BuildState{.ChunkedContent = std::move(ChunkedContent)}, .FolderState = FolderState, .LocalPath = Path};
+}
+
+BuildSaveState
+GetLocalContent(ProgressBase& Progress,
+ std::atomic<bool>& AbortFlag,
+ std::atomic<bool>& PauseFlag,
+ bool IsQuiet,
+ TransferThreadWorkers& Workers,
+ GetFolderContentStatistics& LocalFolderScanStats,
+ ChunkingStatistics& ChunkingStats,
+ const std::filesystem::path& Path,
+ const std::filesystem::path& StateFilePath,
+ ChunkingController& ChunkController,
+ ChunkingCache& ChunkCache)
+{
+ Stopwatch ReadStateTimer;
+ bool FileExists = IsFile(StateFilePath);
+ if (!FileExists)
+ {
+ ZEN_CONSOLE("No known local state file in {}, falling back to scanning", Path);
+ return {};
+ }
+
+ BuildSaveState SavedLocalState;
+ try
+ {
+ SavedLocalState = ReadBuildSaveStateFile(StateFilePath);
+ if (!IsQuiet)
+ {
+ ZEN_CONSOLE("Read local state file {} in {}", StateFilePath, NiceTimeSpanMs(ReadStateTimer.GetElapsedTimeMs()));
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_CONSOLE_WARN("Failed reading state file {}, falling back to scannning. Reason: {}", StateFilePath, Ex.what());
+ return {};
+ }
+
+ FolderContent CurrentLocalFolderState = CheckFolderFiles(Progress,
+ AbortFlag,
+ PauseFlag,
+ "Check Known Files",
+ Workers,
+ LocalFolderScanStats,
+ Path,
+ SavedLocalState.FolderState.Paths);
+ if (AbortFlag)
+ {
+ return {};
+ }
+
+ if (!SavedLocalState.FolderState.AreKnownFilesEqual(CurrentLocalFolderState))
+ {
+ const size_t LocalStatePathCount = SavedLocalState.FolderState.Paths.size();
+ std::vector<std::filesystem::path> DeletedPaths;
+ FolderContent UpdatedContent = GetUpdatedContent(SavedLocalState.FolderState, CurrentLocalFolderState, DeletedPaths);
+ if (!DeletedPaths.empty())
+ {
+ SavedLocalState.State.ChunkedContent = DeletePathsFromChunkedContent(SavedLocalState.State.ChunkedContent, DeletedPaths);
+ }
+
+ if (!IsQuiet)
+ {
+ ZEN_CONSOLE("Updating state, {} local files deleted and {} local files updated out of {}",
+ DeletedPaths.size(),
+ UpdatedContent.Paths.size(),
+ LocalStatePathCount);
+ }
+ if (UpdatedContent.Paths.size() > 0)
+ {
+ ChunkedFolderContent UpdatedLocalContent = ScanFolderFiles(Progress,
+ AbortFlag,
+ PauseFlag,
+ "Scan Known Files",
+ Workers,
+ Path,
+ UpdatedContent,
+ ChunkController,
+ ChunkCache,
+ ChunkingStats);
+ if (AbortFlag)
+ {
+ return {};
+ }
+ SavedLocalState.State.ChunkedContent =
+ MergeChunkedFolderContents(SavedLocalState.State.ChunkedContent, {{UpdatedLocalContent}});
+ }
+ }
+ else
+ {
+ // Remove files from LocalContent no longer in LocalFolderState
+ tsl::robin_set<std::string> LocalFolderPaths;
+ LocalFolderPaths.reserve(SavedLocalState.FolderState.Paths.size());
+ for (const std::filesystem::path& LocalFolderPath : SavedLocalState.FolderState.Paths)
+ {
+ LocalFolderPaths.insert(LocalFolderPath.generic_string());
+ }
+ std::vector<std::filesystem::path> DeletedPaths;
+ for (const std::filesystem::path& LocalContentPath : SavedLocalState.State.ChunkedContent.Paths)
+ {
+ if (!LocalFolderPaths.contains(LocalContentPath.generic_string()))
+ {
+ DeletedPaths.push_back(LocalContentPath);
+ }
+ }
+ if (!DeletedPaths.empty())
+ {
+ SavedLocalState.State.ChunkedContent = DeletePathsFromChunkedContent(SavedLocalState.State.ChunkedContent, DeletedPaths);
+ }
+ }
+
+ SavedLocalState.FolderState = CurrentLocalFolderState;
+
+ return SavedLocalState;
+}
+
+void
+DownloadFolder(LoggerRef InLog,
+ ProgressBase& Progress,
+ TransferThreadWorkers& Workers,
+ StorageInstance& Storage,
+ std::atomic<bool>& AbortFlag,
+ std::atomic<bool>& PauseFlag,
+ const BuildStorageCache::Statistics& StorageCacheStats,
+ const Oid& BuildId,
+ const std::vector<Oid>& BuildPartIds,
+ std::span<const std::string> BuildPartNames,
+ const std::filesystem::path& DownloadSpecPath,
+ const std::filesystem::path& Path,
+ const DownloadOptions& Options)
+{
+ ZEN_TRACE_CPU("DownloadFolder");
+ ZEN_SCOPED_LOG(InLog);
+
+ Progress.SetLogOperationName("Download Folder");
+
+ enum TaskSteps : uint32_t
+ {
+ CheckState,
+ CompareState,
+ Download,
+ Verify,
+ Cleanup,
+ StepCount
+ };
+
+ auto EndProgress = MakeGuard([&]() { Progress.SetLogOperationProgress(TaskSteps::StepCount, TaskSteps::StepCount); });
+
+ Stopwatch DownloadTimer;
+
+ Progress.SetLogOperationProgress(TaskSteps::CheckState, TaskSteps::StepCount);
+
+ const std::filesystem::path ZenTempFolder = ZenTempFolderPath(Options.ZenFolderPath);
+ CreateDirectories(ZenTempFolder);
+
+ std::uint64_t PreferredMultipartChunkSize = 32u * 1024u * 1024u;
+
+ CbObject BuildObject = GetBuild(*Storage.BuildStorage, BuildId, Options.IsQuiet);
+
+ std::vector<std::pair<Oid, std::string>> AllBuildParts =
+ ResolveBuildPartNames(BuildObject, BuildId, BuildPartIds, BuildPartNames, PreferredMultipartChunkSize);
+
+ BuildManifest Manifest;
+ if (!DownloadSpecPath.empty())
+ {
+ const std::filesystem::path AbsoluteDownloadSpecPath =
+ DownloadSpecPath.is_relative() ? MakeSafeAbsolutePath(Path / DownloadSpecPath) : MakeSafeAbsolutePath(DownloadSpecPath);
+ Manifest = ParseBuildManifest(AbsoluteDownloadSpecPath);
+ }
+
+ std::vector<ChunkedFolderContent> PartContents;
+
+ std::unique_ptr<ChunkingController> ChunkController;
+
+ std::vector<ChunkBlockDescription> BlockDescriptions;
+ std::vector<IoHash> LooseChunkHashes;
+
+ Progress.SetLogOperationProgress(TaskSteps::CompareState, TaskSteps::StepCount);
+
+ ChunkedFolderContent RemoteContent = GetRemoteContent(InLog,
+ Storage,
+ BuildId,
+ AllBuildParts,
+ Manifest,
+ Options.IncludeWildcards,
+ Options.ExcludeWildcards,
+ ChunkController,
+ PartContents,
+ BlockDescriptions,
+ LooseChunkHashes,
+ Options.IsQuiet,
+ Options.IsVerbose,
+ Options.DoExtraContentVerify);
+
+ const std::uint64_t LargeAttachmentSize = Options.AllowMultiparts ? PreferredMultipartChunkSize * 4u : (std::uint64_t)-1;
+ GetFolderContentStatistics LocalFolderScanStats;
+ ChunkingStatistics ChunkingStats;
+
+ BuildSaveState LocalState;
+
+ if (IsDir(Path))
+ {
+ if (!ChunkController && !Options.IsQuiet)
+ {
+ ZEN_CONSOLE_INFO("Unspecified chunking algorithm, using default");
+ ChunkController = CreateStandardChunkingController(StandardChunkingControllerSettings{});
+ }
+ std::unique_ptr<ChunkingCache> ChunkCache(CreateNullChunkingCache());
+
+ LocalState = GetLocalContent(Progress,
+ AbortFlag,
+ PauseFlag,
+ Options.IsQuiet,
+ Workers,
+ LocalFolderScanStats,
+ ChunkingStats,
+ Path,
+ ZenStateFilePath(Path / ZenFolderName),
+ *ChunkController,
+ *ChunkCache);
+
+ std::vector<std::filesystem::path> UntrackedPaths = GetNewPaths(LocalState.State.ChunkedContent.Paths, RemoteContent.Paths);
+
+ BuildSaveState UntrackedLocalContent = GetLocalStateFromPaths(Progress,
+ AbortFlag,
+ PauseFlag,
+ Workers,
+ LocalFolderScanStats,
+ ChunkingStats,
+ Path,
+ *ChunkController,
+ *ChunkCache,
+ UntrackedPaths);
+
+ if (!UntrackedLocalContent.State.ChunkedContent.Paths.empty())
+ {
+ LocalState.State.ChunkedContent =
+ MergeChunkedFolderContents(LocalState.State.ChunkedContent,
+ std::vector<ChunkedFolderContent>{UntrackedLocalContent.State.ChunkedContent});
+
+ // TODO: Helper
+ LocalState.FolderState.Paths.insert(LocalState.FolderState.Paths.begin(),
+ UntrackedLocalContent.FolderState.Paths.begin(),
+ UntrackedLocalContent.FolderState.Paths.end());
+ LocalState.FolderState.RawSizes.insert(LocalState.FolderState.RawSizes.begin(),
+ UntrackedLocalContent.FolderState.RawSizes.begin(),
+ UntrackedLocalContent.FolderState.RawSizes.end());
+ LocalState.FolderState.Attributes.insert(LocalState.FolderState.Attributes.begin(),
+ UntrackedLocalContent.FolderState.Attributes.begin(),
+ UntrackedLocalContent.FolderState.Attributes.end());
+ LocalState.FolderState.ModificationTicks.insert(LocalState.FolderState.ModificationTicks.begin(),
+ UntrackedLocalContent.FolderState.ModificationTicks.begin(),
+ UntrackedLocalContent.FolderState.ModificationTicks.end());
+ }
+
+ if (Options.AppendNewContent)
+ {
+ RemoteContent = ApplyChunkedContentOverlay(LocalState.State.ChunkedContent,
+ RemoteContent,
+ Options.IncludeWildcards,
+ Options.ExcludeWildcards);
+ }
+#if ZEN_BUILD_DEBUG
+ ValidateChunkedFolderContent(RemoteContent,
+ BlockDescriptions,
+ LooseChunkHashes,
+ Options.IncludeWildcards,
+ Options.ExcludeWildcards);
+#endif // ZEN_BUILD_DEBUG
+ }
+ else
+ {
+ CreateDirectories(Path);
+ }
+ if (AbortFlag)
+ {
+ return;
+ }
+
+ LocalState.LocalPath = Path;
+
+ {
+ BuildsSelection::Build RemoteBuildState = {.Id = BuildId,
+ .IncludeWildcards = Options.IncludeWildcards,
+ .ExcludeWildcards = Options.ExcludeWildcards};
+ RemoteBuildState.Parts.reserve(BuildPartIds.size());
+ for (size_t PartIndex = 0; PartIndex < BuildPartIds.size(); PartIndex++)
+ {
+ RemoteBuildState.Parts.push_back(
+ {BuildsSelection::BuildPart{.Id = BuildPartIds[PartIndex],
+ .Name = PartIndex < BuildPartNames.size() ? BuildPartNames[PartIndex] : ""}});
+ }
+
+ if (Options.AppendNewContent)
+ {
+ LocalState.State.Selection.Builds.emplace_back(std::move(RemoteBuildState));
+ }
+ else
+ {
+ LocalState.State.Selection.Builds = std::vector<BuildsSelection::Build>{std::move(RemoteBuildState)};
+ }
+ }
+
+ if ((Options.EnableTargetFolderScavenging || Options.AppendNewContent) && !Options.CleanTargetFolder &&
+ CompareChunkedContent(RemoteContent, LocalState.State.ChunkedContent))
+ {
+ if (!Options.IsQuiet)
+ {
+ ZEN_CONSOLE("Local state is identical to build to download. All done. Completed in {}.",
+ NiceTimeSpanMs(DownloadTimer.GetElapsedTimeMs()));
+ }
+
+ Stopwatch WriteStateTimer;
+
+ CbObject StateObject = CreateBuildSaveStateObject(LocalState);
+ CreateDirectories(ZenStateFilePath(Options.ZenFolderPath).parent_path());
+ TemporaryFile::SafeWriteFile(ZenStateFilePath(Options.ZenFolderPath), StateObject.GetView());
+ if (!Options.IsQuiet)
+ {
+ ZEN_CONSOLE("Wrote local state in {}", NiceTimeSpanMs(WriteStateTimer.GetElapsedTimeMs()));
+ }
+
+ AddDownloadedPath(Options.SystemRootDir,
+ BuildsDownloadInfo{.Selection = LocalState.State.Selection,
+ .LocalPath = Path,
+ .StateFilePath = ZenStateFilePath(Options.ZenFolderPath),
+ .Iso8601Date = DateTime::Now().ToIso8601()});
+ }
+ else
+ {
+ ExtendableStringBuilder<128> BuildPartString;
+ for (const std::pair<Oid, std::string>& BuildPart : AllBuildParts)
+ {
+ BuildPartString.Append(fmt::format(" {} ({})", BuildPart.second, BuildPart.first));
+ }
+
+ uint64_t RawSize = std::accumulate(RemoteContent.RawSizes.begin(), RemoteContent.RawSizes.end(), std::uint64_t(0));
+
+ if (!Options.IsQuiet)
+ {
+ ZEN_CONSOLE("Downloading build {}, parts:{} to '{}' ({})", BuildId, BuildPartString.ToView(), Path, NiceBytes(RawSize));
+ }
+
+ Stopwatch IndexTimer;
+
+ const ChunkedContentLookup LocalLookup = BuildChunkedContentLookup(LocalState.State.ChunkedContent);
+ const ChunkedContentLookup RemoteLookup = BuildChunkedContentLookup(RemoteContent);
+
+ if (!Options.IsQuiet)
+ {
+ ZEN_INFO("Indexed local and remote content in {}", NiceTimeSpanMs(IndexTimer.GetElapsedTimeMs()));
+ }
+
+ Progress.SetLogOperationProgress(TaskSteps::Download, TaskSteps::StepCount);
+
+ BuildsOperationUpdateFolder Updater(
+ InLog,
+ Progress,
+ Storage,
+ AbortFlag,
+ PauseFlag,
+ Workers.GetIOWorkerPool(),
+ Workers.GetNetworkPool(),
+ BuildId,
+ Path,
+ LocalState.State.ChunkedContent,
+ LocalLookup,
+ RemoteContent,
+ RemoteLookup,
+ BlockDescriptions,
+ LooseChunkHashes,
+ BuildsOperationUpdateFolder::Options{
+ .IsQuiet = Options.IsQuiet,
+ .IsVerbose = Options.IsVerbose,
+ .AllowFileClone = Options.AllowFileClone,
+ .UseSparseFiles = Options.UseSparseFiles,
+ .SystemRootDir = Options.SystemRootDir,
+ .ZenFolderPath = Options.ZenFolderPath,
+ .LargeAttachmentSize = LargeAttachmentSize,
+ .PreferredMultipartChunkSize = PreferredMultipartChunkSize,
+ .PartialBlockRequestMode = Options.PartialBlockRequestMode,
+ .WipeTargetFolder = Options.CleanTargetFolder,
+ .EnableOtherDownloadsScavenging = Options.EnableOtherDownloadsScavenging,
+ .EnableTargetFolderScavenging = Options.EnableTargetFolderScavenging || Options.AppendNewContent,
+ .ValidateCompletedSequences = Options.PostDownloadVerify,
+ .ExcludeFolders = Options.ExcludeFolders,
+ .MaximumInMemoryPayloadSize = Options.MaximumInMemoryPayloadSize,
+ .PopulateCache = Options.PopulateCache});
+ {
+ Progress.PushLogOperation("Download");
+ auto _ = MakeGuard([&Progress]() { Progress.PopLogOperation(); });
+ FolderContent UpdatedLocalFolderState;
+ Updater.Execute(UpdatedLocalFolderState);
+
+ LocalState.State.ChunkedContent = RemoteContent;
+ LocalState.FolderState = std::move(UpdatedLocalFolderState);
+ }
+
+ VerifyFolderStatistics VerifyFolderStats;
+ if (!AbortFlag)
+ {
+ AddDownloadedPath(Options.SystemRootDir,
+ BuildsDownloadInfo{.Selection = LocalState.State.Selection,
+ .LocalPath = Path,
+ .StateFilePath = ZenStateFilePath(Options.ZenFolderPath),
+ .Iso8601Date = DateTime::Now().ToIso8601()});
+
+ Progress.SetLogOperationProgress(TaskSteps::Verify, TaskSteps::StepCount);
+
+ VerifyFolder(Progress,
+ AbortFlag,
+ PauseFlag,
+ Workers,
+ RemoteContent,
+ RemoteLookup,
+ Path,
+ Options.ExcludeFolders,
+ Options.PostDownloadVerify,
+ VerifyFolderStats);
+
+ Stopwatch WriteStateTimer;
+ CbObject StateObject = CreateBuildSaveStateObject(LocalState);
+
+ CreateDirectories(ZenStateFilePath(Options.ZenFolderPath).parent_path());
+ TemporaryFile::SafeWriteFile(ZenStateFilePath(Options.ZenFolderPath), StateObject.GetView());
+ if (!Options.IsQuiet)
+ {
+ ZEN_CONSOLE("Wrote local state in {}", NiceTimeSpanMs(WriteStateTimer.GetElapsedTimeMs()));
+ }
+
+#if 0
+ ExtendableStringBuilder<1024> SB;
+ CompactBinaryToJson(StateObject, SB);
+ WriteFile(ZenStateFileJsonPath(Options.ZenFolderPath), IoBuffer(IoBuffer::Wrap, SB.Data(), SB.Size()));
+#endif // 0
+ const uint64_t DownloadCount = Updater.m_DownloadStats.DownloadedChunkCount.load() +
+ Updater.m_DownloadStats.DownloadedBlockCount.load() +
+ Updater.m_DownloadStats.DownloadedPartialBlockCount.load();
+ const uint64_t DownloadByteCount = Updater.m_DownloadStats.DownloadedChunkByteCount.load() +
+ Updater.m_DownloadStats.DownloadedBlockByteCount.load() +
+ Updater.m_DownloadStats.DownloadedPartialBlockByteCount.load();
+ const uint64_t DownloadTimeMs = DownloadTimer.GetElapsedTimeMs();
+
+ if (!Options.IsQuiet)
+ {
+ std::string CloneInfo;
+ if (Updater.m_DiskStats.CloneByteCount > 0)
+ {
+ CloneInfo = fmt::format(" ({} cloned)", NiceBytes(Updater.m_DiskStats.CloneByteCount.load()));
+ }
+
+ std::string DownloadDetails;
+ {
+ ExtendableStringBuilder<128> SB;
+ BuildStorageBase::ExtendedStatistics ExtendedDownloadStats;
+ if (Storage.BuildStorage->GetExtendedStatistics(ExtendedDownloadStats))
+ {
+ if (!ExtendedDownloadStats.ReceivedBytesPerSource.empty())
+ {
+ for (auto& It : ExtendedDownloadStats.ReceivedBytesPerSource)
+ {
+ if (SB.Size() > 0)
+ {
+ SB.Append(", "sv);
+ }
+ SB.Append(It.first);
+ SB.Append(": "sv);
+ SB.Append(NiceBytes(It.second));
+ }
+ }
+ }
+ if (Storage.CacheStorage)
+ {
+ if (SB.Size() > 0)
+ {
+ SB.Append(", "sv);
+ }
+ SB.Append("Cache: ");
+ SB.Append(NiceBytes(StorageCacheStats.TotalBytesRead.load()));
+ }
+ if (SB.Size() > 0)
+ {
+ DownloadDetails = fmt::format(" ({})", SB.ToView());
+ }
+ }
+
+ ZEN_CONSOLE(
+ "Downloaded build {}, parts:{} in {}\n"
+ " Scavenge: {} (Target: {}, Cache: {}, Others: {})\n"
+ " Download: {} ({}) {}bits/s{}\n"
+ " Write: {} ({}) {}B/s{}\n"
+ " Clean: {}\n"
+ " Finalize: {}\n"
+ " Verify: {}",
+ BuildId,
+ BuildPartString.ToView(),
+ NiceTimeSpanMs(DownloadTimeMs),
+
+ NiceTimeSpanMs((Updater.m_CacheMappingStats.CacheScanElapsedWallTimeUs +
+ Updater.m_CacheMappingStats.LocalScanElapsedWallTimeUs +
+ Updater.m_CacheMappingStats.ScavengeElapsedWallTimeUs) /
+ 1000),
+ NiceTimeSpanMs(Updater.m_CacheMappingStats.LocalScanElapsedWallTimeUs / 1000),
+ NiceTimeSpanMs(Updater.m_CacheMappingStats.CacheScanElapsedWallTimeUs / 1000),
+ NiceTimeSpanMs(Updater.m_CacheMappingStats.ScavengeElapsedWallTimeUs / 1000),
+
+ DownloadCount,
+ NiceBytes(DownloadByteCount),
+ NiceNum(GetBytesPerSecond(Updater.m_WriteChunkStats.DownloadTimeUs, DownloadByteCount * 8)),
+ DownloadDetails,
+
+ Updater.m_DiskStats.WriteCount.load(),
+ NiceBytes(Updater.m_WrittenChunkByteCount.load()),
+ NiceNum(GetBytesPerSecond(Updater.m_WriteChunkStats.WriteTimeUs, Updater.m_DiskStats.WriteByteCount.load())),
+ CloneInfo,
+
+ NiceTimeSpanMs(Updater.m_RebuildFolderStateStats.CleanFolderElapsedWallTimeUs / 1000),
+
+ NiceTimeSpanMs(Updater.m_RebuildFolderStateStats.FinalizeTreeElapsedWallTimeUs / 1000),
+
+ NiceTimeSpanMs(VerifyFolderStats.VerifyElapsedWallTimeUs / 1000));
+ }
+ }
+ }
+
+ Progress.SetLogOperationProgress(TaskSteps::Cleanup, TaskSteps::StepCount);
+
+ CleanAndRemoveDirectory(Workers.GetIOWorkerPool(), AbortFlag, PauseFlag, ZenTempFolder);
+}
+} // namespace zen