aboutsummaryrefslogtreecommitdiff
path: root/src/zenremotestore/builds/buildstorageoperations.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-10-14 13:13:59 +0200
committerGitHub Enterprise <[email protected]>2025-10-14 13:13:59 +0200
commit9b7580230798d83d9bb36d40150913af69a13929 (patch)
tree73552ec1d3e9d955ce391cad894c637b74be91d4 /src/zenremotestore/builds/buildstorageoperations.cpp
parentmove all storage-related services into storage tree (#571) (diff)
downloadzen-9b7580230798d83d9bb36d40150913af69a13929.tar.xz
zen-9b7580230798d83d9bb36d40150913af69a13929.zip
refactor builds cmd part2 (#572)
* fix metadata info in filebuildstorage GetBuild * move MakeSafeAbsolutePathInPlace to filesystem.h/cpp * add BuildsOperationUploadFolder op moving code from builds_cmd.cpp
Diffstat (limited to 'src/zenremotestore/builds/buildstorageoperations.cpp')
-rw-r--r--src/zenremotestore/builds/buildstorageoperations.cpp2429
1 files changed, 2410 insertions, 19 deletions
diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp
index f3a585737..a4f4237cd 100644
--- a/src/zenremotestore/builds/buildstorageoperations.cpp
+++ b/src/zenremotestore/builds/buildstorageoperations.cpp
@@ -5,6 +5,7 @@
#include <zenremotestore/builds/buildstorage.h>
#include <zenremotestore/builds/buildstoragecache.h>
#include <zenremotestore/chunking/chunkblock.h>
+#include <zenremotestore/chunking/chunkingcontroller.h>
#include <zencore/basicfile.h>
#include <zencore/compactbinary.h>
@@ -25,6 +26,8 @@ ZEN_THIRD_PARTY_INCLUDES_START
#include <tsl/robin_set.h>
ZEN_THIRD_PARTY_INCLUDES_END
+#define EXTRA_VERIFY 0
+
namespace zen {
using namespace std::literals;
@@ -496,8 +499,118 @@ namespace {
}
}
+    // DJB2 hashes of file extensions for container formats that are already
+    // compressed (media, archives, installer packages). Chunks that come from
+    // these files are skipped by the compressor since re-compressing them is
+    // wasted effort. NOTE(review): this assumes the lookup side hashes the
+    // extension with the same HashStringDjb2 — confirm against the code that
+    // fills ChunkedContentLookup::PathExtensionHash.
+    const std::vector<uint32_t> NonCompressableExtensions({HashStringDjb2(".mp4"sv),
+    HashStringDjb2(".zip"sv),
+    HashStringDjb2(".7z"sv),
+    HashStringDjb2(".bzip"sv),
+    HashStringDjb2(".rar"sv),
+    HashStringDjb2(".gzip"sv),
+    HashStringDjb2(".apk"sv),
+    HashStringDjb2(".nsp"sv),
+    HashStringDjb2(".xvc"sv),
+    HashStringDjb2(".pkg"sv),
+    HashStringDjb2(".dmg"sv),
+    HashStringDjb2(".ipa"sv)});
+
+    // Set form of the list above for O(1) membership tests.
+    const tsl::robin_set<uint32_t> NonCompressableExtensionSet(NonCompressableExtensions.begin(), NonCompressableExtensions.end());
+
+    // Returns true unless the extension hash matches a known non-compressable format.
+    bool IsExtensionHashCompressable(const uint32_t PathHash) { return !NonCompressableExtensionSet.contains(PathHash); }
+
+    // Decides whether a chunk should be compressed, based on the file extension
+    // of the FIRST path that references it: the chunk's first sequence location
+    // is mapped to a sequence, the sequence to its first path, and that path's
+    // pre-computed extension hash is tested against the non-compressable set.
+    // A chunk with no sequence locations is treated as not compressable.
+    // NOTE(review): a chunk shared by both a compressable and a non-compressable
+    // file is classified by whichever location comes first — confirm that is
+    // intended.
+    bool IsChunkCompressable(const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, uint32_t ChunkIndex)
+    {
+        ZEN_UNUSED(Content);  // kept in the signature for symmetry/future use
+        const uint32_t ChunkLocationCount = Lookup.ChunkSequenceLocationCounts[ChunkIndex];
+        if (ChunkLocationCount == 0)
+        {
+            return false;
+        }
+        const size_t   ChunkLocationOffset = Lookup.ChunkSequenceLocationOffset[ChunkIndex];
+        const uint32_t SequenceIndex       = Lookup.ChunkSequenceLocations[ChunkLocationOffset].SequenceIndex;
+        const uint32_t PathIndex           = Lookup.SequenceIndexFirstPathIndex[SequenceIndex];
+        const uint32_t ExtensionHash       = Lookup.PathExtensionHash[PathIndex];
+
+        const bool IsCompressable = IsExtensionHashCompressable(ExtensionHash);
+        return IsCompressable;
+    }
+
+    // Concatenates every item of Items into one string, writing Prefix before
+    // each element (e.g. Prefix = "\n  " yields one item per indented line).
+    // T must be formattable by fmt.
+    template<typename T>
+    std::string FormatArray(std::span<const T> Items, std::string_view Prefix)
+    {
+        ExtendableStringBuilder<512> SB;
+        for (const T& Item : Items)
+        {
+            SB.Append(fmt::format("{}{}", Prefix, Item));
+        }
+        return SB.ToString();
+    }
+
} // namespace
+// Small MRU cache of open files used when reading chunk ranges back from a
+// local folder. Keyed by sequence index; keeps at most MaxOpenFileCount files
+// open at once. Not thread-safe (no synchronization visible).
+class ReadFileCache
+{
+public:
+    // A buffered file reader that provides CompositeBuffer where the buffers are owned and the memory never overwritten
+    //
+    // DiskStats        - counters updated by the BufferedOpenFile instances (opens/reads/bytes).
+    // Path             - root folder; joined with LocalContent.Paths[] to form file paths.
+    // LocalContent /
+    // LocalLookup      - borrowed by reference; must outlive this cache.
+    // MaxOpenFileCount - capacity of the open-file list; reserving it up front
+    //                    means size()==capacity() below reliably detects "full".
+    ReadFileCache(DiskStatistics& DiskStats,
+                  const std::filesystem::path& Path,
+                  const ChunkedFolderContent& LocalContent,
+                  const ChunkedContentLookup& LocalLookup,
+                  size_t MaxOpenFileCount)
+    : m_Path(Path)
+    , m_LocalContent(LocalContent)
+    , m_LocalLookup(LocalLookup)
+    , m_DiskStats(DiskStats)
+    {
+        m_OpenFiles.reserve(MaxOpenFileCount);
+    }
+    ~ReadFileCache() { m_OpenFiles.clear(); }
+
+    // Returns [Offset, Offset+Size) of the file backing SequenceIndex.
+    // Cache hit: the entry is moved to the front (most-recently-used first).
+    // Whole-file request (Size equals the file's recorded raw size): the file
+    // is read in one shot and NOT cached.
+    // Otherwise the file is opened, the least-recently-used entry is evicted
+    // if the list is full, and the new entry is inserted at the front.
+    CompositeBuffer GetRange(uint32_t SequenceIndex, uint64_t Offset, uint64_t Size)
+    {
+        ZEN_TRACE_CPU("ReadFileCache::GetRange");
+
+        auto CacheIt =
+            std::find_if(m_OpenFiles.begin(), m_OpenFiles.end(), [SequenceIndex](const auto& Lhs) { return Lhs.first == SequenceIndex; });
+        if (CacheIt != m_OpenFiles.end())
+        {
+            if (CacheIt != m_OpenFiles.begin())
+            {
+                // Move-to-front so eviction below always removes the LRU tail.
+                auto CachedFile(std::move(CacheIt->second));
+                m_OpenFiles.erase(CacheIt);
+                m_OpenFiles.insert(m_OpenFiles.begin(), std::make_pair(SequenceIndex, std::move(CachedFile)));
+            }
+            CompositeBuffer Result = m_OpenFiles.front().second->GetRange(Offset, Size);
+            return Result;
+        }
+        // Miss: resolve the sequence to its first path to get the file on disk.
+        const uint32_t              LocalPathIndex = m_LocalLookup.SequenceIndexFirstPathIndex[SequenceIndex];
+        const std::filesystem::path LocalFilePath  = (m_Path / m_LocalContent.Paths[LocalPathIndex]).make_preferred();
+        if (Size == m_LocalContent.RawSizes[LocalPathIndex])
+        {
+            // Whole-file fast path: single read, bypasses the open-file cache.
+            // NOTE(review): assumes Offset == 0 in this case — confirm callers.
+            IoBuffer Result = IoBufferBuilder::MakeFromFile(LocalFilePath);
+            return CompositeBuffer(SharedBuffer(Result));
+        }
+        if (m_OpenFiles.size() == m_OpenFiles.capacity())
+        {
+            // Full: drop the least-recently-used entry (tail), closing its file.
+            m_OpenFiles.pop_back();
+        }
+        m_OpenFiles.insert(m_OpenFiles.begin(),
+                           std::make_pair(SequenceIndex,
+                                          std::make_unique<BufferedOpenFile>(LocalFilePath,
+                                                                             m_DiskStats.OpenReadCount,
+                                                                             m_DiskStats.CurrentOpenFileCount,
+                                                                             m_DiskStats.ReadCount,
+                                                                             m_DiskStats.ReadByteCount)));
+        CompositeBuffer Result = m_OpenFiles.front().second->GetRange(Offset, Size);
+        return Result;
+    }
+
+private:
+    const std::filesystem::path m_Path;          // root folder for all reads
+    const ChunkedFolderContent& m_LocalContent;  // borrowed; paths + raw sizes
+    const ChunkedContentLookup& m_LocalLookup;   // borrowed; sequence->path map
+    // MRU-ordered: front = most recently used, back = eviction candidate.
+    std::vector<std::pair<uint32_t, std::unique_ptr<BufferedOpenFile>>> m_OpenFiles;
+    DiskStatistics& m_DiskStats;                 // shared counters, updated by BufferedOpenFile
+};
+
bool
ReadStateObject(BuildOpLogOutput& LogOutput,
CbObjectView StateView,
@@ -1118,6 +1231,8 @@ ZenTempFolderPath(const std::filesystem::path& ZenFolderPath)
return ZenFolderPath / "tmp";
}
+////////////////////// BuildsOperationUpdateFolder
+
BuildsOperationUpdateFolder::BuildsOperationUpdateFolder(BuildOpLogOutput& LogOutput,
StorageInstance& Storage,
std::atomic<bool>& AbortFlag,
@@ -1132,8 +1247,7 @@ BuildsOperationUpdateFolder::BuildsOperationUpdateFolder(BuildOpLogOutput&
const ChunkedContentLookup& RemoteLookup,
const std::vector<ChunkBlockDescription>& BlockDescriptions,
const std::vector<IoHash>& LooseChunkHashes,
- const Options& Options,
- DiskStatistics& DiskStats)
+ const Options& Options)
: m_LogOutput(LogOutput)
, m_Storage(Storage)
, m_AbortFlag(AbortFlag)
@@ -1149,7 +1263,6 @@ BuildsOperationUpdateFolder::BuildsOperationUpdateFolder(BuildOpLogOutput&
, m_BlockDescriptions(BlockDescriptions)
, m_LooseChunkHashes(LooseChunkHashes)
, m_Options(Options)
-, m_DiskStats(DiskStats)
, m_CacheFolderPath(ZenTempCacheFolderPath(m_Options.ZenFolderPath))
{
}
@@ -1159,9 +1272,22 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
{
ZEN_TRACE_CPU("BuildsOperationUpdateFolder::Execute");
+ enum TaskSteps : uint32_t
+ {
+ ScanExistingData,
+ WriteChunks,
+ PrepareTarget,
+ FinalizeTarget,
+ StepCount
+ };
+
+ auto EndProgress = MakeGuard([&]() { m_LogOutput.SetLogOperationProgress(TaskSteps::StepCount, TaskSteps::StepCount); });
+
ZEN_ASSERT((!m_Options.PrimeCacheOnly) ||
(m_Options.PrimeCacheOnly && (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Off)));
+ m_LogOutput.SetLogOperationProgress(TaskSteps::ScanExistingData, TaskSteps::StepCount);
+
Stopwatch IndexTimer;
if (!m_Options.IsQuiet)
@@ -1604,6 +1730,8 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
{
ZEN_TRACE_CPU("WriteChunks");
+ m_LogOutput.SetLogOperationProgress(TaskSteps::WriteChunks, TaskSteps::StepCount);
+
Stopwatch WriteTimer;
FilteredRate FilteredDownloadedBytesPerSecond;
@@ -2853,6 +2981,8 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
return;
}
+ m_LogOutput.SetLogOperationProgress(TaskSteps::PrepareTarget, TaskSteps::StepCount);
+
tsl::robin_map<uint32_t, uint32_t> RemotePathIndexToLocalPathIndex;
RemotePathIndexToLocalPathIndex.reserve(m_RemoteContent.Paths.size());
@@ -3031,13 +3161,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
Stopwatch Timer;
// Clean target folder
- if (!CleanDirectory(m_IOWorkerPool,
- m_AbortFlag,
- m_PauseFlag,
- m_LogOutput,
- m_Options.IsQuiet,
- m_Path,
- m_Options.DefaultExcludeFolders))
+ if (!CleanDirectory(m_IOWorkerPool, m_AbortFlag, m_PauseFlag, m_LogOutput, m_Options.IsQuiet, m_Path, m_Options.ExcludeFolders))
{
LOG_OUTPUT_WARN(m_LogOutput, "Some files in {} could not be removed", m_Path);
}
@@ -3051,6 +3175,9 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
{
ZEN_TRACE_CPU("FinalizeTree");
+
+ m_LogOutput.SetLogOperationProgress(TaskSteps::FinalizeTarget, TaskSteps::StepCount);
+
Stopwatch Timer;
std::unique_ptr<BuildOpLogOutput::ProgressBar> RebuildProgressBarPtr(m_LogOutput.CreateProgressBar("Rebuild State"));
@@ -3688,9 +3815,9 @@ BuildsOperationUpdateFolder::FindDownloadedChunk(const IoHash& ChunkHash)
}
FolderContent
-BuildsOperationUpdateFolder::GetValidFolderContent(GetFolderContentStatistics& LocalFolderScanStats,
- const std::filesystem::path& Path,
- std::span<const std::filesystem::path> PathsToCheck,
+BuildsOperationUpdateFolder::GetValidFolderContent(GetFolderContentStatistics& FolderScanStats,
+ const std::filesystem::path& Path,
+ std::span<const std::filesystem::path> PathsToCheck,
std::function<void(uint64_t PathCount, uint64_t CompletedPathCount)>&& ProgressCallback)
{
ZEN_TRACE_CPU("GetValidFolderContent");
@@ -3705,7 +3832,7 @@ BuildsOperationUpdateFolder::GetValidFolderContent(GetFolderContentStatistics&
{
Stopwatch Timer;
- auto _ = MakeGuard([&LocalFolderScanStats, &Timer]() { LocalFolderScanStats.ElapsedWallTimeUS = Timer.GetElapsedTimeUs(); });
+ auto _ = MakeGuard([&FolderScanStats, &Timer]() { FolderScanStats.ElapsedWallTimeUS = Timer.GetElapsedTimeUs(); });
ParallelWork Work(m_AbortFlag,
m_PauseFlag,
@@ -3717,7 +3844,7 @@ BuildsOperationUpdateFolder::GetValidFolderContent(GetFolderContentStatistics&
{
uint32_t PathRangeCount = Min(128u, PathCount - PathIndex);
Work.ScheduleWork(m_IOWorkerPool,
- [PathIndex, PathRangeCount, &PathsToCheck, &Path, &Result, &CompletedPathCount, &LocalFolderScanStats](
+ [PathIndex, PathRangeCount, &PathsToCheck, &Path, &Result, &CompletedPathCount, &FolderScanStats](
std::atomic<bool>& AbortFlag) {
if (!AbortFlag)
{
@@ -3734,10 +3861,10 @@ BuildsOperationUpdateFolder::GetValidFolderContent(GetFolderContentStatistics&
Result.Attributes[PathRangeIndex]))
{
Result.Paths[PathRangeIndex] = std::move(FilePath);
- LocalFolderScanStats.FoundFileCount++;
- LocalFolderScanStats.FoundFileByteCount += Result.RawSizes[PathRangeIndex];
- LocalFolderScanStats.AcceptedFileCount++;
- LocalFolderScanStats.AcceptedFileByteCount += Result.RawSizes[PathRangeIndex];
+ FolderScanStats.FoundFileCount++;
+ FolderScanStats.FoundFileByteCount += Result.RawSizes[PathRangeIndex];
+ FolderScanStats.AcceptedFileCount++;
+ FolderScanStats.AcceptedFileByteCount += Result.RawSizes[PathRangeIndex];
}
CompletedPathCount++;
}
@@ -4669,6 +4796,884 @@ BuildsOperationUpdateFolder::VerifySequence(uint32_t RemoteSequenceIndex)
}
}
+////////////////////// BuildsOperationUploadFolder
+
+// Constructs the upload-folder operation. Pure member initialization — no I/O
+// or network work happens here; all of that is deferred to Execute().
+// References (LogOutput, Storage, flags, pools) are borrowed and must outlive
+// this object. BuildPartName, Path, ManifestPath, MetaData and Options are
+// copied into members. CreateBuild selects between PutBuild (create) and
+// GetBuild (append to existing) during Execute().
+BuildsOperationUploadFolder::BuildsOperationUploadFolder(BuildOpLogOutput& LogOutput,
+                                                        StorageInstance& Storage,
+                                                        std::atomic<bool>& AbortFlag,
+                                                        std::atomic<bool>& PauseFlag,
+                                                        WorkerThreadPool& IOWorkerPool,
+                                                        WorkerThreadPool& NetworkPool,
+                                                        const Oid& BuildId,
+                                                        const Oid& BuildPartId,
+                                                        const std::string_view BuildPartName,
+                                                        const std::filesystem::path& Path,
+                                                        const std::filesystem::path& ManifestPath,
+                                                        bool CreateBuild,
+                                                        const CbObject& MetaData,
+                                                        const Options& Options)
+: m_LogOutput(LogOutput)
+, m_Storage(Storage)
+, m_AbortFlag(AbortFlag)
+, m_PauseFlag(PauseFlag)
+, m_IOWorkerPool(IOWorkerPool)
+, m_NetworkPool(NetworkPool)
+, m_BuildId(BuildId)
+, m_BuildPartId(BuildPartId)
+, m_BuildPartName(BuildPartName)
+, m_Path(Path)
+, m_ManifestPath(ManifestPath)
+, m_CreateBuild(CreateBuild)
+, m_MetaData(MetaData)
+, m_Options(Options)
+{
+}
+
+void
+BuildsOperationUploadFolder::Execute()
+{
+ enum TaskSteps : uint32_t
+ {
+ PrepareBuild,
+ CalculateDelta,
+ Upload,
+ // Validate,
+ Cleanup,
+ StepCount
+ };
+
+ auto EndProgress = MakeGuard([&]() { m_LogOutput.SetLogOperationProgress(TaskSteps::StepCount, TaskSteps::StepCount); });
+
+ Stopwatch ProcessTimer;
+
+ CreateDirectories(m_Options.TempDir);
+ CleanDirectory(m_Options.TempDir, {});
+ auto _ = MakeGuard([&]() {
+ if (CleanDirectory(m_Options.TempDir, {}))
+ {
+ std::error_code DummyEc;
+ RemoveDir(m_Options.TempDir, DummyEc);
+ }
+ });
+
+ m_LogOutput.SetLogOperationProgress(TaskSteps::PrepareBuild, TaskSteps::StepCount);
+
+ std::uint64_t TotalRawSize = 0;
+
+ CbObject ChunkerParameters;
+
+ struct PrepareBuildResult
+ {
+ std::vector<ChunkBlockDescription> KnownBlocks;
+ uint64_t PreferredMultipartChunkSize = 0;
+ uint64_t PayloadSize = 0;
+ uint64_t PrepareBuildTimeMs = 0;
+ uint64_t FindBlocksTimeMs = 0;
+ uint64_t ElapsedTimeMs = 0;
+ };
+
+ std::future<PrepareBuildResult> PrepBuildResultFuture = m_NetworkPool.EnqueueTask(
+ std::packaged_task<PrepareBuildResult()>{[this] {
+ ZEN_TRACE_CPU("PrepareBuild");
+
+ PrepareBuildResult Result;
+ Result.PreferredMultipartChunkSize = m_Options.PreferredMultipartChunkSize;
+ Stopwatch Timer;
+ if (m_CreateBuild)
+ {
+ ZEN_TRACE_CPU("CreateBuild");
+
+ Stopwatch PutBuildTimer;
+ CbObject PutBuildResult = m_Storage.BuildStorage->PutBuild(m_BuildId, m_MetaData);
+ Result.PrepareBuildTimeMs = PutBuildTimer.GetElapsedTimeMs();
+ Result.PreferredMultipartChunkSize = PutBuildResult["chunkSize"sv].AsUInt64(Result.PreferredMultipartChunkSize);
+ Result.PayloadSize = m_MetaData.GetSize();
+ }
+ else
+ {
+ ZEN_TRACE_CPU("PutBuild");
+ Stopwatch GetBuildTimer;
+ CbObject Build = m_Storage.BuildStorage->GetBuild(m_BuildId);
+ Result.PrepareBuildTimeMs = GetBuildTimer.GetElapsedTimeMs();
+ Result.PayloadSize = Build.GetSize();
+ if (auto ChunkSize = Build["chunkSize"sv].AsUInt64(); ChunkSize != 0)
+ {
+ Result.PreferredMultipartChunkSize = ChunkSize;
+ }
+ else if (m_Options.AllowMultiparts)
+ {
+ LOG_OUTPUT_WARN(m_LogOutput,
+ "PreferredMultipartChunkSize is unknown. Defaulting to '{}'",
+ NiceBytes(Result.PreferredMultipartChunkSize));
+ }
+ }
+
+ if (!m_Options.IgnoreExistingBlocks)
+ {
+ ZEN_TRACE_CPU("FindBlocks");
+ Stopwatch KnownBlocksTimer;
+ CbObject BlockDescriptionList = m_Storage.BuildStorage->FindBlocks(m_BuildId, m_Options.FindBlockMaxCount);
+ if (BlockDescriptionList)
+ {
+ Result.KnownBlocks = ParseChunkBlockDescriptionList(BlockDescriptionList);
+ }
+ m_FindBlocksStats.FindBlockTimeMS = KnownBlocksTimer.GetElapsedTimeMs();
+ m_FindBlocksStats.FoundBlockCount = Result.KnownBlocks.size();
+ Result.FindBlocksTimeMs = KnownBlocksTimer.GetElapsedTimeMs();
+ }
+ Result.ElapsedTimeMs = Timer.GetElapsedTimeMs();
+ return Result;
+ }},
+ WorkerThreadPool::EMode::EnableBacklog);
+
+ ChunkedFolderContent LocalContent;
+
+ {
+ auto IsAcceptedFolder = [this](const std::string_view& RelativePath) -> bool {
+ for (const std::string& ExcludeFolder : m_Options.ExcludeFolders)
+ {
+ if (RelativePath.starts_with(ExcludeFolder))
+ {
+ if (RelativePath.length() == ExcludeFolder.length())
+ {
+ return false;
+ }
+ else if (RelativePath[ExcludeFolder.length()] == '/')
+ {
+ return false;
+ }
+ }
+ }
+ return true;
+ };
+
+ auto IsAcceptedFile = [this](const std::string_view& RelativePath, uint64_t, uint32_t) -> bool {
+ for (const std::string& ExcludeExtension : m_Options.ExcludeExtensions)
+ {
+ if (RelativePath.ends_with(ExcludeExtension))
+ {
+ return false;
+ }
+ }
+ return true;
+ };
+
+ auto ParseManifest = [](const std::filesystem::path& Path,
+ const std::filesystem::path& ManifestPath) -> std::vector<std::filesystem::path> {
+ std::vector<std::filesystem::path> AssetPaths;
+ std::filesystem::path AbsoluteManifestPath =
+ MakeSafeAbsolutePath(ManifestPath.is_absolute() ? ManifestPath : Path / ManifestPath);
+ IoBuffer ManifestContent = ReadFile(AbsoluteManifestPath).Flatten();
+ std::string_view ManifestString((const char*)ManifestContent.GetView().GetData(), ManifestContent.GetSize());
+ std::string_view::size_type Offset = 0;
+ while (Offset < ManifestContent.GetSize())
+ {
+ size_t PathBreakOffset = ManifestString.find_first_of("\t\r\n", Offset);
+ if (PathBreakOffset == std::string_view::npos)
+ {
+ PathBreakOffset = ManifestContent.GetSize();
+ }
+ std::string_view AssetPath = ManifestString.substr(Offset, PathBreakOffset - Offset);
+ if (!AssetPath.empty())
+ {
+ AssetPaths.emplace_back(std::filesystem::path(AssetPath));
+ }
+ Offset = PathBreakOffset;
+ size_t EolOffset = ManifestString.find_first_of("\r\n", Offset);
+ if (EolOffset == std::string_view::npos)
+ {
+ break;
+ }
+ Offset = EolOffset;
+ size_t LineBreakOffset = ManifestString.find_first_not_of("\t\r\n", Offset);
+ if (LineBreakOffset == std::string_view::npos)
+ {
+ break;
+ }
+ Offset = LineBreakOffset;
+ }
+ return AssetPaths;
+ };
+
+ Stopwatch ScanTimer;
+ FolderContent Content;
+ if (m_ManifestPath.empty())
+ {
+ std::filesystem::path ExcludeManifestPath = m_Path / m_Options.ZenExcludeManifestName;
+ tsl::robin_set<std::string> ExcludeAssetPaths;
+ if (IsFile(ExcludeManifestPath))
+ {
+ std::vector<std::filesystem::path> AssetPaths = ParseManifest(m_Path, ExcludeManifestPath);
+ ExcludeAssetPaths.reserve(AssetPaths.size());
+ for (const std::filesystem::path& AssetPath : AssetPaths)
+ {
+ ExcludeAssetPaths.insert(AssetPath.generic_string());
+ }
+ }
+ Content = GetFolderContent(
+ m_LocalFolderScanStats,
+ m_Path,
+ std::move(IsAcceptedFolder),
+ [this, &IsAcceptedFile, &ExcludeAssetPaths](const std::string_view& RelativePath,
+ uint64_t Size,
+ uint32_t Attributes) -> bool {
+ if (RelativePath == m_Options.ZenExcludeManifestName)
+ {
+ return false;
+ }
+ if (!IsAcceptedFile(RelativePath, Size, Attributes))
+ {
+ return false;
+ }
+ if (ExcludeAssetPaths.contains(std::filesystem::path(RelativePath).generic_string()))
+ {
+ return false;
+ }
+ return true;
+ },
+ m_IOWorkerPool,
+ m_LogOutput.GetProgressUpdateDelayMS(),
+ [&](bool, std::ptrdiff_t) {
+ LOG_OUTPUT(m_LogOutput, "Found {} files in '{}'...", m_LocalFolderScanStats.AcceptedFileCount.load(), m_Path);
+ },
+ m_AbortFlag);
+ }
+ else
+ {
+ Stopwatch ManifestParseTimer;
+ std::vector<std::filesystem::path> AssetPaths = ParseManifest(m_Path, m_ManifestPath);
+ for (const std::filesystem::path& AssetPath : AssetPaths)
+ {
+ Content.Paths.push_back(AssetPath);
+ const std::filesystem::path AssetFilePath = (m_Path / AssetPath).make_preferred();
+ Content.RawSizes.push_back(FileSizeFromPath(AssetFilePath));
+#if ZEN_PLATFORM_WINDOWS
+ Content.Attributes.push_back(GetFileAttributesFromPath(AssetFilePath));
+#endif // ZEN_PLATFORM_WINDOWS
+#if ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX
+ Content.Attributes.push_back(GetFileMode(AssetFilePath));
+#endif // ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX
+ m_LocalFolderScanStats.AcceptedFileByteCount += Content.RawSizes.back();
+ m_LocalFolderScanStats.AcceptedFileCount++;
+ }
+ if (m_ManifestPath.is_relative())
+ {
+ Content.Paths.push_back(m_ManifestPath);
+ const std::filesystem::path ManifestFilePath = (m_Path / m_ManifestPath).make_preferred();
+ Content.RawSizes.push_back(FileSizeFromPath(ManifestFilePath));
+#if ZEN_PLATFORM_WINDOWS
+ Content.Attributes.push_back(GetFileAttributesFromPath(ManifestFilePath));
+#endif // ZEN_PLATFORM_WINDOWS
+#if ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX
+ Content.Attributes.push_back(GetFileMode(ManifestFilePath));
+#endif // ZEN_PLATFORM_MAC || ZEN_PLATFORM_LINUX
+
+ m_LocalFolderScanStats.AcceptedFileByteCount += Content.RawSizes.back();
+ m_LocalFolderScanStats.AcceptedFileCount++;
+ }
+ m_LocalFolderScanStats.FoundFileByteCount.store(m_LocalFolderScanStats.AcceptedFileByteCount);
+ m_LocalFolderScanStats.FoundFileCount.store(m_LocalFolderScanStats.AcceptedFileCount);
+ m_LocalFolderScanStats.ElapsedWallTimeUS = ManifestParseTimer.GetElapsedTimeUs();
+ }
+
+ std::unique_ptr<ChunkingController> ChunkController =
+ CreateChunkingControllerWithFixedChunking(ChunkingControllerWithFixedChunkingSettings{});
+ {
+ CbObjectWriter ChunkParametersWriter;
+ ChunkParametersWriter.AddString("name"sv, ChunkController->GetName());
+ ChunkParametersWriter.AddObject("parameters"sv, ChunkController->GetParameters());
+ ChunkerParameters = ChunkParametersWriter.Save();
+ }
+
+ TotalRawSize = std::accumulate(Content.RawSizes.begin(), Content.RawSizes.end(), std::uint64_t(0));
+
+ {
+ std::unique_ptr<BuildOpLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Scan Folder"));
+ BuildOpLogOutput::ProgressBar& Progress(*ProgressBarPtr);
+
+ FilteredRate FilteredBytesHashed;
+ FilteredBytesHashed.Start();
+ LocalContent = ChunkFolderContent(
+ m_ChunkingStats,
+ m_IOWorkerPool,
+ m_Path,
+ Content,
+ *ChunkController,
+ m_LogOutput.GetProgressUpdateDelayMS(),
+ [&](bool IsAborted, bool IsPaused, std::ptrdiff_t) {
+ FilteredBytesHashed.Update(m_ChunkingStats.BytesHashed.load());
+ std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found",
+ m_ChunkingStats.FilesProcessed.load(),
+ Content.Paths.size(),
+ NiceBytes(m_ChunkingStats.BytesHashed.load()),
+ NiceBytes(TotalRawSize),
+ NiceNum(FilteredBytesHashed.GetCurrent()),
+ m_ChunkingStats.UniqueChunksFound.load(),
+ NiceBytes(m_ChunkingStats.UniqueBytesFound.load()));
+ Progress.UpdateState({.Task = "Scanning files ",
+ .Details = Details,
+ .TotalCount = TotalRawSize,
+ .RemainingCount = TotalRawSize - m_ChunkingStats.BytesHashed.load(),
+ .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+ false);
+ },
+ m_AbortFlag,
+ m_PauseFlag);
+ FilteredBytesHashed.Stop();
+ Progress.Finish();
+ if (m_AbortFlag)
+ {
+ return;
+ }
+ }
+
+ if (!m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Found {} ({}) files divided into {} ({}) unique chunks in '{}' in {}. Average hash rate {}B/sec",
+ LocalContent.Paths.size(),
+ NiceBytes(TotalRawSize),
+ m_ChunkingStats.UniqueChunksFound.load(),
+ NiceBytes(m_ChunkingStats.UniqueBytesFound.load()),
+ m_Path,
+ NiceTimeSpanMs(ScanTimer.GetElapsedTimeMs()),
+ NiceNum(GetBytesPerSecond(m_ChunkingStats.ElapsedWallTimeUS, m_ChunkingStats.BytesHashed)));
+ }
+ }
+
+ const ChunkedContentLookup LocalLookup = BuildChunkedContentLookup(LocalContent);
+
+ std::vector<size_t> ReuseBlockIndexes;
+ std::vector<uint32_t> NewBlockChunkIndexes;
+
+ PrepareBuildResult PrepBuildResult = PrepBuildResultFuture.get();
+
+ if (!m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Build prepare took {}. {} took {}, payload size {}{}",
+ NiceTimeSpanMs(PrepBuildResult.ElapsedTimeMs),
+ m_CreateBuild ? "PutBuild" : "GetBuild",
+ NiceTimeSpanMs(PrepBuildResult.PrepareBuildTimeMs),
+ NiceBytes(PrepBuildResult.PayloadSize),
+ m_Options.IgnoreExistingBlocks ? ""
+ : fmt::format(". Found {} blocks in {}",
+ PrepBuildResult.KnownBlocks.size(),
+ NiceTimeSpanMs(PrepBuildResult.FindBlocksTimeMs)));
+ }
+
+ m_LogOutput.SetLogOperationProgress(TaskSteps::CalculateDelta, TaskSteps::StepCount);
+
+ const std::uint64_t LargeAttachmentSize =
+ m_Options.AllowMultiparts ? PrepBuildResult.PreferredMultipartChunkSize * 4u : (std::uint64_t)-1;
+
+ Stopwatch BlockArrangeTimer;
+
+ std::vector<std::uint32_t> LooseChunkIndexes;
+ {
+ bool EnableBlocks = true;
+ std::vector<std::uint32_t> BlockChunkIndexes;
+ for (uint32_t ChunkIndex = 0; ChunkIndex < LocalContent.ChunkedContent.ChunkHashes.size(); ChunkIndex++)
+ {
+ const uint64_t ChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[ChunkIndex];
+ if (!EnableBlocks || ChunkRawSize == 0 || ChunkRawSize > m_Options.BlockParameters.MaxChunkEmbedSize)
+ {
+ LooseChunkIndexes.push_back(ChunkIndex);
+ m_LooseChunksStats.ChunkByteCount += ChunkRawSize;
+ }
+ else
+ {
+ BlockChunkIndexes.push_back(ChunkIndex);
+ m_FindBlocksStats.PotentialChunkByteCount += ChunkRawSize;
+ }
+ }
+ m_FindBlocksStats.PotentialChunkCount = BlockChunkIndexes.size();
+ m_LooseChunksStats.ChunkCount = LooseChunkIndexes.size();
+
+ if (m_Options.IgnoreExistingBlocks)
+ {
+ if (!m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput, "Ignoring any existing blocks in store");
+ }
+ NewBlockChunkIndexes = std::move(BlockChunkIndexes);
+ }
+ else
+ {
+ ReuseBlockIndexes = FindReuseBlocks(PrepBuildResult.KnownBlocks,
+ LocalContent.ChunkedContent.ChunkHashes,
+ BlockChunkIndexes,
+ NewBlockChunkIndexes);
+ m_FindBlocksStats.AcceptedBlockCount = ReuseBlockIndexes.size();
+
+ for (const ChunkBlockDescription& Description : PrepBuildResult.KnownBlocks)
+ {
+ for (uint32_t ChunkRawLength : Description.ChunkRawLengths)
+ {
+ m_FindBlocksStats.FoundBlockByteCount += ChunkRawLength;
+ }
+ m_FindBlocksStats.FoundBlockChunkCount += Description.ChunkRawHashes.size();
+ }
+ }
+ }
+
+ std::vector<std::vector<uint32_t>> NewBlockChunks;
+ ArrangeChunksIntoBlocks(LocalContent, LocalLookup, NewBlockChunkIndexes, NewBlockChunks);
+
+ m_FindBlocksStats.NewBlocksCount = NewBlockChunks.size();
+ for (uint32_t ChunkIndex : NewBlockChunkIndexes)
+ {
+ m_FindBlocksStats.NewBlocksChunkByteCount += LocalContent.ChunkedContent.ChunkRawSizes[ChunkIndex];
+ }
+ m_FindBlocksStats.NewBlocksChunkCount = NewBlockChunkIndexes.size();
+
+ const double AcceptedByteCountPercent =
+ m_FindBlocksStats.PotentialChunkByteCount > 0
+ ? (100.0 * m_FindBlocksStats.AcceptedRawByteCount / m_FindBlocksStats.PotentialChunkByteCount)
+ : 0.0;
+
+ const double AcceptedReduntantByteCountPercent =
+ m_FindBlocksStats.AcceptedByteCount > 0 ? (100.0 * m_FindBlocksStats.AcceptedReduntantByteCount) /
+ (m_FindBlocksStats.AcceptedByteCount + m_FindBlocksStats.AcceptedReduntantByteCount)
+ : 0.0;
+ if (!m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Found {} chunks in {} ({}) blocks eligible for reuse in {}\n"
+ " Reusing {} ({}) matching chunks in {} blocks ({:.1f}%)\n"
+ " Accepting {} ({}) redundant chunks ({:.1f}%)\n"
+ " Rejected {} ({}) chunks in {} blocks\n"
+ " Arranged {} ({}) chunks in {} new blocks\n"
+ " Keeping {} ({}) chunks as loose chunks\n"
+ " Discovery completed in {}",
+ m_FindBlocksStats.FoundBlockChunkCount,
+ m_FindBlocksStats.FoundBlockCount,
+ NiceBytes(m_FindBlocksStats.FoundBlockByteCount),
+ NiceTimeSpanMs(m_FindBlocksStats.FindBlockTimeMS),
+
+ m_FindBlocksStats.AcceptedChunkCount,
+ NiceBytes(m_FindBlocksStats.AcceptedRawByteCount),
+ m_FindBlocksStats.AcceptedBlockCount,
+ AcceptedByteCountPercent,
+
+ m_FindBlocksStats.AcceptedReduntantChunkCount,
+ NiceBytes(m_FindBlocksStats.AcceptedReduntantByteCount),
+ AcceptedReduntantByteCountPercent,
+
+ m_FindBlocksStats.RejectedChunkCount,
+ NiceBytes(m_FindBlocksStats.RejectedByteCount),
+ m_FindBlocksStats.RejectedBlockCount,
+
+ m_FindBlocksStats.NewBlocksChunkCount,
+ NiceBytes(m_FindBlocksStats.NewBlocksChunkByteCount),
+ m_FindBlocksStats.NewBlocksCount,
+
+ m_LooseChunksStats.ChunkCount,
+ NiceBytes(m_LooseChunksStats.ChunkByteCount),
+
+ NiceTimeSpanMs(BlockArrangeTimer.GetElapsedTimeMs()));
+ }
+
+ GeneratedBlocks NewBlocks;
+
+ m_LogOutput.SetLogOperationProgress(TaskSteps::Upload, TaskSteps::StepCount);
+
+ if (!NewBlockChunks.empty())
+ {
+ Stopwatch GenerateBuildBlocksTimer;
+ auto __ = MakeGuard([&]() {
+ uint64_t BlockGenerateTimeUs = GenerateBuildBlocksTimer.GetElapsedTimeUs();
+ if (!m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Generated {} ({}) and uploaded {} ({}) blocks in {}. Generate speed: {}B/sec. Transfer speed {}bits/sec.",
+ m_GenerateBlocksStats.GeneratedBlockCount.load(),
+ NiceBytes(m_GenerateBlocksStats.GeneratedBlockByteCount),
+ m_UploadStats.BlockCount.load(),
+ NiceBytes(m_UploadStats.BlocksBytes.load()),
+ NiceTimeSpanMs(BlockGenerateTimeUs / 1000),
+ NiceNum(GetBytesPerSecond(m_GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS,
+ m_GenerateBlocksStats.GeneratedBlockByteCount)),
+ NiceNum(GetBytesPerSecond(m_UploadStats.ElapsedWallTimeUS, m_UploadStats.BlocksBytes * 8)));
+ }
+ });
+ GenerateBuildBlocks(LocalContent, LocalLookup, NewBlockChunks, NewBlocks);
+ }
+
+ CbObject PartManifest;
+ {
+ CbObjectWriter PartManifestWriter;
+ Stopwatch ManifestGenerationTimer;
+ auto __ = MakeGuard([&]() {
+ if (!m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Generated build part manifest in {} ({})",
+ NiceTimeSpanMs(ManifestGenerationTimer.GetElapsedTimeMs()),
+ NiceBytes(PartManifestWriter.GetSaveSize()));
+ }
+ });
+ PartManifestWriter.AddObject("chunker"sv, ChunkerParameters);
+
+ std::vector<IoHash> AllChunkBlockHashes;
+ std::vector<ChunkBlockDescription> AllChunkBlockDescriptions;
+ AllChunkBlockHashes.reserve(ReuseBlockIndexes.size() + NewBlocks.BlockDescriptions.size());
+ AllChunkBlockDescriptions.reserve(ReuseBlockIndexes.size() + NewBlocks.BlockDescriptions.size());
+ for (size_t ReuseBlockIndex : ReuseBlockIndexes)
+ {
+ AllChunkBlockDescriptions.push_back(PrepBuildResult.KnownBlocks[ReuseBlockIndex]);
+ AllChunkBlockHashes.push_back(PrepBuildResult.KnownBlocks[ReuseBlockIndex].BlockHash);
+ }
+ AllChunkBlockDescriptions.insert(AllChunkBlockDescriptions.end(),
+ NewBlocks.BlockDescriptions.begin(),
+ NewBlocks.BlockDescriptions.end());
+ for (const ChunkBlockDescription& BlockDescription : NewBlocks.BlockDescriptions)
+ {
+ AllChunkBlockHashes.push_back(BlockDescription.BlockHash);
+ }
+#if EXTRA_VERIFY
+ tsl::robin_map<IoHash, size_t, IoHash::Hasher> ChunkHashToAbsoluteChunkIndex;
+ std::vector<IoHash> AbsoluteChunkHashes;
+ AbsoluteChunkHashes.reserve(LocalContent.ChunkedContent.ChunkHashes.size());
+ for (uint32_t ChunkIndex : LooseChunkIndexes)
+ {
+ ChunkHashToAbsoluteChunkIndex.insert({LocalContent.ChunkedContent.ChunkHashes[ChunkIndex], AbsoluteChunkHashes.size()});
+ AbsoluteChunkHashes.push_back(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]);
+ }
+ for (const ChunkBlockDescription& Block : AllChunkBlockDescriptions)
+ {
+ for (const IoHash& ChunkHash : Block.ChunkHashes)
+ {
+ ChunkHashToAbsoluteChunkIndex.insert({ChunkHash, AbsoluteChunkHashes.size()});
+ AbsoluteChunkHashes.push_back(ChunkHash);
+ }
+ }
+ for (const IoHash& ChunkHash : LocalContent.ChunkedContent.ChunkHashes)
+ {
+ ZEN_ASSERT(AbsoluteChunkHashes[ChunkHashToAbsoluteChunkIndex.at(ChunkHash)] == ChunkHash);
+ ZEN_ASSERT(LocalContent.ChunkedContent.ChunkHashes[LocalLookup.ChunkHashToChunkIndex.at(ChunkHash)] == ChunkHash);
+ }
+ for (const uint32_t ChunkIndex : LocalContent.ChunkedContent.ChunkOrders)
+ {
+ ZEN_ASSERT(AbsoluteChunkHashes[ChunkHashToAbsoluteChunkIndex.at(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex])] ==
+ LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]);
+ ZEN_ASSERT(LocalLookup.ChunkHashToChunkIndex.at(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]) == ChunkIndex);
+ }
+#endif // EXTRA_VERIFY
+ std::vector<uint32_t> AbsoluteChunkOrders = CalculateAbsoluteChunkOrders(LocalContent.ChunkedContent.ChunkHashes,
+ LocalContent.ChunkedContent.ChunkOrders,
+ LocalLookup.ChunkHashToChunkIndex,
+ LooseChunkIndexes,
+ AllChunkBlockDescriptions);
+
+#if EXTRA_VERIFY
+ for (uint32_t ChunkOrderIndex = 0; ChunkOrderIndex < LocalContent.ChunkedContent.ChunkOrders.size(); ChunkOrderIndex++)
+ {
+ uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[ChunkOrderIndex];
+ uint32_t AbsoluteChunkIndex = AbsoluteChunkOrders[ChunkOrderIndex];
+ const IoHash& LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex];
+ const IoHash& AbsoluteChunkHash = AbsoluteChunkHashes[AbsoluteChunkIndex];
+ ZEN_ASSERT(LocalChunkHash == AbsoluteChunkHash);
+ }
+#endif // EXTRA_VERIFY
+
+ WriteBuildContentToCompactBinary(PartManifestWriter,
+ LocalContent.Platform,
+ LocalContent.Paths,
+ LocalContent.RawHashes,
+ LocalContent.RawSizes,
+ LocalContent.Attributes,
+ LocalContent.ChunkedContent.SequenceRawHashes,
+ LocalContent.ChunkedContent.ChunkCounts,
+ LocalContent.ChunkedContent.ChunkHashes,
+ LocalContent.ChunkedContent.ChunkRawSizes,
+ AbsoluteChunkOrders,
+ LooseChunkIndexes,
+ AllChunkBlockHashes);
+
+#if EXTRA_VERIFY
+ {
+ ChunkedFolderContent VerifyFolderContent;
+
+ std::vector<uint32_t> OutAbsoluteChunkOrders;
+ std::vector<IoHash> OutLooseChunkHashes;
+ std::vector<uint64_t> OutLooseChunkRawSizes;
+ std::vector<IoHash> OutBlockRawHashes;
+ ReadBuildContentFromCompactBinary(PartManifestWriter.Save(),
+ VerifyFolderContent.Platform,
+ VerifyFolderContent.Paths,
+ VerifyFolderContent.RawHashes,
+ VerifyFolderContent.RawSizes,
+ VerifyFolderContent.Attributes,
+ VerifyFolderContent.ChunkedContent.SequenceRawHashes,
+ VerifyFolderContent.ChunkedContent.ChunkCounts,
+ OutAbsoluteChunkOrders,
+ OutLooseChunkHashes,
+ OutLooseChunkRawSizes,
+ OutBlockRawHashes);
+ ZEN_ASSERT(OutBlockRawHashes == AllChunkBlockHashes);
+
+ for (uint32_t OrderIndex = 0; OrderIndex < OutAbsoluteChunkOrders.size(); OrderIndex++)
+ {
+ uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[OrderIndex];
+ const IoHash LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex];
+
+ uint32_t VerifyChunkIndex = OutAbsoluteChunkOrders[OrderIndex];
+ const IoHash VerifyChunkHash = AbsoluteChunkHashes[VerifyChunkIndex];
+
+ ZEN_ASSERT(LocalChunkHash == VerifyChunkHash);
+ }
+
+ CalculateLocalChunkOrders(OutAbsoluteChunkOrders,
+ OutLooseChunkHashes,
+ OutLooseChunkRawSizes,
+ AllChunkBlockDescriptions,
+ VerifyFolderContent.ChunkedContent.ChunkHashes,
+ VerifyFolderContent.ChunkedContent.ChunkRawSizes,
+ VerifyFolderContent.ChunkedContent.ChunkOrders);
+
+ ZEN_ASSERT(LocalContent.Paths == VerifyFolderContent.Paths);
+ ZEN_ASSERT(LocalContent.RawHashes == VerifyFolderContent.RawHashes);
+ ZEN_ASSERT(LocalContent.RawSizes == VerifyFolderContent.RawSizes);
+ ZEN_ASSERT(LocalContent.Attributes == VerifyFolderContent.Attributes);
+ ZEN_ASSERT(LocalContent.ChunkedContent.SequenceRawHashes == VerifyFolderContent.ChunkedContent.SequenceRawHashes);
+ ZEN_ASSERT(LocalContent.ChunkedContent.ChunkCounts == VerifyFolderContent.ChunkedContent.ChunkCounts);
+
+ for (uint32_t OrderIndex = 0; OrderIndex < LocalContent.ChunkedContent.ChunkOrders.size(); OrderIndex++)
+ {
+ uint32_t LocalChunkIndex = LocalContent.ChunkedContent.ChunkOrders[OrderIndex];
+ const IoHash LocalChunkHash = LocalContent.ChunkedContent.ChunkHashes[LocalChunkIndex];
+ uint64_t LocalChunkRawSize = LocalContent.ChunkedContent.ChunkRawSizes[LocalChunkIndex];
+
+ uint32_t VerifyChunkIndex = VerifyFolderContent.ChunkedContent.ChunkOrders[OrderIndex];
+ const IoHash VerifyChunkHash = VerifyFolderContent.ChunkedContent.ChunkHashes[VerifyChunkIndex];
+ uint64_t VerifyChunkRawSize = VerifyFolderContent.ChunkedContent.ChunkRawSizes[VerifyChunkIndex];
+
+ ZEN_ASSERT(LocalChunkHash == VerifyChunkHash);
+ ZEN_ASSERT(LocalChunkRawSize == VerifyChunkRawSize);
+ }
+ }
+#endif // EXTRA_VERIFY
+ PartManifest = PartManifestWriter.Save();
+ }
+
+ Stopwatch PutBuildPartResultTimer;
+ std::pair<IoHash, std::vector<IoHash>> PutBuildPartResult =
+ m_Storage.BuildStorage->PutBuildPart(m_BuildId, m_BuildPartId, m_BuildPartName, PartManifest);
+ if (!m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "PutBuildPart took {}, payload size {}. {} attachments are needed.",
+ NiceTimeSpanMs(PutBuildPartResultTimer.GetElapsedTimeMs()),
+ NiceBytes(PartManifest.GetSize()),
+ PutBuildPartResult.second.size());
+ }
+ IoHash PartHash = PutBuildPartResult.first;
+
+ auto UploadAttachments = [this, &LocalContent, &LocalLookup, &NewBlockChunks, &NewBlocks, &LooseChunkIndexes, &LargeAttachmentSize](
+ std::span<IoHash> RawHashes) {
+ if (!m_AbortFlag)
+ {
+ UploadStatistics TempUploadStats;
+ LooseChunksStatistics TempLooseChunksStats;
+
+ Stopwatch TempUploadTimer;
+ auto __ = MakeGuard([&]() {
+ if (!m_Options.IsQuiet)
+ {
+ uint64_t TempChunkUploadTimeUs = TempUploadTimer.GetElapsedTimeUs();
+ LOG_OUTPUT(m_LogOutput,
+ "Uploaded {} ({}) blocks. "
+ "Compressed {} ({} {}B/s) and uploaded {} ({}) chunks. "
+ "Transferred {} ({}bits/s) in {}",
+ TempUploadStats.BlockCount.load(),
+ NiceBytes(TempUploadStats.BlocksBytes),
+
+ TempLooseChunksStats.CompressedChunkCount.load(),
+ NiceBytes(TempLooseChunksStats.CompressedChunkBytes.load()),
+ NiceNum(GetBytesPerSecond(TempLooseChunksStats.CompressChunksElapsedWallTimeUS,
+ TempLooseChunksStats.ChunkByteCount)),
+ TempUploadStats.ChunkCount.load(),
+ NiceBytes(TempUploadStats.ChunksBytes),
+
+ NiceBytes(TempUploadStats.BlocksBytes + TempUploadStats.ChunksBytes),
+ NiceNum(GetBytesPerSecond(TempUploadStats.ElapsedWallTimeUS, TempUploadStats.ChunksBytes * 8)),
+ NiceTimeSpanMs(TempChunkUploadTimeUs / 1000));
+ }
+ });
+ UploadPartBlobs(LocalContent,
+ LocalLookup,
+ RawHashes,
+ NewBlockChunks,
+ NewBlocks,
+ LooseChunkIndexes,
+ LargeAttachmentSize,
+ TempUploadStats,
+ TempLooseChunksStats);
+ m_UploadStats += TempUploadStats;
+ m_LooseChunksStats += TempLooseChunksStats;
+ }
+ };
+ if (m_Options.IgnoreExistingBlocks)
+ {
+ if (m_Options.IsVerbose)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "PutBuildPart uploading all attachments, needs are: {}",
+ FormatArray<IoHash>(PutBuildPartResult.second, "\n "sv));
+ }
+
+ std::vector<IoHash> ForceUploadChunkHashes;
+ ForceUploadChunkHashes.reserve(LooseChunkIndexes.size());
+
+ for (uint32_t ChunkIndex : LooseChunkIndexes)
+ {
+ ForceUploadChunkHashes.push_back(LocalContent.ChunkedContent.ChunkHashes[ChunkIndex]);
+ }
+
+ for (size_t BlockIndex = 0; BlockIndex < NewBlocks.BlockHeaders.size(); BlockIndex++)
+ {
+ if (NewBlocks.BlockHeaders[BlockIndex])
+ {
+ // Block was not uploaded during generation
+ ForceUploadChunkHashes.push_back(NewBlocks.BlockDescriptions[BlockIndex].BlockHash);
+ }
+ }
+ UploadAttachments(ForceUploadChunkHashes);
+ }
+ else if (!PutBuildPartResult.second.empty())
+ {
+ if (m_Options.IsVerbose)
+ {
+ LOG_OUTPUT(m_LogOutput, "PutBuildPart needs attachments: {}", FormatArray<IoHash>(PutBuildPartResult.second, "\n "sv));
+ }
+ UploadAttachments(PutBuildPartResult.second);
+ }
+
+ uint32_t FinalizeBuildPartRetryCount = 5;
+ while (!m_AbortFlag && (FinalizeBuildPartRetryCount--) > 0)
+ {
+ Stopwatch FinalizeBuildPartTimer;
+ std::vector<IoHash> Needs = m_Storage.BuildStorage->FinalizeBuildPart(m_BuildId, m_BuildPartId, PartHash);
+ if (!m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "FinalizeBuildPart took {}. {} attachments are missing.",
+ NiceTimeSpanMs(FinalizeBuildPartTimer.GetElapsedTimeMs()),
+ Needs.size());
+ }
+ if (Needs.empty())
+ {
+ break;
+ }
+ if (m_Options.IsVerbose)
+ {
+ LOG_OUTPUT(m_LogOutput, "FinalizeBuildPart needs attachments: {}", FormatArray<IoHash>(Needs, "\n "sv));
+ }
+ UploadAttachments(Needs);
+ }
+
+ if (m_CreateBuild && !m_AbortFlag)
+ {
+ Stopwatch FinalizeBuildTimer;
+ m_Storage.BuildStorage->FinalizeBuild(m_BuildId);
+ if (!m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput, "FinalizeBuild took {}", NiceTimeSpanMs(FinalizeBuildTimer.GetElapsedTimeMs()));
+ }
+ }
+
+ if (!NewBlocks.BlockDescriptions.empty() && !m_AbortFlag)
+ {
+ uint64_t UploadBlockMetadataCount = 0;
+ Stopwatch UploadBlockMetadataTimer;
+
+ uint32_t FailedMetadataUploadCount = 1;
+ int32_t MetadataUploadRetryCount = 3;
+ while ((MetadataUploadRetryCount-- > 0) && (FailedMetadataUploadCount > 0))
+ {
+ FailedMetadataUploadCount = 0;
+ for (size_t BlockIndex = 0; BlockIndex < NewBlocks.BlockDescriptions.size(); BlockIndex++)
+ {
+ if (m_AbortFlag)
+ {
+ break;
+ }
+ const IoHash& BlockHash = NewBlocks.BlockDescriptions[BlockIndex].BlockHash;
+ if (!NewBlocks.MetaDataHasBeenUploaded[BlockIndex])
+ {
+ const CbObject BlockMetaData =
+ BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]);
+ if (m_Storage.BuildCacheStorage)
+ {
+ m_Storage.BuildCacheStorage->PutBlobMetadatas(m_BuildId,
+ std::vector<IoHash>({BlockHash}),
+ std::vector<CbObject>({BlockMetaData}));
+ }
+ bool MetadataSucceeded = m_Storage.BuildStorage->PutBlockMetadata(m_BuildId, BlockHash, BlockMetaData);
+ if (MetadataSucceeded)
+ {
+ m_UploadStats.BlocksBytes += BlockMetaData.GetSize();
+ NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true;
+ UploadBlockMetadataCount++;
+ }
+ else
+ {
+ FailedMetadataUploadCount++;
+ }
+ }
+ }
+ }
+ if (UploadBlockMetadataCount > 0)
+ {
+ uint64_t ElapsedUS = UploadBlockMetadataTimer.GetElapsedTimeUs();
+ m_UploadStats.ElapsedWallTimeUS += ElapsedUS;
+ if (!m_Options.IsQuiet)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Uploaded metadata for {} blocks in {}",
+ UploadBlockMetadataCount,
+ NiceTimeSpanMs(ElapsedUS / 1000));
+ }
+ }
+ }
+
+ // if (m_PostUploadVerify && !m_AbortFlag)
+ // {
+ // //m_LogOutput.SetLogOperationProgress(TaskSteps::Validate, TaskSteps::StepCount);
+ // ValidateBuildPart(*m_Storage.BuildStorage, m_BuildId, m_BuildPartId, m_BuildPartName, ValidateStats, ValidateDownloadStats);
+ // std::string ValidateInfo;
+ // if (m_PostUploadVerify)
+ // {
+ // const uint64_t DownloadedCount = ValidateDownloadStats.DownloadedChunkCount + ValidateDownloadStats.DownloadedBlockCount;
+ // const uint64_t DownloadedByteCount =
+ // ValidateDownloadStats.DownloadedChunkByteCount + ValidateDownloadStats.DownloadedBlockByteCount;
+ // ValidateInfo = fmt::format("\n Verified: {:>8} ({}), {}B/sec, {}",
+ // DownloadedCount,
+ // NiceBytes(DownloadedByteCount),
+ // NiceNum(GetBytesPerSecond(ValidateStats.ElapsedWallTimeUS, DownloadedByteCount)),
+ // NiceTimeSpanMs(ValidateStats.ElapsedWallTimeUS / 1000));
+ // }
+ // }
+
+ m_Storage.BuildStorage->PutBuildPartStats(
+ m_BuildId,
+ m_BuildPartId,
+ {{"totalSize", double(m_LocalFolderScanStats.FoundFileByteCount.load())},
+ {"reusedRatio", AcceptedByteCountPercent / 100.0},
+ {"reusedBlockCount", double(m_FindBlocksStats.AcceptedBlockCount)},
+ {"reusedBlockByteCount", double(m_FindBlocksStats.AcceptedRawByteCount)},
+ {"newBlockCount", double(m_FindBlocksStats.NewBlocksCount)},
+ {"newBlockByteCount", double(m_FindBlocksStats.NewBlocksChunkByteCount)},
+ {"uploadedCount", double(m_UploadStats.BlockCount.load() + m_UploadStats.ChunkCount.load())},
+ {"uploadedByteCount", double(m_UploadStats.BlocksBytes.load() + m_UploadStats.ChunksBytes.load())},
+ {"uploadedBytesPerSec",
+ double(GetBytesPerSecond(m_UploadStats.ElapsedWallTimeUS, m_UploadStats.ChunksBytes + m_UploadStats.BlocksBytes))},
+ {"elapsedTimeSec", double(ProcessTimer.GetElapsedTimeMs() / 1000.0)}});
+
+ m_LogOutput.SetLogOperationProgress(TaskSteps::Cleanup, TaskSteps::StepCount);
+}
+
void
DownloadLargeBlob(BuildStorage& Storage,
const std::filesystem::path& DownloadFolder,
@@ -4739,4 +5744,1390 @@ DownloadLargeBlob(BuildStorage& Storage,
}
}
+// Select which already-known chunk blocks should be reused for this upload.
+//
+// Greedy two-pass selection: pass 1 scores every known block by how many of its
+// chunk bytes match the wanted chunks (ChunkHashes restricted to ChunkIndexes)
+// and keeps candidates meeting m_Options.BlockReuseMinPercentLimit; pass 2
+// walks candidates by descending reused byte count, re-scores each against only
+// the chunks not yet claimed by an earlier accepted block, and accepts it only
+// if it still meets the limit.
+//
+// Returns indexes into KnownBlocks for the accepted blocks. Fills
+// OutUnusedChunkIndexes with every entry of ChunkIndexes whose chunk is not
+// covered by an accepted block. Updates m_FindBlocksStats accept/reject
+// counters as a side effect.
+std::vector<size_t>
+BuildsOperationUploadFolder::FindReuseBlocks(const std::vector<ChunkBlockDescription>& KnownBlocks,
+                                             std::span<const IoHash> ChunkHashes,
+                                             std::span<const uint32_t> ChunkIndexes,
+                                             std::vector<uint32_t>& OutUnusedChunkIndexes)
+{
+    ZEN_TRACE_CPU("FindReuseBlocks");
+
+    // Find all blocks with a usage level higher than MinPercentLimit
+    // Pick out the blocks with usage higher or equal to MinPercentLimit
+    // Sort them with highest size usage - most usage first
+    // Make a list of all chunks and mark them as not found
+    // For each block, recalculate the block has usage percent based on the chunks marked as not found
+    // If the block still reaches MinPercentLimit, keep it and remove the matching chunks from the not found list
+    // Repeat for following all remaining block that initially matched MinPercentLimit
+
+    std::vector<size_t> FilteredReuseBlockIndexes;
+
+    uint32_t ChunkCount = gsl::narrow<uint32_t>(ChunkHashes.size());
+    // Per local chunk index: true once an accepted block provides this chunk.
+    std::vector<bool> ChunkFound(ChunkCount, false);
+
+    if (ChunkCount > 0)
+    {
+        if (!KnownBlocks.empty())
+        {
+            Stopwatch ReuseTimer;
+
+            // Hash -> local chunk index for the chunks we actually want covered.
+            tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkHashToChunkIndex;
+            ChunkHashToChunkIndex.reserve(ChunkIndexes.size());
+            for (uint32_t ChunkIndex : ChunkIndexes)
+            {
+                ChunkHashToChunkIndex.insert_or_assign(ChunkHashes[ChunkIndex], ChunkIndex);
+            }
+
+            std::vector<size_t> BlockSizes(KnownBlocks.size(), 0);
+            std::vector<size_t> BlockUseSize(KnownBlocks.size(), 0);
+
+            std::vector<size_t> ReuseBlockIndexes;
+
+            // Pass 1: score every known block; sizes are compressed chunk lengths.
+            for (size_t KnownBlockIndex = 0; KnownBlockIndex < KnownBlocks.size(); KnownBlockIndex++)
+            {
+                const ChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex];
+                // Skip malformed descriptions (zero hash, or mismatched hash/length arrays).
+                if (KnownBlock.BlockHash != IoHash::Zero && KnownBlock.ChunkRawHashes.size() == KnownBlock.ChunkCompressedLengths.size())
+                {
+                    size_t BlockAttachmentCount = KnownBlock.ChunkRawHashes.size();
+                    if (BlockAttachmentCount == 0)
+                    {
+                        continue;
+                    }
+                    size_t ReuseSize = 0;
+                    size_t BlockSize = 0;
+                    size_t FoundAttachmentCount = 0;
+                    size_t BlockChunkCount = KnownBlock.ChunkRawHashes.size();
+                    for (size_t BlockChunkIndex = 0; BlockChunkIndex < BlockChunkCount; BlockChunkIndex++)
+                    {
+                        const IoHash& BlockChunkHash = KnownBlock.ChunkRawHashes[BlockChunkIndex];
+                        const uint32_t BlockChunkSize = KnownBlock.ChunkCompressedLengths[BlockChunkIndex];
+                        BlockSize += BlockChunkSize;
+                        if (ChunkHashToChunkIndex.contains(BlockChunkHash))
+                        {
+                            ReuseSize += BlockChunkSize;
+                            FoundAttachmentCount++;
+                        }
+                    }
+
+                    size_t ReusePercent = (ReuseSize * 100) / BlockSize;
+
+                    if (ReusePercent >= m_Options.BlockReuseMinPercentLimit)
+                    {
+                        if (m_Options.IsVerbose)
+                        {
+                            LOG_OUTPUT(m_LogOutput,
+                                       "Reusing block {}. {} attachments found, usage level: {}%",
+                                       KnownBlock.BlockHash,
+                                       FoundAttachmentCount,
+                                       ReusePercent);
+                        }
+                        ReuseBlockIndexes.push_back(KnownBlockIndex);
+
+                        BlockSizes[KnownBlockIndex] = BlockSize;
+                        BlockUseSize[KnownBlockIndex] = ReuseSize;
+                    }
+                    else if (FoundAttachmentCount > 0)
+                    {
+                        // if (m_Options.IsVerbose)
+                        //{
+                        //    LOG_OUTPUT(m_LogOutput, "Skipping block {}. {} attachments found, usage level: {}%", KnownBlock.BlockHash,
+                        //    FoundAttachmentCount, ReusePercent);
+                        //}
+                        m_FindBlocksStats.RejectedBlockCount++;
+                        m_FindBlocksStats.RejectedChunkCount += FoundAttachmentCount;
+                        m_FindBlocksStats.RejectedByteCount += ReuseSize;
+                    }
+                }
+            }
+
+            if (!ReuseBlockIndexes.empty())
+            {
+                // Most reused bytes first, so the best blocks claim chunks before
+                // overlapping, lower-value blocks are re-evaluated.
+                std::sort(ReuseBlockIndexes.begin(), ReuseBlockIndexes.end(), [&](size_t Lhs, size_t Rhs) {
+                    return BlockUseSize[Lhs] > BlockUseSize[Rhs];
+                });
+
+                // Pass 2: re-score each candidate counting only still-unclaimed chunks.
+                for (size_t KnownBlockIndex : ReuseBlockIndexes)
+                {
+                    std::vector<uint32_t> FoundChunkIndexes;
+                    size_t BlockSize = 0;
+                    size_t AdjustedReuseSize = 0;
+                    size_t AdjustedRawReuseSize = 0;
+                    const ChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex];
+                    for (size_t BlockChunkIndex = 0; BlockChunkIndex < KnownBlock.ChunkRawHashes.size(); BlockChunkIndex++)
+                    {
+                        const IoHash& BlockChunkHash = KnownBlock.ChunkRawHashes[BlockChunkIndex];
+                        const uint32_t BlockChunkSize = KnownBlock.ChunkCompressedLengths[BlockChunkIndex];
+                        BlockSize += BlockChunkSize;
+                        if (auto It = ChunkHashToChunkIndex.find(BlockChunkHash); It != ChunkHashToChunkIndex.end())
+                        {
+                            const uint32_t ChunkIndex = It->second;
+                            if (!ChunkFound[ChunkIndex])
+                            {
+                                FoundChunkIndexes.push_back(ChunkIndex);
+                                AdjustedReuseSize += KnownBlock.ChunkCompressedLengths[BlockChunkIndex];
+                                AdjustedRawReuseSize += KnownBlock.ChunkRawLengths[BlockChunkIndex];
+                            }
+                        }
+                    }
+
+                    size_t ReusePercent = (AdjustedReuseSize * 100) / BlockSize;
+
+                    if (ReusePercent >= m_Options.BlockReuseMinPercentLimit)
+                    {
+                        if (m_Options.IsVerbose)
+                        {
+                            LOG_OUTPUT(m_LogOutput,
+                                       "Reusing block {}. {} attachments found, usage level: {}%",
+                                       KnownBlock.BlockHash,
+                                       FoundChunkIndexes.size(),
+                                       ReusePercent);
+                        }
+                        FilteredReuseBlockIndexes.push_back(KnownBlockIndex);
+
+                        // Claim this block's chunks so later candidates don't double-count them.
+                        for (uint32_t ChunkIndex : FoundChunkIndexes)
+                        {
+                            ChunkFound[ChunkIndex] = true;
+                        }
+                        m_FindBlocksStats.AcceptedChunkCount += FoundChunkIndexes.size();
+                        m_FindBlocksStats.AcceptedByteCount += AdjustedReuseSize;
+                        m_FindBlocksStats.AcceptedRawByteCount += AdjustedRawReuseSize;
+                        m_FindBlocksStats.AcceptedReduntantChunkCount += KnownBlock.ChunkRawHashes.size() - FoundChunkIndexes.size();
+                        m_FindBlocksStats.AcceptedReduntantByteCount += BlockSize - AdjustedReuseSize;
+                    }
+                    else
+                    {
+                        // if (m_Options.IsVerbose)
+                        //{
+                        //    LOG_OUTPUT(m_LogOutput, "Skipping block {}. filtered usage level: {}%", KnownBlock.BlockHash, ReusePercent);
+                        //}
+                        m_FindBlocksStats.RejectedBlockCount++;
+                        m_FindBlocksStats.RejectedChunkCount += FoundChunkIndexes.size();
+                        m_FindBlocksStats.RejectedByteCount += AdjustedReuseSize;
+                    }
+                }
+            }
+        }
+    // NOTE(review): the reserve size assumes AcceptedChunkCount was accumulated only
+    // by this call over this ChunkIndexes set; if the stats persist across calls the
+    // unsigned subtraction could wrap (reserve is advisory, so only a perf concern).
+    // TODO confirm stats lifetime.
+    OutUnusedChunkIndexes.reserve(ChunkIndexes.size() - m_FindBlocksStats.AcceptedChunkCount);
+    for (uint32_t ChunkIndex : ChunkIndexes)
+    {
+        if (!ChunkFound[ChunkIndex])
+        {
+            OutUnusedChunkIndexes.push_back(ChunkIndex);
+        }
+    }
+    }
+    return FilteredReuseBlockIndexes;
+}
+
+// Group loose chunks into candidate blocks for generation.
+//
+// ChunkIndexes is sorted in place by each chunk's first sequence location
+// (sequence index, then offset) so chunks from the same source stay adjacent.
+// Chunks are then packed greedily into blocks bounded by
+// m_Options.BlockParameters.MaxBlockSize and MaxChunksPerBlock; when a block
+// fills up, the code scans backwards (down to MaxBlockSizeLowThreshold) for a
+// boundary between source sequences and prefers to split there. Each resulting
+// block (a list of chunk indexes) is appended to OutBlocks.
+void
+BuildsOperationUploadFolder::ArrangeChunksIntoBlocks(const ChunkedFolderContent& Content,
+                                                     const ChunkedContentLookup& Lookup,
+                                                     std::vector<uint32_t>& ChunkIndexes,
+                                                     std::vector<std::vector<uint32_t>>& OutBlocks)
+{
+    ZEN_TRACE_CPU("ArrangeChunksIntoBlocks");
+    // Order chunks by where they first appear so block contents are spatially coherent.
+    std::sort(ChunkIndexes.begin(), ChunkIndexes.end(), [&Content, &Lookup](uint32_t Lhs, uint32_t Rhs) {
+        const ChunkedContentLookup::ChunkSequenceLocation& LhsLocation = GetChunkSequenceLocations(Lookup, Lhs)[0];
+        const ChunkedContentLookup::ChunkSequenceLocation& RhsLocation = GetChunkSequenceLocations(Lookup, Rhs)[0];
+        if (LhsLocation.SequenceIndex < RhsLocation.SequenceIndex)
+        {
+            return true;
+        }
+        else if (LhsLocation.SequenceIndex > RhsLocation.SequenceIndex)
+        {
+            return false;
+        }
+        return LhsLocation.Offset < RhsLocation.Offset;
+    });
+
+    // Allow splitting up to 1/16th below MaxBlockSize to land on a sequence boundary.
+    uint64_t MaxBlockSizeLowThreshold = m_Options.BlockParameters.MaxBlockSize - (m_Options.BlockParameters.MaxBlockSize / 16);
+
+    uint64_t BlockSize = 0;
+
+    uint32_t ChunkIndexStart = 0;
+    // Note: ChunkIndexOffset is only advanced in the else branch; when a block is
+    // emitted the same offset is re-examined for the next block.
+    for (uint32_t ChunkIndexOffset = 0; ChunkIndexOffset < ChunkIndexes.size();)
+    {
+        const uint32_t ChunkIndex = ChunkIndexes[ChunkIndexOffset];
+        const uint64_t ChunkSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
+
+        if (((BlockSize + ChunkSize) > m_Options.BlockParameters.MaxBlockSize) ||
+            (ChunkIndexOffset - ChunkIndexStart) > m_Options.BlockParameters.MaxChunksPerBlock)
+        {
+            // Within the span of MaxBlockSizeLowThreshold and MaxBlockSize, see if there is a break
+            // between source paths for chunks. Break the block at the last such break if any.
+            ZEN_ASSERT(ChunkIndexOffset > ChunkIndexStart);
+
+            const uint32_t ChunkSequenceIndex = Lookup.ChunkSequenceLocations[Lookup.ChunkSequenceLocationOffset[ChunkIndex]].SequenceIndex;
+
+            uint64_t ScanBlockSize = BlockSize;
+
+            // Scan backwards from the last accepted chunk looking for a sequence change;
+            // keep at least 3 chunks in the block (ChunkIndexStart + 2 floor).
+            uint32_t ScanChunkIndexOffset = ChunkIndexOffset - 1;
+            while (ScanChunkIndexOffset > (ChunkIndexStart + 2))
+            {
+                const uint32_t TestChunkIndex = ChunkIndexes[ScanChunkIndexOffset];
+                const uint64_t TestChunkSize = Content.ChunkedContent.ChunkRawSizes[TestChunkIndex];
+                if ((ScanBlockSize - TestChunkSize) < MaxBlockSizeLowThreshold)
+                {
+                    break;
+                }
+
+                const uint32_t TestSequenceIndex =
+                    Lookup.ChunkSequenceLocations[Lookup.ChunkSequenceLocationOffset[TestChunkIndex]].SequenceIndex;
+                if (ChunkSequenceIndex != TestSequenceIndex)
+                {
+                    // Found a sequence boundary inside the tolerance window: split after it.
+                    ChunkIndexOffset = ScanChunkIndexOffset + 1;
+                    break;
+                }
+
+                ScanBlockSize -= TestChunkSize;
+                ScanChunkIndexOffset--;
+            }
+
+            // Emit the block [ChunkIndexStart, ChunkIndexOffset).
+            std::vector<uint32_t> ChunksInBlock;
+            ChunksInBlock.reserve(ChunkIndexOffset - ChunkIndexStart);
+            for (uint32_t AddIndexOffset = ChunkIndexStart; AddIndexOffset < ChunkIndexOffset; AddIndexOffset++)
+            {
+                const uint32_t AddChunkIndex = ChunkIndexes[AddIndexOffset];
+                ChunksInBlock.push_back(AddChunkIndex);
+            }
+            OutBlocks.emplace_back(std::move(ChunksInBlock));
+            BlockSize = 0;
+            ChunkIndexStart = ChunkIndexOffset;
+        }
+        else
+        {
+            ChunkIndexOffset++;
+            BlockSize += ChunkSize;
+        }
+    }
+    // Flush the final partial block, if any.
+    if (ChunkIndexStart < ChunkIndexes.size())
+    {
+        std::vector<uint32_t> ChunksInBlock;
+        ChunksInBlock.reserve(ChunkIndexes.size() - ChunkIndexStart);
+        for (uint32_t AddIndexOffset = ChunkIndexStart; AddIndexOffset < ChunkIndexes.size(); AddIndexOffset++)
+        {
+            const uint32_t AddChunkIndex = ChunkIndexes[AddIndexOffset];
+            ChunksInBlock.push_back(AddChunkIndex);
+        }
+        OutBlocks.emplace_back(std::move(ChunksInBlock));
+    }
+}
+
+// Compress each planned block of chunks and opportunistically upload it.
+//
+// For every entry in NewBlockChunks a generation task runs on the I/O worker
+// pool (m_IOWorkerPool); each generated block is handed to the network pool
+// (m_NetworkPool) for upload unless the upload queue is saturated (> 16
+// pending) or all generation has already completed. Fills OutBlocks
+// (descriptions, compressed sizes, metadata, headers, hash->index map,
+// metadata-uploaded flags) and updates m_GenerateBlocksStats / m_UploadStats.
+// Blocks until all scheduled work finishes (or aborts), driving a progress bar.
+//
+// NOTE(review): BlockHeaders[BlockIndex] appears to be set on every path;
+// downstream code treats a non-null header as "block not uploaded during
+// generation" — confirm whether the upload path is expected to clear it.
+void
+BuildsOperationUploadFolder::GenerateBuildBlocks(const ChunkedFolderContent& Content,
+                                                 const ChunkedContentLookup& Lookup,
+                                                 const std::vector<std::vector<uint32_t>>& NewBlockChunks,
+                                                 GeneratedBlocks& OutBlocks)
+{
+    ZEN_TRACE_CPU("GenerateBuildBlocks");
+    const std::size_t NewBlockCount = NewBlockChunks.size();
+    if (NewBlockCount > 0)
+    {
+        std::unique_ptr<BuildOpLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Generate Blocks"));
+        BuildOpLogOutput::ProgressBar& Progress(*ProgressBarPtr);
+
+        // Pre-size all per-block output arrays; tasks index them by BlockIndex
+        // without further synchronization (each task writes only its own slot).
+        OutBlocks.BlockDescriptions.resize(NewBlockCount);
+        OutBlocks.BlockSizes.resize(NewBlockCount);
+        OutBlocks.BlockMetaDatas.resize(NewBlockCount);
+        OutBlocks.BlockHeaders.resize(NewBlockCount);
+        OutBlocks.MetaDataHasBeenUploaded.resize(NewBlockCount, 0);
+        OutBlocks.BlockHashToBlockIndex.reserve(NewBlockCount);
+
+        // Guards the shared BlockHashToBlockIndex map only.
+        RwLock Lock;
+
+        WorkerThreadPool& GenerateBlobsPool = m_IOWorkerPool;
+        WorkerThreadPool& UploadBlocksPool = m_NetworkPool;
+
+        FilteredRate FilteredGeneratedBytesPerSecond;
+        FilteredRate FilteredUploadedBytesPerSecond;
+
+        ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+        // Back-pressure counter: number of generated blocks waiting in the upload pool.
+        std::atomic<uint64_t> QueuedPendingBlocksForUpload = 0;
+
+        for (size_t BlockIndex = 0; BlockIndex < NewBlockCount; BlockIndex++)
+        {
+            if (Work.IsAborted())
+            {
+                break;
+            }
+            const std::vector<uint32_t>& ChunksInBlock = NewBlockChunks[BlockIndex];
+            Work.ScheduleWork(
+                GenerateBlobsPool,
+                [this,
+                 &Content,
+                 &Lookup,
+                 &Work,
+                 &UploadBlocksPool,
+                 NewBlockCount,
+                 ChunksInBlock,
+                 &Lock,
+                 &OutBlocks,
+                 &FilteredGeneratedBytesPerSecond,
+                 &QueuedPendingBlocksForUpload,
+                 &FilteredUploadedBytesPerSecond,
+                 BlockIndex](std::atomic<bool>&) {
+                    if (!m_AbortFlag)
+                    {
+                        ZEN_TRACE_CPU("GenerateBuildBlocks_Generate");
+
+                        FilteredGeneratedBytesPerSecond.Start();
+                        // TODO: Convert ScheduleWork body to function
+
+                        Stopwatch GenerateTimer;
+                        CompressedBuffer CompressedBlock =
+                            GenerateBlock(Content, Lookup, ChunksInBlock, OutBlocks.BlockDescriptions[BlockIndex]);
+                        if (m_Options.IsVerbose)
+                        {
+                            LOG_OUTPUT(m_LogOutput,
+                                       "Generated block {} ({}) containing {} chunks in {}",
+                                       OutBlocks.BlockDescriptions[BlockIndex].BlockHash,
+                                       NiceBytes(CompressedBlock.GetCompressedSize()),
+                                       OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size(),
+                                       NiceTimeSpanMs(GenerateTimer.GetElapsedTimeMs()));
+                        }
+
+                        OutBlocks.BlockSizes[BlockIndex] = CompressedBlock.GetCompressedSize();
+                        {
+                            CbObjectWriter Writer;
+                            Writer.AddString("createdBy", "zen");
+                            OutBlocks.BlockMetaDatas[BlockIndex] = Writer.Save();
+                        }
+                        m_GenerateBlocksStats.GeneratedBlockByteCount += OutBlocks.BlockSizes[BlockIndex];
+                        m_GenerateBlocksStats.GeneratedBlockCount++;
+
+                        Lock.WithExclusiveLock([&]() {
+                            OutBlocks.BlockHashToBlockIndex.insert_or_assign(OutBlocks.BlockDescriptions[BlockIndex].BlockHash, BlockIndex);
+                        });
+
+                        // Keep the block header (first two buffer segments) for later use.
+                        {
+                            std::span<const SharedBuffer> Segments = CompressedBlock.GetCompressed().GetSegments();
+                            ZEN_ASSERT(Segments.size() >= 2);
+                            OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]);
+                        }
+
+                        if (m_GenerateBlocksStats.GeneratedBlockCount == NewBlockCount)
+                        {
+                            FilteredGeneratedBytesPerSecond.Stop();
+                        }
+
+                        if (QueuedPendingBlocksForUpload.load() > 16)
+                        {
+                            // Upload queue is saturated: don't schedule an inline upload;
+                            // the block will be uploaded later via the attachment path.
+                            // NOTE(review): the header was already captured above, so this
+                            // reassignment looks redundant — confirm.
+                            std::span<const SharedBuffer> Segments = CompressedBlock.GetCompressed().GetSegments();
+                            ZEN_ASSERT(Segments.size() >= 2);
+                            OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]);
+                        }
+                        else
+                        {
+                            if (!m_AbortFlag)
+                            {
+                                QueuedPendingBlocksForUpload++;
+
+                                Work.ScheduleWork(
+                                    UploadBlocksPool,
+                                    [this,
+                                     NewBlockCount,
+                                     &FilteredUploadedBytesPerSecond,
+                                     &QueuedPendingBlocksForUpload,
+                                     &OutBlocks,
+                                     BlockIndex,
+                                     Payload = std::move(CompressedBlock)](std::atomic<bool>&) mutable {
+                                        auto _ = MakeGuard([&QueuedPendingBlocksForUpload] { QueuedPendingBlocksForUpload--; });
+                                        if (!m_AbortFlag)
+                                        {
+                                            if (m_GenerateBlocksStats.GeneratedBlockCount == NewBlockCount)
+                                            {
+                                                // Generation has finished by the time this upload runs:
+                                                // skip the inline upload and just keep the header —
+                                                // presumably deferring the block to the attachment
+                                                // upload phase. TODO confirm intent.
+                                                ZEN_TRACE_CPU("GenerateBuildBlocks_Save");
+
+                                                FilteredUploadedBytesPerSecond.Stop();
+                                                std::span<const SharedBuffer> Segments = Payload.GetCompressed().GetSegments();
+                                                ZEN_ASSERT(Segments.size() >= 2);
+                                                OutBlocks.BlockHeaders[BlockIndex] = CompositeBuffer(Segments[0], Segments[1]);
+                                            }
+                                            else
+                                            {
+                                                ZEN_TRACE_CPU("GenerateBuildBlocks_Upload");
+
+                                                FilteredUploadedBytesPerSecond.Start();
+                                                // TODO: Convert ScheduleWork body to function
+
+                                                const CbObject BlockMetaData =
+                                                    BuildChunkBlockDescription(OutBlocks.BlockDescriptions[BlockIndex],
+                                                                               OutBlocks.BlockMetaDatas[BlockIndex]);
+
+                                                const IoHash& BlockHash = OutBlocks.BlockDescriptions[BlockIndex].BlockHash;
+                                                const uint64_t CompressedBlockSize = Payload.GetCompressedSize();
+
+                                                // Mirror the blob into the local cache first, if present.
+                                                if (m_Storage.BuildCacheStorage)
+                                                {
+                                                    m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId,
+                                                                                              BlockHash,
+                                                                                              ZenContentType::kCompressedBinary,
+                                                                                              Payload.GetCompressed());
+                                                }
+
+                                                m_Storage.BuildStorage->PutBuildBlob(m_BuildId,
+                                                                                     BlockHash,
+                                                                                     ZenContentType::kCompressedBinary,
+                                                                                     std::move(Payload).GetCompressed());
+                                                m_UploadStats.BlocksBytes += CompressedBlockSize;
+
+                                                if (m_Options.IsVerbose)
+                                                {
+                                                    LOG_OUTPUT(m_LogOutput,
+                                                               "Uploaded block {} ({}) containing {} chunks",
+                                                               BlockHash,
+                                                               NiceBytes(CompressedBlockSize),
+                                                               OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size());
+                                                }
+
+                                                if (m_Storage.BuildCacheStorage)
+                                                {
+                                                    m_Storage.BuildCacheStorage->PutBlobMetadatas(m_BuildId,
+                                                                                                  std::vector<IoHash>({BlockHash}),
+                                                                                                  std::vector<CbObject>({BlockMetaData}));
+                                                }
+
+                                                // Metadata upload failure is tolerated here; a later
+                                                // retry pass re-uploads blocks whose flag stays false.
+                                                bool MetadataSucceeded =
+                                                    m_Storage.BuildStorage->PutBlockMetadata(m_BuildId, BlockHash, BlockMetaData);
+                                                if (MetadataSucceeded)
+                                                {
+                                                    if (m_Options.IsVerbose)
+                                                    {
+                                                        LOG_OUTPUT(m_LogOutput,
+                                                                   "Uploaded block {} metadata ({})",
+                                                                   BlockHash,
+                                                                   NiceBytes(BlockMetaData.GetSize()));
+                                                    }
+
+                                                    OutBlocks.MetaDataHasBeenUploaded[BlockIndex] = true;
+                                                    m_UploadStats.BlocksBytes += BlockMetaData.GetSize();
+                                                }
+
+                                                m_UploadStats.BlockCount++;
+                                                if (m_UploadStats.BlockCount == NewBlockCount)
+                                                {
+                                                    FilteredUploadedBytesPerSecond.Stop();
+                                                }
+                                            }
+                                        }
+                                    });
+                            }
+                        }
+                    }
+                });
+        }
+
+        // Block until all generate/upload tasks complete, updating the progress bar.
+        Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+            ZEN_UNUSED(PendingWork);
+
+            FilteredGeneratedBytesPerSecond.Update(m_GenerateBlocksStats.GeneratedBlockByteCount.load());
+            FilteredUploadedBytesPerSecond.Update(m_UploadStats.BlocksBytes.load());
+
+            std::string Details = fmt::format("Generated {}/{} ({}, {}B/s). Uploaded {}/{} ({}, {}bits/s)",
+                                              m_GenerateBlocksStats.GeneratedBlockCount.load(),
+                                              NewBlockCount,
+                                              NiceBytes(m_GenerateBlocksStats.GeneratedBlockByteCount.load()),
+                                              NiceNum(FilteredGeneratedBytesPerSecond.GetCurrent()),
+                                              m_UploadStats.BlockCount.load(),
+                                              NewBlockCount,
+                                              NiceBytes(m_UploadStats.BlocksBytes.load()),
+                                              NiceNum(FilteredUploadedBytesPerSecond.GetCurrent() * 8));
+
+            Progress.UpdateState({.Task = "Generating blocks",
+                                  .Details = Details,
+                                  .TotalCount = gsl::narrow<uint64_t>(NewBlockCount),
+                                  .RemainingCount = gsl::narrow<uint64_t>(NewBlockCount - m_GenerateBlocksStats.GeneratedBlockCount.load()),
+                                  .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+                                 false);
+        });
+
+        ZEN_ASSERT(m_AbortFlag || QueuedPendingBlocksForUpload.load() == 0);
+
+        Progress.Finish();
+
+        m_GenerateBlocksStats.GenerateBlocksElapsedWallTimeUS = FilteredGeneratedBytesPerSecond.GetElapsedTimeUS();
+        m_UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS();
+    }
+}
+
+// Translate the local chunk order into the "absolute" chunk index space.
+//
+// The absolute space enumerates, in order: every loose chunk (in
+// LooseChunkIndexes order), then every chunk of every block in
+// BlockDescriptions (including block chunks not referenced locally — they
+// still consume an absolute index). Returns LocalChunkOrder remapped so each
+// entry is an absolute chunk index instead of a local one. If the same chunk
+// hash appears in more than one block, the mapping keeps the LAST occurrence
+// (later assignments overwrite earlier ones).
+std::vector<uint32_t>
+BuildsOperationUploadFolder::CalculateAbsoluteChunkOrders(
+    const std::span<const IoHash> LocalChunkHashes,
+    const std::span<const uint32_t> LocalChunkOrder,
+    const tsl::robin_map<IoHash, uint32_t, IoHash::Hasher>& ChunkHashToLocalChunkIndex,
+    const std::span<const uint32_t>& LooseChunkIndexes,
+    const std::span<const ChunkBlockDescription>& BlockDescriptions)
+{
+    ZEN_TRACE_CPU("CalculateAbsoluteChunkOrders");
+
+#if EXTRA_VERIFY
+    std::vector<IoHash> TmpAbsoluteChunkHashes;
+    TmpAbsoluteChunkHashes.reserve(LocalChunkHashes.size());
+#endif // EXTRA_VERIFY
+    // Per local chunk index, its absolute index; ~0u marks "not mapped yet".
+    std::vector<uint32_t> LocalChunkIndexToAbsoluteChunkIndex;
+    LocalChunkIndexToAbsoluteChunkIndex.resize(LocalChunkHashes.size(), (uint32_t)-1);
+    std::uint32_t AbsoluteChunkCount = 0;
+    // Loose chunks occupy the first absolute indexes.
+    for (uint32_t ChunkIndex : LooseChunkIndexes)
+    {
+        LocalChunkIndexToAbsoluteChunkIndex[ChunkIndex] = AbsoluteChunkCount;
+#if EXTRA_VERIFY
+        TmpAbsoluteChunkHashes.push_back(LocalChunkHashes[ChunkIndex]);
+#endif // EXTRA_VERIFY
+        AbsoluteChunkCount++;
+    }
+    // Then every block chunk, whether or not it is referenced locally.
+    for (const ChunkBlockDescription& Block : BlockDescriptions)
+    {
+        for (const IoHash& ChunkHash : Block.ChunkRawHashes)
+        {
+            if (auto It = ChunkHashToLocalChunkIndex.find(ChunkHash); It != ChunkHashToLocalChunkIndex.end())
+            {
+                const uint32_t LocalChunkIndex = It->second;
+                ZEN_ASSERT_SLOW(LocalChunkHashes[LocalChunkIndex] == ChunkHash);
+                LocalChunkIndexToAbsoluteChunkIndex[LocalChunkIndex] = AbsoluteChunkCount;
+            }
+#if EXTRA_VERIFY
+            TmpAbsoluteChunkHashes.push_back(ChunkHash);
+#endif // EXTRA_VERIFY
+            AbsoluteChunkCount++;
+        }
+    }
+    // Remap the order list through the local->absolute table.
+    std::vector<uint32_t> AbsoluteChunkOrder;
+    AbsoluteChunkOrder.reserve(LocalChunkHashes.size());
+    for (const uint32_t LocalChunkIndex : LocalChunkOrder)
+    {
+        const uint32_t AbsoluteChunkIndex = LocalChunkIndexToAbsoluteChunkIndex[LocalChunkIndex];
+#if EXTRA_VERIFY
+        ZEN_ASSERT(LocalChunkHashes[LocalChunkIndex] == TmpAbsoluteChunkHashes[AbsoluteChunkIndex]);
+#endif // EXTRA_VERIFY
+        AbsoluteChunkOrder.push_back(AbsoluteChunkIndex);
+    }
+#if EXTRA_VERIFY
+    {
+        uint32_t OrderIndex = 0;
+        while (OrderIndex < LocalChunkOrder.size())
+        {
+            const uint32_t LocalChunkIndex = LocalChunkOrder[OrderIndex];
+            const IoHash& LocalChunkHash = LocalChunkHashes[LocalChunkIndex];
+            const uint32_t AbsoluteChunkIndex = AbsoluteChunkOrder[OrderIndex];
+            const IoHash& AbsoluteChunkHash = TmpAbsoluteChunkHashes[AbsoluteChunkIndex];
+            ZEN_ASSERT(LocalChunkHash == AbsoluteChunkHash);
+            OrderIndex++;
+        }
+    }
+#endif // EXTRA_VERIFY
+    return AbsoluteChunkOrder;
+}
+
+// Inverse of CalculateAbsoluteChunkOrders: rebuild a deduplicated local chunk
+// table from an absolute chunk order.
+//
+// The absolute chunk table is reconstructed as [loose chunks] ++ [each block's
+// chunks in block order] (hashes and raw sizes in parallel). Walking
+// AbsoluteChunkOrders, each distinct hash is assigned the next local index on
+// first sight; repeats reuse the existing index. Fills OutLocalChunkHashes /
+// OutLocalChunkRawSizes (unique chunks, first-seen order) and
+// OutLocalChunkOrders (same length as AbsoluteChunkOrders, remapped to local
+// indexes). The output vectors are appended to, not cleared.
+void
+BuildsOperationUploadFolder::CalculateLocalChunkOrders(const std::span<const uint32_t>& AbsoluteChunkOrders,
+                                                       const std::span<const IoHash> LooseChunkHashes,
+                                                       const std::span<const uint64_t> LooseChunkRawSizes,
+                                                       const std::span<const ChunkBlockDescription>& BlockDescriptions,
+                                                       std::vector<IoHash>& OutLocalChunkHashes,
+                                                       std::vector<uint64_t>& OutLocalChunkRawSizes,
+                                                       std::vector<uint32_t>& OutLocalChunkOrders)
+{
+    ZEN_TRACE_CPU("CalculateLocalChunkOrders");
+
+    // Rebuild the absolute chunk table: loose chunks first, then block chunks.
+    std::vector<IoHash> AbsoluteChunkHashes;
+    std::vector<uint64_t> AbsoluteChunkRawSizes;
+    AbsoluteChunkHashes.insert(AbsoluteChunkHashes.end(), LooseChunkHashes.begin(), LooseChunkHashes.end());
+    AbsoluteChunkRawSizes.insert(AbsoluteChunkRawSizes.end(), LooseChunkRawSizes.begin(), LooseChunkRawSizes.end());
+    for (const ChunkBlockDescription& Block : BlockDescriptions)
+    {
+        AbsoluteChunkHashes.insert(AbsoluteChunkHashes.end(), Block.ChunkRawHashes.begin(), Block.ChunkRawHashes.end());
+        AbsoluteChunkRawSizes.insert(AbsoluteChunkRawSizes.end(), Block.ChunkRawLengths.begin(), Block.ChunkRawLengths.end());
+    }
+    OutLocalChunkHashes.reserve(AbsoluteChunkHashes.size());
+    OutLocalChunkRawSizes.reserve(AbsoluteChunkRawSizes.size());
+    OutLocalChunkOrders.reserve(AbsoluteChunkOrders.size());
+
+    // Hash -> already-assigned local index, for deduplication.
+    tsl::robin_map<IoHash, uint32_t, IoHash::Hasher> ChunkHashToChunkIndex;
+    ChunkHashToChunkIndex.reserve(AbsoluteChunkHashes.size());
+
+    for (uint32_t AbsoluteChunkOrderIndex = 0; AbsoluteChunkOrderIndex < AbsoluteChunkOrders.size(); AbsoluteChunkOrderIndex++)
+    {
+        const uint32_t AbsoluteChunkIndex = AbsoluteChunkOrders[AbsoluteChunkOrderIndex];
+        const IoHash& AbsoluteChunkHash = AbsoluteChunkHashes[AbsoluteChunkIndex];
+        const uint64_t AbsoluteChunkRawSize = AbsoluteChunkRawSizes[AbsoluteChunkIndex];
+
+        if (auto It = ChunkHashToChunkIndex.find(AbsoluteChunkHash); It != ChunkHashToChunkIndex.end())
+        {
+            // Seen before: reuse the local index assigned at first sight.
+            const uint32_t LocalChunkIndex = It->second;
+            OutLocalChunkOrders.push_back(LocalChunkIndex);
+        }
+        else
+        {
+            // First sight: append to the local table and record the mapping.
+            uint32_t LocalChunkIndex = gsl::narrow<uint32_t>(OutLocalChunkHashes.size());
+            OutLocalChunkHashes.push_back(AbsoluteChunkHash);
+            OutLocalChunkRawSizes.push_back(AbsoluteChunkRawSize);
+            OutLocalChunkOrders.push_back(LocalChunkIndex);
+            ChunkHashToChunkIndex.insert_or_assign(AbsoluteChunkHash, LocalChunkIndex);
+        }
+#if EXTRA_VERIFY
+        const uint32_t LocalChunkIndex = OutLocalChunkOrders[AbsoluteChunkOrderIndex];
+        const IoHash& LocalChunkHash = OutLocalChunkHashes[LocalChunkIndex];
+        const uint64_t& LocalChunkRawSize = OutLocalChunkRawSizes[LocalChunkIndex];
+        ZEN_ASSERT(LocalChunkHash == AbsoluteChunkHash);
+        ZEN_ASSERT(LocalChunkRawSize == AbsoluteChunkRawSize);
+#endif // EXTRA_VERIFY
+    }
+#if EXTRA_VERIFY
+    for (uint32_t OrderIndex = 0; OrderIndex < OutLocalChunkOrders.size(); OrderIndex++)
+    {
+        uint32_t LocalChunkIndex = OutLocalChunkOrders[OrderIndex];
+        const IoHash LocalChunkHash = OutLocalChunkHashes[LocalChunkIndex];
+        uint64_t LocalChunkRawSize = OutLocalChunkRawSizes[LocalChunkIndex];
+
+        uint32_t VerifyChunkIndex = AbsoluteChunkOrders[OrderIndex];
+        const IoHash VerifyChunkHash = AbsoluteChunkHashes[VerifyChunkIndex];
+        uint64_t VerifyChunkRawSize = AbsoluteChunkRawSizes[VerifyChunkIndex];
+
+        ZEN_ASSERT(LocalChunkHash == VerifyChunkHash);
+        ZEN_ASSERT(LocalChunkRawSize == VerifyChunkRawSize);
+    }
+#endif // EXTRA_VERIFY
+}
+
+// Serializes one build part's manifest to compact binary: the platform tag,
+// total raw size, the per-file table ("files"), the chunk layout
+// ("chunkedContent"), and optional attachment sections for loose chunks and
+// blocks when present. LooseLocalChunkIndexes index into LocalChunkHashes /
+// LocalChunkRawSizes (the deduplicated "local" chunk space).
+void
+BuildsOperationUploadFolder::WriteBuildContentToCompactBinary(CbObjectWriter& PartManifestWriter,
+ const SourcePlatform Platform,
+ std::span<const std::filesystem::path> Paths,
+ std::span<const IoHash> RawHashes,
+ std::span<const uint64_t> RawSizes,
+ std::span<const uint32_t> Attributes,
+ std::span<const IoHash> SequenceRawHashes,
+ std::span<const uint32_t> ChunkCounts,
+ std::span<const IoHash> LocalChunkHashes,
+ std::span<const uint64_t> LocalChunkRawSizes,
+ const std::vector<uint32_t>& AbsoluteChunkOrders,
+ const std::span<const uint32_t> LooseLocalChunkIndexes,
+ const std::span<IoHash> BlockHashes)
+{
+ ZEN_ASSERT(Platform != SourcePlatform::_Count);
+ PartManifestWriter.AddString("platform"sv, ToString(Platform));
+
+ // Total raw (uncompressed) size of all files in this part.
+ uint64_t TotalSize = 0;
+ for (const uint64_t Size : RawSizes)
+ {
+ TotalSize += Size;
+ }
+ PartManifestWriter.AddInteger("totalSize", TotalSize);
+
+ PartManifestWriter.BeginObject("files"sv);
+ {
+ compactbinary_helpers::WriteArray(Paths, "paths"sv, PartManifestWriter);
+ compactbinary_helpers::WriteArray(RawHashes, "rawhashes"sv, PartManifestWriter);
+ compactbinary_helpers::WriteArray(RawSizes, "rawsizes"sv, PartManifestWriter);
+ // Windows stores file attributes; Linux/MacOS store the same values
+ // under the "mode" key instead.
+ if (Platform == SourcePlatform::Windows)
+ {
+ compactbinary_helpers::WriteArray(Attributes, "attributes"sv, PartManifestWriter);
+ }
+ if (Platform == SourcePlatform::Linux || Platform == SourcePlatform::MacOS)
+ {
+ compactbinary_helpers::WriteArray(Attributes, "mode"sv, PartManifestWriter);
+ }
+ }
+ PartManifestWriter.EndObject(); // files
+
+ PartManifestWriter.BeginObject("chunkedContent");
+ {
+ compactbinary_helpers::WriteArray(SequenceRawHashes, "sequenceRawHashes"sv, PartManifestWriter);
+ compactbinary_helpers::WriteArray(ChunkCounts, "chunkcounts"sv, PartManifestWriter);
+ compactbinary_helpers::WriteArray(AbsoluteChunkOrders, "chunkorders"sv, PartManifestWriter);
+ }
+ PartManifestWriter.EndObject(); // chunkedContent
+
+ // Loose chunks (uploaded individually rather than packed into blocks) are
+ // referenced as binary attachments together with their raw sizes.
+ size_t LooseChunkCount = LooseLocalChunkIndexes.size();
+ if (LooseChunkCount > 0)
+ {
+ PartManifestWriter.BeginObject("chunkAttachments");
+ {
+ PartManifestWriter.BeginArray("rawHashes"sv);
+ for (uint32_t ChunkIndex : LooseLocalChunkIndexes)
+ {
+ PartManifestWriter.AddBinaryAttachment(LocalChunkHashes[ChunkIndex]);
+ }
+ PartManifestWriter.EndArray(); // rawHashes
+
+ PartManifestWriter.BeginArray("chunkRawSizes"sv);
+ for (uint32_t ChunkIndex : LooseLocalChunkIndexes)
+ {
+ PartManifestWriter.AddInteger(LocalChunkRawSizes[ChunkIndex]);
+ }
+ PartManifestWriter.EndArray(); // chunkRawSizes
+ }
+ PartManifestWriter.EndObject(); // chunkAttachments
+ }
+
+ if (BlockHashes.size() > 0)
+ {
+ PartManifestWriter.BeginObject("blockAttachments");
+ {
+ compactbinary_helpers::WriteBinaryAttachmentArray(BlockHashes, "rawHashes"sv, PartManifestWriter);
+ }
+ PartManifestWriter.EndObject(); // blockAttachments
+ }
+}
+
+// Reads one chunk's raw bytes from the local source folder through the shared
+// read-file cache. The chunk is located via Lookup (hash -> chunk index ->
+// first sequence location); in slow-assert builds the fetched bytes are
+// verified to hash back to ChunkHash.
+CompositeBuffer
+BuildsOperationUploadFolder::FetchChunk(const ChunkedFolderContent& Content,
+ const ChunkedContentLookup& Lookup,
+ const IoHash& ChunkHash,
+ ReadFileCache& OpenFileCache)
+{
+ ZEN_TRACE_CPU("FetchChunk");
+ auto It = Lookup.ChunkHashToChunkIndex.find(ChunkHash);
+ ZEN_ASSERT(It != Lookup.ChunkHashToChunkIndex.end());
+ uint32_t ChunkIndex = It->second;
+ std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkLocations = GetChunkSequenceLocations(Lookup, ChunkIndex);
+ ZEN_ASSERT(!ChunkLocations.empty());
+ // The chunk may occur at several locations; any copy is identical, so the
+ // first known sequence location is used.
+ CompositeBuffer Chunk =
+ OpenFileCache.GetRange(ChunkLocations[0].SequenceIndex, ChunkLocations[0].Offset, Content.ChunkedContent.ChunkRawSizes[ChunkIndex]);
+ ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == ChunkHash);
+ return Chunk;
+};
+
+// Builds a compressed chunk block from the given chunk indices. Each chunk is
+// described by (hash, fetch-lambda) and fetched/compressed lazily by
+// GenerateChunkBlock: a chunk is Mermaid-compressed at VeryFast level only
+// when it is large enough (MinimumSizeForCompressInBlock) and deemed
+// compressable; otherwise it is stored with CompressionLevel::None.
+// NOTE: the fetch lambdas capture Content, Lookup and the local OpenFileCache
+// by reference, so GenerateChunkBlock must consume them before this function
+// returns — TODO confirm against GenerateChunkBlock's contract.
+CompressedBuffer
+BuildsOperationUploadFolder::GenerateBlock(const ChunkedFolderContent& Content,
+ const ChunkedContentLookup& Lookup,
+ const std::vector<uint32_t>& ChunksInBlock,
+ ChunkBlockDescription& OutBlockDescription)
+{
+ ZEN_TRACE_CPU("GenerateBlock");
+ ReadFileCache OpenFileCache(m_DiskStats, m_Path, Content, Lookup, 4);
+
+ std::vector<std::pair<IoHash, FetchChunkFunc>> BlockContent;
+ BlockContent.reserve(ChunksInBlock.size());
+ for (uint32_t ChunkIndex : ChunksInBlock)
+ {
+ BlockContent.emplace_back(std::make_pair(
+ Content.ChunkedContent.ChunkHashes[ChunkIndex],
+ [this, &Content, &Lookup, &OpenFileCache, ChunkIndex](const IoHash& ChunkHash) -> std::pair<uint64_t, CompressedBuffer> {
+ CompositeBuffer Chunk = FetchChunk(Content, Lookup, ChunkHash, OpenFileCache);
+ if (!Chunk)
+ {
+ ZEN_ASSERT(false);
+ }
+ uint64_t RawSize = Chunk.GetSize();
+ // Only compress chunks that are part of a known sequence, big
+ // enough to be worth compressing, and not flagged as incompressible.
+ const bool ShouldCompressChunk = Lookup.RawHashToSequenceIndex.contains(ChunkHash) &&
+ (RawSize >= m_Options.MinimumSizeForCompressInBlock) &&
+ IsChunkCompressable(Content, Lookup, ChunkIndex);
+ const OodleCompressionLevel CompressionLevel =
+ ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None;
+ return {RawSize, CompressedBuffer::Compress(Chunk, OodleCompressor::Mermaid, CompressionLevel)};
+ }));
+ }
+
+ return GenerateChunkBlock(std::move(BlockContent), OutBlockDescription);
+};
+
+// Reassembles a block for which a (compressed) header already exists: the
+// header buffer's segments are reused verbatim and each chunk is re-read from
+// disk, re-compressed, and appended. The result is wrapped without
+// re-validation (FromCompressedNoValidate).
+// NOTE(review): the per-chunk compression decision here must produce the same
+// compressed layout the original header describes — confirm against the block
+// generation path.
+CompressedBuffer
+BuildsOperationUploadFolder::RebuildBlock(const ChunkedFolderContent& Content,
+ const ChunkedContentLookup& Lookup,
+ CompositeBuffer&& HeaderBuffer,
+ const std::vector<uint32_t>& ChunksInBlock)
+{
+ ZEN_TRACE_CPU("RebuildBlock");
+ ReadFileCache OpenFileCache(m_DiskStats, m_Path, Content, Lookup, 4);
+
+ std::vector<SharedBuffer> ResultBuffers;
+ ResultBuffers.reserve(HeaderBuffer.GetSegments().size() + ChunksInBlock.size());
+ ResultBuffers.insert(ResultBuffers.end(), HeaderBuffer.GetSegments().begin(), HeaderBuffer.GetSegments().end());
+ for (uint32_t ChunkIndex : ChunksInBlock)
+ {
+ std::span<const ChunkedContentLookup::ChunkSequenceLocation> ChunkLocations = GetChunkSequenceLocations(Lookup, ChunkIndex);
+ ZEN_ASSERT(!ChunkLocations.empty());
+ const IoHash& ChunkHash = Content.ChunkedContent.ChunkHashes[ChunkIndex];
+ CompositeBuffer Chunk = OpenFileCache.GetRange(ChunkLocations[0].SequenceIndex,
+ ChunkLocations[0].Offset,
+ Content.ChunkedContent.ChunkRawSizes[ChunkIndex]);
+ ZEN_ASSERT_SLOW(IoHash::HashBuffer(Chunk) == ChunkHash);
+
+ // Same compression policy as GenerateBlock: sequence membership, size
+ // threshold, and the compressability check all gate compression.
+ const uint64_t RawSize = Chunk.GetSize();
+ const bool ShouldCompressChunk = Lookup.RawHashToSequenceIndex.contains(ChunkHash) &&
+ (RawSize >= m_Options.MinimumSizeForCompressInBlock) &&
+ IsChunkCompressable(Content, Lookup, ChunkIndex);
+ const OodleCompressionLevel CompressionLevel = ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None;
+
+ CompositeBuffer CompressedChunk =
+ CompressedBuffer::Compress(std::move(Chunk), OodleCompressor::Mermaid, CompressionLevel).GetCompressed();
+ ResultBuffers.insert(ResultBuffers.end(), CompressedChunk.GetSegments().begin(), CompressedChunk.GetSegments().end());
+ }
+ return CompressedBuffer::FromCompressedNoValidate(CompositeBuffer(std::move(ResultBuffers)));
+};
+
+// Uploads the blobs the server reported missing for this part. Each requested
+// RawHash is resolved to either a newly generated block or a loose chunk;
+// block generation and chunk compression are scheduled on the IO worker pool,
+// the actual uploads on the network pool, and a progress bar is driven from
+// Work.Wait until all scheduled work drains (or the operation aborts).
+// Upload/compression byte counts are accumulated into TempUploadStats and
+// TempLooseChunksStats.
+void
+BuildsOperationUploadFolder::UploadPartBlobs(const ChunkedFolderContent& Content,
+ const ChunkedContentLookup& Lookup,
+ std::span<IoHash> RawHashes,
+ const std::vector<std::vector<uint32_t>>& NewBlockChunks,
+ GeneratedBlocks& NewBlocks,
+ std::span<const uint32_t> LooseChunkIndexes,
+ const std::uint64_t LargeAttachmentSize,
+ UploadStatistics& TempUploadStats,
+ LooseChunksStatistics& TempLooseChunksStats)
+{
+ ZEN_TRACE_CPU("UploadPartBlobs");
+ {
+ std::unique_ptr<BuildOpLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Upload Blobs"));
+ BuildOpLogOutput::ProgressBar& Progress(*ProgressBarPtr);
+
+ // Generation/compression runs on the IO pool; uploads run on the network pool.
+ WorkerThreadPool& ReadChunkPool = m_IOWorkerPool;
+ WorkerThreadPool& UploadChunkPool = m_NetworkPool;
+
+ FilteredRate FilteredGenerateBlockBytesPerSecond;
+ FilteredRate FilteredCompressedBytesPerSecond;
+ FilteredRate FilteredUploadedBytesPerSecond;
+
+ ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ std::atomic<size_t> UploadedBlockSize = 0;
+ std::atomic<size_t> UploadedBlockCount = 0;
+ std::atomic<size_t> UploadedRawChunkSize = 0;
+ std::atomic<size_t> UploadedCompressedChunkSize = 0;
+ std::atomic<uint32_t> UploadedChunkCount = 0;
+
+ // Map chunk index -> position within LooseChunkIndexes so a requested
+ // hash can be resolved to its loose-chunk work item below.
+ tsl::robin_map<uint32_t, uint32_t> ChunkIndexToLooseChunkOrderIndex;
+ ChunkIndexToLooseChunkOrderIndex.reserve(LooseChunkIndexes.size());
+ for (uint32_t OrderIndex = 0; OrderIndex < LooseChunkIndexes.size(); OrderIndex++)
+ {
+ ChunkIndexToLooseChunkOrderIndex.insert_or_assign(LooseChunkIndexes[OrderIndex], OrderIndex);
+ }
+
+ std::vector<size_t> BlockIndexes;
+ std::vector<uint32_t> LooseChunkOrderIndexes;
+
+ // Partition the requested hashes into blocks vs loose chunks and total
+ // up the raw byte counts for progress reporting. Unknown hashes are
+ // logged and skipped (treated as a transient server inconsistency).
+ uint64_t TotalLooseChunksSize = 0;
+ uint64_t TotalBlocksSize = 0;
+ for (const IoHash& RawHash : RawHashes)
+ {
+ if (auto It = NewBlocks.BlockHashToBlockIndex.find(RawHash); It != NewBlocks.BlockHashToBlockIndex.end())
+ {
+ BlockIndexes.push_back(It->second);
+ TotalBlocksSize += NewBlocks.BlockSizes[It->second];
+ }
+ else if (auto ChunkIndexIt = Lookup.ChunkHashToChunkIndex.find(RawHash); ChunkIndexIt != Lookup.ChunkHashToChunkIndex.end())
+ {
+ const uint32_t ChunkIndex = ChunkIndexIt->second;
+ if (auto LooseOrderIndexIt = ChunkIndexToLooseChunkOrderIndex.find(ChunkIndex);
+ LooseOrderIndexIt != ChunkIndexToLooseChunkOrderIndex.end())
+ {
+ LooseChunkOrderIndexes.push_back(LooseOrderIndexIt->second);
+ TotalLooseChunksSize += Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
+ }
+ }
+ else
+ {
+ ZEN_CONSOLE_WARN(
+ "Build blob {} was reported as needed for upload but it was reported as existing at the start of the "
+ "operation. Treating it as a transient inconsistent issue and will attempt to retry finalization",
+ RawHash);
+ }
+ }
+ uint64_t TotalRawSize = TotalLooseChunksSize + TotalBlocksSize;
+
+ const size_t UploadBlockCount = BlockIndexes.size();
+ const uint32_t UploadChunkCount = gsl::narrow<uint32_t>(LooseChunkOrderIndexes.size());
+
+ // Queues a generated block for upload on the network pool. When more
+ // than 16 in-memory blocks are already pending upload, the payload is
+ // first spilled to a temp file to bound memory usage; otherwise it is
+ // kept in memory and counted in QueuedPendingInMemoryBlocksForUpload.
+ auto AsyncUploadBlock = [this,
+ &Work,
+ &NewBlocks,
+ UploadBlockCount,
+ &UploadedBlockCount,
+ UploadChunkCount,
+ &UploadedChunkCount,
+ &UploadedBlockSize,
+ &TempUploadStats,
+ &FilteredUploadedBytesPerSecond,
+ &UploadChunkPool](const size_t BlockIndex,
+ const IoHash BlockHash,
+ CompositeBuffer&& Payload,
+ std::atomic<uint64_t>& QueuedPendingInMemoryBlocksForUpload) {
+ bool IsInMemoryBlock = true;
+ if (QueuedPendingInMemoryBlocksForUpload.load() > 16)
+ {
+ ZEN_TRACE_CPU("AsyncUploadBlock_WriteTempBlock");
+ std::filesystem::path TempFilePath = m_Options.TempDir / (BlockHash.ToHexString());
+ Payload = CompositeBuffer(WriteToTempFile(std::move(Payload), TempFilePath));
+ IsInMemoryBlock = false;
+ }
+ else
+ {
+ QueuedPendingInMemoryBlocksForUpload++;
+ }
+
+ Work.ScheduleWork(
+ UploadChunkPool,
+ [this,
+ &QueuedPendingInMemoryBlocksForUpload,
+ &NewBlocks,
+ UploadBlockCount,
+ &UploadedBlockCount,
+ UploadChunkCount,
+ &UploadedChunkCount,
+ &UploadedBlockSize,
+ &TempUploadStats,
+ &FilteredUploadedBytesPerSecond,
+ IsInMemoryBlock,
+ BlockIndex,
+ BlockHash,
+ Payload = std::move(Payload)](std::atomic<bool>&) mutable {
+ // Release the in-memory slot no matter how this task exits.
+ auto _ = MakeGuard([IsInMemoryBlock, &QueuedPendingInMemoryBlocksForUpload] {
+ if (IsInMemoryBlock)
+ {
+ QueuedPendingInMemoryBlocksForUpload--;
+ }
+ });
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("AsyncUploadBlock");
+
+ const uint64_t PayloadSize = Payload.GetSize();
+
+ FilteredUploadedBytesPerSecond.Start();
+ const CbObject BlockMetaData =
+ BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]);
+
+ // Write the blob to the local build cache (if configured) and
+ // then to the remote build storage.
+ if (m_Storage.BuildCacheStorage)
+ {
+ m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload);
+ }
+ m_Storage.BuildStorage->PutBuildBlob(m_BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload);
+ if (m_Options.IsVerbose)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Uploaded block {} ({}) containing {} chunks",
+ BlockHash,
+ NiceBytes(PayloadSize),
+ NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size());
+ }
+ UploadedBlockSize += PayloadSize;
+ TempUploadStats.BlocksBytes += PayloadSize;
+
+ if (m_Storage.BuildCacheStorage)
+ {
+ m_Storage.BuildCacheStorage->PutBlobMetadatas(m_BuildId,
+ std::vector<IoHash>({BlockHash}),
+ std::vector<CbObject>({BlockMetaData}));
+ }
+ // A failed metadata upload leaves MetaDataHasBeenUploaded[BlockIndex]
+ // false so it can be retried later.
+ bool MetadataSucceeded = m_Storage.BuildStorage->PutBlockMetadata(m_BuildId, BlockHash, BlockMetaData);
+ if (MetadataSucceeded)
+ {
+ if (m_Options.IsVerbose)
+ {
+ LOG_OUTPUT(m_LogOutput, "Uploaded block {} metadata ({})", BlockHash, NiceBytes(BlockMetaData.GetSize()));
+ }
+
+ NewBlocks.MetaDataHasBeenUploaded[BlockIndex] = true;
+ TempUploadStats.BlocksBytes += BlockMetaData.GetSize();
+ }
+
+ TempUploadStats.BlockCount++;
+
+ UploadedBlockCount++;
+ if (UploadedBlockCount == UploadBlockCount && UploadedChunkCount == UploadChunkCount)
+ {
+ FilteredUploadedBytesPerSecond.Stop();
+ }
+ }
+ });
+ };
+
+ // Queues a compressed loose chunk for upload; payloads at or above
+ // LargeAttachmentSize go through the multipart upload path.
+ auto AsyncUploadLooseChunk = [this,
+ LargeAttachmentSize,
+ &Work,
+ &UploadChunkPool,
+ &FilteredUploadedBytesPerSecond,
+ &UploadedBlockCount,
+ &UploadedChunkCount,
+ UploadBlockCount,
+ UploadChunkCount,
+ &UploadedCompressedChunkSize,
+ &UploadedRawChunkSize,
+ &TempUploadStats](const IoHash& RawHash, uint64_t RawSize, CompositeBuffer&& Payload) {
+ Work.ScheduleWork(
+ UploadChunkPool,
+ [this,
+ &Work,
+ LargeAttachmentSize,
+ &FilteredUploadedBytesPerSecond,
+ &UploadChunkPool,
+ &UploadedBlockCount,
+ &UploadedChunkCount,
+ UploadBlockCount,
+ UploadChunkCount,
+ &UploadedCompressedChunkSize,
+ &UploadedRawChunkSize,
+ &TempUploadStats,
+ RawHash,
+ RawSize,
+ Payload = CompositeBuffer(std::move(Payload))](std::atomic<bool>&) mutable {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("AsyncUploadLooseChunk");
+
+ const uint64_t PayloadSize = Payload.GetSize();
+
+ if (m_Storage.BuildCacheStorage)
+ {
+ m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload);
+ }
+
+ if (PayloadSize >= LargeAttachmentSize)
+ {
+ // Multipart: PutLargeBuildBlob returns the per-part work
+ // items, which are scheduled on the upload pool below. The
+ // progress callback accounts bytes and completion.
+ ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart");
+ TempUploadStats.MultipartAttachmentCount++;
+ std::vector<std::function<void()>> MultipartWork = m_Storage.BuildStorage->PutLargeBuildBlob(
+ m_BuildId,
+ RawHash,
+ ZenContentType::kCompressedBinary,
+ PayloadSize,
+ [Payload = std::move(Payload), &FilteredUploadedBytesPerSecond](uint64_t Offset,
+ uint64_t Size) mutable -> IoBuffer {
+ FilteredUploadedBytesPerSecond.Start();
+
+ IoBuffer PartPayload = Payload.Mid(Offset, Size).Flatten().AsIoBuffer();
+ PartPayload.SetContentType(ZenContentType::kBinary);
+ return PartPayload;
+ },
+ [RawSize,
+ &TempUploadStats,
+ &UploadedCompressedChunkSize,
+ &UploadChunkPool,
+ &UploadedBlockCount,
+ UploadBlockCount,
+ &UploadedChunkCount,
+ UploadChunkCount,
+ &FilteredUploadedBytesPerSecond,
+ &UploadedRawChunkSize](uint64_t SentBytes, bool IsComplete) {
+ TempUploadStats.ChunksBytes += SentBytes;
+ UploadedCompressedChunkSize += SentBytes;
+ if (IsComplete)
+ {
+ TempUploadStats.ChunkCount++;
+ UploadedChunkCount++;
+ if (UploadedBlockCount == UploadBlockCount && UploadedChunkCount == UploadChunkCount)
+ {
+ FilteredUploadedBytesPerSecond.Stop();
+ }
+ UploadedRawChunkSize += RawSize;
+ }
+ });
+ for (auto& WorkPart : MultipartWork)
+ {
+ Work.ScheduleWork(UploadChunkPool, [Work = std::move(WorkPart)](std::atomic<bool>& AbortFlag) {
+ ZEN_TRACE_CPU("AsyncUploadLooseChunk_Multipart_Work");
+ if (!AbortFlag)
+ {
+ Work();
+ }
+ });
+ }
+ if (m_Options.IsVerbose)
+ {
+ LOG_OUTPUT(m_LogOutput, "Uploaded multipart chunk {} ({})", RawHash, NiceBytes(PayloadSize));
+ }
+ }
+ else
+ {
+ ZEN_TRACE_CPU("AsyncUploadLooseChunk_Singlepart");
+ m_Storage.BuildStorage->PutBuildBlob(m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload);
+ if (m_Options.IsVerbose)
+ {
+ LOG_OUTPUT(m_LogOutput, "Uploaded chunk {} ({})", RawHash, NiceBytes(PayloadSize));
+ }
+ TempUploadStats.ChunksBytes += Payload.GetSize();
+ TempUploadStats.ChunkCount++;
+ UploadedCompressedChunkSize += Payload.GetSize();
+ UploadedRawChunkSize += RawSize;
+ UploadedChunkCount++;
+ if (UploadedChunkCount == UploadChunkCount)
+ {
+ FilteredUploadedBytesPerSecond.Stop();
+ }
+ }
+ }
+ });
+ };
+
+ // NOTE(review): GenerateBlockIndexes is declared but never populated, so
+ // the "GeneratedBlockCount == GenerateBlockIndexes.size()" comparison
+ // below is against 0 and FilteredGenerateBlockBytesPerSecond.Stop()
+ // appears unreachable once any block has been generated. This likely
+ // should compare against BlockIndexes.size() — TODO confirm.
+ std::vector<size_t> GenerateBlockIndexes;
+
+ std::atomic<uint64_t> GeneratedBlockCount = 0;
+ std::atomic<uint64_t> GeneratedBlockByteCount = 0;
+
+ std::atomic<uint64_t> QueuedPendingInMemoryBlocksForUpload = 0;
+
+ // Start generation of any non-prebuilt blocks and schedule upload
+ for (const size_t BlockIndex : BlockIndexes)
+ {
+ const IoHash& BlockHash = NewBlocks.BlockDescriptions[BlockIndex].BlockHash;
+ if (!m_AbortFlag)
+ {
+ Work.ScheduleWork(
+ ReadChunkPool,
+ [this,
+ BlockHash = IoHash(BlockHash),
+ BlockIndex,
+ &FilteredGenerateBlockBytesPerSecond,
+ &Content,
+ &Lookup,
+ &NewBlocks,
+ &NewBlockChunks,
+ &GenerateBlockIndexes,
+ &GeneratedBlockCount,
+ &GeneratedBlockByteCount,
+ &AsyncUploadBlock,
+ &QueuedPendingInMemoryBlocksForUpload](std::atomic<bool>&) {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("UploadPartBlobs_GenerateBlock");
+
+ FilteredGenerateBlockBytesPerSecond.Start();
+
+ // A present block header means the block was described
+ // earlier and only needs rebuilding; otherwise generate it
+ // from scratch and verify the resulting hash.
+ Stopwatch GenerateTimer;
+ CompositeBuffer Payload;
+ if (NewBlocks.BlockHeaders[BlockIndex])
+ {
+ Payload =
+ RebuildBlock(Content, Lookup, std::move(NewBlocks.BlockHeaders[BlockIndex]), NewBlockChunks[BlockIndex])
+ .GetCompressed();
+ }
+ else
+ {
+ ChunkBlockDescription BlockDescription;
+ CompressedBuffer CompressedBlock =
+ GenerateBlock(Content, Lookup, NewBlockChunks[BlockIndex], BlockDescription);
+ if (!CompressedBlock)
+ {
+ throw std::runtime_error(fmt::format("Failed generating block {}", BlockHash));
+ }
+ ZEN_ASSERT(BlockDescription.BlockHash == BlockHash);
+ Payload = std::move(CompressedBlock).GetCompressed();
+ }
+
+ GeneratedBlockByteCount += NewBlocks.BlockSizes[BlockIndex];
+ GeneratedBlockCount++;
+ if (GeneratedBlockCount == GenerateBlockIndexes.size())
+ {
+ FilteredGenerateBlockBytesPerSecond.Stop();
+ }
+ if (m_Options.IsVerbose)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "{} block {} ({}) containing {} chunks in {}",
+ NewBlocks.BlockHeaders[BlockIndex] ? "Regenerated" : "Generated",
+ NewBlocks.BlockDescriptions[BlockIndex].BlockHash,
+ NiceBytes(NewBlocks.BlockSizes[BlockIndex]),
+ NewBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size(),
+ NiceTimeSpanMs(GenerateTimer.GetElapsedTimeMs()));
+ }
+ if (!m_AbortFlag)
+ {
+ AsyncUploadBlock(BlockIndex, BlockHash, std::move(Payload), QueuedPendingInMemoryBlocksForUpload);
+ }
+ }
+ });
+ }
+ }
+
+ // Start compression of any non-precompressed loose chunks and schedule upload
+ for (const uint32_t LooseChunkOrderIndex : LooseChunkOrderIndexes)
+ {
+ const uint32_t ChunkIndex = LooseChunkIndexes[LooseChunkOrderIndex];
+ Work.ScheduleWork(
+ ReadChunkPool,
+ [this,
+ &Content,
+ &Lookup,
+ &TempLooseChunksStats,
+ &LooseChunkOrderIndexes,
+ &FilteredCompressedBytesPerSecond,
+ &TempUploadStats,
+ &AsyncUploadLooseChunk,
+ ChunkIndex](std::atomic<bool>&) {
+ if (!m_AbortFlag)
+ {
+ ZEN_TRACE_CPU("UploadPartBlobs_CompressChunk");
+
+ FilteredCompressedBytesPerSecond.Start();
+ Stopwatch CompressTimer;
+ CompositeBuffer Payload = CompressChunk(Content, Lookup, ChunkIndex, TempLooseChunksStats);
+ if (m_Options.IsVerbose)
+ {
+ LOG_OUTPUT(m_LogOutput,
+ "Compressed chunk {} ({} -> {}) in {}",
+ Content.ChunkedContent.ChunkHashes[ChunkIndex],
+ NiceBytes(Content.ChunkedContent.ChunkRawSizes[ChunkIndex]),
+ NiceBytes(Payload.GetSize()),
+ NiceTimeSpanMs(CompressTimer.GetElapsedTimeMs()));
+ }
+ const uint64_t ChunkRawSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
+ TempUploadStats.ReadFromDiskBytes += ChunkRawSize;
+ if (TempLooseChunksStats.CompressedChunkCount == LooseChunkOrderIndexes.size())
+ {
+ FilteredCompressedBytesPerSecond.Stop();
+ }
+ if (!m_AbortFlag)
+ {
+ AsyncUploadLooseChunk(Content.ChunkedContent.ChunkHashes[ChunkIndex], ChunkRawSize, std::move(Payload));
+ }
+ }
+ });
+ }
+
+ // Drive the progress bar until all scheduled generation, compression and
+ // upload work has drained.
+ Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
+ FilteredCompressedBytesPerSecond.Update(TempLooseChunksStats.CompressedChunkRawBytes.load());
+ FilteredGenerateBlockBytesPerSecond.Update(GeneratedBlockByteCount.load());
+ FilteredUploadedBytesPerSecond.Update(UploadedCompressedChunkSize.load() + UploadedBlockSize.load());
+ uint64_t UploadedRawSize = UploadedRawChunkSize.load() + UploadedBlockSize.load();
+ uint64_t UploadedCompressedSize = UploadedCompressedChunkSize.load() + UploadedBlockSize.load();
+
+ std::string Details = fmt::format(
+ "Compressed {}/{} ({}/{} {}B/s) chunks. "
+ "Uploaded {}/{} ({}/{}) blobs "
+ "({} {}bits/s)",
+ TempLooseChunksStats.CompressedChunkCount.load(),
+ LooseChunkOrderIndexes.size(),
+ NiceBytes(TempLooseChunksStats.CompressedChunkRawBytes),
+ NiceBytes(TotalLooseChunksSize),
+ NiceNum(FilteredCompressedBytesPerSecond.GetCurrent()),
+
+ UploadedBlockCount.load() + UploadedChunkCount.load(),
+ UploadBlockCount + UploadChunkCount,
+ NiceBytes(UploadedRawSize),
+ NiceBytes(TotalRawSize),
+
+ NiceBytes(UploadedCompressedSize),
+ NiceNum(FilteredUploadedBytesPerSecond.GetCurrent()));
+
+ Progress.UpdateState({.Task = "Uploading blobs ",
+ .Details = Details,
+ .TotalCount = gsl::narrow<uint64_t>(TotalRawSize),
+ .RemainingCount = gsl::narrow<uint64_t>(TotalRawSize - UploadedRawSize),
+ .Status = BuildOpLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+ false);
+ });
+
+ // Unless aborted, every in-memory block slot must have been released.
+ ZEN_ASSERT(m_AbortFlag || QueuedPendingInMemoryBlocksForUpload.load() == 0);
+
+ Progress.Finish();
+
+ TempUploadStats.ElapsedWallTimeUS += FilteredUploadedBytesPerSecond.GetElapsedTimeUS();
+ TempLooseChunksStats.CompressChunksElapsedWallTimeUS += FilteredCompressedBytesPerSecond.GetElapsedTimeUS();
+ }
+}
+
+// Compresses a single loose chunk for upload. The raw bytes are read directly
+// from the owning source file (located via the chunk's first sequence
+// location). When the chunk is deemed compressable, the compressed stream is
+// written to a delete-on-close temp file — so large payloads do not stay in
+// memory — and the result is returned backed by that file. If streaming
+// compression fails, the streamed byte counters are rolled back and the code
+// falls through to a plain in-memory compress, spilled to a temp file only
+// when compression was actually applied. Updates TempLooseChunksStats in both
+// paths. Throws std::runtime_error on read or compression failure.
+CompositeBuffer
+BuildsOperationUploadFolder::CompressChunk(const ChunkedFolderContent& Content,
+ const ChunkedContentLookup& Lookup,
+ uint32_t ChunkIndex,
+ LooseChunksStatistics& TempLooseChunksStats)
+{
+ ZEN_TRACE_CPU("CompressChunk");
+ ZEN_ASSERT(!m_Options.TempDir.empty());
+ const IoHash& ChunkHash = Content.ChunkedContent.ChunkHashes[ChunkIndex];
+ const uint64_t ChunkSize = Content.ChunkedContent.ChunkRawSizes[ChunkIndex];
+
+ // Read the raw chunk straight from the source file that contains it.
+ const ChunkedContentLookup::ChunkSequenceLocation& Source = GetChunkSequenceLocations(Lookup, ChunkIndex)[0];
+ const std::uint32_t PathIndex = Lookup.SequenceIndexFirstPathIndex[Source.SequenceIndex];
+ IoBuffer RawSource = IoBufferBuilder::MakeFromFile((m_Path / Content.Paths[PathIndex]).make_preferred(), Source.Offset, ChunkSize);
+ if (!RawSource)
+ {
+ throw std::runtime_error(fmt::format("Failed fetching chunk {}", ChunkHash));
+ }
+ if (RawSource.GetSize() != ChunkSize)
+ {
+ throw std::runtime_error(fmt::format("Fetched chunk {} has invalid size", ChunkHash));
+ }
+
+ const bool ShouldCompressChunk = IsChunkCompressable(Content, Lookup, ChunkIndex);
+ const OodleCompressionLevel CompressionLevel = ShouldCompressChunk ? OodleCompressionLevel::VeryFast : OodleCompressionLevel::None;
+
+ // Preferred path: stream the compressed output to a temp file so the full
+ // compressed payload never has to be held in memory.
+ if (ShouldCompressChunk)
+ {
+ std::filesystem::path TempFilePath = m_Options.TempDir / ChunkHash.ToHexString();
+
+ BasicFile CompressedFile;
+ std::error_code Ec;
+ CompressedFile.Open(TempFilePath, BasicFile::Mode::kTruncateDelete, Ec);
+ if (Ec)
+ {
+ throw std::runtime_error(
+ fmt::format("Failed creating temporary file for compressing blob {}. Reason: {}", ChunkHash, Ec.message()));
+ }
+
+ uint64_t StreamRawBytes = 0;
+ uint64_t StreamCompressedBytes = 0;
+
+ bool CouldCompress = CompressedBuffer::CompressToStream(
+ CompositeBuffer(SharedBuffer(RawSource)),
+ [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
+ ZEN_UNUSED(SourceOffset);
+ TempLooseChunksStats.CompressedChunkRawBytes += SourceSize;
+ CompressedFile.Write(RangeBuffer, Offset);
+ TempLooseChunksStats.CompressedChunkBytes += RangeBuffer.GetSize();
+ StreamRawBytes += SourceSize;
+ StreamCompressedBytes += RangeBuffer.GetSize();
+ },
+ OodleCompressor::Mermaid,
+ CompressionLevel);
+ if (CouldCompress)
+ {
+ // Success: hand the temp file off as a delete-on-close IoBuffer and
+ // validate that the decoded hash/size match the source chunk.
+ uint64_t CompressedSize = CompressedFile.FileSize();
+ void* FileHandle = CompressedFile.Detach();
+ IoBuffer TempPayload = IoBuffer(IoBuffer::File,
+ FileHandle,
+ 0,
+ CompressedSize,
+ /*IsWholeFile*/ true);
+ ZEN_ASSERT(TempPayload);
+ TempPayload.SetDeleteOnClose(true);
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(TempPayload), RawHash, RawSize);
+ ZEN_ASSERT(Compressed);
+ ZEN_ASSERT(RawHash == ChunkHash);
+ ZEN_ASSERT(RawSize == ChunkSize);
+
+ TempLooseChunksStats.CompressedChunkCount++;
+
+ return Compressed.GetCompressed();
+ }
+ else
+ {
+ // Streaming failed: undo the partially accounted byte counters
+ // before falling through to the in-memory path below.
+ TempLooseChunksStats.CompressedChunkRawBytes -= StreamRawBytes;
+ TempLooseChunksStats.CompressedChunkBytes -= StreamCompressedBytes;
+ }
+ CompressedFile.Close();
+ RemoveFile(TempFilePath, Ec);
+ ZEN_UNUSED(Ec);
+ }
+
+ // Fallback path: compress (or wrap uncompressed) in memory.
+ CompressedBuffer CompressedBlob =
+ CompressedBuffer::Compress(SharedBuffer(std::move(RawSource)), OodleCompressor::Mermaid, CompressionLevel);
+ if (!CompressedBlob)
+ {
+ throw std::runtime_error(fmt::format("Failed to compress large blob {}", ChunkHash));
+ }
+ ZEN_ASSERT_SLOW(CompressedBlob.DecodeRawHash() == ChunkHash);
+ ZEN_ASSERT_SLOW(CompressedBlob.DecodeRawSize() == ChunkSize);
+
+ TempLooseChunksStats.CompressedChunkRawBytes += ChunkSize;
+ TempLooseChunksStats.CompressedChunkBytes += CompressedBlob.GetCompressedSize();
+
+ // If no compression was applied, the compressed blob just references the
+ // raw source data (with roughly 64 kb of header held in memory), so it
+ // does not need to be written to disk.
+ if (ShouldCompressChunk)
+ {
+ std::filesystem::path TempFilePath = m_Options.TempDir / (ChunkHash.ToHexString());
+ IoBuffer TempPayload = WriteToTempFile(std::move(CompressedBlob).GetCompressed(), TempFilePath);
+ CompressedBlob = CompressedBuffer::FromCompressedNoValidate(std::move(TempPayload));
+ }
+
+ TempLooseChunksStats.CompressedChunkCount++;
+ return std::move(CompressedBlob).GetCompressed();
+}
+
} // namespace zen