aboutsummaryrefslogtreecommitdiff
path: root/src/zenremotestore/builds/buildstorageoperations.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenremotestore/builds/buildstorageoperations.cpp')
-rw-r--r--src/zenremotestore/builds/buildstorageoperations.cpp4742
1 files changed, 4742 insertions, 0 deletions
diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp
new file mode 100644
index 000000000..f3a585737
--- /dev/null
+++ b/src/zenremotestore/builds/buildstorageoperations.cpp
@@ -0,0 +1,4742 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenremotestore/builds/buildstorageoperations.h>
+
+#include <zenremotestore/builds/buildstorage.h>
+#include <zenremotestore/builds/buildstoragecache.h>
+#include <zenremotestore/chunking/chunkblock.h>
+
+#include <zencore/basicfile.h>
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinaryfile.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/parallelwork.h>
+#include <zencore/scopeguard.h>
+#include <zencore/string.h>
+#include <zencore/timer.h>
+#include <zencore/trace.h>
+#include <zenutil/bufferedopenfile.h>
+
+#include <numeric>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <tsl/robin_map.h>
+#include <tsl/robin_set.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+using namespace std::literals;
+
+#define DO_LOG_OUTPUT(OutputTarget, InLevel, fmtstr, ...) \
+ do \
+ { \
+ using namespace std::literals; \
+ ZEN_CHECK_FORMAT_STRING(fmtstr##sv, ##__VA_ARGS__); \
+ OutputTarget.EmitLogMessage(InLevel, fmtstr, zen::logging::LogCaptureArguments(__VA_ARGS__)); \
+ } while (false)
+
+#define LOG_OUTPUT(OutputTarget, fmtstr, ...) DO_LOG_OUTPUT(OutputTarget, zen::logging::level::Info, fmtstr, ##__VA_ARGS__)
+#define LOG_OUTPUT_DEBUG(OutputTarget, fmtstr, ...) DO_LOG_OUTPUT(OutputTarget, zen::logging::level::Debug, fmtstr, ##__VA_ARGS__)
+#define LOG_OUTPUT_WARN(OutputTarget, fmtstr, ...) DO_LOG_OUTPUT(OutputTarget, zen::logging::level::Warn, fmtstr, ##__VA_ARGS__)
+
+namespace {
+ std::filesystem::path ZenTempCacheFolderPath(const std::filesystem::path& ZenFolderPath)
+ {
+ return ZenTempFolderPath(ZenFolderPath) / "cache"; // Decompressed and verified data - chunks & sequences
+ }
+ std::filesystem::path ZenTempBlockFolderPath(const std::filesystem::path& ZenFolderPath)
+ {
+ return ZenTempFolderPath(ZenFolderPath) / "blocks"; // Temp storage for whole and partial blocks
+ }
+ std::filesystem::path ZenTempDownloadFolderPath(const std::filesystem::path& ZenFolderPath)
+ {
+ return ZenTempFolderPath(ZenFolderPath) / "download"; // Temp storage for decompressed and validated chunks
+ }
+
+ uint64_t GetBytesPerSecond(uint64_t ElapsedWallTimeUS, uint64_t Count)
+ {
+ if (ElapsedWallTimeUS == 0)
+ {
+ return 0;
+ }
+ return Count * 1000000 / ElapsedWallTimeUS;
+ }
+
+ std::filesystem::path GetTempChunkedSequenceFileName(const std::filesystem::path& CacheFolderPath, const IoHash& RawHash)
+ {
+ return CacheFolderPath / (RawHash.ToHexString() + ".tmp");
+ }
+
+ std::filesystem::path GetFinalChunkedSequenceFileName(const std::filesystem::path& CacheFolderPath, const IoHash& RawHash)
+ {
+ return CacheFolderPath / RawHash.ToHexString();
+ }
+
+ uint32_t SetNativeFileAttributes(const std::filesystem::path FilePath, SourcePlatform SourcePlatform, uint32_t Attributes)
+ {
+#if ZEN_PLATFORM_WINDOWS
+ if (SourcePlatform == SourcePlatform::Windows)
+ {
+ SetFileAttributesToPath(FilePath, Attributes);
+ return Attributes;
+ }
+ else
+ {
+ uint32_t CurrentAttributes = GetFileAttributesFromPath(FilePath);
+ uint32_t NewAttributes = zen::MakeFileAttributeReadOnly(CurrentAttributes, zen::IsFileModeReadOnly(Attributes));
+ if (CurrentAttributes != NewAttributes)
+ {
+ SetFileAttributesToPath(FilePath, NewAttributes);
+ }
+ return NewAttributes;
+ }
+#endif // ZEN_PLATFORM_WINDOWS
+#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+ if (SourcePlatform != SourcePlatform::Windows)
+ {
+ zen::SetFileMode(FilePath, Attributes);
+ return Attributes;
+ }
+ else
+ {
+ uint32_t CurrentMode = zen::GetFileMode(FilePath);
+ uint32_t NewMode = zen::MakeFileModeReadOnly(CurrentMode, zen::IsFileAttributeReadOnly(Attributes));
+ if (CurrentMode != NewMode)
+ {
+ zen::SetFileMode(FilePath, NewMode);
+ }
+ return NewMode;
+ }
+#endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+ };
+
+ uint32_t GetNativeFileAttributes(const std::filesystem::path FilePath)
+ {
+#if ZEN_PLATFORM_WINDOWS
+ return GetFileAttributesFromPath(FilePath);
+#endif // ZEN_PLATFORM_WINDOWS
+#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+ return GetFileMode(FilePath);
+#endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+ }
+
+ bool IsFileWithRetry(const std::filesystem::path& Path)
+ {
+ std::error_code Ec;
+ bool Result = IsFile(Path, Ec);
+ for (size_t Retries = 0; Ec && Retries < 3; Retries++)
+ {
+ Sleep(100 + int(Retries * 50));
+ Ec.clear();
+ Result = IsFile(Path, Ec);
+ }
+ if (Ec)
+ {
+ throw std::system_error(std::error_code(Ec.value(), std::system_category()),
+ fmt::format("Check path '{}' is file failed with: {} ({})", Path, Ec.message(), Ec.value()));
+ }
+ return Result;
+ }
+
+ bool SetFileReadOnlyWithRetry(const std::filesystem::path& Path, bool ReadOnly)
+ {
+ std::error_code Ec;
+ bool Result = SetFileReadOnly(Path, ReadOnly, Ec);
+ for (size_t Retries = 0; Ec && Retries < 3; Retries++)
+ {
+ Sleep(100 + int(Retries * 50));
+ if (!IsFileWithRetry(Path))
+ {
+ return false;
+ }
+ Ec.clear();
+ Result = SetFileReadOnly(Path, ReadOnly, Ec);
+ }
+ if (Ec)
+ {
+ throw std::system_error(
+ std::error_code(Ec.value(), std::system_category()),
+ fmt::format("Failed {} read only flag for '{}' failed with: {}", ReadOnly ? "setting" : "clearing", Path, Ec.message()));
+ }
+ return Result;
+ }
+
+ void RenameFileWithRetry(BuildOpLogOutput& LogOutput, const std::filesystem::path& SourcePath, const std::filesystem::path& TargetPath)
+ {
+ std::error_code Ec;
+ RenameFile(SourcePath, TargetPath, Ec);
+ for (size_t Retries = 0; Ec && Retries < 10; Retries++)
+ {
+ ZEN_ASSERT_SLOW(IsFile(SourcePath));
+ if (Retries > 5)
+ {
+ LOG_OUTPUT_WARN(LogOutput, "Unable to overwrite file {} ({}: {}), retrying...", TargetPath, Ec.value(), Ec.message());
+ }
+ Sleep(50 + int(Retries * 150));
+ Ec.clear();
+ RenameFile(SourcePath, TargetPath, Ec);
+ }
+ if (Ec)
+ {
+ throw std::system_error(std::error_code(Ec.value(), std::system_category()),
+ fmt::format("Rename from '{}' to '{}' failed with: {}", SourcePath, TargetPath, Ec.message()));
+ }
+ }
+
+ void TryRemoveFile(BuildOpLogOutput& LogOutput, const std::filesystem::path& Path)
+ {
+ std::error_code Ec;
+ RemoveFile(Path, Ec);
+ if (Ec)
+ {
+ if (IsFile(Path, Ec))
+ {
+ Ec.clear();
+ RemoveFile(Path, Ec);
+ if (Ec)
+ {
+ LOG_OUTPUT_DEBUG(LogOutput, "Failed removing file '{}', reason: {}", Path, Ec.message());
+ }
+ }
+ }
+ }
+
+ void RemoveFileWithRetry(const std::filesystem::path& Path)
+ {
+ std::error_code Ec;
+ RemoveFile(Path, Ec);
+ for (size_t Retries = 0; Ec && Retries < 6; Retries++)
+ {
+ Sleep(100 + int(Retries * 50));
+ if (!IsFileWithRetry(Path))
+ {
+ return;
+ }
+ Ec.clear();
+ RemoveFile(Path, Ec);
+ }
+ if (Ec)
+ {
+ throw std::system_error(std::error_code(Ec.value(), std::system_category()),
+ fmt::format("Removing file '{}' failed with: {}", Path, Ec.message()));
+ }
+ }
+
+ void RemoveDirWithRetry(const std::filesystem::path& Path)
+ {
+ std::error_code Ec;
+ RemoveDir(Path, Ec);
+ for (size_t Retries = 0; Ec && Retries < 3; Retries++)
+ {
+ Sleep(100 + int(Retries * 50));
+ if (!IsDir(Path))
+ {
+ return;
+ }
+ Ec.clear();
+ RemoveDir(Path, Ec);
+ }
+ if (Ec)
+ {
+ throw std::system_error(std::error_code(Ec.value(), std::system_category()),
+ fmt::format("Removing directory '{}' failed with: {}", Path, Ec.message()));
+ }
+ }
+
+ bool CleanDirectory(WorkerThreadPool& IOWorkerPool,
+ std::atomic<bool>& AbortFlag,
+ std::atomic<bool>& PauseFlag,
+ BuildOpLogOutput& LogOutput,
+ bool IsQuiet,
+ const std::filesystem::path& Path,
+ std::span<const std::string> ExcludeDirectories)
+ {
+ ZEN_TRACE_CPU("CleanDirectory");
+ Stopwatch Timer;
+
+ std::unique_ptr<BuildOpLogOutput::ProgressBar> ProgressBarPtr(LogOutput.CreateProgressBar("Clean Folder"));
+ BuildOpLogOutput::ProgressBar& Progress(*ProgressBarPtr);
+
+ std::atomic<bool> CleanWipe = true;
+ std::atomic<uint64_t> DiscoveredItemCount = 0;
+ std::atomic<uint64_t> DeletedItemCount = 0;
+ std::atomic<uint64_t> DeletedByteCount = 0;
+ ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ struct AsyncVisitor : public GetDirectoryContentVisitor
+ {
+ AsyncVisitor(const std::filesystem::path& InPath,
+ std::atomic<bool>& InAbortFlag,
+ BuildOpLogOutput& InLogOutput,
+ std::atomic<bool>& InCleanWipe,
+ std::atomic<uint64_t>& InDiscoveredItemCount,
+ std::atomic<uint64_t>& InDeletedItemCount,
+ std::atomic<uint64_t>& InDeletedByteCount,
+ std::span<const std::string> InExcludeDirectories)
+ : Path(InPath)
+ , AbortFlag(InAbortFlag)
+ , LogOutput(InLogOutput)
+ , CleanWipe(InCleanWipe)
+ , DiscoveredItemCount(InDiscoveredItemCount)
+ , DeletedItemCount(InDeletedItemCount)
+ , DeletedByteCount(InDeletedByteCount)
+ , ExcludeDirectories(InExcludeDirectories)
+ {
+ }
+ virtual void AsyncVisitDirectory(const std::filesystem::path& RelativeRoot, DirectoryContent&& Content) override
+ {
+ ZEN_TRACE_CPU("CleanDirectory_AsyncVisitDirectory");
+ if (!AbortFlag)
+ {
+ if (!Content.FileNames.empty())
+ {
+ DiscoveredItemCount += Content.FileNames.size();
+
+ const std::string RelativeRootString = RelativeRoot.generic_string();
+ bool RemoveContent = true;
+ for (const std::string_view ExcludeDirectory : ExcludeDirectories)
+ {
+ if (RelativeRootString.starts_with(ExcludeDirectory))
+ {
+ if (RelativeRootString.length() > ExcludeDirectory.length())
+ {
+ const char MaybePathDelimiter = RelativeRootString[ExcludeDirectory.length()];
+ if (MaybePathDelimiter == '/' || MaybePathDelimiter == '\\' ||
+ MaybePathDelimiter == std::filesystem::path::preferred_separator)
+ {
+ RemoveContent = false;
+ break;
+ }
+ }
+ else
+ {
+ RemoveContent = false;
+ break;
+ }
+ }
+ }
+ if (RemoveContent)
+ {
+ ZEN_TRACE_CPU("DeleteFiles");
+ for (size_t FileIndex = 0; FileIndex < Content.FileNames.size(); FileIndex++)
+ {
+ const std::filesystem::path& FileName = Content.FileNames[FileIndex];
+ const std::filesystem::path FilePath = (Path / RelativeRoot / FileName).make_preferred();
+ try
+ {
+ SetFileReadOnlyWithRetry(FilePath, false);
+ RemoveFileWithRetry(FilePath);
+ DeletedItemCount++;
+ DeletedByteCount += Content.FileSizes[FileIndex];
+ }
+ catch (const std::exception& Ex)
+ {
+ LOG_OUTPUT_WARN(LogOutput, "Failed removing file {}. Reason: {}", FilePath, Ex.what());
+ CleanWipe = false;
+ }
+ }
+ }
+ }
+ }
+ }
+ const std::filesystem::path& Path;
+ std::atomic<bool>& AbortFlag;
+ BuildOpLogOutput& LogOutput;
+ std::atomic<bool>& CleanWipe;
+ std::atomic<uint64_t>& DiscoveredItemCount;
+ std::atomic<uint64_t>& DeletedItemCount;
+ std::atomic<uint64_t>& DeletedByteCount;
+ std::span<const std::string> ExcludeDirectories;
+ } Visitor(Path, AbortFlag, LogOutput, CleanWipe, DiscoveredItemCount, DeletedItemCount, DeletedByteCount, ExcludeDirectories);
+
+ GetDirectoryContent(
+ Path,
+ DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive | DirectoryContentFlags::IncludeFileSizes,
+ Visitor,
+ IOWorkerPool,
+ Work.PendingWork());
+
+ DirectoryContent LocalDirectoryContent;
+ GetDirectoryContent(Path, DirectoryContentFlags::IncludeDirs | DirectoryContentFlags::IncludeFiles, LocalDirectoryContent);
+ DiscoveredItemCount += LocalDirectoryContent.Directories.size();
+ std::vector<std::filesystem::path> DirectoriesToDelete;
+ DirectoriesToDelete.reserve(LocalDirectoryContent.Directories.size());
+ for (std::filesystem::path& LocalDirPath : LocalDirectoryContent.Directories)
+ {
+ bool Leave = false;
+ for (const std::string_view ExcludeDirectory : ExcludeDirectories)
+ {
+ if (LocalDirPath == (Path / ExcludeDirectory))
+ {
+ Leave = true;
+ break;
+ }
+ }
+ if (!Leave)
+ {
+ DirectoriesToDelete.emplace_back(std::move(LocalDirPath));
+ DiscoveredItemCount++;
+ }
+ }
+
+ uint64_t LastUpdateTimeMs = Timer.GetElapsedTimeMs();
+
+ Work.Wait(LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
+ LastUpdateTimeMs = Timer.GetElapsedTimeMs();
+
+ uint64_t Deleted = DeletedItemCount.load();
+ uint64_t DeletedBytes = DeletedByteCount.load();
+ uint64_t Discovered = DiscoveredItemCount.load();
+ Progress.UpdateState({.Task = "Cleaning folder ",
+ .Details = fmt::format("Found {}, Deleted {} ({})", Discovered, Deleted, NiceBytes(DeletedBytes)),
+ .TotalCount = Discovered,
+ .RemainingCount = Discovered - Deleted,
+ .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+ false);
+ });
+
+ for (const std::filesystem::path& DirectoryToDelete : DirectoriesToDelete)
+ {
+ ZEN_TRACE_CPU("DeleteDirs");
+ try
+ {
+ std::error_code Ec;
+ zen::CleanDirectory(DirectoryToDelete, true, Ec);
+ if (Ec)
+ {
+ Sleep(200);
+ zen::CleanDirectory(DirectoryToDelete, true);
+ Ec.clear();
+ }
+
+ RemoveDirWithRetry(DirectoryToDelete);
+
+ DeletedItemCount++;
+ }
+ catch (const std::exception& Ex)
+ {
+ LOG_OUTPUT_WARN(LogOutput, "Failed removing directory {}. Reason: {}", DirectoryToDelete, Ex.what());
+ CleanWipe = false;
+ }
+
+ uint64_t NowMs = Timer.GetElapsedTimeMs();
+ if ((NowMs - LastUpdateTimeMs) >= LogOutput.GetProgressUpdateDelayMS())
+ {
+ LastUpdateTimeMs = NowMs;
+
+ uint64_t Deleted = DeletedItemCount.load();
+ uint64_t DeletedBytes = DeletedByteCount.load();
+ uint64_t Discovered = DiscoveredItemCount.load();
+ Progress.UpdateState({.Task = "Cleaning folder ",
+ .Details = fmt::format("Found {}, Deleted {} ({})", Discovered, Deleted, NiceBytes(DeletedBytes)),
+ .TotalCount = Discovered,
+ .RemainingCount = Discovered - Deleted,
+ .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(AbortFlag, PauseFlag)},
+ false);
+ }
+ }
+
+ Progress.Finish();
+ if (AbortFlag)
+ {
+ return false;
+ }
+
+ uint64_t ElapsedTimeMs = Timer.GetElapsedTimeMs();
+ if (ElapsedTimeMs >= 200 && !IsQuiet)
+ {
+ LOG_OUTPUT(LogOutput,
+ "Wiped folder '{}' {} ({}) in {}",
+ Path,
+ DiscoveredItemCount.load(),
+ NiceBytes(DeletedByteCount.load()),
+ NiceTimeSpanMs(ElapsedTimeMs));
+ }
+ return CleanWipe;
+ }
+
+ void CopyFile(bool AllowFileClone,
+ bool UseSparseFiles,
+ const std::filesystem::path& SourceFilePath,
+ const std::filesystem::path& TargetFilePath,
+ uint64_t RawSize,
+ std::atomic<uint64_t>& WriteCount,
+ std::atomic<uint64_t>& WriteByteCount,
+ std::atomic<uint64_t>& CloneCount,
+ std::atomic<uint64_t>& CloneByteCount)
+ {
+ ZEN_TRACE_CPU("CopyFile");
+ if (AllowFileClone && TryCloneFile(SourceFilePath, TargetFilePath))
+ {
+ WriteCount += 1;
+ WriteByteCount += RawSize;
+ CloneCount += 1;
+ CloneByteCount += RawSize;
+ }
+ else
+ {
+ BasicFile TargetFile(TargetFilePath, BasicFile::Mode::kTruncate);
+ if (UseSparseFiles)
+ {
+ PrepareFileForScatteredWrite(TargetFile.Handle(), RawSize);
+ }
+ uint64_t Offset = 0;
+ if (!ScanFile(SourceFilePath, 512u * 1024u, [&](const void* Data, size_t Size) {
+ TargetFile.Write(Data, Size, Offset);
+ Offset += Size;
+ WriteCount++;
+ WriteByteCount += Size;
+ }))
+ {
+ throw std::runtime_error(fmt::format("Failed to copy scavenged file '{}' to '{}'", SourceFilePath, TargetFilePath));
+ }
+ }
+ }
+
+} // namespace
+
+bool
+ReadStateObject(BuildOpLogOutput& LogOutput,
+ CbObjectView StateView,
+ Oid& OutBuildId,
+ std::vector<Oid>& BuildPartsIds,
+ std::vector<std::string>& BuildPartsNames,
+ std::vector<ChunkedFolderContent>& OutPartContents,
+ FolderContent& OutLocalFolderState)
+{
+ try
+ {
+ CbObjectView BuildView = StateView["builds"sv].AsArrayView().CreateViewIterator().AsObjectView();
+ OutBuildId = BuildView["buildId"sv].AsObjectId();
+ for (CbFieldView PartView : BuildView["parts"sv].AsArrayView())
+ {
+ CbObjectView PartObjectView = PartView.AsObjectView();
+ BuildPartsIds.push_back(PartObjectView["partId"sv].AsObjectId());
+ BuildPartsNames.push_back(std::string(PartObjectView["partName"sv].AsString()));
+ OutPartContents.push_back(LoadChunkedFolderContentToCompactBinary(PartObjectView["content"sv].AsObjectView()));
+ }
+ OutLocalFolderState = LoadFolderContentToCompactBinary(StateView["localFolderState"sv].AsObjectView());
+ return true;
+ }
+ catch (const std::exception& Ex)
+ {
+ LOG_OUTPUT_WARN(LogOutput, "Unable to read local state: ", Ex.what());
+ return false;
+ }
+}
+
+bool
+ReadStateFile(BuildOpLogOutput& LogOutput,
+ const std::filesystem::path& StateFilePath,
+ FolderContent& OutLocalFolderState,
+ ChunkedFolderContent& OutLocalContent)
+{
+ ZEN_TRACE_CPU("ReadStateFile");
+ bool HasLocalState = false;
+ try
+ {
+ CbObject CurrentStateObject = LoadCompactBinaryObject(StateFilePath).Object;
+ if (CurrentStateObject)
+ {
+ Oid CurrentBuildId;
+ std::vector<Oid> SavedBuildPartIds;
+ std::vector<std::string> SavedBuildPartsNames;
+ std::vector<ChunkedFolderContent> SavedPartContents;
+ if (ReadStateObject(LogOutput,
+ CurrentStateObject,
+ CurrentBuildId,
+ SavedBuildPartIds,
+ SavedBuildPartsNames,
+ SavedPartContents,
+ OutLocalFolderState))
+ {
+ if (!SavedPartContents.empty())
+ {
+ if (SavedPartContents.size() == 1)
+ {
+ OutLocalContent = std::move(SavedPartContents[0]);
+ }
+ else
+ {
+ OutLocalContent = MergeChunkedFolderContents(SavedPartContents[0],
+ std::span<const ChunkedFolderContent>(SavedPartContents).subspan(1));
+ }
+ HasLocalState = true;
+ }
+ }
+ }
+ }
+ catch (const std::exception& Ex)
+ {
+ LOG_OUTPUT_WARN(LogOutput, "Failed reading state file {}, falling back to scannning. Reason: {}", StateFilePath, Ex.what());
+ }
+ return HasLocalState;
+}
+
+struct BlockRangeDescriptor
+{
+ uint32_t BlockIndex = (uint32_t)-1;
+ uint64_t RangeStart = 0;
+ uint64_t RangeLength = 0;
+ uint32_t ChunkBlockIndexStart = 0;
+ uint32_t ChunkBlockIndexCount = 0;
+};
+
+BlockRangeDescriptor
+MergeBlockRanges(std::span<const BlockRangeDescriptor> Ranges)
+{
+ ZEN_ASSERT(Ranges.size() > 1);
+ const BlockRangeDescriptor& First = Ranges.front();
+ const BlockRangeDescriptor& Last = Ranges.back();
+
+ return BlockRangeDescriptor{.BlockIndex = First.BlockIndex,
+ .RangeStart = First.RangeStart,
+ .RangeLength = Last.RangeStart + Last.RangeLength - First.RangeStart,
+ .ChunkBlockIndexStart = First.ChunkBlockIndexStart,
+ .ChunkBlockIndexCount = Last.ChunkBlockIndexStart + Last.ChunkBlockIndexCount - First.ChunkBlockIndexStart};
+}
+
+std::optional<std::vector<BlockRangeDescriptor>>
+MakeOptionalBlockRangeVector(uint64_t TotalBlockSize, const BlockRangeDescriptor& Range)
+{
+ if (Range.RangeLength == TotalBlockSize)
+ {
+ return {};
+ }
+ else
+ {
+ return std::vector<BlockRangeDescriptor>{Range};
+ }
+};
+
+struct BlockRangeLimit
+{
+ uint16_t SizePercent;
+ uint16_t MaxRangeCount;
+};
+
+static const uint16_t FullBlockRangePercentLimit = 95;
+
+const std::vector<BlockRangeLimit> ForceMergeLimits = {{.SizePercent = FullBlockRangePercentLimit, .MaxRangeCount = 1},
+ {.SizePercent = 90, .MaxRangeCount = 2},
+ {.SizePercent = 85, .MaxRangeCount = 8},
+ {.SizePercent = 80, .MaxRangeCount = 16},
+ {.SizePercent = 70, .MaxRangeCount = 32},
+ {.SizePercent = 60, .MaxRangeCount = 48},
+ {.SizePercent = 2, .MaxRangeCount = 56},
+ {.SizePercent = 0, .MaxRangeCount = 64}};
+
+const BlockRangeLimit*
+GetBlockRangeLimitForRange(std::span<const BlockRangeLimit> Limits, uint64_t TotalBlockSize, std::span<const BlockRangeDescriptor> Ranges)
+{
+ if (Ranges.size() > 1)
+ {
+ const std::uint64_t WantedSize =
+ std::accumulate(Ranges.begin(), Ranges.end(), uint64_t(0), [](uint64_t Current, const BlockRangeDescriptor& Range) {
+ return Current + Range.RangeLength;
+ });
+
+ const double RangeRequestedPercent = (WantedSize * 100.0) / TotalBlockSize;
+
+ for (const BlockRangeLimit& Limit : Limits)
+ {
+ if (RangeRequestedPercent >= Limit.SizePercent && Ranges.size() > Limit.MaxRangeCount)
+ {
+ return &Limit;
+ }
+ }
+ }
+ return nullptr;
+};
+
+std::vector<BlockRangeDescriptor>
+CollapseBlockRanges(const uint64_t AlwaysAcceptableGap, std::span<const BlockRangeDescriptor> BlockRanges)
+{
+ ZEN_ASSERT(BlockRanges.size() > 1);
+ std::vector<BlockRangeDescriptor> CollapsedBlockRanges;
+
+ auto BlockRangesIt = BlockRanges.begin();
+ CollapsedBlockRanges.push_back(*BlockRangesIt++);
+ for (; BlockRangesIt != BlockRanges.end(); BlockRangesIt++)
+ {
+ BlockRangeDescriptor& LastRange = CollapsedBlockRanges.back();
+
+ const uint64_t BothRangeSize = BlockRangesIt->RangeLength + LastRange.RangeLength;
+
+ const uint64_t Gap = BlockRangesIt->RangeStart - (LastRange.RangeStart + LastRange.RangeLength);
+ if (Gap <= Max(BothRangeSize / 16, AlwaysAcceptableGap))
+ {
+ LastRange.ChunkBlockIndexCount =
+ (BlockRangesIt->ChunkBlockIndexStart + BlockRangesIt->ChunkBlockIndexCount) - LastRange.ChunkBlockIndexStart;
+ LastRange.RangeLength = (BlockRangesIt->RangeStart + BlockRangesIt->RangeLength) - LastRange.RangeStart;
+ }
+ else
+ {
+ CollapsedBlockRanges.push_back(*BlockRangesIt);
+ }
+ }
+
+ return CollapsedBlockRanges;
+};
+
+uint64_t
+CalculateNextGap(std::span<const BlockRangeDescriptor> BlockRanges)
+{
+ ZEN_ASSERT(BlockRanges.size() > 1);
+ uint64_t AcceptableGap = (uint64_t)-1;
+ for (size_t RangeIndex = 0; RangeIndex < BlockRanges.size() - 1; RangeIndex++)
+ {
+ const BlockRangeDescriptor& Range = BlockRanges[RangeIndex];
+ const BlockRangeDescriptor& NextRange = BlockRanges[RangeIndex + 1];
+
+ const uint64_t Gap = NextRange.RangeStart - (Range.RangeStart + Range.RangeLength);
+ AcceptableGap = Min(Gap, AcceptableGap);
+ }
+ AcceptableGap = RoundUp(AcceptableGap, 16u * 1024u);
+ return AcceptableGap;
+};
+
+std::optional<std::vector<BlockRangeDescriptor>>
+CalculateBlockRanges(BuildOpLogOutput& LogOutput,
+ bool IsVerbose,
+ uint32_t BlockIndex,
+ const ChunkBlockDescription& BlockDescription,
+ std::span<const uint32_t> BlockChunkIndexNeeded,
+ bool LimitToSingleRange,
+ const uint64_t ChunkStartOffsetInBlock,
+ const uint64_t TotalBlockSize,
+ uint64_t& OutTotalWantedChunksSize)
+{
+ ZEN_TRACE_CPU("CalculateBlockRanges");
+
+ std::vector<BlockRangeDescriptor> BlockRanges;
+ {
+ uint64_t CurrentOffset = ChunkStartOffsetInBlock;
+ uint32_t ChunkBlockIndex = 0;
+ uint32_t NeedBlockChunkIndexOffset = 0;
+ BlockRangeDescriptor NextRange{.BlockIndex = BlockIndex};
+ while (NeedBlockChunkIndexOffset < BlockChunkIndexNeeded.size() && ChunkBlockIndex < BlockDescription.ChunkRawHashes.size())
+ {
+ const uint32_t ChunkCompressedLength = BlockDescription.ChunkCompressedLengths[ChunkBlockIndex];
+ if (ChunkBlockIndex < BlockChunkIndexNeeded[NeedBlockChunkIndexOffset])
+ {
+ if (NextRange.RangeLength > 0)
+ {
+ BlockRanges.push_back(NextRange);
+ NextRange = {.BlockIndex = BlockIndex};
+ }
+ ChunkBlockIndex++;
+ CurrentOffset += ChunkCompressedLength;
+ }
+ else if (ChunkBlockIndex == BlockChunkIndexNeeded[NeedBlockChunkIndexOffset])
+ {
+ if (NextRange.RangeLength == 0)
+ {
+ NextRange.RangeStart = CurrentOffset;
+ NextRange.ChunkBlockIndexStart = ChunkBlockIndex;
+ }
+ NextRange.RangeLength += ChunkCompressedLength;
+ NextRange.ChunkBlockIndexCount++;
+ ChunkBlockIndex++;
+ CurrentOffset += ChunkCompressedLength;
+ NeedBlockChunkIndexOffset++;
+ }
+ else
+ {
+ ZEN_ASSERT(false);
+ }
+ }
+ if (NextRange.RangeLength > 0)
+ {
+ BlockRanges.push_back(NextRange);
+ }
+ }
+ ZEN_ASSERT(!BlockRanges.empty());
+
+ OutTotalWantedChunksSize =
+ std::accumulate(BlockRanges.begin(), BlockRanges.end(), uint64_t(0), [](uint64_t Current, const BlockRangeDescriptor& Range) {
+ return Current + Range.RangeLength;
+ });
+
+ double RangeWantedPercent = (OutTotalWantedChunksSize * 100.0) / TotalBlockSize;
+
+ if (BlockRanges.size() == 1)
+ {
+ if (IsVerbose)
+ {
+ LOG_OUTPUT(LogOutput,
+ "Range request of {} ({:.2f}%) using single range from block {} ({}) as is",
+ NiceBytes(OutTotalWantedChunksSize),
+ RangeWantedPercent,
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize));
+ }
+ return BlockRanges;
+ }
+
+ if (LimitToSingleRange)
+ {
+ const BlockRangeDescriptor MergedRange = MergeBlockRanges(BlockRanges);
+ if (IsVerbose)
+ {
+ const double RangeRequestedPercent = (MergedRange.RangeLength * 100.0) / TotalBlockSize;
+ const double WastedPercent = ((MergedRange.RangeLength - OutTotalWantedChunksSize) * 100.0) / MergedRange.RangeLength;
+
+ LOG_OUTPUT(
+ LogOutput,
+ "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) limited to single block range {} ({:.2f}%) wasting "
+ "{:.2f}% ({})",
+ NiceBytes(OutTotalWantedChunksSize),
+ RangeWantedPercent,
+ BlockRanges.size(),
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize),
+ NiceBytes(MergedRange.RangeLength),
+ RangeRequestedPercent,
+ WastedPercent,
+ NiceBytes(MergedRange.RangeLength - OutTotalWantedChunksSize));
+ }
+ return MakeOptionalBlockRangeVector(TotalBlockSize, MergedRange);
+ }
+
+ if (RangeWantedPercent > FullBlockRangePercentLimit)
+ {
+ const BlockRangeDescriptor MergedRange = MergeBlockRanges(BlockRanges);
+ if (IsVerbose)
+ {
+ const double RangeRequestedPercent = (MergedRange.RangeLength * 100.0) / TotalBlockSize;
+ const double WastedPercent = ((MergedRange.RangeLength - OutTotalWantedChunksSize) * 100.0) / MergedRange.RangeLength;
+
+ LOG_OUTPUT(LogOutput,
+ "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) exceeds {}%. Merged to single block range {} "
+ "({:.2f}%) wasting {:.2f}% ({})",
+ NiceBytes(OutTotalWantedChunksSize),
+ RangeWantedPercent,
+ BlockRanges.size(),
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize),
+ FullBlockRangePercentLimit,
+ NiceBytes(MergedRange.RangeLength),
+ RangeRequestedPercent,
+ WastedPercent,
+ NiceBytes(MergedRange.RangeLength - OutTotalWantedChunksSize));
+ }
+ return MakeOptionalBlockRangeVector(TotalBlockSize, MergedRange);
+ }
+
+ std::vector<BlockRangeDescriptor> CollapsedBlockRanges = CollapseBlockRanges(16u * 1024u, BlockRanges);
+ while (GetBlockRangeLimitForRange(ForceMergeLimits, TotalBlockSize, CollapsedBlockRanges))
+ {
+ CollapsedBlockRanges = CollapseBlockRanges(CalculateNextGap(CollapsedBlockRanges), CollapsedBlockRanges);
+ }
+
+ const std::uint64_t WantedCollapsedSize =
+ std::accumulate(CollapsedBlockRanges.begin(),
+ CollapsedBlockRanges.end(),
+ uint64_t(0),
+ [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; });
+
+ const double CollapsedRangeRequestedPercent = (WantedCollapsedSize * 100.0) / TotalBlockSize;
+
+ if (IsVerbose)
+ {
+ const double WastedPercent = ((WantedCollapsedSize - OutTotalWantedChunksSize) * 100.0) / WantedCollapsedSize;
+
+ LOG_OUTPUT(
+ LogOutput,
+ "Range request of {} ({:.2f}%) using {} ranges from block {} ({}) collapsed to {} {:.2f}% using {} ranges wasting {:.2f}% "
+ "({})",
+ NiceBytes(OutTotalWantedChunksSize),
+ RangeWantedPercent,
+ BlockRanges.size(),
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize),
+ NiceBytes(WantedCollapsedSize),
+ CollapsedRangeRequestedPercent,
+ CollapsedBlockRanges.size(),
+ WastedPercent,
+ NiceBytes(WantedCollapsedSize - OutTotalWantedChunksSize));
+ }
+ return CollapsedBlockRanges;
+}
+
+bool
+IsSingleFileChunk(const ChunkedFolderContent& RemoteContent,
+ const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> Locations)
+{
+ if (Locations.size() == 1)
+ {
+ const uint32_t FirstSequenceIndex = Locations[0]->SequenceIndex;
+ if (RemoteContent.ChunkedContent.ChunkCounts[FirstSequenceIndex] == 1)
+ {
+ ZEN_ASSERT_SLOW(Locations[0]->Offset == 0);
+ return true;
+ }
+ }
+ return false;
+}
+
+IoBuffer
+MakeBufferMemoryBased(const CompositeBuffer& PartialBlockBuffer)
+{
+ ZEN_TRACE_CPU("MakeBufferMemoryBased");
+ IoBuffer BlockMemoryBuffer;
+ std::span<const SharedBuffer> Segments = PartialBlockBuffer.GetSegments();
+ if (Segments.size() == 1)
+ {
+ IoBufferFileReference FileRef = {};
+ if (PartialBlockBuffer.GetSegments().front().AsIoBuffer().GetFileReference(FileRef))
+ {
+ BlockMemoryBuffer = UniqueBuffer::Alloc(FileRef.FileChunkSize).MoveToShared().AsIoBuffer();
+ BasicFile Reader;
+ Reader.Attach(FileRef.FileHandle);
+ auto _ = MakeGuard([&Reader]() { Reader.Detach(); });
+ MutableMemoryView ReadMem = BlockMemoryBuffer.GetMutableView();
+ Reader.Read(ReadMem.GetData(), FileRef.FileChunkSize, FileRef.FileChunkOffset);
+ return BlockMemoryBuffer;
+ }
+ else
+ {
+ return PartialBlockBuffer.GetSegments().front().AsIoBuffer();
+ }
+ }
+ else
+ {
+ // Not a homogenous memory buffer, read all to memory
+
+ BlockMemoryBuffer = UniqueBuffer::Alloc(PartialBlockBuffer.GetSize()).MoveToShared().AsIoBuffer();
+ MutableMemoryView ReadMem = BlockMemoryBuffer.GetMutableView();
+ for (const SharedBuffer& Segment : Segments)
+ {
+ IoBufferFileReference FileRef = {};
+ if (Segment.AsIoBuffer().GetFileReference(FileRef))
+ {
+ BasicFile Reader;
+ Reader.Attach(FileRef.FileHandle);
+ Reader.Read(ReadMem.GetData(), FileRef.FileChunkSize, FileRef.FileChunkOffset);
+ Reader.Detach();
+ ReadMem = ReadMem.Mid(FileRef.FileChunkSize);
+ }
+ else
+ {
+ ReadMem = ReadMem.CopyFrom(Segment.AsIoBuffer().GetView());
+ }
+ }
+ return BlockMemoryBuffer;
+ }
+}
+
+void
+StreamDecompress(std::atomic<bool>& AbortFlag,
+ const std::filesystem::path& CacheFolderPath,
+ const IoHash& SequenceRawHash,
+ CompositeBuffer&& CompressedPart,
+ DiskStatistics& DiskStats)
+{
+ ZEN_TRACE_CPU("StreamDecompress");
+ const std::filesystem::path TempChunkSequenceFileName = GetTempChunkedSequenceFileName(CacheFolderPath, SequenceRawHash);
+ TemporaryFile DecompressedTemp;
+ std::error_code Ec;
+ DecompressedTemp.CreateTemporary(TempChunkSequenceFileName.parent_path(), Ec);
+ if (Ec)
+ {
+ throw std::runtime_error(
+ fmt::format("Failed creating temporary file for decompressing large blob {}. Reason: {}", SequenceRawHash, Ec.message()));
+ }
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompressedPart, RawHash, RawSize);
+ if (!Compressed)
+ {
+ throw std::runtime_error(fmt::format("Failed to parse header of compressed large blob {}", SequenceRawHash));
+ }
+ if (RawHash != SequenceRawHash)
+ {
+ throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, SequenceRawHash));
+ }
+ PrepareFileForScatteredWrite(DecompressedTemp.Handle(), RawSize);
+
+ IoHashStream Hash;
+ bool CouldDecompress =
+ Compressed.DecompressToStream(0,
+ (uint64_t)-1,
+ [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
+ ZEN_UNUSED(SourceOffset);
+ ZEN_TRACE_CPU("StreamDecompress_Write");
+ DiskStats.ReadByteCount += SourceSize;
+ if (!AbortFlag)
+ {
+ for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
+ {
+ Hash.Append(Segment.GetView());
+ DecompressedTemp.Write(Segment, Offset);
+ Offset += Segment.GetSize();
+ DiskStats.WriteByteCount += Segment.GetSize();
+ DiskStats.WriteCount++;
+ }
+ return true;
+ }
+ return false;
+ });
+
+ if (AbortFlag)
+ {
+ return;
+ }
+
+ if (!CouldDecompress)
+ {
+ throw std::runtime_error(fmt::format("Failed to decompress large blob {}", SequenceRawHash));
+ }
+ const IoHash VerifyHash = Hash.GetHash();
+ if (VerifyHash != SequenceRawHash)
+ {
+ throw std::runtime_error(
+ fmt::format("Decompressed blob payload hash {} does not match expected hash {}", VerifyHash, SequenceRawHash));
+ }
+ DecompressedTemp.MoveTemporaryIntoPlace(TempChunkSequenceFileName, Ec);
+ if (Ec)
+ {
+ throw std::runtime_error(
+ fmt::format("Failed moving temporary file for decompressing large blob {}. Reason: {}", SequenceRawHash, Ec.message()));
+ }
+ // WriteChunkStats.ChunkCountWritten++;
+}
+
+class FilteredRate
+{
+public:
+ FilteredRate() {}
+
+ void Start()
+ {
+ if (StartTimeUS == (uint64_t)-1)
+ {
+ uint64_t Expected = (uint64_t)-1;
+ if (StartTimeUS.compare_exchange_weak(Expected, Timer.GetElapsedTimeUs()))
+ {
+ LastTimeUS = StartTimeUS.load();
+ }
+ }
+ }
+ void Stop()
+ {
+ if (EndTimeUS == (uint64_t)-1)
+ {
+ uint64_t Expected = (uint64_t)-1;
+ EndTimeUS.compare_exchange_weak(Expected, Timer.GetElapsedTimeUs());
+ }
+ }
+
+ void Update(uint64_t Count)
+ {
+ if (LastTimeUS == (uint64_t)-1)
+ {
+ return;
+ }
+ uint64_t TimeUS = Timer.GetElapsedTimeUs();
+ uint64_t TimeDeltaUS = TimeUS - LastTimeUS;
+ if (TimeDeltaUS >= 2000000)
+ {
+ uint64_t Delta = Count - LastCount;
+ uint64_t PerSecond = (Delta * 1000000) / TimeDeltaUS;
+
+ LastPerSecond = PerSecond;
+
+ LastCount = Count;
+
+ FilteredPerSecond = (PerSecond + (LastPerSecond * 7)) / 8;
+
+ LastTimeUS = TimeUS;
+ }
+ }
+
+ uint64_t GetCurrent() const // If Stopped - return total count / total time
+ {
+ if (LastTimeUS == (uint64_t)-1)
+ {
+ return 0;
+ }
+ return FilteredPerSecond;
+ }
+
+ uint64_t GetElapsedTimeUS() const
+ {
+ if (StartTimeUS == (uint64_t)-1)
+ {
+ return 0;
+ }
+ if (EndTimeUS == (uint64_t)-1)
+ {
+ return 0;
+ }
+ uint64_t TimeDeltaUS = EndTimeUS - StartTimeUS;
+ return TimeDeltaUS;
+ }
+
+ bool IsActive() const { return (StartTimeUS != (uint64_t)-1) && (EndTimeUS == (uint64_t)-1); }
+
+private:
+ Stopwatch Timer;
+ std::atomic<uint64_t> StartTimeUS = (uint64_t)-1;
+ std::atomic<uint64_t> EndTimeUS = (uint64_t)-1;
+ std::atomic<uint64_t> LastTimeUS = (uint64_t)-1;
+ uint64_t LastCount = 0;
+ uint64_t LastPerSecond = 0;
+ uint64_t FilteredPerSecond = 0;
+};
+
+EPartialBlockRequestMode
+PartialBlockRequestModeFromString(const std::string_view ModeString)
+{
+ switch (HashStringAsLowerDjb2(ModeString))
+ {
+ case HashStringDjb2("false"):
+ return EPartialBlockRequestMode::Off;
+ case HashStringDjb2("zencacheonly"):
+ return EPartialBlockRequestMode::ZenCacheOnly;
+ case HashStringDjb2("mixed"):
+ return EPartialBlockRequestMode::Mixed;
+ case HashStringDjb2("true"):
+ return EPartialBlockRequestMode::All;
+ default:
+ return EPartialBlockRequestMode::Invalid;
+ }
+}
+
+std::filesystem::path
+ZenStateFilePath(const std::filesystem::path& ZenFolderPath)
+{
+ return ZenFolderPath / "current_state.cbo";
+}
+std::filesystem::path
+ZenTempFolderPath(const std::filesystem::path& ZenFolderPath)
+{
+ return ZenFolderPath / "tmp";
+}
+
+BuildsOperationUpdateFolder::BuildsOperationUpdateFolder(BuildOpLogOutput& LogOutput,
+ StorageInstance& Storage,
+ std::atomic<bool>& AbortFlag,
+ std::atomic<bool>& PauseFlag,
+ WorkerThreadPool& IOWorkerPool,
+ WorkerThreadPool& NetworkPool,
+ const Oid& BuildId,
+ const std::filesystem::path& Path,
+ const ChunkedFolderContent& LocalContent,
+ const ChunkedContentLookup& LocalLookup,
+ const ChunkedFolderContent& RemoteContent,
+ const ChunkedContentLookup& RemoteLookup,
+ const std::vector<ChunkBlockDescription>& BlockDescriptions,
+ const std::vector<IoHash>& LooseChunkHashes,
+ const Options& Options,
+ DiskStatistics& DiskStats)
+: m_LogOutput(LogOutput)
+, m_Storage(Storage)
+, m_AbortFlag(AbortFlag)
+, m_PauseFlag(PauseFlag)
+, m_IOWorkerPool(IOWorkerPool)
+, m_NetworkPool(NetworkPool)
+, m_BuildId(BuildId)
+, m_Path(Path)
+, m_LocalContent(LocalContent)
+, m_LocalLookup(LocalLookup)
+, m_RemoteContent(RemoteContent)
+, m_RemoteLookup(RemoteLookup)
+, m_BlockDescriptions(BlockDescriptions)
+, m_LooseChunkHashes(LooseChunkHashes)
+, m_Options(Options)
+, m_DiskStats(DiskStats)
+, m_CacheFolderPath(ZenTempCacheFolderPath(m_Options.ZenFolderPath))
+{
+}
+
+void
+BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
+{
+ ZEN_TRACE_CPU("BuildsOperationUpdateFolder::Execute");
+
+ ZEN_ASSERT((!m_Options.PrimeCacheOnly) ||
+ (m_Options.PrimeCacheOnly && (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Off)));
+
+ Stopwatch IndexTimer;
+
+ if (!m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput, "Indexed local and remote content in {}", NiceTimeSpanMs(IndexTimer.GetElapsedTimeMs()));
+ }
+
+ Stopwatch CacheMappingTimer;
+
+ std::vector<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters(m_RemoteContent.ChunkedContent.SequenceRawHashes.size());
+ std::vector<bool> RemoteChunkIndexNeedsCopyFromLocalFileFlags(m_RemoteContent.ChunkedContent.ChunkHashes.size());
+ std::vector<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags(m_RemoteContent.ChunkedContent.ChunkHashes.size());
+
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedChunkHashesFound;
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedSequenceHashesFound;
+ if (!m_Options.PrimeCacheOnly)
+ {
+ ScanCacheFolder(CachedChunkHashesFound, CachedSequenceHashesFound);
+ }
+
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> CachedBlocksFound;
+ if (!m_Options.PrimeCacheOnly)
+ {
+ ScanTempBlocksFolder(CachedBlocksFound);
+ }
+
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> SequenceIndexesLeftToFindToRemoteIndex;
+
+ if (!m_Options.PrimeCacheOnly && m_Options.EnableTargetFolderScavenging)
+ {
+ // Pick up all whole files we can use from current local state
+ ZEN_TRACE_CPU("GetLocalSequences");
+
+ Stopwatch LocalTimer;
+
+ std::vector<uint32_t> MissingSequenceIndexes = ScanTargetFolder(CachedChunkHashesFound, CachedSequenceHashesFound);
+
+ for (uint32_t RemoteSequenceIndex : MissingSequenceIndexes)
+ {
+ // We must write the sequence
+ const uint32_t ChunkCount = m_RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex];
+ const IoHash& RemoteSequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = ChunkCount;
+ SequenceIndexesLeftToFindToRemoteIndex.insert({RemoteSequenceRawHash, RemoteSequenceIndex});
+ }
+ }
+ else
+ {
+ for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < m_RemoteContent.ChunkedContent.SequenceRawHashes.size();
+ RemoteSequenceIndex++)
+ {
+ const uint32_t ChunkCount = m_RemoteContent.ChunkedContent.ChunkCounts[RemoteSequenceIndex];
+ SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = ChunkCount;
+ }
+ }
+
+ std::vector<ChunkedFolderContent> ScavengedContents;
+ std::vector<ChunkedContentLookup> ScavengedLookups;
+ std::vector<std::filesystem::path> ScavengedPaths;
+
+ std::vector<ScavengedSequenceCopyOperation> ScavengedSequenceCopyOperations;
+ uint64_t ScavengedPathsCount = 0;
+
+ if (!m_Options.PrimeCacheOnly && m_Options.EnableOtherDownloadsScavenging)
+ {
+ ZEN_TRACE_CPU("GetScavengedSequences");
+
+ Stopwatch ScavengeTimer;
+
+ if (!SequenceIndexesLeftToFindToRemoteIndex.empty())
+ {
+ std::vector<ScavengeSource> ScavengeSources = FindScavengeSources();
+
+ const size_t ScavengePathCount = ScavengeSources.size();
+
+ ScavengedContents.resize(ScavengePathCount);
+ ScavengedLookups.resize(ScavengePathCount);
+ ScavengedPaths.resize(ScavengePathCount);
+
+ std::unique_ptr<BuildOpLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Scavenging"));
+ BuildOpLogOutput::ProgressBar& ScavengeProgressBar(*ProgressBarPtr);
+
+ ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ std::atomic<uint64_t> PathsFound(0);
+ std::atomic<uint64_t> ChunksFound(0);
+ std::atomic<uint64_t> PathsScavenged(0);
+
+ for (size_t ScavengeIndex = 0; ScavengeIndex < ScavengePathCount; ScavengeIndex++)
+ {
+ Work.ScheduleWork(m_IOWorkerPool,
+ [this,
+ &ScavengeSources,
+ &ScavengedContents,
+ &ScavengedPaths,
+ &ScavengedLookups,
+ &PathsFound,
+ &ChunksFound,
+ &PathsScavenged,
+ ScavengeIndex](std::atomic<bool>&) {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_FindScavengeContent");
+
+ const ScavengeSource& Source = ScavengeSources[ScavengeIndex];
+ ChunkedFolderContent& ScavengedLocalContent = ScavengedContents[ScavengeIndex];
+ ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengeIndex];
+
+ if (FindScavengeContent(Source, ScavengedLocalContent, ScavengedLookup))
+ {
+ ScavengedPaths[ScavengeIndex] = Source.Path;
+ PathsFound += ScavengedLocalContent.Paths.size();
+ ChunksFound += ScavengedLocalContent.ChunkedContent.ChunkHashes.size();
+ }
+ else
+ {
+ ScavengedPaths[ScavengeIndex].clear();
+ }
+ PathsScavenged++;
+ }
+ });
+ }
+ {
+ ZEN_TRACE_CPU("ScavengeScan_Wait");
+
+ Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
+ std::string Details = fmt::format("{}/{} scanned. {} paths and {} chunks found for scavenging",
+ PathsScavenged.load(),
+ ScavengePathCount,
+ PathsFound.load(),
+ ChunksFound.load());
+ ScavengeProgressBar.UpdateState({.Task = "Scavenging ",
+ .Details = Details,
+ .TotalCount = ScavengePathCount,
+ .RemainingCount = ScavengePathCount - PathsScavenged.load(),
+ .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+ false);
+ });
+ }
+
+ ScavengeProgressBar.Finish();
+ if (m_AbortFlag)
+ {
+ return;
+ }
+
+ for (uint32_t ScavengedContentIndex = 0;
+ ScavengedContentIndex < ScavengedContents.size() && (!SequenceIndexesLeftToFindToRemoteIndex.empty());
+ ScavengedContentIndex++)
+ {
+ const std::filesystem::path& ScavengePath = ScavengedPaths[ScavengedContentIndex];
+ if (!ScavengePath.empty())
+ {
+ const ChunkedFolderContent& ScavengedLocalContent = ScavengedContents[ScavengedContentIndex];
+ const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengedContentIndex];
+
+ for (uint32_t ScavengedSequenceIndex = 0;
+ ScavengedSequenceIndex < ScavengedLocalContent.ChunkedContent.SequenceRawHashes.size();
+ ScavengedSequenceIndex++)
+ {
+ const IoHash& SequenceRawHash = ScavengedLocalContent.ChunkedContent.SequenceRawHashes[ScavengedSequenceIndex];
+ if (auto It = SequenceIndexesLeftToFindToRemoteIndex.find(SequenceRawHash);
+ It != SequenceIndexesLeftToFindToRemoteIndex.end())
+ {
+ const uint32_t RemoteSequenceIndex = It->second;
+ const uint64_t RawSize =
+ m_RemoteContent.RawSizes[m_RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex]];
+ ZEN_ASSERT(RawSize > 0);
+
+ const uint32_t ScavengedPathIndex = ScavengedLookup.SequenceIndexFirstPathIndex[ScavengedSequenceIndex];
+ ZEN_ASSERT_SLOW(IsFile((ScavengePath / ScavengedLocalContent.Paths[ScavengedPathIndex]).make_preferred()));
+
+ ScavengedSequenceCopyOperations.push_back({.ScavengedContentIndex = ScavengedContentIndex,
+ .ScavengedPathIndex = ScavengedPathIndex,
+ .RemoteSequenceIndex = RemoteSequenceIndex,
+ .RawSize = RawSize});
+
+ SequenceIndexesLeftToFindToRemoteIndex.erase(SequenceRawHash);
+ SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex] = 0;
+
+ m_CacheMappingStats.ScavengedPathsMatchingSequencesCount++;
+ m_CacheMappingStats.ScavengedPathsMatchingSequencesByteCount += RawSize;
+ }
+ }
+ ScavengedPathsCount++;
+ }
+ }
+ }
+ m_CacheMappingStats.ScavengeElapsedWallTimeUs += ScavengeTimer.GetElapsedTimeUs();
+ }
+
+ uint32_t RemainingChunkCount = 0;
+ for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < m_RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++)
+ {
+ uint64_t ChunkWriteCount = GetChunkWriteCount(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex);
+ if (ChunkWriteCount > 0)
+ {
+ RemainingChunkCount++;
+ }
+ }
+
+ // Pick up all chunks in current local state
+ tsl::robin_map<IoHash, size_t, IoHash::Hasher> RawHashToCopyChunkDataIndex;
+ std::vector<CopyChunkData> CopyChunkDatas;
+
+ if (!m_Options.PrimeCacheOnly && m_Options.EnableTargetFolderScavenging)
+ {
+ ZEN_TRACE_CPU("GetLocalChunks");
+
+ Stopwatch LocalTimer;
+
+ for (uint32_t LocalSequenceIndex = 0;
+ LocalSequenceIndex < m_LocalContent.ChunkedContent.SequenceRawHashes.size() && (RemainingChunkCount > 0);
+ LocalSequenceIndex++)
+ {
+ const IoHash& LocalSequenceRawHash = m_LocalContent.ChunkedContent.SequenceRawHashes[LocalSequenceIndex];
+ const uint32_t LocalOrderOffset = m_LocalLookup.SequenceIndexChunkOrderOffset[LocalSequenceIndex];
+
+ {
+ uint64_t SourceOffset = 0;
+ const uint32_t LocalChunkCount = m_LocalContent.ChunkedContent.ChunkCounts[LocalSequenceIndex];
+ for (uint32_t LocalOrderIndex = 0; LocalOrderIndex < LocalChunkCount; LocalOrderIndex++)
+ {
+ const uint32_t LocalChunkIndex = m_LocalContent.ChunkedContent.ChunkOrders[LocalOrderOffset + LocalOrderIndex];
+ const IoHash& LocalChunkHash = m_LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex];
+ const uint64_t LocalChunkRawSize = m_LocalContent.ChunkedContent.ChunkRawSizes[LocalChunkIndex];
+
+ if (auto RemoteChunkIt = m_RemoteLookup.ChunkHashToChunkIndex.find(LocalChunkHash);
+ RemoteChunkIt != m_RemoteLookup.ChunkHashToChunkIndex.end())
+ {
+ const uint32_t RemoteChunkIndex = RemoteChunkIt->second;
+ if (!RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex])
+ {
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
+ GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex);
+
+ if (!ChunkTargetPtrs.empty())
+ {
+ CopyChunkData::ChunkTarget Target = {
+ .TargetChunkLocationCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()),
+ .RemoteChunkIndex = RemoteChunkIndex,
+ .CacheFileOffset = SourceOffset};
+ if (auto CopySourceIt = RawHashToCopyChunkDataIndex.find(LocalSequenceRawHash);
+ CopySourceIt != RawHashToCopyChunkDataIndex.end())
+ {
+ CopyChunkData& Data = CopyChunkDatas[CopySourceIt->second];
+ if (Data.TargetChunkLocationPtrs.size() > 1024)
+ {
+ RawHashToCopyChunkDataIndex.insert_or_assign(LocalSequenceRawHash, CopyChunkDatas.size());
+ CopyChunkDatas.push_back(
+ CopyChunkData{.ScavengeSourceIndex = (uint32_t)-1,
+ .SourceSequenceIndex = LocalSequenceIndex,
+ .TargetChunkLocationPtrs = ChunkTargetPtrs,
+ .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}});
+ }
+ else
+ {
+ Data.TargetChunkLocationPtrs.insert(Data.TargetChunkLocationPtrs.end(),
+ ChunkTargetPtrs.begin(),
+ ChunkTargetPtrs.end());
+ Data.ChunkTargets.push_back(Target);
+ }
+ }
+ else
+ {
+ RawHashToCopyChunkDataIndex.insert_or_assign(LocalSequenceRawHash, CopyChunkDatas.size());
+ CopyChunkDatas.push_back(
+ CopyChunkData{.ScavengeSourceIndex = (uint32_t)-1,
+ .SourceSequenceIndex = LocalSequenceIndex,
+ .TargetChunkLocationPtrs = ChunkTargetPtrs,
+ .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}});
+ }
+ m_CacheMappingStats.LocalChunkMatchingRemoteCount++;
+ m_CacheMappingStats.LocalChunkMatchingRemoteByteCount += LocalChunkRawSize;
+ RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex] = true;
+ RemainingChunkCount--;
+ }
+ }
+ }
+ SourceOffset += LocalChunkRawSize;
+ }
+ }
+ }
+ m_CacheMappingStats.LocalScanElapsedWallTimeUs += LocalTimer.GetElapsedTimeUs();
+ }
+
+ if (!m_Options.PrimeCacheOnly && m_Options.EnableOtherDownloadsScavenging)
+ {
+ ZEN_TRACE_CPU("GetScavengeChunks");
+
+ Stopwatch ScavengeTimer;
+
+ for (uint32_t ScavengedContentIndex = 0; ScavengedContentIndex < ScavengedContents.size() && (RemainingChunkCount > 0);
+ ScavengedContentIndex++)
+ {
+ const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengedContentIndex];
+ // const std::filesystem::path& ScavengedPath = ScavengedPaths[ScavengedContentIndex];
+ const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[ScavengedContentIndex];
+
+ for (uint32_t ScavengedSequenceIndex = 0;
+ ScavengedSequenceIndex < ScavengedContent.ChunkedContent.SequenceRawHashes.size() && (RemainingChunkCount > 0);
+ ScavengedSequenceIndex++)
+ {
+ const IoHash& ScavengedSequenceRawHash = ScavengedContent.ChunkedContent.SequenceRawHashes[ScavengedSequenceIndex];
+ const uint32_t ScavengedOrderOffset = ScavengedLookup.SequenceIndexChunkOrderOffset[ScavengedSequenceIndex];
+
+ {
+ uint64_t SourceOffset = 0;
+ const uint32_t ScavengedChunkCount = ScavengedContent.ChunkedContent.ChunkCounts[ScavengedSequenceIndex];
+ for (uint32_t ScavengedOrderIndex = 0; ScavengedOrderIndex < ScavengedChunkCount; ScavengedOrderIndex++)
+ {
+ const uint32_t ScavengedChunkIndex =
+ ScavengedContent.ChunkedContent.ChunkOrders[ScavengedOrderOffset + ScavengedOrderIndex];
+ const IoHash& ScavengedChunkHash = ScavengedContent.ChunkedContent.ChunkHashes[ScavengedChunkIndex];
+ const uint64_t ScavengedChunkRawSize = ScavengedContent.ChunkedContent.ChunkRawSizes[ScavengedChunkIndex];
+
+ if (auto RemoteChunkIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ScavengedChunkHash);
+ RemoteChunkIt != m_RemoteLookup.ChunkHashToChunkIndex.end())
+ {
+ const uint32_t RemoteChunkIndex = RemoteChunkIt->second;
+ if (!RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex])
+ {
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
+ GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex);
+
+ if (!ChunkTargetPtrs.empty())
+ {
+ CopyChunkData::ChunkTarget Target = {
+ .TargetChunkLocationCount = gsl::narrow<uint32_t>(ChunkTargetPtrs.size()),
+ .RemoteChunkIndex = RemoteChunkIndex,
+ .CacheFileOffset = SourceOffset};
+ if (auto CopySourceIt = RawHashToCopyChunkDataIndex.find(ScavengedSequenceRawHash);
+ CopySourceIt != RawHashToCopyChunkDataIndex.end())
+ {
+ CopyChunkData& Data = CopyChunkDatas[CopySourceIt->second];
+ if (Data.TargetChunkLocationPtrs.size() > 1024)
+ {
+ RawHashToCopyChunkDataIndex.insert_or_assign(ScavengedSequenceRawHash, CopyChunkDatas.size());
+ CopyChunkDatas.push_back(
+ CopyChunkData{.ScavengeSourceIndex = ScavengedContentIndex,
+ .SourceSequenceIndex = ScavengedSequenceIndex,
+ .TargetChunkLocationPtrs = ChunkTargetPtrs,
+ .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}});
+ }
+ else
+ {
+ Data.TargetChunkLocationPtrs.insert(Data.TargetChunkLocationPtrs.end(),
+ ChunkTargetPtrs.begin(),
+ ChunkTargetPtrs.end());
+ Data.ChunkTargets.push_back(Target);
+ }
+ }
+ else
+ {
+ RawHashToCopyChunkDataIndex.insert_or_assign(ScavengedSequenceRawHash, CopyChunkDatas.size());
+ CopyChunkDatas.push_back(
+ CopyChunkData{.ScavengeSourceIndex = ScavengedContentIndex,
+ .SourceSequenceIndex = ScavengedSequenceIndex,
+ .TargetChunkLocationPtrs = ChunkTargetPtrs,
+ .ChunkTargets = std::vector<CopyChunkData::ChunkTarget>{Target}});
+ }
+ m_CacheMappingStats.ScavengedChunkMatchingRemoteCount++;
+ m_CacheMappingStats.ScavengedChunkMatchingRemoteByteCount += ScavengedChunkRawSize;
+ RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex] = true;
+ RemainingChunkCount--;
+ }
+ }
+ }
+ SourceOffset += ScavengedChunkRawSize;
+ }
+ }
+ }
+ }
+ m_CacheMappingStats.ScavengeElapsedWallTimeUs += ScavengeTimer.GetElapsedTimeUs();
+ }
+ if (!m_Options.IsQuiet)
+ {
+ if (m_CacheMappingStats.CacheSequenceHashesCount > 0 || m_CacheMappingStats.CacheChunkCount > 0 ||
+ m_CacheMappingStats.CacheBlockCount > 0)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Download cache: Found {} ({}) chunk sequences, {} ({}) chunks, {} ({}) blocks in {}",
+ m_CacheMappingStats.CacheSequenceHashesCount,
+ NiceBytes(m_CacheMappingStats.CacheSequenceHashesByteCount),
+ m_CacheMappingStats.CacheChunkCount,
+ NiceBytes(m_CacheMappingStats.CacheChunkByteCount),
+ m_CacheMappingStats.CacheBlockCount,
+ NiceBytes(m_CacheMappingStats.CacheBlocksByteCount),
+ NiceTimeSpanMs(m_CacheMappingStats.CacheScanElapsedWallTimeUs / 1000));
+ }
+
+ if (m_CacheMappingStats.LocalPathsMatchingSequencesCount > 0 || m_CacheMappingStats.LocalChunkMatchingRemoteCount > 0)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Local state : Found {} ({}) chunk sequences, {} ({}) chunks in {}",
+ m_CacheMappingStats.LocalPathsMatchingSequencesCount,
+ NiceBytes(m_CacheMappingStats.LocalPathsMatchingSequencesByteCount),
+ m_CacheMappingStats.LocalChunkMatchingRemoteCount,
+ NiceBytes(m_CacheMappingStats.LocalChunkMatchingRemoteByteCount),
+ NiceTimeSpanMs(m_CacheMappingStats.LocalScanElapsedWallTimeUs / 1000));
+ }
+ if (m_CacheMappingStats.ScavengedPathsMatchingSequencesCount > 0 || m_CacheMappingStats.ScavengedChunkMatchingRemoteCount > 0)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Scavenge of {} paths, found {} ({}) chunk sequences, {} ({}) chunks in {}",
+ ScavengedPathsCount,
+ m_CacheMappingStats.ScavengedPathsMatchingSequencesCount,
+ NiceBytes(m_CacheMappingStats.ScavengedPathsMatchingSequencesByteCount),
+ m_CacheMappingStats.ScavengedChunkMatchingRemoteCount,
+ NiceBytes(m_CacheMappingStats.ScavengedChunkMatchingRemoteByteCount),
+ NiceTimeSpanMs(m_CacheMappingStats.ScavengeElapsedWallTimeUs / 1000));
+ }
+ }
+
+ uint64_t BytesToWrite = 0;
+
+ for (uint32_t RemoteChunkIndex = 0; RemoteChunkIndex < m_RemoteContent.ChunkedContent.ChunkHashes.size(); RemoteChunkIndex++)
+ {
+ uint64_t ChunkWriteCount = GetChunkWriteCount(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex);
+ if (ChunkWriteCount > 0)
+ {
+ BytesToWrite += m_RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] * ChunkWriteCount;
+ if (!RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex])
+ {
+ RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex] = true;
+ }
+ }
+ }
+
+ for (const ScavengedSequenceCopyOperation& ScavengeCopyOp : ScavengedSequenceCopyOperations)
+ {
+ BytesToWrite += ScavengeCopyOp.RawSize;
+ }
+
+ uint64_t TotalRequestCount = 0;
+ uint64_t TotalPartWriteCount = 0;
+ std::atomic<uint64_t> WritePartsComplete = 0;
+
+ {
+ ZEN_TRACE_CPU("WriteChunks");
+
+ Stopwatch WriteTimer;
+
+ FilteredRate FilteredDownloadedBytesPerSecond;
+ FilteredRate FilteredWrittenBytesPerSecond;
+
+ std::unique_ptr<BuildOpLogOutput::ProgressBar> WriteProgressBarPtr(
+ m_LogOutput.CreateProgressBar(m_Options.PrimeCacheOnly ? "Downloading" : "Writing"));
+ BuildOpLogOutput::ProgressBar& WriteProgressBar(*WriteProgressBarPtr);
+ ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ struct LooseChunkHashWorkData
+ {
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs;
+ uint32_t RemoteChunkIndex = (uint32_t)-1;
+ };
+
+ std::vector<LooseChunkHashWorkData> LooseChunkHashWorks;
+ TotalPartWriteCount += CopyChunkDatas.size();
+ TotalPartWriteCount += ScavengedSequenceCopyOperations.size();
+
+ for (const IoHash ChunkHash : m_LooseChunkHashes)
+ {
+ auto RemoteChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash);
+ ZEN_ASSERT(RemoteChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end());
+ const uint32_t RemoteChunkIndex = RemoteChunkIndexIt->second;
+ if (RemoteChunkIndexNeedsCopyFromLocalFileFlags[RemoteChunkIndex])
+ {
+ if (m_Options.IsVerbose)
+ {
+ LOG_OUTPUT(m_LogOutput, "Skipping chunk {} due to cache reuse", ChunkHash);
+ }
+ continue;
+ }
+ bool NeedsCopy = true;
+ if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex].compare_exchange_strong(NeedsCopy, false))
+ {
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
+ GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, RemoteChunkIndex);
+
+ if (ChunkTargetPtrs.empty())
+ {
+ if (m_Options.IsVerbose)
+ {
+ LOG_OUTPUT(m_LogOutput, "Skipping chunk {} due to cache reuse", ChunkHash);
+ }
+ }
+ else
+ {
+ TotalRequestCount++;
+ TotalPartWriteCount++;
+ LooseChunkHashWorks.push_back(
+ LooseChunkHashWorkData{.ChunkTargetPtrs = ChunkTargetPtrs, .RemoteChunkIndex = RemoteChunkIndex});
+ }
+ }
+ }
+
+ uint32_t BlockCount = gsl::narrow<uint32_t>(m_BlockDescriptions.size());
+
+ std::vector<bool> ChunkIsPickedUpByBlock(m_RemoteContent.ChunkedContent.ChunkHashes.size(), false);
+ auto GetNeededChunkBlockIndexes =
+ [this, &RemoteChunkIndexNeedsCopyFromSourceFlags, &ChunkIsPickedUpByBlock](const ChunkBlockDescription& BlockDescription) {
+ ZEN_TRACE_CPU("GetNeededChunkBlockIndexes");
+ std::vector<uint32_t> NeededBlockChunkIndexes;
+ for (uint32_t ChunkBlockIndex = 0; ChunkBlockIndex < BlockDescription.ChunkRawHashes.size(); ChunkBlockIndex++)
+ {
+ const IoHash& ChunkHash = BlockDescription.ChunkRawHashes[ChunkBlockIndex];
+ if (auto It = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); It != m_RemoteLookup.ChunkHashToChunkIndex.end())
+ {
+ const uint32_t RemoteChunkIndex = It->second;
+ if (!ChunkIsPickedUpByBlock[RemoteChunkIndex])
+ {
+ if (RemoteChunkIndexNeedsCopyFromSourceFlags[RemoteChunkIndex])
+ {
+ ChunkIsPickedUpByBlock[RemoteChunkIndex] = true;
+ NeededBlockChunkIndexes.push_back(ChunkBlockIndex);
+ }
+ }
+ }
+ }
+ return NeededBlockChunkIndexes;
+ };
+
+ std::vector<uint32_t> CachedChunkBlockIndexes;
+ std::vector<uint32_t> FetchBlockIndexes;
+ std::vector<std::vector<uint32_t>> AllBlockChunkIndexNeeded;
+
+ for (uint32_t BlockIndex = 0; BlockIndex < BlockCount; BlockIndex++)
+ {
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
+
+ std::vector<uint32_t> BlockChunkIndexNeeded = GetNeededChunkBlockIndexes(BlockDescription);
+ if (!BlockChunkIndexNeeded.empty())
+ {
+ if (m_Options.PrimeCacheOnly)
+ {
+ FetchBlockIndexes.push_back(BlockIndex);
+ }
+ else
+ {
+ bool UsingCachedBlock = false;
+ if (auto It = CachedBlocksFound.find(BlockDescription.BlockHash); It != CachedBlocksFound.end())
+ {
+ TotalPartWriteCount++;
+
+ std::filesystem::path BlockPath =
+ ZenTempBlockFolderPath(m_Options.ZenFolderPath) / BlockDescription.BlockHash.ToHexString();
+ if (IsFile(BlockPath))
+ {
+ CachedChunkBlockIndexes.push_back(BlockIndex);
+ UsingCachedBlock = true;
+ }
+ }
+ if (!UsingCachedBlock)
+ {
+ FetchBlockIndexes.push_back(BlockIndex);
+ }
+ }
+ }
+ AllBlockChunkIndexNeeded.emplace_back(std::move(BlockChunkIndexNeeded));
+ }
+
+ struct BlobsExistsResult
+ {
+ tsl::robin_set<IoHash> ExistingBlobs;
+ uint64_t ElapsedTimeMs = 0;
+ };
+
+ BlobsExistsResult ExistsResult;
+
+ if (m_Storage.BuildCacheStorage)
+ {
+ ZEN_TRACE_CPU("BlobCacheExistCheck");
+ Stopwatch Timer;
+
+ tsl::robin_set<IoHash> BlobHashesSet;
+
+ BlobHashesSet.reserve(LooseChunkHashWorks.size() + FetchBlockIndexes.size());
+ for (LooseChunkHashWorkData& LooseChunkHashWork : LooseChunkHashWorks)
+ {
+ BlobHashesSet.insert(m_RemoteContent.ChunkedContent.ChunkHashes[LooseChunkHashWork.RemoteChunkIndex]);
+ }
+ for (uint32_t BlockIndex : FetchBlockIndexes)
+ {
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
+ BlobHashesSet.insert(BlockDescription.BlockHash);
+ }
+
+ if (!BlobHashesSet.empty())
+ {
+ const std::vector<IoHash> BlobHashes(BlobHashesSet.begin(), BlobHashesSet.end());
+ const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult =
+ m_Storage.BuildCacheStorage->BlobsExists(m_BuildId, BlobHashes);
+
+ if (CacheExistsResult.size() == BlobHashes.size())
+ {
+ ExistsResult.ExistingBlobs.reserve(CacheExistsResult.size());
+ for (size_t BlobIndex = 0; BlobIndex < BlobHashes.size(); BlobIndex++)
+ {
+ if (CacheExistsResult[BlobIndex].HasBody)
+ {
+ ExistsResult.ExistingBlobs.insert(BlobHashes[BlobIndex]);
+ }
+ }
+ }
+ ExistsResult.ElapsedTimeMs = Timer.GetElapsedTimeMs();
+ if (!ExistsResult.ExistingBlobs.empty() && !m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Remote cache : Found {} out of {} needed blobs in {}",
+ ExistsResult.ExistingBlobs.size(),
+ BlobHashes.size(),
+ NiceTimeSpanMs(ExistsResult.ElapsedTimeMs));
+ }
+ }
+ }
+
+ std::vector<BlockRangeDescriptor> BlockRangeWorks;
+ std::vector<uint32_t> FullBlockWorks;
+ {
+ Stopwatch Timer;
+
+ std::vector<uint32_t> PartialBlockIndexes;
+
+ for (uint32_t BlockIndex : FetchBlockIndexes)
+ {
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
+
+ const std::vector<uint32_t> BlockChunkIndexNeeded = std::move(AllBlockChunkIndexNeeded[BlockIndex]);
+ if (!BlockChunkIndexNeeded.empty())
+ {
+ bool WantsToDoPartialBlockDownload = BlockChunkIndexNeeded.size() < BlockDescription.ChunkRawHashes.size();
+ bool CanDoPartialBlockDownload = (BlockDescription.HeaderSize > 0) && (BlockDescription.ChunkCompressedLengths.size() ==
+ BlockDescription.ChunkRawHashes.size());
+
+ bool AllowedToDoPartialRequest = false;
+ bool BlockExistInCache = ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash);
+ switch (m_Options.PartialBlockRequestMode)
+ {
+ case EPartialBlockRequestMode::Off:
+ break;
+ case EPartialBlockRequestMode::ZenCacheOnly:
+ AllowedToDoPartialRequest = BlockExistInCache;
+ break;
+ case EPartialBlockRequestMode::Mixed:
+ case EPartialBlockRequestMode::All:
+ AllowedToDoPartialRequest = true;
+ break;
+ default:
+ ZEN_ASSERT(false);
+ break;
+ }
+
+ const uint32_t ChunkStartOffsetInBlock =
+ gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize);
+
+ const uint64_t TotalBlockSize = std::accumulate(BlockDescription.ChunkCompressedLengths.begin(),
+ BlockDescription.ChunkCompressedLengths.end(),
+ std::uint64_t(ChunkStartOffsetInBlock));
+
+ if (AllowedToDoPartialRequest && WantsToDoPartialBlockDownload && CanDoPartialBlockDownload)
+ {
+ ZEN_TRACE_CPU("PartialBlockAnalysis");
+
+ bool LimitToSingleRange =
+ BlockExistInCache ? false : m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Mixed;
+ uint64_t TotalWantedChunksSize = 0;
+ std::optional<std::vector<BlockRangeDescriptor>> MaybeBlockRanges = CalculateBlockRanges(m_LogOutput,
+ m_Options.IsQuiet,
+ BlockIndex,
+ BlockDescription,
+ BlockChunkIndexNeeded,
+ LimitToSingleRange,
+ ChunkStartOffsetInBlock,
+ TotalBlockSize,
+ TotalWantedChunksSize);
+ ZEN_ASSERT(TotalWantedChunksSize <= TotalBlockSize);
+
+ if (MaybeBlockRanges.has_value())
+ {
+ const std::vector<BlockRangeDescriptor>& BlockRanges = MaybeBlockRanges.value();
+ ZEN_ASSERT(!BlockRanges.empty());
+ BlockRangeWorks.insert(BlockRangeWorks.end(), BlockRanges.begin(), BlockRanges.end());
+ TotalRequestCount += BlockRanges.size();
+ TotalPartWriteCount += BlockRanges.size();
+
+ uint64_t RequestedSize = std::accumulate(
+ BlockRanges.begin(),
+ BlockRanges.end(),
+ uint64_t(0),
+ [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; });
+ PartialBlockIndexes.push_back(BlockIndex);
+
+ if (RequestedSize > TotalWantedChunksSize)
+ {
+ if (m_Options.IsVerbose)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Requesting {} chunks ({}) from block {} ({}) using {} requests (extra bytes {})",
+ BlockChunkIndexNeeded.size(),
+ NiceBytes(RequestedSize),
+ BlockDescription.BlockHash,
+ NiceBytes(TotalBlockSize),
+ BlockRanges.size(),
+ NiceBytes(RequestedSize - TotalWantedChunksSize));
+ }
+ }
+ }
+ else
+ {
+ FullBlockWorks.push_back(BlockIndex);
+ TotalRequestCount++;
+ TotalPartWriteCount++;
+ }
+ }
+ else
+ {
+ FullBlockWorks.push_back(BlockIndex);
+ TotalRequestCount++;
+ TotalPartWriteCount++;
+ }
+ }
+ }
+
+ if (!PartialBlockIndexes.empty())
+ {
+ uint64_t TotalFullBlockRequestBytes = 0;
+ for (uint32_t BlockIndex : FullBlockWorks)
+ {
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
+ uint32_t CurrentOffset =
+ gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize);
+
+ TotalFullBlockRequestBytes += std::accumulate(BlockDescription.ChunkCompressedLengths.begin(),
+ BlockDescription.ChunkCompressedLengths.end(),
+ std::uint64_t(CurrentOffset));
+ }
+
+ uint64_t TotalPartialBlockBytes = 0;
+ for (uint32_t BlockIndex : PartialBlockIndexes)
+ {
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
+ uint32_t CurrentOffset =
+ gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize);
+
+ TotalPartialBlockBytes += std::accumulate(BlockDescription.ChunkCompressedLengths.begin(),
+ BlockDescription.ChunkCompressedLengths.end(),
+ std::uint64_t(CurrentOffset));
+ }
+
+ uint64_t NonPartialTotalBlockBytes = TotalFullBlockRequestBytes + TotalPartialBlockBytes;
+
+ const uint64_t TotalPartialBlockRequestBytes =
+ std::accumulate(BlockRangeWorks.begin(),
+ BlockRangeWorks.end(),
+ uint64_t(0),
+ [](uint64_t Current, const BlockRangeDescriptor& Range) { return Current + Range.RangeLength; });
+ uint64_t TotalExtraPartialBlocksRequests = BlockRangeWorks.size() - PartialBlockIndexes.size();
+
+ uint64_t TotalSavedBlocksSize = TotalPartialBlockBytes - TotalPartialBlockRequestBytes;
+ double SavedSizePercent = (TotalSavedBlocksSize * 100.0) / NonPartialTotalBlockBytes;
+
+ if (!m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Analisys of partial block requests saves download of {} out of {} ({:.1f}%) using {} extra "
+ "requests. Completed in {}",
+ NiceBytes(TotalSavedBlocksSize),
+ NiceBytes(NonPartialTotalBlockBytes),
+ SavedSizePercent,
+ TotalExtraPartialBlocksRequests,
+ NiceTimeSpanMs(ExistsResult.ElapsedTimeMs));
+ }
+ }
+ }
+
+ BufferedWriteFileCache WriteCache;
+
+ for (uint32_t ScavengeOpIndex = 0; ScavengeOpIndex < ScavengedSequenceCopyOperations.size(); ScavengeOpIndex++)
+ {
+ if (m_AbortFlag)
+ {
+ break;
+ }
+ if (!m_Options.PrimeCacheOnly)
+ {
+ Work.ScheduleWork(
+ m_IOWorkerPool,
+ [this,
+ &ScavengedPaths,
+ &ScavengedSequenceCopyOperations,
+ &ScavengedContents,
+ &FilteredWrittenBytesPerSecond,
+ ScavengeOpIndex,
+ &WritePartsComplete,
+ TotalPartWriteCount](std::atomic<bool>&) mutable {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_WriteScavenged");
+
+ FilteredWrittenBytesPerSecond.Start();
+
+ const ScavengedSequenceCopyOperation& ScavengeOp = ScavengedSequenceCopyOperations[ScavengeOpIndex];
+ const ChunkedFolderContent& ScavengedContent = ScavengedContents[ScavengeOp.ScavengedContentIndex];
+ const std::filesystem::path& ScavengeRootPath = ScavengedPaths[ScavengeOp.ScavengedContentIndex];
+
+ WriteScavengedSequenceToCache(ScavengeRootPath, ScavengedContent, ScavengeOp);
+
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
+ }
+ });
+ }
+ }
+
+ for (uint32_t LooseChunkHashWorkIndex = 0; LooseChunkHashWorkIndex < LooseChunkHashWorks.size(); LooseChunkHashWorkIndex++)
+ {
+ if (m_AbortFlag)
+ {
+ break;
+ }
+
+ LooseChunkHashWorkData& LooseChunkHashWork = LooseChunkHashWorks[LooseChunkHashWorkIndex];
+
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs = std::move(LooseChunkHashWork.ChunkTargetPtrs);
+ const uint32_t RemoteChunkIndex = LooseChunkHashWork.RemoteChunkIndex;
+
+ if (m_Options.PrimeCacheOnly &&
+ ExistsResult.ExistingBlobs.contains(m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]))
+ {
+ m_DownloadStats.RequestsCompleteCount++;
+ continue;
+ }
+
+ Work.ScheduleWork(
+ m_IOWorkerPool,
+ [this,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &Work,
+ &ExistsResult,
+ &WritePartsComplete,
+ RemoteChunkIndex,
+ ChunkTargetPtrs,
+ TotalRequestCount,
+ TotalPartWriteCount,
+ &WriteCache,
+ &FilteredDownloadedBytesPerSecond,
+ &FilteredWrittenBytesPerSecond](std::atomic<bool>&) mutable {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_ReadPreDownloadedChunk");
+
+ std::filesystem::path ExistingCompressedChunkPath;
+ if (!m_Options.PrimeCacheOnly)
+ {
+ const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+ ExistingCompressedChunkPath = FindDownloadedChunk(ChunkHash);
+ if (!ExistingCompressedChunkPath.empty())
+ {
+ m_DownloadStats.RequestsCompleteCount++;
+ if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ }
+ }
+ if (!m_AbortFlag)
+
+ {
+ if (!ExistingCompressedChunkPath.empty())
+ {
+ Work.ScheduleWork(
+ m_IOWorkerPool,
+ [this,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &WriteCache,
+ &Work,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ &FilteredWrittenBytesPerSecond,
+ RemoteChunkIndex,
+ ChunkTargetPtrs,
+ CompressedChunkPath = std::move(ExistingCompressedChunkPath)](std::atomic<bool>& AbortFlag) mutable {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_WritePreDownloadedChunk");
+
+ FilteredWrittenBytesPerSecond.Start();
+
+ const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+
+ IoBuffer CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath);
+ if (!CompressedPart)
+ {
+ throw std::runtime_error(fmt::format("Could not open dowloaded compressed chunk {} from {}",
+ ChunkHash,
+ CompressedChunkPath));
+ }
+
+ bool NeedHashVerify = WriteCompressedChunkToCache(ChunkHash,
+ ChunkTargetPtrs,
+ WriteCache,
+ std::move(CompressedPart));
+ WritePartsComplete++;
+
+ if (!AbortFlag)
+ {
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
+
+ TryRemoveFile(m_LogOutput, CompressedChunkPath);
+
+ std::vector<uint32_t> CompletedSequences =
+ CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
+ WriteCache.Close(CompletedSequences);
+ if (NeedHashVerify)
+ {
+ VerifyAndCompleteChunkSequencesAsync(CompletedSequences, Work);
+ }
+ else
+ {
+ FinalizeChunkSequences(CompletedSequences);
+ }
+ }
+ }
+ });
+ }
+ else
+ {
+ Work.ScheduleWork(
+ m_NetworkPool,
+ [this,
+ &ExistsResult,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &WriteCache,
+ &Work,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ TotalRequestCount,
+ &FilteredDownloadedBytesPerSecond,
+ &FilteredWrittenBytesPerSecond,
+ RemoteChunkIndex,
+ ChunkTargetPtrs](std::atomic<bool>&) mutable {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_DownloadChunk");
+
+ const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+ FilteredDownloadedBytesPerSecond.Start();
+ IoBuffer BuildBlob;
+ const bool ExistsInCache =
+ m_Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash);
+ if (ExistsInCache)
+ {
+ BuildBlob = m_Storage.BuildCacheStorage->GetBuildBlob(m_BuildId, ChunkHash);
+ }
+ if (BuildBlob)
+ {
+ uint64_t BlobSize = BuildBlob.GetSize();
+ m_DownloadStats.DownloadedChunkCount++;
+ m_DownloadStats.DownloadedChunkByteCount += BlobSize;
+ m_DownloadStats.RequestsCompleteCount++;
+ if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ AsyncWriteDownloadedChunk(m_Options.ZenFolderPath,
+ RemoteChunkIndex,
+ std::move(ChunkTargetPtrs),
+ WriteCache,
+ Work,
+ std::move(BuildBlob),
+ SequenceIndexChunksLeftToWriteCounters,
+ WritePartsComplete,
+ TotalPartWriteCount,
+ FilteredWrittenBytesPerSecond);
+ }
+ else
+ {
+ if (m_RemoteContent.ChunkedContent.ChunkRawSizes[RemoteChunkIndex] >=
+ m_Options.LargeAttachmentSize)
+ {
+ DownloadLargeBlob(*m_Storage.BuildStorage,
+ ZenTempDownloadFolderPath(m_Options.ZenFolderPath),
+ m_BuildId,
+ ChunkHash,
+ m_Options.PreferredMultipartChunkSize,
+ Work,
+ m_NetworkPool,
+ m_DownloadStats,
+ [this,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &WriteCache,
+ &Work,
+ ChunkHash,
+ TotalPartWriteCount,
+ TotalRequestCount,
+ &WritePartsComplete,
+ &FilteredWrittenBytesPerSecond,
+ &FilteredDownloadedBytesPerSecond,
+ RemoteChunkIndex,
+ ChunkTargetPtrs](IoBuffer&& Payload) mutable {
+ m_DownloadStats.RequestsCompleteCount++;
+ if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ if (Payload && m_Storage.BuildCacheStorage)
+ {
+ m_Storage.BuildCacheStorage->PutBuildBlob(
+ m_BuildId,
+ ChunkHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(SharedBuffer(Payload)));
+ }
+ if (!m_Options.PrimeCacheOnly)
+ {
+ if (!m_AbortFlag)
+ {
+ AsyncWriteDownloadedChunk(
+ m_Options.ZenFolderPath,
+ RemoteChunkIndex,
+ std::move(ChunkTargetPtrs),
+ WriteCache,
+ Work,
+ std::move(Payload),
+ SequenceIndexChunksLeftToWriteCounters,
+ WritePartsComplete,
+ TotalPartWriteCount,
+ FilteredWrittenBytesPerSecond);
+ }
+ }
+ });
+ }
+ else
+ {
+ BuildBlob = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, ChunkHash);
+ if (BuildBlob && m_Storage.BuildCacheStorage)
+ {
+ m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId,
+ ChunkHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(SharedBuffer(BuildBlob)));
+ }
+ if (!BuildBlob)
+ {
+ throw std::runtime_error(fmt::format("Chunk {} is missing", ChunkHash));
+ }
+ if (!m_Options.PrimeCacheOnly)
+ {
+ if (!m_AbortFlag)
+ {
+ uint64_t BlobSize = BuildBlob.GetSize();
+ m_DownloadStats.DownloadedChunkCount++;
+ m_DownloadStats.DownloadedChunkByteCount += BlobSize;
+ m_DownloadStats.RequestsCompleteCount++;
+ if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ AsyncWriteDownloadedChunk(m_Options.ZenFolderPath,
+ RemoteChunkIndex,
+ std::move(ChunkTargetPtrs),
+ WriteCache,
+ Work,
+ std::move(BuildBlob),
+ SequenceIndexChunksLeftToWriteCounters,
+ WritePartsComplete,
+ TotalPartWriteCount,
+ FilteredWrittenBytesPerSecond);
+ }
+ }
+ }
+ }
+ }
+ });
+ }
+ }
+ }
+ });
+ }
+
+ std::unique_ptr<CloneQueryInterface> CloneQuery;
+ if (m_Options.AllowFileClone)
+ {
+ CloneQuery = GetCloneQueryInterface(m_CacheFolderPath);
+ }
+
+ for (size_t CopyDataIndex = 0; CopyDataIndex < CopyChunkDatas.size(); CopyDataIndex++)
+ {
+ ZEN_ASSERT(!m_Options.PrimeCacheOnly);
+ if (m_AbortFlag)
+ {
+ break;
+ }
+
+ Work.ScheduleWork(m_IOWorkerPool,
+ [this,
+ &CloneQuery,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &WriteCache,
+ &Work,
+ &FilteredWrittenBytesPerSecond,
+ &CopyChunkDatas,
+ &ScavengedContents,
+ &ScavengedLookups,
+ &ScavengedPaths,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ CopyDataIndex](std::atomic<bool>&) {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_CopyLocal");
+
+ FilteredWrittenBytesPerSecond.Start();
+ const CopyChunkData& CopyData = CopyChunkDatas[CopyDataIndex];
+
+ std::vector<uint32_t> WrittenSequenceIndexes = WriteLocalChunkToCache(CloneQuery.get(),
+ CopyData,
+ ScavengedContents,
+ ScavengedLookups,
+ ScavengedPaths,
+ WriteCache);
+ WritePartsComplete++;
+ if (!m_AbortFlag)
+ {
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
+
+ // Write tracking, updating this must be done without any files open
+ std::vector<uint32_t> CompletedChunkSequences;
+ for (uint32_t RemoteSequenceIndex : WrittenSequenceIndexes)
+ {
+ if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters))
+ {
+ CompletedChunkSequences.push_back(RemoteSequenceIndex);
+ }
+ }
+ WriteCache.Close(CompletedChunkSequences);
+ VerifyAndCompleteChunkSequencesAsync(CompletedChunkSequences, Work);
+ }
+ }
+ });
+ }
+
+ for (uint32_t BlockIndex : CachedChunkBlockIndexes)
+ {
+ ZEN_ASSERT(!m_Options.PrimeCacheOnly);
+ if (m_AbortFlag)
+ {
+ break;
+ }
+
+ Work.ScheduleWork(m_IOWorkerPool,
+ [this,
+ &RemoteChunkIndexNeedsCopyFromSourceFlags,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &WriteCache,
+ &Work,
+ &FilteredWrittenBytesPerSecond,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ BlockIndex](std::atomic<bool>&) mutable {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_WriteCachedBlock");
+
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
+ FilteredWrittenBytesPerSecond.Start();
+
+ std::filesystem::path BlockChunkPath =
+ ZenTempBlockFolderPath(m_Options.ZenFolderPath) / BlockDescription.BlockHash.ToHexString();
+ IoBuffer BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
+ if (!BlockBuffer)
+ {
+ throw std::runtime_error(
+ fmt::format("Can not read block {} at {}", BlockDescription.BlockHash, BlockChunkPath));
+ }
+
+ if (!m_AbortFlag)
+ {
+ if (!WriteChunksBlockToCache(BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ CompositeBuffer(std::move(BlockBuffer)),
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ WriteCache))
+ {
+ std::error_code DummyEc;
+ RemoveFile(BlockChunkPath, DummyEc);
+ throw std::runtime_error(fmt::format("Block {} is malformed", BlockDescription.BlockHash));
+ }
+
+ TryRemoveFile(m_LogOutput, BlockChunkPath);
+
+ WritePartsComplete++;
+
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
+ }
+ }
+ });
+ }
+
+ for (size_t BlockRangeIndex = 0; BlockRangeIndex < BlockRangeWorks.size(); BlockRangeIndex++)
+ {
+ ZEN_ASSERT(!m_Options.PrimeCacheOnly);
+ if (m_AbortFlag)
+ {
+ break;
+ }
+ const BlockRangeDescriptor BlockRange = BlockRangeWorks[BlockRangeIndex];
+ ZEN_ASSERT(BlockRange.BlockIndex != (uint32_t)-1);
+ const uint32_t BlockIndex = BlockRange.BlockIndex;
+ Work.ScheduleWork(
+ m_NetworkPool,
+ [this,
+ &RemoteChunkIndexNeedsCopyFromSourceFlags,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &ExistsResult,
+ &WriteCache,
+ &FilteredDownloadedBytesPerSecond,
+ TotalRequestCount,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ &FilteredWrittenBytesPerSecond,
+ &Work,
+ BlockIndex,
+ BlockRange](std::atomic<bool>&) {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_GetPartialBlock");
+
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
+
+ FilteredDownloadedBytesPerSecond.Start();
+ IoBuffer BlockBuffer;
+ if (m_Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash))
+ {
+ BlockBuffer = m_Storage.BuildCacheStorage->GetBuildBlob(m_BuildId,
+ BlockDescription.BlockHash,
+ BlockRange.RangeStart,
+ BlockRange.RangeLength);
+ }
+ if (!BlockBuffer)
+ {
+ BlockBuffer = m_Storage.BuildStorage->GetBuildBlob(m_BuildId,
+ BlockDescription.BlockHash,
+ BlockRange.RangeStart,
+ BlockRange.RangeLength);
+ }
+ if (!BlockBuffer)
+ {
+ throw std::runtime_error(fmt::format("Block {} is missing when fetching range {} -> {}",
+ BlockDescription.BlockHash,
+ BlockRange.RangeStart,
+ BlockRange.RangeStart + BlockRange.RangeLength));
+ }
+ if (!m_AbortFlag)
+ {
+ uint64_t BlockSize = BlockBuffer.GetSize();
+ m_DownloadStats.DownloadedBlockCount++;
+ m_DownloadStats.DownloadedBlockByteCount += BlockSize;
+ m_DownloadStats.RequestsCompleteCount++;
+ if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+
+ std::filesystem::path BlockChunkPath;
+
+ // Check if the dowloaded block is file based and we can move it directly without rewriting it
+ {
+ IoBufferFileReference FileRef;
+ if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) &&
+ (FileRef.FileChunkSize == BlockSize))
+ {
+ ZEN_TRACE_CPU("MoveTempPartialBlock");
+
+ std::error_code Ec;
+ std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
+ if (!Ec)
+ {
+ BlockBuffer.SetDeleteOnClose(false);
+ BlockBuffer = {};
+ BlockChunkPath =
+ ZenTempBlockFolderPath(m_Options.ZenFolderPath) / fmt::format("{}_{:x}_{:x}",
+ BlockDescription.BlockHash,
+ BlockRange.RangeStart,
+ BlockRange.RangeLength);
+ RenameFile(TempBlobPath, BlockChunkPath, Ec);
+ if (Ec)
+ {
+ BlockChunkPath = std::filesystem::path{};
+
+ // Re-open the temp file again
+ BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
+ BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
+ BlockBuffer.SetDeleteOnClose(true);
+ }
+ }
+ }
+ }
+
+ if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u))
+ {
+ ZEN_TRACE_CPU("WriteTempPartialBlock");
+ // Could not be moved and rather large, lets store it on disk
+ BlockChunkPath =
+ ZenTempBlockFolderPath(m_Options.ZenFolderPath) /
+ fmt::format("{}_{:x}_{:x}", BlockDescription.BlockHash, BlockRange.RangeStart, BlockRange.RangeLength);
+ TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
+ BlockBuffer = {};
+ }
+
+ if (!m_AbortFlag)
+ {
+ Work.ScheduleWork(
+ m_IOWorkerPool,
+ [this,
+ &RemoteChunkIndexNeedsCopyFromSourceFlags,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &WritePartsComplete,
+ &WriteCache,
+ &Work,
+ TotalPartWriteCount,
+ &FilteredWrittenBytesPerSecond,
+ BlockIndex,
+ BlockRange,
+ BlockChunkPath,
+ BlockPartialBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_WritePartialBlock");
+
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
+
+ if (BlockChunkPath.empty())
+ {
+ ZEN_ASSERT(BlockPartialBuffer);
+ }
+ else
+ {
+ ZEN_ASSERT(!BlockPartialBuffer);
+ BlockPartialBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
+ if (!BlockPartialBuffer)
+ {
+ throw std::runtime_error(fmt::format("Could not open downloaded block {} from {}",
+ BlockDescription.BlockHash,
+ BlockChunkPath));
+ }
+ }
+
+ FilteredWrittenBytesPerSecond.Start();
+
+ if (!WritePartialBlockChunksToCache(
+ BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ CompositeBuffer(std::move(BlockPartialBuffer)),
+ BlockRange.ChunkBlockIndexStart,
+ BlockRange.ChunkBlockIndexStart + BlockRange.ChunkBlockIndexCount - 1,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ WriteCache))
+ {
+ std::error_code DummyEc;
+ RemoveFile(BlockChunkPath, DummyEc);
+ throw std::runtime_error(
+ fmt::format("Partial block {} is malformed", BlockDescription.BlockHash));
+ }
+
+ if (!BlockChunkPath.empty())
+ {
+ TryRemoveFile(m_LogOutput, BlockChunkPath);
+ }
+
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
+ }
+ });
+ }
+ }
+ }
+ });
+ }
+
+ for (uint32_t BlockIndex : FullBlockWorks)
+ {
+ if (m_AbortFlag)
+ {
+ break;
+ }
+
+ if (m_Options.PrimeCacheOnly && ExistsResult.ExistingBlobs.contains(m_BlockDescriptions[BlockIndex].BlockHash))
+ {
+ m_DownloadStats.RequestsCompleteCount++;
+ continue;
+ }
+
+ Work.ScheduleWork(
+ m_NetworkPool,
+ [this,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ &FilteredWrittenBytesPerSecond,
+ &ExistsResult,
+ &Work,
+ &WriteCache,
+ &RemoteChunkIndexNeedsCopyFromSourceFlags,
+ &SequenceIndexChunksLeftToWriteCounters,
+ &FilteredDownloadedBytesPerSecond,
+ TotalRequestCount,
+ BlockIndex](std::atomic<bool>&) {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_GetFullBlock");
+
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
+
+ FilteredDownloadedBytesPerSecond.Start();
+
+ IoBuffer BlockBuffer;
+ const bool ExistsInCache =
+ m_Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash);
+ if (ExistsInCache)
+ {
+ BlockBuffer = m_Storage.BuildCacheStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash);
+ }
+ if (!BlockBuffer)
+ {
+ BlockBuffer = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash);
+ if (BlockBuffer && m_Storage.BuildCacheStorage)
+ {
+ m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId,
+ BlockDescription.BlockHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(SharedBuffer(BlockBuffer)));
+ }
+ }
+ if (!BlockBuffer)
+ {
+ throw std::runtime_error(fmt::format("Block {} is missing", BlockDescription.BlockHash));
+ }
+ if (!m_AbortFlag)
+ {
+ uint64_t BlockSize = BlockBuffer.GetSize();
+ m_DownloadStats.DownloadedBlockCount++;
+ m_DownloadStats.DownloadedBlockByteCount += BlockSize;
+ m_DownloadStats.RequestsCompleteCount++;
+ if (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+
+ if (!m_Options.PrimeCacheOnly)
+ {
+ std::filesystem::path BlockChunkPath;
+
+ // Check if the dowloaded block is file based and we can move it directly without rewriting it
+ {
+ IoBufferFileReference FileRef;
+ if (BlockBuffer.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) &&
+ (FileRef.FileChunkSize == BlockSize))
+ {
+ ZEN_TRACE_CPU("MoveTempFullBlock");
+ std::error_code Ec;
+ std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
+ if (!Ec)
+ {
+ BlockBuffer.SetDeleteOnClose(false);
+ BlockBuffer = {};
+ BlockChunkPath =
+ ZenTempBlockFolderPath(m_Options.ZenFolderPath) / BlockDescription.BlockHash.ToHexString();
+ RenameFile(TempBlobPath, BlockChunkPath, Ec);
+ if (Ec)
+ {
+ BlockChunkPath = std::filesystem::path{};
+
+ // Re-open the temp file again
+ BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
+ BlockBuffer = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, BlockSize, true);
+ BlockBuffer.SetDeleteOnClose(true);
+ }
+ }
+ }
+ }
+
+ if (BlockChunkPath.empty() && (BlockSize > 512u * 1024u))
+ {
+ ZEN_TRACE_CPU("WriteTempFullBlock");
+ // Could not be moved and rather large, lets store it on disk
+ BlockChunkPath =
+ ZenTempBlockFolderPath(m_Options.ZenFolderPath) / BlockDescription.BlockHash.ToHexString();
+ TemporaryFile::SafeWriteFile(BlockChunkPath, BlockBuffer);
+ BlockBuffer = {};
+ }
+
+ if (!m_AbortFlag)
+ {
+ Work.ScheduleWork(
+ m_IOWorkerPool,
+ [this,
+ &Work,
+ &RemoteChunkIndexNeedsCopyFromSourceFlags,
+ &SequenceIndexChunksLeftToWriteCounters,
+ BlockIndex,
+ &WriteCache,
+ &WritePartsComplete,
+ TotalPartWriteCount,
+ &FilteredWrittenBytesPerSecond,
+ BlockChunkPath,
+ BlockBuffer = std::move(BlockBuffer)](std::atomic<bool>&) mutable {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_WriteFullBlock");
+
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
+
+ if (BlockChunkPath.empty())
+ {
+ ZEN_ASSERT(BlockBuffer);
+ }
+ else
+ {
+ ZEN_ASSERT(!BlockBuffer);
+ BlockBuffer = IoBufferBuilder::MakeFromFile(BlockChunkPath);
+ if (!BlockBuffer)
+ {
+ throw std::runtime_error(fmt::format("Could not open dowloaded block {} from {}",
+ BlockDescription.BlockHash,
+ BlockChunkPath));
+ }
+ }
+
+ FilteredWrittenBytesPerSecond.Start();
+ if (!WriteChunksBlockToCache(BlockDescription,
+ SequenceIndexChunksLeftToWriteCounters,
+ Work,
+ CompositeBuffer(std::move(BlockBuffer)),
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ WriteCache))
+ {
+ std::error_code DummyEc;
+ RemoveFile(BlockChunkPath, DummyEc);
+ throw std::runtime_error(
+ fmt::format("Block {} is malformed", BlockDescription.BlockHash));
+ }
+
+ if (!BlockChunkPath.empty())
+ {
+ TryRemoveFile(m_LogOutput, BlockChunkPath);
+ }
+
+ WritePartsComplete++;
+
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
+ }
+ });
+ }
+ }
+ }
+ }
+ });
+ }
+
+ {
+ ZEN_TRACE_CPU("WriteChunks_Wait");
+
+ Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
+ uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() +
+ m_DownloadStats.DownloadedBlockByteCount.load() +
+ +m_DownloadStats.DownloadedPartialBlockByteCount.load();
+ FilteredWrittenBytesPerSecond.Update(m_DiskStats.WriteByteCount.load());
+ FilteredDownloadedBytesPerSecond.Update(DownloadedBytes);
+ std::string DownloadRateString = (m_DownloadStats.RequestsCompleteCount == TotalRequestCount)
+ ? ""
+ : fmt::format(" {}bits/s", NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8));
+ std::string CloneDetails;
+ if (m_DiskStats.CloneCount.load() > 0)
+ {
+ CloneDetails = fmt::format(" ({} cloned)", NiceBytes(m_DiskStats.CloneByteCount.load()));
+ }
+ std::string WriteDetails = m_Options.PrimeCacheOnly ? ""
+ : fmt::format(" {}/{} ({}B/s) written{}.",
+ NiceBytes(m_DiskStats.WriteByteCount.load()),
+ NiceBytes(BytesToWrite),
+ NiceNum(FilteredWrittenBytesPerSecond.GetCurrent()),
+ CloneDetails);
+ std::string Details = fmt::format("{}/{} ({}{}) downloaded.{}",
+ m_DownloadStats.RequestsCompleteCount.load(),
+ TotalRequestCount,
+ NiceBytes(DownloadedBytes),
+ DownloadRateString,
+ WriteDetails);
+ WriteProgressBar.UpdateState(
+ {.Task = m_Options.PrimeCacheOnly ? "Downloading " : "Writing chunks ",
+ .Details = Details,
+ .TotalCount = m_Options.PrimeCacheOnly ? TotalRequestCount : BytesToWrite,
+ .RemainingCount = m_Options.PrimeCacheOnly ? (TotalRequestCount - m_DownloadStats.RequestsCompleteCount.load())
+ : (BytesToWrite - m_DiskStats.WriteByteCount.load()),
+ .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+ false);
+ });
+ }
+
+ CloneQuery.reset();
+
+ FilteredWrittenBytesPerSecond.Stop();
+ FilteredDownloadedBytesPerSecond.Stop();
+
+ WriteProgressBar.Finish();
+ if (m_AbortFlag)
+ {
+ return;
+ }
+
+ if (!m_Options.PrimeCacheOnly)
+ {
+ uint32_t RawSequencesMissingWriteCount = 0;
+ for (uint32_t SequenceIndex = 0; SequenceIndex < SequenceIndexChunksLeftToWriteCounters.size(); SequenceIndex++)
+ {
+ const auto& SequenceIndexChunksLeftToWriteCounter = SequenceIndexChunksLeftToWriteCounters[SequenceIndex];
+ if (SequenceIndexChunksLeftToWriteCounter.load() != 0)
+ {
+ RawSequencesMissingWriteCount++;
+ const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex];
+ const std::filesystem::path& IncompletePath = m_RemoteContent.Paths[PathIndex];
+ ZEN_ASSERT(!IncompletePath.empty());
+ const uint32_t ExpectedSequenceCount = m_RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex];
+ if (!m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "{}: Max count {}, Current count {}",
+ IncompletePath,
+ ExpectedSequenceCount,
+ SequenceIndexChunksLeftToWriteCounter.load());
+ }
+ ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounter.load() <= ExpectedSequenceCount);
+ }
+ }
+ ZEN_ASSERT(RawSequencesMissingWriteCount == 0);
+ }
+
+ const uint64_t DownloadedBytes = m_DownloadStats.DownloadedChunkByteCount.load() + m_DownloadStats.DownloadedBlockByteCount.load() +
+ m_DownloadStats.DownloadedPartialBlockByteCount.load();
+ if (!m_Options.IsQuiet)
+ {
+ std::string CloneDetails;
+ if (m_DiskStats.CloneCount.load() > 0)
+ {
+ CloneDetails = fmt::format(" ({} cloned)", NiceBytes(m_DiskStats.CloneByteCount.load()));
+ }
+ LOG_OUTPUT(m_LogOutput,
+ "Downloaded {} ({}bits/s) in {}. Wrote {} ({}B/s){} in {}. Completed in {}",
+ NiceBytes(DownloadedBytes),
+ NiceNum(GetBytesPerSecond(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS(), DownloadedBytes * 8)),
+ NiceTimeSpanMs(FilteredDownloadedBytesPerSecond.GetElapsedTimeUS() / 1000),
+ NiceBytes(m_DiskStats.WriteByteCount.load()),
+ NiceNum(GetBytesPerSecond(FilteredWrittenBytesPerSecond.GetElapsedTimeUS(), m_DiskStats.WriteByteCount.load())),
+ CloneDetails,
+ NiceTimeSpanMs(FilteredWrittenBytesPerSecond.GetElapsedTimeUS() / 1000),
+ NiceTimeSpanMs(WriteTimer.GetElapsedTimeMs()));
+ }
+
+ m_WriteChunkStats.WriteChunksElapsedWallTimeUs = WriteTimer.GetElapsedTimeUs();
+ m_WriteChunkStats.DownloadTimeUs = FilteredDownloadedBytesPerSecond.GetElapsedTimeUS();
+ m_WriteChunkStats.WriteTimeUs = FilteredWrittenBytesPerSecond.GetElapsedTimeUS();
+ }
+
+ if (m_Options.PrimeCacheOnly)
+ {
+ return;
+ }
+
+ tsl::robin_map<uint32_t, uint32_t> RemotePathIndexToLocalPathIndex;
+ RemotePathIndexToLocalPathIndex.reserve(m_RemoteContent.Paths.size());
+
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> SequenceHashToLocalPathIndex;
+ std::vector<uint32_t> RemoveLocalPathIndexes;
+
+ if (m_AbortFlag)
+ {
+ return;
+ }
+
+ {
+ ZEN_TRACE_CPU("PrepareTarget");
+
+ tsl::robin_set<IoHash, IoHash::Hasher> CachedRemoteSequences;
+ tsl::robin_map<std::string, uint32_t> RemotePathToRemoteIndex;
+ RemotePathToRemoteIndex.reserve(m_RemoteContent.Paths.size());
+ for (uint32_t RemotePathIndex = 0; RemotePathIndex < m_RemoteContent.Paths.size(); RemotePathIndex++)
+ {
+ RemotePathToRemoteIndex.insert({m_RemoteContent.Paths[RemotePathIndex].generic_string(), RemotePathIndex});
+ }
+
+ std::vector<uint32_t> FilesToCache;
+
+ uint64_t MatchCount = 0;
+ uint64_t PathMismatchCount = 0;
+ uint64_t HashMismatchCount = 0;
+ std::atomic<uint64_t> CachedCount = 0;
+ std::atomic<uint64_t> CachedByteCount = 0;
+ uint64_t SkippedCount = 0;
+ uint64_t DeleteCount = 0;
+ for (uint32_t LocalPathIndex = 0; LocalPathIndex < m_LocalContent.Paths.size(); LocalPathIndex++)
+ {
+ if (m_AbortFlag)
+ {
+ break;
+ }
+ const IoHash& RawHash = m_LocalContent.RawHashes[LocalPathIndex];
+ const std::filesystem::path& LocalPath = m_LocalContent.Paths[LocalPathIndex];
+
+ ZEN_ASSERT_SLOW(IsFile((m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred()));
+
+ if (m_Options.EnableTargetFolderScavenging)
+ {
+ if (!m_Options.WipeTargetFolder)
+ {
+ if (auto RemotePathIt = RemotePathToRemoteIndex.find(LocalPath.generic_string());
+ RemotePathIt != RemotePathToRemoteIndex.end())
+ {
+ const uint32_t RemotePathIndex = RemotePathIt->second;
+ if (m_RemoteContent.RawHashes[RemotePathIndex] == RawHash)
+ {
+ // It is already in it's desired place
+ RemotePathIndexToLocalPathIndex[RemotePathIndex] = LocalPathIndex;
+ SequenceHashToLocalPathIndex.insert({RawHash, LocalPathIndex});
+ MatchCount++;
+ continue;
+ }
+ else
+ {
+ HashMismatchCount++;
+ }
+ }
+ else
+ {
+ PathMismatchCount++;
+ }
+ }
+ if (m_RemoteLookup.RawHashToSequenceIndex.contains(RawHash))
+ {
+ if (!CachedRemoteSequences.contains(RawHash))
+ {
+ ZEN_TRACE_CPU("MoveToCache");
+ // We need it
+ FilesToCache.push_back(LocalPathIndex);
+ CachedRemoteSequences.insert(RawHash);
+ }
+ else
+ {
+ // We already have it
+ SkippedCount++;
+ }
+ }
+ else if (!m_Options.WipeTargetFolder)
+ {
+ // We don't need it
+ RemoveLocalPathIndexes.push_back(LocalPathIndex);
+ DeleteCount++;
+ }
+ }
+ else
+ {
+ // Delete local file as we did not scavenge the folder
+ RemoveLocalPathIndexes.push_back(LocalPathIndex);
+ DeleteCount++;
+ }
+ }
+
+ if (m_AbortFlag)
+ {
+ return;
+ }
+
+ {
+ ZEN_TRACE_CPU("CopyToCache");
+
+ Stopwatch Timer;
+
+ std::unique_ptr<BuildOpLogOutput::ProgressBar> CacheLocalProgressBarPtr(m_LogOutput.CreateProgressBar("Cache Local Data"));
+ BuildOpLogOutput::ProgressBar& CacheLocalProgressBar(*CacheLocalProgressBarPtr);
+ ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ for (uint32_t LocalPathIndex : FilesToCache)
+ {
+ if (m_AbortFlag)
+ {
+ break;
+ }
+ Work.ScheduleWork(m_IOWorkerPool, [this, &CachedCount, &CachedByteCount, LocalPathIndex](std::atomic<bool>&) {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_CopyToCache");
+
+ const IoHash& RawHash = m_LocalContent.RawHashes[LocalPathIndex];
+ const std::filesystem::path& LocalPath = m_LocalContent.Paths[LocalPathIndex];
+ const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RawHash);
+ ZEN_ASSERT_SLOW(!IsFileWithRetry(CacheFilePath));
+ const std::filesystem::path LocalFilePath = (m_Path / LocalPath).make_preferred();
+ RenameFileWithRetry(m_LogOutput, LocalFilePath, CacheFilePath);
+ CachedCount++;
+ CachedByteCount += m_LocalContent.RawSizes[LocalPathIndex];
+ }
+ });
+ }
+
+ {
+ ZEN_TRACE_CPU("CopyToCache_Wait");
+
+ Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
+ const uint64_t WorkTotal = FilesToCache.size();
+ const uint64_t WorkComplete = CachedCount.load();
+ std::string Details = fmt::format("{}/{} ({}) files", WorkComplete, WorkTotal, NiceBytes(CachedByteCount));
+ CacheLocalProgressBar.UpdateState(
+ {.Task = "Caching local ",
+ .Details = Details,
+ .TotalCount = gsl::narrow<uint64_t>(WorkTotal),
+ .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete),
+ .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+ false);
+ });
+ }
+
+ CacheLocalProgressBar.Finish();
+ if (m_AbortFlag)
+ {
+ return;
+ }
+
+ LOG_OUTPUT_DEBUG(m_LogOutput,
+ "Local state prep: Match: {}, PathMismatch: {}, HashMismatch: {}, Cached: {} ({}), Skipped: {}, "
+ "Delete: {}",
+ MatchCount,
+ PathMismatchCount,
+ HashMismatchCount,
+ CachedCount.load(),
+ NiceBytes(CachedByteCount.load()),
+ SkippedCount,
+ DeleteCount);
+ }
+ }
+
+ if (m_Options.WipeTargetFolder)
+ {
+ ZEN_TRACE_CPU("WipeTarget");
+ Stopwatch Timer;
+
+ // Clean target folder
+ if (!CleanDirectory(m_IOWorkerPool,
+ m_AbortFlag,
+ m_PauseFlag,
+ m_LogOutput,
+ m_Options.IsQuiet,
+ m_Path,
+ m_Options.DefaultExcludeFolders))
+ {
+ LOG_OUTPUT_WARN(m_LogOutput, "Some files in {} could not be removed", m_Path);
+ }
+ m_RebuildFolderStateStats.CleanFolderElapsedWallTimeUs = Timer.GetElapsedTimeUs();
+ }
+
+ if (m_AbortFlag)
+ {
+ return;
+ }
+
+ {
+ ZEN_TRACE_CPU("FinalizeTree");
+ Stopwatch Timer;
+
+ std::unique_ptr<BuildOpLogOutput::ProgressBar> RebuildProgressBarPtr(m_LogOutput.CreateProgressBar("Rebuild State"));
+ BuildOpLogOutput::ProgressBar& RebuildProgressBar(*RebuildProgressBarPtr);
+ ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ OutLocalFolderState.Paths.resize(m_RemoteContent.Paths.size());
+ OutLocalFolderState.RawSizes.resize(m_RemoteContent.Paths.size());
+ OutLocalFolderState.Attributes.resize(m_RemoteContent.Paths.size());
+ OutLocalFolderState.ModificationTicks.resize(m_RemoteContent.Paths.size());
+
+ std::atomic<uint64_t> DeletedCount = 0;
+
+ for (uint32_t LocalPathIndex : RemoveLocalPathIndexes)
+ {
+ if (m_AbortFlag)
+ {
+ break;
+ }
+ Work.ScheduleWork(m_IOWorkerPool, [this, &DeletedCount, LocalPathIndex](std::atomic<bool>&) {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_RemoveFile");
+
+ const std::filesystem::path LocalFilePath = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred();
+ SetFileReadOnlyWithRetry(LocalFilePath, false);
+ RemoveFileWithRetry(LocalFilePath);
+ DeletedCount++;
+ }
+ });
+ }
+
+ std::atomic<uint64_t> TargetsComplete = 0;
+
+ struct FinalizeTarget
+ {
+ IoHash RawHash;
+ uint32_t RemotePathIndex;
+ };
+
+ std::vector<FinalizeTarget> Targets;
+ Targets.reserve(m_RemoteContent.Paths.size());
+ for (uint32_t RemotePathIndex = 0; RemotePathIndex < m_RemoteContent.Paths.size(); RemotePathIndex++)
+ {
+ Targets.push_back(FinalizeTarget{.RawHash = m_RemoteContent.RawHashes[RemotePathIndex], .RemotePathIndex = RemotePathIndex});
+ }
+ std::sort(Targets.begin(), Targets.end(), [](const FinalizeTarget& Lhs, const FinalizeTarget& Rhs) {
+ if (Lhs.RawHash < Rhs.RawHash)
+ {
+ return true;
+ }
+ else if (Lhs.RawHash > Rhs.RawHash)
+ {
+ return false;
+ }
+ return Lhs.RemotePathIndex < Rhs.RemotePathIndex;
+ });
+
+ size_t TargetOffset = 0;
+ while (TargetOffset < Targets.size())
+ {
+ if (m_AbortFlag)
+ {
+ break;
+ }
+
+ size_t TargetCount = 1;
+ while ((TargetOffset + TargetCount) < Targets.size() &&
+ (Targets[TargetOffset + TargetCount].RawHash == Targets[TargetOffset].RawHash))
+ {
+ TargetCount++;
+ }
+
+ Work.ScheduleWork(
+ m_IOWorkerPool,
+ [this,
+ &SequenceHashToLocalPathIndex,
+ &Targets,
+ &RemotePathIndexToLocalPathIndex,
+ &OutLocalFolderState,
+ BaseTargetOffset = TargetOffset,
+ TargetCount,
+ &TargetsComplete](std::atomic<bool>&) {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_FinalizeChunkSequence");
+
+ size_t TargetOffset = BaseTargetOffset;
+ const IoHash& RawHash = Targets[TargetOffset].RawHash;
+
+ if (RawHash == IoHash::Zero)
+ {
+ ZEN_TRACE_CPU("CreateEmptyFiles");
+ while (TargetOffset < (BaseTargetOffset + TargetCount))
+ {
+ const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex;
+ ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash);
+ const std::filesystem::path& TargetPath = m_RemoteContent.Paths[RemotePathIndex];
+ std::filesystem::path TargetFilePath = (m_Path / TargetPath).make_preferred();
+ if (!RemotePathIndexToLocalPathIndex[RemotePathIndex])
+ {
+ if (IsFileWithRetry(TargetFilePath))
+ {
+ SetFileReadOnlyWithRetry(TargetFilePath, false);
+ }
+ else
+ {
+ CreateDirectories(TargetFilePath.parent_path());
+ }
+ BasicFile OutputFile;
+ OutputFile.Open(TargetFilePath, BasicFile::Mode::kTruncate);
+ }
+ OutLocalFolderState.Paths[RemotePathIndex] = TargetPath;
+ OutLocalFolderState.RawSizes[RemotePathIndex] = m_RemoteContent.RawSizes[RemotePathIndex];
+
+ OutLocalFolderState.Attributes[RemotePathIndex] =
+ m_RemoteContent.Attributes.empty()
+ ? GetNativeFileAttributes(TargetFilePath)
+ : SetNativeFileAttributes(TargetFilePath,
+ m_RemoteContent.Platform,
+ m_RemoteContent.Attributes[RemotePathIndex]);
+ OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath);
+
+ TargetOffset++;
+ TargetsComplete++;
+ }
+ }
+ else
+ {
+ ZEN_TRACE_CPU("FinalizeFile");
+ ZEN_ASSERT(m_RemoteLookup.RawHashToSequenceIndex.contains(RawHash));
+ const uint32_t FirstRemotePathIndex = Targets[TargetOffset].RemotePathIndex;
+ const std::filesystem::path& FirstTargetPath = m_RemoteContent.Paths[FirstRemotePathIndex];
+ std::filesystem::path FirstTargetFilePath = (m_Path / FirstTargetPath).make_preferred();
+
+ if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(FirstRemotePathIndex);
+ InPlaceIt != RemotePathIndexToLocalPathIndex.end())
+ {
+ ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath));
+ }
+ else
+ {
+ if (IsFileWithRetry(FirstTargetFilePath))
+ {
+ SetFileReadOnlyWithRetry(FirstTargetFilePath, false);
+ }
+ else
+ {
+ CreateDirectories(FirstTargetFilePath.parent_path());
+ }
+
+ if (auto InplaceIt = SequenceHashToLocalPathIndex.find(RawHash);
+ InplaceIt != SequenceHashToLocalPathIndex.end())
+ {
+ ZEN_TRACE_CPU("Copy");
+ const uint32_t LocalPathIndex = InplaceIt->second;
+ const std::filesystem::path& SourcePath = m_LocalContent.Paths[LocalPathIndex];
+ std::filesystem::path SourceFilePath = (m_Path / SourcePath).make_preferred();
+ ZEN_ASSERT_SLOW(IsFileWithRetry(SourceFilePath));
+
+ LOG_OUTPUT_DEBUG(m_LogOutput, "Copying from '{}' -> '{}'", SourceFilePath, FirstTargetFilePath);
+ const uint64_t RawSize = m_LocalContent.RawSizes[LocalPathIndex];
+ std::atomic<uint64_t> WriteCount;
+ std::atomic<uint64_t> WriteByteCount;
+ std::atomic<uint64_t> CloneCount;
+ std::atomic<uint64_t> CloneByteCount;
+ CopyFile(m_Options.AllowFileClone,
+ m_Options.UseSparseFiles,
+ SourceFilePath,
+ FirstTargetFilePath,
+ RawSize,
+ WriteCount,
+ WriteByteCount,
+ CloneCount,
+ CloneByteCount);
+ m_RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++;
+ }
+ else
+ {
+ ZEN_TRACE_CPU("Rename");
+ const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RawHash);
+ ZEN_ASSERT_SLOW(IsFileWithRetry(CacheFilePath));
+
+ RenameFileWithRetry(m_LogOutput, CacheFilePath, FirstTargetFilePath);
+
+ m_RebuildFolderStateStats.FinalizeTreeFilesMovedCount++;
+ }
+ }
+
+ OutLocalFolderState.Paths[FirstRemotePathIndex] = FirstTargetPath;
+ OutLocalFolderState.RawSizes[FirstRemotePathIndex] = m_RemoteContent.RawSizes[FirstRemotePathIndex];
+
+ OutLocalFolderState.Attributes[FirstRemotePathIndex] =
+ m_RemoteContent.Attributes.empty()
+ ? GetNativeFileAttributes(FirstTargetFilePath)
+ : SetNativeFileAttributes(FirstTargetFilePath,
+ m_RemoteContent.Platform,
+ m_RemoteContent.Attributes[FirstRemotePathIndex]);
+ OutLocalFolderState.ModificationTicks[FirstRemotePathIndex] = GetModificationTickFromPath(FirstTargetFilePath);
+
+ TargetOffset++;
+ TargetsComplete++;
+
+ while (TargetOffset < (BaseTargetOffset + TargetCount))
+ {
+ const uint32_t RemotePathIndex = Targets[TargetOffset].RemotePathIndex;
+ ZEN_ASSERT(Targets[TargetOffset].RawHash == RawHash);
+ const std::filesystem::path& TargetPath = m_RemoteContent.Paths[RemotePathIndex];
+ std::filesystem::path TargetFilePath = (m_Path / TargetPath).make_preferred();
+
+ if (auto InPlaceIt = RemotePathIndexToLocalPathIndex.find(RemotePathIndex);
+ InPlaceIt != RemotePathIndexToLocalPathIndex.end())
+ {
+ ZEN_ASSERT_SLOW(IsFileWithRetry(TargetFilePath));
+ }
+ else
+ {
+ ZEN_TRACE_CPU("Copy");
+ if (IsFileWithRetry(TargetFilePath))
+ {
+ SetFileReadOnlyWithRetry(TargetFilePath, false);
+ }
+ else
+ {
+ CreateDirectories(TargetFilePath.parent_path());
+ }
+
+ ZEN_ASSERT_SLOW(IsFileWithRetry(FirstTargetFilePath));
+ LOG_OUTPUT_DEBUG(m_LogOutput, "Copying from '{}' -> '{}'", FirstTargetFilePath, TargetFilePath);
+ const uint64_t RawSize = m_RemoteContent.RawSizes[RemotePathIndex];
+ std::atomic<uint64_t> WriteCount;
+ std::atomic<uint64_t> WriteByteCount;
+ std::atomic<uint64_t> CloneCount;
+ std::atomic<uint64_t> CloneByteCount;
+ CopyFile(m_Options.AllowFileClone,
+ m_Options.UseSparseFiles,
+ FirstTargetFilePath,
+ TargetFilePath,
+ RawSize,
+ WriteCount,
+ WriteByteCount,
+ CloneCount,
+ CloneByteCount);
+ m_RebuildFolderStateStats.FinalizeTreeFilesCopiedCount++;
+ }
+
+ OutLocalFolderState.Paths[RemotePathIndex] = TargetPath;
+ OutLocalFolderState.RawSizes[RemotePathIndex] = m_RemoteContent.RawSizes[RemotePathIndex];
+
+ OutLocalFolderState.Attributes[RemotePathIndex] =
+ m_RemoteContent.Attributes.empty()
+ ? GetNativeFileAttributes(TargetFilePath)
+ : SetNativeFileAttributes(TargetFilePath,
+ m_RemoteContent.Platform,
+ m_RemoteContent.Attributes[RemotePathIndex]);
+ OutLocalFolderState.ModificationTicks[RemotePathIndex] = GetModificationTickFromPath(TargetFilePath);
+
+ TargetOffset++;
+ TargetsComplete++;
+ }
+ }
+ }
+ });
+
+ TargetOffset += TargetCount;
+ }
+
+ {
+ ZEN_TRACE_CPU("FinalizeTree_Wait");
+
+ Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
+ const uint64_t WorkTotal = Targets.size() + RemoveLocalPathIndexes.size();
+ const uint64_t WorkComplete = TargetsComplete.load() + DeletedCount.load();
+ std::string Details = fmt::format("{}/{} files", WorkComplete, WorkTotal);
+ RebuildProgressBar.UpdateState({.Task = "Rebuilding state ",
+ .Details = Details,
+ .TotalCount = gsl::narrow<uint64_t>(WorkTotal),
+ .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete),
+ .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+ false);
+ });
+ }
+
+ m_RebuildFolderStateStats.FinalizeTreeElapsedWallTimeUs = Timer.GetElapsedTimeUs();
+ RebuildProgressBar.Finish();
+ }
+}
+
+void
+BuildsOperationUpdateFolder::ScanCacheFolder(tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& OutCachedChunkHashesFound,
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& OutCachedSequenceHashesFound)
+{
+ ZEN_TRACE_CPU("ScanCacheFolder");
+
+ Stopwatch CacheTimer;
+
+ DirectoryContent CacheDirContent;
+ GetDirectoryContent(m_CacheFolderPath, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeFileSizes, CacheDirContent);
+ for (size_t Index = 0; Index < CacheDirContent.Files.size(); Index++)
+ {
+ if (m_Options.EnableTargetFolderScavenging)
+ {
+ IoHash FileHash;
+ if (IoHash::TryParse(CacheDirContent.Files[Index].filename().string(), FileHash))
+ {
+ if (auto ChunkIt = m_RemoteLookup.ChunkHashToChunkIndex.find(FileHash);
+ ChunkIt != m_RemoteLookup.ChunkHashToChunkIndex.end())
+ {
+ const uint32_t ChunkIndex = ChunkIt->second;
+ const uint64_t ChunkSize = m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex];
+ if (ChunkSize == CacheDirContent.FileSizes[Index])
+ {
+ OutCachedChunkHashesFound.insert({FileHash, ChunkIndex});
+ m_CacheMappingStats.CacheChunkCount++;
+ m_CacheMappingStats.CacheChunkByteCount += ChunkSize;
+ continue;
+ }
+ }
+ else if (auto SequenceIt = m_RemoteLookup.RawHashToSequenceIndex.find(FileHash);
+ SequenceIt != m_RemoteLookup.RawHashToSequenceIndex.end())
+ {
+ const uint32_t SequenceIndex = SequenceIt->second;
+ const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex];
+ const uint64_t SequenceSize = m_RemoteContent.RawSizes[PathIndex];
+ if (SequenceSize == CacheDirContent.FileSizes[Index])
+ {
+ OutCachedSequenceHashesFound.insert({FileHash, SequenceIndex});
+ m_CacheMappingStats.CacheSequenceHashesCount++;
+ m_CacheMappingStats.CacheSequenceHashesByteCount += SequenceSize;
+
+ const std::filesystem::path CacheFilePath =
+ GetFinalChunkedSequenceFileName(m_CacheFolderPath,
+ m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
+ ZEN_ASSERT_SLOW(IsFile(CacheFilePath));
+
+ continue;
+ }
+ }
+ }
+ }
+ TryRemoveFile(m_LogOutput, CacheDirContent.Files[Index]);
+ }
+ m_CacheMappingStats.CacheScanElapsedWallTimeUs += CacheTimer.GetElapsedTimeUs();
+}
+
+void
+BuildsOperationUpdateFolder::ScanTempBlocksFolder(tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& OutCachedBlocksFound)
+{
+ ZEN_TRACE_CPU("ScanTempBlocksFolder");
+
+ Stopwatch CacheTimer;
+
+ tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> AllBlockSizes;
+ AllBlockSizes.reserve(m_BlockDescriptions.size());
+ for (uint32_t BlockIndex = 0; BlockIndex < m_BlockDescriptions.size(); BlockIndex++)
+ {
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
+ AllBlockSizes.insert({BlockDescription.BlockHash, BlockIndex});
+ }
+
+ DirectoryContent BlockDirContent;
+ GetDirectoryContent(ZenTempBlockFolderPath(m_Options.ZenFolderPath),
+ DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeFileSizes,
+ BlockDirContent);
+ OutCachedBlocksFound.reserve(BlockDirContent.Files.size());
+ for (size_t Index = 0; Index < BlockDirContent.Files.size(); Index++)
+ {
+ if (m_Options.EnableTargetFolderScavenging)
+ {
+ IoHash FileHash;
+ if (IoHash::TryParse(BlockDirContent.Files[Index].filename().string(), FileHash))
+ {
+ if (auto BlockIt = AllBlockSizes.find(FileHash); BlockIt != AllBlockSizes.end())
+ {
+ const uint32_t BlockIndex = BlockIt->second;
+ const ChunkBlockDescription& BlockDescription = m_BlockDescriptions[BlockIndex];
+ uint64_t BlockSize = CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize;
+ for (uint64_t ChunkSize : BlockDescription.ChunkCompressedLengths)
+ {
+ BlockSize += ChunkSize;
+ }
+
+ if (BlockSize == BlockDirContent.FileSizes[Index])
+ {
+ OutCachedBlocksFound.insert({FileHash, BlockIndex});
+ m_CacheMappingStats.CacheBlockCount++;
+ m_CacheMappingStats.CacheBlocksByteCount += BlockSize;
+ continue;
+ }
+ }
+ }
+ }
+ TryRemoveFile(m_LogOutput, BlockDirContent.Files[Index]);
+ }
+
+ m_CacheMappingStats.CacheScanElapsedWallTimeUs += CacheTimer.GetElapsedTimeUs();
+}
+
+std::vector<BuildsOperationUpdateFolder::ScavengeSource>
+BuildsOperationUpdateFolder::FindScavengeSources()
+{
+ ZEN_TRACE_CPU("FindScavengeSources");
+
+ std::vector<ScavengeSource> Result;
+ DirectoryContent Content;
+ GetDirectoryContent(m_Options.SystemRootDir / "builds" / "downloads", DirectoryContentFlags::IncludeFiles, Content);
+ for (const std::filesystem::path& EntryPath : Content.Files)
+ {
+ bool DeleteEntry = false;
+ IoHash EntryPathHash;
+ if (IoHash::TryParse(EntryPath.stem().string(), EntryPathHash))
+ {
+ // Read state and verify that it is valid
+ IoBuffer MetaDataJson = ReadFile(EntryPath).Flatten();
+ std::string_view Json(reinterpret_cast<const char*>(MetaDataJson.GetData()), MetaDataJson.GetSize());
+ std::string JsonError;
+ CbObject DownloadInfo = LoadCompactBinaryFromJson(Json, JsonError).AsObject();
+ if (JsonError.empty())
+ {
+ std::filesystem::path StateFilePath = DownloadInfo["statePath"].AsU8String();
+ if (IsFile(StateFilePath))
+ {
+ std::filesystem::path Path = DownloadInfo["path"].AsU8String();
+ if (!std::filesystem::equivalent(Path, m_Path))
+ {
+ if (IsDir(Path))
+ {
+ Result.push_back({.StateFilePath = std::move(StateFilePath), .Path = std::move(Path)});
+ }
+ else
+ {
+ DeleteEntry = true;
+ }
+ }
+ }
+ else
+ {
+ DeleteEntry = true;
+ }
+ }
+ else
+ {
+ LOG_OUTPUT_WARN(m_LogOutput, "Invalid download state file at {}. '{}'", EntryPath, JsonError);
+ DeleteEntry = true;
+ }
+ }
+
+ if (DeleteEntry)
+ {
+ std::error_code DummyEc;
+ std::filesystem::remove(EntryPath, DummyEc);
+ }
+ }
+ return Result;
+}
+
+std::vector<uint32_t>
+BuildsOperationUpdateFolder::ScanTargetFolder(const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& CachedChunkHashesFound,
+ const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& CachedSequenceHashesFound)
+{
+ ZEN_TRACE_CPU("ScanTargetFolder");
+
+ Stopwatch LocalTimer;
+
+ std::vector<uint32_t> MissingSequenceIndexes;
+
+ for (uint32_t RemoteSequenceIndex = 0; RemoteSequenceIndex < m_RemoteContent.ChunkedContent.SequenceRawHashes.size();
+ RemoteSequenceIndex++)
+ {
+ const IoHash& RemoteSequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ const uint32_t RemotePathIndex = GetFirstPathIndexForSeqeuenceIndex(m_RemoteLookup, RemoteSequenceIndex);
+ const uint64_t RemoteRawSize = m_RemoteContent.RawSizes[RemotePathIndex];
+ if (auto CacheSequenceIt = CachedSequenceHashesFound.find(RemoteSequenceRawHash);
+ CacheSequenceIt != CachedSequenceHashesFound.end())
+ {
+ const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RemoteSequenceRawHash);
+ ZEN_ASSERT_SLOW(IsFile(CacheFilePath));
+ if (m_Options.IsVerbose)
+ {
+ LOG_OUTPUT(m_LogOutput, "Found sequence {} at {} ({})", RemoteSequenceRawHash, CacheFilePath, NiceBytes(RemoteRawSize));
+ }
+ }
+ else if (auto CacheChunkIt = CachedChunkHashesFound.find(RemoteSequenceRawHash); CacheChunkIt != CachedChunkHashesFound.end())
+ {
+ const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RemoteSequenceRawHash);
+ ZEN_ASSERT_SLOW(IsFile(CacheFilePath));
+ if (m_Options.IsVerbose)
+ {
+ LOG_OUTPUT(m_LogOutput, "Found chunk {} at {} ({})", RemoteSequenceRawHash, CacheFilePath, NiceBytes(RemoteRawSize));
+ }
+ }
+ else if (auto It = m_LocalLookup.RawHashToSequenceIndex.find(RemoteSequenceRawHash);
+ It != m_LocalLookup.RawHashToSequenceIndex.end())
+ {
+ const uint32_t LocalSequenceIndex = It->second;
+ const uint32_t LocalPathIndex = GetFirstPathIndexForSeqeuenceIndex(m_LocalLookup, LocalSequenceIndex);
+ const std::filesystem::path LocalFilePath = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred();
+ ZEN_ASSERT_SLOW(IsFile(LocalFilePath));
+ m_CacheMappingStats.LocalPathsMatchingSequencesCount++;
+ m_CacheMappingStats.LocalPathsMatchingSequencesByteCount += RemoteRawSize;
+ if (m_Options.IsVerbose)
+ {
+ LOG_OUTPUT(m_LogOutput, "Found sequence {} at {} ({})", RemoteSequenceRawHash, LocalFilePath, NiceBytes(RemoteRawSize));
+ }
+ }
+ else
+ {
+ MissingSequenceIndexes.push_back(RemoteSequenceIndex);
+ }
+ }
+
+ m_CacheMappingStats.LocalScanElapsedWallTimeUs += LocalTimer.GetElapsedTimeUs();
+ return MissingSequenceIndexes;
+}
+
+bool
+BuildsOperationUpdateFolder::FindScavengeContent(const ScavengeSource& Source,
+ ChunkedFolderContent& OutScavengedLocalContent,
+ ChunkedContentLookup& OutScavengedLookup)
+{
+ ZEN_TRACE_CPU("FindScavengeContent");
+
+ FolderContent LocalFolderState;
+ if (!ReadStateFile(m_LogOutput, Source.StateFilePath, LocalFolderState, OutScavengedLocalContent))
+ {
+ return false;
+ }
+ if (!IsDir(Source.Path))
+ {
+ return false;
+ }
+
+ OutScavengedLookup = BuildChunkedContentLookup(OutScavengedLocalContent);
+
+ std::vector<uint32_t> PathIndexesToScavange;
+ uint32_t ScavengedStatePathCount = gsl::narrow<uint32_t>(OutScavengedLocalContent.Paths.size());
+ PathIndexesToScavange.reserve(ScavengedStatePathCount);
+ for (uint32_t ScavengedStatePathIndex = 0; ScavengedStatePathIndex < ScavengedStatePathCount; ScavengedStatePathIndex++)
+ {
+ const IoHash& SequenceHash = OutScavengedLocalContent.RawHashes[ScavengedStatePathIndex];
+ if (auto ScavengeSequenceIt = OutScavengedLookup.RawHashToSequenceIndex.find(SequenceHash);
+ ScavengeSequenceIt != OutScavengedLookup.RawHashToSequenceIndex.end())
+ {
+ const uint32_t ScavengeSequenceIndex = ScavengeSequenceIt->second;
+ if (m_RemoteLookup.RawHashToSequenceIndex.contains(SequenceHash))
+ {
+ PathIndexesToScavange.push_back(ScavengedStatePathIndex);
+ }
+ else
+ {
+ const uint32_t ScavengeChunkCount = OutScavengedLocalContent.ChunkedContent.ChunkCounts[ScavengeSequenceIndex];
+ for (uint32_t ScavengeChunkIndexOffset = 0; ScavengeChunkIndexOffset < ScavengeChunkCount; ScavengeChunkIndexOffset++)
+ {
+ const size_t ScavengeChunkOrderIndex =
+ OutScavengedLookup.ChunkSequenceLocationOffset[ScavengeSequenceIndex] + ScavengeChunkIndexOffset;
+ const uint32_t ScavengeChunkIndex = OutScavengedLocalContent.ChunkedContent.ChunkOrders[ScavengeChunkOrderIndex];
+ const IoHash& ScavengeChunkHash = OutScavengedLocalContent.ChunkedContent.ChunkHashes[ScavengeChunkIndex];
+ if (m_RemoteLookup.ChunkHashToChunkIndex.contains(ScavengeChunkHash))
+ {
+ PathIndexesToScavange.push_back(ScavengedStatePathIndex);
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ if (PathIndexesToScavange.empty())
+ {
+ OutScavengedLookup = {};
+ OutScavengedLocalContent = {};
+ return false;
+ }
+
+ std::vector<std::filesystem::path> PathsToScavenge;
+ PathsToScavenge.reserve(PathIndexesToScavange.size());
+ for (uint32_t ScavengedStatePathIndex : PathIndexesToScavange)
+ {
+ PathsToScavenge.push_back(OutScavengedLocalContent.Paths[ScavengedStatePathIndex]);
+ }
+
+ FolderContent ValidFolderContent = GetValidFolderContent(m_ScavengedFolderScanStats, Source.Path, PathsToScavenge, {});
+
+ if (!LocalFolderState.AreKnownFilesEqual(ValidFolderContent))
+ {
+ std::vector<std::filesystem::path> DeletedPaths;
+ FolderContent UpdatedContent = GetUpdatedContent(LocalFolderState, ValidFolderContent, DeletedPaths);
+
+ // If the files are modified since the state was saved we ignore the files since we don't
+ // want to incur the cost of scanning/hashing scavenged files
+ DeletedPaths.insert(DeletedPaths.end(), UpdatedContent.Paths.begin(), UpdatedContent.Paths.end());
+ if (!DeletedPaths.empty())
+ {
+ OutScavengedLocalContent = DeletePathsFromChunkedContent(OutScavengedLocalContent, OutScavengedLookup, DeletedPaths);
+ OutScavengedLookup = BuildChunkedContentLookup(OutScavengedLocalContent);
+ }
+ }
+
+ if (OutScavengedLocalContent.Paths.empty())
+ {
+ OutScavengedLookup = {};
+ OutScavengedLocalContent = {};
+ return false;
+ }
+
+ return true;
+}
+
+std::filesystem::path
+BuildsOperationUpdateFolder::FindDownloadedChunk(const IoHash& ChunkHash)
+{
+ ZEN_TRACE_CPU("FindDownloadedChunk");
+
+ std::filesystem::path CompressedChunkPath = ZenTempDownloadFolderPath(m_Options.ZenFolderPath) / ChunkHash.ToHexString();
+ if (IsFile(CompressedChunkPath))
+ {
+ IoBuffer ExistingCompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath);
+ if (ExistingCompressedPart)
+ {
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (CompressedBuffer::ValidateCompressedHeader(ExistingCompressedPart, RawHash, RawSize))
+ {
+ return CompressedChunkPath;
+ }
+ else
+ {
+ std::error_code DummyEc;
+ RemoveFile(CompressedChunkPath, DummyEc);
+ }
+ }
+ }
+ return {};
+}
+
+FolderContent
+BuildsOperationUpdateFolder::GetValidFolderContent(GetFolderContentStatistics& LocalFolderScanStats,
+ const std::filesystem::path& Path,
+ std::span<const std::filesystem::path> PathsToCheck,
+ std::function<void(uint64_t PathCount, uint64_t CompletedPathCount)>&& ProgressCallback)
+{
+ ZEN_TRACE_CPU("GetValidFolderContent");
+
+ FolderContent Result;
+ const uint32_t PathCount = gsl::narrow<uint32_t>(PathsToCheck.size());
+
+ Result.Paths.resize(PathCount);
+ Result.RawSizes.resize(PathCount);
+ Result.Attributes.resize(PathCount);
+ Result.ModificationTicks.resize(PathCount);
+
+ {
+ Stopwatch Timer;
+ auto _ = MakeGuard([&LocalFolderScanStats, &Timer]() { LocalFolderScanStats.ElapsedWallTimeUS = Timer.GetElapsedTimeUs(); });
+
+ ParallelWork Work(m_AbortFlag,
+ m_PauseFlag,
+ ProgressCallback ? WorkerThreadPool::EMode::EnableBacklog : WorkerThreadPool::EMode::DisableBacklog);
+ std::atomic<uint64_t> CompletedPathCount = 0;
+ uint32_t PathIndex = 0;
+
+ while (PathIndex < PathCount)
+ {
+ uint32_t PathRangeCount = Min(128u, PathCount - PathIndex);
+ Work.ScheduleWork(m_IOWorkerPool,
+ [PathIndex, PathRangeCount, &PathsToCheck, &Path, &Result, &CompletedPathCount, &LocalFolderScanStats](
+ std::atomic<bool>& AbortFlag) {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_ValidateFiles");
+
+ for (uint32_t PathRangeIndex = PathIndex; PathRangeIndex < PathIndex + PathRangeCount;
+ PathRangeIndex++)
+ {
+ const std::filesystem::path& FilePath = PathsToCheck[PathRangeIndex];
+ std::filesystem::path LocalFilePath = (Path / FilePath).make_preferred();
+ if (TryGetFileProperties(LocalFilePath,
+ Result.RawSizes[PathRangeIndex],
+ Result.ModificationTicks[PathRangeIndex],
+ Result.Attributes[PathRangeIndex]))
+ {
+ Result.Paths[PathRangeIndex] = std::move(FilePath);
+ LocalFolderScanStats.FoundFileCount++;
+ LocalFolderScanStats.FoundFileByteCount += Result.RawSizes[PathRangeIndex];
+ LocalFolderScanStats.AcceptedFileCount++;
+ LocalFolderScanStats.AcceptedFileByteCount += Result.RawSizes[PathRangeIndex];
+ }
+ CompletedPathCount++;
+ }
+ }
+ });
+ PathIndex += PathRangeCount;
+ }
+ Work.Wait(200, [&](bool, bool, ptrdiff_t) {
+ if (ProgressCallback)
+ {
+ ProgressCallback(PathCount, CompletedPathCount.load());
+ }
+ });
+ }
+
+ uint32_t WritePathIndex = 0;
+ for (uint32_t ReadPathIndex = 0; ReadPathIndex < PathCount; ReadPathIndex++)
+ {
+ if (!Result.Paths[ReadPathIndex].empty())
+ {
+ if (WritePathIndex < ReadPathIndex)
+ {
+ Result.Paths[WritePathIndex] = std::move(Result.Paths[ReadPathIndex]);
+ Result.RawSizes[WritePathIndex] = Result.RawSizes[ReadPathIndex];
+ Result.Attributes[WritePathIndex] = Result.Attributes[ReadPathIndex];
+ Result.ModificationTicks[WritePathIndex] = Result.ModificationTicks[ReadPathIndex];
+ }
+ WritePathIndex++;
+ }
+ }
+
+ Result.Paths.resize(WritePathIndex);
+ Result.RawSizes.resize(WritePathIndex);
+ Result.Attributes.resize(WritePathIndex);
+ Result.ModificationTicks.resize(WritePathIndex);
+ return Result;
+}
+
+std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>
+BuildsOperationUpdateFolder::GetRemainingChunkTargets(std::span<const std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ uint32_t ChunkIndex)
+{
+ ZEN_TRACE_CPU("GetRemainingChunkTargets");
+
+ std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkSources = GetChunkSequenceLocations(m_RemoteLookup, ChunkIndex);
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs;
+ if (!ChunkSources.empty())
+ {
+ ChunkTargetPtrs.reserve(ChunkSources.size());
+ for (const ChunkedContentLookup::ChunkSequenceLocation& Source : ChunkSources)
+ {
+ if (SequenceIndexChunksLeftToWriteCounters[Source.SequenceIndex].load() > 0)
+ {
+ ChunkTargetPtrs.push_back(&Source);
+ }
+ }
+ }
+ return ChunkTargetPtrs;
+};
+
+uint64_t
+BuildsOperationUpdateFolder::GetChunkWriteCount(std::span<const std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ uint32_t ChunkIndex)
+{
+ ZEN_TRACE_CPU("GetChunkWriteCount");
+
+ uint64_t WriteCount = 0;
+ std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkSources = GetChunkSequenceLocations(m_RemoteLookup, ChunkIndex);
+ for (const ChunkedContentLookup::ChunkSequenceLocation& Source : ChunkSources)
+ {
+ if (SequenceIndexChunksLeftToWriteCounters[Source.SequenceIndex].load() > 0)
+ {
+ WriteCount++;
+ }
+ }
+ return WriteCount;
+};
+
+void
+BuildsOperationUpdateFolder::WriteScavengedSequenceToCache(const std::filesystem::path& ScavengeRootPath,
+ const ChunkedFolderContent& ScavengedContent,
+ const ScavengedSequenceCopyOperation& ScavengeOp)
+{
+ ZEN_TRACE_CPU("WriteScavengedSequenceToCache");
+
+ const std::filesystem::path ScavengedPath = ScavengedContent.Paths[ScavengeOp.ScavengedPathIndex];
+ const std::filesystem::path ScavengedFilePath = (ScavengeRootPath / ScavengedPath).make_preferred();
+ ZEN_ASSERT_SLOW(FileSizeFromPath(ScavengedFilePath) == ScavengeOp.RawSize);
+
+ const IoHash& RemoteSequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[ScavengeOp.RemoteSequenceIndex];
+ const std::filesystem::path TempFilePath = GetTempChunkedSequenceFileName(m_CacheFolderPath, RemoteSequenceRawHash);
+
+ const uint64_t RawSize = ScavengedContent.RawSizes[ScavengeOp.ScavengedPathIndex];
+ CopyFile(m_Options.AllowFileClone,
+ m_Options.UseSparseFiles,
+ ScavengedFilePath,
+ TempFilePath,
+ RawSize,
+ m_DiskStats.WriteCount,
+ m_DiskStats.WriteByteCount,
+ m_DiskStats.CloneCount,
+ m_DiskStats.CloneByteCount);
+
+ const std::filesystem::path CacheFilePath = GetFinalChunkedSequenceFileName(m_CacheFolderPath, RemoteSequenceRawHash);
+ RenameFile(TempFilePath, CacheFilePath);
+}
+
+std::vector<uint32_t>
+BuildsOperationUpdateFolder::WriteLocalChunkToCache(CloneQueryInterface* CloneQuery,
+ const CopyChunkData& CopyData,
+ const std::vector<ChunkedFolderContent>& ScavengedContents,
+ const std::vector<ChunkedContentLookup>& ScavengedLookups,
+ const std::vector<std::filesystem::path>& ScavengedPaths,
+ BufferedWriteFileCache& WriteCache)
+{
+ ZEN_TRACE_CPU("WriteLocalChunkToCache");
+
+ std::filesystem::path SourceFilePath;
+
+ if (CopyData.ScavengeSourceIndex == (uint32_t)-1)
+ {
+ const uint32_t LocalPathIndex = m_LocalLookup.SequenceIndexFirstPathIndex[CopyData.SourceSequenceIndex];
+ SourceFilePath = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred();
+ }
+ else
+ {
+ const ChunkedFolderContent& ScavengedContent = ScavengedContents[CopyData.ScavengeSourceIndex];
+ const ChunkedContentLookup& ScavengedLookup = ScavengedLookups[CopyData.ScavengeSourceIndex];
+ const std::filesystem::path ScavengedPath = ScavengedPaths[CopyData.ScavengeSourceIndex];
+ const uint32_t ScavengedPathIndex = ScavengedLookup.SequenceIndexFirstPathIndex[CopyData.SourceSequenceIndex];
+ SourceFilePath = (ScavengedPath / ScavengedContent.Paths[ScavengedPathIndex]).make_preferred();
+ }
+ ZEN_ASSERT_SLOW(IsFile(SourceFilePath));
+ ZEN_ASSERT(!CopyData.TargetChunkLocationPtrs.empty());
+
+ uint64_t CacheLocalFileBytesRead = 0;
+
+ size_t TargetStart = 0;
+ const std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> AllTargets(CopyData.TargetChunkLocationPtrs);
+
+ struct WriteOp
+ {
+ const ChunkedContentLookup::ChunkSequenceLocation* Target = nullptr;
+ uint64_t CacheFileOffset = (uint64_t)-1;
+ uint32_t ChunkIndex = (uint32_t)-1;
+ };
+
+ std::vector<WriteOp> WriteOps;
+
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Sort");
+ WriteOps.reserve(AllTargets.size());
+ for (const CopyChunkData::ChunkTarget& ChunkTarget : CopyData.ChunkTargets)
+ {
+ std::span<const ChunkedContentLookup::ChunkSequenceLocation* const> TargetRange =
+ AllTargets.subspan(TargetStart, ChunkTarget.TargetChunkLocationCount);
+ for (const ChunkedContentLookup::ChunkSequenceLocation* Target : TargetRange)
+ {
+ WriteOps.push_back(
+ WriteOp{.Target = Target, .CacheFileOffset = ChunkTarget.CacheFileOffset, .ChunkIndex = ChunkTarget.RemoteChunkIndex});
+ }
+ TargetStart += ChunkTarget.TargetChunkLocationCount;
+ }
+
+ std::sort(WriteOps.begin(), WriteOps.end(), [](const WriteOp& Lhs, const WriteOp& Rhs) {
+ if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
+ {
+ return true;
+ }
+ else if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
+ {
+ return false;
+ }
+ if (Lhs.Target->Offset < Rhs.Target->Offset)
+ {
+ return true;
+ }
+ return false;
+ });
+ }
+
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Write");
+
+ tsl::robin_set<uint32_t> ChunkIndexesWritten;
+
+ BufferedOpenFile SourceFile(SourceFilePath,
+ m_DiskStats.OpenReadCount,
+ m_DiskStats.CurrentOpenFileCount,
+ m_DiskStats.ReadCount,
+ m_DiskStats.ReadByteCount);
+
+ bool CanCloneSource = CloneQuery && CloneQuery->CanClone(SourceFile.Handle());
+
+ BufferedWriteFileCache::Local LocalWriter(WriteCache);
+
+ for (size_t WriteOpIndex = 0; WriteOpIndex < WriteOps.size();)
+ {
+ if (m_AbortFlag)
+ {
+ break;
+ }
+ const WriteOp& Op = WriteOps[WriteOpIndex];
+
+ const uint32_t RemoteSequenceIndex = Op.Target->SequenceIndex;
+ const uint32_t RemotePathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex];
+ const uint64_t TargetSize = m_RemoteContent.RawSizes[RemotePathIndex];
+ const uint64_t ChunkSize = m_RemoteContent.ChunkedContent.ChunkRawSizes[Op.ChunkIndex];
+
+ uint64_t ReadLength = ChunkSize;
+ size_t WriteCount = 1;
+ uint64_t OpSourceEnd = Op.CacheFileOffset + ChunkSize;
+ uint64_t OpTargetEnd = Op.Target->Offset + ChunkSize;
+ while ((WriteOpIndex + WriteCount) < WriteOps.size())
+ {
+ const WriteOp& NextOp = WriteOps[WriteOpIndex + WriteCount];
+ if (NextOp.Target->SequenceIndex != Op.Target->SequenceIndex)
+ {
+ break;
+ }
+ if (NextOp.Target->Offset != OpTargetEnd)
+ {
+ break;
+ }
+ if (NextOp.CacheFileOffset != OpSourceEnd)
+ {
+ break;
+ }
+ const uint64_t NextChunkLength = m_RemoteContent.ChunkedContent.ChunkRawSizes[NextOp.ChunkIndex];
+ if (ReadLength + NextChunkLength > 512u * 1024u)
+ {
+ break;
+ }
+ ReadLength += NextChunkLength;
+ OpSourceEnd += NextChunkLength;
+ OpTargetEnd += NextChunkLength;
+ WriteCount++;
+ }
+
+ {
+ bool DidClone = false;
+
+ if (CanCloneSource)
+ {
+ uint64_t PreBytes = 0;
+ uint64_t PostBytes = 0;
+ uint64_t ClonableBytes =
+ CloneQuery->GetClonableRange(Op.CacheFileOffset, Op.Target->Offset, ReadLength, PreBytes, PostBytes);
+ if (ClonableBytes > 0)
+ {
+ // We need to open the file...
+ BufferedWriteFileCache::Local::Writer* Writer = LocalWriter.GetWriter(RemoteSequenceIndex);
+ if (!Writer)
+ {
+ Writer = LocalWriter.PutWriter(RemoteSequenceIndex, std::make_unique<BufferedWriteFileCache::Local::Writer>());
+
+ Writer->File = std::make_unique<BasicFile>();
+
+ const std::filesystem::path FileName =
+ GetTempChunkedSequenceFileName(m_CacheFolderPath,
+ m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex]);
+ Writer->File->Open(FileName, BasicFile::Mode::kWrite);
+ if (m_Options.UseSparseFiles)
+ {
+ PrepareFileForScatteredWrite(Writer->File->Handle(), TargetSize);
+ }
+ }
+ DidClone = CloneQuery->TryClone(SourceFile.Handle(),
+ Writer->File->Handle(),
+ Op.CacheFileOffset + PreBytes,
+ Op.Target->Offset + PreBytes,
+ ClonableBytes,
+ TargetSize);
+ if (DidClone)
+ {
+ m_DiskStats.WriteCount++;
+ m_DiskStats.WriteByteCount += ClonableBytes;
+
+ m_DiskStats.CloneCount++;
+ m_DiskStats.CloneByteCount += ClonableBytes;
+
+ if (PreBytes > 0)
+ {
+ CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, PreBytes);
+ const uint64_t FileOffset = Op.Target->Offset;
+
+ WriteSequenceChunkToCache(LocalWriter, ChunkSource, RemoteSequenceIndex, FileOffset, RemotePathIndex);
+ }
+ if (PostBytes > 0)
+ {
+ CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset + ReadLength - PostBytes, PostBytes);
+ const uint64_t FileOffset = Op.Target->Offset + ReadLength - PostBytes;
+
+ WriteSequenceChunkToCache(LocalWriter, ChunkSource, RemoteSequenceIndex, FileOffset, RemotePathIndex);
+ }
+ }
+ }
+ }
+
+ if (!DidClone)
+ {
+ CompositeBuffer ChunkSource = SourceFile.GetRange(Op.CacheFileOffset, ReadLength);
+
+ const uint64_t FileOffset = Op.Target->Offset;
+
+ WriteSequenceChunkToCache(LocalWriter, ChunkSource, RemoteSequenceIndex, FileOffset, RemotePathIndex);
+ }
+ }
+
+ CacheLocalFileBytesRead += ReadLength; // TODO: This should be the sum of unique chunk sizes?
+
+ WriteOpIndex += WriteCount;
+ }
+ }
+
+ if (m_Options.IsVerbose)
+ {
+ LOG_OUTPUT(m_LogOutput, "Copied {} from {}", NiceBytes(CacheLocalFileBytesRead), SourceFilePath);
+ }
+
+ std::vector<uint32_t> Result;
+ Result.reserve(WriteOps.size());
+
+ for (const WriteOp& Op : WriteOps)
+ {
+ Result.push_back(Op.Target->SequenceIndex);
+ }
+ return Result;
+}
+
+bool
+BuildsOperationUpdateFolder::WriteCompressedChunkToCache(
+ const IoHash& ChunkHash,
+ const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs,
+ BufferedWriteFileCache& WriteCache,
+ IoBuffer&& CompressedPart)
+{
+ ZEN_TRACE_CPU("WriteCompressedChunkToCache");
+
+ auto ChunkHashToChunkIndexIt = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash);
+ ZEN_ASSERT(ChunkHashToChunkIndexIt != m_RemoteLookup.ChunkHashToChunkIndex.end());
+ if (IsSingleFileChunk(m_RemoteContent, ChunkTargetPtrs))
+ {
+ const std::uint32_t SequenceIndex = ChunkTargetPtrs.front()->SequenceIndex;
+ const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex];
+ StreamDecompress(m_AbortFlag, m_CacheFolderPath, SequenceRawHash, CompositeBuffer(std::move(CompressedPart)), m_DiskStats);
+ return false;
+ }
+ else
+ {
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(CompositeBuffer(std::move(CompressedPart)), RawHash, RawSize);
+ if (!Compressed)
+ {
+ throw std::runtime_error(fmt::format("Failed to parse header of compressed large blob {}", ChunkHash));
+ }
+ if (RawHash != ChunkHash)
+ {
+ throw std::runtime_error(fmt::format("RawHash in header {} in large blob {} does match.", RawHash, ChunkHash));
+ }
+
+ BufferedWriteFileCache::Local LocalWriter(WriteCache);
+
+ IoHashStream Hash;
+ bool CouldDecompress = Compressed.DecompressToStream(
+ 0,
+ (uint64_t)-1,
+ [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
+ ZEN_UNUSED(SourceOffset);
+ ZEN_TRACE_CPU("Async_StreamDecompress_Write");
+ m_DiskStats.ReadByteCount += SourceSize;
+ if (!m_AbortFlag)
+ {
+ for (const ChunkedContentLookup::ChunkSequenceLocation* TargetPtr : ChunkTargetPtrs)
+ {
+ const auto& Target = *TargetPtr;
+ const uint64_t FileOffset = Target.Offset + Offset;
+ const uint32_t SequenceIndex = Target.SequenceIndex;
+ const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex];
+
+ WriteSequenceChunkToCache(LocalWriter, RangeBuffer, SequenceIndex, FileOffset, PathIndex);
+ }
+
+ return true;
+ }
+ return false;
+ });
+
+ if (m_AbortFlag)
+ {
+ return false;
+ }
+
+ if (!CouldDecompress)
+ {
+ throw std::runtime_error(fmt::format("Failed to decompress large chunk {}", ChunkHash));
+ }
+
+ return true;
+ }
+}
+
+void
+BuildsOperationUpdateFolder::WriteSequenceChunkToCache(BufferedWriteFileCache::Local& LocalWriter,
+ const CompositeBuffer& Chunk,
+ const uint32_t SequenceIndex,
+ const uint64_t FileOffset,
+ const uint32_t PathIndex)
+{
+ ZEN_TRACE_CPU("WriteSequenceChunkToCache");
+
+ const uint64_t SequenceSize = m_RemoteContent.RawSizes[PathIndex];
+
+ auto OpenFile = [&](BasicFile& File) {
+ const std::filesystem::path FileName =
+ GetTempChunkedSequenceFileName(m_CacheFolderPath, m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
+ File.Open(FileName, BasicFile::Mode::kWrite);
+ if (m_Options.UseSparseFiles)
+ {
+ PrepareFileForScatteredWrite(File.Handle(), SequenceSize);
+ }
+ };
+
+ const uint64_t ChunkSize = Chunk.GetSize();
+ ZEN_ASSERT(FileOffset + ChunkSize <= SequenceSize);
+ if (ChunkSize == SequenceSize)
+ {
+ BasicFile SingleChunkFile;
+ OpenFile(SingleChunkFile);
+
+ m_DiskStats.CurrentOpenFileCount++;
+ auto _ = MakeGuard([this]() { m_DiskStats.CurrentOpenFileCount--; });
+ SingleChunkFile.Write(Chunk, FileOffset);
+
+ m_DiskStats.WriteCount++;
+ m_DiskStats.WriteByteCount += ChunkSize;
+ }
+ else
+ {
+ const uint64_t MaxWriterBufferSize = 256u * 1025u;
+
+ BufferedWriteFileCache::Local::Writer* Writer = LocalWriter.GetWriter(SequenceIndex);
+ if (Writer)
+ {
+ if ((!Writer->Writer) && (ChunkSize < MaxWriterBufferSize))
+ {
+ Writer->Writer = std::make_unique<BasicFileWriter>(*Writer->File, Min(SequenceSize, MaxWriterBufferSize));
+ }
+ Writer->Write(Chunk, FileOffset);
+
+ m_DiskStats.WriteCount++;
+ m_DiskStats.WriteByteCount += ChunkSize;
+ }
+ else
+ {
+ Writer = LocalWriter.PutWriter(SequenceIndex, std::make_unique<BufferedWriteFileCache::Local::Writer>());
+
+ Writer->File = std::make_unique<BasicFile>();
+ OpenFile(*Writer->File);
+ if (ChunkSize < MaxWriterBufferSize)
+ {
+ Writer->Writer = std::make_unique<BasicFileWriter>(*Writer->File, Min(SequenceSize, MaxWriterBufferSize));
+ }
+ Writer->Write(Chunk, FileOffset);
+
+ m_DiskStats.WriteCount++;
+ m_DiskStats.WriteByteCount += ChunkSize;
+ }
+ }
+}
+
+bool
+BuildsOperationUpdateFolder::GetBlockWriteOps(std::span<const IoHash> ChunkRawHashes,
+ std::span<const uint32_t> ChunkCompressedLengths,
+ std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
+ const MemoryView BlockView,
+ uint32_t FirstIncludedBlockChunkIndex,
+ uint32_t LastIncludedBlockChunkIndex,
+ BlockWriteOps& OutOps)
+{
+ ZEN_TRACE_CPU("GetBlockWriteOps");
+
+ uint32_t OffsetInBlock = 0;
+ for (uint32_t ChunkBlockIndex = FirstIncludedBlockChunkIndex; ChunkBlockIndex <= LastIncludedBlockChunkIndex; ChunkBlockIndex++)
+ {
+ const uint32_t ChunkCompressedSize = ChunkCompressedLengths[ChunkBlockIndex];
+ const IoHash& ChunkHash = ChunkRawHashes[ChunkBlockIndex];
+ if (auto It = m_RemoteLookup.ChunkHashToChunkIndex.find(ChunkHash); It != m_RemoteLookup.ChunkHashToChunkIndex.end())
+ {
+ const uint32_t ChunkIndex = It->second;
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*> ChunkTargetPtrs =
+ GetRemainingChunkTargets(SequenceIndexChunksLeftToWriteCounters, ChunkIndex);
+
+ if (!ChunkTargetPtrs.empty())
+ {
+ bool NeedsWrite = true;
+ if (RemoteChunkIndexNeedsCopyFromSourceFlags[ChunkIndex].compare_exchange_strong(NeedsWrite, false))
+ {
+ MemoryView ChunkMemoryView = BlockView.Mid(OffsetInBlock, ChunkCompressedSize);
+ IoHash VerifyChunkHash;
+ uint64_t VerifyChunkSize;
+ CompressedBuffer CompressedChunk =
+ CompressedBuffer::FromCompressed(SharedBuffer::MakeView(ChunkMemoryView), VerifyChunkHash, VerifyChunkSize);
+ ZEN_ASSERT(CompressedChunk);
+ ZEN_ASSERT(VerifyChunkHash == ChunkHash);
+ ZEN_ASSERT(VerifyChunkSize == m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]);
+
+ OodleCompressor ChunkCompressor;
+ OodleCompressionLevel ChunkCompressionLevel;
+ uint64_t ChunkBlockSize;
+
+ bool GetCompressParametersSuccess =
+ CompressedChunk.TryGetCompressParameters(ChunkCompressor, ChunkCompressionLevel, ChunkBlockSize);
+ ZEN_ASSERT(GetCompressParametersSuccess);
+
+ IoBuffer Decompressed;
+ if (ChunkCompressionLevel == OodleCompressionLevel::None)
+ {
+ MemoryView ChunkDecompressedMemoryView = ChunkMemoryView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder());
+ Decompressed =
+ IoBuffer(IoBuffer::Wrap, ChunkDecompressedMemoryView.GetData(), ChunkDecompressedMemoryView.GetSize());
+ }
+ else
+ {
+ Decompressed = CompressedChunk.Decompress().AsIoBuffer();
+ }
+ ZEN_ASSERT(Decompressed.GetSize() == m_RemoteContent.ChunkedContent.ChunkRawSizes[ChunkIndex]);
+ ZEN_ASSERT_SLOW(ChunkHash == IoHash::HashBuffer(Decompressed));
+ for (const ChunkedContentLookup::ChunkSequenceLocation* Target : ChunkTargetPtrs)
+ {
+ OutOps.WriteOps.push_back(
+ BlockWriteOps::WriteOpData{.Target = Target, .ChunkBufferIndex = OutOps.ChunkBuffers.size()});
+ }
+ OutOps.ChunkBuffers.emplace_back(std::move(Decompressed));
+ }
+ }
+ }
+
+ OffsetInBlock += ChunkCompressedSize;
+ }
+ {
+ ZEN_TRACE_CPU("Sort");
+ std::sort(OutOps.WriteOps.begin(),
+ OutOps.WriteOps.end(),
+ [](const BlockWriteOps::WriteOpData& Lhs, const BlockWriteOps::WriteOpData& Rhs) {
+ if (Lhs.Target->SequenceIndex < Rhs.Target->SequenceIndex)
+ {
+ return true;
+ }
+ if (Lhs.Target->SequenceIndex > Rhs.Target->SequenceIndex)
+ {
+ return false;
+ }
+ return Lhs.Target->Offset < Rhs.Target->Offset;
+ });
+ }
+ return true;
+}
+
+void
+BuildsOperationUpdateFolder::WriteBlockChunkOpsToCache(std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ const BlockWriteOps& Ops,
+ BufferedWriteFileCache& WriteCache,
+ ParallelWork& Work)
+{
+ ZEN_TRACE_CPU("WriteBlockChunkOpsToCache");
+
+ {
+ BufferedWriteFileCache::Local LocalWriter(WriteCache);
+ for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps)
+ {
+ if (Work.IsAborted())
+ {
+ break;
+ }
+ const CompositeBuffer& Chunk = Ops.ChunkBuffers[WriteOp.ChunkBufferIndex];
+ const uint32_t SequenceIndex = WriteOp.Target->SequenceIndex;
+ ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() <=
+ m_RemoteContent.ChunkedContent.ChunkCounts[SequenceIndex]);
+ ZEN_ASSERT(SequenceIndexChunksLeftToWriteCounters[SequenceIndex].load() > 0);
+ const uint64_t FileOffset = WriteOp.Target->Offset;
+ const uint32_t PathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[SequenceIndex];
+
+ WriteSequenceChunkToCache(LocalWriter, Chunk, SequenceIndex, FileOffset, PathIndex);
+ }
+ }
+ if (!Work.IsAborted())
+ {
+ // Write tracking, updating this must be done without any files open (BufferedWriteFileCache::Local)
+ std::vector<uint32_t> CompletedChunkSequences;
+ for (const BlockWriteOps::WriteOpData& WriteOp : Ops.WriteOps)
+ {
+ const uint32_t RemoteSequenceIndex = WriteOp.Target->SequenceIndex;
+ if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters))
+ {
+ CompletedChunkSequences.push_back(RemoteSequenceIndex);
+ }
+ }
+ WriteCache.Close(CompletedChunkSequences);
+ VerifyAndCompleteChunkSequencesAsync(CompletedChunkSequences, Work);
+ }
+}
+
+bool
+BuildsOperationUpdateFolder::WriteChunksBlockToCache(const ChunkBlockDescription& BlockDescription,
+ std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ ParallelWork& Work,
+ CompositeBuffer&& BlockBuffer,
+ std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
+ BufferedWriteFileCache& WriteCache)
+{
+ ZEN_TRACE_CPU("WriteChunksBlockToCache");
+
+ IoBuffer BlockMemoryBuffer = MakeBufferMemoryBased(BlockBuffer);
+ const MemoryView BlockView = BlockMemoryBuffer.GetView();
+
+ BlockWriteOps Ops;
+ if ((BlockDescription.HeaderSize == 0) || BlockDescription.ChunkCompressedLengths.empty())
+ {
+ ZEN_TRACE_CPU("WriteChunksBlockToCache_Legacy");
+
+ uint64_t HeaderSize;
+ const std::vector<uint32_t> ChunkCompressedLengths =
+ ReadChunkBlockHeader(BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder()), HeaderSize);
+
+ if (GetBlockWriteOps(BlockDescription.ChunkRawHashes,
+ ChunkCompressedLengths,
+ SequenceIndexChunksLeftToWriteCounters,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder() + HeaderSize),
+ 0,
+ gsl::narrow<uint32_t>(BlockDescription.ChunkRawHashes.size() - 1),
+ Ops))
+ {
+ WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work);
+ return true;
+ }
+ return false;
+ }
+
+ if (GetBlockWriteOps(BlockDescription.ChunkRawHashes,
+ BlockDescription.ChunkCompressedLengths,
+ SequenceIndexChunksLeftToWriteCounters,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ BlockView.Mid(CompressedBuffer::GetHeaderSizeForNoneEncoder() + BlockDescription.HeaderSize),
+ 0,
+ gsl::narrow<uint32_t>(BlockDescription.ChunkRawHashes.size() - 1),
+ Ops))
+ {
+ WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work);
+ return true;
+ }
+ return false;
+}
+
+bool
+BuildsOperationUpdateFolder::WritePartialBlockChunksToCache(const ChunkBlockDescription& BlockDescription,
+ std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ ParallelWork& Work,
+ CompositeBuffer&& PartialBlockBuffer,
+ uint32_t FirstIncludedBlockChunkIndex,
+ uint32_t LastIncludedBlockChunkIndex,
+ std::span<std::atomic<bool>> RemoteChunkIndexNeedsCopyFromSourceFlags,
+ BufferedWriteFileCache& WriteCache)
+{
+ ZEN_TRACE_CPU("WritePartialBlockChunksToCache");
+
+ IoBuffer BlockMemoryBuffer = MakeBufferMemoryBased(PartialBlockBuffer);
+ const MemoryView BlockView = BlockMemoryBuffer.GetView();
+
+ BlockWriteOps Ops;
+ if (GetBlockWriteOps(BlockDescription.ChunkRawHashes,
+ BlockDescription.ChunkCompressedLengths,
+ SequenceIndexChunksLeftToWriteCounters,
+ RemoteChunkIndexNeedsCopyFromSourceFlags,
+ BlockView,
+ FirstIncludedBlockChunkIndex,
+ LastIncludedBlockChunkIndex,
+ Ops))
+ {
+ WriteBlockChunkOpsToCache(SequenceIndexChunksLeftToWriteCounters, Ops, WriteCache, Work);
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+}
+
+void
+BuildsOperationUpdateFolder::AsyncWriteDownloadedChunk(const std::filesystem::path& ZenFolderPath,
+ uint32_t RemoteChunkIndex,
+ std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>&& ChunkTargetPtrs,
+ BufferedWriteFileCache& WriteCache,
+ ParallelWork& Work,
+ IoBuffer&& Payload,
+ std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters,
+ std::atomic<uint64_t>& WritePartsComplete,
+ const uint64_t TotalPartWriteCount,
+ FilteredRate& FilteredWrittenBytesPerSecond)
+{
+ ZEN_TRACE_CPU("AsyncWriteDownloadedChunk");
+
+ const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+
+ const uint64_t Size = Payload.GetSize();
+
+ std::filesystem::path CompressedChunkPath;
+
+ // Check if the dowloaded chunk is file based and we can move it directly without rewriting it
+ {
+ IoBufferFileReference FileRef;
+ if (Payload.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == Size))
+ {
+ ZEN_TRACE_CPU("MoveTempChunk");
+ std::error_code Ec;
+ std::filesystem::path TempBlobPath = PathFromHandle(FileRef.FileHandle, Ec);
+ if (!Ec)
+ {
+ Payload.SetDeleteOnClose(false);
+ Payload = {};
+ CompressedChunkPath = ZenTempDownloadFolderPath(ZenFolderPath) / ChunkHash.ToHexString();
+ RenameFile(TempBlobPath, CompressedChunkPath, Ec);
+ if (Ec)
+ {
+ CompressedChunkPath = std::filesystem::path{};
+
+ // Re-open the temp file again
+ BasicFile OpenTemp(TempBlobPath, BasicFile::Mode::kDelete);
+ Payload = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, Size, true);
+ Payload.SetDeleteOnClose(true);
+ }
+ }
+ }
+ }
+
+ if (CompressedChunkPath.empty() && (Size > 512u * 1024u))
+ {
+ ZEN_TRACE_CPU("WriteTempChunk");
+ // Could not be moved and rather large, lets store it on disk
+ CompressedChunkPath = ZenTempDownloadFolderPath(ZenFolderPath) / ChunkHash.ToHexString();
+ TemporaryFile::SafeWriteFile(CompressedChunkPath, Payload);
+ Payload = {};
+ }
+
+ Work.ScheduleWork(
+ m_IOWorkerPool,
+ [&ZenFolderPath,
+ this,
+ SequenceIndexChunksLeftToWriteCounters,
+ &Work,
+ CompressedChunkPath,
+ RemoteChunkIndex,
+ TotalPartWriteCount,
+ &WriteCache,
+ &WritePartsComplete,
+ &FilteredWrittenBytesPerSecond,
+ ChunkTargetPtrs = std::move(ChunkTargetPtrs),
+ CompressedPart = std::move(Payload)](std::atomic<bool>&) mutable {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_WriteChunk");
+
+ FilteredWrittenBytesPerSecond.Start();
+
+ const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
+ if (CompressedChunkPath.empty())
+ {
+ ZEN_ASSERT(CompressedPart);
+ }
+ else
+ {
+ ZEN_ASSERT(!CompressedPart);
+ CompressedPart = IoBufferBuilder::MakeFromFile(CompressedChunkPath);
+ if (!CompressedPart)
+ {
+ throw std::runtime_error(
+ fmt::format("Could not open dowloaded compressed chunk {} from {}", ChunkHash, CompressedChunkPath));
+ }
+ }
+
+ bool NeedHashVerify = WriteCompressedChunkToCache(ChunkHash, ChunkTargetPtrs, WriteCache, std::move(CompressedPart));
+ if (!m_AbortFlag)
+ {
+ WritePartsComplete++;
+ if (WritePartsComplete == TotalPartWriteCount)
+ {
+ FilteredWrittenBytesPerSecond.Stop();
+ }
+
+ if (!CompressedChunkPath.empty())
+ {
+ TryRemoveFile(m_LogOutput, CompressedChunkPath);
+ }
+
+ std::vector<uint32_t> CompletedSequences =
+ CompleteChunkTargets(ChunkTargetPtrs, SequenceIndexChunksLeftToWriteCounters);
+ WriteCache.Close(CompletedSequences);
+ if (NeedHashVerify)
+ {
+ VerifyAndCompleteChunkSequencesAsync(CompletedSequences, Work);
+ }
+ else
+ {
+ FinalizeChunkSequences(CompletedSequences);
+ }
+ }
+ }
+ });
+}
+
+void
+BuildsOperationUpdateFolder::VerifyAndCompleteChunkSequencesAsync(std::span<const uint32_t> RemoteSequenceIndexes, ParallelWork& Work)
+{
+ if (RemoteSequenceIndexes.empty())
+ {
+ return;
+ }
+ ZEN_TRACE_CPU("VerifyAndCompleteChunkSequence");
+ for (uint32_t RemoteSequenceIndexOffset = 1; RemoteSequenceIndexOffset < RemoteSequenceIndexes.size(); RemoteSequenceIndexOffset++)
+ {
+ const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[RemoteSequenceIndexOffset];
+ Work.ScheduleWork(m_IOWorkerPool, [this, RemoteSequenceIndex](std::atomic<bool>&) {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_VerifyAndFinalizeSequence");
+
+ VerifySequence(RemoteSequenceIndex);
+ if (!m_AbortFlag)
+ {
+ const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ FinalizeChunkSequence(SequenceRawHash);
+ }
+ }
+ });
+ }
+ const uint32_t RemoteSequenceIndex = RemoteSequenceIndexes[0];
+
+ VerifySequence(RemoteSequenceIndex);
+ const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ FinalizeChunkSequence(SequenceRawHash);
+}
+
+bool
+BuildsOperationUpdateFolder::CompleteSequenceChunk(uint32_t RemoteSequenceIndex,
+ std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters)
+{
+ uint32_t PreviousValue = SequenceIndexChunksLeftToWriteCounters[RemoteSequenceIndex].fetch_sub(1);
+ ZEN_ASSERT(PreviousValue >= 1);
+ ZEN_ASSERT(PreviousValue != (uint32_t)-1);
+ return PreviousValue == 1;
+}
+
+std::vector<uint32_t>
+BuildsOperationUpdateFolder::CompleteChunkTargets(const std::vector<const ChunkedContentLookup::ChunkSequenceLocation*>& ChunkTargetPtrs,
+ std::span<std::atomic<uint32_t>> SequenceIndexChunksLeftToWriteCounters)
+{
+ ZEN_TRACE_CPU("CompleteChunkTargets");
+
+ std::vector<uint32_t> CompletedSequenceIndexes;
+ for (const ChunkedContentLookup::ChunkSequenceLocation* Location : ChunkTargetPtrs)
+ {
+ const uint32_t RemoteSequenceIndex = Location->SequenceIndex;
+ if (CompleteSequenceChunk(RemoteSequenceIndex, SequenceIndexChunksLeftToWriteCounters))
+ {
+ CompletedSequenceIndexes.push_back(RemoteSequenceIndex);
+ }
+ }
+ return CompletedSequenceIndexes;
+}
+
+void
+BuildsOperationUpdateFolder::FinalizeChunkSequence(const IoHash& SequenceRawHash)
+{
+ ZEN_TRACE_CPU("FinalizeChunkSequence");
+
+ ZEN_ASSERT_SLOW(!IsFile(GetFinalChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash)));
+ std::error_code Ec;
+ RenameFile(GetTempChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash),
+ GetFinalChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash),
+ Ec);
+ if (Ec)
+ {
+ throw std::system_error(Ec);
+ }
+}
+
+void
+BuildsOperationUpdateFolder::FinalizeChunkSequences(std::span<const uint32_t> RemoteSequenceIndexes)
+{
+ ZEN_TRACE_CPU("FinalizeChunkSequences");
+
+ for (uint32_t SequenceIndex : RemoteSequenceIndexes)
+ {
+ FinalizeChunkSequence(m_RemoteContent.ChunkedContent.SequenceRawHashes[SequenceIndex]);
+ }
+}
+
+void
+BuildsOperationUpdateFolder::VerifySequence(uint32_t RemoteSequenceIndex)
+{
+ ZEN_TRACE_CPU("VerifySequence");
+
+ const IoHash& SequenceRawHash = m_RemoteContent.ChunkedContent.SequenceRawHashes[RemoteSequenceIndex];
+ {
+ ZEN_TRACE_CPU("HashSequence");
+ const std::uint32_t RemotePathIndex = m_RemoteLookup.SequenceIndexFirstPathIndex[RemoteSequenceIndex];
+ const uint64_t ExpectedSize = m_RemoteContent.RawSizes[RemotePathIndex];
+ IoBuffer VerifyBuffer = IoBufferBuilder::MakeFromFile(GetTempChunkedSequenceFileName(m_CacheFolderPath, SequenceRawHash));
+ const uint64_t VerifySize = VerifyBuffer.GetSize();
+ if (VerifySize != ExpectedSize)
+ {
+ throw std::runtime_error(fmt::format("Written chunk sequence {} size {} does not match expected size {}",
+ SequenceRawHash,
+ VerifySize,
+ ExpectedSize));
+ }
+
+ const IoHash VerifyChunkHash = IoHash::HashBuffer(std::move(VerifyBuffer));
+ if (VerifyChunkHash != SequenceRawHash)
+ {
+ throw std::runtime_error(
+ fmt::format("Written chunk sequence {} hash does not match expected hash {}", VerifyChunkHash, SequenceRawHash));
+ }
+ }
+}
+
+void
+DownloadLargeBlob(BuildStorage& Storage,
+ const std::filesystem::path& DownloadFolder,
+ const Oid& BuildId,
+ const IoHash& ChunkHash,
+ const std::uint64_t PreferredMultipartChunkSize,
+ ParallelWork& Work,
+ WorkerThreadPool& NetworkPool,
+ DownloadStatistics& DownloadStats,
+ std::function<void(IoBuffer&& Payload)>&& OnDownloadComplete)
+{
+ ZEN_TRACE_CPU("DownloadLargeBlob");
+
+ struct WorkloadData
+ {
+ TemporaryFile TempFile;
+ };
+ std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>());
+
+ std::error_code Ec;
+ Workload->TempFile.CreateTemporary(DownloadFolder, Ec);
+ if (Ec)
+ {
+ throw std::runtime_error(
+ fmt::format("Failed opening temporary file '{}': {} ({})", Workload->TempFile.GetPath(), Ec.message(), Ec.value()));
+ }
+ std::vector<std::function<void()>> WorkItems = Storage.GetLargeBuildBlob(
+ BuildId,
+ ChunkHash,
+ PreferredMultipartChunkSize,
+ [&Work, Workload, &DownloadStats](uint64_t Offset, const IoBuffer& Chunk) {
+ DownloadStats.DownloadedChunkByteCount += Chunk.GetSize();
+
+ if (!Work.IsAborted())
+ {
+ ZEN_TRACE_CPU("Async_DownloadLargeBlob_OnReceive");
+ Workload->TempFile.Write(Chunk.GetView(), Offset);
+ }
+ },
+ [&Work, Workload, &DownloadStats, OnDownloadComplete = std::move(OnDownloadComplete)]() {
+ DownloadStats.DownloadedChunkCount++;
+ if (!Work.IsAborted())
+ {
+ ZEN_TRACE_CPU("Async_DownloadLargeBlob_OnComplete");
+
+ uint64_t PayloadSize = Workload->TempFile.FileSize();
+ void* FileHandle = Workload->TempFile.Detach();
+ ZEN_ASSERT(FileHandle != nullptr);
+ IoBuffer Payload(IoBuffer::File, FileHandle, 0, PayloadSize, true);
+ Payload.SetDeleteOnClose(true);
+ OnDownloadComplete(std::move(Payload));
+ }
+ });
+ if (!WorkItems.empty())
+ {
+ DownloadStats.MultipartAttachmentCount++;
+ }
+ for (auto& WorkItem : WorkItems)
+ {
+ Work.ScheduleWork(NetworkPool, [WorkItem = std::move(WorkItem)](std::atomic<bool>& AbortFlag) {
+ if (!AbortFlag)
+ {
+ ZEN_TRACE_CPU("Async_DownloadLargeBlob_Work");
+
+ WorkItem();
+ }
+ });
+ }
+}
+
+} // namespace zen