aboutsummaryrefslogtreecommitdiff
path: root/src/zenremotestore/builds/builduploadfolder.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-04-23 18:16:57 +0200
committerStefan Boberg <[email protected]>2026-04-23 18:16:57 +0200
commit0232b991cd7d8e3a2114ea30e4591dd3e7b65c36 (patch)
tree94730e7594fd09ae1fa820391ce311f6daf13905 /src/zenremotestore/builds/builduploadfolder.cpp
parentFix forward declaration order for s_GotSigWinch and SigWinchHandler (diff)
parenttrace: declare Region event name fields as AnsiString (#1012) (diff)
downloadarchived-zen-sb/zen-help.tar.xz
archived-zen-sb/zen-help.zip
Merge branch 'main' into sb/zen-helpsb/zen-help
- Combine HelpCommand (this branch) with HistoryCommand (main) in zen CLI dispatcher - Keep filter-aware TuiPickOne rewrite; adopt main's ASCII arrow glyphs in doc comment
Diffstat (limited to 'src/zenremotestore/builds/builduploadfolder.cpp')
-rw-r--r--src/zenremotestore/builds/builduploadfolder.cpp2634
1 files changed, 2634 insertions, 0 deletions
diff --git a/src/zenremotestore/builds/builduploadfolder.cpp b/src/zenremotestore/builds/builduploadfolder.cpp
new file mode 100644
index 000000000..b536ae464
--- /dev/null
+++ b/src/zenremotestore/builds/builduploadfolder.cpp
@@ -0,0 +1,2634 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenremotestore/builds/builduploadfolder.h>
+
+#include <zencore/basicfile.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/fmtutils.h>
+#include <zencore/parallelwork.h>
+#include <zencore/scopeguard.h>
+#include <zencore/trace.h>
+#include <zenremotestore/builds/buildcontent.h>
+#include <zenremotestore/builds/buildmanifest.h>
+#include <zenremotestore/builds/buildstoragecache.h>
+#include <zenremotestore/chunking/chunkingcache.h>
+#include <zenremotestore/chunking/chunkingcontroller.h>
+#include <zenremotestore/transferthreadworkers.h>
+#include <zenutil/filesystemutils.h>
+#include <zenutil/filteredrate.h>
+#include <zenutil/progress.h>
+
+#include <numeric>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <tsl/robin_set.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+using namespace std::literals;
+
+namespace {
+ bool IsExtensionHashCompressable(const tsl::robin_set<uint32_t>& NonCompressableExtensionHashes, const uint32_t PathHash)
+ {
+ return !NonCompressableExtensionHashes.contains(PathHash);
+ }
+
+ bool IsChunkCompressable(const tsl::robin_set<uint32_t>& NonCompressableExtensionHashes,
+ const ChunkedContentLookup& Lookup,
+ uint32_t ChunkIndex)
+ {
+ const uint32_t ChunkLocationCount = Lookup.ChunkSequenceLocationCounts[ChunkIndex];
+ if (ChunkLocationCount == 0)
+ {
+ return false;
+ }
+ const size_t ChunkLocationOffset = Lookup.ChunkSequenceLocationOffset[ChunkIndex];
+ const uint32_t SequenceIndex = Lookup.ChunkSequenceLocations[ChunkLocationOffset].SequenceIndex;
+ const uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[SequenceIndex];
+ const uint32_t ExtensionHash = Lookup.PathExtensionHash[PathIndex];
+
+ const bool IsCompressable = IsExtensionHashCompressable(NonCompressableExtensionHashes, ExtensionHash);
+ return IsCompressable;
+ }
+ template<typename T>
+ std::string FormatArray(std::span<const T> Items, std::string_view Prefix)
+ {
+ ExtendableStringBuilder<512> SB;
+ for (const T& Item : Items)
+ {
+ SB.Append(fmt::format("{}{}", Prefix, Item));
+ }
+ return SB.ToString();
+ }
+} // namespace
+
+class ReadFileCache
+{
+public:
+ // A buffered file reader that provides CompositeBuffer where the buffers are owned and the memory never overwritten
+ ReadFileCache(std::atomic<uint64_t>& OpenReadCount,
+ std::atomic<uint64_t>& CurrentOpenFileCount,
+ std::atomic<uint64_t>& ReadCount,
+ std::atomic<uint64_t>& ReadByteCount,
+ const std::filesystem::path& Path,
+ const ChunkedFolderContent& LocalContent,
+ const ChunkedContentLookup& LocalLookup,
+ size_t MaxOpenFileCount)
+ : m_Path(Path)
+ , m_LocalContent(LocalContent)
+ , m_LocalLookup(LocalLookup)
+ , m_OpenReadCount(OpenReadCount)
+ , m_CurrentOpenFileCount(CurrentOpenFileCount)
+ , m_ReadCount(ReadCount)
+ , m_ReadByteCount(ReadByteCount)
+ {
+ m_OpenFiles.reserve(MaxOpenFileCount);
+ }
+ ~ReadFileCache() { m_OpenFiles.clear(); }
+
+ CompositeBuffer GetRange(uint32_t SequenceIndex, uint64_t Offset, uint64_t Size)
+ {
+ ZEN_TRACE_CPU("ReadFileCache::GetRange");
+
+ auto CacheIt =
+ std::find_if(m_OpenFiles.begin(), m_OpenFiles.end(), [SequenceIndex](const auto& Lhs) { return Lhs.first == SequenceIndex; });
+ if (CacheIt != m_OpenFiles.end())
+ {
+ if (CacheIt != m_OpenFiles.begin())
+ {
+ auto CachedFile(std::move(CacheIt->second));
+ m_OpenFiles.erase(CacheIt);
+ m_OpenFiles.insert(m_OpenFiles.begin(), std::make_pair(SequenceIndex, std::move(CachedFile)));
+ }
+ CompositeBuffer Result = m_OpenFiles.front().second->GetRange(Offset, Size);
+ return Result;
+ }
+ const uint32_t LocalPathIndex = m_LocalLookup.SequenceIndexFirstPathIndex[SequenceIndex];
+ const std::filesystem::path LocalFilePath = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred();
+ if (Size == m_LocalContent.RawSizes[LocalPathIndex])
+ {
+ IoBuffer Result = IoBufferBuilder::MakeFromFile(LocalFilePath);
+ return CompositeBuffer(SharedBuffer(Result));
+ }
+ if (m_OpenFiles.size() == m_OpenFiles.capacity())
+ {
+ m_OpenFiles.pop_back();
+ }
+ m_OpenFiles.insert(
+ m_OpenFiles.begin(),
+ std::make_pair(
+ SequenceIndex,
+ std::make_unique<BufferedOpenFile>(LocalFilePath, m_OpenReadCount, m_CurrentOpenFileCount, m_ReadCount, m_ReadByteCount)));
+ CompositeBuffer Result = m_OpenFiles.front().second->GetRange(Offset, Size);
+ return Result;
+ }
+
+private:
+ const std::filesystem::path m_Path;
+ const ChunkedFolderContent& m_LocalContent;
+ const ChunkedContentLookup& m_LocalLookup;
+ std::vector<std::pair<uint32_t, std::unique_ptr<BufferedOpenFile>>> m_OpenFiles;
+ std::atomic<uint64_t>& m_OpenReadCount;
+ std::atomic<uint64_t>& m_CurrentOpenFileCount;
+ std::atomic<uint64_t>& m_ReadCount;
+ std::atomic<uint64_t>& m_ReadByteCount;
+};
+
+BuildsOperationUploadFolder::BuildsOperationUploadFolder(LoggerRef Log,
+ ProgressBase& Progress,
+ StorageInstance& Storage,
+ std::atomic<bool>& AbortFlag,
+ std::atomic<bool>& PauseFlag,
+ WorkerThreadPool& IOWorkerPool,
+ WorkerThreadPool& NetworkPool,
+ const Oid& BuildId,
+ const std::filesystem::path& Path,
+ bool CreateBuild,
+ const CbObject& MetaData,
+ const Options& Options)
+: m_Log(Log)
+, m_Progress(Progress)
+, m_Storage(Storage)
+, m_AbortFlag(AbortFlag)
+, m_PauseFlag(PauseFlag)
+, m_IOWorkerPool(IOWorkerPool)
+, m_NetworkPool(NetworkPool)
+, m_BuildId(BuildId)
+, m_Path(Path)
+, m_CreateBuild(CreateBuild)
+, m_MetaData(MetaData)
+, m_Options(Options)
+{
+ m_NonCompressableExtensionHashes.reserve(Options.NonCompressableExtensions.size());
+ for (const std::string& Extension : Options.NonCompressableExtensions)
+ {
+ m_NonCompressableExtensionHashes.insert(HashStringAsLowerDjb2(Extension));
+ }
+}
+
+BuildsOperationUploadFolder::PrepareBuildResult
+BuildsOperationUploadFolder::PrepareBuild()
+{
+ ZEN_TRACE_CPU("PrepareBuild");
+
+ PrepareBuildResult Result;
+ Result.PreferredMultipartChunkSize = m_Options.PreferredMultipartChunkSize;
+ Stopwatch Timer;
+ if (m_CreateBuild)
+ {
+ ZEN_TRACE_CPU("CreateBuild");
+
+ Stopwatch PutBuildTimer;
+ CbObject PutBuildResult = m_Storage.BuildStorage->PutBuild(m_BuildId, m_MetaData);
+ Result.PrepareBuildTimeMs = PutBuildTimer.GetElapsedTimeMs();
+ if (auto ChunkSize = PutBuildResult["chunkSize"sv].AsUInt64(); ChunkSize != 0)
+ {
+ Result.PreferredMultipartChunkSize = ChunkSize;
+ }
+ Result.PayloadSize = m_MetaData.GetSize();
+ }
+ else
+ {
+ ZEN_TRACE_CPU("PutBuild");
+ Stopwatch GetBuildTimer;
+ CbObject Build = m_Storage.BuildStorage->GetBuild(m_BuildId);
+ Result.PrepareBuildTimeMs = GetBuildTimer.GetElapsedTimeMs();
+ Result.PayloadSize = Build.GetSize();
+ if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0)
+ {
+ Result.PreferredMultipartChunkSize = ChunkSize;
+ }
+ else if (m_Options.AllowMultiparts)
+ {
+ ZEN_WARN("PreferredMultipartChunkSize is unknown. Defaulting to '{}'", NiceBytes(Result.PreferredMultipartChunkSize));
+ }
+ }
+
+ if (!m_Options.IgnoreExistingBlocks)
+ {
+ ZEN_TRACE_CPU("FindBlocks");
+ Stopwatch KnownBlocksTimer;
+ CbObject BlockDescriptionList = m_Storage.BuildStorage->FindBlocks(m_BuildId, m_Options.FindBlockMaxCount);
+ if (BlockDescriptionList)
+ {
+ Result.KnownBlocks = ParseChunkBlockDescriptionList(BlockDescriptionList);
+ }
+ Result.FindBlocksTimeMs = KnownBlocksTimer.GetElapsedTimeMs();
+ }
+ Result.ElapsedTimeMs = Timer.GetElapsedTimeMs();
+ return Result;
+}
+
+std::vector<BuildsOperationUploadFolder::UploadPart>
+BuildsOperationUploadFolder::ReadFolder()
+{
+ std::vector<UploadPart> UploadParts;
+ std::filesystem::path ExcludeManifestPath = m_Path / m_Options.ZenExcludeManifestName;
+ tsl::robin_set<std::string> ExcludeAssetPaths;
+ if (IsFile(ExcludeManifestPath))
+ {
+ std::filesystem::path AbsoluteExcludeManifestPath =
+ MakeSafeAbsolutePath(ExcludeManifestPath.is_absolute() ? ExcludeManifestPath : m_Path / ExcludeManifestPath);
+ BuildManifest Manifest = ParseBuildManifest(AbsoluteExcludeManifestPath);
+ const std::vector<std::filesystem::path>& AssetPaths = Manifest.Parts.front().Files;
+ ExcludeAssetPaths.reserve(AssetPaths.size());
+ for (const std::filesystem::path& AssetPath : AssetPaths)
+ {
+ ExcludeAssetPaths.insert(AssetPath.generic_string());
+ }
+ }
+
+ UploadParts.resize(1);
+
+ UploadPart& Part = UploadParts.front();
+ GetFolderContentStatistics& LocalFolderScanStats = Part.LocalFolderScanStats;
+
+ Part.Content = GetFolderContent(
+ Part.LocalFolderScanStats,
+ m_Path,
+ [this](const std::string_view& RelativePath) { return IsAcceptedFolder(RelativePath); },
+ [this, &ExcludeAssetPaths](const std::string_view& RelativePath, uint64_t Size, uint32_t Attributes) -> bool {
+ ZEN_UNUSED(Size, Attributes);
+ if (!IsAcceptedFile(RelativePath))
+ {
+ return false;
+ }
+ if (ExcludeAssetPaths.contains(std::filesystem::path(RelativePath).generic_string()))
+ {
+ return false;
+ }
+ return true;
+ },
+ m_IOWorkerPool,
+ m_Progress.GetProgressUpdateDelayMS(),
+ [&](bool, std::ptrdiff_t) { ZEN_INFO("Found {} files in '{}'...", LocalFolderScanStats.AcceptedFileCount.load(), m_Path); },
+ m_AbortFlag);
+ Part.TotalRawSize = std::accumulate(Part.Content.RawSizes.begin(), Part.Content.RawSizes.end(), std::uint64_t(0));
+
+ return UploadParts;
+}
+
+std::vector<BuildsOperationUploadFolder::UploadPart>
+BuildsOperationUploadFolder::ReadManifestParts(const std::filesystem::path& ManifestPath)
+{
+ std::vector<UploadPart> UploadParts;
+ Stopwatch ManifestParseTimer;
+ std::filesystem::path AbsoluteManifestPath = MakeSafeAbsolutePath(ManifestPath.is_absolute() ? ManifestPath : m_Path / ManifestPath);
+ BuildManifest Manifest = ParseBuildManifest(AbsoluteManifestPath);
+ if (Manifest.Parts.empty())
+ {
+ throw std::runtime_error(fmt::format("Manifest file at '{}' is invalid", ManifestPath));
+ }
+
+ UploadParts.resize(Manifest.Parts.size());
+ for (size_t PartIndex = 0; PartIndex < Manifest.Parts.size(); PartIndex++)
+ {
+ BuildManifest::Part& PartManifest = Manifest.Parts[PartIndex];
+ if (ManifestPath.is_relative())
+ {
+ PartManifest.Files.push_back(ManifestPath);
+ }
+
+ UploadPart& Part = UploadParts[PartIndex];
+ FolderContent& Content = Part.Content;
+
+ GetFolderContentStatistics& LocalFolderScanStats = Part.LocalFolderScanStats;
+
+ const std::vector<std::filesystem::path>& AssetPaths = PartManifest.Files;
+ Content = GetValidFolderContent(
+ m_IOWorkerPool,
+ LocalFolderScanStats,
+ m_Path,
+ AssetPaths,
+ [](uint64_t PathCount, uint64_t CompletedPathCount) { ZEN_UNUSED(PathCount, CompletedPathCount); },
+ 1000,
+ m_AbortFlag,
+ m_PauseFlag);
+
+ if (Content.Paths.size() != AssetPaths.size())
+ {
+ const tsl::robin_set<std::filesystem::path> FoundPaths(Content.Paths.begin(), Content.Paths.end());
+ ExtendableStringBuilder<1024> SB;
+ for (const std::filesystem::path& AssetPath : AssetPaths)
+ {
+ if (!FoundPaths.contains(AssetPath))
+ {
+ SB << "\n " << AssetPath.generic_string();
+ }
+ }
+ throw std::runtime_error(
+ fmt::format("Manifest file at '{}' references files that does not exist{}", ManifestPath, SB.ToView()));
+ }
+
+ Part.PartId = PartManifest.PartId;
+ Part.PartName = PartManifest.PartName;
+ Part.TotalRawSize = std::accumulate(Part.Content.RawSizes.begin(), Part.Content.RawSizes.end(), std::uint64_t(0));
+ }
+
+ return UploadParts;
+}
+
+std::vector<std::pair<Oid, std::string>>
+BuildsOperationUploadFolder::Execute(const Oid& BuildPartId,
+ const std::string_view BuildPartName,
+ const std::filesystem::path& ManifestPath,
+ ChunkingController& ChunkController,
+ ChunkingCache& ChunkCache)
+{
+ ZEN_TRACE_CPU("BuildsOperationUploadFolder::Execute");
+ try
+ {
+ Stopwatch ReadPartsTimer;
+ std::vector<UploadPart> UploadParts = ManifestPath.empty() ? ReadFolder() : ReadManifestParts(ManifestPath);
+
+ for (UploadPart& Part : UploadParts)
+ {
+ if (Part.PartId == Oid::Zero)
+ {
+ if (UploadParts.size() != 1)
+ {
+ throw std::runtime_error(fmt::format("Multi part upload manifest '{}' must contains build part id", ManifestPath));
+ }
+
+ if (BuildPartId == Oid::Zero)
+ {
+ Part.PartId = Oid::NewOid();
+ }
+ else
+ {
+ Part.PartId = BuildPartId;
+ }
+ }
+ if (Part.PartName.empty())
+ {
+ if (UploadParts.size() != 1)
+ {
+ throw std::runtime_error(fmt::format("Multi part upload manifest '{}' must contains build part name", ManifestPath));
+ }
+ if (BuildPartName.empty())
+ {
+ throw std::runtime_error("Build part name must be set");
+ }
+ Part.PartName = std::string(BuildPartName);
+ }
+ }
+
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_INFO("Reading {} parts took {}", UploadParts.size(), NiceTimeSpanMs(ReadPartsTimer.GetElapsedTimeMs()));
+ }
+
+ const uint32_t PartsUploadStepCount = gsl::narrow<uint32_t>(uint32_t(PartTaskSteps::StepCount) * UploadParts.size());
+
+ const uint32_t PrepareBuildStep = 0;
+ const uint32_t UploadPartsStep = 1;
+ const uint32_t FinalizeBuildStep = UploadPartsStep + PartsUploadStepCount;
+ const uint32_t CleanupStep = FinalizeBuildStep + 1;
+ const uint32_t StepCount = CleanupStep + 1;
+
+ auto EndProgress = MakeGuard([&]() { m_Progress.SetLogOperationProgress(StepCount, StepCount); });
+
+ Stopwatch ProcessTimer;
+
+ CleanAndRemoveDirectory(m_IOWorkerPool, m_AbortFlag, m_PauseFlag, m_Options.TempDir);
+ CreateDirectories(m_Options.TempDir);
+ auto _ = MakeGuard([&]() { CleanAndRemoveDirectory(m_IOWorkerPool, m_AbortFlag, m_PauseFlag, m_Options.TempDir); });
+
+ m_Progress.SetLogOperationProgress(PrepareBuildStep, StepCount);
+
+ m_PrepBuildResultFuture = m_NetworkPool.EnqueueTask(std::packaged_task<PrepareBuildResult()>{[this] { return PrepareBuild(); }},
+ WorkerThreadPool::EMode::EnableBacklog);
+
+ for (uint32_t PartIndex = 0; PartIndex < UploadParts.size(); PartIndex++)
+ {
+ const uint32_t PartStepOffset = UploadPartsStep + (PartIndex * uint32_t(PartTaskSteps::StepCount));
+
+ const UploadPart& Part = UploadParts[PartIndex];
+ UploadBuildPart(ChunkController, ChunkCache, PartIndex, Part, PartStepOffset, StepCount);
+ if (m_AbortFlag)
+ {
+ return {};
+ }
+ }
+
+ m_Progress.SetLogOperationProgress(FinalizeBuildStep, StepCount);
+
+ if (m_CreateBuild && !m_AbortFlag)
+ {
+ Stopwatch FinalizeBuildTimer;
+ m_Storage.BuildStorage->FinalizeBuild(m_BuildId);
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_INFO("FinalizeBuild took {}", NiceTimeSpanMs(FinalizeBuildTimer.GetElapsedTimeMs()));
+ }
+ }
+
+ m_Progress.SetLogOperationProgress(CleanupStep, StepCount);
+
+ std::vector<std::pair<Oid, std::string>> Result;
+ Result.reserve(UploadParts.size());
+ for (UploadPart& Part : UploadParts)
+ {
+ Result.push_back(std::make_pair(Part.PartId, Part.PartName));
+ }
+ return Result;
+ }
+ catch (const std::exception&)
+ {
+ m_AbortFlag = true;
+ throw;
+ }
+}
+
+bool
+BuildsOperationUploadFolder::IsAcceptedFolder(const std::string_view& RelativePath) const
+{
+ for (const std::string& ExcludeFolder : m_Options.ExcludeFolders)
+ {
+ if (RelativePath.starts_with(ExcludeFolder))
+ {
+ if (RelativePath.length() == ExcludeFolder.length())
+ {
+ return false;
+ }
+ else if (RelativePath[ExcludeFolder.length()] == '/')
+ {
+ return false;
+ }
+ }
+ }
+ return true;
+}
+
+bool
+BuildsOperationUploadFolder::IsAcceptedFile(const std::string_view& RelativePath) const
+{
+ if (RelativePath == m_Options.ZenExcludeManifestName)
+ {
+ return false;
+ }
+ for (const std::string& ExcludeExtension : m_Options.ExcludeExtensions)
+ {
+ if (RelativePath.ends_with(ExcludeExtension))
+ {
+ return false;
+ }
+ }
+ return true;
+}
+
+void
+BuildsOperationUploadFolder::ArrangeChunksIntoBlocks(const ChunkedFolderContent& Content,
+ const ChunkedContentLookup& Lookup,
+ std::vector<uint32_t>& ChunkIndexes,
+ std::vector<std::vector<uint32_t>>& OutBlocks)
+{
+ ZEN_TRACE_CPU("ArrangeChunksIntoBlocks");
+ std::sort(ChunkIndexes.begin(), ChunkIndexes.end(), [&Content, &Lookup](uint32_t Lhs, uint32_t Rhs) {
+ const ChunkedContentLookup::ChunkSequenceLocation& LhsLocation = GetChunkSequenceLocations(Lookup, Lhs)[0];
+ const ChunkedContentLookup::ChunkSequenceLocation& RhsLocation = GetChunkSequenceLocations(Lookup, Rhs)[0];
+ if (LhsLocation.SequenceIndex < RhsLocation.SequenceIndex)
+ {
+ return true;
+ }
+ else if (LhsLocation.SequenceIndex > RhsLocation.SequenceIndex)
+ {
+ return false;
+ }
+ return LhsLocation.Offset < RhsLocation.Offset;
+ });
+
+ uint64_t MaxBlockSizeLowThreshold = m_Options.BlockParameters.MaxBlockSize - (m_Options.BlockParameters.MaxBlockSize / 16);
+
+ uint64_t BlockSize = 0;
+
+ uint32_t ChunkIndexStart = 0;
+ for (uint32_t ChunkIndexOffset = 0; ChunkIndexOffset < ChunkIndexes.size();)
+ {
+ const uint32_t ChunkIndex = ChunkIndexes[ChunkIndexOffset];
+ const uint64_t ChunkSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
+
+ if (((BlockSize + ChunkSize) > m_Options.BlockParameters.MaxBlockSize) ||
+ (ChunkIndexOffset - ChunkIndexStart) > m_Options.BlockParameters.MaxChunksPerBlock)
+ {
+ // Within the span of MaxBlockSizeLowThreshold and MaxBlockSize, see if there is a break
+ // between source paths for chunks. Break the block at the last such break if any.
+ ZEN_ASSERT(ChunkIndexOffset > ChunkIndexStart);
+
+ const uint32_t ChunkSequenceIndex = Lookup.ChunkSequenceLocations[Lookup.ChunkSequenceLocationOffset[ChunkIndex]].SequenceIndex;
+
+ uint64_t ScanBlockSize = BlockSize;
+
+ uint32_t ScanChunkIndexOffset = ChunkIndexOffset - 1;
+ while (ScanChunkIndexOffset > (ChunkIndexStart + 2))
+ {
+ const uint32_t TestChunkIndex = ChunkIndexes[ScanChunkIndexOffset];
+ const uint64_t TestChunkSize = Content.ChunkedContent.ChunkRawSizes[TestChunkIndex];
+ if ((ScanBlockSize - TestChunkSize) < MaxBlockSizeLowThreshold)
+ {
+ break;
+ }
+
+ const uint32_t TestSequenceIndex =
+ Lookup.ChunkSequenceLocations[Lookup.ChunkSequenceLocationOffset[TestChunkIndex]].SequenceIndex;
+ if (ChunkSequenceIndex != TestSequenceIndex)
+ {
+ ChunkIndexOffset = ScanChunkIndexOffset + 1;
+ break;
+ }
+
+ ScanBlockSize -= TestChunkSize;
+ ScanChunkIndexOffset--;
+ }
+
+ std::vector<uint32_t> ChunksInBlock;
+ ChunksInBlock.reserve(ChunkIndexOffset - ChunkIndexStart);
+ for (uint32_t AddIndexOffset = ChunkIndexStart; AddIndexOffset < ChunkIndexOffset; AddIndexOffset++)
+ {
+ const uint32_t AddChunkIndex = ChunkIndexes[AddIndexOffset];
+ ChunksInBlock.push_back(AddChunkIndex);
+ }
+ OutBlocks.emplace_back(std::move(ChunksInBlock));
+ BlockSize = 0;
+ ChunkIndexStart = ChunkIndexOffset;
+ }
+ else
+ {
+ ChunkIndexOffset++;
+ BlockSize += ChunkSize;
+ }
+ }
+ if (ChunkIndexStart < ChunkIndexes.size())
+ {
+ std::vector<uint32_t> ChunksInBlock;
+ ChunksInBlock.reserve(ChunkIndexes.size() - ChunkIndexStart);
+ for (uint32_t AddIndexOffset = ChunkIndexStart; AddIndexOffset < ChunkIndexes.size(); AddIndexOffset++)
+ {
+ const uint32_t AddChunkIndex = ChunkIndexes[AddIndexOffset];
+ ChunksInBlock.push_back(AddChunkIndex);
+ }
+ OutBlocks.emplace_back(std::move(ChunksInBlock));
+ }
+}
+
+void
+BuildsOperationUploadFolder::GenerateBuildBlocks(const ChunkedFolderContent& Content,
+ const ChunkedContentLookup& Lookup,
+ const std::vector<std::vector<uint32_t>>& NewBlockChunks,
+ GeneratedBlocks& OutBlocks,
+ GenerateBlocksStatistics& GenerateBlocksStats,
+ UploadStatistics& UploadStats)
+{
+ ZEN_TRACE_CPU("GenerateBuildBlocks");
+ const std::size_t NewBlockCount = NewBlockChunks.size();
+ if (NewBlockCount == 0)
+ {
+ return;
+ }
+
+ std::unique_ptr<ProgressBase::ProgressBar> ProgressBar = m_Progress.CreateProgressBar("Generate Blocks");
+
+ OutBlocks.BlockDescriptions.resize(NewBlockCount);
+ OutBlocks.BlockSizes.resize(NewBlockCount);
+ OutBlocks.BlockMetaDatas.resize(NewBlockCount);
+ OutBlocks.BlockHeaders.resize(NewBlockCount);
+ OutBlocks.MetaDataHasBeenUploaded.resize(NewBlockCount, 0);
+ OutBlocks.BlockHashToBlockIndex.reserve(NewBlockCount);
+
+ RwLock Lock;
+ FilteredRate FilteredGeneratedBytesPerSecond;
+ FilteredRate FilteredUploadedBytesPerSecond;
+ ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ std::atomic<uint64_t> QueuedPendingBlocksForUpload = 0;
+
+ GenerateBuildBlocksContext Context{.Work = Work,
+ .GenerateBlobsPool = m_IOWorkerPool,
+ .UploadBlocksPool = m_NetworkPool,
+ .FilteredGeneratedBytesPerSecond = FilteredGeneratedBytesPerSecond,
+ .FilteredUploadedBytesPerSecond = FilteredUploadedBytesPerSecond,
+ .QueuedPendingBlocksForUpload = QueuedPendingBlocksForUpload,
+ .Lock = Lock,
+ .OutBlocks = OutBlocks,
+ .GenerateBlocksStats = GenerateBlocksStats,
+ .UploadStats = UploadStats,
+ .NewBlockCount = NewBlockCount};
+
+ ScheduleBlockGeneration(Context, Content, Lookup, NewBlockChunks);
+
+ Work.Wait(m_Progress.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
+
+ FilteredGeneratedBytesPerSecond.Update(GenerateBlocksStats.GeneratedBlockByteCount.load());
+ FilteredUploadedBytesPerSecond.Update(UploadStats.BlocksBytes.load());
+
+ std::string Details = fmt::format("Generated {}/{} ({}, {}B/s). Uploaded {}/{} ({}, {}bits/s)",
+ GenerateBlocksStats.GeneratedBlockCount.load(),
+ NewBlockCount,
+ NiceBytes(GenerateBlocksStats.GeneratedBlockByteCount.load()),
+ NiceNum(FilteredGeneratedBytesPerSecond.GetCurrent()),
+ UploadStats.BlockCount.load(),
+ NewBlockCount,
+ NiceBytes(UploadStats.BlocksBytes.load()),
+ NiceNum(FilteredUploadedBytesPerSecond.GetCurrent() * 8));
+
+ ProgressBar->UpdateState({.Task = "Generating blocks",
+ .Details = Details,
+ .TotalCount = gsl::narrow<uint64_t>(NewBlockCount),
+ .RemainingCount = gsl::narrow<uint64_t>(NewBlockCount - GenerateBlocksStats.GeneratedBlockCount.load()),
+ .Status = ProgressBase::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+ false);
+ });
+
+ ZEN_ASSERT(m_AbortFlag || QueuedPendingBlocksForUpload.load() == 0);
+
+ ProgressBar->Finish();
+
+ GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGeneratedBytesPerSecond.GetElapsedTimeUS();
+ UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS();
+}
+
+void
+BuildsOperationUploadFolder::ScheduleBlockGeneration(GenerateBuildBlocksContext& Context,
+ const ChunkedFolderContent& Content,
+ const ChunkedContentLookup& Lookup,
+ const std::vector<std::vector<uint32_t>>& NewBlockChunks)
+{
+ for (size_t BlockIndex = 0; BlockIndex < Context.NewBlockCount; BlockIndex++)
+ {
+ if (Context.Work.IsAborted())
+ {
+ break;
+ }
+ const std::vector<uint32_t>& ChunksInBlock = NewBlockChunks[BlockIndex];
+ Context.Work.ScheduleWork(
+ Context.GenerateBlobsPool,
+ [this, &Context, &Content, &Lookup, ChunksInBlock, BlockIndex](std::atomic<bool>&) {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("GenerateBuildBlocks_Generate");
+
+ Context.FilteredGeneratedBytesPerSecond.Start();
+
+ Stopwatch GenerateTimer;
+ CompressedBuffer CompressedBlock =
+ GenerateBlock(Content, Lookup, ChunksInBlock, Context.OutBlocks.BlockDescriptions[BlockIndex]);
+ if (m_Options.IsVerbose)
+ {
+ ZEN_INFO("Generated block {} ({}) containing {} chunks in {}",
+ Context.OutBlocks.BlockDescriptions[BlockIndex].BlockHash,
+ NiceBytes(CompressedBlock.GetCompressedSize()),
+ Context.OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size(),
+ NiceTimeSpanMs(GenerateTimer.GetElapsedTimeMs()));
+ }
+
+ Context.OutBlocks.BlockSizes[BlockIndex] = CompressedBlock.GetCompressedSize();
+ {
+ CbObjectWriter Writer;
+ Writer.AddString("createdBy", "zen");
+ Context.OutBlocks.BlockMetaDatas[BlockIndex] = Writer.Save();
+ }
+ Context.GenerateBlocksStats.GeneratedBlockByteCount += Context.OutBlocks.BlockSizes[BlockIndex];
+ Context.GenerateBlocksStats.GeneratedBlockCount++;
+
+ Context.Lock.WithExclusiveLock([&]() {
+ Context.OutBlocks.BlockHashToBlockIndex.insert_or_assign(Context.OutBlocks.BlockDescriptions[BlockIndex].BlockHash,
+ BlockIndex);
+ });
+
+ {
+ std::span<const SharedBuffer> Segments = CompressedBlock.GetCompressed().GetSegments();
+ ZEN_ASSERT(Segments.size() >= 2);
+ Context.OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]);
+ }
+
+ if (Context.GenerateBlocksStats.GeneratedBlockCount == Context.NewBlockCount)
+ {
+ Context.FilteredGeneratedBytesPerSecond.Stop();
+ }
+
+ if (Context.QueuedPendingBlocksForUpload.load() > 16)
+ {
+ std::span<const SharedBuffer> Segments = CompressedBlock.GetCompressed().GetSegments();
+ ZEN_ASSERT(Segments.size() >= 2);
+ Context.OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]);
+ }
+ else
+ {
+ if (!m_AbortFlag)
+ {
+ Context.QueuedPendingBlocksForUpload++;
+ Context.Work.ScheduleWork(
+ Context.UploadBlocksPool,
+ [this, &Context, BlockIndex, Payload = std::move(CompressedBlock)](std::atomic<bool>&) mutable {
+ UploadGeneratedBlock(Context, BlockIndex, std::move(Payload));
+ });
+ }
+ }
+ }
+ });
+ }
+}
+
+void
+BuildsOperationUploadFolder::UploadGeneratedBlock(GenerateBuildBlocksContext& Context, size_t BlockIndex, CompressedBuffer Payload)
+{
+ auto _ = MakeGuard([&Context] { Context.QueuedPendingBlocksForUpload--; });
+ if (m_AbortFlag)
+ {
+ return;
+ }
+
+ if (Context.GenerateBlocksStats.GeneratedBlockCount == Context.NewBlockCount)
+ {
+ ZEN_TRACE_CPU("GenerateBuildBlocks_Save");
+
+ Context.FilteredUploadedBytesPerSecond.Stop();
+ std::span<const SharedBuffer> Segments = Payload.GetCompressed().GetSegments();
+ ZEN_ASSERT(Segments.size() >= 2);
+ Context.OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]);
+ return;
+ }
+
+ ZEN_TRACE_CPU("GenerateBuildBlocks_Upload");
+
+ Context.FilteredUploadedBytesPerSecond.Start();
+
+ const CbObject BlockMetaData =
+ BuildChunkBlockDescription(Context.OutBlocks.BlockDescriptions[BlockIndex], Context.OutBlocks.BlockMetaDatas[BlockIndex]);
+
+ const IoHash& BlockHash = Context.OutBlocks.BlockDescriptions[BlockIndex].BlockHash;
+ const uint64_t CompressedBlockSize = Payload.GetCompressedSize();
+
+ if (m_Storage.CacheStorage && m_Options.PopulateCache)
+ {
+ m_Storage.CacheStorage->PutBuildBlob(m_BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload.GetCompressed());
+ }
+
+ try
+ {
+ m_Storage.BuildStorage->PutBuildBlob(m_BuildId, BlockHash, ZenContentType::kCompressedBinary, std::move(Payload).GetCompressed());
+ }
+ catch (const std::exception&)
+ {
+ // Silence http errors due to abort
+ if (!m_AbortFlag)
+ {
+ throw;
+ }
+ }
+
+ if (m_AbortFlag)
+ {
+ return;
+ }
+
+ Context.UploadStats.BlocksBytes += CompressedBlockSize;
+
+ if (m_Options.IsVerbose)
+ {
+ ZEN_INFO("Uploaded block {} ({}) containing {} chunks",
+ BlockHash,
+ NiceBytes(CompressedBlockSize),
+ Context.OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size());
+ }
+
+ if (m_Storage.CacheStorage && m_Options.PopulateCache)
+ {
+ m_Storage.CacheStorage->PutBlobMetadatas(m_BuildId, std::vector<IoHash>({BlockHash}), std::vector<CbObject>({BlockMetaData}));
+ }
+
+ bool MetadataSucceeded = false;
+ try
+ {
+ MetadataSucceeded = m_Storage.BuildStorage->PutBlockMetadata(m_BuildId, BlockHash, BlockMetaData);
+ }
+ catch (const std::exception&)
+ {
+ // Silence http errors due to abort
+ if (!m_AbortFlag)
+ {
+ throw;
+ }
+ }
+
+ if (m_AbortFlag)
+ {
+ return;
+ }
+
+ if (MetadataSucceeded)
+ {
+ if (m_Options.IsVerbose)
+ {
+ ZEN_INFO("Uploaded block {} metadata ({})", BlockHash, NiceBytes(BlockMetaData.GetSize()));
+ }
+
+ Context.OutBlocks.MetaDataHasBeenUploaded[BlockIndex] = true;
+ Context.UploadStats.BlocksBytes += BlockMetaData.GetSize();
+ }
+
+ Context.UploadStats.BlockCount++;
+ if (Context.UploadStats.BlockCount == Context.NewBlockCount)
+ {
+ Context.FilteredUploadedBytesPerSecond.Stop();
+ }
+}
+
+std::vector<uint32_t>
+BuildsOperationUploadFolder::CalculateAbsoluteChunkOrders(
+ const std::span<const IoHash> LocalChunkHashes,
+ const std::span<const uint32_t> LocalChunkOrder,
+ const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& ChunkHashToLocalChunkIndex,
+ const std::span<const uint32_t>& LooseChunkIndexes,
+ const std::span<const ChunkBlockDescription>& BlockDescriptions)
+{
+ ZEN_TRACE_CPU("CalculateAbsoluteChunkOrders");
+
+ std::vector<IoHash> TmpAbsoluteChunkHashes;
+ if (m_Options.DoExtraContentValidation)
+ {
+ TmpAbsoluteChunkHashes.reserve(LocalChunkHashes.size());
+ }
+ std::vector<uint32_t> LocalChunkIndexToAbsoluteChunkIndex;
+ LocalChunkIndexToAbsoluteChunkIndex.resize(LocalChunkHashes.size(), (uint32_t)-1);
+ std::uint32_t AbsoluteChunkCount = 0;
+ for (uint32_t ChunkIndex : LooseChunkIndexes)
+ {
+ LocalChunkIndexToAbsoluteChunkIndex[ChunkIndex] = AbsoluteChunkCount;
+ if (m_Options.DoExtraContentValidation)
+ {
+ TmpAbsoluteChunkHashes.push_back(LocalChunkHashes[ChunkIndex]);
+ }
+ AbsoluteChunkCount++;
+ }
+ for (const ChunkBlockDescription& Block : BlockDescriptions)
+ {
+ for (const IoHash& ChunkHash : Block.ChunkRawHashes)
+ {
+ if (auto It = ChunkHashToLocalChunkIndex.find(ChunkHash); It != ChunkHashToLocalChunkIndex.end())
+ {
+ const uint32_t LocalChunkIndex = It->second;
+ ZEN_ASSERT_SLOW(LocalChunkHashes[LocalChunkIndex] == ChunkHash);
+ LocalChunkIndexToAbsoluteChunkIndex[LocalChunkIndex] = AbsoluteChunkCount;
+ }
+ if (m_Options.DoExtraContentValidation)
+ {
+ TmpAbsoluteChunkHashes.push_back(ChunkHash);
+ }
+ AbsoluteChunkCount++;
+ }
+ }
+ std::vector<uint32_t> AbsoluteChunkOrder;
+ AbsoluteChunkOrder.reserve(LocalChunkHashes.size());
+ for (const uint32_t LocalChunkIndex : LocalChunkOrder)
+ {
+ const uint32_t AbsoluteChunkIndex = LocalChunkIndexToAbsoluteChunkIndex[LocalChunkIndex];
+ if (m_Options.DoExtraContentValidation)
+ {
+ ZEN_ASSERT(LocalChunkHashes[LocalChunkIndex] == TmpAbsoluteChunkHashes[AbsoluteChunkIndex]);
+ }
+ AbsoluteChunkOrder.push_back(AbsoluteChunkIndex);
+ }
+ if (m_Options.DoExtraContentValidation)
+ {
+ uint32_t OrderIndex = 0;
+ while (OrderIndex < LocalChunkOrder.size())
+ {
+ const uint32_t LocalChunkIndex = LocalChunkOrder[OrderIndex];
+ const IoHash& LocalChunkHash = LocalChunkHashes[LocalChunkIndex];
+ const uint32_t AbsoluteChunkIndex = AbsoluteChunkOrder[OrderIndex];
+ const IoHash& AbsoluteChunkHash = TmpAbsoluteChunkHashes[AbsoluteChunkIndex];
+ ZEN_ASSERT(LocalChunkHash == AbsoluteChunkHash);
+ OrderIndex++;
+ }
+ }
+ return AbsoluteChunkOrder;
+}
+
+CompositeBuffer
+BuildsOperationUploadFolder::FetchChunk(const ChunkedFolderContent& Content,
+ const ChunkedContentLookup& Lookup,
+ const IoHash& ChunkHash,
+ ReadFileCache& OpenFileCache)
+{
+ ZEN_TRACE_CPU("FetchChunk");
+ auto It = Lookup.ChunkHashToChunkIndex.find(ChunkHash);
+ ZEN_ASSERT(It != Lookup.ChunkHashToChunkIndex.end());
+ uint32_t ChunkIndex = It->second;
+ std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkLocations = GetChunkSequenceLocations(Lookup, ChunkIndex);
+ ZEN_ASSERT(!ChunkLocations.empty());
+ CompositeBuffer Chunk =
+ OpenFileCache.GetRange(ChunkLocations[0].SequenceIndex, ChunkLocations[0].Offset, Content.ChunkedContent.ChunkRawSizes[ChunkIndex]);
+ if (!Chunk)
+ {
+ throw std::runtime_error(fmt::format("Unable to read chunk at {}, size {} from '{}'",
+ ChunkLocations[0].Offset,
+ Content.ChunkedContent.ChunkRawSizes[ChunkIndex],
+ Content.Paths[Lookup.SequenceIndexFirstPathIndex[ChunkLocations[0].SequenceIndex]]));
+ }
+ ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == ChunkHash);
+ return Chunk;
+};
+
+CompressedBuffer
+BuildsOperationUploadFolder::GenerateBlock(const ChunkedFolderContent& Content,
+ const ChunkedContentLookup& Lookup,
+ const std::vector<uint32_t>& ChunksInBlock,
+ ChunkBlockDescription& OutBlockDescription)
+{
+ ZEN_TRACE_CPU("GenerateBlock");
+ ReadFileCache OpenFileCache(m_DiskStats.OpenReadCount,
+ m_DiskStats.CurrentOpenFileCount,
+ m_DiskStats.ReadCount,
+ m_DiskStats.ReadByteCount,
+ m_Path,
+ Content,
+ Lookup,
+ 4);
+
+ std::vector<std::pair<IoHash, FetchChunkFunc>> BlockContent;
+ BlockContent.reserve(ChunksInBlock.size());
+ for (uint32_t ChunkIndex : ChunksInBlock)
+ {
+ BlockContent.emplace_back(std::make_pair(
+ Content.ChunkedContent.ChunkHashes[ChunkIndex],
+ [this, &Content, &Lookup, &OpenFileCache, ChunkIndex](const IoHash& ChunkHash) -> std::pair<uint64_t, CompositeBuffer> {
+ CompositeBuffer Chunk = FetchChunk(Content, Lookup, ChunkHash, OpenFileCache);
+ ZEN_ASSERT(Chunk);
+ uint64_t RawSize = Chunk.GetSize();
+
+ const bool ShouldCompressChunk = RawSize >= m_Options.MinimumSizeForCompressInBlock &&
+ IsChunkCompressable(m_NonCompressableExtensionHashes, Lookup, ChunkIndex);
+
+ const OodleCompressionLevel CompressionLevel =
+ ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None;
+ return {RawSize, CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid, CompressionLevel).GetCompressed()};
+ }));
+ }
+
+ return GenerateChunkBlock(std::move(BlockContent), OutBlockDescription);
+};
+
+CompressedBuffer
+BuildsOperationUploadFolder::RebuildBlock(const ChunkedFolderContent& Content,
+ const ChunkedContentLookup& Lookup,
+ CompositeBuffer&& HeaderBuffer,
+ const std::vector<uint32_t>& ChunksInBlock)
+{
+ ZEN_TRACE_CPU("RebuildBlock");
+ ReadFileCache OpenFileCache(m_DiskStats.OpenReadCount,
+ m_DiskStats.CurrentOpenFileCount,
+ m_DiskStats.ReadCount,
+ m_DiskStats.ReadByteCount,
+ m_Path,
+ Content,
+ Lookup,
+ 4);
+
+ std::vector<SharedBuffer> ResultBuffers;
+ ResultBuffers.reserve(HeaderBuffer.GetSegments().size() + ChunksInBlock.size());
+ ResultBuffers.insert(ResultBuffers.end(), HeaderBuffer.GetSegments().begin(), HeaderBuffer.GetSegments().end());
+ for (uint32_t ChunkIndex : ChunksInBlock)
+ {
+ std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkLocations = GetChunkSequenceLocations(Lookup, ChunkIndex);
+ ZEN_ASSERT(!ChunkLocations.empty());
+ CompositeBuffer Chunk = OpenFileCache.GetRange(ChunkLocations[0].SequenceIndex,
+ ChunkLocations[0].Offset,
+ Content.ChunkedContent.ChunkRawSizes[ChunkIndex]);
+ ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == Content.ChunkedContent.ChunkHashes[ChunkIndex]);
+
+ const uint64_t RawSize = Chunk.GetSize();
+ const bool ShouldCompressChunk =
+ RawSize >= m_Options.MinimumSizeForCompressInBlock && IsChunkCompressable(m_NonCompressableExtensionHashes, Lookup, ChunkIndex);
+
+ const OodleCompressionLevel CompressionLevel = ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None;
+
+ CompositeBuffer CompressedChunk =
+ CompressedBuffer::Compress(std::move(Chunk), OodleCompressor::Mermaid, CompressionLevel).GetCompressed();
+ ResultBuffers.insert(ResultBuffers.end(), CompressedChunk.GetSegments().begin(), CompressedChunk.GetSegments().end());
+ }
+ return CompressedBuffer::FromCompressedNoValidate(CompositeBuffer(std::move(ResultBuffers)));
+};
+
+void
+BuildsOperationUploadFolder::UploadBuildPart(ChunkingController& ChunkController,
+ ChunkingCache& ChunkCache,
+ uint32_t PartIndex,
+ const UploadPart& Part,
+ uint32_t PartStepOffset,
+ uint32_t StepCount)
+{
+ Stopwatch UploadTimer;
+
+ ChunkingStatistics ChunkingStats;
+ FindBlocksStatistics FindBlocksStats;
+ ReuseBlocksStatistics ReuseBlocksStats;
+ UploadStatistics UploadStats;
+ GenerateBlocksStatistics GenerateBlocksStats;
+ LooseChunksStatistics LooseChunksStats;
+
+ m_Progress.SetLogOperationProgress(PartStepOffset + (uint32_t)PartTaskSteps::ChunkPartContent, StepCount);
+
+ ChunkedFolderContent LocalContent = ScanPartContent(Part, ChunkController, ChunkCache, ChunkingStats);
+ if (m_AbortFlag)
+ {
+ return;
+ }
+
+ const ChunkedContentLookup LocalLookup = BuildChunkedContentLookup(LocalContent);
+
+ if (PartIndex == 0)
+ {
+ ConsumePrepareBuildResult();
+ }
+
+ ZEN_ASSERT(m_PreferredMultipartChunkSize != 0);
+ ZEN_ASSERT(m_LargeAttachmentSize != 0);
+
+ m_Progress.SetLogOperationProgress(PartStepOffset + (uint32_t)PartTaskSteps::CalculateDelta, StepCount);
+
+ Stopwatch BlockArrangeTimer;
+
+ std::vector<uint32_t> LooseChunkIndexes;
+ std::vector<uint32_t> NewBlockChunkIndexes;
+ std::vector<size_t> ReuseBlockIndexes;
+ ClassifyChunksByBlockEligibility(LocalContent,
+ LooseChunkIndexes,
+ NewBlockChunkIndexes,
+ ReuseBlockIndexes,
+ LooseChunksStats,
+ FindBlocksStats,
+ ReuseBlocksStats);
+
+ std::vector<std::vector<uint32_t>> NewBlockChunks;
+ ArrangeChunksIntoBlocks(LocalContent, LocalLookup, NewBlockChunkIndexes, NewBlockChunks);
+
+ FindBlocksStats.NewBlocksCount += NewBlockChunks.size();
+ for (uint32_t ChunkIndex : NewBlockChunkIndexes)
+ {
+ FindBlocksStats.NewBlocksChunkByteCount += LocalContent.ChunkedContent.ChunkRawSizes[ChunkIndex];
+ }
+ FindBlocksStats.NewBlocksChunkCount += NewBlockChunkIndexes.size();
+
+ const double AcceptedByteCountPercent = FindBlocksStats.PotentialChunkByteCount > 0
+ ? (100.0 * ReuseBlocksStats.AcceptedRawByteCount / FindBlocksStats.PotentialChunkByteCount)
+ : 0.0;
+
+ const double AcceptedReduntantByteCountPercent =
+ ReuseBlocksStats.AcceptedByteCount > 0 ? (100.0 * ReuseBlocksStats.AcceptedReduntantByteCount) /
+ (ReuseBlocksStats.AcceptedByteCount + ReuseBlocksStats.AcceptedReduntantByteCount)
+ : 0.0;
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_INFO(
+ "Found {} chunks in {} ({}) blocks eligible for reuse in {}\n"
+ " Reusing {} ({}) matching chunks in {} blocks ({:.1f}%)\n"
+ " Accepting {} ({}) redundant chunks ({:.1f}%)\n"
+ " Rejected {} ({}) chunks in {} blocks\n"
+ " Arranged {} ({}) chunks in {} new blocks\n"
+ " Keeping {} ({}) chunks as loose chunks\n"
+ " Discovery completed in {}",
+ FindBlocksStats.FoundBlockChunkCount,
+ FindBlocksStats.FoundBlockCount,
+ NiceBytes(FindBlocksStats.FoundBlockByteCount),
+ NiceTimeSpanMs(FindBlocksStats.FindBlockTimeMS),
+
+ ReuseBlocksStats.AcceptedChunkCount,
+ NiceBytes(ReuseBlocksStats.AcceptedRawByteCount),
+ FindBlocksStats.AcceptedBlockCount,
+ AcceptedByteCountPercent,
+
+ ReuseBlocksStats.AcceptedReduntantChunkCount,
+ NiceBytes(ReuseBlocksStats.AcceptedReduntantByteCount),
+ AcceptedReduntantByteCountPercent,
+
+ ReuseBlocksStats.RejectedChunkCount,
+ NiceBytes(ReuseBlocksStats.RejectedByteCount),
+ ReuseBlocksStats.RejectedBlockCount,
+
+ FindBlocksStats.NewBlocksChunkCount,
+ NiceBytes(FindBlocksStats.NewBlocksChunkByteCount),
+ FindBlocksStats.NewBlocksCount,
+
+ LooseChunksStats.ChunkCount,
+ NiceBytes(LooseChunksStats.ChunkByteCount),
+
+ NiceTimeSpanMs(BlockArrangeTimer.GetElapsedTimeMs()));
+ }
+
+ m_Progress.SetLogOperationProgress(PartStepOffset + (uint32_t)PartTaskSteps::GenerateBlocks, StepCount);
+ GeneratedBlocks NewBlocks;
+
+ if (!NewBlockChunks.empty())
+ {
+ Stopwatch GenerateBuildBlocksTimer;
+ auto __ = MakeGuard([&]() {
+ uint64_t BlockGenerateTimeUs = GenerateBuildBlocksTimer.GetElapsedTimeUs();
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_INFO("Generated {} ({}) and uploaded {} ({}) blocks in {}. Generate speed: {}B/sec. Transfer speed {}bits/sec.",
+ GenerateBlocksStats.GeneratedBlockCount.load(),
+ NiceBytes(GenerateBlocksStats.GeneratedBlockByteCount),
+ UploadStats.BlockCount.load(),
+ NiceBytes(UploadStats.BlocksBytes.load()),
+ NiceTimeSpanMs(BlockGenerateTimeUs / 1000),
+ NiceNum(GetBytesPerSecond(GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS,
+ GenerateBlocksStats.GeneratedBlockByteCount)),
+ NiceNum(GetBytesPerSecond(UploadStats.ElapsedWallTimeUS, UploadStats.BlocksBytes * 8)));
+ }
+ });
+ GenerateBuildBlocks(LocalContent, LocalLookup, NewBlockChunks, NewBlocks, GenerateBlocksStats, UploadStats);
+ }
+
+ m_Progress.SetLogOperationProgress(PartStepOffset + (uint32_t)PartTaskSteps::BuildPartManifest, StepCount);
+
+ BuiltPartManifest Manifest =
+ BuildPartManifestObject(LocalContent, LocalLookup, ChunkController, ReuseBlockIndexes, NewBlocks, LooseChunkIndexes);
+
+ m_Progress.SetLogOperationProgress(PartStepOffset + (uint32_t)PartTaskSteps::UploadBuildPart, StepCount);
+
+ Stopwatch PutBuildPartResultTimer;
+ std::pair<IoHash, std::vector<IoHash>> PutBuildPartResult =
+ m_Storage.BuildStorage->PutBuildPart(m_BuildId, Part.PartId, Part.PartName, Manifest.PartManifest);
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_INFO("PutBuildPart took {}, payload size {}. {} attachments are needed.",
+ NiceTimeSpanMs(PutBuildPartResultTimer.GetElapsedTimeMs()),
+ NiceBytes(Manifest.PartManifest.GetSize()),
+ PutBuildPartResult.second.size());
+ }
+ IoHash PartHash = PutBuildPartResult.first;
+
+ m_Progress.SetLogOperationProgress(PartStepOffset + (uint32_t)PartTaskSteps::UploadAttachments, StepCount);
+
+ std::vector<IoHash> UnknownChunks;
+ if (m_Options.IgnoreExistingBlocks)
+ {
+ if (m_Options.IsVerbose)
+ {
+ ZEN_INFO("PutBuildPart uploading all attachments, needs are: {}", FormatArray<IoHash>(PutBuildPartResult.second, "\n "sv));
+ }
+
+ std::vector<IoHash> ForceUploadChunkHashes;
+ ForceUploadChunkHashes.reserve(LooseChunkIndexes.size());
+
+ for (uint32_t ChunkIndex : LooseChunkIndexes)
+ {
+ ForceUploadChunkHashes.push_back(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]);
+ }
+
+ for (size_t BlockIndex = 0; BlockIndex < NewBlocks.BlockHeaders.size(); BlockIndex++)
+ {
+ if (NewBlocks.BlockHeaders[BlockIndex])
+ {
+ // Block was not uploaded during generation
+ ForceUploadChunkHashes.push_back(NewBlocks.BlockDescriptions[BlockIndex].BlockHash);
+ }
+ }
+ UploadAttachmentBatch(ForceUploadChunkHashes,
+ UnknownChunks,
+ LocalContent,
+ LocalLookup,
+ NewBlockChunks,
+ NewBlocks,
+ LooseChunkIndexes,
+ UploadStats,
+ LooseChunksStats);
+ }
+ else if (!PutBuildPartResult.second.empty())
+ {
+ if (m_Options.IsVerbose)
+ {
+ ZEN_INFO("PutBuildPart needs attachments: {}", FormatArray<IoHash>(PutBuildPartResult.second, "\n "sv));
+ }
+ UploadAttachmentBatch(PutBuildPartResult.second,
+ UnknownChunks,
+ LocalContent,
+ LocalLookup,
+ NewBlockChunks,
+ NewBlocks,
+ LooseChunkIndexes,
+ UploadStats,
+ LooseChunksStats);
+ }
+
+ FinalizeBuildPartWithRetries(Part,
+ PartHash,
+ UnknownChunks,
+ LocalContent,
+ LocalLookup,
+ NewBlockChunks,
+ NewBlocks,
+ LooseChunkIndexes,
+ UploadStats,
+ LooseChunksStats);
+
+ if (!NewBlocks.BlockDescriptions.empty() && !m_AbortFlag)
+ {
+ UploadMissingBlockMetadata(NewBlocks, UploadStats);
+ // The newly generated blocks are now known blocks so the next part upload can use those blocks as well
+ m_KnownBlocks.insert(m_KnownBlocks.end(), NewBlocks.BlockDescriptions.begin(), NewBlocks.BlockDescriptions.end());
+ }
+
+ m_Progress.SetLogOperationProgress(PartStepOffset + (uint32_t)PartTaskSteps::PutBuildPartStats, StepCount);
+
+ m_Storage.BuildStorage->PutBuildPartStats(
+ m_BuildId,
+ Part.PartId,
+ {{"totalSize", double(Part.LocalFolderScanStats.FoundFileByteCount.load())},
+ {"reusedRatio", AcceptedByteCountPercent / 100.0},
+ {"reusedBlockCount", double(FindBlocksStats.AcceptedBlockCount)},
+ {"reusedBlockByteCount", double(ReuseBlocksStats.AcceptedRawByteCount)},
+ {"newBlockCount", double(FindBlocksStats.NewBlocksCount)},
+ {"newBlockByteCount", double(FindBlocksStats.NewBlocksChunkByteCount)},
+ {"uploadedCount", double(UploadStats.BlockCount.load() + UploadStats.ChunkCount.load())},
+ {"uploadedByteCount", double(UploadStats.BlocksBytes.load() + UploadStats.ChunksBytes.load())},
+ {"uploadedBytesPerSec",
+ double(GetBytesPerSecond(UploadStats.ElapsedWallTimeUS, UploadStats.ChunksBytes + UploadStats.BlocksBytes))},
+ {"elapsedTimeSec", double(UploadTimer.GetElapsedTimeMs() / 1000.0)}});
+
+ m_LocalFolderScanStats += Part.LocalFolderScanStats;
+ m_ChunkingStats += ChunkingStats;
+ m_FindBlocksStats += FindBlocksStats;
+ m_ReuseBlocksStats += ReuseBlocksStats;
+ m_UploadStats += UploadStats;
+ m_GenerateBlocksStats += GenerateBlocksStats;
+ m_LooseChunksStats += LooseChunksStats;
+}
+
+ChunkedFolderContent
+BuildsOperationUploadFolder::ScanPartContent(const UploadPart& Part,
+ ChunkingController& ChunkController,
+ ChunkingCache& ChunkCache,
+ ChunkingStatistics& ChunkingStats)
+{
+ Stopwatch ScanTimer;
+
+ std::unique_ptr<ProgressBase::ProgressBar> ProgressBar = m_Progress.CreateProgressBar("Scan Folder");
+
+ FilteredRate FilteredBytesHashed;
+ FilteredBytesHashed.Start();
+ ChunkedFolderContent LocalContent = ChunkFolderContent(
+ ChunkingStats,
+ m_IOWorkerPool,
+ m_Path,
+ Part.Content,
+ ChunkController,
+ ChunkCache,
+ m_Progress.GetProgressUpdateDelayMS(),
+ [&](bool IsAborted, bool IsPaused, std::ptrdiff_t) {
+ FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load());
+ std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found",
+ ChunkingStats.FilesProcessed.load(),
+ Part.Content.Paths.size(),
+ NiceBytes(ChunkingStats.BytesHashed.load()),
+ NiceBytes(Part.TotalRawSize),
+ NiceNum(FilteredBytesHashed.GetCurrent()),
+ ChunkingStats.UniqueChunksFound.load(),
+ NiceBytes(ChunkingStats.UniqueBytesFound.load()));
+ ProgressBar->UpdateState({.Task = "Scanning files ",
+ .Details = Details,
+ .TotalCount = Part.TotalRawSize,
+ .RemainingCount = Part.TotalRawSize - ChunkingStats.BytesHashed.load(),
+ .Status = ProgressBase::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+ false);
+ },
+ m_AbortFlag,
+ m_PauseFlag);
+ FilteredBytesHashed.Stop();
+ ProgressBar->Finish();
+ if (m_AbortFlag)
+ {
+ return LocalContent;
+ }
+
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_INFO("Found {} ({}) files divided into {} ({}) unique chunks in '{}' in {}. Average hash rate {}B/sec",
+ Part.Content.Paths.size(),
+ NiceBytes(Part.TotalRawSize),
+ ChunkingStats.UniqueChunksFound.load(),
+ NiceBytes(ChunkingStats.UniqueBytesFound.load()),
+ m_Path,
+ NiceTimeSpanMs(ScanTimer.GetElapsedTimeMs()),
+ NiceNum(GetBytesPerSecond(ChunkingStats.ElapsedWallTimeUS, ChunkingStats.BytesHashed)));
+ }
+
+ return LocalContent;
+}
+
+void
+BuildsOperationUploadFolder::ConsumePrepareBuildResult()
+{
+ const PrepareBuildResult PrepBuildResult = m_PrepBuildResultFuture.get();
+
+ m_FindBlocksStats.FindBlockTimeMS = PrepBuildResult.ElapsedTimeMs;
+ m_FindBlocksStats.FoundBlockCount = PrepBuildResult.KnownBlocks.size();
+
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_INFO("Build prepare took {}. {} took {}, payload size {}{}",
+ NiceTimeSpanMs(PrepBuildResult.ElapsedTimeMs),
+ m_CreateBuild ? "PutBuild" : "GetBuild",
+ NiceTimeSpanMs(PrepBuildResult.PrepareBuildTimeMs),
+ NiceBytes(PrepBuildResult.PayloadSize),
+ m_Options.IgnoreExistingBlocks ? ""
+ : fmt::format(". Found {} blocks in {}",
+ PrepBuildResult.KnownBlocks.size(),
+ NiceTimeSpanMs(PrepBuildResult.FindBlocksTimeMs)));
+ }
+
+ m_PreferredMultipartChunkSize = PrepBuildResult.PreferredMultipartChunkSize;
+ m_LargeAttachmentSize = m_Options.AllowMultiparts ? m_PreferredMultipartChunkSize * 4u : (std::uint64_t)-1;
+ m_KnownBlocks = std::move(PrepBuildResult.KnownBlocks);
+}
+
+void
+BuildsOperationUploadFolder::ClassifyChunksByBlockEligibility(const ChunkedFolderContent& LocalContent,
+ std::vector<uint32_t>& OutLooseChunkIndexes,
+ std::vector<uint32_t>& OutNewBlockChunkIndexes,
+ std::vector<size_t>& OutReuseBlockIndexes,
+ LooseChunksStatistics& LooseChunksStats,
+ FindBlocksStatistics& FindBlocksStats,
+ ReuseBlocksStatistics& ReuseBlocksStats)
+{
+ const bool EnableBlocks = true;
+ std::vector<std::uint32_t> BlockChunkIndexes;
+ for (uint32_t ChunkIndex = 0; ChunkIndex < LocalContent.ChunkedContent.ChunkHashes.size(); ChunkIndex++)
+ {
+ const uint64_t ChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[ChunkIndex];
+ if (!EnableBlocks || ChunkRawSize == 0 || ChunkRawSize > m_Options.BlockParameters.MaxChunkEmbedSize)
+ {
+ OutLooseChunkIndexes.push_back(ChunkIndex);
+ LooseChunksStats.ChunkByteCount += ChunkRawSize;
+ }
+ else
+ {
+ BlockChunkIndexes.push_back(ChunkIndex);
+ FindBlocksStats.PotentialChunkByteCount += ChunkRawSize;
+ }
+ }
+ FindBlocksStats.PotentialChunkCount += BlockChunkIndexes.size();
+ LooseChunksStats.ChunkCount = OutLooseChunkIndexes.size();
+
+ if (m_Options.IgnoreExistingBlocks)
+ {
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_INFO("Ignoring any existing blocks in store");
+ }
+ OutNewBlockChunkIndexes = std::move(BlockChunkIndexes);
+ return;
+ }
+
+ OutReuseBlockIndexes = FindReuseBlocks(Log(),
+ m_Options.BlockReuseMinPercentLimit,
+ m_Options.IsVerbose,
+ ReuseBlocksStats,
+ m_KnownBlocks,
+ LocalContent.ChunkedContent.ChunkHashes,
+ BlockChunkIndexes,
+ OutNewBlockChunkIndexes);
+ FindBlocksStats.AcceptedBlockCount += OutReuseBlockIndexes.size();
+
+ for (const ChunkBlockDescription& Description : m_KnownBlocks)
+ {
+ for (uint32_t ChunkRawLength : Description.ChunkRawLengths)
+ {
+ FindBlocksStats.FoundBlockByteCount += ChunkRawLength;
+ }
+ FindBlocksStats.FoundBlockChunkCount += Description.ChunkRawHashes.size();
+ }
+}
+
+BuildsOperationUploadFolder::BuiltPartManifest
+BuildsOperationUploadFolder::BuildPartManifestObject(const ChunkedFolderContent& LocalContent,
+ const ChunkedContentLookup& LocalLookup,
+ ChunkingController& ChunkController,
+ std::span<const size_t> ReuseBlockIndexes,
+ const GeneratedBlocks& NewBlocks,
+ std::span<const uint32_t> LooseChunkIndexes)
+{
+ BuiltPartManifest Result;
+
+ CbObjectWriter PartManifestWriter;
+ Stopwatch ManifestGenerationTimer;
+ auto __ = MakeGuard([&]() {
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_INFO("Generated build part manifest in {} ({})",
+ NiceTimeSpanMs(ManifestGenerationTimer.GetElapsedTimeMs()),
+ NiceBytes(PartManifestWriter.GetSaveSize()));
+ }
+ });
+
+ PartManifestWriter.BeginObject("chunker"sv);
+ {
+ PartManifestWriter.AddString("name"sv, ChunkController.GetName());
+ PartManifestWriter.AddObject("parameters"sv, ChunkController.GetParameters());
+ }
+ PartManifestWriter.EndObject(); // chunker
+
+ Result.AllChunkBlockHashes.reserve(ReuseBlockIndexes.size() + NewBlocks.BlockDescriptions.size());
+ Result.AllChunkBlockDescriptions.reserve(ReuseBlockIndexes.size() + NewBlocks.BlockDescriptions.size());
+ for (size_t ReuseBlockIndex : ReuseBlockIndexes)
+ {
+ Result.AllChunkBlockDescriptions.push_back(m_KnownBlocks[ReuseBlockIndex]);
+ Result.AllChunkBlockHashes.push_back(m_KnownBlocks[ReuseBlockIndex].BlockHash);
+ }
+ Result.AllChunkBlockDescriptions.insert(Result.AllChunkBlockDescriptions.end(),
+ NewBlocks.BlockDescriptions.begin(),
+ NewBlocks.BlockDescriptions.end());
+ for (const ChunkBlockDescription& BlockDescription : NewBlocks.BlockDescriptions)
+ {
+ Result.AllChunkBlockHashes.push_back(BlockDescription.BlockHash);
+ }
+
+ std::vector<IoHash> AbsoluteChunkHashes;
+ if (m_Options.DoExtraContentValidation)
+ {
+ tsl::robin_map<IoHash, size_t, IoHash::Hasher> ChunkHashToAbsoluteChunkIndex;
+ AbsoluteChunkHashes.reserve(LocalContent.ChunkedContent.ChunkHashes.size());
+ for (uint32_t ChunkIndex : LooseChunkIndexes)
+ {
+ ChunkHashToAbsoluteChunkIndex.insert({LocalContent.ChunkedContent.ChunkHashes[ChunkIndex], AbsoluteChunkHashes.size()});
+ AbsoluteChunkHashes.push_back(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]);
+ }
+ for (const ChunkBlockDescription& Block : Result.AllChunkBlockDescriptions)
+ {
+ for (const IoHash& ChunkHash : Block.ChunkRawHashes)
+ {
+ ChunkHashToAbsoluteChunkIndex.insert({ChunkHash, AbsoluteChunkHashes.size()});
+ AbsoluteChunkHashes.push_back(ChunkHash);
+ }
+ }
+ for (const IoHash& ChunkHash : LocalContent.ChunkedContent.ChunkHashes)
+ {
+ ZEN_ASSERT(AbsoluteChunkHashes[ChunkHashToAbsoluteChunkIndex.at(ChunkHash)] == ChunkHash);
+ ZEN_ASSERT(LocalContent.ChunkedContent.ChunkHashes[LocalLookup.ChunkHashToChunkIndex.at(ChunkHash)] == ChunkHash);
+ }
+ for (const uint32_t ChunkIndex : LocalContent.ChunkedContent.ChunkOrders)
+ {
+ ZEN_ASSERT(AbsoluteChunkHashes[ChunkHashToAbsoluteChunkIndex.at(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex])] ==
+ LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]);
+ ZEN_ASSERT(LocalLookup.ChunkHashToChunkIndex.at(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]) == ChunkIndex);
+ }
+ }
+
+ std::vector<uint32_t> AbsoluteChunkOrders = CalculateAbsoluteChunkOrders(LocalContent.ChunkedContent.ChunkHashes,
+ LocalContent.ChunkedContent.ChunkOrders,
+ LocalLookup.ChunkHashToChunkIndex,
+ LooseChunkIndexes,
+ Result.AllChunkBlockDescriptions);
+
+ if (m_Options.DoExtraContentValidation)
+ {
+ for (uint32_t ChunkOrderIndex = 0; ChunkOrderIndex < LocalContent.ChunkedContent.ChunkOrders.size(); ChunkOrderIndex++)
+ {
+ uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[ChunkOrderIndex];
+ uint32_t AbsoluteChunkIndex = AbsoluteChunkOrders[ChunkOrderIndex];
+ const IoHash& LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex];
+ const IoHash& AbsoluteChunkHash = AbsoluteChunkHashes[AbsoluteChunkIndex];
+ ZEN_ASSERT(LocalChunkHash == AbsoluteChunkHash);
+ }
+ }
+
+ WriteBuildContentToCompactBinary(PartManifestWriter,
+ LocalContent.Platform,
+ LocalContent.Paths,
+ LocalContent.RawHashes,
+ LocalContent.RawSizes,
+ LocalContent.Attributes,
+ LocalContent.ChunkedContent.SequenceRawHashes,
+ LocalContent.ChunkedContent.ChunkCounts,
+ LocalContent.ChunkedContent.ChunkHashes,
+ LocalContent.ChunkedContent.ChunkRawSizes,
+ AbsoluteChunkOrders,
+ LooseChunkIndexes,
+ Result.AllChunkBlockHashes);
+
+ if (m_Options.DoExtraContentValidation)
+ {
+ ChunkedFolderContent VerifyFolderContent;
+
+ std::vector<uint32_t> OutAbsoluteChunkOrders;
+ std::vector<IoHash> OutLooseChunkHashes;
+ std::vector<uint64_t> OutLooseChunkRawSizes;
+ std::vector<IoHash> OutBlockRawHashes;
+ ReadBuildContentFromCompactBinary(PartManifestWriter.Save(),
+ VerifyFolderContent.Platform,
+ VerifyFolderContent.Paths,
+ VerifyFolderContent.RawHashes,
+ VerifyFolderContent.RawSizes,
+ VerifyFolderContent.Attributes,
+ VerifyFolderContent.ChunkedContent.SequenceRawHashes,
+ VerifyFolderContent.ChunkedContent.ChunkCounts,
+ OutAbsoluteChunkOrders,
+ OutLooseChunkHashes,
+ OutLooseChunkRawSizes,
+ OutBlockRawHashes);
+ ZEN_ASSERT(OutBlockRawHashes == Result.AllChunkBlockHashes);
+
+ for (uint32_t OrderIndex = 0; OrderIndex < OutAbsoluteChunkOrders.size(); OrderIndex++)
+ {
+ uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[OrderIndex];
+ const IoHash LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex];
+
+ uint32_t VerifyChunkIndex = OutAbsoluteChunkOrders[OrderIndex];
+ const IoHash VerifyChunkHash = AbsoluteChunkHashes[VerifyChunkIndex];
+
+ ZEN_ASSERT(LocalChunkHash == VerifyChunkHash);
+ }
+
+ CalculateLocalChunkOrders(OutAbsoluteChunkOrders,
+ OutLooseChunkHashes,
+ OutLooseChunkRawSizes,
+ Result.AllChunkBlockDescriptions,
+ VerifyFolderContent.ChunkedContent.ChunkHashes,
+ VerifyFolderContent.ChunkedContent.ChunkRawSizes,
+ VerifyFolderContent.ChunkedContent.ChunkOrders,
+ m_Options.DoExtraContentValidation);
+
+ ZEN_ASSERT(LocalContent.Paths == VerifyFolderContent.Paths);
+ ZEN_ASSERT(LocalContent.RawHashes == VerifyFolderContent.RawHashes);
+ ZEN_ASSERT(LocalContent.RawSizes == VerifyFolderContent.RawSizes);
+ ZEN_ASSERT(LocalContent.Attributes == VerifyFolderContent.Attributes);
+ ZEN_ASSERT(LocalContent.ChunkedContent.SequenceRawHashes == VerifyFolderContent.ChunkedContent.SequenceRawHashes);
+ ZEN_ASSERT(LocalContent.ChunkedContent.ChunkCounts == VerifyFolderContent.ChunkedContent.ChunkCounts);
+
+ for (uint32_t OrderIndex = 0; OrderIndex < LocalContent.ChunkedContent.ChunkOrders.size(); OrderIndex++)
+ {
+ uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[OrderIndex];
+ const IoHash LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex];
+ uint64_t LocalChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[LocalChunkIndex];
+
+ uint32_t VerifyChunkIndex = VerifyFolderContent.ChunkedContent.ChunkOrders[OrderIndex];
+ const IoHash VerifyChunkHash = VerifyFolderContent.ChunkedContent.ChunkHashes[VerifyChunkIndex];
+ uint64_t VerifyChunkRawSize = VerifyFolderContent.ChunkedContent.ChunkRawSizes[VerifyChunkIndex];
+
+ ZEN_ASSERT(LocalChunkHash == VerifyChunkHash);
+ ZEN_ASSERT(LocalChunkRawSize == VerifyChunkRawSize);
+ }
+ }
+
+ Result.PartManifest = PartManifestWriter.Save();
+ return Result;
+}
+
+void
+BuildsOperationUploadFolder::UploadAttachmentBatch(std::span<IoHash> RawHashes,
+ std::vector<IoHash>& OutUnknownChunks,
+ const ChunkedFolderContent& LocalContent,
+ const ChunkedContentLookup& LocalLookup,
+ const std::vector<std::vector<uint32_t>>& NewBlockChunks,
+ GeneratedBlocks& NewBlocks,
+ std::span<const uint32_t> LooseChunkIndexes,
+ UploadStatistics& UploadStats,
+ LooseChunksStatistics& LooseChunksStats)
+{
+ if (m_AbortFlag)
+ {
+ return;
+ }
+
+ UploadStatistics TempUploadStats;
+ LooseChunksStatistics TempLooseChunksStats;
+
+ Stopwatch TempUploadTimer;
+ auto __ = MakeGuard([&]() {
+ if (!m_Options.IsQuiet)
+ {
+ uint64_t TempChunkUploadTimeUs = TempUploadTimer.GetElapsedTimeUs();
+ ZEN_INFO(
+ "Uploaded {} ({}) blocks. "
+ "Compressed {} ({} {}B/s) and uploaded {} ({}) chunks. "
+ "Transferred {} ({}bits/s) in {}",
+ TempUploadStats.BlockCount.load(),
+ NiceBytes(TempUploadStats.BlocksBytes),
+
+ TempLooseChunksStats.CompressedChunkCount.load(),
+ NiceBytes(TempLooseChunksStats.CompressedChunkBytes.load()),
+ NiceNum(GetBytesPerSecond(TempLooseChunksStats.CompressChunksElapsedWallTimeUS, TempLooseChunksStats.ChunkByteCount)),
+ TempUploadStats.ChunkCount.load(),
+ NiceBytes(TempUploadStats.ChunksBytes),
+
+ NiceBytes(TempUploadStats.BlocksBytes + TempUploadStats.ChunksBytes),
+ NiceNum(GetBytesPerSecond(TempUploadStats.ElapsedWallTimeUS, TempUploadStats.ChunksBytes * 8)),
+ NiceTimeSpanMs(TempChunkUploadTimeUs / 1000));
+ }
+ });
+ UploadPartBlobs(LocalContent,
+ LocalLookup,
+ RawHashes,
+ NewBlockChunks,
+ NewBlocks,
+ LooseChunkIndexes,
+ m_LargeAttachmentSize,
+ TempUploadStats,
+ TempLooseChunksStats,
+ OutUnknownChunks);
+ UploadStats += TempUploadStats;
+ LooseChunksStats += TempLooseChunksStats;
+}
+
+void
+BuildsOperationUploadFolder::FinalizeBuildPartWithRetries(const UploadPart& Part,
+ const IoHash& PartHash,
+ std::vector<IoHash>& InOutUnknownChunks,
+ const ChunkedFolderContent& LocalContent,
+ const ChunkedContentLookup& LocalLookup,
+ const std::vector<std::vector<uint32_t>>& NewBlockChunks,
+ GeneratedBlocks& NewBlocks,
+ std::span<const uint32_t> LooseChunkIndexes,
+ UploadStatistics& UploadStats,
+ LooseChunksStatistics& LooseChunksStats)
+{
+ auto BuildUnkownChunksResponse = [](const std::vector<IoHash>& UnknownChunks, bool WillRetry) {
+ return fmt::format(
+ "The following build blobs was reported as needed for upload but was reported as existing at the start of the "
+ "operation.{}{}",
+ WillRetry ? " Treating this as a transient inconsistency issue and will attempt to retry finalization."sv : ""sv,
+ FormatArray<IoHash>(UnknownChunks, "\n "sv));
+ };
+
+ if (!InOutUnknownChunks.empty())
+ {
+ ZEN_WARN("{}", BuildUnkownChunksResponse(InOutUnknownChunks, /*WillRetry*/ true));
+ }
+
+ uint32_t FinalizeBuildPartRetryCount = 5;
+ while (!m_AbortFlag && (FinalizeBuildPartRetryCount--) > 0)
+ {
+ Stopwatch FinalizeBuildPartTimer;
+ std::vector<IoHash> Needs = m_Storage.BuildStorage->FinalizeBuildPart(m_BuildId, Part.PartId, PartHash);
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_INFO("FinalizeBuildPart took {}. {} attachments are missing.",
+ NiceTimeSpanMs(FinalizeBuildPartTimer.GetElapsedTimeMs()),
+ Needs.size());
+ }
+ if (Needs.empty())
+ {
+ break;
+ }
+ if (m_Options.IsVerbose)
+ {
+ ZEN_INFO("FinalizeBuildPart needs attachments: {}", FormatArray<IoHash>(Needs, "\n "sv));
+ }
+
+ std::vector<IoHash> RetryUnknownChunks;
+ UploadAttachmentBatch(Needs,
+ RetryUnknownChunks,
+ LocalContent,
+ LocalLookup,
+ NewBlockChunks,
+ NewBlocks,
+ LooseChunkIndexes,
+ UploadStats,
+ LooseChunksStats);
+ if (RetryUnknownChunks == InOutUnknownChunks)
+ {
+ if (FinalizeBuildPartRetryCount > 0)
+ {
+ // Back off a bit
+ Sleep(1000);
+ }
+ }
+ else
+ {
+ InOutUnknownChunks = RetryUnknownChunks;
+ ZEN_WARN("{}", BuildUnkownChunksResponse(InOutUnknownChunks, /*WillRetry*/ FinalizeBuildPartRetryCount != 0));
+ }
+ }
+
+ if (!InOutUnknownChunks.empty())
+ {
+ throw std::runtime_error(BuildUnkownChunksResponse(InOutUnknownChunks, /*WillRetry*/ false));
+ }
+}
+
+void
+BuildsOperationUploadFolder::UploadMissingBlockMetadata(GeneratedBlocks& NewBlocks, UploadStatistics& UploadStats)
+{
+ uint64_t UploadBlockMetadataCount = 0;
+ Stopwatch UploadBlockMetadataTimer;
+
+ uint32_t FailedMetadataUploadCount = 1;
+ int32_t MetadataUploadRetryCount = 3;
+ while ((MetadataUploadRetryCount-- > 0) && (FailedMetadataUploadCount > 0))
+ {
+ FailedMetadataUploadCount = 0;
+ for (size_t BlockIndex = 0; BlockIndex < NewBlocks.BlockDescriptions.size(); BlockIndex++)
+ {
+ if (m_AbortFlag)
+ {
+ break;
+ }
+ const IoHash& BlockHash = NewBlocks.BlockDescriptions[BlockIndex].BlockHash;
+ if (!NewBlocks.MetaDataHasBeenUploaded[BlockIndex])
+ {
+ const CbObject BlockMetaData =
+ BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]);
+ if (m_Storage.CacheStorage && m_Options.PopulateCache)
+ {
+ m_Storage.CacheStorage->PutBlobMetadatas(m_BuildId,
+ std::vector<IoHash>({BlockHash}),
+ std::vector<CbObject>({BlockMetaData}));
+ }
+ bool MetadataSucceeded = m_Storage.BuildStorage->PutBlockMetadata(m_BuildId, BlockHash, BlockMetaData);
+ if (MetadataSucceeded)
+ {
+ UploadStats.BlocksBytes += BlockMetaData.GetSize();
+ NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true;
+ UploadBlockMetadataCount++;
+ }
+ else
+ {
+ FailedMetadataUploadCount++;
+ }
+ }
+ }
+ }
+ if (UploadBlockMetadataCount > 0)
+ {
+ uint64_t ElapsedUS = UploadBlockMetadataTimer.GetElapsedTimeUs();
+ UploadStats.ElapsedWallTimeUS += ElapsedUS;
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_INFO("Uploaded metadata for {} blocks in {}", UploadBlockMetadataCount, NiceTimeSpanMs(ElapsedUS / 1000));
+ }
+ }
+}
+
+void
+BuildsOperationUploadFolder::UploadPartBlobs(const ChunkedFolderContent& Content,
+ const ChunkedContentLookup& Lookup,
+ std::span<IoHash> RawHashes,
+ const std::vector<std::vector<uint32_t>>& NewBlockChunks,
+ GeneratedBlocks& NewBlocks,
+ std::span<const uint32_t> LooseChunkIndexes,
+ const std::uint64_t LargeAttachmentSize,
+ UploadStatistics& TempUploadStats,
+ LooseChunksStatistics& TempLooseChunksStats,
+ std::vector<IoHash>& OutUnknownChunks)
+{
+ ZEN_TRACE_CPU("UploadPartBlobs");
+
+ UploadPartClassification Classification =
+ ClassifyUploadRawHashes(RawHashes, Content, Lookup, NewBlocks, LooseChunkIndexes, OutUnknownChunks);
+
+ if (Classification.BlockIndexes.empty() && Classification.LooseChunkOrderIndexes.empty())
+ {
+ return;
+ }
+
+ std::unique_ptr<ProgressBase::ProgressBar> ProgressBar = m_Progress.CreateProgressBar("Upload Blobs");
+
+ FilteredRate FilteredGenerateBlockBytesPerSecond;
+ FilteredRate FilteredCompressedBytesPerSecond;
+ FilteredRate FilteredUploadedBytesPerSecond;
+
+ ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ std::atomic<size_t> UploadedBlockSize = 0;
+ std::atomic<size_t> UploadedBlockCount = 0;
+ std::atomic<size_t> UploadedRawChunkSize = 0;
+ std::atomic<size_t> UploadedCompressedChunkSize = 0;
+ std::atomic<uint32_t> UploadedChunkCount = 0;
+ std::atomic<uint64_t> GeneratedBlockCount = 0;
+ std::atomic<uint64_t> GeneratedBlockByteCount = 0;
+ std::atomic<uint64_t> QueuedPendingInMemoryBlocksForUpload = 0;
+
+ const size_t UploadBlockCount = Classification.BlockIndexes.size();
+ const uint32_t UploadChunkCount = gsl::narrow<uint32_t>(Classification.LooseChunkOrderIndexes.size());
+ const uint64_t TotalRawSize = Classification.TotalLooseChunksSize + Classification.TotalBlocksSize;
+
+ UploadPartBlobsContext Context{.Work = Work,
+ .ReadChunkPool = m_IOWorkerPool,
+ .UploadChunkPool = m_NetworkPool,
+ .FilteredGenerateBlockBytesPerSecond = FilteredGenerateBlockBytesPerSecond,
+ .FilteredCompressedBytesPerSecond = FilteredCompressedBytesPerSecond,
+ .FilteredUploadedBytesPerSecond = FilteredUploadedBytesPerSecond,
+ .UploadedBlockSize = UploadedBlockSize,
+ .UploadedBlockCount = UploadedBlockCount,
+ .UploadedRawChunkSize = UploadedRawChunkSize,
+ .UploadedCompressedChunkSize = UploadedCompressedChunkSize,
+ .UploadedChunkCount = UploadedChunkCount,
+ .GeneratedBlockCount = GeneratedBlockCount,
+ .GeneratedBlockByteCount = GeneratedBlockByteCount,
+ .QueuedPendingInMemoryBlocksForUpload = QueuedPendingInMemoryBlocksForUpload,
+ .UploadBlockCount = UploadBlockCount,
+ .UploadChunkCount = UploadChunkCount,
+ .LargeAttachmentSize = LargeAttachmentSize,
+ .NewBlocks = NewBlocks,
+ .Content = Content,
+ .Lookup = Lookup,
+ .NewBlockChunks = NewBlockChunks,
+ .LooseChunkIndexes = LooseChunkIndexes,
+ .TempUploadStats = TempUploadStats,
+ .TempLooseChunksStats = TempLooseChunksStats};
+
+ ScheduleBlockGenerationAndUpload(Context, Classification.BlockIndexes);
+ ScheduleLooseChunkCompressionAndUpload(Context, Classification.LooseChunkOrderIndexes);
+
+ Work.Wait(m_Progress.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
+ FilteredCompressedBytesPerSecond.Update(TempLooseChunksStats.CompressedChunkRawBytes.load());
+ FilteredGenerateBlockBytesPerSecond.Update(GeneratedBlockByteCount.load());
+ FilteredUploadedBytesPerSecond.Update(UploadedCompressedChunkSize.load() + UploadedBlockSize.load());
+ uint64_t UploadedRawSize = UploadedRawChunkSize.load() + UploadedBlockSize.load();
+ uint64_t UploadedCompressedSize = UploadedCompressedChunkSize.load() + UploadedBlockSize.load();
+
+ std::string Details = fmt::format(
+ "Compressed {}/{} ({}/{}{}) chunks. "
+ "Uploaded {}/{} ({}/{}) blobs "
+ "({}{})",
+ TempLooseChunksStats.CompressedChunkCount.load(),
+ Classification.LooseChunkOrderIndexes.size(),
+ NiceBytes(TempLooseChunksStats.CompressedChunkRawBytes),
+ NiceBytes(Classification.TotalLooseChunksSize),
+ (TempLooseChunksStats.CompressedChunkCount == Classification.LooseChunkOrderIndexes.size())
+ ? ""
+ : fmt::format(" {}B/s", NiceNum(FilteredCompressedBytesPerSecond.GetCurrent())),
+
+ UploadedBlockCount.load() + UploadedChunkCount.load(),
+ UploadBlockCount + UploadChunkCount,
+ NiceBytes(UploadedRawSize),
+ NiceBytes(TotalRawSize),
+
+ NiceBytes(UploadedCompressedSize),
+ (UploadedBlockCount == UploadBlockCount && UploadedChunkCount == UploadChunkCount)
+ ? ""
+ : fmt::format(" {}bits/s", NiceNum(FilteredUploadedBytesPerSecond.GetCurrent())));
+
+ ProgressBar->UpdateState({.Task = "Uploading blobs ",
+ .Details = Details,
+ .TotalCount = gsl::narrow<uint64_t>(TotalRawSize),
+ .RemainingCount = gsl::narrow<uint64_t>(TotalRawSize - UploadedRawSize),
+ .Status = ProgressBase::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+ false);
+ });
+
+ ZEN_ASSERT(m_AbortFlag || QueuedPendingInMemoryBlocksForUpload.load() == 0);
+
+ ProgressBar->Finish();
+
+ TempUploadStats.ElapsedWallTimeUS += FilteredUploadedBytesPerSecond.GetElapsedTimeUS();
+ TempLooseChunksStats.CompressChunksElapsedWallTimeUS += FilteredCompressedBytesPerSecond.GetElapsedTimeUS();
+}
+
+BuildsOperationUploadFolder::UploadPartClassification
+BuildsOperationUploadFolder::ClassifyUploadRawHashes(std::span<IoHash> RawHashes,
+ const ChunkedFolderContent& Content,
+ const ChunkedContentLookup& Lookup,
+ const GeneratedBlocks& NewBlocks,
+ std::span<const uint32_t> LooseChunkIndexes,
+ std::vector<IoHash>& OutUnknownChunks)
+{
+ UploadPartClassification Result;
+
+ tsl::robin_map<uint32_t, uint32_t> ChunkIndexToLooseChunkOrderIndex;
+ ChunkIndexToLooseChunkOrderIndex.reserve(LooseChunkIndexes.size());
+ for (uint32_t OrderIndex = 0; OrderIndex < LooseChunkIndexes.size(); OrderIndex++)
+ {
+ ChunkIndexToLooseChunkOrderIndex.insert_or_assign(LooseChunkIndexes[OrderIndex], OrderIndex);
+ }
+
+ for (const IoHash& RawHash : RawHashes)
+ {
+ if (auto It = NewBlocks.BlockHashToBlockIndex.find(RawHash); It != NewBlocks.BlockHashToBlockIndex.end())
+ {
+ Result.BlockIndexes.push_back(It->second);
+ Result.TotalBlocksSize += NewBlocks.BlockSizes[It->second];
+ }
+ else if (auto ChunkIndexIt = Lookup.ChunkHashToChunkIndex.find(RawHash); ChunkIndexIt != Lookup.ChunkHashToChunkIndex.end())
+ {
+ const uint32_t ChunkIndex = ChunkIndexIt->second;
+ if (auto LooseOrderIndexIt = ChunkIndexToLooseChunkOrderIndex.find(ChunkIndex);
+ LooseOrderIndexIt != ChunkIndexToLooseChunkOrderIndex.end())
+ {
+ Result.LooseChunkOrderIndexes.push_back(LooseOrderIndexIt->second);
+ Result.TotalLooseChunksSize += Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
+ }
+ }
+ else
+ {
+ OutUnknownChunks.push_back(RawHash);
+ }
+ }
+ return Result;
+}
+
+void
+BuildsOperationUploadFolder::ScheduleBlockGenerationAndUpload(UploadPartBlobsContext& Context, std::span<const size_t> BlockIndexes)
+{
+ for (const size_t BlockIndex : BlockIndexes)
+ {
+ const IoHash& BlockHash = Context.NewBlocks.BlockDescriptions[BlockIndex].BlockHash;
+ if (m_AbortFlag)
+ {
+ break;
+ }
+ Context.Work.ScheduleWork(
+ Context.ReadChunkPool,
+ [this, &Context, BlockHash = IoHash(BlockHash), BlockIndex, GenerateBlockCount = BlockIndexes.size()](std::atomic<bool>&) {
+ if (m_AbortFlag)
+ {
+ return;
+ }
+ ZEN_TRACE_CPU("UploadPartBlobs_GenerateBlock");
+
+ Context.FilteredGenerateBlockBytesPerSecond.Start();
+
+ Stopwatch GenerateTimer;
+ CompositeBuffer Payload;
+ if (Context.NewBlocks.BlockHeaders[BlockIndex])
+ {
+ Payload = RebuildBlock(Context.Content,
+ Context.Lookup,
+ std::move(Context.NewBlocks.BlockHeaders[BlockIndex]),
+ Context.NewBlockChunks[BlockIndex])
+ .GetCompressed();
+ }
+ else
+ {
+ ChunkBlockDescription BlockDescription;
+ CompressedBuffer CompressedBlock =
+ GenerateBlock(Context.Content, Context.Lookup, Context.NewBlockChunks[BlockIndex], BlockDescription);
+ if (!CompressedBlock)
+ {
+ throw std::runtime_error(fmt::format("Failed generating block {}", BlockHash));
+ }
+ ZEN_ASSERT(BlockDescription.BlockHash == BlockHash);
+ Payload = std::move(CompressedBlock).GetCompressed();
+ }
+
+ Context.GeneratedBlockByteCount += Context.NewBlocks.BlockSizes[BlockIndex];
+ if (Context.GeneratedBlockCount.fetch_add(1) + 1 == GenerateBlockCount)
+ {
+ Context.FilteredGenerateBlockBytesPerSecond.Stop();
+ }
+ if (m_Options.IsVerbose)
+ {
+ ZEN_INFO("{} block {} ({}) containing {} chunks in {}",
+ Context.NewBlocks.BlockHeaders[BlockIndex] ? "Regenerated" : "Generated",
+ Context.NewBlocks.BlockDescriptions[BlockIndex].BlockHash,
+ NiceBytes(Context.NewBlocks.BlockSizes[BlockIndex]),
+ Context.NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size(),
+ NiceTimeSpanMs(GenerateTimer.GetElapsedTimeMs()));
+ }
+ if (!m_AbortFlag)
+ {
+ UploadBlockPayload(Context, BlockIndex, BlockHash, std::move(Payload));
+ }
+ });
+ }
+}
+
+void
+BuildsOperationUploadFolder::UploadBlockPayload(UploadPartBlobsContext& Context,
+ size_t BlockIndex,
+ const IoHash& BlockHash,
+ CompositeBuffer Payload)
+{
+ bool IsInMemoryBlock = true;
+ if (Context.QueuedPendingInMemoryBlocksForUpload.load() > 16)
+ {
+ ZEN_TRACE_CPU("AsyncUploadBlock_WriteTempBlock");
+ std::filesystem::path TempFilePath = m_Options.TempDir / (BlockHash.ToHexString());
+ Payload = CompositeBuffer(WriteToTempFile(std::move(Payload), TempFilePath));
+ IsInMemoryBlock = false;
+ }
+ else
+ {
+ Context.QueuedPendingInMemoryBlocksForUpload++;
+ }
+
+ Context.Work.ScheduleWork(
+ Context.UploadChunkPool,
+ [this, &Context, IsInMemoryBlock, BlockIndex, BlockHash = IoHash(BlockHash), Payload = CompositeBuffer(std::move(Payload))](
+ std::atomic<bool>&) {
+ auto _ = MakeGuard([IsInMemoryBlock, &Context] {
+ if (IsInMemoryBlock)
+ {
+ Context.QueuedPendingInMemoryBlocksForUpload--;
+ }
+ });
+ if (m_AbortFlag)
+ {
+ return;
+ }
+ ZEN_TRACE_CPU("AsyncUploadBlock");
+
+ const uint64_t PayloadSize = Payload.GetSize();
+
+ Context.FilteredUploadedBytesPerSecond.Start();
+ const CbObject BlockMetaData =
+ BuildChunkBlockDescription(Context.NewBlocks.BlockDescriptions[BlockIndex], Context.NewBlocks.BlockMetaDatas[BlockIndex]);
+
+ if (m_Storage.CacheStorage && m_Options.PopulateCache)
+ {
+ m_Storage.CacheStorage->PutBuildBlob(m_BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload);
+ }
+
+ try
+ {
+ m_Storage.BuildStorage->PutBuildBlob(m_BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload);
+ }
+ catch (const std::exception&)
+ {
+ // Silence http errors due to abort
+ if (!m_AbortFlag)
+ {
+ throw;
+ }
+ }
+
+ if (m_AbortFlag)
+ {
+ return;
+ }
+ if (m_Options.IsVerbose)
+ {
+ ZEN_INFO("Uploaded block {} ({}) containing {} chunks",
+ BlockHash,
+ NiceBytes(PayloadSize),
+ Context.NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size());
+ }
+ Context.UploadedBlockSize += PayloadSize;
+ Context.TempUploadStats.BlocksBytes += PayloadSize;
+
+ if (m_Storage.CacheStorage && m_Options.PopulateCache)
+ {
+ m_Storage.CacheStorage->PutBlobMetadatas(m_BuildId,
+ std::vector<IoHash>({BlockHash}),
+ std::vector<CbObject>({BlockMetaData}));
+ }
+
+ bool MetadataSucceeded = false;
+ try
+ {
+ MetadataSucceeded = m_Storage.BuildStorage->PutBlockMetadata(m_BuildId, BlockHash, BlockMetaData);
+ }
+ catch (const std::exception&)
+ {
+ // Silence http errors due to abort
+ if (!m_AbortFlag)
+ {
+ throw;
+ }
+ }
+ if (m_AbortFlag)
+ {
+ return;
+ }
+ if (MetadataSucceeded)
+ {
+ if (m_Options.IsVerbose)
+ {
+ ZEN_INFO("Uploaded block {} metadata ({})", BlockHash, NiceBytes(BlockMetaData.GetSize()));
+ }
+ Context.NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true;
+ Context.TempUploadStats.BlocksBytes += BlockMetaData.GetSize();
+ }
+
+ Context.TempUploadStats.BlockCount++;
+
+ if (Context.UploadedBlockCount.fetch_add(1) + 1 == Context.UploadBlockCount &&
+ Context.UploadedChunkCount == Context.UploadChunkCount)
+ {
+ Context.FilteredUploadedBytesPerSecond.Stop();
+ }
+ });
+}
+
+void
+BuildsOperationUploadFolder::ScheduleLooseChunkCompressionAndUpload(UploadPartBlobsContext& Context,
+ std::span<const uint32_t> LooseChunkOrderIndexes)
+{
+ for (const uint32_t LooseChunkOrderIndex : LooseChunkOrderIndexes)
+ {
+ const uint32_t ChunkIndex = Context.LooseChunkIndexes[LooseChunkOrderIndex];
+ Context.Work.ScheduleWork(Context.ReadChunkPool,
+ [this, &Context, LooseChunkOrderCount = LooseChunkOrderIndexes.size(), ChunkIndex](std::atomic<bool>&) {
+ if (m_AbortFlag)
+ {
+ return;
+ }
+ ZEN_TRACE_CPU("UploadPartBlobs_CompressChunk");
+
+ Context.FilteredCompressedBytesPerSecond.Start();
+ Stopwatch CompressTimer;
+ CompositeBuffer Payload =
+ CompressChunk(Context.Content, Context.Lookup, ChunkIndex, Context.TempLooseChunksStats);
+ if (m_Options.IsVerbose)
+ {
+ ZEN_INFO("Compressed chunk {} ({} -> {}) in {}",
+ Context.Content.ChunkedContent.ChunkHashes[ChunkIndex],
+ NiceBytes(Context.Content.ChunkedContent.ChunkRawSizes[ChunkIndex]),
+ NiceBytes(Payload.GetSize()),
+ NiceTimeSpanMs(CompressTimer.GetElapsedTimeMs()));
+ }
+ const uint64_t ChunkRawSize = Context.Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
+ Context.TempUploadStats.ReadFromDiskBytes += ChunkRawSize;
+ if (Context.TempLooseChunksStats.CompressedChunkCount == LooseChunkOrderCount)
+ {
+ Context.FilteredCompressedBytesPerSecond.Stop();
+ }
+ if (!m_AbortFlag)
+ {
+ UploadLooseChunkPayload(Context,
+ Context.Content.ChunkedContent.ChunkHashes[ChunkIndex],
+ ChunkRawSize,
+ std::move(Payload));
+ }
+ });
+ }
+}
+
+void
+BuildsOperationUploadFolder::UploadLooseChunkPayload(UploadPartBlobsContext& Context,
+ const IoHash& RawHash,
+ uint64_t RawSize,
+ CompositeBuffer Payload)
+{
+ Context.Work.ScheduleWork(
+ Context.UploadChunkPool,
+ [this, &Context, RawHash = IoHash(RawHash), RawSize, Payload = CompositeBuffer(std::move(Payload))](std::atomic<bool>&) mutable {
+ if (m_AbortFlag)
+ {
+ return;
+ }
+ ZEN_TRACE_CPU("AsyncUploadLooseChunk");
+
+ const uint64_t PayloadSize = Payload.GetSize();
+
+ if (m_Storage.CacheStorage && m_Options.PopulateCache)
+ {
+ m_Storage.CacheStorage->PutBuildBlob(m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload);
+ }
+
+ if (PayloadSize >= Context.LargeAttachmentSize)
+ {
+ ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart");
+ Context.TempUploadStats.MultipartAttachmentCount++;
+ try
+ {
+ std::vector<std::function<void()>> MultipartWork = m_Storage.BuildStorage->PutLargeBuildBlob(
+ m_BuildId,
+ RawHash,
+ ZenContentType::kCompressedBinary,
+ PayloadSize,
+ [Payload = std::move(Payload), &Context](uint64_t Offset, uint64_t Size) -> IoBuffer {
+ Context.FilteredUploadedBytesPerSecond.Start();
+
+ IoBuffer PartPayload = Payload.Mid(Offset, Size).Flatten().AsIoBuffer();
+ PartPayload.SetContentType(ZenContentType::kBinary);
+ return PartPayload;
+ },
+ [&Context, RawSize](uint64_t SentBytes, bool IsComplete) {
+ Context.TempUploadStats.ChunksBytes += SentBytes;
+ Context.UploadedCompressedChunkSize += SentBytes;
+ if (IsComplete)
+ {
+ Context.TempUploadStats.ChunkCount++;
+ if (Context.UploadedChunkCount.fetch_add(1) + 1 == Context.UploadChunkCount &&
+ Context.UploadedBlockCount == Context.UploadBlockCount)
+ {
+ Context.FilteredUploadedBytesPerSecond.Stop();
+ }
+ Context.UploadedRawChunkSize += RawSize;
+ }
+ });
+ for (auto& WorkPart : MultipartWork)
+ {
+ Context.Work.ScheduleWork(Context.UploadChunkPool, [Work = std::move(WorkPart)](std::atomic<bool>& AbortFlag) {
+ ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart_Work");
+ if (!AbortFlag)
+ {
+ Work();
+ }
+ });
+ }
+ if (m_Options.IsVerbose)
+ {
+ ZEN_INFO("Uploaded multipart chunk {} ({})", RawHash, NiceBytes(PayloadSize));
+ }
+ }
+ catch (const std::exception&)
+ {
+ // Silence http errors due to abort
+ if (!m_AbortFlag)
+ {
+ throw;
+ }
+ }
+ return;
+ }
+
+ ZEN_TRACE_CPU("AsyncUploadLooseChunk_Singlepart");
+ try
+ {
+ m_Storage.BuildStorage->PutBuildBlob(m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload);
+ }
+ catch (const std::exception&)
+ {
+ // Silence http errors due to abort
+ if (!m_AbortFlag)
+ {
+ throw;
+ }
+ }
+ if (m_AbortFlag)
+ {
+ return;
+ }
+ if (m_Options.IsVerbose)
+ {
+ ZEN_INFO("Uploaded chunk {} ({})", RawHash, NiceBytes(PayloadSize));
+ }
+ Context.TempUploadStats.ChunksBytes += Payload.GetSize();
+ Context.TempUploadStats.ChunkCount++;
+ Context.UploadedCompressedChunkSize += Payload.GetSize();
+ Context.UploadedRawChunkSize += RawSize;
+ if (Context.UploadedChunkCount.fetch_add(1) + 1 == Context.UploadChunkCount &&
+ Context.UploadedBlockCount == Context.UploadBlockCount)
+ {
+ Context.FilteredUploadedBytesPerSecond.Stop();
+ }
+ });
+}
+
+CompositeBuffer
+BuildsOperationUploadFolder::CompressChunk(const ChunkedFolderContent& Content,
+ const ChunkedContentLookup& Lookup,
+ uint32_t ChunkIndex,
+ LooseChunksStatistics& TempLooseChunksStats)
+{
+ ZEN_TRACE_CPU("CompressChunk");
+ ZEN_ASSERT(!m_Options.TempDir.empty());
+ const IoHash& ChunkHash = Content.ChunkedContent.ChunkHashes[ChunkIndex];
+ const uint64_t ChunkSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
+
+ const ChunkedContentLookup::ChunkSequenceLocation& Source = GetChunkSequenceLocations(Lookup, ChunkIndex)[0];
+ const std::uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[Source.SequenceIndex];
+ IoBuffer RawSource = IoBufferBuilder::MakeFromFile((m_Path / Content.Paths[PathIndex]).make_preferred(), Source.Offset, ChunkSize);
+ if (!RawSource)
+ {
+ throw std::runtime_error(fmt::format("Failed fetching chunk {}", ChunkHash));
+ }
+ if (RawSource.GetSize() != ChunkSize)
+ {
+ throw std::runtime_error(fmt::format("Fetched chunk {} has invalid size", ChunkHash));
+ }
+
+ const bool ShouldCompressChunk = IsChunkCompressable(m_NonCompressableExtensionHashes, Lookup, ChunkIndex);
+ const OodleCompressionLevel CompressionLevel = ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None;
+
+ if (ShouldCompressChunk)
+ {
+ std::filesystem::path TempFilePath = m_Options.TempDir / ChunkHash.ToHexString();
+
+ BasicFile CompressedFile;
+ std::error_code Ec;
+ CompressedFile.Open(TempFilePath, BasicFile::Mode::kTruncateDelete, Ec);
+ if (Ec)
+ {
+ throw std::runtime_error(fmt::format("Failed creating temporary file for compressing blob {}, reason: ({}) {}",
+ ChunkHash,
+ Ec.value(),
+ Ec.message()));
+ }
+
+ uint64_t StreamRawBytes = 0;
+ uint64_t StreamCompressedBytes = 0;
+
+ bool CouldCompress = CompressedBuffer::CompressToStream(
+ CompositeBuffer(SharedBuffer(RawSource)),
+ [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
+ ZEN_UNUSED(SourceOffset);
+ TempLooseChunksStats.CompressedChunkRawBytes += SourceSize;
+ CompressedFile.Write(RangeBuffer, Offset);
+ TempLooseChunksStats.CompressedChunkBytes += RangeBuffer.GetSize();
+ StreamRawBytes += SourceSize;
+ StreamCompressedBytes += RangeBuffer.GetSize();
+ },
+ OodleCompressor::Mermaid,
+ CompressionLevel);
+ if (CouldCompress)
+ {
+ uint64_t CompressedSize = CompressedFile.FileSize();
+ void* FileHandle = CompressedFile.Detach();
+ IoBuffer TempPayload = IoBuffer(IoBuffer::File,
+ FileHandle,
+ 0,
+ CompressedSize,
+ /*IsWholeFile*/ true);
+ ZEN_ASSERT(TempPayload);
+ TempPayload.SetDeleteOnClose(true);
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(TempPayload), RawHash, RawSize);
+ ZEN_ASSERT(Compressed);
+ ZEN_ASSERT(RawHash == ChunkHash);
+ ZEN_ASSERT(RawSize == ChunkSize);
+
+ TempLooseChunksStats.CompressedChunkCount++;
+
+ return Compressed.GetCompressed();
+ }
+ else
+ {
+ TempLooseChunksStats.CompressedChunkRawBytes -= StreamRawBytes;
+ TempLooseChunksStats.CompressedChunkBytes -= StreamCompressedBytes;
+ }
+ CompressedFile.Close();
+ RemoveFile(TempFilePath, Ec);
+ ZEN_UNUSED(Ec);
+ }
+
+ CompressedBuffer CompressedBlob =
+ CompressedBuffer::Compress(SharedBuffer(std::move(RawSource)), OodleCompressor::Mermaid, CompressionLevel);
+ if (!CompressedBlob)
+ {
+ throw std::runtime_error(fmt::format("Failed to compress large blob {}", ChunkHash));
+ }
+ ZEN_ASSERT_SLOW(CompressedBlob.DecodeRawHash() == ChunkHash);
+ ZEN_ASSERT_SLOW(CompressedBlob.DecodeRawSize() == ChunkSize);
+
+ TempLooseChunksStats.CompressedChunkRawBytes += ChunkSize;
+ TempLooseChunksStats.CompressedChunkBytes += CompressedBlob.GetCompressedSize();
+
+ // If we use none-compression, the compressed blob references the data and has 64 kb in memory so we don't need to write it to disk
+ if (ShouldCompressChunk)
+ {
+ std::filesystem::path TempFilePath = m_Options.TempDir / (ChunkHash.ToHexString());
+ IoBuffer TempPayload = WriteToTempFile(std::move(CompressedBlob).GetCompressed(), TempFilePath);
+ CompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(TempPayload));
+ }
+
+ TempLooseChunksStats.CompressedChunkCount++;
+ return std::move(CompressedBlob).GetCompressed();
+}
+
+std::vector<std::pair<Oid, std::string>>
+UploadFolder(LoggerRef Log,
+ ProgressBase& Progress,
+ TransferThreadWorkers& Workers,
+ StorageInstance& Storage,
+ std::atomic<bool>& AbortFlag,
+ std::atomic<bool>& PauseFlag,
+ const Oid& BuildId,
+ const Oid& BuildPartId,
+ std::string_view BuildPartName,
+ const std::filesystem::path& Path,
+ const std::filesystem::path& ManifestPath,
+ const CbObject& MetaData,
+ ChunkingController& ChunkController,
+ ChunkingCache& ChunkCache,
+ const UploadFolderOptions& Options)
+{
+ Progress.SetLogOperationName("Upload Folder");
+
+ Stopwatch UploadTimer;
+
+ BuildsOperationUploadFolder UploadOp(
+ Log,
+ Progress,
+ Storage,
+ AbortFlag,
+ PauseFlag,
+ Workers.GetIOWorkerPool(),
+ Workers.GetNetworkPool(),
+ BuildId,
+ Path,
+ Options.CreateBuild,
+ std::move(MetaData),
+ BuildsOperationUploadFolder::Options{.IsQuiet = Options.IsQuiet,
+ .IsVerbose = Options.IsVerbose,
+ .DoExtraContentValidation = Options.DoExtraContentVerify,
+ .FindBlockMaxCount = Options.FindBlockMaxCount,
+ .BlockReuseMinPercentLimit = Options.BlockReuseMinPercentLimit,
+ .AllowMultiparts = Options.AllowMultiparts,
+ .IgnoreExistingBlocks = Options.IgnoreExistingBlocks,
+ .TempDir = Options.TempDir,
+ .ExcludeFolders = Options.ExcludeFolders,
+ .ExcludeExtensions = Options.ExcludeExtensions,
+ .NonCompressableExtensions = DefaultSplitOnlyExtensions,
+ .PopulateCache = Options.UploadToZenCache});
+
+ std::vector<std::pair<Oid, std::string>> UploadedParts =
+ UploadOp.Execute(BuildPartId, BuildPartName, ManifestPath, ChunkController, ChunkCache);
+ if (AbortFlag)
+ {
+ return {};
+ }
+
+ if (Options.IsVerbose)
+ {
+ ZEN_CONSOLE(
+ "Folder scanning stats:"
+ "\n FoundFileCount: {}"
+ "\n FoundFileByteCount: {}"
+ "\n AcceptedFileCount: {}"
+ "\n AcceptedFileByteCount: {}"
+ "\n ElapsedWallTimeUS: {}",
+ UploadOp.m_LocalFolderScanStats.FoundFileCount.load(),
+ NiceBytes(UploadOp.m_LocalFolderScanStats.FoundFileByteCount.load()),
+ UploadOp.m_LocalFolderScanStats.AcceptedFileCount.load(),
+ NiceBytes(UploadOp.m_LocalFolderScanStats.AcceptedFileByteCount.load()),
+ NiceLatencyNs(UploadOp.m_LocalFolderScanStats.ElapsedWallTimeUS * 1000));
+
+ ZEN_CONSOLE(
+ "Chunking stats:"
+ "\n FilesProcessed: {}"
+ "\n FilesChunked: {}"
+ "\n BytesHashed: {}"
+ "\n UniqueChunksFound: {}"
+ "\n UniqueSequencesFound: {}"
+ "\n UniqueBytesFound: {}"
+ "\n FilesFoundInCache: {}"
+ "\n ChunksFoundInCache: {}"
+ "\n FilesStoredInCache: {}"
+ "\n ChunksStoredInCache: {}"
+ "\n ElapsedWallTimeUS: {}",
+ UploadOp.m_ChunkingStats.FilesProcessed.load(),
+ UploadOp.m_ChunkingStats.FilesChunked.load(),
+ NiceBytes(UploadOp.m_ChunkingStats.BytesHashed.load()),
+ UploadOp.m_ChunkingStats.UniqueChunksFound.load(),
+ UploadOp.m_ChunkingStats.UniqueSequencesFound.load(),
+ NiceBytes(UploadOp.m_ChunkingStats.UniqueBytesFound.load()),
+ UploadOp.m_ChunkingStats.FilesFoundInCache.load(),
+ UploadOp.m_ChunkingStats.ChunksFoundInCache.load(),
+ NiceBytes(UploadOp.m_ChunkingStats.BytesFoundInCache.load()),
+ UploadOp.m_ChunkingStats.FilesStoredInCache.load(),
+ UploadOp.m_ChunkingStats.ChunksStoredInCache.load(),
+ NiceBytes(UploadOp.m_ChunkingStats.BytesStoredInCache.load()),
+ NiceLatencyNs(UploadOp.m_ChunkingStats.ElapsedWallTimeUS * 1000));
+
+ ZEN_CONSOLE(
+ "Find block stats:"
+ "\n FindBlockTimeMS: {}"
+ "\n PotentialChunkCount: {}"
+ "\n PotentialChunkByteCount: {}"
+ "\n FoundBlockCount: {}"
+ "\n FoundBlockChunkCount: {}"
+ "\n FoundBlockByteCount: {}"
+ "\n AcceptedBlockCount: {}"
+ "\n NewBlocksCount: {}"
+ "\n NewBlocksChunkCount: {}"
+ "\n NewBlocksChunkByteCount: {}",
+ NiceTimeSpanMs(UploadOp.m_FindBlocksStats.FindBlockTimeMS),
+ UploadOp.m_FindBlocksStats.PotentialChunkCount,
+ NiceBytes(UploadOp.m_FindBlocksStats.PotentialChunkByteCount),
+ UploadOp.m_FindBlocksStats.FoundBlockCount,
+ UploadOp.m_FindBlocksStats.FoundBlockChunkCount,
+ NiceBytes(UploadOp.m_FindBlocksStats.FoundBlockByteCount),
+ UploadOp.m_FindBlocksStats.AcceptedBlockCount,
+ UploadOp.m_FindBlocksStats.NewBlocksCount,
+ UploadOp.m_FindBlocksStats.NewBlocksChunkCount,
+ NiceBytes(UploadOp.m_FindBlocksStats.NewBlocksChunkByteCount));
+
+ ZEN_CONSOLE(
+ "Reuse block stats:"
+ "\n AcceptedChunkCount: {}"
+ "\n AcceptedByteCount: {}"
+ "\n AcceptedRawByteCount: {}"
+ "\n RejectedBlockCount: {}"
+ "\n RejectedChunkCount: {}"
+ "\n RejectedByteCount: {}"
+ "\n AcceptedReduntantChunkCount: {}"
+ "\n AcceptedReduntantByteCount: {}",
+ UploadOp.m_ReuseBlocksStats.AcceptedChunkCount,
+ NiceBytes(UploadOp.m_ReuseBlocksStats.AcceptedByteCount),
+ NiceBytes(UploadOp.m_ReuseBlocksStats.AcceptedRawByteCount),
+ UploadOp.m_ReuseBlocksStats.RejectedBlockCount,
+ UploadOp.m_ReuseBlocksStats.RejectedChunkCount,
+ NiceBytes(UploadOp.m_ReuseBlocksStats.RejectedByteCount),
+ UploadOp.m_ReuseBlocksStats.AcceptedReduntantChunkCount,
+ NiceBytes(UploadOp.m_ReuseBlocksStats.AcceptedReduntantByteCount));
+
+ ZEN_CONSOLE(
+ "Generate blocks stats:"
+ "\n GeneratedBlockByteCount: {}"
+ "\n GeneratedBlockCount: {}"
+ "\n GenerateBlocksElapsedWallTimeUS: {}",
+ NiceBytes(UploadOp.m_GenerateBlocksStats.GeneratedBlockByteCount.load()),
+ UploadOp.m_GenerateBlocksStats.GeneratedBlockCount.load(),
+ NiceLatencyNs(UploadOp.m_GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS * 1000));
+
+ ZEN_CONSOLE(
+ "Loose chunks stats:"
+ "\n ChunkCount: {}"
+ "\n ChunkByteCount: {}"
+ "\n CompressedChunkCount: {}"
+ "\n CompressChunksElapsedWallTimeUS: {}",
+ UploadOp.m_LooseChunksStats.ChunkCount,
+ NiceBytes(UploadOp.m_LooseChunksStats.ChunkByteCount),
+ UploadOp.m_LooseChunksStats.CompressedChunkCount.load(),
+ NiceBytes(UploadOp.m_LooseChunksStats.CompressedChunkBytes.load()),
+ NiceLatencyNs(UploadOp.m_LooseChunksStats.CompressChunksElapsedWallTimeUS * 1000));
+
+ ZEN_CONSOLE(
+ "Disk stats:"
+ "\n OpenReadCount: {}"
+ "\n OpenWriteCount: {}"
+ "\n ReadCount: {}"
+ "\n ReadByteCount: {}"
+ "\n WriteCount: {} ({} cloned)"
+ "\n WriteByteCount: {} ({} cloned)"
+ "\n CurrentOpenFileCount: {}",
+ UploadOp.m_DiskStats.OpenReadCount.load(),
+ UploadOp.m_DiskStats.OpenWriteCount.load(),
+ UploadOp.m_DiskStats.ReadCount.load(),
+ NiceBytes(UploadOp.m_DiskStats.ReadByteCount.load()),
+ UploadOp.m_DiskStats.WriteCount.load(),
+ UploadOp.m_DiskStats.CloneCount.load(),
+ NiceBytes(UploadOp.m_DiskStats.WriteByteCount.load()),
+ NiceBytes(UploadOp.m_DiskStats.CloneByteCount.load()),
+ UploadOp.m_DiskStats.CurrentOpenFileCount.load());
+
+ ZEN_CONSOLE(
+ "Upload stats:"
+ "\n BlockCount: {}"
+ "\n BlocksBytes: {}"
+ "\n ChunkCount: {}"
+ "\n ChunksBytes: {}"
+ "\n ReadFromDiskBytes: {}"
+ "\n MultipartAttachmentCount: {}"
+ "\n ElapsedWallTimeUS: {}",
+ UploadOp.m_UploadStats.BlockCount.load(),
+ NiceBytes(UploadOp.m_UploadStats.BlocksBytes.load()),
+ UploadOp.m_UploadStats.ChunkCount.load(),
+ NiceBytes(UploadOp.m_UploadStats.ChunksBytes.load()),
+ NiceBytes(UploadOp.m_UploadStats.ReadFromDiskBytes.load()),
+ UploadOp.m_UploadStats.MultipartAttachmentCount.load(),
+ NiceLatencyNs(UploadOp.m_UploadStats.ElapsedWallTimeUS * 1000));
+ }
+
+ const double DeltaByteCountPercent =
+ UploadOp.m_ChunkingStats.BytesHashed > 0
+ ? (100.0 * (UploadOp.m_FindBlocksStats.NewBlocksChunkByteCount + UploadOp.m_LooseChunksStats.CompressedChunkBytes)) /
+ (UploadOp.m_ChunkingStats.BytesHashed)
+ : 0.0;
+
+ const std::string MultipartAttachmentStats =
+ Options.AllowMultiparts ? fmt::format(" ({} as multipart)", UploadOp.m_UploadStats.MultipartAttachmentCount.load()) : "";
+
+ if (!Options.IsQuiet)
+ {
+ ZEN_CONSOLE(
+ "Uploaded part {} ('{}') to build {}, {}\n"
+ " Scanned files: {:>8} ({}), {}B/sec, {}\n"
+ " New data: {:>8} ({}) {:.1f}%\n"
+ " New blocks: {:>8} ({} -> {}), {}B/sec, {}\n"
+ " New chunks: {:>8} ({} -> {}), {}B/sec, {}\n"
+ " Uploaded: {:>8} ({}), {}bits/sec, {}\n"
+ " Blocks: {:>8} ({})\n"
+ " Chunks: {:>8} ({}){}",
+ BuildPartId,
+ BuildPartName,
+ BuildId,
+ NiceTimeSpanMs(UploadTimer.GetElapsedTimeMs()),
+
+ UploadOp.m_LocalFolderScanStats.FoundFileCount.load(),
+ NiceBytes(UploadOp.m_LocalFolderScanStats.FoundFileByteCount.load()),
+ NiceNum(GetBytesPerSecond(UploadOp.m_ChunkingStats.ElapsedWallTimeUS, UploadOp.m_ChunkingStats.BytesHashed)),
+ NiceTimeSpanMs(UploadOp.m_ChunkingStats.ElapsedWallTimeUS / 1000),
+
+ UploadOp.m_FindBlocksStats.NewBlocksChunkCount + UploadOp.m_LooseChunksStats.CompressedChunkCount,
+ NiceBytes(UploadOp.m_FindBlocksStats.NewBlocksChunkByteCount + UploadOp.m_LooseChunksStats.CompressedChunkBytes),
+ DeltaByteCountPercent,
+
+ UploadOp.m_GenerateBlocksStats.GeneratedBlockCount.load(),
+ NiceBytes(UploadOp.m_FindBlocksStats.NewBlocksChunkByteCount),
+ NiceBytes(UploadOp.m_GenerateBlocksStats.GeneratedBlockByteCount.load()),
+ NiceNum(GetBytesPerSecond(UploadOp.m_GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS,
+ UploadOp.m_GenerateBlocksStats.GeneratedBlockByteCount)),
+ NiceTimeSpanMs(UploadOp.m_GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS / 1000),
+
+ UploadOp.m_LooseChunksStats.CompressedChunkCount.load(),
+ NiceBytes(UploadOp.m_LooseChunksStats.CompressedChunkRawBytes),
+ NiceBytes(UploadOp.m_LooseChunksStats.CompressedChunkBytes.load()),
+ NiceNum(GetBytesPerSecond(UploadOp.m_LooseChunksStats.CompressChunksElapsedWallTimeUS,
+ UploadOp.m_LooseChunksStats.CompressedChunkRawBytes)),
+ NiceTimeSpanMs(UploadOp.m_LooseChunksStats.CompressChunksElapsedWallTimeUS / 1000),
+
+ UploadOp.m_UploadStats.BlockCount.load() + UploadOp.m_UploadStats.ChunkCount.load(),
+ NiceBytes(UploadOp.m_UploadStats.BlocksBytes + UploadOp.m_UploadStats.ChunksBytes),
+ NiceNum(GetBytesPerSecond(UploadOp.m_UploadStats.ElapsedWallTimeUS,
+ (UploadOp.m_UploadStats.ChunksBytes + UploadOp.m_UploadStats.BlocksBytes) * 8)),
+ NiceTimeSpanMs(UploadOp.m_UploadStats.ElapsedWallTimeUS / 1000),
+
+ UploadOp.m_UploadStats.BlockCount.load(),
+ NiceBytes(UploadOp.m_UploadStats.BlocksBytes.load()),
+
+ UploadOp.m_UploadStats.ChunkCount.load(),
+ NiceBytes(UploadOp.m_UploadStats.ChunksBytes.load()),
+ MultipartAttachmentStats);
+ }
+ return UploadedParts;
+}
+
+} // namespace zen